# Build an A2A Agent
A recipe for building a new external A2A agent that plugs into Workstacean’s routing, scheduling, and fleet-health pipelines. It complements Add an agent, which covers the operator side (YAML + executor registration); this guide covers what to build on the agent side so everything else just works.
Reference implementations: protoPen/a2a_handler.py and Quinn/a2a_handler.py — ~85% shared code, either one is a reasonable starting template.
## What you get for free

If you implement the A2A spec properly and serve an agent card with the right capability flags, Workstacean wires up the following automatically — no per-agent code on the Workstacean side:
| Feature | How it works |
|---|---|
| Skill routing | SkillBrokerPlugin discovers your skills from the card every 10 min |
| Long-running task handling | Return Task.status.state = submitted → TaskTracker polls every 30s until terminal |
| Push notifications | Advertise capabilities.pushNotifications: true → Workstacean registers a webhook per task, no more polling |
| Fleet-health observability | TaskTracker.onTerminal publishes autonomous.outcome.{systemActor}.{skill} on every task completion → agent_fleet_health aggregates per-agent success rate, p50/p95 latency, cost, failure reasons in a 24h rolling window |
| Human-in-the-loop | Return input-required → Workstacean raises a HITL prompt in Discord, resumes your task with the human’s answer via message/send on the same taskId |
| Scheduled invocations | Define a ceremony YAML → Workstacean fires the skill at the scheduled time |
Your agent never writes a health probe, a retry loop, or a cron scheduler. Workstacean does those for every agent that joins the fleet.
## The endpoint surface

The @a2a-js/sdk Client uses JSON-RPC on `/a2a` for every operation it calls — including task management and push-notification CRUD. REST paths like `GET /tasks/{id}` are convenience aliases only; the SDK never hits them. Omitting any JSON-RPC method means the matching SDK call silently returns `-32601 Method not found`, which the SDK wraps as a misleading “Invalid response Content-Type” error. See Quinn PR #25.
### JSON-RPC methods (required — all of them)

| Method | Purpose |
|---|---|
| `message/send` | Async submit; returns a Task with `state: submitted` |
| `message/stream` | Send + SSE stream (spec-canonical name) |
| `message/sendStream` | Legacy alias — accept alongside `message/stream` for backwards compat with older protoPen-era clients |
| `tasks/get` | Fetch current Task state + artifact |
| `tasks/cancel` | Cooperative cancel |
| `tasks/resubscribe` | SSE reconnect to an in-flight task |
| `tasks/pushNotificationConfig/set` | Register a webhook |
| `tasks/pushNotificationConfig/get` | Read the current webhook |
| `tasks/pushNotificationConfig/list` | List configs |
| `tasks/pushNotificationConfig/delete` | Clear the webhook |
If you skip the last four, every SDK call from client.setTaskPushNotificationConfig through client.deleteTaskPushNotificationConfig returns -32601. Your webhooks can still work via the REST alias below — but only if the caller knows to use it, which the SDK does not.
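A minimal sketch of what the `/a2a` dispatch can look like, framework-agnostic — the registry, handler names, and placeholder results here are assumptions for illustration, not Quinn’s actual code. The point is that every method in the table above must appear in the registry, or the SDK gets `-32601`:

```python
from typing import Any, Awaitable, Callable

Handler = Callable[[dict], Awaitable[Any]]
_METHODS: dict[str, Handler] = {}


def rpc_method(name: str):
    """Register a coroutine under a JSON-RPC method name."""
    def register(fn: Handler) -> Handler:
        _METHODS[name] = fn
        return fn
    return register


async def dispatch(body: dict) -> dict:
    """Route one JSON-RPC request; unknown methods get -32601, bad params -32602."""
    handler = _METHODS.get(body.get("method", ""))
    if handler is None:
        return {"jsonrpc": "2.0", "id": body.get("id"),
                "error": {"code": -32601, "message": "Method not found"}}
    try:
        result = await handler(body.get("params") or {})
    except ValueError as exc:  # missing field, bad type, SSRF rejection
        return {"jsonrpc": "2.0", "id": body.get("id"),
                "error": {"code": -32602, "message": str(exc)}}
    return {"jsonrpc": "2.0", "id": body.get("id"), "result": result}


@rpc_method("message/send")
async def _message_send(params: dict) -> dict:
    # A real handler creates the task record and spawns the background runner.
    return {"id": "task-1", "status": {"state": "submitted"}}


@rpc_method("message/stream")
async def _message_stream(params: dict) -> dict:
    return {"id": "task-1", "status": {"state": "submitted"}}


# Legacy alias: accept message/sendStream alongside the canonical name
_METHODS["message/sendStream"] = _METHODS["message/stream"]
```

A single table keeps the required surface auditable: if a method from the spec table is missing from `_METHODS`, the failure mode is an explicit `-32601` rather than a 404 or content-type error.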
### REST aliases (optional; no SDK client hits these)

| Method | Path | Purpose |
|---|---|---|
| POST | `/message:send` | Same as `message/send` (HTTP 202) |
| POST | `/message:stream` | Same as `message/stream` (SSE) |
| GET | `/tasks/{id}` | Same as `tasks/get` |
| GET | `/tasks/{id}:subscribe` | Same as `tasks/resubscribe` (plain SSE, no JSON-RPC envelope) |
| POST | `/tasks/{id}:cancel` | Same as `tasks/cancel` |
| POST | `/tasks/{id}/pushNotificationConfigs` | Same as `tasks/pushNotificationConfig/set` |
Serve them if you want manual curl smoke tests to be simpler. They are not required for SDK interoperability.
### Agent card

| Method | Path | Purpose |
|---|---|---|
| GET | `/.well-known/agent-card.json` | Agent card (spec-canonical) |
| GET | `/.well-known/agent.json` | Legacy path older clients still probe — serve both |
### Error code conventions

| Code | Meaning |
|---|---|
| -32601 | Method not found |
| -32602 | Invalid params (missing field, bad type, SSRF rejection) |
| -32001 | Task not found |
| -32002 | Task already terminal (cancel-after-complete) |
## Task lifecycle

Every task moves through the A2A state machine:

```text
submitted → working → completed
                    ↘ failed
                    ↘ canceled
                    ↘ input-required   (HITL — resumed via message/send on the same taskId)
```

Critical: `message/send` must return `submitted` within ~1s. The full response comes later via polling, SSE, or webhook. If your handler blocks for the duration of the underlying skill, you lose:
- Timeout safety (reverse proxies cap HTTP connections, usually 60-300s)
- Cancellation (no way to interrupt a stuck task)
- Observability (no granular state updates; just “hanging”)
- Push notifications (the whole point is “fire-and-forget on the caller side”)
Spawn the skill execution as a background task owned by the task record, return the task receipt immediately, and let TaskTracker or SSE drive the result.
```python
async def _submit_task(text, context_id, push_config):
    task_id = str(uuid4())
    record = TaskRecord(id=task_id, state="submitted", ...)
    await store.create(record)
    record._bg_task = asyncio.create_task(_run_skill(task_id, text))
    return record  # returned immediately — state=submitted
```

## The agent card

```python
AGENT_CARD = {
    "name": "my-agent",
    "description": "One-line summary of what the agent does",
    "url": f"http://{host}/a2a",  # JSON-RPC endpoint, NOT the server root
    "version": "1.0.0",
    "provider": {"organization": "protoLabsAI"},
    "capabilities": {
        "streaming": True,                # enables message/stream + :subscribe
        "pushNotifications": True,        # enables /tasks/{id}/pushNotificationConfigs
        "stateTransitionHistory": False,  # tracking every transition in the response
    },
    "defaultInputModes": ["text/plain"],
    "defaultOutputModes": ["text/markdown"],
    "skills": [
        {
            "id": "my_skill",
            "name": "My Skill",
            "description": "What this skill does, what input it expects",
            "tags": ["category"],
            "examples": ["/my-command", "do the thing"],
        },
    ],
    "securitySchemes": {
        "apiKey": {"type": "apiKey", "in": "header", "name": "X-API-Key"},
    },
    "security": [{"apiKey": []}],
}
```

Three field-level gotchas, all from incidents we’ve hit:
1. `url` must point at the JSON-RPC endpoint, not the server root. `@a2a-js/sdk` uses this field to send `message/send` — if it’s `http://host/`, FastAPI returns 405 and the dispatch dies silently. (See Quinn PR #6.)
2. Serve the card at both `/.well-known/agent-card.json` and `/.well-known/agent.json`. Spec-canonical is `agent-card.json`; older clients use `agent.json`. Workstacean’s `A2AExecutor` falls back from one to the other, but serving both saves a 404 round-trip.
3. Flip `capabilities.streaming` / `pushNotifications` to `true` only when you actually support them. `SkillBrokerPlugin` refreshes the card every 10 min, and `A2AExecutor` switches transport (`sendMessageStream` vs `sendMessage`) based on those flags. False-positive flags break the dispatcher.
## Task store & hardening

Lessons learned the hard way — every agent’s task store should handle the cases below. Steal from protoPen/a2a_handler.py or Quinn/a2a_handler.py directly.
### 1. Terminal-task TTL eviction

A long-running process leaks memory proportional to its total lifetime traffic if you never drop completed tasks. Run a background sweep that evicts terminal tasks older than N (we use 1h by default — long enough for pollers and webhook delivery to drain). Never evict `submitted` or `working` tasks.
### 2. Strong references on webhook delivery tasks

`asyncio.create_task(_deliver_webhook(...))` without retaining the handle is a trap — the Python 3.11+ docs warn that the event loop keeps only weak references to tasks. A pending retry can be garbage-collected mid-backoff. Keep a module-level `set[asyncio.Task]`, add each task on create, and attach a `done_callback` that evicts on completion.
### 3. Atomic cancel

The naive sequence `state = await store.get(id); if state not terminal: await store.update(CANCELED)` races with the background runner. A worker can transition to `completed` between the read and the write, and you clobber a legitimate terminal state with `canceled`. Do the check and the write under the same lock.
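A sketch of check-and-write under one lock — the `TaskStore` class and its method names are assumptions for illustration; the invariant is that cancel and the runner’s terminal write share the same `asyncio.Lock`:

```python
import asyncio

TERMINAL = {"completed", "failed", "canceled"}


class TaskStore:
    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._tasks: dict[str, dict] = {}

    async def create(self, task_id: str) -> None:
        async with self._lock:
            self._tasks[task_id] = {"state": "working"}

    async def finish(self, task_id: str, state: str) -> bool:
        # Runner's terminal write takes the same lock as cancel.
        async with self._lock:
            rec = self._tasks[task_id]
            if rec["state"] in TERMINAL:
                return False
            rec["state"] = state
            return True

    async def cancel(self, task_id: str) -> bool:
        # Check and write are one critical section — no read-then-write race.
        async with self._lock:
            rec = self._tasks.get(task_id)
            if rec is None or rec["state"] in TERMINAL:
                return False  # map to -32002 (already terminal) at the RPC layer
            rec["state"] = "canceled"
            return True
```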
### 4. SSRF guard on webhook URLs

Any client supplying `http://169.254.169.254/...`, `http://localhost/...`, or `http://10.0.0.1/...` as a push-config target turns your agent into an internal network scanner. Before accepting a `PushNotificationConfig`:
- Reject non-http(s) schemes (`file://`, `javascript:`, `gopher:`, …)
- Resolve the hostname once and check every returned address
- Reject loopback, link-local, RFC 1918, multicast, reserved, and unspecified IPs
- Reject unresolvable hostnames outright
One-time resolution is not a defence against DNS rebinding — pin the resolved IP on the httpx transport if your threat model requires it. Private-network deployments (Tailscale-only, Docker-only) can skip this; public-internet deployments cannot.
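The checks above can be sketched with the standard library alone — `validate_webhook_url` is a hypothetical helper name, and as noted, one-time resolution does not defend against DNS rebinding:

```python
import ipaddress
import socket
from urllib.parse import urlsplit


def validate_webhook_url(url: str) -> None:
    """Raise ValueError unless url is http(s) and resolves only to public addresses."""
    parts = urlsplit(url)
    if parts.scheme not in ("http", "https"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    host = parts.hostname
    if not host:
        raise ValueError("missing hostname")
    try:
        infos = socket.getaddrinfo(host, None)
    except socket.gaierror:
        raise ValueError(f"unresolvable hostname: {host!r}")
    for info in infos:  # every returned address must be safe, not just the first
        addr = ipaddress.ip_address(info[4][0])
        if (addr.is_loopback or addr.is_link_local or addr.is_private
                or addr.is_multicast or addr.is_reserved or addr.is_unspecified):
            raise ValueError(f"disallowed address for {host!r}: {addr}")
```

Raising `ValueError` lets the JSON-RPC layer map the rejection straight to `-32602`, per the error-code table above.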
### 5. Push config must be read from the live record, not closed over

`POST /tasks/{id}/pushNotificationConfigs` is explicitly a post-submit channel — callers who didn’t have a webhook at submit time can register one later and still get terminal notifications. If your push helper closes over the submit-time config instead of reading `record.push_config` on every call, the post-submit registration silently does nothing.
```python
# Wrong — closes over submit-time config
def _make_push_fn(push_config):
    async def _push(record):
        if push_config and record.state in TERMINAL | {WORKING}:
            asyncio.create_task(_deliver_webhook(record, push_config))
    return _push
```

```python
# Right — reads record.push_config at call time
async def _push(record):
    cfg = record.push_config
    if cfg and record.state in TERMINAL | {WORKING}:
        task = asyncio.create_task(_deliver_webhook(record, cfg))
        _pending_webhook_tasks.add(task)
        task.add_done_callback(_pending_webhook_tasks.discard)
```

### 6. Producer must be independent of the SSE connection

If your agent implements `message/stream`, the HTTP SSE generator and the LangGraph (or similar) producer should not be the same task. When the SSE client disconnects mid-run, FastAPI cancels the generator — if the producer is inline, it dies with the connection, and any `:subscribe` reconnect has no task to attach to.
Pattern: always spawn the producer as a background task owned by the task record, regardless of which entry point created the task. The SSE generator becomes a pure consumer that reads from the store and awaits the rotating _update_event. Drop the SSE connection → producer keeps running → reconnect via :subscribe picks up cleanly.
```python
# Shared consumer used by both message/stream and :subscribe
async def _watch_task(task_id, start_text_len=0):
    record = await _store.get(task_id)
    if record is None:
        return
    last_sent_len = start_text_len
    yield ("status", record, None)
    if record.accumulated_text and len(record.accumulated_text) > last_sent_len:
        yield ("text_delta", record, record.accumulated_text[last_sent_len:])
        last_sent_len = len(record.accumulated_text)
    if record.state in _TERMINAL:
        return
    while True:
        r = await _store.get(task_id)
        if r is None:
            return
        try:
            await asyncio.wait_for(r._update_event.wait(), timeout=25)
        except asyncio.TimeoutError:
            yield ("keepalive", None, None)
            continue
        r = await _store.get(task_id)
        if r is None:
            return
        yield ("status", r, None)
        if r.accumulated_text and len(r.accumulated_text) > last_sent_len:
            yield ("text_delta", r, r.accumulated_text[last_sent_len:])
            last_sent_len = len(r.accumulated_text)
        if r.state in _TERMINAL:
            return
```

Both SSE routes catch `asyncio.CancelledError` (FastAPI raises this on client disconnect), log the drop, and re-raise — the background task is never cancelled.
### 7. Emit text deltas, not the full accumulated text

On `:subscribe` reconnect, the initial snapshot can emit the full `accumulated_text` once as an `append: false` frame. Every subsequent update must emit only the new suffix as `append: true`. Clients that receive the full text on every update see duplicated content on the wire (the subscribe snapshot already gave them the pre-disconnect portion). Track `last_sent_len` in your consumer and emit `text[last_sent_len:]`.
### 8. Persist tool-progress messages on the record

If the producer emits in-process events like `tool_start` / `tool_end` that your SSE path currently surfaces as status messages, those are invisible to a `:subscribe` reconnect — the subscriber reads the store, not the producer’s event queue. Store the most recent tool message on the task record (e.g. `record.last_status_message: str | None`) via your `update_state(status_message=...)` call. Surface it under `status.message` in status events. Clear it on terminal transitions so subscribers on a completed task see the final state cleanly.
## Streaming (optional)

See A2A Streaming (SSE) for the full SSE event protocol. Short version: advertise `capabilities.streaming: true`, accept `method: "message/stream"` on `/a2a` and `POST /message:stream`, and emit `text/event-stream` frames as work progresses. `A2AExecutor` on Workstacean’s side switches transport automatically based on the card; your `message/send` consumers stay unchanged.
On COMPLETED, emit two events per the A2A spec: first a `kind: "artifact-update"` carrying the authoritative full artifact — text plus any worldstate-delta DataParts — as `append: false, lastChunk: true`; then a `kind: "status-update"` with `final: true` to close the stream. Mid-run stays incremental via `kind: "artifact-update"` + `append: true` text deltas only.
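The terminal pair can be built as plain dicts — field names follow the wire shapes quoted in this guide; the `terminal_frames` helper itself is a hypothetical sketch:

```python
def terminal_frames(task_id: str, context_id: str, text: str, ts: str) -> list[dict]:
    """Build the two terminal SSE events: artifact-update, then status-update."""
    artifact = {
        "kind": "artifact-update",   # discriminator — never omit it
        "taskId": task_id,
        "contextId": context_id,
        "artifact": {
            "artifactId": task_id,
            "parts": [{"kind": "text", "text": text}],
        },
        "append": False,             # authoritative full artifact, not a delta
        "lastChunk": True,
    }
    status = {
        "kind": "status-update",
        "taskId": task_id,
        "contextId": context_id,
        "status": {"state": "completed", "timestamp": ts},
        "final": True,               # closes the stream
    }
    return [artifact, status]
```

Emitting them in that order means a consumer always has the full artifact in hand before it sees `final: true`.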
Critical — SSE events must carry a `kind` discriminator. `@a2a-js/sdk`’s `for await (const event of client.sendMessageStream(...))` routes events by `kind`. Events missing it are silently dropped, which means Workstacean’s TaskTracker never attaches, push notifications never register, and HITL chains silently fail. This was Quinn #40. Every frame — the initial `task`, every `status-update`, every `artifact-update` — needs `kind`.
## Push notifications

Workstacean registers webhooks of the form `${WORKSTACEAN_BASE_URL}/api/a2a/callback/{taskId}` with a per-task HMAC token. When you POST a task snapshot to the URL, Workstacean verifies the token and fires the same bus event a poll would have.
The webhook payload is a TaskStatusUpdateEvent — same shape as on the wire, including the kind discriminator:
```json
{
  "kind": "status-update",
  "taskId": "3f8a1c2d-...",
  "contextId": "engagement-001",
  "status": {"state": "completed", "timestamp": "2026-04-15T20:00:00Z"},
  "final": true,
  "artifact": {
    "artifactId": "3f8a1c2d-...",
    "parts": [{"kind": "text", "text": "## Results\n..."}],
    "append": false,
    "lastChunk": true
  }
}
```

Fire webhooks on every transition the caller cares about — `working`, `completed`, `failed`, `canceled`. Missing the cancel transition is a common bug (we fixed it in Quinn PR #19).
### Token parsing — accept BOTH shapes

The A2A spec’s `PushNotificationConfig` permits two interchangeable shapes for the bearer token, and SDKs are inconsistent about which one they send:
```jsonc
// Shape 1 — top-level `token` (what @a2a-js/sdk serialises by default,
// what Workstacean's SkillDispatcherPlugin sends)
{
  "url": "http://caller/api/a2a/callback/{taskId}",
  "token": "per-task-secret"
}
```

```jsonc
// Shape 2 — structured RFC-8821 AuthenticationInfo
{
  "url": "http://caller/api/a2a/callback/{taskId}",
  "authentication": {
    "schemes": ["Bearer"],
    "credentials": "per-task-secret"
  }
}
```

Your `tasks/pushNotificationConfig/set` handler (and the `/tasks/{id}/pushNotificationConfigs` REST alias) must read both and store the resolved string. Reading only one side silently breaks whichever caller picked the other shape — the webhook fires, the caller returns 401 “Invalid notification token”, and the behaviour is identical to “never delivered” unless you have INFO-level logs. This was Quinn #61 — it cost us a day of polling fallback to find.
Reference: Quinn’s _extract_push_token — one helper, both shapes, top-level wins when both present.
### Implementation checklist

- Retry delivery 3× with exponential backoff (1s / 3s / 9s)
- Skip retry on 4xx — the caller doesn’t want it and retrying won’t help
- Send `Authorization: Bearer <token>` if the caller registered a token (see token-parsing above)
- Do NOT log the webhook body or token — these carry task artifacts, which may contain sensitive data
- Log delivery attempts at INFO, not DEBUG. The default Python root level is WARNING; without `logging.basicConfig(level=INFO)` on your server, “webhook delivered” lines silently vanish and you can’t tell delivery from silent-drop (Quinn #61). Check the agent’s container env for `LOG_LEVEL` — if it’s unset and the server doesn’t set a basicConfig, you’re flying blind.
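The retry items of the checklist above can be sketched as one small function. Injecting the POST as a callable is an assumption made here so the policy is testable without httpx or a live endpoint:

```python
import asyncio
from typing import Awaitable, Callable


async def deliver_webhook(
    post: Callable[[], Awaitable[int]],        # performs one POST, returns HTTP status
    delays: tuple[float, ...] = (1.0, 3.0, 9.0),
) -> bool:
    """Initial attempt + len(delays) retries; give up immediately on 4xx."""
    for delay in (0.0, *delays):
        if delay:
            await asyncio.sleep(delay)
        try:
            status = await post()
        except OSError:
            continue                            # connection error — retry after backoff
        if status < 300:
            return True                         # delivered
        if 400 <= status < 500:
            return False                        # 4xx: the caller rejected it, stop
    return False                                # retries exhausted
```

In production this would be wrapped with the strong-reference spawn pattern and an INFO log per attempt (taskId, state, response status) — never the body or token.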
## Health = outcomes (automatic)

You do not need to expose a separate health endpoint. Workstacean’s TaskTracker publishes an `autonomous.outcome.{systemActor}.{skill}` bus event on every terminal task:
```js
onTerminal: (content, isError, taskState) => {
  this._publishAutonomousOutcome({
    correlationId,
    systemActor,
    skill,
    success: !isError,
    taskState,
    text: content,
    durationMs: Date.now() - dispatchedAt,
    usage, // token counts if reported
  });
}
```

The `AgentFleetHealthPlugin` subscribes to `autonomous.outcome.#` and aggregates per-`systemActor` over a rolling 24h window: success rate, p50/p95 latency, cost per successful outcome, recent failures, orphaned-skill counts. Quinn, protoPen, the protoMaker team — every agent that returns async tasks appears in fleet health automatically.
What to do on your side: make sure `success = true` means “the agent achieved its goal,” not “the skill returned without crashing.” Classic mistake: a tool returns `"Error: ..."` but the agent reports `state: completed` with that string as an artifact. Fleet health then counts every failure as a success. Propagate tool errors into the task state machine — return `state: failed, error: <msg>` when things go wrong.
## Scheduled work (ceremonies)

Two entry points — operator-driven and agent-driven.
### Operator side: define a ceremony YAML

The full schema is in Create a Ceremony. Short version — drop a file in `workspace/ceremonies/<id>.yaml`:
```yaml
id: quinn.daily-digest
name: "Quinn Daily QA Digest"
schedule: "0 14 * * *"   # 5-field cron, UTC
skill: qa_report         # must be on the target agent's card
targets: [quinn]
notifyChannel: "1469080556720623699"
enabled: true
```

Workstacean reloads `workspace/ceremonies/*.yaml` every ~5s, so no restart is needed to add or disable a ceremony.
When the cron fires:
1. `CeremonyPlugin`’s internal timer emits `ceremony.quinn.daily-digest.execute`
2. `CeremonyPlugin` publishes `agent.skill.request` with the ceremony’s configured skill + targets
3. `SkillDispatcherPlugin` resolves the executor; `A2AExecutor` sends `message/send` to your `/a2a` with the skill content
4. Your agent runs the skill — the message text is whatever content the ceremony plugin built (usually a skill-specific prompt)
From the agent’s perspective, a cron-triggered `message/send` looks identical to a Discord DM or any other inbound call. The only hint is `params.metadata.skillHint`, which you can use for deterministic tool selection.
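Deterministic selection off that hint is a small lookup — the `SKILL_TOOLS` mapping and the LLM-routing fallback name are assumptions for illustration; `params.metadata.skillHint` is the field described above:

```python
# Hypothetical skill-id → tool-name mapping for this agent
SKILL_TOOLS = {"qa_report": "run_qa_report", "my_skill": "run_my_skill"}


def pick_tool(params: dict, default: str = "llm_route") -> str:
    """Route straight to a tool when the dispatcher sent a skill hint."""
    hint = (params.get("metadata") or {}).get("skillHint")
    return SKILL_TOOLS.get(hint, default)
```

Unhinted traffic (DMs, ad-hoc calls) falls through to whatever routing the agent normally does.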
### Agent side: schedule from inside the agent

Agents with access to Workstacean’s `manage_cron` tool can CRUD ceremonies programmatically — useful when the agent itself needs to set up a follow-up (“remind me to re-audit this PR in 30 minutes”) or when the ceremony definition is dynamic.
The tool wraps these endpoints:
| Method | Path | Action |
|---|---|---|
| GET | `/api/ceremonies` | List |
| POST | `/api/ceremonies/create` | Create |
| POST | `/api/ceremonies/{id}/update` | Update |
| POST | `/api/ceremonies/{id}/delete` | Delete |
| POST | `/api/ceremonies/{id}/run` | Manual fire (ignores schedule) |
Auth via X-API-Key. Use the per-agent key (WORKSTACEAN_API_KEY_<AGENT>) rather than the shared admin key — ceremonies stamp createdBy: <agent> on the row, and per-agent keys enforce ownership on update/delete. See Quinn’s manage_cron.py for a ~250-line reference implementation.
Example body for create:
```json
{
  "id": "my-agent.hourly-sweep",
  "name": "Hourly Sweep",
  "schedule": "0 * * * *",
  "skill": "my_skill",
  "targets": ["my-agent"],
  "enabled": true
}
```

ID validation: `^[\w.\-]+$` — alphanumerics, dots, dashes. Reject bad IDs at the tool layer before the API does, with the same error message the server produces.
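A client-side mirror of that rule — the regex is the one quoted above; the helper name and error wording are illustrative, not the server’s exact message:

```python
import re

_ID_RE = re.compile(r"^[\w.\-]+$")


def validate_ceremony_id(ceremony_id: str) -> str:
    """Reject malformed ceremony IDs before the API round-trip."""
    if not _ID_RE.fullmatch(ceremony_id):
        raise ValueError(f"invalid ceremony id {ceremony_id!r}: must match ^[\\w.\\-]+$")
    return ceremony_id
```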
On update — do not re-enable by default. The tool must treat `enabled` as present-only on update requests (don’t pass it through unless the caller explicitly set it) — otherwise every update silently re-enables paused ceremonies. This bit us in Quinn PR #12; see the `enabled: bool | None = None` pattern in `tools/lg_tools.py::manage_cron` for the fix.
## Gold-standard checklist

A protoAgent that ticks all of these is a first-class fleet citizen — routing, streaming, scheduling, observability, and planner ranking all work without any Workstacean-side tailoring:
### Transport & lifecycle

- `GET /.well-known/agent-card.json` + `/.well-known/agent.json` both serve the card
- `capabilities.streaming: true` matches a real `message/stream` implementation
- `capabilities.pushNotifications: true` matches a real `POST /tasks/{id}/pushNotificationConfigs` implementation
- `message/send` returns a Task with `state: submitted` in under 1s (never blocks on work)
- `GET /tasks/{id}` reflects current state + artifact
- `POST /tasks/{id}:cancel` is atomic and fires a push notification
- `GET /tasks/{id}:subscribe` can reattach to an in-flight task after SSE disconnect
### Task store hygiene

- Terminal tasks evict on a TTL (default 1h)
- Webhook delivery tasks have strong references (Python 3.11 GC trap)
- Cancel is an atomic check-and-write under the lock
- Webhook URLs pass SSRF validation (loopback / RFC 1918 / link-local rejected), with an operator allowlist for trusted docker-network hosts (Quinn #53)
- Push config is read from the live record on every delivery, not closed over
- Push config parser accepts both token shapes — top-level `token` AND `authentication.credentials` (Quinn #61)
- Webhook delivery logs at INFO (include taskId + state + response status); `logging.basicConfig(level=INFO)` installed on server boot so INFO actually reaches stderr
- Background producer runs independently of any SSE connection
### SSE semantics

- Every event carries a `kind` discriminator (`task` / `status-update` / `artifact-update` / `message`) — without it `@a2a-js/sdk` silently drops the event (Quinn #40)
- Wire fields are camelCase: `taskId`, `contextId`, `lastChunk`, `artifactId` (not `task_id` / `context_id` / `last_chunk`) — Python variable names can stay snake_case, just not dict keys that cross the wire
- Status updates carry a `final` boolean (true on terminal states)
- Text deltas emit as `kind: "artifact-update"` + `append: true` — never the full `accumulated_text`
- COMPLETED emits TWO events per spec: `artifact-update` (full artifact, `append: false`, `lastChunk: true`) then `status-update` (`final: true`)
- Artifact objects carry an `artifactId` for cross-event correlation
- Tool status messages persist on the record for `:subscribe` to surface
- Consumer disconnect does not cancel the producer
### Extensions (cards + runtime)

- `effect-domain-v1` declared on the card for every state-mutating skill (enables L1 planner ranking)
- `worldstate-delta-v1` DataPart emitted on the terminal task whenever a tool with known effects succeeds (declared effects must agree with observed deltas — a drift test is cheap)
- `cost-v1` `{usage, durationMs, costUsd?}` DataPart on every terminal task that ran an LLM. Capture token usage from your model events (LangChain: `on_chat_model_end` → `output.usage_metadata`); compute `durationMs` from `created_at` / `updated_at`; emit only when usage was actually tracked. `costUsd` is optional — derive it from per-model rates or capture it from the gateway response. Reference: Quinn `_terminal_artifact_parts` + `store.add_usage` (PR #56).
- `confidence-v1` `{confidence, explanation}` populated when the agent can self-assess
- Success contract: `state: completed` means the agent actually achieved the goal, not just that the skill returned without crashing
## Reference implementations

- `Quinn/a2a_handler.py` — the most complete reference. All of the hardening above + decoupled SSE producer + delta-only text frames + tool-message persistence. Single 800-line file, no inheritance.
- `Quinn/server.py` — agent-side wiring. `_chat_langgraph_stream` maps LangGraph `astream_events(v2)` to the `(tool_start|tool_end|text|delta|usage|done|error)` tuple contract the handler consumes. `_build_agent_card` shows `capabilities.extensions` with `effect-domain-v1` and `cost-v1`. `_worldstate_delta_for_tool` emits the runtime delta on `file_bug` success. `on_chat_model_end` capture feeds the `usage` channel for cost-v1.
- `Quinn/tools/manage_cron.py` — LangGraph tool that CRUDs ceremonies via the per-agent `X-API-Key`.
- `Quinn/tests/test_a2a_handler.py` — ~55 tests covering the store, background runner, SSRF, cancel races, webhook retention, subscribe reconnect, delta-only text, and producer-survives-consumer-cancellation. Clone this alongside the handler.
- `protoPen/a2a_handler.py` — original port target. Predates Quinn’s hardening; lacks the decoupled SSE producer, SSRF guard, webhook retention, and atomic cancel. Kept as a reference for the minimum viable spec surface, but new agents should mirror Quinn’s version.
## Related

- Add an agent — operator-side YAML + executor wiring
- A2A Streaming (SSE) — SSE event protocol in detail
- Create a Ceremony — ceremony YAML schema
- Workspace files reference
- `effect-domain-v1`, `worldstate-delta-v1`, `cost-v1`, `confidence-v1` — the four A2A extensions covered by the planner’s ranking model