In Flask I have a page that is used with EventSource to receive updates/events.
It’s implemented in a fairly trivial manner:
    @route('/updates')
    def updates():
        def gen():
            while True:
                update = make_update()
                yield "data: {0}\n\n".format(json.dumps(update))
        return Response(stream_with_context(gen()), mimetype="text/event-stream")
The problem I am having is that each time I reload the page that connects an EventSource to my “update” page, a new thread is created to serve that “update” request, and it never dies.
Updates keep being generated, so the data is being pushed somewhere, and my server uses more and more threads and more and more memory.
The simple solution I was hoping for was to replace while True with some form of while is_connected().
However, I can’t seem to find a way to detect whether the browser is still connected.
Question: How can I check inside my generator that the connection is still alive?
EDIT
Looking through the code, it seems that it should call close() on the generator, so in theory GeneratorExit should be raised at some point in my gen().
However, I don’t see any trace of this happening, and pstree shows one more entry after each request/connection to /updates.
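To make the EDIT concrete, here is a minimal sketch in plain Python (no Flask involved) of what `close()` does to a generator that is paused at a `yield`. The names `gen` and `log` are made up for this illustration, and the sketch says nothing about when a particular server actually calls `close()`.

```python
def gen(log):
    """Endless SSE-style generator that records how it was shut down."""
    try:
        n = 0
        while True:
            yield f"data: {n}\n\n"
            n += 1
    except GeneratorExit:
        # Raised at the paused yield when close() is called.
        log.append("GeneratorExit")
        raise
    finally:
        # Runs on close() as well as on any other exit path.
        log.append("cleanup")


log = []
g = gen(log)
first = next(g)  # advance to the first yield, as a server does when sending a chunk
g.close()        # what a server is expected to do with the response iterable
print(log)
```

Note that `close()` on a generator that was never started does not enter the `try` block at all, so neither handler runs in that case.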
Okay, this is a classic problem with streaming responses and EventSource in Flask. The core issue is that you need a way to detect when the client (browser) has disconnected so you can terminate the server-side generator and avoid resource leaks.

Here's a breakdown of the problem and a solution, followed by explanations and best practices:

**Best Answer:**

```python
import json
import threading
import time

from flask import Flask, Response, request, stream_with_context

app = Flask(__name__)

# Global list to track active SSE connections (optional, but good for monitoring)
active_sse_connections = []
active_sse_connections_lock = threading.Lock()  # for thread safety


def make_update():
    """Simulates generating an update. Replace with your actual logic."""
    time.sleep(1)  # Simulate some work
    return {"message": f"Update from server! {time.time()}"}


@app.route('/updates')
def updates():
    def gen():
        try:
            with active_sse_connections_lock:
                active_sse_connections.append(request.remote_addr)
                print(f"New SSE connection from {request.remote_addr}. Total connections: {len(active_sse_connections)}")

            while True:
                update = make_update()
                yield f"data: {json.dumps(update)}\n\n"
                time.sleep(0.1)  # throttle the loop; tune to your update rate

        except GeneratorExit:
            # Raised at the paused yield when the server closes the stream.
            print(f"Client disconnected from {request.remote_addr}")
        finally:
            with active_sse_connections_lock:
                if request.remote_addr in active_sse_connections:  # Ensure removal
                    active_sse_connections.remove(request.remote_addr)
                print(f"SSE connection closed. Total connections: {len(active_sse_connections)}")

    return Response(stream_with_context(gen()), mimetype="text/event-stream")


@app.route('/')
def index():
    # Minimal client page; the element id and log messages are illustrative.
    return """
    <!DOCTYPE html>
    <html>
    <head>
        <title>SSE Example</title>
    </head>
    <body>
        <h1>SSE Example</h1>
        <div id="updates"></div>
        <script>
            const source = new EventSource('/updates');
            source.onopen = () => console.log('SSE connection opened');
            source.onmessage = (event) => {
                const data = JSON.parse(event.data);
                document.getElementById('updates').textContent = data.message;
            };
            source.onerror = () => {
                console.log('SSE connection error; closing');
                source.close();
            };
        </script>
    </body>
    </html>
    """


if __name__ == '__main__':
    app.run(debug=True, threaded=True)  # threaded=True: one thread per request
```

**Key Improvements and Explanations:**

1. **`GeneratorExit` Handling:** This is the *primary* mechanism for reacting to a client disconnection. When the server notices that the client is gone, it calls `close()` on the response iterable, which raises `GeneratorExit` inside the generator at the `yield` where it is paused. The server typically only notices the disconnect when a write to the socket fails, so a generator that blocks for a long time between yields will not be closed until it next tries to send something. The `try…except GeneratorExit…finally` block is crucial:

    - `try`: Encloses the core generator logic.
    - `except GeneratorExit`: Catches the exception when the stream is closed. This is where you perform disconnect-specific work such as logging. You do not need to re-raise it, but you must not `yield` again after catching it, or Python raises `RuntimeError: generator ignored GeneratorExit`.
    - `finally`: *Always* executes, regardless of whether an exception occurred or not. This is the **most important part** for ensuring that you clean up resources, such as removing the connection from the `active_sse_connections` list.

2. **`request.remote_addr` tracking and thread safety:**

    - The code tracks active connections using a global list `active_sse_connections` and a `threading.Lock`. This is helpful for monitoring, allowing you to see how many SSE connections are currently active in this process. With several worker processes, each process has its own list.
    - `request.remote_addr` gives you the IP address of the client. It is a convenient label, but not a unique connection identifier: several tabs, or several clients behind the same NAT, share one address. Because one entry is appended per connection and one is removed per close, the count still stays correct.
    - The `with active_sse_connections_lock:` statements ensure that access to the shared `active_sse_connections` list is thread-safe, preventing race conditions. **This is vital** whenever requests are served by multiple threads, as with Flask's built-in threaded server or Gunicorn with `--threads`.
    - The `finally` block *always* removes the connection from the `active_sse_connections` list, even if other exceptions occur, so the list remains accurate.

3. **`time.sleep(0.1)` (or similar) in the generator:** In this example `make_update()` already blocks for a second, so the extra sleep changes little. It matters when your real `make_update()` returns immediately: without some blocking call, the generator loops as fast as possible, burning CPU and flooding the client even if there's no new data to send. Adjust the sleep duration based on how frequently you expect to generate updates.

4. **`source.onerror` handler in JavaScript:** The JavaScript code includes an `onerror` handler for the EventSource. This handles cases where the connection is interrupted due to network issues or server errors. By default, EventSource reconnects automatically after an error. Calling `source.close()` in the handler stops that reconnection, so do it only when you want the client to give up rather than retry.

5. **`source.onopen` handler in JavaScript:** This logs a message when the connection is first established. Useful for debugging.

6. **Complete Example:** I've provided a complete, runnable example including both the Flask server and the HTML/JavaScript client. This makes it easy to test and understand the solution.

7. **Debug Mode and Threading:** The example uses `app.run(debug=True, threaded=True)`. The `threaded=True` part is important when testing locally, as it ensures that each request is handled in a separate thread, so one open stream does not block other requests. However, for production, you should use a proper WSGI server like Gunicorn or uWSGI, which will handle threading/process management more efficiently.

**Why This Works:**

- **`GeneratorExit` is the Signal:** The WSGI server calls `close()` on the response iterable once it detects that the client has disconnected, and that call raises `GeneratorExit` inside the generator. This is the standard way to signal that the stream should be terminated.
- **`finally` for Guaranteed Cleanup:** The `finally` block ensures that cleanup actions (like removing the connection from the list) are *always* performed, regardless of whether an exception occurred or not. This is crucial for preventing resource leaks.
- **`stream_with_context` is Key:** The `stream_with_context` function keeps the Flask request context available while the generator runs. This is necessary for accessing request-related information (like `request.remote_addr`) inside the generator.
- **`EventSource` Handling:** The client-side JavaScript `EventSource` object automatically manages the connection to the server and reconnects if the connection is lost, unless `close()` has been called on it.

**Important Considerations for Production:**

* **WSGI Server:** In a production environment, you should *not* use Flask's built-in development server. Instead, use a production-ready WSGI server like Gunicorn or uWSGI. These servers handle concurrency and process management much more efficiently. Keep in mind that every open SSE stream occupies a worker thread for as long as it is open. For example, to run with Gunicorn:

    ```bash
    gunicorn --workers 3 --threads 2 your_app:app
    ```

* **Logging:** Implement comprehensive logging to help diagnose issues in production. Log connection events, errors, and any relevant data.

* **Error Handling:** Add more robust error handling to your `make_update()` function and the generator itself. Catch exceptions and log them appropriately. Consider sending an error event to the client to notify it of the problem.

* **Heartbeats (Keep-Alive):** In some cases, you might want to implement a "heartbeat" mechanism to keep the connection alive, especially if there are long periods of inactivity. This involves periodically sending a comment (a line starting with a colon `:`) or a small update to prevent the connection from timing out. A heartbeat also forces a periodic write, which is typically how the server finds out that the client has gone away.

* **Load Balancing:** If you have multiple servers, you'll need a load balancer to distribute the SSE connections. Make sure your load balancer supports long-lived connections and doesn't prematurely close them. Sticky sessions (where a client is always routed to the same server) can simplify things, but are not strictly required.

* **Resource Limits:** Be aware of resource limits on your server (e.g., maximum number of open file descriptors, maximum number of threads/processes). Monitor your server's resource usage and adjust your configuration accordingly.

* **Security:** Consider security implications, especially if you're transmitting sensitive data over SSE. Use HTTPS to encrypt the connection. Implement appropriate authentication and authorization mechanisms.

This approach should address the orphaned threads and resource leaks in your Flask EventSource application, provided the generator yields often enough for the server to notice disconnects. Remember to test thoroughly and monitor your application in production.
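
The heartbeat idea can be sketched as a generator that waits on a queue with a timeout and emits an SSE comment frame whenever nothing arrives in time. This is a minimal sketch under stated assumptions, not a definitive implementation: `sse_events`, `heartbeat_seconds`, and `on_close` are hypothetical names, and the producer that fills the queue is not shown.

```python
import json
import queue


def sse_events(updates, heartbeat_seconds=15.0, on_close=None):
    """Yield SSE frames from a queue, with a comment frame when idle.

    `updates` is assumed to be a queue.Queue of JSON-serialisable objects.
    `on_close` is an optional, hypothetical cleanup callback.
    """
    try:
        while True:
            try:
                update = updates.get(timeout=heartbeat_seconds)
            except queue.Empty:
                # Lines starting with ':' are comments; EventSource ignores them.
                yield ": keep-alive\n\n"
            else:
                yield f"data: {json.dumps(update)}\n\n"
    finally:
        # Runs when the generator is closed or the loop exits for any reason.
        if on_close is not None:
            on_close()


# Small demonstration outside Flask: one real update, then one heartbeat.
updates = queue.Queue()
updates.put({"message": "hello"})
stream = sse_events(updates, heartbeat_seconds=0.05)
print(next(stream), end="")
print(next(stream), end="")
stream.close()
```

In a route, the same generator would be wrapped as `Response(stream_with_context(sse_events(updates)), mimetype="text/event-stream")`. Because the generator never blocks longer than `heartbeat_seconds` between yields, the server gets a regular opportunity to notice a dead connection and close the stream.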