Introduction

CrewAI provides the ability to stream real-time output during crew execution, allowing you to display results as they’re generated rather than waiting for the entire process to complete. This feature is particularly useful for building interactive applications, providing user feedback, and monitoring long-running processes.

How Streaming Works

When streaming is enabled, CrewAI captures LLM responses and tool calls as they happen and packages them into structured chunks that identify the task and agent currently executing. You can iterate over these chunks in real time and access the final result once execution completes.
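
To make the chunk shape concrete, the sketch below models it with plain dataclasses. It mirrors only the fields used on this page (content, task_name, task_index, agent_role, chunk_type, tool_call). SketchChunk, SketchToolCall, and describe are hypothetical names for illustration, not CrewAI classes, and the string chunk type is a simplification.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchToolCall:
    """Hypothetical stand-in for the tool-call details attached to a chunk."""
    tool_name: str
    arguments: dict


@dataclass
class SketchChunk:
    """Hypothetical stand-in mirroring the chunk fields used on this page."""
    content: str
    task_name: str
    task_index: int
    agent_role: str
    chunk_type: str  # "TEXT" or "TOOL_CALL" in this sketch only
    tool_call: Optional[SketchToolCall] = None


def describe(chunk: SketchChunk) -> str:
    """Return a one-line summary of where a chunk came from."""
    origin = f"{chunk.agent_role} / {chunk.task_name} (index {chunk.task_index})"
    if chunk.tool_call is not None:
        return f"{origin}: tool {chunk.tool_call.tool_name}"
    return f"{origin}: {chunk.content!r}"


text_chunk = SketchChunk("Hello", "research", 0, "Research Analyst", "TEXT")
tool_chunk = SketchChunk(
    "", "research", 0, "Research Analyst", "TOOL_CALL",
    SketchToolCall("search", {"q": "AI"}),
)
print(describe(text_chunk))
print(describe(tool_chunk))
```

Real chunks come from iterating the streaming output, as shown in the sections that follow.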

Enabling Streaming

To enable streaming, set the stream parameter to True when creating your crew:
Code
from crewai import Agent, Crew, Task

# Create your agents and tasks
researcher = Agent(
    role="Research Analyst",
    goal="Gather comprehensive information on topics",
    backstory="You are an experienced researcher with excellent analytical skills.",
)

task = Task(
    description="Research the latest developments in {topic}",
    expected_output="A detailed report on recent developments in {topic}",
    agent=researcher,
)

# Enable streaming
crew = Crew(
    agents=[researcher],
    tasks=[task],
    stream=True  # Enable streaming output
)

Synchronous Streaming

When you call kickoff() on a crew with streaming enabled, it returns a CrewStreamingOutput object that you can iterate over to receive chunks as they arrive:
Code
# Start streaming execution
streaming = crew.kickoff(inputs={"topic": "artificial intelligence"})

# Iterate over chunks as they arrive
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# Access the final result after streaming completes
result = streaming.result
print(f"\n\nFinal output: {result.raw}")

Stream Chunk Information

Each chunk provides rich context about the execution:
Code
streaming = crew.kickoff(inputs={"topic": "AI"})

for chunk in streaming:
    print(f"Task: {chunk.task_name} (index {chunk.task_index})")
    print(f"Agent: {chunk.agent_role}")
    print(f"Content: {chunk.content}")
    print(f"Type: {chunk.chunk_type}")  # TEXT or TOOL_CALL
    if chunk.tool_call:
        print(f"Tool: {chunk.tool_call.tool_name}")
        print(f"Arguments: {chunk.tool_call.arguments}")
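
Because every chunk carries its task context, the stream can be regrouped per task. The helper below is a minimal sketch that relies only on the task_index, task_name, and content attributes shown above. text_by_task is a hypothetical name, and the SimpleNamespace objects are stand-ins for real chunks.

```python
from collections import defaultdict
from types import SimpleNamespace


def text_by_task(chunks):
    """Concatenate chunk.content per (task_index, task_name), in arrival order."""
    buffers = defaultdict(list)
    for chunk in chunks:
        buffers[(chunk.task_index, chunk.task_name)].append(chunk.content)
    return {key: "".join(parts) for key, parts in buffers.items()}


# Stand-in chunks for demonstration; real ones come from iterating the stream.
demo_chunks = [
    SimpleNamespace(task_index=0, task_name="research", content="AI is "),
    SimpleNamespace(task_index=0, task_name="research", content="evolving."),
    SimpleNamespace(task_index=1, task_name="summary", content="Done."),
]
grouped = text_by_task(demo_chunks)
print(grouped)
```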

Accessing Streaming Results

The CrewStreamingOutput object provides several useful properties:
Code
streaming = crew.kickoff(inputs={"topic": "AI"})

# Iterate and collect chunks
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# After iteration completes
print(f"\nCompleted: {streaming.is_completed}")
print(f"Full text: {streaming.get_full_text()}")
print(f"All chunks: {len(streaming.chunks)}")
print(f"Final result: {streaming.result.raw}")

Asynchronous Streaming

For async applications, use kickoff_async() with async iteration:
Code
import asyncio

async def stream_crew():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True
    )

    # Start async streaming
    streaming = await crew.kickoff_async(inputs={"topic": "AI"})

    # Async iteration over chunks
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

    # Access final result
    result = streaming.result
    print(f"\n\nFinal output: {result.raw}")

asyncio.run(stream_crew())

Streaming with kickoff_for_each

When you run a crew over multiple inputs with kickoff_for_each(), streaming behaves differently in the synchronous and asynchronous variants:

Synchronous kickoff_for_each

With synchronous kickoff_for_each(), you get a list of CrewStreamingOutput objects, one for each input:
Code
crew = Crew(
    agents=[researcher],
    tasks=[task],
    stream=True
)

inputs_list = [
    {"topic": "AI in healthcare"},
    {"topic": "AI in finance"}
]

# Returns list of streaming outputs
streaming_outputs = crew.kickoff_for_each(inputs=inputs_list)

# Iterate over each streaming output
for i, streaming in enumerate(streaming_outputs):
    print(f"\n=== Input {i + 1} ===")
    for chunk in streaming:
        print(chunk.content, end="", flush=True)

    result = streaming.result
    print(f"\n\nResult {i + 1}: {result.raw}")

Asynchronous kickoff_for_each_async

With async kickoff_for_each_async(), you get a single CrewStreamingOutput that yields chunks from all crews as they arrive concurrently:
Code
import asyncio

async def stream_multiple_crews():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True
    )

    inputs_list = [
        {"topic": "AI in healthcare"},
        {"topic": "AI in finance"}
    ]

    # Returns single streaming output for all crews
    streaming = await crew.kickoff_for_each_async(inputs=inputs_list)

    # Chunks from all crews arrive as they're generated
    async for chunk in streaming:
        print(f"[{chunk.task_name}] {chunk.content}", end="", flush=True)

    # Access all results
    results = streaming.results  # List of CrewOutput objects
    for i, result in enumerate(results):
        print(f"\n\nResult {i + 1}: {result.raw}")

asyncio.run(stream_multiple_crews())
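
The idea of one stream that interleaves chunks from several concurrent runs can be illustrated with plain asyncio. This is a conceptual sketch only, not CrewAI's implementation: fake_crew, merge, and collect are hypothetical names, and each fake run simply emits labelled strings after a delay.

```python
import asyncio


async def fake_crew(label, pieces, delay):
    """Hypothetical stand-in for one crew run that emits text pieces over time."""
    for piece in pieces:
        await asyncio.sleep(delay)
        yield (label, piece)


async def merge(*streams):
    """Yield items from several async generators as soon as each produces one."""
    queue = asyncio.Queue()
    done = object()  # sentinel marking the end of one stream

    async def pump(stream):
        try:
            async for item in stream:
                await queue.put(item)
        finally:
            await queue.put(done)

    tasks = [asyncio.create_task(pump(s)) for s in streams]
    remaining = len(tasks)
    while remaining:
        item = await queue.get()
        if item is done:
            remaining -= 1
        else:
            yield item
    await asyncio.gather(*tasks)  # surface any exception raised by a stream


async def collect():
    merged = merge(
        fake_crew("healthcare", ["a", "b"], 0.02),
        fake_crew("finance", ["x", "y"], 0.03),
    )
    return [item async for item in merged]


items = asyncio.run(collect())
print(items)
```

Items from different runs may arrive in any relative order, but each run's own items keep their order, which is why a consumer needs a label on every chunk to tell the runs apart.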

Stream Chunk Types

Chunks can be of different types, indicated by the chunk_type field:

TEXT Chunks

Standard text content from LLM responses:
Code
from crewai.types.streaming import StreamChunkType

for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TEXT:
        print(chunk.content, end="", flush=True)

TOOL_CALL Chunks

Information about tool calls being made:
Code
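from crewai.types.streaming import StreamChunkType

for chunk in streaming:
    # Guard against chunks that carry no tool-call details
    if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
        print(f"\nCalling tool: {chunk.tool_call.tool_name}")
        print(f"Arguments: {chunk.tool_call.arguments}")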
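
For monitoring, tool-call chunks can be tallied as they arrive. The sketch below counts calls per tool using only the tool_call.tool_name attribute shown above. tool_usage is a hypothetical helper, and the SimpleNamespace objects are stand-ins for real chunks.

```python
from collections import Counter
from types import SimpleNamespace


def tool_usage(chunks):
    """Count tool calls per tool name, skipping chunks without a tool_call."""
    return Counter(
        chunk.tool_call.tool_name
        for chunk in chunks
        if getattr(chunk, "tool_call", None) is not None
    )


# Stand-in chunks for demonstration; real ones come from iterating the stream.
demo_chunks = [
    SimpleNamespace(content="Thinking...", tool_call=None),
    SimpleNamespace(content="", tool_call=SimpleNamespace(tool_name="search", arguments={})),
    SimpleNamespace(content="", tool_call=SimpleNamespace(tool_name="calculator", arguments={})),
    SimpleNamespace(content="", tool_call=SimpleNamespace(tool_name="search", arguments={})),
]
usage = tool_usage(demo_chunks)
print(usage.most_common())
```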

Practical Example: Building a UI with Streaming

Here’s a complete example showing how to build an interactive application with streaming:
Code
import asyncio
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType

async def interactive_research():
    # Create crew with streaming enabled
    researcher = Agent(
        role="Research Analyst",
        goal="Provide detailed analysis on any topic",
        backstory="You are an expert researcher with broad knowledge.",
    )

    task = Task(
        description="Research and analyze: {topic}",
        expected_output="A comprehensive analysis with key insights",
        agent=researcher,
    )

    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True,
        verbose=False
    )

    # Get user input
    topic = input("Enter a topic to research: ")

    print(f"\n{'='*60}")
    print(f"Researching: {topic}")
    print(f"{'='*60}\n")

    # Start streaming execution
    streaming = await crew.kickoff_async(inputs={"topic": topic})

    current_task = ""
    async for chunk in streaming:
        # Show task transitions
        if chunk.task_name != current_task:
            current_task = chunk.task_name
            print(f"\n[{chunk.agent_role}] Working on: {chunk.task_name}")
            print("-" * 60)

        # Display text chunks
        if chunk.chunk_type == StreamChunkType.TEXT:
            print(chunk.content, end="", flush=True)

        # Display tool calls
        elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
            print(f"\n🔧 Using tool: {chunk.tool_call.tool_name}")

    # Show final result
    result = streaming.result
    print(f"\n\n{'='*60}")
    print("Analysis Complete!")
    print(f"{'='*60}")
    print(f"\nToken Usage: {result.token_usage}")

asyncio.run(interactive_research())

Use Cases

Streaming is particularly valuable for:
  • Interactive Applications: Provide real-time feedback to users as agents work
  • Long-Running Tasks: Show progress for research, analysis, or content generation
  • Debugging and Monitoring: Observe agent behavior and decision-making in real time
  • User Experience: Reduce perceived latency by showing incremental results
  • Live Dashboards: Build monitoring interfaces that display crew execution status
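
For a browser-based dashboard, one common way to push chunks to the client is Server-Sent Events. The sketch below only formats a chunk as an SSE message. to_sse is a hypothetical helper, the payload keys are an arbitrary choice, and wiring it into a web framework is left out.

```python
import json
from types import SimpleNamespace


def to_sse(chunk):
    """Format one chunk as a Server-Sent Events message (a data line plus a blank line)."""
    payload = {
        "task": chunk.task_name,
        "agent": chunk.agent_role,
        "content": chunk.content,
    }
    # json.dumps escapes newlines, so the payload always fits on one data line.
    return f"data: {json.dumps(payload)}\n\n"


# Stand-in chunk for demonstration; real ones come from iterating the stream.
demo_chunk = SimpleNamespace(
    task_name="research", agent_role="Research Analyst", content="Hi"
)
message = to_sse(demo_chunk)
print(message, end="")
```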

Important Notes

  • Streaming automatically enables LLM streaming for all agents in the crew
  • You must iterate through all chunks before accessing the .result property
  • For kickoff_for_each_async() with streaming, use .results (plural) to get all outputs
  • Streaming adds little overhead and can reduce perceived latency, because output appears incrementally
  • Each chunk includes full context (task, agent, chunk type) for rich UIs
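
The iterate-before-result rule can be wrapped in a small helper so callers cannot forget it. This is a sketch under stated assumptions: run_to_result is a hypothetical helper that uses only iteration and the .result property, and SketchStream is a toy stand-in whose error on early access is assumed for the demo rather than taken from CrewAI.

```python
def run_to_result(streaming, on_chunk=None):
    """Consume every chunk (optionally passing each to on_chunk), then return .result."""
    for chunk in streaming:
        if on_chunk is not None:
            on_chunk(chunk)
    return streaming.result


class SketchStream:
    """Toy stand-in: the result only becomes available once iteration finishes."""

    def __init__(self, pieces):
        self._pieces = pieces
        self._done = False

    def __iter__(self):
        for piece in self._pieces:
            yield piece
        self._done = True

    @property
    def result(self):
        if not self._done:
            # Assumed behaviour for this sketch only.
            raise RuntimeError("stream not fully consumed")
        return "".join(self._pieces)


seen = []
final = run_to_result(SketchStream(["a", "b"]), on_chunk=seen.append)
print(final, seen)
```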

Error Handling

Handle errors during streaming execution:
Code
streaming = crew.kickoff(inputs={"topic": "AI"})

try:
    for chunk in streaming:
        print(chunk.content, end="", flush=True)

    result = streaming.result
    print(f"\nSuccess: {result.raw}")

except Exception as e:
    print(f"\nError during streaming: {e}")
    if streaming.is_completed:
        print("Streaming completed but an error occurred")
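
When a stream fails partway, it is often useful to keep the text received so far. The helper below is a minimal sketch that works with any iterable of objects exposing a content attribute. consume and flaky are hypothetical names, and the failing generator only simulates an interrupted stream.

```python
from types import SimpleNamespace


def consume(chunks):
    """Collect chunk text; return (text_so_far, error_or_None)."""
    parts = []
    try:
        for chunk in chunks:
            parts.append(chunk.content)
    except Exception as exc:
        # Keep whatever arrived before the failure.
        return "".join(parts), exc
    return "".join(parts), None


def flaky():
    """Stand-in stream that fails after two chunks."""
    yield SimpleNamespace(content="partial ")
    yield SimpleNamespace(content="output")
    raise ConnectionError("stream interrupted")


text, error = consume(flaky())
print(repr(text), repr(error))
```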

By leveraging streaming, you can build more responsive and interactive applications with CrewAI, providing users with real-time visibility into agent execution and results.