Docs
Advanced Usage - Python SDK

Advanced Usage - Python SDK

Error handling, testing, and advanced patterns with the Python SDK

Error Handling

Exception Types

from lumnisai import (
    Client,
    AuthenticationError,
    NotFoundError,
    RateLimitError,
    ValidationError,
    LumnisAPIError
)
 
client = Client(api_key="your-api-key")
 
try:
    response = client.invoke("Hello!")
    
except AuthenticationError as e:
    # Invalid API key or authentication issue
    print(f"Authentication failed: {e}")
    
except ValidationError as e:
    # Invalid request parameters
    print(f"Validation error: {e}")
    print(f"Details: {e.details}")
    
except RateLimitError as e:
    # Rate limit exceeded
    print(f"Rate limited. Retry after {e.retry_after} seconds")
    
except NotFoundError as e:
    # Resource not found
    print(f"Resource not found: {e}")
    
except LumnisAPIError as e:
    # Generic API error
    print(f"API error: {e}")
    print(f"Status code: {e.status_code}")
    print(f"Error code: {e.error_code}")

Handling Specific Errors

from lumnisai import Client, ValidationError, NotFoundError
 
client = Client(api_key="your-api-key")
 
# Handle validation errors
try:
    response = client.invoke(
        messages=[],  # Invalid: empty messages
        user_id="user@example.com"
    )
except ValidationError as e:
    print(f"Invalid request: {e.details}")
    # Fix and retry
    response = client.invoke(
        "Valid message",
        user_id="user@example.com"
    )
 
# Handle not found errors
try:
    response = client.get_response("nonexistent-id")
except NotFoundError:
    print("Response not found, creating new one")
    response = client.invoke("New task")

Retry Logic

import time
from lumnisai import Client, RateLimitError, LumnisAPIError
 
client = Client(api_key="your-api-key")
 
def invoke_with_retry(client, prompt, max_retries=3):
    """Invoke with automatic retry on transient errors."""
    for attempt in range(max_retries):
        try:
            return client.invoke(prompt)
            
        except RateLimitError as e:
            if attempt < max_retries - 1:
                wait_time = e.retry_after or (2 ** attempt)
                print(f"Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise
                
        except LumnisAPIError as e:
            # Retry on 5xx errors
            if 500 <= e.status_code < 600 and attempt < max_retries - 1:
                wait_time = 2 ** attempt
                print(f"Server error. Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise
 
# Usage
response = invoke_with_retry(client, "Hello!")

Custom Retry Configuration

from lumnisai import Client
 
# Configure client-level retries
client = Client(
    api_key="your-api-key",
    max_retries=5,      # Retry up to 5 times
    timeout=60.0        # 60 second timeout
)
 
# Retries are automatic for transient errors
response = client.invoke("Hello!")

Idempotent Requests

import uuid
from lumnisai import Client
 
client = Client(api_key="your-api-key")
 
# Generate idempotency key
idempotency_key = str(uuid.uuid4())
 
# First request
response1 = client.invoke(
    "Important task",
    idempotency_key=idempotency_key,
    user_id="user@example.com"
)
 
# Retry with same key (won't create duplicate)
response2 = client.invoke(
    "Important task",
    idempotency_key=idempotency_key,
    user_id="user@example.com"
)
 
# Returns the same response
assert response1.response_id == response2.response_id

Type Hints

from typing import List, Dict, Any
from lumnisai import Client, ResponseObject, AgentConfig
from pydantic import BaseModel
 
client: Client = Client(api_key="your-api-key")
 
# Type-safe messages
messages: List[Dict[str, str]] = [
    {"role": "user", "content": "Hello"}
]
 
# Type-safe response
response: ResponseObject = client.invoke(messages=messages)
output: str = response.output_text
 
# Type-safe structured output
class Person(BaseModel):
    name: str
    age: int
 
response = client.invoke(
    "Tell me about Ada Lovelace",
    response_format=Person
)
 
if response.structured_response:
    person = Person(**response.structured_response)
    name: str = person.name
    age: int = person.age

Testing

Unit Tests

import pytest
from lumnisai import Client, ValidationError
 
@pytest.fixture
def client():
    return Client(api_key="test-api-key")
 
def test_create_response(client):
    """Test creating a response."""
    response = client.invoke("Hello!")
    
    assert response.response_id is not None
    assert response.status in ["queued", "in_progress", "succeeded"]
    
def test_invalid_request(client):
    """Test validation error handling."""
    with pytest.raises(ValidationError):
        client.invoke(messages=[])  # Empty messages should fail
 
def test_list_users(client):
    """Test listing users."""
    users = client.list_users()
    
    assert isinstance(users.users, list)
    assert users.total >= 0

Integration Tests

import pytest
from lumnisai import Client
 
@pytest.fixture
def client():
    """Fixture with real API credentials."""
    return Client(api_key="your-test-api-key")
 
def test_full_workflow(client):
    """Test complete workflow."""
    # Create user
    user = client.create_user(
        email="test@example.com",
        first_name="Test"
    )
    assert user.email == "test@example.com"
    
    # Create response
    response = client.invoke(
        "Hello!",
        user_id=user.email
    )
    assert response.response_id is not None
    
    # List responses
    responses = client.list_responses(user_id=user.email)
    assert len(responses.responses) > 0
    
    # Cleanup
    client.delete_user(user.email)

Mocking

from unittest.mock import Mock, patch
from lumnisai import Client, ResponseObject
 
def test_with_mock():
    """Test with mocked API."""
    client = Client(api_key="test-key")
    
    # Mock the invoke method
    mock_response = ResponseObject(
        response_id="test-id",
        status="succeeded",
        output_text="Hello!",
        thread_id="thread-id"
    )
    
    with patch.object(client, 'invoke', return_value=mock_response):
        response = client.invoke("Test")
        assert response.output_text == "Hello!"

Advanced Patterns

Connection Pooling

from lumnisai import AsyncClient
import asyncio
 
async def process_batch(items):
    """Process multiple items concurrently."""
    client = AsyncClient(api_key="your-api-key")
    
    async with client:
        # Process items concurrently
        tasks = [
            client.invoke(item, user_id="user@example.com")
            for item in items
        ]
        
        results = await asyncio.gather(*tasks)
        return results
 
# Usage
items = ["Task 1", "Task 2", "Task 3"]
results = asyncio.run(process_batch(items))

Progress Tracking

from lumnisai import Client, ProgressTracker
 
client = Client(api_key="your-api-key")
 
# Custom progress tracking
tracker = ProgressTracker()
 
for update in client.invoke("Complex task", stream=True):
    # Only display new content (no duplicates)
    new_content = tracker.format_new_entries(
        update.state,
        update.message,
        update.tool_calls
    )
    
    if new_content:
        print(new_content)

Response Caching

from functools import lru_cache
from lumnisai import Client
 
client = Client(api_key="your-api-key")
 
@lru_cache(maxsize=100)
def cached_invoke(prompt: str) -> str:
    """Cache responses for identical prompts."""
    response = client.invoke(prompt)
    return response.output_text
 
# First call hits API
result1 = cached_invoke("What is AI?")
 
# Second call uses cache
result2 = cached_invoke("What is AI?")
 
assert result1 == result2

Batch Processing

from lumnisai import AsyncClient
import asyncio
 
async def batch_process(items, batch_size=5):
    """Process items in batches to avoid rate limits."""
    client = AsyncClient(api_key="your-api-key")
    
    async with client:
        results = []
        
        for i in range(0, len(items), batch_size):
            batch = items[i:i+batch_size]
            
            # Process batch
            batch_results = await asyncio.gather(*[
                client.invoke(item, user_id="user@example.com")
                for item in batch
            ])
            
            results.extend(batch_results)
            
            # Rate limit protection
            if i + batch_size < len(items):
                await asyncio.sleep(1)
        
        return results
 
# Usage
items = [f"Task {i}" for i in range(20)]
results = asyncio.run(batch_process(items, batch_size=5))

Request Middleware

from lumnisai import AsyncClient
from typing import Any, Dict
 
class LoggingClient(AsyncClient):
    """Client with request/response logging."""
    
    async def invoke(self, *args, **kwargs):
        print(f"Request: {args}, {kwargs}")
        
        try:
            response = await super().invoke(*args, **kwargs)
            print(f"Response: {response.response_id}")
            return response
        except Exception as e:
            print(f"Error: {e}")
            raise
 
# Usage
client = LoggingClient(api_key="your-api-key")
 
async def main():
    async with client:
        response = await client.invoke("Hello!")
 
asyncio.run(main())

Performance Optimization

Async for Concurrency

from lumnisai import AsyncClient
import asyncio
 
async def process_multiple_tasks():
    """Process multiple tasks concurrently."""
    client = AsyncClient(api_key="your-api-key")
    
    async with client:
        # Create multiple tasks
        tasks = [
            client.invoke(f"Task {i}", user_id="user@example.com")
            for i in range(10)
        ]
        
        # Wait for all
        results = await asyncio.gather(*tasks)
        
        return results
 
# Much faster than sequential processing
results = asyncio.run(process_multiple_tasks())

Connection Reuse

from lumnisai import Client
 
# Reuse client for multiple requests
client = Client(api_key="your-api-key")
 
# Connection pooling handled automatically
for i in range(100):
    response = client.invoke(f"Task {i}")
 
# Close when done
client.close()

Streaming for Long Tasks

from lumnisai import display_progress
 
# Use streaming for better responsiveness
for update in client.invoke("Long task", stream=True):
    # Display formatted progress
    display_progress(update)
    
    # Can process partial results immediately
    if update.tool_calls:
        process_tool_calls(update.tool_calls)

Debugging

Enable Logging

import logging
from lumnisai import Client
 
# Enable SDK logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("lumnisai")
logger.setLevel(logging.DEBUG)
 
client = Client(api_key="your-api-key")
response = client.invoke("Hello!")

Inspect Request Details

# Access response metadata
response = client.invoke("Hello!")
 
print(f"Response ID: {response.response_id}")
print(f"Thread ID: {response.thread_id}")
print(f"Status: {response.status}")
print(f"Created: {response.created_at}")
print(f"Completed: {response.completed_at}")
 
# Access progress entries
for entry in response.progress:
    print(f"{entry.ts}: {entry.state} - {entry.message}")

Error Details

from lumnisai import Client, LumnisAPIError
 
client = Client(api_key="your-api-key")
 
try:
    response = client.invoke("Invalid request")
except LumnisAPIError as e:
    print(f"Status: {e.status_code}")
    print(f"Error code: {e.error_code}")
    print(f"Message: {e.message}")
    print(f"Details: {e.details}")

Best Practices Summary

1. Use Context Managers

# Good: Automatic cleanup
with Client(api_key="your-api-key") as client:
    response = client.invoke("Hello!")
 
# Even better: Async with proper cleanup
async with AsyncClient(api_key="your-api-key") as client:
    response = await client.invoke("Hello!")

2. Handle Errors Gracefully

try:
    response = client.invoke("Hello!")
except ValidationError as e:
    # Handle validation errors
    print(f"Invalid request: {e.details}")
except RateLimitError:
    # Handle rate limits
    time.sleep(60)
except LumnisAPIError as e:
    # Handle other errors
    print(f"Error: {e}")

3. Use Type Hints

from lumnisai import Client, ResponseObject
from typing import List, Dict
 
def process_response(response: ResponseObject) -> str:
    return response.output_text
 
client: Client = Client(api_key="your-api-key")

4. Leverage Async for Performance

# Sync: Sequential (slow)
for item in items:
    response = client.invoke(item)
 
# Async: Concurrent (fast)
async with AsyncClient() as client:
    results = await asyncio.gather(*[
        client.invoke(item) for item in items
    ])

5. Use Idempotency Keys

# For critical operations
response = client.invoke(
    "Important task",
    idempotency_key=str(uuid.uuid4())
)