For when you need speed.
The Python SDK fully supports async/await for non-blocking operations.
import asyncio
from memofai import create_async_moa_client


async def main():
    """End-to-end async example: create a bot, store a memory, search it."""
    # Create async client
    client = create_async_moa_client(
        api_token='moa_your_token',
        environment='production'
    )

    # All methods are async
    workspaces = await client.workspaces.list()

    # Assumes at least one workspace exists on the account
    bot = await client.bots.create({
        'name': 'Async Bot',
        'workspace_id': workspaces[0]['id']
    })

    memory = await client.memories.create({
        'bot_id': bot['id'],
        'content': 'User loves async Python'
    })

    results = await client.memories.search({
        'bot_id': bot['id'],
        'query': 'programming preferences'
    })
    print(f"Found {len(results)} memories")
# Run it
asyncio.run(main())


async def store_many_memories(bot_id: str, contents: list[str]):
    """Store multiple memories concurrently.

    Fires one create request per content string and awaits them all
    with asyncio.gather, so the calls overlap instead of running
    sequentially.
    """
    client = create_async_moa_client(api_token='moa_token')

    # Create tasks (coroutines; gather schedules them concurrently)
    tasks = [
        client.memories.create({
            'bot_id': bot_id,
            'content': content
        })
        for content in contents
    ]

    # Execute concurrently
    results = await asyncio.gather(*tasks)
    return results

# Usage
contents = [
    "User prefers Python",
    "User loves FastAPI",
    "User timezone EST"
]
results = asyncio.run(store_many_memories('bot_123', contents))
print(f"Stored {len(results)} memories concurrently!")from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from memofai import create_async_moa_client
import os
app = FastAPI()
# Global async client
moa_client = create_async_moa_client(
api_token=os.getenv('MEMOFAI_TOKEN'),
environment='production'
)
class MemoryCreate(BaseModel):
bot_id: str
content: str
class SearchRequest(BaseModel):
bot_id: str
query: str
limit: int = 5
@app.post("/memories")
async def create_memory(memory: MemoryCreate):
"""Store memory asynchronously"""
try:
result = await moa_client.memories.create({
'bot_id': memory.bot_id,
'content': memory.content
})
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/search")
async def search_memories(request: SearchRequest):
"""Search memories asynchronously"""
try:
results = await moa_client.memories.search({
'bot_id': request.bot_id,
'query': request.query,
'limit': request.limit
})
return {'results': results}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/bots/{bot_id}")
async def get_bot(bot_id: str):
"""Get bot asynchronously"""
try:
bot = await moa_client.bots.get(bot_id)
return bot
except Exception as e:
raise HTTPException(status_code=404, detail="Bot not found")async def search_multiple_bots(bot_ids: list[str], query: str):
"""Search multiple bots in parallel"""
client = create_async_moa_client(api_token='moa_token')
tasks = [
client.memories.search({
'bot_id': bot_id,
'query': query,
'limit': 5
})
for bot_id in bot_ids
]
# Wait for all searches
results = await asyncio.gather(*tasks)
# Combine results
combined = []
for result_set in results:
combined.extend(result_set)
return combinedasync def safe_create_memory(bot_id: str, content: str, max_retries: int = 3):
"""Create memory with retry logic"""
client = create_async_moa_client(api_token='moa_token')
for attempt in range(max_retries):
try:
return await client.memories.create({
'bot_id': bot_id,
'content': content
})
except Exception as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt) # Exponential backofffrom contextlib import asynccontextmanager
@asynccontextmanager
async def get_moa_client():
"""Async context manager for client"""
client = create_async_moa_client(
api_token=os.getenv('MEMOFAI_TOKEN')
)
try:
yield client
finally:
# Cleanup if needed
pass
# Usage
async def example():
async with get_moa_client() as client:
memories = await client.memories.list({'bot_id': 'bot_123'})
print(f"Found {len(memories)} memories")Use concurrency for independent operations
# Good - Concurrent
results = await asyncio.gather(
client.bots.list(),
client.workspaces.list(),
client.memories.search(...)
)
# Bad - Sequential
bots = await client.bots.list()
workspaces = await client.workspaces.list()
memories = await client.memories.search(...)Rate limit yourself
from asyncio import Semaphore
async def store_with_limit(bot_id: str, contents: list[str]):
sem = Semaphore(10) # Max 10 concurrent
async def store_one(content):
async with sem:
return await client.memories.create({
'bot_id': bot_id,
'content': content
})
tasks = [store_one(c) for c in contents]
return await asyncio.gather(*tasks)Don't block the event loop
# Bad - Blocking
def sync_operation():
return requests.get('...')
# Good - Use asyncio
async def async_operation():
async with aiohttp.ClientSession() as session:
async with session.get('...') as resp:
return await resp.json()Next: Examples ā