Agent Development and Integration Guide

Overview

This documentation provides comprehensive guidance on how to build, integrate, and interact with agents in our system. Agents are autonomous services that can process user inputs, perform specific functions, and return useful responses.

Agent Requirements

Every agent in our system must satisfy these core requirements:

  1. HTTP Endpoint: Must expose an HTTP endpoint that returns a streaming response (default: /chat)
  2. Docker Packaging: Must be packaged as a Docker image for portability and consistent deployment
  3. Message Payload: Must accept and validate messages against the standard payload schema

Message Payload Schema

type MessagePayload = {
  input: string;
  history: Array<{
    role: 'agent' | 'user';
    content: string;
  }>;
  config?: Record<string, unknown>;
}

const EXAMPLE = {
  input: "What was the weather over the past 10 days?", // Your awesome question!
  history: [
    {
      role: "user",            // Who's talking? "agent" or "user"
      content: "",             // What's being said!
    }
  ],
  config: {
    // Run-level config, e.g. max_tokens, context
    // Always set default values as a safety net
  }
}

Note: Configuration at the run level gives you fine-grained control. Always set appropriate default values for a better user experience.

Architecture

Agents in our system follow a consistent architecture:

  1. API Layer: FastAPI endpoints for client-server communication
  2. Business Logic: Core agent functionality
  3. External Services: Integration with AI providers and other services
  4. Tools: Specialized functions agents can access

Agent Structure

A typical agent consists of these components:

  • Interface: RESTful API endpoints (typically using FastAPI)
  • Request Handler: Processes incoming requests
  • Response Generator: Creates and streams responses
  • Tools: Functions the agent can use (web search, image generation, etc.)
  • Configuration: Settings that control agent behavior

Creating an Agent

Basic Implementation

At minimum, an agent needs:

  1. An API endpoint to receive requests
  2. Logic to process those requests
  3. A mechanism to return responses

Example Implementation

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import StreamingResponse
from pydantic import BaseModel, Field, field_validator
import os
from dotenv import load_dotenv

# Initialize app
load_dotenv()
app = FastAPI()

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Define request model
class ChatRequest(BaseModel):
    input: str = Field(..., min_length=1)
    history: list = Field(default_factory=list)
    config: dict = Field(default_factory=dict)
    
    @field_validator('input')
    @classmethod
    def validate_input(cls, v):
        # Sanitize input
        v = ''.join(c for c in v if c.isprintable())
        return v.strip()

@app.post("/chat")
async def chat_stream(request: ChatRequest):
    async def generate():
        try:
            # Process request and generate response
            response = "This is a sample response"
            yield f"data: {response}\n\n"
        except Exception as e:
            yield f"data: Error: {str(e)}\n\n"
    
    return StreamingResponse(generate(), media_type="text/event-stream")

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

Using Different AI Providers

You can bring your own AI provider; inside the response generator, the streaming integration typically looks like this:

OpenAI

from openai import AsyncOpenAI

client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

stream = await client.chat.completions.create(
    model="gpt-4o",
    messages=messages,
    stream=True
)

async for chunk in stream:
    if chunk.choices[0].delta.content:
        yield f"data: {chunk.choices[0].delta.content}\n\n"

Anthropic

from anthropic import AsyncAnthropic

client = AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

stream = await client.messages.create(
    model="claude-3-5-sonnet-latest",
    messages=messages,
    max_tokens=1024,
    stream=True
)

async for chunk in stream:
    if chunk.type == "content_block_delta" and chunk.delta.text:
        yield f"data: {chunk.delta.text}\n\n"

Adding Tools

Here is an example of one way to define a tool:

import json
from typing import Any

from agents import FunctionTool

async def create_meme_function(ctx: Any, args: str) -> str:
    """Generate a meme based on the given prompt."""
    try:
        args_dict = json.loads(args)
        prompt = args_dict.get("prompt")
        if not prompt:
            raise ValueError("Prompt is required")
        
        result = meme_generator.generate_meme(prompt)
        return json.dumps(result)
    except Exception as e:
        return json.dumps({"error": f"Failed to generate meme: {str(e)}"})

create_meme_tool = FunctionTool(
    name="create_meme",
    description="Generates multiple meme images based on a text prompt",
    params_json_schema={
        "type": "object",
        "properties": {
            "prompt": {
                "type": "string",
                "description": "The text prompt describing the meme to generate"
            }
        },
        "required": ["prompt"],
        "additionalProperties": False
    },
    on_invoke_tool=create_meme_function
)
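The invoke function above depends on an external `meme_generator` service. As a self-contained sketch of the same pattern (the stub generator below is an assumption for illustration, not a real service), a tool handler parses its JSON arguments, runs the tool, and returns a JSON string:

```python
import json
from typing import Any

def stub_generate_meme(prompt: str) -> dict:
    """Stand-in for a real meme service -- returns a fabricated URL for illustration."""
    return {"url": f"https://example.com/memes/{prompt.replace(' ', '-')}.jpg"}

async def invoke_tool(ctx: Any, args: str) -> str:
    """Parse JSON args, run the tool, and return a JSON-encoded result or error."""
    try:
        args_dict = json.loads(args)
        prompt = args_dict.get("prompt")
        if not prompt:
            raise ValueError("Prompt is required")
        return json.dumps(stub_generate_meme(prompt))
    except Exception as e:
        return json.dumps({"error": f"Failed to generate meme: {e}"})
```

Returning errors as JSON (rather than raising) lets the model see what went wrong and recover.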

Interacting with Agents

Request Format

Agents accept requests with this structure:

{
  "input": "User query or instruction",
  "history": [
    {"role": "user", "content": "Previous user message"},
    {"role": "agent", "content": "Previous agent response"}
  ],
  "config": {
    "parameter1": "value1"
  }
}

  • input: The user's current query or instruction
  • history: Previous exchanges (optional)
  • config: Additional parameters to customize agent behavior (optional)

Response Stream

Agents typically respond with a stream of text events:

data: First part of response

data: Second part of response

data: $ref$"https://example.com/meme1.jpg"$ref$
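Media results (like the meme URL above) arrive inline wrapped in `$ref$"…"$ref$` markers. A sketch of client-side extraction, based on the marker convention shown above (the helper name is illustrative):

```python
import re

# Matches $ref$"<url>"$ref$ markers embedded in the streamed text.
REF_PATTERN = re.compile(r'\$ref\$"([^"]+)"\$ref\$')

def split_refs(text: str) -> tuple[str, list[str]]:
    """Return the text with markers removed, plus the list of referenced URLs."""
    refs = REF_PATTERN.findall(text)
    cleaned = REF_PATTERN.sub("", text).strip()
    return cleaned, refs
```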

Communication Protocol

Server-Sent Events (SSE)

Our agents use SSE to stream responses back to clients:

  1. Client makes a POST request to the /chat endpoint
  2. Server keeps the connection open
  3. Server sends events formatted as data: {content}\n\n
  4. Client processes each event as it arrives
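The event format in step 3 can be parsed on the client with a small helper. This is a sketch that handles only `data:` lines; a full client should also tolerate `event:` and `id:` fields if the agent ever emits them:

```python
def parse_sse(raw: str) -> list[str]:
    """Split a raw SSE body into the payloads of its `data:` events."""
    events = []
    for block in raw.split("\n\n"):          # events are separated by blank lines
        for line in block.splitlines():
            if line.startswith("data: "):    # keep only the payload after the prefix
                events.append(line[len("data: "):])
    return events
```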

Authentication

To restrict access, an agent can validate an API key header before processing the request:

from fastapi import Header, HTTPException

@app.post("/chat")
async def chat_stream(request: ChatRequest, api_key: str = Header(None)):
    # Validate API key
    if api_key != os.getenv("AUTHORIZED_KEY"):
        raise HTTPException(status_code=401, detail="Unauthorized")
    
    # Process request...

Best Practices

Error Handling

Implement robust error handling:

try:
    ...  # Process the request and stream the response
except Exception as e:
    yield f"data: Error: {str(e)}\n\n"

Retry Mechanisms

For external service calls, implement retries:

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    reraise=True
)
async def external_service_call():
    ...  # Make the call

Service Fallbacks

Design agents to fall back to alternative services:

def generate_content(prompt: str):
    for service in [primary_service, backup_service]:
        try:
            return service.generate(prompt)
        except Exception:
            continue
    
    raise Exception("All services failed")

Input Validation

Always validate and sanitize user inputs:

@field_validator('input')
@classmethod
def validate_input(cls, v):
    # Remove any potentially harmful characters
    v = ''.join(c for c in v if c.isprintable())
    return v.strip()

Examples

Meme Generation Agent

A complete example that creates memes based on user prompts:

https://github.com/ZuvuFoundation/agent-catalogue/tree/main/agents/meme

Simple Chat Agent

A minimal agent that just returns AI-generated responses:

https://github.com/ZuvuFoundation/agent-catalogue/tree/main/agents/default/claude

Language Agnostic Implementation

The principles covered here apply across programming languages:

  • Node.js: Use Express for the API layer
  • Go: Use Gin or Echo frameworks
  • Java/Kotlin: Use Spring Boot
  • Ruby: Use Sinatra or Rails

When implementing in another language, ensure:

  1. Your framework supports streaming responses
  2. You maintain the same request/response structure
  3. You implement proper error handling and retries
  4. You follow the SSE protocol for streaming responses

Conclusion

Building effective agents requires balancing user needs, technical constraints, and service capabilities. Follow these guidelines to create agents that are robust, responsive, and valuable to users.