5.6 SSE

SSE (Server-Sent Events) uses a simple, line-based text format. The specification defines four field names:

  • event: (optional, event type)
  • data: (required, event payload; an event without any data is not dispatched)
  • id: (optional, event ID)
  • retry: (optional, reconnection time)

Can you use custom fields (like foo: bar)?

Technically, you can send lines with other keys, e.g.:

foo: bar
data: something

But browsers’ native EventSource API and most SSE clients only recognize the standard fields and ignore or drop unknown fields.

So your custom fields will not be accessible from JavaScript’s event object or your SSE client. Lines with unrecognized field names are simply ignored. (They are not comments; a comment is a line that starts with a colon.)
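To make the "ignored" behavior concrete, here is a minimal sketch of how a client might parse a single event block. It is a simplified illustration, not the browser's actual implementation; the function name `parse_sse_event` is hypothetical.

```python
import re


def parse_sse_event(block: str) -> dict:
    """Parse one SSE event block, keeping only the four defined fields."""
    event = {"event": "message", "data": "", "id": None, "retry": None}
    data_lines = []
    # SSE lines may end with CRLF, CR, or LF
    for line in re.split(r"\r\n|\r|\n", block):
        if not line or line.startswith(":"):
            continue  # blank line or comment
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # one leading space after the colon is stripped
        if field == "data":
            data_lines.append(value)
        elif field == "event":
            event["event"] = value
        elif field == "id":
            event["id"] = value
        elif field == "retry" and value.isascii() and value.isdigit():
            event["retry"] = int(value)
        # Any other field name (such as "foo") falls through and is dropped
    event["data"] = "\n".join(data_lines)
    return event


parsed = parse_sse_event("foo: bar\ndata: something")
print(parsed)  # the "foo" field does not appear anywhere in the result
```

The custom `foo: bar` line is read and then discarded, which is why the handler never sees it.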


Put all custom or extra data inside the data: field, usually by encoding it as JSON:

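import json

def event_stream():  # illustrative generator name
    # Carry every custom field inside one JSON object in data:
    payload = {
        "foo": "bar",
        "other": 123,
    }
    yield f"event: my_event\ndata: {json.dumps(payload)}\n\n"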

Then, client-side you parse JSON from event.data.
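As a rough sketch of the full round trip, the snippet below encodes a payload into an SSE frame and then decodes it the way a client would after receiving `event.data`. The helper names `encode_event` and `decode_data` are made up for this example.

```python
import json


def encode_event(event_type: str, payload: dict) -> str:
    # json.dumps emits no raw newlines by default, so one data: line is enough
    return f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"


def decode_data(data: str) -> dict:
    # What the client does with the string it receives as event.data
    return json.loads(data)


frame = encode_event("my_event", {"foo": "bar", "other": 123})
data_line = frame.splitlines()[1]            # 'data: {"foo": "bar", "other": 123}'
payload = decode_data(data_line[len("data: "):])
print(payload["foo"], payload["other"])
```

In a browser the equivalent decode step would be `JSON.parse(event.data)` inside the event handler.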


Can you use custom SSE fields?
Yes, but they will be ignored by standard clients and not accessible in event handlers.

Best practice: Use data: to carry any extra info you want, encoded as JSON or another format.


Want me to show you how to encode/decode JSON data in SSE cleanly?

============================

Here’s what each SSE field means and why it’s used:


event:

  • Purpose: Specifies the type or name of the event being sent.

  • Usage:

    • Allows the client to listen for specific event types with addEventListener("eventname", handler).
    • If omitted, the event type defaults to "message".
  • Example:

    event: update
    data: {"status":"ok"}

    Client listens with:

    source.addEventListener("update", (e) => { ... })

data:

  • Purpose: Contains the actual payload or content of the event.

  • Usage:

    • Can appear multiple times in a single event; lines are concatenated with newline characters.
    • Delivered as a string to the client’s event handler.
  • Example:

    data: line 1
    data: line 2

    Received data: "line 1\nline 2"


id:

  • Purpose: Assigns a unique identifier to this event.

  • Usage:

    • Used by the browser/client to remember the last event received.
    • If the connection drops, the client sends a Last-Event-ID header on reconnect, so the server can resume sending events from where it left off.
  • Example:

    id: 12345
    data: some update
  • Benefit: Enables event replay/resuming without losing data after network hiccups.
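One possible way to support resuming on the server is to keep a bounded log of recent events and replay everything newer than the ID the client reports. The sketch below assumes numeric, increasing IDs and an in-memory buffer; the `EventLog` class and its methods are hypothetical, not part of any library.

```python
from collections import deque
from typing import List, Optional


class EventLog:
    """Keeps the most recent events so they can be replayed after a reconnect."""

    def __init__(self, maxlen: int = 100):
        self._events = deque(maxlen=maxlen)  # (event_id, data) pairs
        self._next_id = 1

    def append(self, data: str) -> int:
        event_id = self._next_id
        self._next_id += 1
        self._events.append((event_id, data))
        return event_id

    def replay_after(self, last_event_id: Optional[str]) -> List[str]:
        # last_event_id is the value of the Last-Event-ID request header, if any
        try:
            last = int(last_event_id) if last_event_id else 0
        except ValueError:
            last = 0  # unparseable ID: fall back to replaying everything buffered
        return [
            f"id: {event_id}\ndata: {data}\n\n"
            for event_id, data in self._events
            if event_id > last
        ]


log = EventLog()
for update in ("a", "b", "c"):
    log.append(update)
print(log.replay_after("1"))  # frames for events 2 and 3 only
```

Events older than the buffer size are lost, so the buffer length is a trade-off between memory and how long a client can stay disconnected.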


retry:

  • Purpose: Suggests to the client how long (in milliseconds) it should wait before trying to reconnect after a disconnection.

  • Usage:

    • Overrides the default reconnection delay (usually 3 seconds).
  • Example:

    retry: 10000

    Client waits 10 seconds before reconnecting.


Field  Required  Purpose                         Notes
event  No        Name/type of event              Default: "message"
data   Yes       Payload/content of event        Can be multiline
id     No        Unique event ID for resuming    Enables reconnection resume
retry  No        Client reconnection delay (ms)  Overrides default delay
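The four fields can be combined in a single frame. Below is a small sketch of a formatter that does this; `format_sse` and its parameter names are assumptions made for illustration. It splits multi-line payloads into one `data:` line per line of text, as the format requires.

```python
from typing import Optional


def format_sse(
    data: str,
    event: Optional[str] = None,
    event_id: Optional[str] = None,
    retry: Optional[int] = None,
) -> str:
    """Build one SSE frame from the standard fields."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if retry is not None:
        lines.append(f"retry: {retry}")
    # A payload containing newlines needs one data: line per line of text
    lines.extend(f"data: {part}" for part in data.split("\n"))
    # The blank line (double newline) terminates the event
    return "\n".join(lines) + "\n\n"


frame = format_sse('{"status":"ok"}', event="update", event_id="12345", retry=10000)
print(frame)
```

A server generator could then `yield format_sse(...)` instead of building the string by hand each time.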

If you want, I can show a simple SSE server example using all these fields!

========================================

import asyncio
import uuid

from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse

router = APIRouter()
# job_id -> {subscriber_id -> queue}; assumed to be module-level state
subscribers: dict[str, dict[str, asyncio.Queue]] = {}


@router.get("/subscribe/{job_id}")
async def subscribe(job_id: str, request: Request):
    """
    Server-Sent Events endpoint for job status updates.
    Sends a unique subscriber_id to the client as the first event.
    """
    if job_id not in subscribers:
        subscribers[job_id] = {}
    queue = asyncio.Queue()
    subscriber_id = str(uuid.uuid4())
    subscribers[job_id][subscriber_id] = queue

    async def event_generator():
        try:
            # Send the subscriber_id as the first event
            yield f"event: subscriber_id\ndata: {subscriber_id}\n\n"
            while True:
                if await request.is_disconnected():
                    break
                try:
                    # Wait up to 15 seconds for the next message
                    message = await asyncio.wait_for(queue.get(), timeout=15.0)
                    yield f"data: {message}\n\n"
                except asyncio.TimeoutError:
                    # No message arrived in time: send an SSE comment as a heartbeat
                    yield f": keep-alive\n\n"
        finally:
            # Remove this subscriber's queue
            if job_id in subscribers and subscriber_id in subscribers[job_id]:
                del subscribers[job_id][subscriber_id]
                if not subscribers[job_id]:
                    del subscribers[job_id]

    return StreamingResponse(event_generator(), media_type="text/event-stream")
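The endpoint above only consumes from the queues; something else has to fill them. A minimal sketch of the producing side might look like the following. The `publish` helper is hypothetical and assumes the same `subscribers` layout (job ID mapped to a dictionary of subscriber ID to queue).

```python
import asyncio

# Same shape as the endpoint's registry: job_id -> {subscriber_id -> queue}
subscribers: dict = {}


def publish(job_id: str, message: str) -> int:
    """Put a message on every subscriber queue for a job; return how many got it."""
    queues = subscribers.get(job_id, {})
    for queue in queues.values():
        # The queues are unbounded, so put_nowait does not raise QueueFull here
        queue.put_nowait(message)
    return len(queues)


async def demo():
    queue = asyncio.Queue()
    subscribers["job-1"] = {"subscriber-a": queue}
    delivered = publish("job-1", "done")
    return delivered, queue.get_nowait()
```

Running `asyncio.run(demo())` delivers the message to the one registered queue. Publishing to a job with no subscribers is a no-op that returns 0.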

Great question!

Is the keep-alive comment/event necessary?

In your SSE generator, you have this line on timeout:

yield f": keep-alive\n\n"

This sends a comment line (: is the SSE comment prefix) every 15 seconds if no new messages appear.


Why send keep-alives?

  • Prevent connection timeouts: Some proxies, load balancers, or browsers might close idle HTTP connections if no data flows for some time.
  • Keep the SSE connection alive: Sending periodic “heartbeat” comments keeps the connection open even if there are no actual events.
  • Avoid client disconnects: Helps clients maintain a persistent connection.

What happens without them?

  • If your server or network infrastructure closes idle connections, clients will disconnect unexpectedly.
  • The client will try to reconnect after a delay (set by retry:, or the client's default, typically a few seconds).
  • This causes extra reconnect overhead, and events sent during the gap are missed unless the server replays them.

Summary

  • Keep-alive comments are highly recommended in SSE implementations to keep connections alive across the network.
  • The comment syntax (: keep-alive\n\n) is a lightweight, no-op way to send data without triggering an event.
  • You can adjust the frequency (e.g., every 15 seconds) depending on your network conditions.

If your environment guarantees connections stay alive indefinitely, you might skip it — but in most real-world deployments, keep-alives help a lot.
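To experiment with the heartbeat interval without waiting 15 seconds, the loop can be pulled out into a standalone generator with a configurable timeout. This is a sketch under stated assumptions: `stream_with_heartbeat`, `heartbeat_seconds`, and `max_frames` are names invented for the example, and `max_frames` exists only so the demo terminates.

```python
import asyncio


async def stream_with_heartbeat(queue, heartbeat_seconds=15.0, max_frames=None):
    """Yield data frames from the queue, or a comment frame when it stays empty."""
    sent = 0
    while max_frames is None or sent < max_frames:
        try:
            message = await asyncio.wait_for(queue.get(), timeout=heartbeat_seconds)
            yield f"data: {message}\n\n"
        except asyncio.TimeoutError:
            # A comment line: keeps bytes flowing but triggers no client event
            yield ": keep-alive\n\n"
        sent += 1


async def demo():
    queue = asyncio.Queue()
    queue.put_nowait("hello")
    frames = []
    async for frame in stream_with_heartbeat(queue, heartbeat_seconds=0.05, max_frames=3):
        frames.append(frame)
    return frames
```

Calling `asyncio.run(demo())` yields the queued message first, then heartbeat comments once the queue is empty.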


Want me to help tweak the code or explain more about SSE connection handling?

====================================

Good question!

What does await asyncio.wait_for(queue.get(), timeout=15.0) do?

  • It waits for queue.get() to complete, but only up to 15 seconds.
  • If an item appears in the queue within 15 seconds, it returns immediately with that item.
  • If no item appears in 15 seconds, it raises an asyncio.TimeoutError.

Does it block?

  • No, it does not block the event loop.
  • Since it’s awaiting an asynchronous operation, it suspends the coroutine but other tasks can run during this wait.
  • If an item is available before 15 seconds, it returns immediately — no waiting the full timeout.

What happens on timeout?

  • asyncio.TimeoutError is raised.

  • Your code catches it and sends the keep-alive comment:

    except asyncio.TimeoutError:
        yield f": keep-alive\n\n"

Summary

  • asyncio.wait_for adds a timeout to the awaited coroutine (queue.get() here).
  • It does not block synchronously; other async tasks keep running.
  • It waits up to the timeout duration, returns earlier if result is ready.
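A quick way to check the non-blocking claim is to run another task alongside the wait. In the sketch below (all names are illustrative), a ticker task keeps appending to a list while `wait_for` is waiting on an empty queue, and the list is snapshotted right after the timeout fires.

```python
import asyncio


async def ticker(ticks: list):
    # Stands in for any other work scheduled on the same event loop
    for i in range(3):
        await asyncio.sleep(0.01)
        ticks.append(i)


async def demo():
    queue = asyncio.Queue()  # stays empty, so the wait below must time out
    ticks = []
    task = asyncio.create_task(ticker(ticks))
    try:
        await asyncio.wait_for(queue.get(), timeout=0.5)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    ticks_during_wait = list(ticks)  # progress made while we were waiting
    await task
    return timed_out, ticks_during_wait
```

If `wait_for` blocked the event loop, `ticks_during_wait` would be empty; in practice the ticker makes progress during the wait.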

Want me to explain how you can test this or alternatives to implement timeouts?