Sync routes
Per-stream poll handlers that fetch from the provider and emit canonical records.
For every poll-mode stream you declare, write a sync route. The platform invokes it on schedule, hands you the saved page/cursor, and expects you to fetch from the provider and emit canonical records via ingest.canonical.
File location
Sync routes live at src/api/sync/<stream-key>.ts — auto-registered as the POST handler for the matching streams[] entry in poll or backfill mode. This is an enforced build/deploy contract: every poll or backfill stream must have a matching POST sync route. They follow the same conventions as plain API routes.
Real example
A real sync route — Stripe customers:
import { api, ingest } from "@backfill-io/sdk";
import { stripePollMetadata } from "../../lib/metadata";
import {
commitStripeListPage,
missingStripeApiKeyResponse,
stripeApiKey,
stripeDateFilterParams,
stripeGet,
stripeListPage,
stripeListResponse,
stripeListUrl,
stripeSyncResponse,
} from "../../lib/stripe";
export const config = { auth: "api_key" };
export const POST = api(async (request) => {
const stream = "customers";
const apiKey = stripeApiKey();
if (!apiKey) return missingStripeApiKeyResponse();
const page = stripeListPage(request, stream);
const response = stripeGet(
stripeListUrl("/customers", page, stripeDateFilterParams(request)),
apiKey,
);
if (!response.ok) {
return api.providerError("Stripe customers sync failed", response);
}
const list = stripeListResponse(response);
const payloads = list.records.map((customer) => customerCanonical(customer));
const batch = ingest.batch("customer", payloads, {
stream,
continueOnError: true,
sourceId: (payload) => payload.customer_id,
idempotencyKey: (payload) => `stripe:customer:${payload.customer_id}`,
});
const checkpoint = commitStripeListPage(stream, list.records, list.hasMore);
return stripeSyncResponse({
stream,
imported: batch.imported,
failed: batch.failed,
hasMore: list.hasMore,
checkpoint,
results: batch.results,
failedRecords: batch.failedRecords,
});
});
The contract
A sync route is just an API route with POST. The platform invokes it as part of the schedule for the matching stream.
| Concern | Where it lives |
|---|---|
| Scheduling | The streams[] entry in the manifest (schedule: "*/15 * * * *"). |
| Auth into the provider | Secrets.get("api_key") (or whatever your settingsSchema declared). |
| Cursor / page | Read from request (e.g., searchParams) and persisted between runs by the platform / lib helpers. |
| Outbound HTTP | Http (with the provider’s host on permissions.http). |
| Emit | ingest.batch(canonicalType, payloads, { stream, continueOnError: true, sourceId, idempotencyKey }) for pages, or ingest.canonical(...) for one record. |
| Telemetry | Log.info("...", { ... }). |
| Response | A status object the platform reads to know how the run went. |
ingest.canonical
The connector-specific runtime helper. Signature observed from usage:
ingest.canonical(
canonicalType: string, // canonical entity name (lowercase or platform-mapped)
payload: Record<string, any>, // shaped to canonical schema
opts: { stream: string; idempotencyKey: string }
): IngestResult;
| Argument | Notes |
|---|---|
canonicalType | The canonical entity to write to (e.g., "customer", "invoice"). The platform routes this to the right concrete entity for the connection. |
payload | Shape it as the canonical schema expects, not the provider’s raw shape. Most connectors maintain lib/transforms/*.ts to normalize. |
stream | The stream key from streams[]. Used for telemetry + checkpoint accounting. |
idempotencyKey | A stable string derived from the provider’s record. Re-emissions with the same key dedupe. |
ingest.batch
For sync pages, prefer ingest.batch(..., { continueOnError: true }) so one bad
record does not abort the whole page:
const batch = ingest.batch("customer", payloads, {
stream: "customers",
continueOnError: true,
sourceId: (payload) => payload.customer_id,
idempotencyKey: (payload) => `stripe:customer:${payload.customer_id}`,
});
The result includes:
| Field | Notes |
|---|---|
imported | Successful records. |
failed | Failed records. |
results | Successful canonical ingest results. |
failedRecords | Structured failures with documentType, stream, sourceId, idempotencyKey, reason, message, retryable, and index. |
Pagination & checkpoints
The platform’s lib helpers (stripeListPage, commitStripeListPage, etc.) abstract a “read the saved cursor → fetch one page → save the new cursor → return hasMore” loop. Until the connector SDK formalizes this, treat the existing connector’s helpers as the reference and copy patterns.
Errors
If the provider call fails, return early with a structured response (e.g., api.providerError(...)). Don’t throw — ingest.canonical is not aware of mid-pagination throws and you may end up with partial state.
If a single record fails to ingest, log the failure and continue the page; the platform records ingest results per record so partial success is observable.
Testing sync routes
Use @backfill-io/sdk/testing to run sync routes locally with mocked provider
HTTP, settings, secrets, checkpoints, and ingest calls:
import assert from "node:assert/strict";
import test from "node:test";
import { connectorTestRuntime } from "@backfill-io/sdk/testing";
test("customers sync emits canonical customers", async () => {
const runtime = connectorTestRuntime({
secrets: { api_key: "sk_test_123" },
});
try {
runtime.install();
const { POST } = await import("../src/api/sync/customers");
runtime.http.getOnce({
status: 200,
body: { data: [{ id: "cus_123", name: "Ada Lovelace" }] },
});
const response = await runtime.call(POST);
assert.equal(response.status, 200);
assert.equal(runtime.ingest.calls[0].documentType, "customer");
assert.equal(runtime.ingest.calls[0].sourceId, "cus_123");
} finally {
runtime.restore();
}
});
See Testing for the full route-test harness, including partial ingest failures and pagination assertions.