Skip to main content

Advanced Examples

This section contains complex, real-world examples of building AI agents with Airtrain. These examples demonstrate how to combine multiple Airtrain features to create sophisticated applications.

Customer Support Agent

This example builds a complete customer support agent that can answer questions, look up order information, and escalate issues when necessary.

import json
from typing import List, Optional, Dict, Any
from datetime import datetime

from airtrain.core import Agent, Tool
from airtrain.models import OpenAIModel
from airtrain.core.schemas import InputSchema, OutputSchema
from airtrain.core.memory import VectorMemory
from airtrain.core.skills import TextGenerationSkill
from pydantic import BaseModel, Field

# Define schemas for order lookup tool
class OrderLookupInput(InputSchema):
order_id: str = Field(description="Order ID to look up")

class OrderItem(BaseModel):
product_id: str
name: str
quantity: int
price: float

class OrderDetails(OutputSchema):
order_id: str
customer_name: str
order_date: str
status: str
items: List[OrderItem]
total: float
shipping_address: str
tracking_number: Optional[str] = None
estimated_delivery: Optional[str] = None

# Simulated order database
ORDER_DB = {
"ORD-12345": {
"order_id": "ORD-12345",
"customer_name": "Jane Smith",
"order_date": "2024-03-15",
"status": "shipped",
"items": [
{"product_id": "P123", "name": "Wireless Headphones", "quantity": 1, "price": 89.99},
{"product_id": "P456", "name": "Phone Case", "quantity": 2, "price": 19.99}
],
"total": 129.97,
"shipping_address": "123 Main St, Anytown, USA",
"tracking_number": "TRK-987654",
"estimated_delivery": "2024-03-20"
},
"ORD-67890": {
"order_id": "ORD-67890",
"customer_name": "John Doe",
"order_date": "2024-03-18",
"status": "processing",
"items": [
{"product_id": "P789", "name": "Smart Watch", "quantity": 1, "price": 249.99}
],
"total": 249.99,
"shipping_address": "456 Oak Ave, Othertown, USA"
}
}

# Define schemas for ticket creation tool
class TicketCreationInput(InputSchema):
customer_name: str
customer_email: str
issue_type: str = Field(description="Type of issue (e.g., refund, technical, order)")
priority: int = Field(ge=1, le=5, default=3, description="Priority level, 1-5 where 1 is highest")
description: str = Field(description="Detailed description of the issue")
order_id: Optional[str] = Field(default=None, description="Related order ID if applicable")

class TicketCreationOutput(OutputSchema):
ticket_id: str
status: str
created_at: str
assigned_to: str
estimated_response_time: str

# Knowledge base data (example product FAQs)
PRODUCT_FAQS = [
{
"product": "Wireless Headphones",
"question": "How do I pair my headphones with my device?",
"answer": "Turn on Bluetooth on your device. Press and hold the power button on the headphones for 5 seconds until the LED flashes blue and red. Select the headphones from your device's Bluetooth menu."
},
{
"product": "Wireless Headphones",
"question": "How long does the battery last?",
"answer": "The battery lasts approximately 20 hours on a full charge. Charging time is about 2 hours."
},
{
"product": "Smart Watch",
"question": "Is the Smart Watch waterproof?",
"answer": "The Smart Watch is water-resistant up to 50 meters (5 ATM), suitable for swimming in shallow water but not for diving or high-pressure water activities."
},
{
"product": "Smart Watch",
"question": "How do I update the firmware?",
"answer": "Open the companion app on your phone, go to Settings > Device > Firmware Update, and follow the on-screen instructions."
}
]

# Tool function implementations
def lookup_order(order_id: str) -> OrderDetails:
"""Look up order information in the database"""
if order_id in ORDER_DB:
order_data = ORDER_DB[order_id]
return OrderDetails(**order_data)
else:
return OrderDetails(
order_id=order_id,
customer_name="Unknown",
order_date=datetime.now().strftime("%Y-%m-%d"),
status="not_found",
items=[],
total=0.0,
shipping_address=""
)

def create_support_ticket(
customer_name: str,
customer_email: str,
issue_type: str,
priority: int,
description: str,
order_id: Optional[str] = None
) -> TicketCreationOutput:
"""Create a support ticket in the system"""
# In a real implementation, this would call a CRM API
ticket_id = f"TKT-{hash(description + customer_email) % 100000:05d}"

# Assign based on issue type
assignments = {
"refund": "Finance Team",
"technical": "Tech Support",
"order": "Order Support",
}
assigned_to = assignments.get(issue_type.lower(), "Customer Support")

# Estimate response time based on priority
response_times = {
1: "1 hour",
2: "4 hours",
3: "24 hours",
4: "48 hours",
5: "72 hours"
}

return TicketCreationOutput(
ticket_id=ticket_id,
status="open",
created_at=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
assigned_to=assigned_to,
estimated_response_time=response_times[priority]
)

# Create tools
order_lookup_tool = Tool(
name="lookup_order",
function=lookup_order,
input_schema=OrderLookupInput,
output_schema=OrderDetails,
description="Look up order details using an order ID"
)

ticket_creation_tool = Tool(
name="create_support_ticket",
function=create_support_ticket,
input_schema=TicketCreationInput,
output_schema=TicketCreationOutput,
description="Create a support ticket for a customer issue that cannot be resolved immediately"
)

# Set up vector memory for FAQs
memory = VectorMemory(embedding_model="text-embedding-3-large")

# Load FAQs into memory
for faq in PRODUCT_FAQS:
memory.store_text(
f"{faq['product']}-{faq['question']}",
f"Question: {faq['question']}\nProduct: {faq['product']}\nAnswer: {faq['answer']}"
)

# Create the customer support agent
support_agent = Agent(
name="customer_support",
model=OpenAIModel("gpt-4-turbo"),
tools=[order_lookup_tool, ticket_creation_tool],
skills=[TextGenerationSkill()],
memory=memory
)

# System message for the agent
system_message = """
You are a customer support agent for an electronics company.
You can:
1. Answer general questions about products using your knowledge
2. Look up order information for customers
3. Create support tickets for issues you cannot resolve

Always be polite and professional. If you need order details, ask for the order ID.
If you cannot resolve an issue, create a support ticket and provide the ticket ID to the customer.
"""

# Example usage
def handle_customer_query(query: str, customer_info: Dict[str, Any] = None):
"""Handle a customer support query"""
customer_info = customer_info or {}

# Prepare context with customer info
context = ""
if customer_info:
context = f"Customer Info:\nName: {customer_info.get('name', 'Unknown')}\nEmail: {customer_info.get('email', 'Unknown')}\n\n"

# Process query with agent
full_query = context + query

# First check memory for relevant FAQs
if "how" in query.lower() or "?" in query:
similar_faqs = memory.search(query, limit=2)
if similar_faqs:
# Add relevant FAQs to context
context += "Relevant information:\n"
for faq in similar_faqs:
context += f"{faq}\n\n"

response = support_agent.process(
f"{context}\nCustomer query: {query}",
system_message=system_message
)

return response.content

# Example calls
print(handle_customer_query("How do I pair my wireless headphones?"))
print(handle_customer_query("What's the status of my order ORD-12345?"))
print(handle_customer_query(
"I still haven't received my Smart Watch and it's been 2 weeks",
{"name": "John Doe", "email": "john.doe@example.com"}
))

Content Generation Agent

This example creates a content generation system that can research topics, generate blog posts, and optimize content for SEO.

import re
import time
from typing import List, Optional, Dict, Any
from datetime import datetime

from airtrain.core import Agent, Tool
from airtrain.models import AnthropicModel, OpenAIModel
from airtrain.core.schemas import InputSchema, OutputSchema
from airtrain.core.memory import VectorMemory
from airtrain.core.skills import TextGenerationSkill
from airtrain.utils.rate_limiters import ExponentialBackoffLimiter
from pydantic import BaseModel, Field

# Define schemas for research tool
class ResearchInput(InputSchema):
topic: str = Field(description="Topic to research")
focus_areas: List[str] = Field(description="Specific areas to focus on", default_factory=list)
depth: str = Field(description="Research depth (brief, standard, comprehensive)", default="standard")

class ResearchPoint(BaseModel):
title: str
summary: str
source: Optional[str] = None

class ResearchOutput(OutputSchema):
main_points: List[ResearchPoint]
related_topics: List[str]
sources: List[str]

# Define schemas for content generation
class ContentGenerationInput(InputSchema):
topic: str
content_type: str = Field(description="Type of content (blog, article, social post)")
target_audience: str
tone: str = Field(description="Tone of content (formal, conversational, technical)")
length: str = Field(description="Content length (short, medium, long)")
key_points: Optional[List[str]] = None

class ContentOutput(OutputSchema):
title: str
content: str
meta_description: str
word_count: int
estimated_reading_time: str

# Define schemas for SEO optimization
class SEOInput(InputSchema):
content: str
primary_keyword: str
secondary_keywords: List[str]

class SEOOutput(OutputSchema):
optimized_content: str
keyword_density: Dict[str, float]
suggestions: List[str]
seo_score: int

# Simulated web search for research
def research_topic(
topic: str,
focus_areas: List[str] = None,
depth: str = "standard"
) -> ResearchOutput:
"""Simulate researching a topic online"""
# In a real implementation, this would call a search API
focus_areas = focus_areas or []

# Simulate API call delay
time.sleep(1)

# Build simulated research results based on topic
main_points = [
ResearchPoint(
title=f"Overview of {topic}",
summary=f"General information about {topic} including its history and significance.",
source="wikipedia.org"
),
ResearchPoint(
title=f"Recent developments in {topic}",
summary=f"Latest advancements and innovations in the field of {topic}.",
source="industry-journal.com"
)
]

# Add focus area specific points
for area in focus_areas:
main_points.append(
ResearchPoint(
title=f"{area} in relation to {topic}",
summary=f"Exploration of how {area} impacts and relates to {topic}.",
source="academic-source.edu"
)
)

# Add depth-specific points
if depth == "comprehensive":
main_points.extend([
ResearchPoint(
title=f"Academic research on {topic}",
summary=f"Peer-reviewed studies and academic perspectives on {topic}.",
source="research-gate.net"
),
ResearchPoint(
title=f"Future outlook for {topic}",
summary=f"Predictions and future directions for {topic} based on current trends.",
source="future-insights.org"
)
])

# Generate related topics
related_topics = [
f"{topic} applications",
f"{topic} history",
f"{topic} vs alternative approaches",
f"Future of {topic}"
]

# Collect sources
sources = list(set(point.source for point in main_points if point.source))

return ResearchOutput(
main_points=main_points,
related_topics=related_topics,
sources=sources
)

# Generate content based on research
def generate_content(
topic: str,
content_type: str,
target_audience: str,
tone: str,
length: str,
key_points: Optional[List[str]] = None
) -> None:
"""
This is a placeholder. In the real implementation, we would
use the Claude model to generate content based on the parameters.
The actual content generation will be handled by the agent using
the Anthropic model.
"""
# This function exists primarily for schema definition
# The actual generation will be done by the AI model
pass

# Optimize content for SEO
def optimize_for_seo(
content: str,
primary_keyword: str,
secondary_keywords: List[str]
) -> SEOOutput:
"""Optimize content for SEO by analyzing keyword density and making suggestions"""
# Check keyword density
word_count = len(content.split())

# Calculate keyword densities
keyword_density = {}
keyword_density[primary_keyword] = content.lower().count(primary_keyword.lower()) / word_count * 100

for keyword in secondary_keywords:
keyword_density[keyword] = content.lower().count(keyword.lower()) / word_count * 100

# Generate suggestions
suggestions = []

# Check if primary keyword is in the first paragraph
first_paragraph = content.split('\n\n')[0] if '\n\n' in content else content
if primary_keyword.lower() not in first_paragraph.lower():
suggestions.append(f"Add the primary keyword '{primary_keyword}' to the first paragraph")

# Check heading structure
if "# " not in content:
suggestions.append("Add a main heading (H1) to the content")

if "## " not in content:
suggestions.append("Consider adding subheadings (H2) to break up content")

# Check keyword density
if keyword_density[primary_keyword] < 0.5:
suggestions.append(f"Increase the density of the primary keyword '{primary_keyword}'")
elif keyword_density[primary_keyword] > 3:
suggestions.append(f"Reduce the density of the primary keyword '{primary_keyword}' to avoid keyword stuffing")

# Check for missing secondary keywords
for keyword in secondary_keywords:
if keyword.lower() not in content.lower():
suggestions.append(f"Add the secondary keyword '{keyword}' to the content")

# Calculate SEO score (simplified)
seo_score = 100
seo_score -= len(suggestions) * 10
seo_score = max(seo_score, 0)

return SEOOutput(
optimized_content=content, # In a real implementation, we would make some optimizations
keyword_density=keyword_density,
suggestions=suggestions,
seo_score=seo_score
)

# Set up the tools
research_tool = Tool(
name="research_topic",
function=research_topic,
input_schema=ResearchInput,
output_schema=ResearchOutput,
description="Research a topic and gather information from the web"
)

seo_tool = Tool(
name="optimize_for_seo",
function=optimize_for_seo,
input_schema=SEOInput,
output_schema=SEOOutput,
description="Analyze and optimize content for SEO"
)

# Create the content creation agent
content_agent = Agent(
name="content_creator",
# Use Claude for long-form content generation
model=AnthropicModel(
model_name="claude-3-opus-20240229",
temperature=0.7,
max_tokens=10000,
rate_limiter=ExponentialBackoffLimiter(
initial_delay=1,
max_delay=30,
max_retries=3
)
),
tools=[research_tool, seo_tool],
skills=[TextGenerationSkill()]
)

# System message for the agent
system_message = """
You are a professional content creator specializing in well-researched, high-quality content.

Your workflow is:
1. Research the given topic thoroughly using the research_topic tool
2. Create well-structured content based on the research and specified parameters
3. Optimize the content for SEO using the optimize_for_seo tool
4. Make any final improvements based on SEO suggestions

For blog posts and articles, include:
- A compelling headline
- Introduction with hook
- Well-structured body with subheadings
- Conclusion with call to action
- Proper formatting including headings, bullet points, and paragraphs

Adapt your tone and style to match the target audience and specified content type.
"""

# Example: Create a blog post
def create_blog_post(topic, target_audience, tone, keywords):
"""Create a complete blog post with research and SEO optimization"""
print(f"Creating a blog post about: {topic}")
print("Researching topic...")

# Process with the agent
prompt = f"""
Create a blog post about {topic}.
Target audience: {target_audience}
Tone: {tone}
Primary keyword: {keywords[0]}
Secondary keywords: {', '.join(keywords[1:])}

First research the topic thoroughly, then write a well-structured blog post,
and finally optimize it for SEO.
"""

response = content_agent.process(
prompt,
system_message=system_message
)

return response.content

# Example usage
blog_post = create_blog_post(
topic="AI Agents in Business Automation",
target_audience="Business professionals and IT managers",
tone="Professional but accessible",
keywords=["AI agents", "business automation", "workflow optimization", "AI implementation", "productivity"]
)

print(blog_post)

Multi-Agent System

This example creates a multi-agent system where different specialized agents collaborate to solve complex problems.

import uuid
from typing import List, Dict, Any, Optional, Union
from datetime import datetime
import json

from airtrain.core import Agent, Tool
from airtrain.models import OpenAIModel, AnthropicModel
from airtrain.core.schemas import InputSchema, OutputSchema
from airtrain.core.memory import SimpleMemory
from airtrain.core.skills import TextGenerationSkill
from airtrain.core.execution import ExecutionGraph, ExecutionNode, ConditionNode
from pydantic import BaseModel, Field

# Define schemas for agent communication
class AgentMessage(BaseModel):
from_agent: str
to_agent: str
message_type: str
content: Any
timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())
message_id: str = Field(default_factory=lambda: str(uuid.uuid4()))

class AgentRequest(InputSchema):
request_type: str
data: Dict[str, Any]
urgency: str = "normal"

class AgentResponse(OutputSchema):
status: str
data: Dict[str, Any]
errors: Optional[List[str]] = None

# Message queue for agent communication
class MessageQueue:
def __init__(self):
self.messages = []

def send(self, message: AgentMessage):
self.messages.append(message)

def get_messages_for(self, agent_name: str) -> List[AgentMessage]:
return [msg for msg in self.messages if msg.to_agent == agent_name]

def mark_processed(self, message_id: str):
self.messages = [msg for msg in self.messages if msg.message_id != message_id]

# Create a shared message queue
message_queue = MessageQueue()

# Create memories for each agent
research_memory = SimpleMemory()
writing_memory = SimpleMemory()
editing_memory = SimpleMemory()
coordination_memory = SimpleMemory()

# Define tool functions for inter-agent communication
def send_message(
from_agent: str,
to_agent: str,
message_type: str,
content: Dict[str, Any]
) -> Dict[str, Any]:
"""Send a message to another agent"""
message = AgentMessage(
from_agent=from_agent,
to_agent=to_agent,
message_type=message_type,
content=content
)
message_queue.send(message)
return {"status": "sent", "message_id": message.message_id}

def get_messages(agent_name: str) -> Dict[str, Any]:
"""Get all messages for an agent"""
messages = message_queue.get_messages_for(agent_name)
return {"messages": [msg.dict() for msg in messages]}

def process_message(message_id: str) -> Dict[str, Any]:
"""Mark a message as processed"""
message_queue.mark_processed(message_id)
return {"status": "processed", "message_id": message_id}

# Create communication tools for each agent
def create_communication_tools(agent_name: str) -> List[Tool]:
"""Create standard communication tools for an agent"""

# Send message tool
class SendMessageInput(InputSchema):
to_agent: str = Field(description="Name of the agent to send message to")
message_type: str = Field(description="Type of message (request, response, update)")
content: Dict[str, Any] = Field(description="Content of the message")

class SendMessageOutput(OutputSchema):
status: str
message_id: str

send_message_tool = Tool(
name="send_message",
function=lambda to_agent, message_type, content: send_message(agent_name, to_agent, message_type, content),
input_schema=SendMessageInput,
output_schema=SendMessageOutput,
description=f"Send a message from {agent_name} to another agent"
)

# Get messages tool
class GetMessagesInput(InputSchema):
pass

class MessageSchema(BaseModel):
from_agent: str
to_agent: str
message_type: str
content: Dict[str, Any]
timestamp: str
message_id: str

class GetMessagesOutput(OutputSchema):
messages: List[MessageSchema]

get_messages_tool = Tool(
name="get_messages",
function=lambda: get_messages(agent_name),
input_schema=GetMessagesInput,
output_schema=GetMessagesOutput,
description=f"Get all messages for {agent_name}"
)

# Process message tool
class ProcessMessageInput(InputSchema):
message_id: str = Field(description="ID of the message to mark as processed")

class ProcessMessageOutput(OutputSchema):
status: str
message_id: str

process_message_tool = Tool(
name="process_message",
function=lambda message_id: process_message(message_id),
input_schema=ProcessMessageInput,
output_schema=ProcessMessageOutput,
description="Mark a message as processed"
)

return [send_message_tool, get_messages_tool, process_message_tool]

# Create the Research Agent
research_tools = create_communication_tools("research_agent")
research_agent = Agent(
name="research_agent",
model=OpenAIModel("gpt-4-turbo"),
tools=research_tools,
skills=[TextGenerationSkill()],
memory=research_memory
)

# Create the Writing Agent
writing_tools = create_communication_tools("writing_agent")
writing_agent = Agent(
name="writing_agent",
model=AnthropicModel("claude-3-opus-20240229"),
tools=writing_tools,
skills=[TextGenerationSkill()],
memory=writing_memory
)

# Create the Editing Agent
editing_tools = create_communication_tools("editing_agent")
editing_agent = Agent(
name="editing_agent",
model=OpenAIModel("gpt-4-turbo"),
tools=editing_tools,
skills=[TextGenerationSkill()],
memory=editing_memory
)

# Create the Coordination Agent
coordination_tools = create_communication_tools("coordination_agent")
coordination_agent = Agent(
name="coordination_agent",
model=OpenAIModel("gpt-4-turbo"),
tools=coordination_tools,
skills=[TextGenerationSkill()],
memory=coordination_memory
)

# System messages for each agent
research_system_message = """
You are a Research Agent specialized in gathering accurate information. Your responsibilities:
1. Analyzing research requests from the Coordination Agent
2. Finding and verifying information on the requested topic
3. Summarizing findings in a structured format
4. Sending research results back to the Coordination Agent or Writing Agent

Always check for messages using the get_messages tool and respond appropriately.
"""

writing_system_message = """
You are a Writing Agent specialized in creating high-quality content. Your responsibilities:
1. Analyzing writing requests from the Coordination Agent
2. Using research provided by the Research Agent
3. Creating well-structured, engaging content
4. Sending draft content to the Editing Agent

Always check for messages using the get_messages tool and respond appropriately.
"""

editing_system_message = """
You are an Editing Agent specialized in polishing and improving content. Your responsibilities:
1. Analyzing editing requests and draft content
2. Correcting grammar, spelling, and style issues
3. Improving clarity, flow, and structure
4. Sending edited content back to the Coordination Agent

Always check for messages using the get_messages tool and respond appropriately.
"""

coordination_system_message = """
You are a Coordination Agent managing a multi-agent workflow. Your responsibilities:
1. Understanding user requests and breaking them into tasks
2. Assigning tasks to specialized agents (Research, Writing, Editing)
3. Tracking progress and ensuring successful completion
4. Integrating outputs from different agents into a final result
5. Communicating final results to the user

Always check for messages using the get_messages tool and respond appropriately.
"""

# Function to process an agent's messages
def process_agent_messages(agent, system_message):
"""Process all messages for an agent and respond appropriately"""
# Get messages
response = agent.process(
"Check for new messages and process them appropriately.",
system_message=system_message
)
return response.content

# Workflow for creating content with the multi-agent system
def create_content_with_multi_agent(topic, content_type, audience):
"""Use the multi-agent system to create content on a topic"""
print(f"Starting content creation for topic: {topic}")

# Step 1: Coordination agent receives request and plans
coordination_agent.process(
f"""
You've received a new content creation request.
Topic: {topic}
Content Type: {content_type}
Target Audience: {audience}

Plan the workflow by:
1. Breaking this into research, writing, and editing tasks
2. Sending a research request to the research_agent
""",
system_message=coordination_system_message
)

# Step 2: Simulate the workflow with message processing
# In a real implementation, this would be an ongoing process or event loop
for _ in range(10): # Simulate multiple rounds of communication
# Each agent processes their messages
process_agent_messages(research_agent, research_system_message)
process_agent_messages(writing_agent, writing_system_message)
process_agent_messages(editing_agent, editing_system_message)
process_agent_messages(coordination_agent, coordination_system_message)

# Check if coordination agent has a final result
if "final_content" in coordination_memory.get_all():
break

# Step 3: Get final result from coordination agent
final_content = coordination_memory.get("final_content", "Content creation not completed")
return final_content

# Example usage
final_content = create_content_with_multi_agent(
topic="The Impact of Artificial Intelligence on Healthcare",
content_type="educational article",
audience="healthcare professionals"
)

print("Final Content:")
print(final_content)

These advanced examples demonstrate how to combine multiple Airtrain features to build complex, real-world applications with AI agents. Use them as inspiration for your own projects and adapt them to your specific use cases.