
Custom LLM Integration

Use any LLM or framework with d:spatch, or build your own context class.

When your LLM provider or agent framework isn't covered by ClaudeAgentContext or OpenAiAgentContext, you have two options: use the base Context class directly, or subclass it to build a reusable integration.

Using base Context directly

The base Context gives you full access to the d:spatch platform — messaging, logging, activities, inquiries, and inter-agent communication — without any provider-specific wiring. You call your LLM yourself and relay the results.

Manual prompt augmentation

With base Context, system prompt augmentation is not automatic. Call ctx._get_augmented_system_prompt() yourself and pass the result to your LLM. If you skip this, your agent won't know about inquiry tools, authority boundaries, or peer agents.

Minimal example

agent.py
from dspatch import Context, DspatchEngine
import os

dspatch = DspatchEngine()

@dspatch.agent(Context)
async def my_agent(prompt: str, ctx: Context):
    ctx.setup(
        system_prompt="You are a helpful assistant.",
        authority="You may write and modify code freely. Escalate any database migrations.",
    )

    async with ctx:
        # Build the full system prompt with platform instructions injected
        full_system_prompt = ctx._get_augmented_system_prompt()
        client = MyLLMClient(  # placeholder for your provider's SDK client
            api_key=os.environ["MY_API_KEY"],
            system_prompt=full_system_prompt,
        )

        try:
            while True:
                response = await client.generate(prompt)
                await ctx.message(response.text)
                await ctx.usage(
                    model="my-model",
                    input_tokens=response.input_tokens,
                    output_tokens=response.output_tokens,
                    cost_usd=response.cost,
                )
                prompt = yield
                if prompt is None:
                    break
        finally:
            await client.close()

dspatch.run()

Streaming responses

Use is_delta=True to stream tokens incrementally. Pass the returned id back to append to the same message:

msg_id = None
async for chunk in client.stream(prompt):
    msg_id = await ctx.message(chunk, is_delta=True, id=msg_id)

Wiring platform tools

Access d:spatch tool definitions via ctx._dspatch_tool_specs() and convert them to your framework's tool format. Each ToolSpec provides:

for spec in ctx._dspatch_tool_specs():
    spec.name         # "send_inquiry", "talk_to_coder", etc.
    spec.description  # Human-readable description for the LLM
    spec.schema       # JSON Schema dict for the tool's parameters
    spec.handler      # async (args: dict) -> dict — call to execute

For example, with LangChain you can wrap each spec as a StructuredTool:

from langchain_core.tools import StructuredTool

tools = []
for spec in ctx._dspatch_tool_specs():
    handler = spec.handler
    tools.append(StructuredTool.from_function(
        coroutine=lambda args, _h=handler: _h(args),
        name=spec.name,
        description=spec.description,
        args_schema=spec.schema,
    ))

Or, without a framework, keep a name-to-handler map and plain function-calling definitions:

tool_handlers = {
    spec.name: spec.handler
    for spec in ctx._dspatch_tool_specs()
}

tool_definitions = [
    {"type": "function", "function": {
        "name": s.name,
        "description": s.description,
        "parameters": s.schema,
    }}
    for s in ctx._dspatch_tool_specs()
]

# In your tool-call dispatch loop:
for call in response.tool_calls:
    result = await tool_handlers[call.name](call.arguments)
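The dispatch loop above can be sketched end to end with stand-in objects. ToolSpec and ToolCall here mimic the shapes described on this page and a provider's tool-call payload; the field names and the tool-result message format are illustrative, not a specific provider's API.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable

@dataclass
class ToolSpec:
    name: str
    handler: Callable[[dict], Awaitable[dict]]

@dataclass
class ToolCall:
    id: str
    name: str
    arguments: dict

async def dispatch(specs: list[ToolSpec], calls: list[ToolCall]) -> list[dict]:
    # Build the name -> handler map once, then execute each requested call
    handlers = {s.name: s.handler for s in specs}
    results = []
    for call in calls:
        output = await handlers[call.name](call.arguments)
        # Each result is sent back to the model as a tool-result message
        results.append({"tool_call_id": call.id, "content": output})
    return results

async def echo(args: dict) -> dict:
    return {"echoed": args}

results = asyncio.run(dispatch(
    [ToolSpec("send_inquiry", echo)],
    [ToolCall("c1", "send_inquiry", {"question": "status?"})],
))
```

In a real agent loop you would append these result messages to the conversation and call the model again, repeating until the response contains no further tool calls.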

Building a custom context class

For a reusable integration, subclass Context to create your own context type. This is how ClaudeAgentContext and OpenAiAgentContext are built — the same pattern is available to you.

What to override

  • __init__ (optional) — Initialize provider-specific fields. Always call super().__init__(**kwargs).
  • __aenter__ (required) — Create the provider client, get the augmented system prompt, wrap tools. Always call await super().__aenter__() first.
  • __aexit__ (required) — Clean up the provider client. Call await super().__aexit__(...).
  • run(prompt) (required) — Execute one agent turn. This is abstract in the base class.
  • _tool_name_prefix (optional) — Property returning a prefix for tool names in the system prompt (e.g., "mcp__dspatch__" for Claude). Default: "".

Methods you cannot override (they are final in the base class):

  • setup() — stores system prompt, authority, and options
  • _get_augmented_system_prompt() — builds the full platform-augmented prompt
  • _dspatch_tool_specs() — returns canonical tool definitions
  • _handle_tool_call() — dispatches tool calls by name
  • message(), log(), activity(), usage(), files() — event methods
  • inquire(), talk_to() — blocking communication methods

Full example

my_context.py
from typing import Any
from dspatch.contexts.context import Context

class GeminiAgentContext(Context):
    """Custom context for Google Gemini."""

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self._gemini_client: Any = None

    async def __aenter__(self):
        await super().__aenter__()

        import google.generativeai as genai

        # Get augmented system prompt with platform instructions
        augmented = self._get_augmented_system_prompt()

        # Wrap d:spatch tools into Gemini function declarations
        tools = self._get_tools()

        # Read model from options (default: gemini-2.0-flash)
        model = "gemini-2.0-flash"
        if self._user_options is not None:
            model = getattr(self._user_options, "model", model) or model

        self._gemini_client = genai.GenerativeModel(
            model_name=model,
            system_instruction=augmented,
            tools=tools,
        )
        self.client = self._gemini_client
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self._gemini_client = None
        self.client = None
        return await super().__aexit__(exc_type, exc_val, exc_tb)

    async def run(self, prompt: str) -> str:
        if self.client is None:
            raise RuntimeError("No active client. Use 'async with ctx:' first.")

        self.log(f"Processing prompt: {prompt}")

        response = await self.client.generate_content_async(prompt, stream=True)

        result_text = ""
        async for chunk in response:
            if chunk.text:
                result_text += chunk.text
                await self.message(chunk.text, is_delta=True)

            # Handle tool calls (a full implementation would also feed the
            # result back to the model as a function response)
            for part in chunk.parts:
                if fn := part.function_call:
                    result = await self._handle_tool_call(fn.name, dict(fn.args))
                    await self.activity("tool_call", data={
                        "tool": fn.name,
                        "args": dict(fn.args),
                        "result": result,
                    })

        # Record usage
        if hasattr(response, "usage_metadata"):
            await self.usage(
                model=self.client.model_name,
                input_tokens=response.usage_metadata.prompt_token_count,
                output_tokens=response.usage_metadata.candidates_token_count,
            )

        return result_text

    def _get_tools(self) -> list:
        """Wrap ToolSpecs into Gemini function declarations."""
        import google.generativeai as genai

        declarations = []
        for spec in self._dspatch_tool_specs():
            declarations.append(genai.protos.FunctionDeclaration(
                name=spec.name,
                description=spec.description,
                parameters=spec.schema,
            ))

        return [genai.protos.Tool(function_declarations=declarations)]

Using your custom context

agent.py
from dspatch import DspatchEngine
from my_context import GeminiAgentContext

dspatch = DspatchEngine()

@dspatch.agent(GeminiAgentContext)
async def my_agent(prompt: str, ctx: GeminiAgentContext):
    ctx.setup(
        system_prompt="You are a helpful coding assistant.",
        authority="You may refactor code. Escalate architecture changes.",
    )
    async with ctx:
        while True:
            try:
                await ctx.run(prompt)
            except Exception as e:
                ctx.log(f"Error: {e}", level="error")
            prompt = yield
            if prompt is None:
                break

dspatch.run()

Lifecycle contract

Always call await super().__aenter__() first in your __aenter__ — it validates that setup() was called. Always call await super().__aexit__(...) in your __aexit__. The run() method is abstract and must be implemented.
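The required call order can be illustrated with stub classes. Context below is a stand-in for dspatch's base class, not the real implementation; it only records events so the ordering is visible.

```python
import asyncio

class Context:
    """Illustrative stand-in for the dspatch base Context."""
    def __init__(self):
        self._setup_done = False
        self.events: list[str] = []

    def setup(self, **kwargs):
        self._setup_done = True

    async def __aenter__(self):
        # The real base class validates that setup() was called
        if not self._setup_done:
            raise RuntimeError("setup() must be called before entering the context")
        self.events.append("base_enter")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.events.append("base_exit")
        return False

class MyContext(Context):
    async def __aenter__(self):
        await super().__aenter__()        # always first
        self.events.append("client_created")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.events.append("client_closed")
        return await super().__aexit__(exc_type, exc_val, exc_tb)

async def main() -> list[str]:
    ctx = MyContext()
    ctx.setup(system_prompt="...")
    async with ctx:
        pass
    return ctx.events

events = asyncio.run(main())
print(events)  # ['base_enter', 'client_created', 'client_closed', 'base_exit']
```

Entering without calling setup() first raises immediately, which is why your own __aenter__ must delegate to the base class before doing any provider work.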
