Skip to main content

Webhook handler

Use this recipe when your jobs are already configured with webhook.url and you need a receiver that verifies signatures and safely handles duplicate deliveries.

Express.js example

Conduit signs each delivery with the conduit-signature header. Verify the raw body first, then deduplicate on event.id before doing heavier work.

import express from "express"
import { Conduit } from "@mappa-ai/conduit"

// Express application that hosts the webhook route registered further down.
const app = express()
// Conduit SDK client; used below for signature verification, event parsing and report lookups.
// The `!` only silences the type checker: nothing here checks at runtime that CONDUIT_API_KEY is set.
const conduit = new Conduit({ apiKey: process.env.CONDUIT_API_KEY! })

/**
 * Placeholder dedupe store for the example.
 *
 * `insertIfAbsent` logs the event id and the payload length, then always
 * resolves to `true` (i.e. every event is treated as new). Swap it for a
 * durable store or queue that reports whether the id was already present.
 */
const webhookStore = {
  insertIfAbsent: async (eventId: string, payload: string): Promise<boolean> => {
    const payloadLength = payload.length
    console.info("store event", eventId, payloadLength)
    return true
  },
}

/**
 * Runs the downstream work for a parsed webhook event.
 *
 * - `report.completed`: fetches the report through the Conduit client and logs that it is ready.
 * - `report.failed`: logs the failing job id together with the reported error.
 * - any other event type: does nothing.
 *
 * @param event Event object as returned by `conduit.webhooks.parseEvent`.
 */
async function processWebhookEvent(event: ReturnType<typeof conduit.webhooks.parseEvent>) {
  switch (event.type) {
    case "report.completed": {
      const report = await conduit.reports.get(event.data.reportId)
      console.info(`Report ${report.id} is ready`)
      return
    }
    case "report.failed": {
      console.error(`Job ${event.data.jobId} failed`, event.data.error)
      return
    }
    default:
      return
  }
}

/**
 * POST /webhooks/conduit — receives Conduit webhook deliveries.
 *
 * The body is read as raw text (`express.text`) so the signature is checked
 * against the exact bytes that were sent. Steps, in order:
 *   1. Verify the signature            -> 401 if verification fails.
 *   2. Parse the event                 -> 400 if the verified body cannot be parsed.
 *   3. Record event.id in the store    -> 500 if the store fails,
 *                                         200 `{ duplicate: true }` if already recorded.
 *   4. Acknowledge with 200, then run `processWebhookEvent` in the background.
 *
 * Each step has its own error response so that a store outage or a parse
 * error is not reported to the sender as an invalid signature.
 */
app.post("/webhooks/conduit", express.text({ type: "*/*" }), async (req, res) => {
  // Step 1: authenticate the delivery before trusting anything in the body.
  try {
    await conduit.webhooks.verifySignature({
      payload: req.body,
      headers: req.headers as Record<string, string | string[] | undefined>,
      secret: process.env.CONDUIT_WEBHOOK_SECRET!,
    })
  } catch {
    return res.status(401).json({ error: "Invalid signature" })
  }

  // Step 2: parse the verified payload.
  let event: ReturnType<typeof conduit.webhooks.parseEvent>
  try {
    event = conduit.webhooks.parseEvent(req.body)
  } catch (error: unknown) {
    console.error("Failed to parse webhook payload", error)
    return res.status(400).json({ error: "Invalid payload" })
  }

  // Step 3: dedupe on event.id before any heavier work.
  let inserted: boolean
  try {
    inserted = await webhookStore.insertIfAbsent(event.id, req.body)
  } catch (error: unknown) {
    // A store failure is a server-side problem, not an authentication failure.
    console.error("Failed to record webhook event", error)
    return res.status(500).json({ error: "Failed to record event" })
  }

  if (!inserted) {
    return res.status(200).json({ duplicate: true, received: true })
  }

  // Step 4: acknowledge first, then do the work off the request path.
  res.status(200).json({ received: true })

  queueMicrotask(() => {
    // The response is already sent, so a failure here can only be logged;
    // without this catch it would surface as an unhandled promise rejection.
    processWebhookEvent(event).catch((error: unknown) => {
      console.error("Webhook event processing failed", error)
    })
  })

  return
})

// Start the HTTP server on port 3000 (hard-coded for the example).
app.listen(3000)

Replace webhookStore.insertIfAbsent(...) with your durable store or queue. The important behavior is the uniqueness gate on event.id: a redelivered event must be detected before any downstream work runs a second time.

FastAPI example

import os

from fastapi import FastAPI, HTTPException, Request

from conduit import Conduit, WebhookVerificationError

# FastAPI application that hosts the webhook route below.
app = FastAPI()
# Conduit SDK client; raises KeyError at import time if CONDUIT_API_KEY is not set.
conduit = Conduit(api_key=os.environ["CONDUIT_API_KEY"])
# Ids of events already handled. Module-level and in-memory only, so it is
# lost on restart and not shared between processes.
processed_event_ids: set[str] = set()


@app.post("/webhooks/conduit")
async def handle_webhook(request: Request):
    """Receive a Conduit webhook delivery.

    Verifies the signature against the raw request body, parses the event,
    deduplicates on ``event.id`` and then handles ``report.completed`` and
    ``report.failed`` events inline.

    Returns:
        ``{"received": True}`` after handling a new event, or
        ``{"duplicate": True, "received": True}`` when the event id has
        already been handled.

    Raises:
        HTTPException: 401, with the verification error code as detail,
            when signature verification fails.
    """
    # Read the raw bytes: the signature must be checked against the body as sent.
    raw_payload = await request.body()
    headers = dict(request.headers)

    try:
        conduit.webhooks.verify_signature(
            raw_payload,
            headers,
            secret=os.environ["CONDUIT_WEBHOOK_SECRET"],
        )
    except WebhookVerificationError as error:
        raise HTTPException(status_code=401, detail=error.code) from error

    event = conduit.webhooks.parse_event(raw_payload)

    if event.id in processed_event_ids:
        return {"duplicate": True, "received": True}

    # Claim the id before doing the work so a repeat delivery is short-circuited.
    processed_event_ids.add(event.id)

    try:
        if event.type == "report.completed":
            report = conduit.reports.get(event.data["reportId"])
            print(f"Report ready: {report.id}")

        if event.type == "report.failed":
            print(event.data["error"])
    except Exception:
        # The work did not finish: release the id so a redelivery of this
        # event is processed instead of being acknowledged as a duplicate.
        processed_event_ids.discard(event.id)
        raise

    return {"received": True}

Use a database or queue-backed dedupe key in production instead of an in-memory set.

Next steps