Skip to content
Advertisement

Tornado gives error Cannot write() after finish()

I am using the Tornado chat demo example from here: https://github.com/tornadoweb/tornado/tree/master/demos/chat and just altering it very slightly. The code change is just a small class called Connections and a bit in the MessageNewHandler(). All I am doing is just saving a reference to self and trying to write(message) to a previous client.

But when I go to save on this line conns.conns[0].write(message) I get this error message:

[E 220107 23:18:38 web:1789] Uncaught exception POST /a/message/new (::1)
HTTPServerRequest(protocol='http', host='localhost:8888', method='POST', uri='/a/message/new', version='HTTP/1.1', remote_ip='::1')
Traceback (most recent call last):
  File "/home/joe/dev/tornado/lib/python3.8/site-packages/tornado/web.py", line 1702, in _execute
    result = method(*self.path_args, **self.path_kwargs)
  File "server.py", line 89, in post
    MessageNewHandler.clients[0].write(message)
  File "/home/joe/dev/tornado/lib/python3.8/site-packages/tornado/web.py", line 833, in write
    raise RuntimeError("Cannot write() after finish()")
RuntimeError: Cannot write() after finish()

[E 220107 23:18:38 web:2239] 500 POST /a/message/new (::1) 5.98ms

Here is the code:

import asyncio
import tornado.escape
import tornado.ioloop
import tornado.locks
import tornado.web
import os.path
import uuid
from tornado.options import define, options, parse_command_line

# Command-line options. main() parses them with parse_command_line() and
# reads them back as options.port and options.debug.
define("port", default=8888, help="run on the given port", type=int)
define("debug", default=True, help="run in debug mode")


class Connections(object):
    """Registry of request handlers, kept in the order they were added.

    The handlers live in the public ``conns`` list attribute, which callers
    index directly (for example ``conns.conns[0]``).
    """

    def __init__(self):
        # Public list of registered handlers, oldest first.
        self.conns = []

    def add_connection(self, conn_self):
        """Append ``conn_self`` to the end of the registry."""
        self.conns.append(conn_self)

    # NOTE: there used to be a ``conns()`` accessor method here. It was
    # removed because the ``conns`` list assigned in ``__init__`` shadows any
    # method of the same name, so it could never be called on an instance.


# Module-level registry instance.
conns = Connections()
    

class MessageBuffer(object):
    """Bounded in-memory cache of chat messages plus a wake-up condition."""

    def __init__(self):
        self.cache_size = 200
        self.cache = []
        # Waiters on this condition are woken each time a message is cached.
        self.cond = tornado.locks.Condition()

    def get_messages_since(self, cursor):
        """Return a new list of the cached messages that follow ``cursor``.

        ``cursor`` is the ``id`` of the last message the caller already has.
        The cache is searched from its newest entry backwards; when no entry
        carries that id, every cached message is returned.
        """
        first_unseen = 0
        for position in range(len(self.cache) - 1, -1, -1):
            if self.cache[position]["id"] == cursor:
                first_unseen = position + 1
                break
        return self.cache[first_unseen:]

    def add_message(self, message):
        """Cache ``message``, trim the cache, and wake every waiter."""
        self.cache.append(message)
        over_capacity = len(self.cache) > self.cache_size
        if over_capacity:
            # Rebind to a fresh list holding only the newest entries.
            self.cache = self.cache[-self.cache_size:]
        self.cond.notify_all()


# Making this a non-singleton is left as an exercise for the reader.
# Single module-level buffer read by MainHandler and MessageUpdatesHandler.
global_message_buffer = MessageBuffer()


class MainHandler(tornado.web.RequestHandler):
    """Serve the chat page."""

    def get(self):
        """Render ``index.html`` with the currently cached messages."""
        cached_messages = global_message_buffer.cache
        self.render("index.html", messages=cached_messages)


class MessageNewHandler(tornado.web.RequestHandler):
    """Post a new message to the chat room.

    NOTE(review): this is the handler that produces the reported
    ``RuntimeError: Cannot write() after finish()``; the code is kept
    exactly as posted.
    """
    
    
    def post(self):
        message = {"id": str(uuid.uuid4()), "body": self.get_argument("body")}
        # render_string() returns a byte string, which is not supported
        # in json, so we must convert it to a character string.
        message["html"] = tornado.escape.to_unicode(
            self.render_string("message.html", message=message)
        )
        # Remember this request's handler in the module-level registry.
        conns.add_connection(self)

        # From the third registered handler onwards, write to the oldest one.
        # NOTE(review): conns.conns[0] belongs to an earlier request. If that
        # request has already been finished, write() raises the RuntimeError
        # quoted above. Presumably Tornado finished it automatically when its
        # post() returned -- confirm against the Tornado documentation.
        if (len(conns.conns)>2):
            conns.conns[0].write(message)
    
            # Only reached inside the branch above, so this handler calls
            # finish() itself only once more than two handlers are registered.
            self.finish()
        

class MessageUpdatesHandler(tornado.web.RequestHandler):
    """Long-poll endpoint for new messages.

    The request is held open until the buffer has something newer than the
    client's cursor.
    """

    async def post(self):
        """Wait for messages newer than the ``cursor`` argument and write them."""
        last_seen = self.get_argument("cursor", None)

        while True:
            fresh = global_message_buffer.get_messages_since(last_seen)
            if fresh:
                break
            # Keep the Future on the instance so that on_connection_close
            # is able to cancel it.
            self.wait_future = global_message_buffer.cond.wait()
            try:
                await self.wait_future
            except asyncio.CancelledError:
                return

        # Skip the write when the underlying stream is already closed.
        if not self.request.connection.stream.closed():
            self.write({"messages": fresh})

    def on_connection_close(self):
        """Cancel the stored wait Future."""
        pending_wait = self.wait_future
        pending_wait.cancel()


def main():
    """Parse the command line, build the application, and run the IOLoop."""
    parse_command_line()

    base_dir = os.path.dirname(__file__)
    routes = [
        (r"/", MainHandler),
        (r"/a/message/new", MessageNewHandler),
        (r"/a/message/updates", MessageUpdatesHandler),
    ]
    settings = {
        "cookie_secret": "__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
        "template_path": os.path.join(base_dir, "templates"),
        "static_path": os.path.join(base_dir, "static"),
        "xsrf_cookies": True,
        "debug": options.debug,
    }

    app = tornado.web.Application(routes, **settings)
    app.listen(options.port)
    tornado.ioloop.IOLoop.current().start()


# Start the server only when this file is run as a script.
if __name__ == "__main__":
    main()

Advertisement

Answer

You’re writing to an already closed connection, that’s why you’re seeing the error.

If you want to write to a previously connected client, you have to keep that connection open.

However, using conns.add_connection(self) to track regular HTTP connections doesn’t make sense.

You should consider using websockets if you want to keep previous connections open and track them.


Update: Here’s how you can keep a connection open. If I understand correctly, you want to send a message from current client to the previous client.

1. Using tornado.locks.Condition():

import tornado.locks


class MessageNewHandler(tornado.web.RequestHandler):
    """Post a new message and hand it to the previously waiting client.

    A client that arrives while nobody is waiting is parked with its
    response left open. The next client's message becomes that parked
    client's response.
    """

    # Handlers whose responses are still open, oldest first.
    clients = []
    # Used to wake a parked handler once its response has been finished.
    condition = tornado.locks.Condition()

    async def post(self):
        message = {"id": str(uuid.uuid4()), "body": self.get_argument("body")}
        # render_string() returns a byte string, which is not supported
        # in json, so we must convert it to a character string.
        message["html"] = tornado.escape.to_unicode(
            self.render_string("message.html", message=message)
        )

        waiting = MessageNewHandler.clients

        if not waiting:
            # nobody is waiting yet: park this request and keep its
            # response open until another client posts

            waiting.append(self)
            # wait until notified
            await MessageNewHandler.condition.wait()
        else:
            # a previous client is waiting

            # take it off the list first: once its response is finished
            # that connection can't be used again, and leaving it in the
            # list would make a later request call finish() on it twice
            previous_client = waiting.pop(0)
            # write to previous client's response
            previous_client.finish(message)
            # notify the waiting client so its post() can return
            MessageNewHandler.condition.notify()

            # Note: this handler is deliberately not added to the list;
            # its own response is finished as soon as post() returns


2. Using tornado.concurrent.Future():

import tornado.concurrent


class MessageNewHandler(tornado.web.RequestHandler):
    """Post a new message and hand it to the previously waiting client.

    A client that arrives while nobody is waiting awaits a future. The next
    client resolves that future with its own message, which the waiting
    client then sends as its response.
    """

    # Futures of clients that are still waiting for a message, oldest first.
    waiters = []

    async def post(self):
        message = {"id": str(uuid.uuid4()), "body": self.get_argument("body")}
        # render_string() returns a byte string, which is not supported
        # in json, so we must convert it to a character string.
        message["html"] = tornado.escape.to_unicode(
            self.render_string("message.html", message=message)
        )

        waiters = MessageNewHandler.waiters

        if not waiters:
            # nobody is waiting yet

            # instead of saving a reference to the client,
            # save a future that the next client will resolve
            future = tornado.concurrent.Future()
            waiters.append(future)

            # wait for next client's message
            msg_from_next_client = await future
            # the future resolves when the next client sets a result on it;
            # only then does the following line run
            self.finish(msg_from_next_client)
        else:
            # a previous client is waiting

            # remove its future from the list before resolving it: a
            # resolved future can't deliver a second message, and leaving
            # it at the front would make every later message be dropped
            previous_client_future = waiters.pop(0)
            if not previous_client_future.done():
                # only set a result if it hasn't been set already,
                # otherwise you'll get an error
                previous_client_future.set_result(message)


3: A more practical example using tornado.concurrent.Future():

import tornado.concurrent


class Queue:
    """We'll keep the future related code in this class.
    This will allow us to present a cleaner, more intuitive usage api.
    """
    # Futures of clients still waiting for a message, oldest first.
    waiters = []

    @classmethod
    def get_message_from_next_client(cls):
        """Register a new waiter and return the future it should await."""
        future = tornado.concurrent.Future()
        cls.waiters.append(future)
        return future

    @classmethod
    def send_message_to_prev_client(cls, message):
        """Deliver ``message`` to the oldest waiter and remove that waiter.

        Raises:
            IndexError: if no client is waiting.
        """
        # pop() rather than [0]: a future can be resolved only once, so a
        # waiter that stayed at the front would block every later delivery.
        previous_client_future = cls.waiters.pop(0)
        if not previous_client_future.done():
            previous_client_future.set_result(message)


class MessageNewHandler(tornado.web.RequestHandler):
    """Post a new message to the chat room.

    A client that arrives while nobody is waiting waits for the next
    client's message and sends it as its own response. A client that finds
    someone waiting hands its message to that waiting client.
    """

    async def post(self):
        message = {"id": str(uuid.uuid4()), "body": self.get_argument("body")}
        # render_string() returns a byte string, which is not supported
        # in json, so we must convert it to a character string.
        message["html"] = tornado.escape.to_unicode(
            self.render_string("message.html", message=message)
        )

        # This check runs before the current client is enqueued, so an empty
        # list means nobody is waiting. Comparing against 2 here would make
        # the second client wait as well instead of delivering its message.
        if not Queue.waiters:
            msg_from_next_client = await Queue.get_message_from_next_client()
            self.finish(msg_from_next_client)
        else:
            Queue.send_message_to_prev_client(message)
User contributions licensed under: CC BY-SA
5 people found this helpful
Advertisement