
Build an A2A Agent

A recipe for building a new external A2A agent that plugs into Workstacean’s routing, scheduling, and fleet-health pipelines. Complementary to Add an agent, which covers the operator side (YAML + executor registration); this guide covers what to build on the agent side so everything else just works.

Reference implementations: protoPen/a2a_handler.py and Quinn/a2a_handler.py — ~85% shared code, either one is a reasonable starting template.

If you implement the A2A spec properly and serve an agent card with the right capability flags, Workstacean wires up the following automatically — no per-agent code on the Workstacean side:

| Feature | How it works |
|---|---|
| Skill routing | SkillBrokerPlugin discovers your skills from the card every 10 min |
| Long-running task handling | Return Task.status.state = submitted → TaskTracker polls every 30s until terminal |
| Push notifications | Advertise capabilities.pushNotifications: true → Workstacean registers a webhook per task, no more polling |
| Fleet-health observability | TaskTracker.onTerminal publishes autonomous.outcome.{systemActor}.{skill} on every task completion → agent_fleet_health aggregates per-agent success rate, p50/p95 latency, cost, and failure reasons in a 24h rolling window |
| Human-in-the-loop | Return input-required → Workstacean raises a HITL prompt in Discord, then resumes your task with the human’s answer via message/send on the same taskId |
| Scheduled invocations | Define a ceremony YAML → Workstacean fires the skill at the scheduled time |

Your agent never writes a health probe, a retry loop, or a cron scheduler. Workstacean does those for every agent that joins the fleet.

The @a2a-js/sdk Client uses JSON-RPC on /a2a for every operation it calls — including task management and push notification CRUD. REST paths like GET /tasks/{id} are convenience aliases only; the SDK never hits them. Omitting any JSON-RPC method means the matching SDK call silently returns -32601 Method not found and the SDK wraps that as a misleading “Invalid response Content-Type” error. See Quinn PR #25.

JSON-RPC methods (required — all of them)

| Method | Purpose |
|---|---|
| message/send | Async submit; returns a Task with state: submitted |
| message/stream | Send + SSE stream (spec-canonical name) |
| message/sendStream | Legacy alias — accept alongside message/stream for backwards compat with older protoPen-era clients |
| tasks/get | Fetch current Task state + artifact |
| tasks/cancel | Cooperative cancel |
| tasks/resubscribe | SSE reconnect to an in-flight task |
| tasks/pushNotificationConfig/set | Register a webhook |
| tasks/pushNotificationConfig/get | Read the current webhook |
| tasks/pushNotificationConfig/list | List configs |
| tasks/pushNotificationConfig/delete | Clear the webhook |

If you skip the last four, every SDK call from client.setTaskPushNotificationConfig through client.deleteTaskPushNotificationConfig returns -32601. Your webhooks can still work via the REST alias below — but only if the caller knows to use it, which the SDK does not.
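A minimal sketch of the dispatch guard (handler names and payload shapes are illustrative — the point is that every spec method must appear in the registry, or the SDK gets -32601):

```python
import json

# Hypothetical handler registry — every spec method maps to a callable.
# Any name left out of this dict produces -32601, which @a2a-js/sdk then
# surfaces as the misleading Content-Type error, so register all ten.
HANDLERS = {
    "message/send": lambda params: {"id": "t-1", "status": {"state": "submitted"}},
    # "message/stream", "tasks/get", ..., "tasks/pushNotificationConfig/delete"
    # — register the rest the same way.
}

def dispatch(raw: str) -> dict:
    req = json.loads(raw)
    handler = HANDLERS.get(req.get("method"))
    if handler is None:
        return {"jsonrpc": "2.0", "id": req.get("id"),
                "error": {"code": -32601, "message": "Method not found"}}
    return {"jsonrpc": "2.0", "id": req.get("id"),
            "result": handler(req.get("params"))}
```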

REST aliases (optional; no SDK client hits these)

| Method | Path | Purpose |
|---|---|---|
| POST | /message:send | Same as message/send (HTTP 202) |
| POST | /message:stream | Same as message/stream (SSE) |
| GET | /tasks/{id} | Same as tasks/get |
| GET | /tasks/{id}:subscribe | Same as tasks/resubscribe (plain SSE, no JSON-RPC envelope) |
| POST | /tasks/{id}:cancel | Same as tasks/cancel |
| POST | /tasks/{id}/pushNotificationConfigs | Same as tasks/pushNotificationConfig/set |

Serve them if you want manual curl smoke tests to be simpler. They are not required for SDK interoperability.

Agent-card discovery paths:

| Method | Path | Purpose |
|---|---|---|
| GET | /.well-known/agent-card.json | Agent card (spec-canonical) |
| GET | /.well-known/agent.json | Legacy path older clients still probe — serve both |

JSON-RPC error codes:

| Code | Meaning |
|---|---|
| -32601 | Method not found |
| -32602 | Invalid params (missing field, bad type, SSRF rejection) |
| -32001 | Task not found |
| -32002 | Task already terminal (cancel-after-complete) |

Every task moves through the A2A state machine:

```
submitted → working → completed
                    ↘ failed
                    ↘ canceled
                    ↘ input-required (HITL — resumed via message/send on same taskId)
```
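One way to encode that machine as a transition table (the exact set of legal transitions is an assumption drawn from the diagram; your store can reject anything else at write time):

```python
# Terminal states never transition again; input-required can resume to working
# when the human's answer arrives via message/send on the same taskId.
TERMINAL = {"completed", "failed", "canceled"}
ALLOWED = {
    "submitted": {"working", "completed", "failed", "canceled", "input-required"},
    "working": {"completed", "failed", "canceled", "input-required"},
    "input-required": {"working", "completed", "failed", "canceled"},
}

def can_transition(current: str, new: str) -> bool:
    return current not in TERMINAL and new in ALLOWED.get(current, set())
```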

Critical: message/send must return submitted within ~1s. The full response comes later via polling, SSE, or webhook. If your handler blocks for the duration of the underlying skill, you lose:

  • Timeout safety (reverse proxies cap HTTP connections, usually 60-300s)
  • Cancellation (no way to interrupt a stuck task)
  • Observability (no granular state updates; just “hanging”)
  • Push notifications (the whole point is “fire-and-forget on the caller side”)

Spawn the skill execution as a background task owned by the task record, return a task receipt, let TaskTracker or SSE drive the result.

```python
async def _submit_task(text, context_id, push_config):
    task_id = str(uuid4())
    record = TaskRecord(id=task_id, state="submitted", ...)
    await store.create(record)
    record._bg_task = asyncio.create_task(_run_skill(task_id, text))
    return record  # returned immediately — state=submitted
```
Agent card template:

```python
AGENT_CARD = {
    "name": "my-agent",
    "description": "One-line summary of what the agent does",
    "url": f"http://{host}/a2a",  # JSON-RPC endpoint, NOT the server root
    "version": "1.0.0",
    "provider": {"organization": "protoLabsAI"},
    "capabilities": {
        "streaming": True,              # enables message/stream + :subscribe
        "pushNotifications": True,      # enables /tasks/{id}/pushNotificationConfigs
        "stateTransitionHistory": False,  # tracking every transition in the response
    },
    "defaultInputModes": ["text/plain"],
    "defaultOutputModes": ["text/markdown"],
    "skills": [
        {
            "id": "my_skill",
            "name": "My Skill",
            "description": "What this skill does, what input it expects",
            "tags": ["category"],
            "examples": ["/my-command", "do the thing"],
        },
    ],
    "securitySchemes": {
        "apiKey": {"type": "apiKey", "in": "header", "name": "X-API-Key"},
    },
    "security": [{"apiKey": []}],
}
```

Three field-level gotchas, all from incidents we’ve hit:

  • url must point at the JSON-RPC endpoint, not the server root. @a2a-js/sdk uses this field to send message/send — if it’s http://host/, FastAPI returns 405 and the dispatch dies silently. (See Quinn PR #6.)
  • Serve at both /.well-known/agent-card.json and /.well-known/agent.json. Spec-canonical is agent-card.json; older clients use agent.json. Workstacean’s A2AExecutor falls back from one to the other, but serving both saves a 404 round-trip.
  • Flip capabilities.streaming / pushNotifications to true only when you actually support them. SkillBrokerPlugin refreshes the card every 10 min, and A2AExecutor switches transport (sendMessageStream vs sendMessage) based on those flags. False-positive flags break the dispatcher.

Lessons learned the hard way — every agent’s task store should handle the cases below. Steal from protoPen/a2a_handler.py or Quinn/a2a_handler.py directly.

1. TTL eviction of terminal tasks

A long-running process leaks memory proportional to total lifetime traffic if you never drop completed tasks. Run a background sweep that evicts terminal tasks older than N (we use 1h by default — long enough for pollers and webhook delivery to drain). Never evict submitted or working tasks.

2. Strong references on webhook delivery tasks


asyncio.create_task(_deliver_webhook(...)) without retaining the handle is a trap — Python 3.11+ docs warn the event loop keeps only weak references. A pending retry can be garbage-collected mid-backoff. Keep a module-level set[asyncio.Task], add each task on create, attach a done_callback that evicts on completion.

3. Atomic cancel — check-and-write under one lock

The naive sequence state = await store.get(id); if state not terminal: await store.update(CANCELED) races with the background runner. A worker can transition to completed between the read and the write, and you clobber a legitimate terminal state with canceled. Do the check and the write under the same lock.
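A minimal sketch of the locked check-and-write, assuming a dict-backed store:

```python
import asyncio

TERMINAL = {"completed", "failed", "canceled"}

class TaskStore:
    def __init__(self):
        self._lock = asyncio.Lock()
        self._records: dict[str, dict] = {}

    async def cancel(self, task_id: str) -> bool:
        # Check and write under one lock: a worker finishing between a
        # separate read and write would otherwise be clobbered to canceled.
        async with self._lock:
            record = self._records.get(task_id)
            if record is None or record["state"] in TERMINAL:
                return False
            record["state"] = "canceled"
            return True
```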

4. SSRF validation on push notification URLs

Any client supplying http://169.254.169.254/..., http://localhost/..., or http://10.0.0.1/... as a push config target turns your agent into an internal network scanner. Before accepting a PushNotificationConfig:

  • Reject non-http(s) schemes (file://, javascript:, gopher:, …)
  • Resolve the hostname once and check every returned address
  • Reject loopback, link-local, RFC1918, multicast, reserved, unspecified IPs
  • Reject unresolvable hostnames outright

One-time resolution is not a defence against DNS rebinding — pin the resolved IP on the httpx transport if your threat model requires it. Private-network deployments (Tailscale-only, Docker-only) can skip this; public-internet deployments cannot.
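A stdlib-only sketch of the checks above (function name is ours; the operator allowlist hook from the checklist is omitted for brevity):

```python
import ipaddress
import socket
from urllib.parse import urlparse

def is_safe_webhook_url(url: str) -> bool:
    """Reject schemes and address ranges that would let a push config reach
    internal infrastructure. One-shot resolution only — pin the resolved IP
    on your HTTP transport if DNS rebinding is in your threat model."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.hostname:
        return False  # file://, javascript:, gopher:, ... all rejected
    try:
        infos = socket.getaddrinfo(parsed.hostname, None)
    except socket.gaierror:
        return False  # unresolvable hostnames are rejected outright
    for info in infos:  # check EVERY returned address
        addr = ipaddress.ip_address(info[4][0])
        if (addr.is_loopback or addr.is_link_local or addr.is_private
                or addr.is_multicast or addr.is_reserved or addr.is_unspecified):
            return False
    return True
```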

5. Push config must be read from the live record, not closed over


POST /tasks/{id}/pushNotificationConfigs is explicitly a post-submit channel — callers who didn’t have a webhook at submit time can register one later and still get terminal notifications. If your push helper closes over the submit-time config instead of reading record.push_config on every call, the post-submit registration silently does nothing.

```python
# Wrong — closes over submit-time config
def _make_push_fn(push_config):
    async def _push(record):
        if push_config and record.state in TERMINAL | {WORKING}:
            asyncio.create_task(_deliver_webhook(record, push_config))
    return _push

# Right — reads record.push_config at call time
async def _push(record):
    cfg = record.push_config
    if cfg and record.state in TERMINAL | {WORKING}:
        task = asyncio.create_task(_deliver_webhook(record, cfg))
        _pending_webhook_tasks.add(task)
        task.add_done_callback(_pending_webhook_tasks.discard)
```

6. Producer must be independent of the SSE connection


If your agent implements message/stream, the HTTP SSE generator and the LangGraph (or similar) producer should not be the same task. When the SSE client disconnects mid-run, FastAPI cancels the generator — if the producer is inline, it dies with the connection and any :subscribe reconnect has no task to attach to.

Pattern: always spawn the producer as a background task owned by the task record, regardless of which entry point created the task. The SSE generator becomes a pure consumer that reads from the store and awaits the rotating _update_event. Drop the SSE connection → producer keeps running → reconnect via :subscribe picks up cleanly.

```python
# Shared consumer used by both message/stream and :subscribe
async def _watch_task(task_id, start_text_len=0):
    record = await _store.get(task_id)
    if record is None:
        return
    last_sent_len = start_text_len
    yield ("status", record, None)
    if record.accumulated_text and len(record.accumulated_text) > last_sent_len:
        yield ("text_delta", record, record.accumulated_text[last_sent_len:])
        last_sent_len = len(record.accumulated_text)
    if record.state in _TERMINAL:
        return
    while True:
        r = await _store.get(task_id)
        if r is None:
            return
        try:
            await asyncio.wait_for(r._update_event.wait(), timeout=25)
        except asyncio.TimeoutError:
            yield ("keepalive", None, None)
            continue
        r = await _store.get(task_id)
        if r is None:
            return
        yield ("status", r, None)
        if r.accumulated_text and len(r.accumulated_text) > last_sent_len:
            yield ("text_delta", r, r.accumulated_text[last_sent_len:])
            last_sent_len = len(r.accumulated_text)
        if r.state in _TERMINAL:
            return
```

Both SSE routes catch asyncio.CancelledError (FastAPI raises this on client disconnect), log the drop, and re-raise — the background task is never cancelled.

7. Emit text deltas, not the full accumulated text


On :subscribe reconnect, the initial snapshot can emit the full accumulated_text once as an append: false frame. Every subsequent update must emit only the new suffix as append: true. Clients that receive the full text on every update see duplicated content on the wire (the subscribe snapshot already gave them the pre-disconnect portion). Track last_sent_len in your consumer and emit text[last_sent_len:].

8. Persist tool-progress messages on the record


If the producer emits in-process events like tool_start / tool_end that your SSE path currently surfaces as status messages, those are invisible to a :subscribe reconnect — the subscriber reads the store, not the producer’s event queue. Store the most recent tool message on the task record (e.g. record.last_status_message: str | None) via your update_state(status_message=...) call. Surface it under status.message in status events. Clear on terminal transitions so subscribers on a completed task see the final state cleanly.

See A2A Streaming (SSE) for the full SSE event protocol. Short version: advertise capabilities.streaming: true, accept method: "message/stream" on /a2a and POST /message:stream, emit text/event-stream frames as work progresses. A2AExecutor on Workstacean’s side switches transport automatically based on the card; your message/send consumers stay unchanged.

On COMPLETED, emit two events per the A2A spec: first a kind: "artifact-update" carrying the authoritative full artifact — text + any worldstate-delta DataParts — as append: false, lastChunk: true; then a kind: "status-update" with final: true to close the stream. Mid-run stays incremental via kind: "artifact-update" + append: true text deltas only.

Critical — SSE events must carry a kind discriminator. @a2a-js/sdk’s for await (const event of client.sendMessageStream(...)) routes events by kind. Events missing it are silently dropped, which means Workstacean’s TaskTracker never attaches, push notifications never register, and HITL chains silently fail. This was Quinn #40. Every frame — initial task, every status-update, every artifact-update — needs kind.
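A tiny frame builder that makes forgetting kind impossible (field set is the minimal one — real frames carry more; camelCase keys on the wire):

```python
import json

def status_frame(task_id: str, context_id: str, state: str,
                 final: bool = False) -> str:
    """Serialise one SSE status-update frame. The kind discriminator is
    baked in because @a2a-js/sdk routes on it and silently drops events
    that lack it."""
    event = {
        "kind": "status-update",
        "taskId": task_id,        # camelCase on the wire, not task_id
        "contextId": context_id,
        "status": {"state": state},
        "final": final,
    }
    return f"data: {json.dumps(event)}\n\n"
```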

Workstacean registers webhooks of the form ${WORKSTACEAN_BASE_URL}/api/a2a/callback/{taskId} with a per-task HMAC token. When you POST a task snapshot to the URL, Workstacean verifies the token and fires the same bus event a poll would have.

The webhook payload is a TaskStatusUpdateEvent — same shape as on the wire, including the kind discriminator:

```json
{
  "kind": "status-update",
  "taskId": "3f8a1c2d-...",
  "contextId": "engagement-001",
  "status": {"state": "completed", "timestamp": "2026-04-15T20:00:00Z"},
  "final": true,
  "artifact": {
    "artifactId": "3f8a1c2d-...",
    "parts": [{"kind": "text", "text": "## Results\n..."}],
    "append": false,
    "lastChunk": true
  }
}
```

Fire webhooks on every transition the caller cares about — working, completed, failed, canceled. Missing the cancel transition is a common bug (we fixed it in Quinn PR #19).

The A2A spec’s PushNotificationConfig permits two interchangeable shapes for the bearer token, and SDKs are inconsistent about which one they send:

```jsonc
// Shape 1 — top-level `token` (what @a2a-js/sdk serialises by default,
// what Workstacean's SkillDispatcherPlugin sends)
{
  "url": "http://caller/api/a2a/callback/{taskId}",
  "token": "per-task-secret"
}

// Shape 2 — structured RFC-8821 AuthenticationInfo
{
  "url": "http://caller/api/a2a/callback/{taskId}",
  "authentication": {
    "schemes": ["Bearer"],
    "credentials": "per-task-secret"
  }
}
```

Your tasks/pushNotificationConfig/set handler (and /tasks/{id}/pushNotificationConfigs REST alias) must read both and store the resolved string. Only reading one side silently breaks whichever caller picked the other shape — the webhook fires, the caller returns 401 “Invalid notification token”, and the behaviour is identical to “never delivered” unless you have INFO-level logs. This was Quinn #61 — cost us a day of polling fallback to find.

Reference: Quinn’s _extract_push_token — one helper, both shapes, top-level wins when both present.

  • Retry delivery 3× with exponential backoff (1s / 3s / 9s)
  • Skip retry on 4xx — the caller doesn’t want it and retrying won’t help
  • Send Authorization: Bearer <token> if the caller registered a token (see token-parsing above)
  • Do NOT log the webhook body or token — these carry task artifacts, which may contain sensitive data
  • Log delivery attempts at INFO, not DEBUG. The default Python root level is WARNING; without logging.basicConfig(level=INFO) on your server, “webhook delivered” lines silently vanish and you can’t tell delivery from silent drop (Quinn #61). Check the agent’s container env for LOG_LEVEL — if it’s unset and the server never calls basicConfig, you’re flying blind.
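One reading of that delivery policy as code (the send callable is injected so the retry logic stays transport-agnostic; httpx/aiohttp specifics are omitted):

```python
import asyncio
import logging

log = logging.getLogger("webhook")

async def deliver_with_retry(send, payload, delays=(1, 3, 9)) -> bool:
    """Retry policy from the checklist: three attempts with exponential
    backoff, no retry on 4xx. `send` is any coroutine that POSTs the
    payload and returns the HTTP status code."""
    for attempt, delay in enumerate(delays, start=1):
        status = await send(payload)
        log.info("webhook attempt %d -> %s", attempt, status)  # INFO, not DEBUG
        if status < 400:
            return True
        if 400 <= status < 500:
            return False  # caller rejected it; retrying won't help
        if attempt < len(delays):
            await asyncio.sleep(delay)
    return False
```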

You do not need to expose a separate health endpoint. Workstacean’s TaskTracker publishes an autonomous.outcome.{systemActor}.{skill} bus event on every terminal task:

protoWorkstacean/src/executor/skill-dispatcher-plugin.ts

```ts
onTerminal: (content, isError, taskState) => {
  this._publishAutonomousOutcome({
    correlationId, systemActor, skill,
    success: !isError,
    taskState,
    text: content,
    durationMs: Date.now() - dispatchedAt,
    usage, // token counts if reported
  });
}
```

The AgentFleetHealthPlugin subscribes to autonomous.outcome.# and aggregates per-systemActor over a rolling 24h window: success rate, p50/p95 latency, cost per successful outcome, recent failures, orphaned-skill counts. Quinn, protoPen, the protoMaker team — every agent that returns async tasks appears in fleet health automatically.

What to do on your side: make sure success = true means “the agent achieved its goal,” not “the skill returned without crashing.” Classic mistake: a tool returns "Error: ..." but the agent reports state: completed with that string as an artifact. Fleet health now thinks every failure is a success. Propagate tool errors into the task state machine — return state: failed, error: <msg> when things go wrong.
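A sketch of that propagation at the terminal transition (the Error: prefix convention and the field names are illustrative — use whatever error signal your tools actually emit):

```python
def finalize(record: dict, tool_output: str) -> None:
    """Map tool failure onto the task state machine instead of reporting
    a completed task whose artifact is an error string — the latter makes
    fleet health count every failure as a success."""
    if tool_output.startswith("Error:"):
        record["state"] = "failed"
        record["error"] = tool_output
    else:
        record["state"] = "completed"
        record["artifact_text"] = tool_output
```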

Two entry points — operator-driven and agent-driven.

Full schema is in Create a Ceremony. Short version — drop a file in workspace/ceremonies/<id>.yaml:

```yaml
id: quinn.daily-digest
name: "Quinn Daily QA Digest"
schedule: "0 14 * * *"   # 5-field cron, UTC
skill: qa_report         # must be on the target agent's card
targets: [quinn]
notifyChannel: "1469080556720623699"
enabled: true
```

Workstacean reloads workspace/ceremonies/*.yaml every ~5s, so no restart is needed to add or disable a ceremony.

When the cron fires:

  1. CeremonyPlugin’s internal timer emits ceremony.quinn.daily-digest.execute
  2. CeremonyPlugin publishes agent.skill.request with the ceremony’s configured skill + targets
  3. SkillDispatcherPlugin resolves the executor; A2AExecutor sends message/send to your /a2a with the skill content
  4. Your agent runs the skill — the message text is whatever content the ceremony plugin built (usually a skill-specific prompt)

From the agent’s perspective, a cron-triggered message/send looks identical to a Discord DM or any other inbound call. The only hint is params.metadata.skillHint, which you can use for deterministic tool selection.

Agent side: schedule from inside the agent


Agents with access to Workstacean’s manage_cron tool can CRUD ceremonies programmatically — useful when the agent itself needs to set up a follow-up (“remind me to re-audit this PR in 30 minutes”) or when the ceremony definition is dynamic.

The tool wraps these endpoints:

| Method | Path | Action |
|---|---|---|
| GET | /api/ceremonies | List |
| POST | /api/ceremonies/create | Create |
| POST | /api/ceremonies/{id}/update | Update |
| POST | /api/ceremonies/{id}/delete | Delete |
| POST | /api/ceremonies/{id}/run | Manual fire (ignores schedule) |

Auth via X-API-Key. Use the per-agent key (WORKSTACEAN_API_KEY_<AGENT>) rather than the shared admin key — ceremonies stamp createdBy: <agent> on the row, and per-agent keys enforce ownership on update/delete. See Quinn’s manage_cron.py for a ~250-line reference implementation.

Example body for create:

```json
{
  "id": "my-agent.hourly-sweep",
  "name": "Hourly Sweep",
  "schedule": "0 * * * *",
  "skill": "my_skill",
  "targets": ["my-agent"],
  "enabled": true
}
```

ID validation: ^[\w.\-]+$ — alphanumerics, underscores, dots, dashes. Reject bad IDs at the tool layer before the API does, with the same error message the server produces.
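Mirroring that check at the tool layer is two lines:

```python
import re

# Mirror the server's ID rule (^[\w.\-]+$): alphanumerics, underscore, dot, dash.
_CEREMONY_ID = re.compile(r"^[\w.\-]+$")

def validate_ceremony_id(ceremony_id: str) -> bool:
    # Reject at the tool layer with the same rule the API enforces,
    # saving the HTTP round-trip for obviously bad IDs.
    return bool(_CEREMONY_ID.fullmatch(ceremony_id))
```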

On update — do not re-enable by default. The tool must treat enabled as present-only on update requests (not pass it through unless the caller explicitly set it) — otherwise every update silently re-enables paused ceremonies. This bit us in Quinn PR #12; see the enabled: bool | None = None pattern in tools/lg_tools.py::manage_cron for the fix.

A protoAgent that ticks all of these is a first-class fleet citizen — routing, streaming, scheduling, observability, and planner ranking all work without any Workstacean-side tailoring:

  • GET /.well-known/agent-card.json + /.well-known/agent.json both serve the card
  • capabilities.streaming: true matches a real message/stream implementation
  • capabilities.pushNotifications: true matches a real POST /tasks/{id}/pushNotificationConfigs implementation
  • message/send returns a Task with state: submitted in under 1s (never blocks on work)
  • GET /tasks/{id} reflects current state + artifact
  • POST /tasks/{id}:cancel is atomic and fires a push notification
  • GET /tasks/{id}:subscribe can reattach to an in-flight task after SSE disconnect
  • Terminal tasks evict on a TTL (default 1h)
  • Webhook delivery tasks have strong references (Python 3.11 GC trap)
  • Cancel is an atomic check-and-write under the lock
  • Webhook URLs pass SSRF validation (loopback / RFC1918 / link-local rejected) with an operator allowlist for trusted docker-network hosts (Quinn #53)
  • Push config read from the live record on every delivery, not closed over
  • Push config parser accepts both token shapes — top-level token AND authentication.credentials (Quinn #61)
  • Webhook delivery logs at INFO (include taskId + state + response status); logging.basicConfig(level=INFO) installed on server boot so INFO actually reaches stderr
  • Background producer runs independently of any SSE connection
  • Every event carries a kind discriminator (task / status-update / artifact-update / message) — without it @a2a-js/sdk silently drops the event (Quinn #40)
  • Wire fields are camelCase: taskId, contextId, lastChunk, artifactId (not task_id / context_id / last_chunk) — Python variable names can stay snake_case, just not dict keys that cross the wire
  • Status updates carry a final boolean (true on terminal states)
  • Text deltas emit as kind: "artifact-update" + append: true — never the full accumulated_text
  • COMPLETED emits TWO events per spec: artifact-update (full artifact, append: false, lastChunk: true) then status-update (final: true)
  • Artifact objects carry an artifactId for cross-event correlation
  • Tool status messages persist on the record for :subscribe to surface
  • Consumer disconnect does not cancel the producer
  • effect-domain-v1 declared on the card for every state-mutating skill (enables L1 planner ranking)
  • worldstate-delta-v1 DataPart emitted on the terminal task whenever a tool with known effects succeeds (declared effects must agree with observed deltas — a drift test is cheap)
  • cost-v1 {usage, durationMs, costUsd?} DataPart on every terminal task that ran an LLM. Capture token usage from your model events (LangChain: on_chat_model_end → output.usage_metadata); compute durationMs from created_at/updated_at; emit only when usage was actually tracked. costUsd is optional — derive from per-model rates or capture from the gateway response. Reference: Quinn _terminal_artifact_parts + store.add_usage (PR #56).
  • confidence-v1 {confidence, explanation} populated when the agent can self-assess
  • Success contract: state: completed means the agent actually achieved the goal, not just that the skill returned without crashing
Reference files:

  • Quinn/a2a_handler.py — the most complete reference. All of the hardening above + decoupled SSE producer + delta-only text frames + tool-message persistence. Single 800-line file, no inheritance.
  • Quinn/server.py — agent-side wiring. _chat_langgraph_stream maps LangGraph astream_events(v2) to the (tool_start|tool_end|text|delta|usage|done|error) tuple contract the handler consumes. _build_agent_card shows capabilities.extensions with effect-domain-v1 and cost-v1. _worldstate_delta_for_tool emits the runtime delta on file_bug success. on_chat_model_end capture feeds the usage channel for cost-v1.
  • Quinn/tools/manage_cron.py — LangGraph tool that CRUDs ceremonies via the per-agent X-API-Key.
  • Quinn/tests/test_a2a_handler.py — ~55 tests covering the store, background runner, SSRF, cancel races, webhook retention, subscribe reconnect, delta-only text, producer survives consumer cancellation. Clone this alongside the handler.
  • protoPen/a2a_handler.py — original port target. Predates Quinn’s hardening; lacks decoupled SSE producer, SSRF guard, webhook retention, atomic cancel. Kept as a reference for the minimum viable spec surface, but new agents should mirror Quinn’s version.