Ingest API reference

The ingest global — canonical single-record and batch ingestion, idempotency keys, and failure handling.

The ingest global is how connector code hands records to Backfill’s canonical pipeline. It validates the payload against the canonical contract, deduplicates on your idempotency key, versions the document, and triggers downstream policy/journal-entry processing — the same pipeline first-party connectors use.

import { ingest } from "@backfill-io/sdk";

Use ingest after your connector has fetched or received source data and transformed it into a Backfill canonical resource payload. It is not an HTTP endpoint and it does not decide what to fetch; it records already-mapped data into Backfill with enough stream and idempotency metadata for retries, observability, and deduplication.

When to use ingest

Use ingest only from connector code running inside Backfill’s sandbox: poll sync routes, backfill routes, and webhook routes declared by a defineConnector manifest. The helper is provided by the connector runtime; plain extensions and external systems should not call it.

ingest and the public REST create endpoints reach the same canonical pipeline from different sides of the trust boundary:

Where your code runsUseWhy
External service, script, or integration calling Backfill over HTTPREST endpoints such as POST /v1/customers or POST /v1/invoicesYou authenticate with a bearer token and use HTTP idempotency for safe retries.
Connector route running inside Backfill’s sandboxingest.canonical or ingest.batchThe runtime already has connection context, stream metadata, checkpoint integration, batching, and observation-mode interception.
Plain extension code creating records it ownsEntity.create / generated entity classesThe record originates inside the extension instead of mirroring an external source stream.

Do not call Backfill’s REST API from a connector route just to create the same records. That adds a same-host HTTP round trip, requires managing an API token inside the sandbox, and skips connector-specific behavior such as batch results, stream accounting, and observation mode.

Streams and document types

Every ingest call names two things:

ArgumentMeaning
Canonical document typeThe Backfill resource contract to validate against, such as "customer", "invoice", or "bank_transaction".
streamThe connector stream key that produced the record. This must match a streams[] entry in your connector manifest.

Choose stream keys from the source system’s logical collections or event families — the units operators need to schedule, retry, checkpoint, and monitor independently. A poll stream for source customers usually uses customers; a poll stream for transactions usually uses transactions; a generic webhook stream can use events when individual deliveries fan out into multiple canonical document types.

streams: [
  { key: "customers", mode: "poll", schedule: "*/15 * * * *" },
  { key: "transactions", mode: "poll", schedule: "0 * * * *" },
  { key: "events", mode: "webhook" },
];

Use the same key in ingest, sync, and route filenames. For example, src/api/sync/customers.ts should pass stream: "customers" when it emits canonical customer records.

ingest.canonical

One record at a time:

const result = ingest.canonical("bank_transaction", payload, {
  stream: "transactions",
  idempotencyKey: `source:transaction:${txn.transaction_id}`,
});

Returns:

interface CanonicalIngestResult {
  status: "ingested" | "observed";
  documentType: string;
  documentId: string | null;
  documentVersionId: string | null;
  payloadHash: string | null;
  provenance: Record<string, any>;
}

status: "observed" means the extension is running in observation mode — the record was validated and recorded as an intercepted effect but not committed.

ingest.batch

Pages of records — the shape sync routes should prefer:

const batch = ingest.batch("customer", payloads, {
  stream: "customers",
  continueOnError: true,
  sourceId: (payload) => payload.customer_id,
  idempotencyKey: (payload) => `source:customer:${payload.customer_id}`,
});

sourceId and idempotencyKey accept a fixed string or a (payload, index) => string callback. With continueOnError: true, invalid records are collected instead of failing the batch:

interface CanonicalBatchIngestResult {
  status: "ingested" | "observed";
  documentType: string;
  count: number;      // records submitted
  imported: number;   // records accepted
  failed: number;     // records rejected
  results: CanonicalIngestResult[];
  failedRecords: CanonicalBatchFailedRecord[];
}

interface CanonicalBatchFailedRecord {
  failed: true;
  reason: string;      // machine-readable, e.g. validation failure kind
  message: string;     // human-readable detail
  retryable: boolean;  // false → fix the payload, don't resend as-is
  documentType: string;
  sourceId: string | null;
  idempotencyKey: string | null;
  index: number;       // position in the submitted array
}

Report imported / failed / failedRecords in your sync-route response — the platform surfaces them in run status and logs. Records with retryable: false will fail again unchanged; log them and move on rather than blocking the stream.

Idempotency keys

The key is your dedup contract with the pipeline: re-emitting a payload with the same key updates the existing document (or no-ops when the payload hash is unchanged) instead of creating a duplicate.

  • Derive it from the provider’s stable identifier, never from array position or timestamps: "<provider>:<record-type>:<provider-id>" (source:customer:customer_123).
  • Keep one scheme per stream for the connector’s lifetime — changing the scheme re-imports everything as new documents.
  • sourceId is the provider identifier stored on the document for provenance and cross-referencing; it is usually the same value your idempotency key is built from.

Canonical document types

ingest accepts any canonical document type; payloads validate against the same contracts documented in Resources. Two banking types get dedicated TypeScript payload types in the SDK — "bank_account" (BankAccountPayload) and "bank_transaction" (BankTransactionPayload) — so those payloads type-check field-by-field. For the rest ("customer", "invoice", "vendor_bill", …) the payload is an open record; use the resource pages for field names, requiredness, and formats.

Documents that post to the general ledger (invoices, expenses, payments, …) go through the full policy pipeline on ingest — journal entries are generated downstream; your connector never writes ledger lines itself.