Introduction

CrewAI Flows support streaming output, allowing you to receive real-time updates as your flow executes. This feature enables you to build responsive applications that display results incrementally, provide live progress updates, and create better user experiences for long-running workflows.

How Flow Streaming Works

When streaming is enabled on a Flow, CrewAI captures and streams output from any crews or LLM calls within the flow. The stream delivers structured chunks containing the content, task context, and agent information as execution progresses.
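Conceptually, each chunk behaves like a small record carrying a text fragment plus its origin. The sketch below is a plain dataclass stand-in (not CrewAI's actual chunk class) illustrating the fields the examples in this guide rely on:

```python
from dataclasses import dataclass

@dataclass
class ChunkSketch:
    """Illustrative stand-in for a stream chunk; field names match the examples below."""
    content: str     # text fragment produced by the LLM
    task_name: str   # flow step / task the chunk came from
    agent_role: str  # agent that produced it
    chunk_type: str  # "TEXT" or "TOOL_CALL" in the real API

chunk = ChunkSketch(
    content="AI adoption is rising",
    task_name="research_topic",
    agent_role="Research Analyst",
    chunk_type="TEXT",
)
print(f"[{chunk.agent_role}] {chunk.content}")  # [Research Analyst] AI adoption is rising
```

The real chunk objects are produced by CrewAI during execution; you never construct them yourself.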

Enabling Streaming

To enable streaming, set the stream attribute to True on your Flow class:
Code
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task

class ResearchFlow(Flow):
    stream = True  # Enable streaming for the entire flow

    @start()
    def initialize(self):
        return {"topic": "AI trends"}

    @listen(initialize)
    def research_topic(self, data):
        researcher = Agent(
            role="Research Analyst",
            goal="Research topics thoroughly",
            backstory="Expert researcher with analytical skills",
        )

        task = Task(
            description="Research {topic} and provide insights",
            expected_output="Detailed research findings",
            agent=researcher,
        )

        crew = Crew(
            agents=[researcher],
            tasks=[task],
        )

        return crew.kickoff(inputs=data)

Synchronous Streaming

When you call kickoff() on a flow with streaming enabled, it returns a FlowStreamingOutput object that you can iterate over:
Code
flow = ResearchFlow()

# Start streaming execution
streaming = flow.kickoff()

# Iterate over chunks as they arrive
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# Access the final result after streaming completes
result = streaming.result
print(f"\n\nFinal output: {result}")

Stream Chunk Information

Each chunk provides context about where it originated in the flow:
Code
streaming = flow.kickoff()

for chunk in streaming:
    print(f"Agent: {chunk.agent_role}")
    print(f"Task: {chunk.task_name}")
    print(f"Content: {chunk.content}")
    print(f"Type: {chunk.chunk_type}")  # TEXT or TOOL_CALL

Accessing Streaming Properties

The FlowStreamingOutput object provides useful properties and methods:
Code
streaming = flow.kickoff()

# Iterate and collect chunks
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# After iteration completes
print(f"\nCompleted: {streaming.is_completed}")
print(f"Full text: {streaming.get_full_text()}")
print(f"Total chunks: {len(streaming.chunks)}")
print(f"Final result: {streaming.result}")

Asynchronous Streaming

For async applications, use kickoff_async() with async iteration:
Code
import asyncio

async def stream_flow():
    flow = ResearchFlow()

    # Start async streaming
    streaming = await flow.kickoff_async()

    # Async iteration over chunks
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

    # Access final result
    result = streaming.result
    print(f"\n\nFinal output: {result}")

asyncio.run(stream_flow())

Streaming with Multi-Step Flows

Streaming works seamlessly across multiple flow steps, including flows that execute multiple crews:
Code
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task

class MultiStepFlow(Flow):
    stream = True

    @start()
    def research_phase(self):
        """First crew: Research the topic."""
        researcher = Agent(
            role="Research Analyst",
            goal="Gather comprehensive information",
            backstory="Expert at finding relevant information",
        )

        task = Task(
            description="Research AI developments in healthcare",
            expected_output="Research findings on AI in healthcare",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()

        self.state["research"] = result.raw
        return result.raw

    @listen(research_phase)
    def analysis_phase(self, research_data):
        """Second crew: Analyze the research."""
        analyst = Agent(
            role="Data Analyst",
            goal="Analyze information and extract insights",
            backstory="Expert at identifying patterns and trends",
        )

        task = Task(
            description="Analyze this research: {research}",
            expected_output="Key insights and trends",
            agent=analyst,
        )

        crew = Crew(agents=[analyst], tasks=[task])
        return crew.kickoff(inputs={"research": research_data})


# Stream across both phases
flow = MultiStepFlow()
streaming = flow.kickoff()

current_step = ""
for chunk in streaming:
    # Track which flow step is executing
    if chunk.task_name != current_step:
        current_step = chunk.task_name
        print(f"\n\n=== {chunk.task_name} ===\n")

    print(chunk.content, end="", flush=True)

result = streaming.result
print(f"\n\nFinal analysis: {result}")

Practical Example: Progress Dashboard

Here’s a complete example showing how to build a progress dashboard with streaming:
Code
import asyncio
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType

class ResearchPipeline(Flow):
    stream = True

    @start()
    def gather_data(self):
        researcher = Agent(
            role="Data Gatherer",
            goal="Collect relevant information",
            backstory="Skilled at finding quality sources",
        )

        task = Task(
            description="Gather data on renewable energy trends",
            expected_output="Collection of relevant data points",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()
        self.state["data"] = result.raw
        return result.raw

    @listen(gather_data)
    def analyze_data(self, data):
        analyst = Agent(
            role="Data Analyst",
            goal="Extract meaningful insights",
            backstory="Expert at data analysis",
        )

        task = Task(
            description="Analyze: {data}",
            expected_output="Key insights and trends",
            agent=analyst,
        )

        crew = Crew(agents=[analyst], tasks=[task])
        return crew.kickoff(inputs={"data": data})


async def run_with_dashboard():
    flow = ResearchPipeline()

    print("="*60)
    print("RESEARCH PIPELINE DASHBOARD")
    print("="*60)

    streaming = await flow.kickoff_async()

    current_agent = ""
    current_task = ""
    chunk_count = 0

    async for chunk in streaming:
        chunk_count += 1

        # Display phase transitions
        if chunk.task_name != current_task:
            current_task = chunk.task_name
            current_agent = chunk.agent_role
            print(f"\n\n📋 Phase: {current_task}")
            print(f"👤 Agent: {current_agent}")
            print("-" * 60)

        # Display text output
        if chunk.chunk_type == StreamChunkType.TEXT:
            print(chunk.content, end="", flush=True)

        # Display tool usage
        elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
            print(f"\n🔧 Tool: {chunk.tool_call.tool_name}")

    # Show completion summary
    result = streaming.result
    print(f"\n\n{'='*60}")
    print("PIPELINE COMPLETE")
    print(f"{'='*60}")
    print(f"Total chunks: {chunk_count}")
    print(f"Final output length: {len(str(result))} characters")

asyncio.run(run_with_dashboard())

Streaming with State Management

Streaming works naturally with Flow state management:
Code
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
from pydantic import BaseModel

class AnalysisState(BaseModel):
    topic: str = ""
    research: str = ""
    insights: str = ""

class StatefulStreamingFlow(Flow[AnalysisState]):
    stream = True

    @start()
    def research(self):
        # State is available during streaming
        topic = self.state.topic
        print(f"Researching: {topic}")

        researcher = Agent(
            role="Researcher",
            goal="Research topics thoroughly",
            backstory="Expert researcher",
        )

        task = Task(
            description=f"Research {topic}",
            expected_output="Research findings",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()

        self.state.research = result.raw
        return result.raw

    @listen(research)
    def analyze(self, research):
        # Access updated state
        print(f"Analyzing {len(self.state.research)} chars of research")

        analyst = Agent(
            role="Analyst",
            goal="Extract insights",
            backstory="Expert analyst",
        )

        task = Task(
            description="Analyze: {research}",
            expected_output="Key insights",
            agent=analyst,
        )

        crew = Crew(agents=[analyst], tasks=[task])
        result = crew.kickoff(inputs={"research": research})

        self.state.insights = result.raw
        return result.raw


# Run with streaming
flow = StatefulStreamingFlow()
streaming = flow.kickoff(inputs={"topic": "quantum computing"})

for chunk in streaming:
    print(chunk.content, end="", flush=True)

result = streaming.result
print("\n\nFinal state:")
print(f"Topic: {flow.state.topic}")
print(f"Research length: {len(flow.state.research)}")
print(f"Insights length: {len(flow.state.insights)}")

Use Cases

Flow streaming is particularly valuable for:
  • Multi-Stage Workflows: Show progress across research, analysis, and synthesis phases
  • Complex Pipelines: Provide visibility into long-running data processing flows
  • Interactive Applications: Build responsive UIs that display intermediate results
  • Monitoring and Debugging: Observe flow execution and crew interactions in real-time
  • Progress Tracking: Show users which stage of the workflow is currently executing
  • Live Dashboards: Create monitoring interfaces for production flows

Stream Chunk Types

Like crew streaming, flow chunks can be of different types:

TEXT Chunks

Standard text content from LLM responses:
Code
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TEXT:
        print(chunk.content, end="", flush=True)

TOOL_CALL Chunks

Information about tool calls within the flow:
Code
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
        print(f"\nTool: {chunk.tool_call.tool_name}")
        print(f"Args: {chunk.tool_call.arguments}")

Error Handling

Handle errors gracefully during streaming:
Code
flow = ResearchFlow()
streaming = flow.kickoff()

try:
    for chunk in streaming:
        print(chunk.content, end="", flush=True)

    result = streaming.result
    print(f"\nSuccess! Result: {result}")

except Exception as e:
    print(f"\nError during flow execution: {e}")
    if streaming.is_completed:
        print("Streaming completed but flow encountered an error")

Important Notes

  • Streaming automatically enables LLM streaming for any crews used within the flow
  • You must iterate through all chunks before accessing the .result property
  • Streaming works with both structured and unstructured flow state
  • Flow streaming captures output from all crews and LLM calls in the flow
  • Each chunk includes context about which agent and task generated it
  • Streaming adds minimal overhead to flow execution
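The iterate-before-result rule can be made concrete with a tiny stand-in (not the real FlowStreamingOutput, whose internals belong to CrewAI): the result only materializes once the chunk iterator has been exhausted, so draining the stream first is not optional.

```python
class FakeStreamingOutput:
    """Minimal stand-in mimicking the iterate-then-result contract described above."""
    def __init__(self, chunks):
        self._source = chunks
        self.chunks = []          # chunks seen so far
        self.is_completed = False
        self._result = None

    def __iter__(self):
        for chunk in self._source:
            self.chunks.append(chunk)
            yield chunk
        # Only after the last chunk is delivered does the result exist
        self.is_completed = True
        self._result = "".join(self._source)

    @property
    def result(self):
        # The real class's failure mode may differ; the point is that
        # .result is undefined until streaming has finished.
        if not self.is_completed:
            raise RuntimeError("iterate through all chunks before accessing .result")
        return self._result

streaming = FakeStreamingOutput(["Hello, ", "world"])
for chunk in streaming:
    pass                          # drain the stream
print(streaming.result)           # Hello, world
```

With the real FlowStreamingOutput, the same ordering applies: consume the loop (sync or async) first, then read .result.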

Combining with Flow Visualization

You can combine streaming with flow visualization to provide a complete picture:
Code
# Generate flow visualization
flow = ResearchFlow()
flow.plot("research_flow")  # Creates HTML visualization

# Run with streaming
streaming = flow.kickoff()
for chunk in streaming:
    print(chunk.content, end="", flush=True)

result = streaming.result
print("\nFlow complete! View structure at: research_flow.html")

By leveraging flow streaming, you can build sophisticated, responsive applications that provide users with real-time visibility into complex multi-stage workflows, making your AI automations more transparent and engaging.