Sync routes

Per-stream poll handlers that fetch from the provider and emit canonical records.

For every poll-mode stream you declare, write a sync route. The platform invokes it on schedule, hands you the saved page/cursor, and expects you to fetch from the provider and emit canonical records via ingest.canonical.

File location

Sync routes live at src/api/sync/<stream-key>.ts — auto-registered as the POST handler for the matching streams[] entry in poll or backfill mode. This is an enforced build/deploy contract: every poll or backfill stream must have a matching POST sync route. They follow the same conventions as plain API routes.

Real example

A real sync route — Stripe customers:

import { api, ingest } from "@backfill-io/sdk";
import { stripePollMetadata } from "../../lib/metadata";
import {
  commitStripeListPage,
  missingStripeApiKeyResponse,
  stripeApiKey,
  stripeDateFilterParams,
  stripeGet,
  stripeListPage,
  stripeListResponse,
  stripeListUrl,
  stripeSyncResponse,
} from "../../lib/stripe";

export const config = { auth: "api_key" };

export const POST = api(async (request) => {
  const stream = "customers";
  const apiKey = stripeApiKey();
  if (!apiKey) return missingStripeApiKeyResponse();

  const page = stripeListPage(request, stream);
  const response = stripeGet(
    stripeListUrl("/customers", page, stripeDateFilterParams(request)),
    apiKey,
  );
  if (!response.ok) {
    return api.providerError("Stripe customers sync failed", response);
  }

  const list = stripeListResponse(response);
  const payloads = list.records.map((customer) => customerCanonical(customer));
  const batch = ingest.batch("customer", payloads, {
    stream,
    continueOnError: true,
    sourceId: (payload) => payload.customer_id,
    idempotencyKey: (payload) => `stripe:customer:${payload.customer_id}`,
  });

  const checkpoint = commitStripeListPage(stream, list.records, list.hasMore);

  return stripeSyncResponse({
    stream,
    imported: batch.imported,
    failed: batch.failed,
    hasMore: list.hasMore,
    checkpoint,
    results: batch.results,
    failedRecords: batch.failedRecords,
  });
});

The contract

A sync route is just an API route with POST. The platform invokes it as part of the schedule for the matching stream.

ConcernWhere it lives
SchedulingThe streams[] entry in the manifest (schedule: "*/15 * * * *").
Auth into the providerSecrets.get("api_key") (or whatever your settingsSchema declared).
Cursor / pageRead from request (e.g., searchParams) and persisted between runs by the platform / lib helpers.
Outbound HTTPHttp (with the provider’s host on permissions.http).
Emitingest.batch(canonicalType, payloads, { stream, continueOnError: true, sourceId, idempotencyKey }) for pages, or ingest.canonical(...) for one record.
TelemetryLog.info("...", { ... }).
ResponseA status object the platform reads to know how the run went.

ingest.canonical

The connector-specific runtime helper. Signature observed from usage:

ingest.canonical(
  canonicalType: string,                // canonical entity name (lowercase or platform-mapped)
  payload: Record<string, any>,         // shaped to canonical schema
  opts: { stream: string; idempotencyKey: string }
): IngestResult;
ArgumentNotes
canonicalTypeThe canonical entity to write to (e.g., "customer", "invoice"). The platform routes this to the right concrete entity for the connection.
payloadShape it as the canonical schema expects, not the provider’s raw shape. Most connectors maintain lib/transforms/*.ts to normalize.
streamThe stream key from streams[]. Used for telemetry + checkpoint accounting.
idempotencyKeyA stable string derived from the provider’s record. Re-emissions with the same key dedupe.

ingest.batch

For sync pages, prefer ingest.batch(..., { continueOnError: true }) so one bad record does not abort the whole page:

const batch = ingest.batch("customer", payloads, {
  stream: "customers",
  continueOnError: true,
  sourceId: (payload) => payload.customer_id,
  idempotencyKey: (payload) => `stripe:customer:${payload.customer_id}`,
});

The result includes:

FieldNotes
importedSuccessful records.
failedFailed records.
resultsSuccessful canonical ingest results.
failedRecordsStructured failures with documentType, stream, sourceId, idempotencyKey, reason, message, retryable, and index.

Pagination & checkpoints

The platform’s lib helpers (stripeListPage, commitStripeListPage, etc.) abstract a “read the saved cursor → fetch one page → save the new cursor → return hasMore” loop. Until the connector SDK formalizes this, treat the existing connector’s helpers as the reference and copy patterns.

Errors

If the provider call fails, return early with a structured response (e.g., api.providerError(...)). Don’t throw — ingest.canonical is not aware of mid-pagination throws and you may end up with partial state.

If a single record fails to ingest, log the failure and continue the page; the platform records ingest results per record so partial success is observable.

Testing sync routes

Use @backfill-io/sdk/testing to run sync routes locally with mocked provider HTTP, settings, secrets, checkpoints, and ingest calls:

import assert from "node:assert/strict";
import test from "node:test";
import { connectorTestRuntime } from "@backfill-io/sdk/testing";

test("customers sync emits canonical customers", async () => {
  const runtime = connectorTestRuntime({
    secrets: { api_key: "sk_test_123" },
  });

  try {
    runtime.install();
    const { POST } = await import("../src/api/sync/customers");

    runtime.http.getOnce({
      status: 200,
      body: { data: [{ id: "cus_123", name: "Ada Lovelace" }] },
    });

    const response = await runtime.call(POST);

    assert.equal(response.status, 200);
    assert.equal(runtime.ingest.calls[0].documentType, "customer");
    assert.equal(runtime.ingest.calls[0].sourceId, "cus_123");
  } finally {
    runtime.restore();
  }
});

See Testing for the full route-test harness, including partial ingest failures and pagination assertions.