Async Usage

For when you need speed ⚔

The Python SDK fully supports async/await for non-blocking operations.

Quick Start

import asyncio
from memofai import create_async_moa_client

async def main():
    """Walk through the basic async workflow.

    Lists workspaces, creates a bot in the first one, stores a memory for
    that bot, searches the bot's memories, and prints the number of hits.
    """
    # Build the async client
    moa = create_async_moa_client(
        api_token='moa_your_token',
        environment='production',
    )

    # Every SDK call below is awaited
    workspaces = await moa.workspaces.list()
    first_workspace_id = workspaces[0]['id']

    bot_spec = {'name': 'Async Bot', 'workspace_id': first_workspace_id}
    bot = await moa.bots.create(bot_spec)

    memory_spec = {'bot_id': bot['id'], 'content': 'User loves async Python'}
    memory = await moa.memories.create(memory_spec)

    search_spec = {'bot_id': bot['id'], 'query': 'programming preferences'}
    results = await moa.memories.search(search_spec)

    print(f"Found {len(results)} memories")

# Run it: asyncio.run() starts an event loop and runs main() to completion
asyncio.run(main())

Concurrent Operations

async def store_many_memories(bot_id: str, contents: list[str]):
    """Store one memory per entry of ``contents`` for ``bot_id``, all at once.

    Returns the list produced by ``asyncio.gather``, i.e. one result per
    entry of ``contents``, in the same order.
    """
    moa = create_async_moa_client(api_token='moa_token')

    # Build one un-awaited create() call per piece of content
    pending = []
    for text in contents:
        payload = {'bot_id': bot_id, 'content': text}
        pending.append(moa.memories.create(payload))

    # Run them all concurrently and hand back the results
    return await asyncio.gather(*pending)

# Usage: each string below is stored as a separate memory for bot 'bot_123'
contents = [
    "User prefers Python",
    "User loves FastAPI",
    "User timezone EST"
]

# asyncio.run() drives the coroutine to completion and returns its result list
results = asyncio.run(store_many_memories('bot_123', contents))
print(f"Stored {len(results)} memories concurrently!")

FastAPI Integration

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from memofai import create_async_moa_client
import os

app = FastAPI()

# Global async client: created once at import time and used by the route handlers.
# NOTE(review): os.getenv returns None when MEMOFAI_TOKEN is unset - confirm how
# the SDK treats a missing token, or fail fast here.
moa_client = create_async_moa_client(
    api_token=os.getenv('MEMOFAI_TOKEN'),
    environment='production'
)

# Request body accepted by POST /memories.
class MemoryCreate(BaseModel):
    bot_id: str  # sent to the SDK as 'bot_id'
    content: str  # sent to the SDK as 'content'

# Request body accepted by POST /search.
class SearchRequest(BaseModel):
    bot_id: str  # sent to the SDK as 'bot_id'
    query: str  # sent to the SDK as 'query'
    limit: int = 5  # sent to the SDK as 'limit'; 5 when the caller omits it

@app.post("/memories")
async def create_memory(memory: MemoryCreate):
    """Store memory asynchronously.

    Forwards ``bot_id`` and ``content`` from the request body to
    ``moa_client.memories.create`` and returns its result unchanged.
    Any exception is converted to an HTTP 500 whose detail is the error text.
    """
    try:
        return await moa_client.memories.create({
            'bot_id': memory.bot_id,
            'content': memory.content
        })
    except Exception as e:
        # Chain the original error so logs and tracebacks keep the root cause.
        raise HTTPException(status_code=500, detail=str(e)) from e

@app.post("/search")
async def search_memories(request: SearchRequest):
    """Search memories asynchronously.

    Forwards ``bot_id``, ``query`` and ``limit`` from the request body to
    ``moa_client.memories.search`` and wraps the result as
    ``{'results': ...}``. Any exception is converted to an HTTP 500 whose
    detail is the error text.
    """
    try:
        results = await moa_client.memories.search({
            'bot_id': request.bot_id,
            'query': request.query,
            'limit': request.limit
        })
    except Exception as e:
        # Chain the original error so logs and tracebacks keep the root cause.
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {'results': results}

@app.get("/bots/{bot_id}")
async def get_bot(bot_id: str):
    """Get bot asynchronously.

    Returns the result of ``moa_client.bots.get(bot_id)`` unchanged.
    Any exception is converted to an HTTP 404 with detail "Bot not found".
    """
    try:
        return await moa_client.bots.get(bot_id)
    except Exception as e:
        # NOTE(review): every failure (not only a missing bot) is reported as
        # 404 here; narrow this to the SDK's not-found error once confirmed.
        # Chaining keeps the real cause visible in logs and tracebacks.
        raise HTTPException(status_code=404, detail="Bot not found") from e

Advanced Patterns

async def search_multiple_bots(bot_ids: list[str], query: str):
    """Run the same query against several bots concurrently.

    Each bot is searched with a limit of 5; the per-bot result sets are
    concatenated, in ``bot_ids`` order, into one flat list.
    """
    moa = create_async_moa_client(api_token='moa_token')

    # One un-awaited search per bot
    searches = []
    for current_id in bot_ids:
        params = {'bot_id': current_id, 'query': query, 'limit': 5}
        searches.append(moa.memories.search(params))

    # Run every search at once
    per_bot_hits = await asyncio.gather(*searches)

    # Flatten the per-bot result sets into a single list
    return [hit for hits in per_bot_hits for hit in hits]

With Error Handling

async def safe_create_memory(bot_id: str, content: str, max_retries: int = 3):
    """Create a memory, retrying failed attempts with exponential backoff.

    Args:
        bot_id: Sent to the SDK as 'bot_id'.
        content: Sent to the SDK as 'content'.
        max_retries: Total number of attempts; must be at least 1.

    Returns:
        The result of the first ``client.memories.create`` call that succeeds.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: The error raised by the final attempt, re-raised unchanged.
    """
    if max_retries < 1:
        # Without this check the loop body never runs and the function
        # returns None without ever attempting the request.
        raise ValueError("max_retries must be at least 1")

    client = create_async_moa_client(api_token='moa_token')

    for attempt in range(max_retries):
        try:
            return await client.memories.create({
                'bot_id': bot_id,
                'content': content
            })
        except Exception:
            if attempt == max_retries - 1:
                # Out of attempts: surface the last error to the caller.
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...

Context Manager

from contextlib import asynccontextmanager

@asynccontextmanager
async def get_moa_client():
    """Yield an async client built from the MEMOFAI_TOKEN environment variable."""
    token = os.getenv('MEMOFAI_TOKEN')
    moa = create_async_moa_client(api_token=token)
    try:
        yield moa
    finally:
        # Nothing is torn down yet; release client resources here if needed
        pass

# Usage
async def example():
    """List the memories of bot 'bot_123' and print how many were found."""
    bot_filter = {'bot_id': 'bot_123'}
    async with get_moa_client() as client:
        memories = await client.memories.list(bot_filter)
        print(f"Found {len(memories)} memories")

Performance Tips

  1. Use concurrency for independent operations

    # Good - Concurrent: the three calls are awaited together through gather(),
    # which returns their results as a list in argument order
    results = await asyncio.gather(
        client.bots.list(),
        client.workspaces.list(),
        client.memories.search(...)
    )
    
    # Bad - Sequential: each await must finish before the next call starts
    bots = await client.bots.list()
    workspaces = await client.workspaces.list()
    memories = await client.memories.search(...)
  2. Rate limit yourself by capping concurrent requests

    from asyncio import Semaphore
    
    async def store_with_limit(bot_id: str, contents: list[str], max_concurrency: int = 10):
        """Store one memory per entry of ``contents`` with bounded concurrency.

        Args:
            bot_id: Sent to the SDK as 'bot_id' for every memory.
            contents: One memory is created per entry.
            max_concurrency: Maximum number of create() calls in flight at
                once; must be at least 1. Defaults to 10.

        Returns:
            The list produced by ``asyncio.gather``: one result per entry of
            ``contents``, in the same order. Empty input yields ``[]``.

        Raises:
            ValueError: If ``max_concurrency`` is less than 1.
        """
        if max_concurrency < 1:
            # A semaphore of 0 would never admit a task and hang forever.
            raise ValueError("max_concurrency must be at least 1")
        if not contents:
            # Nothing to store, so skip creating a client.
            return []

        client = create_async_moa_client(api_token='moa_token')
        sem = Semaphore(max_concurrency)  # Caps the number of concurrent requests

        async def store_one(content):
            # Each task waits here until one of the semaphore slots is free.
            async with sem:
                return await client.memories.create({
                    'bot_id': bot_id,
                    'content': content
                })

        return await asyncio.gather(*(store_one(c) for c in contents))
  3. Don't block the event loop

    # Bad - Blocking: requests.get is a synchronous call, so calling this from
    # a coroutine stalls the event loop until the HTTP request returns
    def sync_operation():
        return requests.get('...')
    
    # Good - Non-blocking HTTP request made with an async client session
    async def async_operation():
        """Fetch '...' and return the response body decoded as JSON."""
        async with aiohttp.ClientSession() as session, session.get('...') as resp:
            return await resp.json()

Next: Examples →