I am using the Tornado chat demo example from here: https://github.com/tornadoweb/tornado/tree/master/demos/chat and just altering it very slightly. The code change is just a small class called Connections and a bit in the MessageNewHandler(). All I am doing is just saving a reference to self and trying to write(message) to a previous client.
But when I post a message and execution reaches this line: conns.conns[0].write(message)
I get this error message:
[E 220107 23:18:38 web:1789] Uncaught exception POST /a/message/new (::1) HTTPServerRequest(protocol='http', host='localhost:8888', method='POST', uri='/a/message/new', version='HTTP/1.1', remote_ip='::1') Traceback (most recent call last): File "/home/joe/dev/tornado/lib/python3.8/site-packages/tornado/web.py", line 1702, in _execute result = method(*self.path_args, **self.path_kwargs) File "server.py", line 89, in post MessageNewHandler.clients[0].write(message) File "/home/joe/dev/tornado/lib/python3.8/site-packages/tornado/web.py", line 833, in write raise RuntimeError("Cannot write() after finish()") RuntimeError: Cannot write() after finish()
[E 220107 23:18:38 web:2239] 500 POST /a/message/new (::1) 5.98ms
Here is the code:
import asyncio
import os.path
import uuid

import tornado.escape
import tornado.ioloop
import tornado.locks
import tornado.web
from tornado.options import define, options, parse_command_line

define("port", default=8888, help="run on the given port", type=int)
define("debug", default=True, help="run in debug mode")


class Connections(object):
    """Keeps references to the request handlers passed to ``add_connection``."""

    def __init__(self):
        # Handlers in the order they were added.
        self.conns = []

    def add_connection(self, conn_self):
        """Append ``conn_self`` (a request handler) to the stored list."""
        self.conns.append(conn_self)

    def conns(self):
        # NOTE: on instances this method is shadowed by the ``self.conns``
        # list assigned in ``__init__``, so ``instance.conns`` is the list.
        return self.conns


conns = Connections()


class MessageBuffer(object):
    """Bounded in-memory cache of chat messages plus a wake-up condition."""

    def __init__(self):
        # cond is notified whenever the message cache is updated
        self.cond = tornado.locks.Condition()
        self.cache = []
        self.cache_size = 200

    def get_messages_since(self, cursor):
        """Return the cached messages that are newer than ``cursor``.

        ``cursor`` should be the ``id`` of the last message received.  When
        no cached message carries that id, a copy of the whole cache is
        returned.
        """
        total = len(self.cache)
        # Walk backwards from the newest message until the cursor is found;
        # ``offset`` is then the number of messages newer than the cursor.
        for offset, entry in enumerate(reversed(self.cache)):
            if entry["id"] == cursor:
                return self.cache[total - offset:]
        return list(self.cache)

    def add_message(self, message):
        """Append ``message``, trim the cache to its size limit, wake waiters."""
        self.cache.append(message)
        if len(self.cache) > self.cache_size:
            self.cache = self.cache[-self.cache_size:]
        self.cond.notify_all()


# Making this a non-singleton is left as an exercise for the reader.
global_message_buffer = MessageBuffer()


class MainHandler(tornado.web.RequestHandler):
    """Render the chat page with the currently cached messages."""

    def get(self):
        self.render("index.html", messages=global_message_buffer.cache)


class MessageNewHandler(tornado.web.RequestHandler):
    """Post a new message to the chat room."""

    def post(self):
        message_id = str(uuid.uuid4())
        body = self.get_argument("body")
        message = {"id": message_id, "body": body}
        # render_string() returns a byte string, which is not supported
        # in json, so we must convert it to a character string.
        rendered = self.render_string("message.html", message=message)
        message["html"] = tornado.escape.to_unicode(rendered)

        conns.add_connection(self)
        # Once more than two handlers are stored, write to the first one.
        # That handler belongs to an earlier request; if that request has
        # already been finished, write() raises
        # "RuntimeError: Cannot write() after finish()".
        if len(conns.conns) > 2:
            conns.conns[0].write(message)
        self.finish()


class MessageUpdatesHandler(tornado.web.RequestHandler):
    """Long-polling request for new messages.

    Waits until new messages are available before returning anything.
    """

    async def post(self):
        cursor = self.get_argument("cursor", None)
        while True:
            messages = global_message_buffer.get_messages_since(cursor)
            if messages:
                break
            # Save the Future returned here so we can cancel it in
            # on_connection_close.
            self.wait_future = global_message_buffer.cond.wait()
            try:
                await self.wait_future
            except asyncio.CancelledError:
                return
        if self.request.connection.stream.closed():
            return
        self.write({"messages": messages})

    def on_connection_close(self):
        # Cancelling the pending wait makes post() return via CancelledError.
        self.wait_future.cancel()


def main():
    """Parse command-line options, build the application and run the loop."""
    parse_command_line()
    here = os.path.dirname(__file__)
    routes = [
        (r"/", MainHandler),
        (r"/a/message/new", MessageNewHandler),
        (r"/a/message/updates", MessageUpdatesHandler),
    ]
    app = tornado.web.Application(
        routes,
        cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
        template_path=os.path.join(here, "templates"),
        static_path=os.path.join(here, "static"),
        xsrf_cookies=True,
        debug=options.debug,
    )
    app.listen(options.port)
    tornado.ioloop.IOLoop.current().start()


if __name__ == "__main__":
    main()
Advertisement
Answer
You’re writing to an already closed connection, that’s why you’re seeing the error.
If you want to write to a previously connected client, you have to keep that connection open.
However, tracking regular HTTP connections like this – conns.add_connection(self)
– doesn’t make sense.
You should consider using websockets if you want to keep previous connections open and track them.
Update: Here’s how you can keep a connection open. If I understand correctly, you want to send a message from the current client to the previous client.
1. Using tornado.locks.Condition():
import tornado.locks


class MessageNewHandler(tornado.web.RequestHandler):
    """Post a new message to the chat room.

    Requests are paired up: if no earlier request is waiting, this one is
    kept open; otherwise this request's message is sent as the response of
    the waiting request.
    """

    # Handlers whose responses are still open (oldest first).
    clients = []
    # Keeps a waiting handler's post() coroutine suspended, so its response
    # is not ended before another request finishes it.
    condition = tornado.locks.Condition()

    async def post(self):
        message = {"id": str(uuid.uuid4()), "body": self.get_argument("body")}
        # render_string() returns a byte string, which is not supported
        # in json, so we must convert it to a character string.
        message["html"] = tornado.escape.to_unicode(
            self.render_string("message.html", message=message)
        )

        if not MessageNewHandler.clients:
            # Nobody is waiting: register this handler and wait until notified.
            MessageNewHandler.clients.append(self)
            await MessageNewHandler.condition.wait()
        else:
            # Remove the waiting handler *before* finishing it: a finished
            # response cannot be used again, and leaving it in the list
            # would make a later request call finish() on it a second time.
            previous_client = MessageNewHandler.clients.pop(0)
            # Write this message as the previous client's response.
            previous_client.finish(message)
            # Wake the waiting coroutine so its post() can return.
            MessageNewHandler.condition.notify()
2. Using tornado.concurrent.Future():
import tornado.concurrent


class MessageNewHandler(tornado.web.RequestHandler):
    """Post a new message to the chat room.

    Requests are paired up: if no earlier request is waiting, this one waits
    on a future; otherwise this request's message becomes the result of the
    waiting request's future.
    """

    # Futures of the requests that are currently waiting (oldest first).
    waiters = []

    async def post(self):
        message = {"id": str(uuid.uuid4()), "body": self.get_argument("body")}
        # render_string() returns a byte string, which is not supported
        # in json, so we must convert it to a character string.
        message["html"] = tornado.escape.to_unicode(
            self.render_string("message.html", message=message)
        )

        if not MessageNewHandler.waiters:
            # Nobody is waiting. Instead of saving a reference to the
            # client, save a future; it is created only in this branch so
            # that the list never holds a future nobody awaits.
            future = tornado.concurrent.Future()
            MessageNewHandler.waiters.append(future)
            # The future resolves when the next client sets a result on it.
            msg_from_next_client = await future
            self.finish(msg_from_next_client)
        else:
            # Take the waiting future out of the list before resolving it,
            # so the next request does not find an already resolved future
            # and silently drop its message.
            previous_client_future = MessageNewHandler.waiters.pop(0)
            if not previous_client_future.done():
                # Only set a result if none has been set already,
                # otherwise set_result() raises an error.
                previous_client_future.set_result(message)
3. A more practical example using tornado.concurrent.Future():
import tornado.concurrent


class Queue:
    """We'll keep the future related code in this class.

    This will allow us to present a cleaner, more intuitive usage api.
    """

    # Futures of the clients that are currently waiting (oldest first).
    waiters = []

    @classmethod
    def get_message_from_next_client(cls):
        """Register a new waiter and return its future.

        The returned future resolves to the message passed to a later
        ``send_message_to_prev_client`` call.
        """
        future = tornado.concurrent.Future()
        cls.waiters.append(future)
        return future

    @classmethod
    def send_message_to_prev_client(cls, message):
        """Resolve the oldest waiting future with ``message``.

        The future is removed from ``waiters`` so that it is not picked
        again by a later call.

        Raises:
            IndexError: if no client is waiting.
        """
        previous_client_future = cls.waiters.pop(0)
        if not previous_client_future.done():
            previous_client_future.set_result(message)


class MessageNewHandler(tornado.web.RequestHandler):
    """Post a new message to the chat room."""

    async def post(self):
        message = {"id": str(uuid.uuid4()), "body": self.get_argument("body")}
        # render_string() returns a byte string, which is not supported
        # in json, so we must convert it to a character string.
        message["html"] = tornado.escape.to_unicode(
            self.render_string("message.html", message=message)
        )

        # A waiter is added only inside the waiting branch, so the test is
        # "is anyone waiting?" rather than a length comparison against 2.
        if not Queue.waiters:
            msg_from_next_client = await Queue.get_message_from_next_client()
            self.finish(msg_from_next_client)
        else:
            Queue.send_message_to_prev_client(message)