Custom LLM Integration
Use any LLM or framework with d:spatch, or build your own context class.
When your LLM provider or agent framework isn't covered by ClaudeAgentContext or
OpenAiAgentContext, you have two options: use the base Context class directly, or
subclass it to build a reusable integration.
Using base Context directly
The base Context gives you full access to the d:spatch platform — messaging, logging,
activities, inquiries, and inter-agent communication — without any provider-specific wiring.
You call your LLM yourself and relay the results.
Manual prompt augmentation
With base Context, system prompt augmentation is not automatic. Call
ctx._get_augmented_system_prompt() yourself and pass the result to your LLM.
If you skip this, your agent won't know about inquiry tools, authority boundaries,
or peer agents.
Minimal example
from dspatch import Context, DspatchEngine
import os
dspatch = DspatchEngine()
@dspatch.agent(Context)
async def my_agent(prompt: str, ctx: Context):
    """Relay each incoming prompt to a custom LLM client via the base Context.

    Written as an async generator: after a prompt has been handled the
    function yields and resumes with the next prompt. A ``None`` prompt
    ends the loop.
    """
    ctx.setup(
        system_prompt="You are a helpful assistant.",
        authority="You may write and modify code freely. Escalate any database migrations.",
    )

    async with ctx:
        # The base Context does not augment the system prompt automatically,
        # so build the platform-augmented prompt here and hand it to the LLM.
        augmented_prompt = ctx._get_augmented_system_prompt()
        llm = MyLLMClient(
            api_key=os.environ["MY_API_KEY"],
            system_prompt=augmented_prompt,
        )

        try:
            while True:
                reply = await llm.generate(prompt)

                # Relay the answer and report token usage to the platform.
                await ctx.message(reply.text)
                await ctx.usage(
                    model="my-model",
                    input_tokens=reply.input_tokens,
                    output_tokens=reply.output_tokens,
                    cost_usd=reply.cost,
                )

                # Suspend until the next prompt is sent in.
                prompt = yield
                if prompt is None:
                    break
        finally:
            # Close the client on every exit path, including errors.
            await llm.close()
dspatch.run()

Streaming responses
Use is_delta=True to stream tokens incrementally. Pass the returned id back to
append to the same message:
msg_id = None
async for chunk in client.stream(prompt):
msg_id = await ctx.message(chunk, is_delta=True, id=msg_id)

Wiring platform tools
Access d:spatch tool definitions via ctx._dspatch_tool_specs() and convert them to
your framework's tool format. Each ToolSpec provides:
for spec in ctx._dspatch_tool_specs():
spec.name # "send_inquiry", "talk_to_coder", etc.
spec.description # Human-readable description for the LLM
spec.schema # JSON Schema dict for the tool's parameters
spec.handler # async (args: dict) -> dict — call to execute

from langchain_core.tools import StructuredTool
tools = []
for spec in ctx._dspatch_tool_specs():
handler = spec.handler
tools.append(StructuredTool.from_function(
coroutine=lambda args, _h=handler: _h(args),
name=spec.name,
description=spec.description,
args_schema=spec.schema,
))

tool_handlers = {
spec.name: spec.handler
for spec in ctx._dspatch_tool_specs()
}
tool_definitions = [
{"type": "function", "function": {
"name": s.name,
"description": s.description,
"parameters": s.schema,
}}
for s in ctx._dspatch_tool_specs()
]
# In your tool-call dispatch loop:
for call in response.tool_calls:
result = await tool_handlers[call.name](call.arguments)

Building a custom context class
For a reusable integration, subclass Context to create your own context type. This is
how ClaudeAgentContext and OpenAiAgentContext are built — the same pattern is available
to you.
What to override
| Method | Required | Purpose |
|---|---|---|
| `__init__` | Optional | Initialize provider-specific fields. Always call `super().__init__(**kwargs)`. |
| `__aenter__` | Yes | Create the provider client, get the augmented system prompt, wrap tools. Always call `await super().__aenter__()` first. |
| `__aexit__` | Yes | Clean up the provider client. Call `await super().__aexit__(...)`. |
| `run(prompt)` | Yes | Execute one agent turn. This is abstract in the base class. |
| `_tool_name_prefix` | Optional | Property returning a prefix for tool names in the system prompt (e.g., `"mcp__dspatch__"` for Claude). Default: `""`. |
Methods you cannot override (they are final in the base class):
- `setup()` — stores system prompt, authority, and options
- `_get_augmented_system_prompt()` — builds the full platform-augmented prompt
- `_dspatch_tool_specs()` — returns canonical tool definitions
- `_handle_tool_call()` — dispatches tool calls by name
- `message()`, `log()`, `activity()`, `usage()`, `files()` — event methods
- `inquire()`, `talk_to()` — blocking communication methods
Full example
from typing import Any
from dspatch.contexts.context import Context
class GeminiAgentContext(Context):
"""Custom context for Google Gemini."""
def __init__(self, **kwargs: Any) -> None:
    """Pass every keyword argument to the base class and reserve the client slot."""
    super().__init__(**kwargs)
    # Filled in by __aenter__; None means no Gemini model has been created yet.
    self._gemini_client: Any = None
async def __aenter__(self):
    """Enter the base context, then build the Gemini model for this session.

    Returns:
        This context, with ``client`` set to the configured Gemini model.
    """
    # The base class goes first, as the lifecycle contract requires.
    await super().__aenter__()

    import google.generativeai as genai

    # System prompt with the platform instructions added.
    system_instruction = self._get_augmented_system_prompt()

    # d:spatch tools wrapped as Gemini function declarations.
    gemini_tools = self._get_tools()

    # Take the model name from the user options when one is set there;
    # a missing or empty value falls back to gemini-2.0-flash.
    default_model = "gemini-2.0-flash"
    options = self._user_options
    if options is None:
        model_name = default_model
    else:
        model_name = getattr(options, "model", default_model) or default_model

    gemini = genai.GenerativeModel(
        model_name=model_name,
        system_instruction=system_instruction,
        tools=gemini_tools,
    )
    self._gemini_client = gemini
    self.client = gemini
    return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Drop the Gemini client and run the base-class teardown.

    Args:
        exc_type: Exception class raised inside the ``async with`` body, or None.
        exc_val: The exception instance, or None.
        exc_tb: The traceback, or None.

    Returns:
        ``False``, so an exception raised inside the ``async with`` body
        is never suppressed.
    """
    # Release the provider client first.
    self._gemini_client = None
    self.client = None
    # The lifecycle contract requires the base-class __aexit__ to run too.
    await super().__aexit__(exc_type, exc_val, exc_tb)
    return False
async def run(self, prompt: str) -> str:
    """Execute one agent turn against Gemini and stream the reply.

    Args:
        prompt: The user prompt for this turn.

    Returns:
        The concatenated text of every streamed text part.

    Raises:
        RuntimeError: If there is no active client, i.e. the method is
            called outside ``async with ctx:``.
    """
    if self.client is None:
        raise RuntimeError("No active client. Use 'async with ctx:' first.")

    self.log(f"Processing prompt: {prompt}")

    response = await self.client.generate_content_async(prompt, stream=True)

    text_parts = []
    # Id of the message being streamed. It is passed back on every delta
    # so that all chunks append to the same message instead of each one
    # starting a new message.
    msg_id = None

    async for chunk in response:
        # Read text per part instead of through the chunk-level `.text`
        # quick accessor, which can raise for chunks that carry no text
        # part (for example a function-call-only chunk).
        for part in chunk.parts:
            if part.text:
                text_parts.append(part.text)
                msg_id = await self.message(part.text, is_delta=True, id=msg_id)

            # Handle tool calls
            if fn := part.function_call:
                # NOTE: the handler's return value is not sent back to the
                # model in this example.
                await self._handle_tool_call(fn.name, dict(fn.args))
                await self.activity("tool_call", data={
                    "tool": fn.name,
                    "args": dict(fn.args),
                })

    # Record usage
    if hasattr(response, "usage_metadata"):
        await self.usage(
            model=self.client.model_name,
            input_tokens=response.usage_metadata.prompt_token_count,
            output_tokens=response.usage_metadata.candidates_token_count,
        )

    return "".join(text_parts)
def _get_tools(self) -> list:
"""Wrap ToolSpecs into Gemini function declarations."""
import google.generativeai as genai
declarations = []
for spec in self._dspatch_tool_specs():
declarations.append(genai.protos.FunctionDeclaration(
name=spec.name,
description=spec.description,
parameters=spec.schema,
))
return [genai.protos.Tool(function_declarations=declarations)]

Using your custom context
from dspatch import DspatchEngine
from my_context import GeminiAgentContext
dspatch = DspatchEngine()
@dspatch.agent(GeminiAgentContext)
async def my_agent(prompt: str, ctx: GeminiAgentContext):
    """Run one Gemini turn per incoming prompt until a ``None`` prompt arrives.

    Written as an async generator: the function yields after each turn and
    resumes with the next prompt.
    """
    ctx.setup(
        system_prompt="You are a helpful coding assistant.",
        authority="You may refactor code. Escalate architecture changes.",
    )

    async with ctx:
        while True:
            try:
                await ctx.run(prompt)
            except Exception as e:
                # Log the failed turn and keep serving later prompts.
                ctx.log(f"Error: {e}", level="error")

            # Suspend until the next prompt is sent in.
            prompt = yield
            if prompt is None:
                break
dspatch.run()

Lifecycle contract
Always call await super().__aenter__() first in your __aenter__ — it validates
that setup() was called. Always call await super().__aexit__(...) in your __aexit__.
The run() method is abstract and must be implemented.