The Connectivity API runtime: run an AgentRunner, control live calls with Session, and read per-call metrics.
This is the Connectivity half of the SDK. An AgentRunner is the long-lived
process that receives calls; a Session is your control surface for one live
call. This page is the reference for both.
AgentRunner holds a WebSocket connection to the Unpod orchestrator. When a
call is dispatched to your agent, the runner invokes your entrypoint with a
CallContext.
```text
Unpod Orchestrator
      |  WebSocket (wss)
      v
AgentRunner  --- heartbeat, advertises capacity
      |
      |  on dispatch
      v
entrypoint(CallContext)  --- your code
```
```python
from unpod import AgentRunner, CallContext

async def handle_call(ctx: CallContext) -> None:
    await ctx.session.say("Hello, thanks for calling!")
    await ctx.session.run()  # blocks until the call ends

runner = AgentRunner(
    entrypoint=handle_call,
    agent_id="my-agent",  # must match agent_id in your Speech Pipe
)
runner.start()  # blocking
```
agent_id is the runner agent ID: a short string you choose that matches the
agent_id in your Speech Pipe config. It is not the pipe’s UUID. See
IDs You’ll Meet.
```python
await ctx.session.say("Thank you for your patience.")  # speak via TTS, returns immediately
await ctx.session.set_filler("One moment please...")   # played during processing silences
```
```python
@ctx.session.on("user_turn")
async def on_user_turn(text: str) -> None:
    if "stop" in text.lower():
        await ctx.session.interrupt()  # stop the current utterance
```
```python
await ctx.session.transfer_to_human(queue="tier-2-support")    # cold transfer to a human queue
await ctx.session.transfer_to_agent(agent_id="billing-agent")  # cold transfer to another agent
```
A cold transfer drops your session the moment it is initiated. For a warm
handoff, use the out-of-band client.sessions.transfer(..., mode="warm") -
see below.
session.run() keeps the call alive. It reads bridge events, fires your hooks,
routes each transcribed user turn to your dialog adapter’s stream(), and
pipes the reply tokens to TTS.
```python
async def handle_call(ctx: CallContext) -> None:
    ctx.session.dialog_machine = my_brain  # see Bring Your Agent
    await ctx.session.say("Hi, I'm Alex. How can I help?")
    await ctx.session.run()  # blocks until call ends
    # anything here runs as post-call cleanup
```
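To make the four steps of session.run() concrete, here is a toy model of such a loop. It is only an illustrative sketch and not the SDK's implementation: ToySession, EchoBrain, the event tuples, and the "hangup" event name are all hypothetical stand-ins, and the only thing taken from the text above is the order of operations (read event, fire hooks, route the user turn to stream(), pass reply tokens on to TTS).

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Dict, List, Tuple

class EchoBrain:
    """Hypothetical dialog adapter: stream() yields reply tokens for one user turn."""

    async def stream(self, text: str) -> AsyncIterator[str]:
        for token in ("You", " said: ", text):
            yield token

class ToySession:
    """Stand-in for a session (not the SDK's Session class)."""

    def __init__(self, events: List[Tuple[str, str]], brain: EchoBrain) -> None:
        self._events = list(events)
        self.dialog_machine = brain
        self._hooks: Dict[str, List[Callable[[str], Awaitable[None]]]] = {}
        self.spoken: List[str] = []  # what would have been sent to TTS

    def on(self, name: str):
        def register(fn):
            self._hooks.setdefault(name, []).append(fn)
            return fn
        return register

    async def run(self) -> None:
        for kind, payload in self._events:            # 1. read bridge events
            for hook in self._hooks.get(kind, []):    # 2. fire registered hooks
                await hook(payload)
            if kind == "user_turn":                   # 3. route the turn to the adapter
                reply = ""
                async for token in self.dialog_machine.stream(payload):
                    reply += token                    # 4. pass reply tokens on to "TTS"
                self.spoken.append(reply)
            elif kind == "hangup":                    # hypothetical end-of-call event
                break

async def demo() -> Tuple[List[str], List[str]]:
    session = ToySession([("user_turn", "hello"), ("hangup", "")], EchoBrain())
    heard: List[str] = []

    @session.on("user_turn")
    async def on_user_turn(text: str) -> None:
        heard.append(text)

    await session.run()
    return session.spoken, heard

spoken, heard = asyncio.run(demo())
```

In this toy model the hook runs before the adapter sees the turn; the real ordering between hooks and the dialog adapter is not specified on this page.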
```python
m = ctx.session.metrics.live()  # CallMetrics snapshot, during or after run()
m.turns          # int: dialog turns so far
m.duration_s     # float: call duration
m.stt_p95_ms     # int: P95 STT latency
m.llm_p95_ms     # int: P95 LLM latency
m.tts_p95_ms     # int: P95 TTS latency
m.cost.voice     # float; likewise m.cost.llm, m.cost.total
m.tokens.input   # int; likewise m.tokens.output
m.active_llm     # str: model used on the last turn
```
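One possible use of the latency fields is a post-call budget check. The sketch below assumes only the three P95 field names listed above; MetricsSnapshot is a local stand-in so the example runs without the SDK, and the budget values and the over_budget helper are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict

@dataclass
class MetricsSnapshot:
    """Stand-in carrying the three latency fields listed above (not the SDK's CallMetrics)."""
    stt_p95_ms: int
    llm_p95_ms: int
    tts_p95_ms: int

# Hypothetical per-stage budgets in milliseconds; tune to your own targets.
BUDGET_MS: Dict[str, int] = {"stt": 300, "llm": 800, "tts": 250}

def over_budget(m, budget: Dict[str, int] = BUDGET_MS) -> Dict[str, int]:
    """Return {stage: overshoot_ms} for every stage whose P95 exceeds its budget."""
    observed = {"stt": m.stt_p95_ms, "llm": m.llm_p95_ms, "tts": m.tts_p95_ms}
    return {
        stage: observed[stage] - limit
        for stage, limit in budget.items()
        if observed[stage] > limit
    }

snapshot = MetricsSnapshot(stt_p95_ms=280, llm_p95_ms=950, tts_p95_ms=250)
slow = over_budget(snapshot)  # only the LLM stage is over its budget here
```

Because the helper reads attributes by name, it should also accept the object returned by ctx.session.metrics.live(), provided those fields are plain integers as documented.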
Act on a live session from outside the call - your backend, an ops tool -
via the Management SDK, targeting it by session ID:
```python
from unpod import AsyncClient

async with AsyncClient(api_key="sk_...") as client:
    await client.sessions.end(session_id)

    await client.sessions.transfer(  # warm handoff supported here
        session_id,
        to_type="sip",
        to_config={"number": "+15551230000"},
        mode="warm",
        warm_handoff_ms=4000,
    )

    await client.sessions.merge(  # e.g. conference a supervisor in
        primary_session_id,
        secondary_session_ids=[other_session_id],
    )
```
```python
s = runner.stats()        # RunnerStats snapshot
s.in_flight               # current active calls
s.queued                  # dispatches waiting for capacity
s.capacity                # your max_sessions setting
s.completed_last_hour     # completed calls
s.failed_last_hour        # failed calls
s.mean_call_duration_s    # average call length
```
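These counters can be combined into simple health signals. The following is a minimal sketch, assuming the field names above and assuming that completed_last_hour and failed_last_hour are disjoint counts (the page does not say). StatsSnapshot, the helper functions, and the 0.8 threshold are hypothetical.

```python
from dataclasses import dataclass

@dataclass
class StatsSnapshot:
    """Stand-in with the RunnerStats fields used below (not the SDK class)."""
    in_flight: int
    queued: int
    capacity: int
    completed_last_hour: int
    failed_last_hour: int

def utilization(s) -> float:
    """Fraction of capacity currently occupied by live calls."""
    return s.in_flight / s.capacity if s.capacity else 0.0

def failure_rate(s) -> float:
    """Failed calls as a share of all calls finished in the last hour.

    Assumes completed and failed are disjoint counts.
    """
    finished = s.completed_last_hour + s.failed_last_hour
    return s.failed_last_hour / finished if finished else 0.0

def needs_more_runners(s, max_utilization: float = 0.8) -> bool:
    """Hypothetical scale-out signal: dispatches are queueing or the runner is nearly full."""
    return s.queued > 0 or utilization(s) >= max_utilization

stats = StatsSnapshot(in_flight=8, queued=0, capacity=10,
                      completed_last_hour=95, failed_last_hour=5)
```

A periodic task could evaluate needs_more_runners(runner.stats()) and feed the result to an autoscaler or alert.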
Send SIGTERM (standard for containers and systemd). The runner stops
accepting dispatches, waits up to drain_timeout_s for active calls, then
exits. Or call await runner.shutdown() yourself.
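The drain behaviour described above can be modelled with plain asyncio. This sketch is not the runner's actual code: drain and fake_call are hypothetical names, and cancelling calls that are still active when the timeout expires is an assumption, since the text only says the runner exits.

```python
import asyncio
from typing import Set

async def drain(active: Set[asyncio.Task], drain_timeout_s: float) -> int:
    """Wait up to drain_timeout_s for active calls; return how many were cut short."""
    if not active:
        return 0
    _done, pending = await asyncio.wait(active, timeout=drain_timeout_s)
    for task in pending:
        task.cancel()  # assumption: calls still running at the deadline are cancelled
    # Let the cancellations settle before the process exits.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(pending)

async def fake_call(seconds: float) -> None:
    """Placeholder for a live call that lasts the given number of seconds."""
    await asyncio.sleep(seconds)

async def demo() -> int:
    calls = {
        asyncio.create_task(fake_call(0.01)),  # finishes within the drain window
        asyncio.create_task(fake_call(5.0)),   # outlives the drain window
    }
    return await drain(calls, drain_timeout_s=0.2)

cut_short = asyncio.run(demo())
```

When choosing drain_timeout_s, compare it with your typical call length (for example mean_call_duration_s from the runner stats) so that most calls can finish naturally.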
Run multiple AgentRunner processes with the same agent_id across machines.
The orchestrator load-balances on reported capacity - no shared state needed.
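To illustrate why no shared state is needed, here is a toy dispatcher that picks a runner purely from reported free capacity. The orchestrator's real algorithm is not documented on this page; pick_runner, the tie-breaking rule, and the runner names are hypothetical.

```python
from typing import Dict, List, Optional

def pick_runner(free_slots: Dict[str, int]) -> Optional[str]:
    """Return the runner with the most free slots, or None if every runner is full.

    Ties are broken by runner name so the choice is deterministic.
    """
    candidates = {name: free for name, free in free_slots.items() if free > 0}
    if not candidates:
        return None
    return max(sorted(candidates), key=candidates.__getitem__)

# Free capacity as each runner process might report it over its own connection.
reported = {"runner-a": 2, "runner-b": 1, "runner-c": 0}

assignments: List[Optional[str]] = []
for _ in range(4):
    chosen = pick_runner(reported)
    assignments.append(chosen)
    if chosen is not None:
        reported[chosen] -= 1  # that runner now has one fewer free slot
```

Each runner only reports its own numbers, so the processes never need to coordinate with one another; a None result here corresponds to a dispatch that would wait in the queue.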