Ingest API reference
The ingest global — canonical single-record and batch ingestion, idempotency keys, and failure handling.
The ingest global is how connector code hands records to Backfill’s
canonical pipeline. It validates the payload against the canonical contract,
deduplicates on your idempotency key, versions the document, and triggers
downstream policy/journal-entry processing — the same pipeline first-party
connectors use.
import { ingest } from "@backfill-io/sdk";
Use ingest after your connector has fetched or received source data and
transformed it into a Backfill canonical resource payload. It is not an HTTP
endpoint and it does not decide what to fetch; it records already-mapped data
into Backfill with enough stream and idempotency metadata for retries,
observability, and deduplication.
When to use ingest
Use ingest only from connector code running inside Backfill’s sandbox:
poll sync routes, backfill routes, and webhook routes declared by a
defineConnector manifest. The helper is provided by the connector runtime;
plain extensions and external systems should not call it.
ingest and the public REST create endpoints reach the same canonical
pipeline from different sides of the trust boundary:
| Where your code runs | Use | Why |
|---|---|---|
| External service, script, or integration calling Backfill over HTTP | REST endpoints such as POST /v1/customers or POST /v1/invoices | You authenticate with a bearer token and use HTTP idempotency for safe retries. |
| Connector route running inside Backfill’s sandbox | ingest.canonical or ingest.batch | The runtime already has connection context, stream metadata, checkpoint integration, batching, and observation-mode interception. |
| Plain extension code creating records it owns | Entity.create / generated entity classes | The record originates inside the extension instead of mirroring an external source stream. |
Do not call Backfill’s REST API from a connector route just to create the same records. That adds a same-host HTTP round trip, requires managing an API token inside the sandbox, and skips connector-specific behavior such as batch results, stream accounting, and observation mode.
Streams and document types
Every ingest call names two things:
| Argument | Meaning |
|---|---|
| Canonical document type | The Backfill resource contract to validate against, such as "customer", "invoice", or "bank_transaction". |
| stream | The connector stream key that produced the record. This must match a streams[] entry in your connector manifest. |
Choose stream keys from the source system’s logical collections or event
families — the units operators need to schedule, retry, checkpoint, and
monitor independently. A poll stream for source customers usually uses
customers; a poll stream for transactions usually uses transactions; a
generic webhook stream can use events when individual deliveries fan out
into multiple canonical document types.
streams: [
  { key: "customers", mode: "poll", schedule: "*/15 * * * *" },
  { key: "transactions", mode: "poll", schedule: "0 * * * *" },
  { key: "events", mode: "webhook" },
];
Use the same key in ingest, sync, and route filenames. For example,
src/api/sync/customers.ts should pass stream: "customers" when it emits
canonical customer records.
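Since the stream passed to ingest must match a streams[] entry, a connector can check its keys against its own manifest before emitting records. The sketch below is illustrative only: `StreamDeclaration`, `declaredStreams`, and `requireDeclaredStream` are hypothetical names rather than SDK exports, and `mode` is typed loosely because only "poll" and "webhook" appear on this page.

```typescript
// Hypothetical local shape mirroring the manifest entries shown above;
// the SDK's real manifest types may differ.
interface StreamDeclaration {
  key: string;
  mode: string; // only "poll" and "webhook" are shown on this page
  schedule?: string;
}

const declaredStreams: StreamDeclaration[] = [
  { key: "customers", mode: "poll", schedule: "*/15 * * * *" },
  { key: "transactions", mode: "poll", schedule: "0 * * * *" },
  { key: "events", mode: "webhook" },
];

// Returns the key unchanged when it is declared, otherwise throws with the
// list of known keys so a typo is easy to spot.
function requireDeclaredStream(
  streams: readonly StreamDeclaration[],
  key: string,
): string {
  if (!streams.some((s) => s.key === key)) {
    const known = streams.map((s) => s.key).join(", ");
    throw new Error(`Stream "${key}" is not declared; known streams: ${known}`);
  }
  return key;
}

// Resolve the key once, then pass it as `stream` in the ingest options.
const customersStream = requireDeclaredStream(declaredStreams, "customers");
```

A check like this keeps the stream key in one place, so the manifest, the route filename, and the ingest options are less likely to drift apart.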
ingest.canonical
One record at a time:
const result = ingest.canonical("bank_transaction", payload, {
  stream: "transactions",
  idempotencyKey: `source:transaction:${txn.transaction_id}`,
});
Returns:
interface CanonicalIngestResult {
  status: "ingested" | "observed";
  documentType: string;
  documentId: string | null;
  documentVersionId: string | null;
  payloadHash: string | null;
  provenance: Record<string, any>;
}
status: "observed" means the extension is running in
observation mode — the record was validated
and recorded as an intercepted effect but not committed.
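Because the same call can return either status, connector code that logs results may want to tell the two cases apart. The following is a minimal sketch under stated assumptions: the interface is a local copy of the shape documented above so the example stands alone, and `describeIngestResult` is a hypothetical helper, not an SDK function.

```typescript
// Local copy of the documented result shape, so the sketch is self-contained.
interface CanonicalIngestResult {
  status: "ingested" | "observed";
  documentType: string;
  documentId: string | null;
  documentVersionId: string | null;
  payloadHash: string | null;
  provenance: Record<string, any>;
}

// Hypothetical helper: builds a one-line log message that distinguishes
// committed records from observation-mode interceptions.
function describeIngestResult(result: CanonicalIngestResult): string {
  if (result.status === "observed") {
    return `observed ${result.documentType} (validated, not committed)`;
  }
  // documentId is nullable in the documented type, so fall back defensively.
  const id = result.documentId ?? "unknown id";
  return `ingested ${result.documentType} as ${id}`;
}
```

Branching on status rather than on whether documentId is null keeps the check tied to the documented meaning of "observed".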
ingest.batch
Pages of records — the shape sync routes should prefer:
const batch = ingest.batch("customer", payloads, {
  stream: "customers",
  continueOnError: true,
  sourceId: (payload) => payload.customer_id,
  idempotencyKey: (payload) => `source:customer:${payload.customer_id}`,
});
sourceId and idempotencyKey accept a fixed string or a
(payload, index) => string callback. With continueOnError: true, invalid
records are collected instead of failing the batch:
interface CanonicalBatchIngestResult {
  status: "ingested" | "observed";
  documentType: string;
  count: number; // records submitted
  imported: number; // records accepted
  failed: number; // records rejected
  results: CanonicalIngestResult[];
  failedRecords: CanonicalBatchFailedRecord[];
}

interface CanonicalBatchFailedRecord {
  failed: true;
  reason: string; // machine-readable, e.g. validation failure kind
  message: string; // human-readable detail
  retryable: boolean; // false → fix the payload, don't resend as-is
  documentType: string;
  sourceId: string | null;
  idempotencyKey: string | null;
  index: number; // position in the submitted array
}
Report imported / failed / failedRecords in your sync-route response —
the platform surfaces them in run status and logs. Records with
retryable: false will fail again unchanged; log them and move on rather
than blocking the stream.
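One way to apply this guidance is to split failed records by retryable before building the response. This is a sketch, not the platform's required response shape: `BatchOutcome`, `SyncRunReport`, and `buildSyncReport` are hypothetical names, and the failed-record interface is copied locally from the definition above so the example stands alone.

```typescript
// Local copy of the documented failed-record shape.
interface CanonicalBatchFailedRecord {
  failed: true;
  reason: string;
  message: string;
  retryable: boolean;
  documentType: string;
  sourceId: string | null;
  idempotencyKey: string | null;
  index: number;
}

// The subset of the batch result this sketch reads.
interface BatchOutcome {
  imported: number;
  failed: number;
  failedRecords: CanonicalBatchFailedRecord[];
}

// Hypothetical report shape; the real sync-route response may differ.
interface SyncRunReport {
  imported: number;
  failed: number;
  failedRecords: CanonicalBatchFailedRecord[];
  retryableIndexes: number[]; // positions worth resubmitting later
  permanentFailures: string[]; // log lines for records that need a payload fix
}

function buildSyncReport(batch: BatchOutcome): SyncRunReport {
  const retryableIndexes = batch.failedRecords
    .filter((r) => r.retryable)
    .map((r) => r.index);
  const permanentFailures = batch.failedRecords
    .filter((r) => !r.retryable)
    .map((r) => {
      const label = r.sourceId ?? "index " + r.index;
      return `${label}: ${r.reason} (${r.message})`;
    });
  return {
    imported: batch.imported,
    failed: batch.failed,
    failedRecords: batch.failedRecords,
    retryableIndexes,
    permanentFailures,
  };
}
```

Keeping the permanent failures as log lines rather than throwing lets the stream continue past records that would fail again unchanged.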
Idempotency keys
The key is your dedup contract with the pipeline: re-emitting a payload with the same key updates the existing document (or no-ops when the payload hash is unchanged) instead of creating a duplicate.
- Derive it from the provider’s stable identifier, never from array position or timestamps: "<provider>:<record-type>:<provider-id>" (for example, source:customer:customer_123).
- Keep one scheme per stream for the connector’s lifetime; changing the scheme re-imports everything as new documents.

sourceId is the provider identifier stored on the document for provenance and cross-referencing; it is usually the same value your idempotency key is built from.
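The key scheme above can be kept in one small helper so that every route for a stream builds keys the same way. This is a minimal sketch: `buildIdempotencyKey`, `SourceCustomer`, and `customerKey` are hypothetical names, and rejecting empty segments is an added assumption rather than documented pipeline behavior.

```typescript
// Builds "<provider>:<record-type>:<provider-id>" and rejects empty segments,
// which would otherwise make distinct records collapse onto the same key.
function buildIdempotencyKey(
  provider: string,
  recordType: string,
  providerId: string,
): string {
  const segments = [provider, recordType, providerId];
  if (segments.some((segment) => segment.trim() === "")) {
    throw new Error("Idempotency key segments must be non-empty");
  }
  return segments.join(":");
}

// Hypothetical source payload; only the stable provider id matters here.
interface SourceCustomer {
  customer_id: string;
}

// Matches the (payload, index) => string callback form described for
// ingest.batch; the index is deliberately ignored.
const customerKey = (payload: SourceCustomer, _index?: number): string =>
  buildIdempotencyKey("source", "customer", payload.customer_id);
```

Passing `customerKey` as the idempotencyKey callback keeps array position and timestamps out of the key, in line with the first rule above.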
Canonical document types
ingest accepts any canonical document type; payloads validate against the
same contracts documented in Resources. Two banking
types get dedicated TypeScript payload types in the SDK — "bank_account"
(BankAccountPayload) and "bank_transaction" (BankTransactionPayload) —
so those payloads type-check field-by-field. For the rest ("customer",
"invoice", "vendor_bill", …) the payload is an open record; use the
resource pages for field names, requiredness, and formats.
Documents that post to the general ledger (invoices, expenses, payments, …) go through the full policy pipeline on ingest — journal entries are generated downstream; your connector never writes ledger lines itself.