
Dispatcher

A dispatcher is an instance of the CollectingDispatcher class used to generate responses to send back to the user.

CollectingDispatcher

CollectingDispatcher provides two complementary APIs:

  • Unary API: utter_message accumulates complete messages in the messages attribute. The action server returns them in the final response payload.
  • Streaming API: stream_start, stream_chunk, and stream_end let an action emit text or rich content incrementally. Whether chunks are delivered in real time is decided by the transport layer (see Streaming responses), not by the action author.

In both cases, the Rasa server adds BotUttered events to the tracker for the responses the user received. Responses added through the dispatcher should therefore not also be returned explicitly as events. For example, the following custom action returns no events, yet the user still receives the response "Hi, User!":

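from typing import Any, Dict, List, Text

from rasa_sdk import Action, Tracker
from rasa_sdk.events import EventType
from rasa_sdk.executor import CollectingDispatcher


class ActionGreetUser(Action):
    def name(self) -> Text:
        return "action_greet_user"

    async def run(
        self,
        dispatcher: CollectingDispatcher,
        tracker: Tracker,
        domain: Dict[Text, Any],
    ) -> List[EventType]:
        # The dispatcher collects the response; no BotUttered event is returned here.
        dispatcher.utter_message(text="Hi, User!")

        return []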
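
To make the collecting behaviour concrete, here is a toy stand-in that models only what the paragraph above describes. SketchDispatcher is a hypothetical class written for this illustration, not the SDK's CollectingDispatcher, and the message shape it stores is an assumption.

```python
from typing import Any, Dict, List, Optional


class SketchDispatcher:
    """Hypothetical stand-in that only models message collection."""

    def __init__(self) -> None:
        # Each utter_message call appends one entry here.
        self.messages: List[Dict[str, Any]] = []

    def utter_message(
        self,
        text: Optional[str] = None,
        buttons: Optional[List[Dict[str, Any]]] = None,
        **kwargs: Any,
    ) -> None:
        message: Dict[str, Any] = {"text": text, "buttons": buttons or []}
        message.update(kwargs)
        self.messages.append(message)


dispatcher = SketchDispatcher()
dispatcher.utter_message(text="Hi, User!")
dispatcher.utter_message(
    text="Shall I continue?",
    buttons=[{"payload": "/affirm", "title": "Yes"}],
)

# Two calls produce two collected messages; the action returns no events.
collected_texts = [message["text"] for message in dispatcher.messages]
```

The point of the sketch is the separation of concerns: the action only fills the dispatcher, and recording what the user received is left to the server.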

CollectingDispatcher.utter_message

The utter_message method can be used to return any type of response to the user.

Parameters

The utter_message method takes the following optional arguments. Passing no arguments will result in an empty message being returned to the user. Passing multiple arguments will result in a rich response (e.g. text and buttons) being returned to the user.

  • text: The text to return to the user.
    dispatcher.utter_message(text="Hey there")
  • image: An image URL or file path that will be used to display an image to the user.
    dispatcher.utter_message(image="https://i.imgur.com/nGF1K8f.jpg")
  • json_message: A custom JSON payload as a dictionary. It can be used to send channel-specific responses. The following example would return a date picker in Slack:
    date_picker = {
        "blocks": [
            {
                "type": "section",
                "text": {
                    "text": "Make a bet on when the world will end:",
                    "type": "mrkdwn",
                },
                "accessory": {
                    "type": "datepicker",
                    "initial_date": "2019-05-21",
                    "placeholder": {
                        "type": "plain_text",
                        "text": "Select a date",
                    },
                },
            }
        ]
    }
    dispatcher.utter_message(json_message=date_picker)
  • response: The name of a response to return to the user. This response should be specified in your assistant's domain.
    dispatcher.utter_message(response="utter_greet")
  • attachment: A URL or file path of an attachment to return to the user.
    dispatcher.utter_message(attachment="")
  • buttons: A list of buttons to return to the user. Each button is a dictionary and should have a title and a payload key. A button can include other keys, but these will only be used if a specific channel looks for them. The button's payload will be sent as a user message if the user clicks the button.
    dispatcher.utter_message(buttons=[
        {"payload": "/affirm", "title": "Yes"},
        {"payload": "/deny", "title": "No"},
    ])
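  • **kwargs: Arbitrary keyword arguments, which can be used to fill in variables in a response. For example, given the following response in the domain:
    responses:
      utter_greet_name:
      - text: Hi {name}!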

You could specify the name with:

    dispatcher.utter_message(response="utter_greet_name", name="Aimee")
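
The effect of the variable substitution can be approximated with Python's built-in str.format. This is only a sketch of the outcome: how Rasa performs the interpolation internally is not covered on this page, and render_response is a hypothetical helper.

```python
def render_response(template_text: str, **values: str) -> str:
    """Hypothetical helper: fill {placeholders} in a response text."""
    return template_text.format(**values)


greeting = render_response("Hi {name}!", name="Aimee")
print(greeting)
```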

Return type

None

Streaming responses

New in Rasa SDK 3.17 / Rasa Pro 3.17

Custom actions can stream responses token-by-token or as a sequence of rich-content fragments. Use the streaming API when you want low-latency delivery on channels that support it — for example, voice assistants synthesizing TTS from partial text, or messaging channels that render incremental assistant output.

Supported transports

Real-time streaming is available only when both of the following are true:

  1. The action runs on a streaming-capable executor: direct custom action execution (actions_module) or a gRPC action server.
  2. The output channel supports streaming — see Built-in output channels with streaming support.

The HTTP(S) action server protocol does not support streaming. When Rasa calls an HTTP action server, it sends a single HTTP POST to the /webhook endpoint and waits for one JSON response — even if the output channel could otherwise stream. Your action code can still use stream_start, stream_chunk, and stream_end; the SDK accumulates chunks internally and replays them as individual utter_message calls when stream_end() runs, so the user receives the full response once the action completes — not incrementally. No conditional logic is required in the action.
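
The fallback described above can be pictured with a small model. BufferingDispatcher below is a hypothetical stand-in, not SDK code: it assumes chunks are kept in a private list and replayed one by one at stream_end(), which is how the paragraph describes the non-streaming path.

```python
import asyncio
from typing import Any, Dict, List


class BufferingDispatcher:
    """Hypothetical model of the non-streaming fallback, not SDK code."""

    def __init__(self) -> None:
        self.messages: List[Dict[str, Any]] = []  # final response payload
        self._chunks: List[Dict[str, Any]] = []   # chunk accumulator

    def utter_message(self, **fields: Any) -> None:
        self.messages.append(fields)

    async def stream_start(self) -> None:
        self._chunks = []  # start from an empty accumulator

    async def stream_chunk(self, **fields: Any) -> None:
        self._chunks.append(fields)  # nothing reaches the user yet

    async def stream_end(self) -> None:
        # Replay every buffered chunk as its own complete message.
        for chunk in self._chunks:
            self.utter_message(**chunk)
        self._chunks = []


async def run_action(dispatcher: BufferingDispatcher) -> None:
    await dispatcher.stream_start()
    for token in ["Hel", "lo", "!"]:
        await dispatcher.stream_chunk(text=token)
    assert dispatcher.messages == []  # still buffered before stream_end()
    await dispatcher.stream_end()


dispatcher = BufferingDispatcher()
asyncio.run(run_action(dispatcher))
replayed = [message["text"] for message in dispatcher.messages]
```

Because the replay happens inside stream_end(), the action body is identical on streaming and non-streaming transports.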

Writing a streaming action

Call stream_start() before emitting chunks, stream_chunk() for each fragment, and stream_end() when the response is complete:

class ActionStreamReply(Action):
    def name(self) -> Text:
        return "action_stream_reply"

    async def run(
        self,
        dispatcher: CollectingDispatcher,
        tracker: Tracker,
        domain: Dict[Text, Any],
    ) -> List[EventType]:
        # fetch_tokens_from_llm stands in for your own async token generator.
        # tracker.latest_message is a dictionary, so read the text with .get().
        user_text = tracker.latest_message.get("text", "")
        await dispatcher.stream_start()
        async for token in fetch_tokens_from_llm(user_text):
            await dispatcher.stream_chunk(text=token)
        await dispatcher.stream_end()
        return []

You can also stream rich content mid-response — for example, append buttons after the main text:

await dispatcher.stream_start()
async for token in fetch_tokens_from_llm(prompt):
    await dispatcher.stream_chunk(text=token)
await dispatcher.stream_chunk(buttons=[
    {"title": "Yes", "payload": "/affirm"},
    {"title": "No", "payload": "/deny"},
])
await dispatcher.stream_end()

If you call stream_chunk() without an explicit stream_start(), the dispatcher opens a stream implicitly.

If an action opens a stream but forgets to call stream_end(), the executor closes the stream automatically and logs a warning.
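
The two safety nets above (implicit open, automatic close) can be sketched as follows. Everything here is hypothetical: StreamStateDispatcher and execute are illustrative names, is_streaming_active is a plain attribute rather than a read-only property, and the real executor's warning text and internals may differ.

```python
import asyncio
import logging
from typing import Any, Dict, List

logger = logging.getLogger(__name__)


class StreamStateDispatcher:
    """Hypothetical stand-in that tracks only whether a stream is open."""

    def __init__(self) -> None:
        self.is_streaming_active = False
        self.chunks: List[Dict[str, Any]] = []

    async def stream_start(self) -> None:
        self.is_streaming_active = True

    async def stream_chunk(self, **fields: Any) -> None:
        if not self.is_streaming_active:
            await self.stream_start()  # implicit open
        self.chunks.append(fields)

    async def stream_end(self) -> None:
        self.is_streaming_active = False


async def forgetful_action(dispatcher: StreamStateDispatcher) -> list:
    # No stream_start() and no stream_end(): both are handled for the author.
    await dispatcher.stream_chunk(text="partial answer")
    return []


async def execute(action, dispatcher: StreamStateDispatcher) -> list:
    """Hypothetical executor wrapper that closes a dangling stream."""
    events = await action(dispatcher)
    if dispatcher.is_streaming_active:
        logger.warning("Action left a stream open; closing it.")
        await dispatcher.stream_end()
    return events


dispatcher = StreamStateDispatcher()
events = asyncio.run(execute(forgetful_action, dispatcher))
```

Relying on the automatic close is best treated as a fallback; calling stream_end() explicitly keeps the logs free of warnings.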

CollectingDispatcher.stream_start

await dispatcher.stream_start()

Begins a streaming response and resets the internal chunk accumulator. When a streaming transport is active, emits a stream_start event to the caller.

Calling stream_start() again before stream_end() resets the accumulator and starts a new stream sequence.

Return type

None

CollectingDispatcher.stream_chunk

await dispatcher.stream_chunk(
    text=None,
    image=None,
    json_message=None,
    attachment=None,
    buttons=None,
    elements=None,
    **kwargs,
)

Emits a response fragment. Accepts the same rich-content fields as utter_message, except template and response — those refer to pre-defined domain responses and are not valid in a streaming context. Passing either raises ValueError.

Parameters

  • text: A plain-text fragment (for example, a token from an LLM).

  • image: URL of an image to include in this chunk.

  • json_message: Arbitrary custom JSON payload for this chunk.

  • attachment: URL of an attachment to include in this chunk.

  • buttons: List of button dicts to include in this chunk.

  • elements: List of carousel/card element dicts for this chunk.

  • **kwargs: Extra fields merged into the chunk payload. Unlike the named parameters above, these are merged unconditionally (no None/falsy guard).

    On gRPC streaming, only fields defined in the Chunk protobuf message are serialised and sent to Rasa (text, image, custom, attachment, buttons, elements). Any other keys in **kwargs are silently omitted from the stream. This is intentional: the protobuf schema must describe fields precisely, and without a concrete use case for arbitrary extras the SDK exposes only known parameters on CollectingDispatcher.

    On non-streaming transports (including HTTP action servers), each accumulated chunk is replayed via utter_message() at stream_end(), and extra keys survive that replay.

    Callers should pass only keys the target transport understands.
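
The difference between the two transports for extra keys can be illustrated with a small filter. This is a sketch under stated assumptions: keep_known_fields is a hypothetical helper, the field names are copied from the list above, and "tone" is a made-up extra key; the actual serialisation is done by the protobuf layer.

```python
from typing import Any, Dict

# Field names taken from the text above; the real schema is the Chunk protobuf message.
CHUNK_FIELDS = frozenset(
    {"text", "image", "custom", "attachment", "buttons", "elements"}
)


def keep_known_fields(chunk: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: drop keys a gRPC chunk would not carry."""
    return {key: value for key, value in chunk.items() if key in CHUNK_FIELDS}


chunk = {"text": "Almost done", "tone": "cheerful"}  # "tone" is a made-up extra
sent_over_grpc = keep_known_fields(chunk)
replayed_over_http = dict(chunk)  # extras survive the utter_message() replay
```

An action that depends on an extra key would therefore behave differently depending on the transport, which is why passing only known keys is the safer habit.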

After a barge-in, stream_chunk() calls that arrive after cancel_stream() has been invoked are silently dropped.

Return type

None

CollectingDispatcher.stream_end

await dispatcher.stream_end()

Ends the streaming response.

On streaming transports, emits a stream_end event. Chunks were already delivered in-band, so the dispatcher does not replay them via utter_message().

On non-streaming transports, replays every accumulated chunk as an individual utter_message() call so the tracker receives a record of the full response.

Return type

None

CollectingDispatcher.is_streaming_active

Read-only property. Returns True after stream_start() has been called and before stream_end() completes. The executor uses this to detect actions that opened a stream but never closed it.

Barge-in and stream cancellation

When a user interrupts the assistant while a streaming custom action is in progress (voice barge-in), the transport layer signals the dispatcher to stop producing output:

  • gRPC action server — Rasa sends an AckStreamChunks RPC with the active response_id. The SDK calls cancel_stream() on the matching dispatcher.
  • Direct custom action executor — Rasa sets the same cancellation flag on the in-process dispatcher.

cancel_stream() is called by the transport layer. Action authors do not call it directly.

CollectingDispatcher.cancel_stream

Sets is_streaming_cancelled to True. All subsequent stream_chunk() calls are silently dropped so the action stops producing output without being forcefully interrupted. The action continues running until its run() method returns, so events such as SlotSet can still be applied.

CollectingDispatcher.is_streaming_cancelled

Read-only property. Returns True after cancel_stream() has been called. You can inspect this after stream_end() to suppress side effects that are irrelevant when the user already interrupted:

await dispatcher.stream_end()
if dispatcher.is_streaming_cancelled:
    return []
return [SlotSet("result", value)]

Checking the flag is optional. If you do not inspect it, all events returned by the action are preserved — which is the right default for most actions.
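
The cancellation flow can be simulated end to end with a toy dispatcher. CancellableDispatcher is a hypothetical stand-in that models only the flag and the silent dropping of chunks; the cancel_stream() call inside the loop simulates the transport layer reacting to a barge-in, which an action would never do itself, and the "slot_set_result" string is a placeholder for a real event.

```python
import asyncio
from typing import List


class CancellableDispatcher:
    """Hypothetical stand-in modelling only the cancellation flag."""

    def __init__(self) -> None:
        self.is_streaming_cancelled = False
        self.delivered: List[str] = []

    def cancel_stream(self) -> None:
        # In the real system the transport layer calls this, not the action.
        self.is_streaming_cancelled = True

    async def stream_chunk(self, text: str) -> None:
        if self.is_streaming_cancelled:
            return  # dropped silently after barge-in
        self.delivered.append(text)

    async def stream_end(self) -> None:
        pass


async def run_action(dispatcher: CancellableDispatcher) -> List[str]:
    for index, token in enumerate(["one", "two", "three", "four"]):
        if index == 2:
            dispatcher.cancel_stream()  # simulate the user interrupting here
        await dispatcher.stream_chunk(text=token)
    await dispatcher.stream_end()
    # The action still runs to completion and decides which events to return.
    return [] if dispatcher.is_streaming_cancelled else ["slot_set_result"]


dispatcher = CancellableDispatcher()
events = asyncio.run(run_action(dispatcher))
```

Note that the loop keeps running after the interruption; only the output is suppressed, which matches the description that the action is not forcefully interrupted.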

👉 Learn how Rasa Pro handles barge-in for streaming custom actions