Async & Sync

Every retrievalagent operation has both a sync and an async variant. The sync variants work even inside a running event loop (Databricks, Jupyter, FastAPI startup).

Sync usage

from retrievalagent import init_agent

rag = init_agent("docs", model="openai:gpt-5.4", backend="qdrant",
                 backend_url="http://localhost:6333")

state = rag.invoke("What is hybrid search?")
print(state.answer)

Async usage

import asyncio
from retrievalagent import init_agent

async def main():
    rag = init_agent("docs", model="openai:gpt-5.4", backend="qdrant",
                     backend_url="http://localhost:6333")

    state = await rag.ainvoke("What is hybrid search?")
    print(state.answer)

asyncio.run(main())

FastAPI

from fastapi import FastAPI
from retrievalagent import init_agent

app = FastAPI()
rag = init_agent("docs", model="openai:gpt-5.4", backend="qdrant",
                 backend_url="http://localhost:6333")

@app.post("/ask")
async def ask(question: str):
    state = await rag.ainvoke(question)
    return {"answer": state.answer, "sources": len(state.documents)}

Running sync from a running loop

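Calling asyncio.run() from a thread whose event loop is already running raises a RuntimeError. To avoid this, retrievalagent's sync wrappers use an internal _run_sync helper that spawns a new thread if an event loop is already running, so rag.invoke() works from Jupyter notebooks and Databricks cells without modification.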

# Works in Jupyter / Databricks — no asyncio.run() needed
state = rag.invoke("What is hybrid search?")
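
The thread-based fallback described above can be illustrated with a minimal sketch. This is not retrievalagent's actual _run_sync implementation; the names run_sync and fake_ainvoke are hypothetical, and the sketch only shows the general pattern of detecting a running loop and handing the coroutine to a worker thread with its own loop.

```python
import asyncio
import threading


def run_sync(coro):
    """Run a coroutine to completion from synchronous code (illustrative only)."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe.
        return asyncio.run(coro)

    # A loop is already running here and asyncio.run() would raise, so run
    # the coroutine on a fresh loop in a separate thread and wait for it.
    outcome = {}

    def worker():
        try:
            outcome["value"] = asyncio.run(coro)
        except BaseException as exc:
            outcome["error"] = exc

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]


async def fake_ainvoke(question):
    # Hypothetical stand-in for an async operation such as rag.ainvoke().
    await asyncio.sleep(0)
    return f"answer to: {question}"
```

Note that in this sketch thread.join() blocks the calling thread until the coroutine finishes, so a surrounding event loop makes no progress in the meantime. That is usually acceptable in a notebook cell, but inside a request handler the async variant is the better fit.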

Streaming

# Run inside an async function (or a notebook cell that supports top-level await)
async for chunk in rag.astream("Explain retrieval-augmented generation"):
    print(chunk, end="", flush=True)
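
If the full answer is needed as a single string rather than printed incrementally, the chunks can be accumulated as they arrive. The sketch below assumes the stream yields plain text chunks, as the print call above suggests; fake_astream and collect are hypothetical names, with fake_astream standing in for rag.astream() so the example runs without a backend.

```python
import asyncio


async def fake_astream(question):
    # Hypothetical stand-in for rag.astream(); yields text chunks.
    for piece in ("Retrieval", "-augmented ", "generation"):
        await asyncio.sleep(0)
        yield piece


async def collect(stream):
    """Gather every chunk from an async iterator into one string."""
    parts = []
    async for chunk in stream:
        parts.append(chunk)
    return "".join(parts)


full_text = asyncio.run(collect(fake_astream("Explain RAG")))
```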