from crewai import Agent, Task, Crew, Process
from crewai_tools import InvokeCrewAIAutomationTool
# Initialize different automation tools
data_collection_tool = InvokeCrewAIAutomationTool(
crew_api_url="https://data-collection-crew-[...].crewai.com",
crew_bearer_token="your_bearer_token_here",
crew_name="Data Collection Automation",
crew_description="Collects and preprocesses raw data"
)
analysis_tool = InvokeCrewAIAutomationTool(
crew_api_url="https://analysis-crew-[...].crewai.com",
crew_bearer_token="your_bearer_token_here",
crew_name="Analysis Automation",
crew_description="Performs advanced data analysis and modeling"
)
reporting_tool = InvokeCrewAIAutomationTool(
crew_api_url="https://reporting-crew-[...].crewai.com",
crew_bearer_token="your_bearer_token_here",
crew_name="Reporting Automation",
crew_description="Generates comprehensive reports and visualizations"
)
# Create specialized agents
data_collector = Agent(
role='Data Collection Specialist',
goal='Gather and preprocess data from various sources',
backstory='I specialize in collecting and cleaning data from multiple sources.',
tools=[data_collection_tool]
)
data_analyst = Agent(
role='Data Analysis Expert',
goal='Perform advanced analysis on collected data',
backstory='I am an expert in statistical analysis and machine learning.',
tools=[analysis_tool]
)
report_generator = Agent(
role='Report Generation Specialist',
goal='Create comprehensive reports and visualizations',
backstory='I excel at creating clear, actionable reports from complex data.',
tools=[reporting_tool]
)
# Create sequential tasks
collection_task = Task(
description="Collect market data for Q4 2024 analysis",
agent=data_collector
)
analysis_task = Task(
description="Analyze collected data to identify trends and patterns",
agent=data_analyst
)
reporting_task = Task(
description="Generate executive summary report with key insights and recommendations",
agent=report_generator
)
# Create a crew with sequential processing
crew = Crew(
agents=[data_collector, data_analyst, report_generator],
tasks=[collection_task, analysis_task, reporting_task],
process=Process.sequential,
verbose=2
)
result = crew.kickoff()