Overview

CrewAI exposes a frame-based streaming contract for runtimes that need more than plain text chunks. The contract emits ordered StreamFrame objects for Flow lifecycle events, direct LLM tokens, tool activity, conversation messages, and custom events. Use this API when you are building a UI, service bridge, terminal app, or deployment runtime that needs a stable stream of structured events while a Flow, chat turn, or direct LLM call is running.

StreamFrame

Every frame has the same envelope:
from crewai.types.streaming import StreamFrame

frame.id           # unique frame id
frame.seq          # execution-local order, when available
frame.type         # source event type, such as "flow_started"
frame.channel      # "llm", "flow", "tools", "messages", "lifecycle", or "custom"
frame.namespace    # source/runtime namespace
frame.timestamp    # event timestamp
frame.parent_id    # parent event id, when available
frame.previous_id  # previous event id, when available
frame.data         # event payload
frame.event        # alias for frame.data
frame.content      # printable text for token-like frames, otherwise ""
The channel field is the fastest way to route frames in consumers:
Channel | Contains
--- | ---
llm | Token and thinking chunks from LLM streaming events
flow | Flow lifecycle, method execution, routing, and pause/resume events
tools | Tool usage events
messages | Conversation transcript events
lifecycle | Runtime lifecycle events that are not specific to another channel
custom | Events that do not map to a built-in channel

frame.type preserves the source event type, so consumers can handle specific events inside a channel.
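
The two-level routing described above (channel first, then type inside the channel) can be sketched without CrewAI installed. `FakeFrame` and `route` are hypothetical names for this illustration; `FakeFrame` mimics only a few envelope fields and is not the real `StreamFrame` class.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeFrame:
    """Hypothetical stand-in carrying a subset of the StreamFrame envelope."""

    channel: str
    type: str
    content: str = ""
    data: dict[str, Any] = field(default_factory=dict)


def route(frame: FakeFrame, sink: dict[str, list[str]]) -> None:
    """Route on channel first, then refine on type inside the channel."""
    if frame.channel == "llm":
        # Token-like frames carry printable text in content.
        sink["text"].append(frame.content)
    elif frame.channel == "tools":
        if frame.type == "tool_usage_started":
            sink["tools"].append(frame.data.get("tool_name", "?"))
    else:
        sink["other"].append(f"{frame.channel}:{frame.type}")


frames = [
    FakeFrame("flow", "flow_started"),
    FakeFrame("llm", "llm_stream_chunk", content="Hel"),
    FakeFrame("tools", "tool_usage_started", data={"tool_name": "search"}),
    FakeFrame("llm", "llm_stream_chunk", content="lo"),
]
sink: dict[str, list[str]] = {"text": [], "tools": [], "other": []}
for frame in frames:
    route(frame, sink)
```

Checking the channel first keeps the hot path (LLM tokens) cheap, and the type check is only paid for channels where a consumer cares about specific events.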

Stream a Flow

Set stream=True on a Flow to make kickoff() return a stream session:
from crewai.flow import Flow, start


class ReportFlow(Flow):
    @start()
    def generate(self):
        return "done"


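flow = ReportFlow(stream=True)
stream = flow.kickoff()  # returns a stream session, not the final result

with stream:
    for chunk in stream:
        # content is "" for frames that are not token-like
        print(chunk.content, end="", flush=True)
        if chunk.type == "tool_usage_started":
            print(chunk.event["tool_name"])

# Read the result only after the stream has been fully consumed.
result = stream.result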
You must consume the stream before reading stream.result. Accessing the result early raises a RuntimeError so consumers do not accidentally treat a partial run as complete. You can also call flow.stream_events(...) directly when you want streaming for a single invocation without setting stream=True on the Flow instance.
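
To make the consume-before-result rule concrete, here is a toy model of that contract. `ToySession` is a hypothetical class written for this illustration; it is not CrewAI's session implementation, and the error message is invented.

```python
from typing import Iterable, Iterator


class ToySession:
    """Hypothetical model of a stream session; not CrewAI's StreamSession."""

    def __init__(self, chunks: Iterable[str], final: str) -> None:
        self._chunks = iter(chunks)
        self._final = final
        self.is_exhausted = False

    def __iter__(self) -> Iterator[str]:
        for chunk in self._chunks:
            yield chunk
        # Reached only when the consumer has drained every chunk.
        self.is_exhausted = True

    @property
    def result(self) -> str:
        if not self.is_exhausted:
            raise RuntimeError("stream not fully consumed")
        return self._final


session = ToySession(["a", "b"], final="ab")

try:
    session.result  # too early: nothing has been consumed yet
except RuntimeError as exc:
    early_error = str(exc)
else:
    early_error = None

seen = list(session)    # drain the stream
final = session.result  # now safe to read
```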

Filter by Channel

StreamSession exposes channel projections. Each projection yields only the frames from its channel, in the same relative order they have in the full stream:
stream = flow.stream_events()

with stream:
    for frame in stream.llm:
        print(frame.content, end="", flush=True)

result = stream.result
Available projections are:
Projection | Frames
--- | ---
stream.events | All frames
stream.llm | LLM frames
stream.messages | Conversation message frames
stream.flow | Flow frames
stream.tools | Tool frames
stream.interleave([...]) | A selected set of channels

Use stream.interleave(["flow", "llm", "messages"]) when a consumer wants only some channels but still needs their relative order.
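
Conceptually, selecting several channels while keeping their relative order amounts to an order-preserving filter over the full frame sequence. The sketch below models that idea only; `Frame` and `select_channels` are hypothetical names, and this is not how `stream.interleave` is necessarily implemented.

```python
from typing import Iterable, Iterator, NamedTuple


class Frame(NamedTuple):
    """Hypothetical stand-in with just the fields this sketch needs."""

    seq: int
    channel: str
    type: str


def select_channels(frames: Iterable[Frame], channels: Iterable[str]) -> Iterator[Frame]:
    """Yield frames from the wanted channels without reordering them."""
    wanted = set(channels)
    return (frame for frame in frames if frame.channel in wanted)


frames = [
    Frame(1, "flow", "flow_started"),
    Frame(2, "llm", "llm_stream_chunk"),
    Frame(3, "tools", "tool_usage_started"),
    Frame(4, "llm", "llm_stream_chunk"),
]
selected = list(select_channels(frames, ["flow", "llm"]))
seqs = [frame.seq for frame in selected]
```

Iterating two single-channel projections one after the other would lose the interleaving between them, which is why a combined selection is useful when order across channels matters.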

Async Streaming

Use astream() for async consumers:
import asyncio


async def main():
    flow = ReportFlow()
    stream = flow.astream()

    async with stream:
        async for chunk in stream.events:
            print(chunk.channel, chunk.type, chunk.content)

    return stream.result


result = asyncio.run(main())
The async session has the same projections as the sync session.
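
A common async consumer pattern is to accumulate only the LLM text while ignoring other channels. The sketch below runs against a fake async generator so it works without CrewAI; `Frame`, `fake_events`, and `collect_text` are hypothetical names, and `fake_events` merely stands in for something like `stream.events`.

```python
import asyncio
from typing import AsyncIterator, NamedTuple


class Frame(NamedTuple):
    """Hypothetical stand-in with just the fields this sketch needs."""

    channel: str
    type: str
    content: str


async def fake_events() -> AsyncIterator[Frame]:
    """Stand-in for an async frame source."""
    for frame in [
        Frame("flow", "flow_started", ""),
        Frame("llm", "llm_stream_chunk", "Hi"),
        Frame("llm", "llm_stream_chunk", " there"),
    ]:
        await asyncio.sleep(0)  # yield control, as a real stream would
        yield frame


async def collect_text(events: AsyncIterator[Frame]) -> str:
    """Join the content of LLM frames and skip every other channel."""
    parts = []
    async for frame in events:
        if frame.channel == "llm":
            parts.append(frame.content)
    return "".join(parts)


text = asyncio.run(collect_text(fake_events()))
```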

Stream a Direct LLM Call

llm.call(...) still returns the final assembled result. Use llm.stream_events(...) when you want to iterate over chunks as they arrive while keeping the structured event payload:
from crewai import LLM


llm = LLM(model="gpt-4o-mini")
stream = llm.stream_events(
    messages=[
        {
            "role": "user",
            "content": "Explain CrewAI streaming in two short sentences.",
        }
    ]
)

with stream:
    for chunk in stream:
        print(chunk.content, end="", flush=True)

result = stream.result
llm.stream_events(...) temporarily enables streaming for the wrapped call and restores the LLM’s previous stream setting afterward. Provider integrations continue to emit the underlying LLM stream events; this helper provides a common iterator API over those events for every LLM provider.
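
The enable-then-restore behavior described above follows a familiar pattern: remember the old setting, flip it, and restore it in a `finally` block so that errors do not leave streaming switched on. The sketch below shows that pattern in isolation; `ToyLLM` and `streaming_enabled` are hypothetical names, and this is not CrewAI's internal code.

```python
from contextlib import contextmanager
from typing import Iterator


class ToyLLM:
    """Hypothetical object with a stream flag; not crewai.LLM."""

    def __init__(self, stream: bool = False) -> None:
        self.stream = stream


@contextmanager
def streaming_enabled(llm: ToyLLM) -> Iterator[ToyLLM]:
    """Turn streaming on for the duration of the block, then restore it."""
    previous = llm.stream
    llm.stream = True
    try:
        yield llm
    finally:
        # Runs on normal exit and on exceptions alike.
        llm.stream = previous


toy = ToyLLM(stream=False)
with streaming_enabled(toy):
    inside = toy.stream
after = toy.stream
```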

Conversational Turns

Conversational Flows can stream one user turn with stream_turn():
from crewai import Flow
from crewai.experimental.conversational import ConversationConfig, ConversationState


@ConversationConfig(llm="gpt-4o-mini", defer_trace_finalization=True)
class ChatFlow(Flow[ConversationState]):
    conversational = True


flow = ChatFlow()
stream = flow.stream_turn("What can you help me with?", session_id="session-1")

with stream:
    for frame in stream.events:
        if frame.channel == "llm" and frame.type == "llm_stream_chunk":
            print(frame.content, end="", flush=True)

reply = stream.result
During stream_turn(), the built-in conversational answer path enables LLM token streaming for that turn and restores the LLM’s previous stream setting afterward. Custom route handlers that create their own agents or LLM instances should configure those LLMs for streaming if they need token-level output.

Cleanup

Use the session as a context manager when possible. If a client disconnects before the stream is exhausted, close the session explicitly:
stream = flow.stream_events()

try:
    for frame in stream.events:
        print(frame.type)
finally:
    if not stream.is_exhausted:
        stream.close()
For async streams, use await stream.aclose().
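
An async version of the same cleanup pattern might look like the sketch below. It assumes the async session exposes `events`, `is_exhausted`, and `aclose()` in the same way the sync example uses `events`, `is_exhausted`, and `close()`; `ToyAsyncSession` and `read_until_disconnect` are hypothetical names used so the example runs without CrewAI.

```python
import asyncio
from typing import AsyncIterator, Iterable, List


class ToyAsyncSession:
    """Hypothetical stand-in exposing events, is_exhausted, and aclose()."""

    def __init__(self, frames: Iterable[str]) -> None:
        self._frames = list(frames)
        self.is_exhausted = False
        self.closed = False

    @property
    def events(self) -> AsyncIterator[str]:
        return self._iterate()

    async def _iterate(self) -> AsyncIterator[str]:
        for frame in self._frames:
            await asyncio.sleep(0)
            yield frame
        self.is_exhausted = True

    async def aclose(self) -> None:
        self.closed = True


async def read_until_disconnect(session: ToyAsyncSession, limit: int) -> List[str]:
    """Read at most `limit` frames, then close the session if frames remain."""
    seen: List[str] = []
    try:
        async for frame in session.events:
            seen.append(frame)
            if len(seen) >= limit:
                break  # simulate a client that disconnects early
    finally:
        if not session.is_exhausted:
            await session.aclose()
    return seen


session = ToyAsyncSession(["flow_started", "llm_stream_chunk", "tool_usage_started"])
seen = asyncio.run(read_until_disconnect(session, limit=2))
```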

Legacy Chunk Streaming

Crew streaming with stream=True still returns the chunk-oriented CrewStreamingOutput API described in Streaming Crew Execution. Direct llm.call(...) still returns the final LLM result. The frame contract is intended for runtimes that need a stable event envelope across Flows, direct LLM calls, conversational turns, tools, and messages.