Webhook handler
Use this recipe when your jobs are already configured with webhook.url and you need a receiver that verifies signatures and handles duplicate deliveries safely.
Express.js example
Conduit signs each delivery with the conduit-signature header. Verify the raw body first, then deduplicate on event.id before doing heavier work.
import express from "express"
import { Conduit } from "@mappa-ai/conduit"
// Express application that the webhook route is registered on.
const app = express()
// Conduit SDK client. The non-null assertion assumes CONDUIT_API_KEY is set in the environment.
const conduit = new Conduit({ apiKey: process.env.CONDUIT_API_KEY! })
/**
 * Placeholder event store used by the example.
 *
 * It only logs the event and always reports the event as newly inserted,
 * so it performs no real deduplication.
 */
const webhookStore = {
  /**
   * Logs the event ID and payload size, then reports the event as new.
   *
   * @param eventId - ID of the webhook event being recorded.
   * @param payload - Raw request body; only its length is logged.
   * @returns Always `true` in this placeholder.
   */
  insertIfAbsent: async (eventId: string, payload: string): Promise<boolean> => {
    const payloadSize = payload.length
    console.info("store event", eventId, payloadSize)
    return true
  },
}
/**
 * Runs the follow-up work for a parsed Conduit webhook event.
 *
 * - `report.completed`: fetches the report by `event.data.reportId` and logs that it is ready.
 * - `report.failed`: logs the failed job ID together with the reported error.
 * - Any other event type is ignored.
 *
 * @param event - Event returned by `conduit.webhooks.parseEvent`.
 */
async function processWebhookEvent(event: ReturnType<typeof conduit.webhooks.parseEvent>) {
  switch (event.type) {
    case "report.completed": {
      const { reportId } = event.data
      const readyReport = await conduit.reports.get(reportId)
      console.info(`Report ${readyReport.id} is ready`)
      return
    }
    case "report.failed": {
      const { jobId, error } = event.data
      console.error(`Job ${jobId} failed`, error)
      return
    }
    default:
      return
  }
}
/**
 * Conduit webhook receiver.
 *
 * Steps, in order:
 *  1. Verify the signature against the raw text body (401 on failure).
 *  2. Parse the event (400 if a signed payload cannot be parsed).
 *  3. Record the event ID in the store (500 if the store fails, so the
 *     failure is not misreported as a bad signature).
 *  4. Acknowledge with 200, then process the event after the response.
 *
 * Duplicate event IDs are acknowledged with 200 and are not processed again.
 */
app.post("/webhooks/conduit", express.text({ type: "*/*" }), async (req, res) => {
  // express.text() does not produce a string when there is no body to read.
  // A request without a raw body cannot carry a valid signature.
  const rawBody: unknown = req.body
  if (typeof rawBody !== "string") {
    return res.status(401).json({ error: "Invalid signature" })
  }

  // Only a failed verification is reported as 401.
  try {
    await conduit.webhooks.verifySignature({
      payload: rawBody,
      headers: req.headers as Record<string, string | string[] | undefined>,
      secret: process.env.CONDUIT_WEBHOOK_SECRET!,
    })
  } catch {
    return res.status(401).json({ error: "Invalid signature" })
  }

  let event: ReturnType<typeof conduit.webhooks.parseEvent>
  try {
    event = conduit.webhooks.parseEvent(rawBody)
  } catch (error: unknown) {
    console.error("Failed to parse Conduit webhook payload", error)
    return res.status(400).json({ error: "Invalid payload" })
  }

  let inserted: boolean
  try {
    inserted = await webhookStore.insertIfAbsent(event.id, rawBody)
  } catch (error: unknown) {
    // A store failure is a server-side problem, not an authentication failure.
    console.error(`Failed to record Conduit webhook ${event.id}`, error)
    return res.status(500).json({ error: "Failed to record event" })
  }

  if (!inserted) {
    return res.status(200).json({ duplicate: true, received: true })
  }

  // Acknowledge first so heavier work does not delay the response.
  res.status(200).json({ received: true })
  queueMicrotask(() => {
    // The response is already sent, so a failure here can only be logged.
    // Without this catch a rejection would go unhandled.
    processWebhookEvent(event).catch((error: unknown) => {
      console.error(`Failed to process Conduit webhook ${event.id}`, error)
    })
  })
  return
})
// Start the HTTP server on port 3000.
app.listen(3000)
Replace webhookStore.insertIfAbsent(...) with your durable store or queue. The important behavior is that a uniqueness check on event.id gates downstream work, so a redelivered event is acknowledged without being processed twice.
FastAPI example
import os
from fastapi import FastAPI, HTTPException, Request
from conduit import Conduit, WebhookVerificationError
# FastAPI application that the webhook route is registered on.
app = FastAPI()
# Conduit SDK client; reads CONDUIT_API_KEY from the environment (KeyError if unset).
conduit = Conduit(api_key=os.environ["CONDUIT_API_KEY"])
# IDs of events this process has already accepted. In-memory only, so it is
# lost on restart and not shared between workers.
processed_event_ids: set[str] = set()
@app.post("/webhooks/conduit")
async def handle_webhook(request: Request):
    """Receive a Conduit webhook delivery.

    Verifies the signature against the raw request body, deduplicates on
    ``event.id``, then handles ``report.completed`` and ``report.failed``
    events. Other event types are acknowledged without further work.

    Returns:
        ``{"received": True}`` for a newly handled event, or
        ``{"duplicate": True, "received": True}`` for an event ID that was
        already handled.

    Raises:
        HTTPException: 401 when signature verification fails; ``detail``
            carries the verification error code.
    """
    raw_payload = await request.body()
    headers = dict(request.headers)
    try:
        conduit.webhooks.verify_signature(
            raw_payload,
            headers,
            secret=os.environ["CONDUIT_WEBHOOK_SECRET"],
        )
    except WebhookVerificationError as error:
        raise HTTPException(status_code=401, detail=error.code) from error

    event = conduit.webhooks.parse_event(raw_payload)
    if event.id in processed_event_ids:
        return {"duplicate": True, "received": True}

    # Claim the ID before doing the work so a repeat delivery is gated.
    processed_event_ids.add(event.id)
    try:
        if event.type == "report.completed":
            report = conduit.reports.get(event.data["reportId"])
            print(f"Report ready: {report.id}")
        elif event.type == "report.failed":
            print(event.data["error"])
    except Exception:
        # Release the claim on failure. Otherwise a redelivery of this event
        # would be treated as a duplicate and the work would never run.
        processed_event_ids.discard(event.id)
        raise
    return {"received": True}
Use a database or queue-backed dedupe key in production instead of an in-memory set.