# Streaming Flow Execution

> Stream real-time output from your CrewAI flow execution

## Introduction

CrewAI Flows can stream output while they run, so your application receives updates as the flow executes instead of waiting for the final result. Use streaming to display results incrementally, report live progress, and keep long-running workflows responsive.

## How Flow Streaming Works

When streaming is enabled on a Flow, CrewAI captures and streams output from any crews, LLM calls, tools, and lifecycle events within the flow. The stream delivers ordered `StreamFrame` items with printable content plus structured event data as execution progresses.
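
To make the shape of a stream item concrete, here is a minimal sketch built on a stand-in dataclass. `FakeFrame` is hypothetical and is not CrewAI's `StreamFrame`; it only mirrors the four attributes used in the examples on this page (`channel`, `type`, `content`, `event`). The `type` strings and payload keys are illustrative assumptions.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeFrame:
    """Hypothetical stand-in for a stream item; not CrewAI's StreamFrame."""

    channel: str  # e.g. "llm" or "tools"
    type: str  # assumed event-type label
    content: str = ""  # printable text, may be empty
    event: dict[str, Any] = field(default_factory=dict)  # structured payload


def render(frames):
    """Concatenate the printable content of frames in arrival order."""
    return "".join(frame.content for frame in frames)


frames = [
    FakeFrame(channel="llm", type="chunk", content="AI trends "),
    FakeFrame(channel="tools", type="tool_started", event={"tool": "search"}),
    FakeFrame(channel="llm", type="chunk", content="are accelerating."),
]

print(render(frames))
```

Frames without printable text, such as the tool frame here, contribute nothing to the rendered output but still carry their structured payload.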

## Enabling Streaming

To enable streaming, set the `stream` attribute to `True` on your Flow class:

```python Code theme={null}
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task

class ResearchFlow(Flow):
    stream = True  # Enable streaming for the entire flow

    @start()
    def initialize(self):
        return {"topic": "AI trends"}

    @listen(initialize)
    def research_topic(self, data):
        researcher = Agent(
            role="Research Analyst",
            goal="Research topics thoroughly",
            backstory="Expert researcher with analytical skills",
        )

        task = Task(
            description="Research {topic} and provide insights",
            expected_output="Detailed research findings",
            agent=researcher,
        )

        crew = Crew(
            agents=[researcher],
            tasks=[task],
        )

        return crew.kickoff(inputs=data)
```

## Synchronous Streaming

When you call `kickoff()` on a flow with streaming enabled, it returns a stream session that yields ordered `StreamFrame` items:

```python Code theme={null}
flow = ResearchFlow()

# Start streaming execution
streaming = flow.kickoff()

# Iterate over stream items as they arrive
for item in streaming:
    print(item.content, end="", flush=True)

# Access the final result after streaming completes
result = streaming.result
print(f"\n\nFinal output: {result}")
```

### Stream Item Information

Each item provides both printable content and structured event data:

```python Code theme={null}
streaming = flow.kickoff()

for item in streaming:
    print(f"Channel: {item.channel}")
    print(f"Type: {item.type}")
    print(f"Content: {item.content}")
    print(f"Event payload: {item.event}")
```

### Accessing Streaming Properties

The stream session provides useful properties and methods:

```python Code theme={null}
streaming = flow.kickoff()

# Iterate and collect items
for item in streaming:
    print(item.content, end="", flush=True)

# After iteration completes
print(f"\nCompleted: {streaming.is_completed}")
print(f"Total frames: {len(streaming.frames)}")
print(f"Final result: {streaming.result}")
```

## Asynchronous Streaming

For async applications, use `kickoff_async()` with async iteration:

```python Code theme={null}
import asyncio

async def stream_flow():
    flow = ResearchFlow()

    # Start async streaming
    streaming = await flow.kickoff_async()

    # Async iteration over stream items
    async for item in streaming:
        print(item.content, end="", flush=True)

    # Access final result
    result = streaming.result
    print(f"\n\nFinal output: {result}")

asyncio.run(stream_flow())
```

## Streaming with Multi-Step Flows

A single stream session covers every step of a flow, including flows that run multiple crews. Frames from each step arrive on the same stream:

```python Code theme={null}
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task

class MultiStepFlow(Flow):
    stream = True

    @start()
    def research_phase(self):
        """First crew: Research the topic."""
        researcher = Agent(
            role="Research Analyst",
            goal="Gather comprehensive information",
            backstory="Expert at finding relevant information",
        )

        task = Task(
            description="Research AI developments in healthcare",
            expected_output="Research findings on AI in healthcare",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()

        self.state["research"] = result.raw
        return result.raw

    @listen(research_phase)
    def analysis_phase(self, research_data):
        """Second crew: Analyze the research."""
        analyst = Agent(
            role="Data Analyst",
            goal="Analyze information and extract insights",
            backstory="Expert at identifying patterns and trends",
        )

        task = Task(
            description="Analyze this research: {research}",
            expected_output="Key insights and trends",
            agent=analyst,
        )

        crew = Crew(agents=[analyst], tasks=[task])
        return crew.kickoff(inputs={"research": research_data})


# Stream across both phases
flow = MultiStepFlow()
streaming = flow.kickoff()

current_step = ""
for item in streaming:
    # Track which flow step is executing
    step_name = item.event.get("method_name") or item.event.get("task_name")
    if step_name and step_name != current_step:
        current_step = step_name
        print(f"\n\n=== {step_name} ===\n")

    print(item.content, end="", flush=True)

result = streaming.result
print(f"\n\nFinal analysis: {result}")
```
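
The step-tracking loop above can also be written as a pure function that groups output by step, which is easier to test than code that prints. This is a sketch under assumptions: the frames are stand-ins built with `SimpleNamespace`, and `group_by_step` is a hypothetical helper that reads the same `method_name` and `task_name` keys as the loop above.

```python
from types import SimpleNamespace


def group_by_step(frames):
    """Collect printable content under the most recently seen step name."""
    sections = {}
    current_step = "(unknown)"
    for frame in frames:
        step_name = frame.event.get("method_name") or frame.event.get("task_name")
        if step_name:
            current_step = step_name
        if frame.content:
            sections[current_step] = sections.get(current_step, "") + frame.content
    return sections


# Stand-in frames; real items come from iterating the stream session.
demo = [
    SimpleNamespace(content="Found 3 ", event={"method_name": "research_phase"}),
    SimpleNamespace(content="sources.", event={}),
    SimpleNamespace(content="Trend: up.", event={"method_name": "analysis_phase"}),
]

for step, text in group_by_step(demo).items():
    print(f"=== {step} ===\n{text}")
```

Frames that carry no step name are attributed to the step seen most recently, which matches how the printing loop behaves.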

## Practical Example: Progress Dashboard

Here's a complete example showing how to build a progress dashboard with streaming:

```python Code theme={null}
import asyncio
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task

class ResearchPipeline(Flow):
    stream = True

    @start()
    def gather_data(self):
        researcher = Agent(
            role="Data Gatherer",
            goal="Collect relevant information",
            backstory="Skilled at finding quality sources",
        )

        task = Task(
            description="Gather data on renewable energy trends",
            expected_output="Collection of relevant data points",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()
        self.state["data"] = result.raw
        return result.raw

    @listen(gather_data)
    def analyze_data(self, data):
        analyst = Agent(
            role="Data Analyst",
            goal="Extract meaningful insights",
            backstory="Expert at data analysis",
        )

        task = Task(
            description="Analyze: {data}",
            expected_output="Key insights and trends",
            agent=analyst,
        )

        crew = Crew(agents=[analyst], tasks=[task])
        return crew.kickoff(inputs={"data": data})


async def run_with_dashboard():
    flow = ResearchPipeline()

    print("="*60)
    print("RESEARCH PIPELINE DASHBOARD")
    print("="*60)

    streaming = await flow.kickoff_async()

    current_agent = ""
    current_task = ""
    frame_count = 0

    async for item in streaming:
        frame_count += 1

        # Display phase transitions
        task_name = item.event.get("task_name", "")
        agent_role = item.event.get("agent_role", "")
        if task_name and task_name != current_task:
            current_task = task_name
            current_agent = agent_role
            print(f"\n\n📋 Phase: {current_task}")
            print(f"👤 Agent: {current_agent}")
            print("-" * 60)

        # Display text output
        if item.content:
            print(item.content, end="", flush=True)

        # Display tool usage
        elif item.channel == "tools":
            print(f"\n🔧 Tool event: {item.type}")

    # Show completion summary
    result = streaming.result
    print(f"\n\n{'='*60}")
    print("PIPELINE COMPLETE")
    print(f"{'='*60}")
    print(f"Total frames: {frame_count}")
    print(f"Final output length: {len(str(result))} characters")

asyncio.run(run_with_dashboard())
```

## Streaming with State Management

Flow state remains readable and writable inside flow methods while streaming is enabled:

```python Code theme={null}
from crewai import Agent, Crew, Task
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel

class AnalysisState(BaseModel):
    topic: str = ""
    research: str = ""
    insights: str = ""

class StatefulStreamingFlow(Flow[AnalysisState]):
    stream = True

    @start()
    def research(self):
        # State is available during streaming
        topic = self.state.topic
        print(f"Researching: {topic}")

        researcher = Agent(
            role="Researcher",
            goal="Research topics thoroughly",
            backstory="Expert researcher",
        )

        task = Task(
            description=f"Research {topic}",
            expected_output="Research findings",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()

        self.state.research = result.raw
        return result.raw

    @listen(research)
    def analyze(self, research):
        # Access updated state
        print(f"Analyzing {len(self.state.research)} chars of research")

        analyst = Agent(
            role="Analyst",
            goal="Extract insights",
            backstory="Expert analyst",
        )

        task = Task(
            description="Analyze: {research}",
            expected_output="Key insights",
            agent=analyst,
        )

        crew = Crew(agents=[analyst], tasks=[task])
        result = crew.kickoff(inputs={"research": research})

        self.state.insights = result.raw
        return result.raw


# Run with streaming
flow = StatefulStreamingFlow()
streaming = flow.kickoff(inputs={"topic": "quantum computing"})

for item in streaming:
    print(item.content, end="", flush=True)

result = streaming.result
print("\n\nFinal state:")
print(f"Topic: {flow.state.topic}")
print(f"Research length: {len(flow.state.research)}")
print(f"Insights length: {len(flow.state.insights)}")
```

## Use Cases

Flow streaming is particularly valuable for:

* **Multi-Stage Workflows**: Show progress across research, analysis, and synthesis phases
* **Complex Pipelines**: Provide visibility into long-running data processing flows
* **Interactive Applications**: Build responsive UIs that display intermediate results
* **Monitoring and Debugging**: Observe flow execution and crew interactions in real time
* **Progress Tracking**: Show users which stage of the workflow is currently executing
* **Live Dashboards**: Create monitoring interfaces for production flows
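
For the interactive-application and dashboard cases, one common approach is to forward each frame to the browser as a Server-Sent Events (SSE) message. The sketch below only shows the formatting step. `to_sse` is a hypothetical helper, the frame is a stand-in, and the `"chunk"` type label is an assumption; how the messages are sent depends on your web framework.

```python
import json
from types import SimpleNamespace


def to_sse(frame):
    """Format one frame as a Server-Sent Events message."""
    payload = json.dumps(
        {
            "channel": frame.channel,
            "type": frame.type,
            "content": frame.content,
        }
    )
    # An SSE message is a set of "field: value" lines ended by a blank line.
    # JSON encoding escapes newlines, so the payload stays on one data line.
    return f"event: {frame.channel}\ndata: {payload}\n\n"


frame = SimpleNamespace(channel="llm", type="chunk", content="Hello\nworld")
message = to_sse(frame)
print(message, end="")
```

Using the channel as the SSE event name lets the client register separate listeners for text and tool activity.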

## Stream Frame Channels

Flow streaming yields `StreamFrame` items across several channels:

### LLM Frames

Standard text content from LLM responses:

```python Code theme={null}
for item in streaming:
    if item.channel == "llm" and item.content:
        print(item.content, end="", flush=True)
```

### Tool Frames

Information about tool calls within the flow:

```python Code theme={null}
for item in streaming:
    if item.channel == "tools":
        print(f"\nTool event: {item.type}")
        print(f"Payload: {item.event}")
```
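
When you handle more than one channel, a small dispatch table keeps the loop flat. This is a sketch under assumptions: `dispatch` is a hypothetical helper, the frames are stand-ins, and the `type` labels are illustrative. Only the `"llm"` and `"tools"` channel names come from the sections above.

```python
from collections import Counter
from types import SimpleNamespace


def dispatch(frames, handlers, default=None):
    """Send each frame to the handler registered for its channel."""
    seen = Counter()
    for frame in frames:
        seen[frame.channel] += 1
        handler = handlers.get(frame.channel, default)
        if handler is not None:
            handler(frame)
    return seen


llm_text = []
tool_events = []

handlers = {
    "llm": lambda f: llm_text.append(f.content),
    "tools": lambda f: tool_events.append(f.type),
}

# Stand-in frames; real items come from iterating the stream session.
demo = [
    SimpleNamespace(channel="llm", type="chunk", content="Solar "),
    SimpleNamespace(channel="tools", type="tool_started", content=""),
    SimpleNamespace(channel="llm", type="chunk", content="grew."),
]

counts = dispatch(demo, handlers)
print("".join(llm_text), tool_events, dict(counts))
```

Frames on channels without a registered handler are counted but otherwise ignored unless you pass a `default` handler.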

## Error Handling

Wrap both the iteration and the `.result` access in a `try` block so that a failure in the flow can be reported cleanly:

```python Code theme={null}
flow = ResearchFlow()
streaming = flow.kickoff()

try:
    for item in streaming:
        print(item.content, end="", flush=True)

    result = streaming.result
    print(f"\nSuccess! Result: {result}")

except Exception as e:
    print(f"\nError during flow execution: {e}")
    if streaming.is_completed:
        print("Streaming completed but flow encountered an error")
```
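
If you want to keep whatever text arrived before a failure, collect content as you go and return it alongside the exception. This is a sketch that assumes errors surface while iterating, as in the example above. `consume_safely` and `failing_stream` are hypothetical names, and the stream is a plain generator standing in for a stream session.

```python
from types import SimpleNamespace


def consume_safely(stream):
    """Return (text_so_far, error); error is None when iteration finished."""
    parts = []
    try:
        for frame in stream:
            parts.append(frame.content)
    except Exception as exc:  # narrow this to expected error types in real code
        return "".join(parts), exc
    return "".join(parts), None


def failing_stream():
    """Stand-in stream that fails after two frames."""
    yield SimpleNamespace(content="partial ")
    yield SimpleNamespace(content="output")
    raise RuntimeError("simulated failure")


text, error = consume_safely(failing_stream())
print(f"kept {len(text)} chars; error: {error}")
```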

## Cancellation and Resource Cleanup

The stream session supports graceful cancellation so that in-flight work stops promptly when the consumer disconnects.

### Async Context Manager

```python Code theme={null}
# Run this inside an async function
streaming = await flow.kickoff_async()

async with streaming:
    async for item in streaming:
        print(item.content, end="", flush=True)
```

### Explicit Cancellation

```python Code theme={null}
# Run this inside an async function
streaming = await flow.kickoff_async()
try:
    async for item in streaming:
        print(item.content, end="", flush=True)
finally:
    await streaming.aclose()  # async
    # streaming.close()       # sync equivalent
```

After cancellation, `streaming.is_cancelled` and `streaming.is_completed` are both `True`. Both `aclose()` and `close()` are idempotent.
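
The same `try`/`finally` pattern applies when the consumer decides to stop early, for example after reading enough output. The sketch below runs without CrewAI: `FakeSession` is a hypothetical stand-in that models only the flags and `aclose()` behavior described in this section, and `read_up_to` is an illustrative helper.

```python
import asyncio
from types import SimpleNamespace


class FakeSession:
    """Hypothetical stand-in that models the cancellation flags described above."""

    def __init__(self, texts):
        self._texts = iter(texts)
        self.is_cancelled = False
        self.is_completed = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.is_cancelled:
            raise StopAsyncIteration
        try:
            text = next(self._texts)
        except StopIteration:
            self.is_completed = True
            raise StopAsyncIteration from None
        await asyncio.sleep(0)  # yield control to the event loop
        return SimpleNamespace(content=text)

    async def aclose(self):
        # Idempotent: closing a finished or already closed session is a no-op.
        if not self.is_completed:
            self.is_cancelled = True
            self.is_completed = True


async def read_up_to(session, max_chars):
    """Consume frames until max_chars is reached, then always close the session."""
    received = ""
    try:
        async for frame in session:
            received += frame.content
            if len(received) >= max_chars:
                break  # the consumer has seen enough
    finally:
        await session.aclose()
    return received


session = FakeSession(["alpha ", "beta ", "gamma ", "delta"])
text = asyncio.run(read_up_to(session, max_chars=10))
print(repr(text), session.is_cancelled, session.is_completed)
```

Closing in `finally` means the session is released whether the loop ends normally, breaks early, or raises.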

## Important Notes

* Streaming automatically enables LLM streaming for any crews used within the flow
* You must iterate through all stream items before accessing the `.result` property
* Streaming works with both structured and unstructured flow state
* Flow streaming captures output from all crews and LLM calls in the flow
* Each frame includes structured event context such as channel, type, namespace, and payload
* Streaming adds minimal overhead to flow execution
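
Because `.result` is only meaningful after the stream has been consumed, it can help to wrap both steps in one helper. This is a minimal sketch: `drain` is a hypothetical helper and `FakeSyncSession` is a stand-in, not a CrewAI class. Raising on early access is an assumption of the stand-in only; this page does not state what CrewAI does in that case.

```python
from types import SimpleNamespace


class FakeSyncSession:
    """Hypothetical stand-in for a stream session; not a CrewAI class."""

    def __init__(self, texts, final):
        self._texts = texts
        self._final = final
        self.is_completed = False

    def __iter__(self):
        for text in self._texts:
            yield SimpleNamespace(content=text)
        self.is_completed = True

    @property
    def result(self):
        # Assumption for this stand-in only: reading early is an error.
        if not self.is_completed:
            raise RuntimeError("stream not fully consumed")
        return self._final


def drain(session, on_frame=None):
    """Consume every frame, optionally observing each one, then return the result."""
    for frame in session:
        if on_frame is not None:
            on_frame(frame)
    return session.result


seen = []
final = drain(
    FakeSyncSession(["a", "b"], final="done"),
    on_frame=lambda f: seen.append(f.content),
)
print(seen, final)
```

Callers that only need the final value can call `drain(streaming)` without a callback and ignore the intermediate frames.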

## Combining with Flow Visualization

You can combine streaming with flow visualization to provide a complete picture:

```python Code theme={null}
# Generate flow visualization
flow = ResearchFlow()
flow.plot("research_flow")  # Creates HTML visualization

# Run with streaming
streaming = flow.kickoff()
for item in streaming:
    print(item.content, end="", flush=True)

result = streaming.result
print("\nFlow complete! View structure at: research_flow.html")
```

Flow streaming gives your application real-time visibility into multi-stage workflows, which makes long-running AI automations easier to follow and debug.
