
Sync and background jobs

Background job system, queue consumers, social and email sync, notification processor, and scheduled tasks.

The Sync module is AppEngine's background-processing layer. Anything that doesn't return synchronously to a user request — a Facebook page sync, an inbound email parse, a scheduled email blast, an SLA escalation, a notification email, a usage rollup — runs through here. Application code rarely calls Sync directly; it emits events that the appropriate consumer picks up.

Architecture

Three families of workers:

  • Queue consumers — drain queues that other modules push to. Examples: datatype changes, one-off jobs, scheduled actions.
  • Sync jobs — vendor-specific pollers that pull data in (Facebook, TikTok, LinkedIn, Twitter, Pinterest, Snapchat, Discord, Slack, Reddit, Telegram, Twitch, Google Ads, external email IMAP).
  • Schedulers and processors — cron-driven runners (purge, analytics aggregation, billing close, escalation evaluation, frequent realtime sync).

All three run inside the same NestJS process. For high-throughput orgs, scale out by running multiple processes; the queue and lock subsystem ensures that a job does not execute twice.

Queue manager surface

GET   /sync                JWT
GET   /sync/info           JWT
GET   /sync/status         JWT
GET   /sync/purge          JWT
POST  /sync/queue          JWT
GET   /sync/pause/:name    JWT
GET   /sync/resume/:name   JWT

info returns queue depth, in-flight counts, and last-success timestamps for every consumer. status adds the available-platform list. queue accepts an arbitrary job spec for testing or manual re-runs. pause / resume toggle a consumer without restarting the process.

Trigger a sync manually

POST  /sync/trigger                        JWT
POST  /sync/trigger/:platform/:syncType?   JWT
GET   /sync/platforms                      JWT

Use these endpoints to force a fresh pull rather than waiting for the next scheduled cycle. Request body shape:

{
  "platform": "facebook",
  "syncType": ["posts", "ads-metrics"],
  "options": {
    "startDate": "2026-04-01T00:00:00Z",
    "force": true
  }
}

platforms returns the catalogue of supported platforms and the sync types each one accepts.
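
As a rough illustration of the trigger body shown above, the following TypeScript sketch builds a request description for POST /sync/trigger. The field names come from the documented body; the type names, the buildTriggerRequest helper, the client-side date check, and sending the JWT as a Bearer token are assumptions made for the example, not confirmed parts of the API.

```typescript
// Hypothetical types mirroring the documented body shape.
interface TriggerSyncOptions {
  startDate?: string; // ISO 8601 timestamp, e.g. "2026-04-01T00:00:00Z"
  force?: boolean;
}

interface TriggerSyncRequest {
  platform: string;
  syncType: string[];
  options?: TriggerSyncOptions;
}

// Plain description of the HTTP call, so any HTTP client can send it.
interface HttpRequestSpec {
  method: "POST";
  path: string;
  headers: Record<string, string>;
  body: string;
}

function buildTriggerRequest(req: TriggerSyncRequest, jwt: string): HttpRequestSpec {
  const startDate = req.options?.startDate;
  // Client-side sanity check (an assumption; the server may validate differently).
  if (startDate !== undefined && isNaN(Date.parse(startDate))) {
    throw new Error(`startDate is not a valid date: ${startDate}`);
  }
  return {
    method: "POST",
    path: "/sync/trigger",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${jwt}`, // assumption: JWT passed as a Bearer token
    },
    body: JSON.stringify(req),
  };
}
```

Keeping the builder separate from the transport makes it easy to reuse for backfills, where only startDate changes between calls.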

Social media sync

Per-platform sync jobs run on a configured cadence (typically every 15-60 minutes for engagement, hourly for ads metrics). Each job:

  1. Reads the org's connected accounts from the integrations module.
  2. Calls the vendor API for new content/events since the last successful run.
  3. Writes the results to the social activity store and emits events.

Implemented platforms: Facebook, TikTok, LinkedIn, Twitter, Pinterest, Snapchat, Discord, Slack, Reddit, Telegram, Twitch, Google Ads. Each has its own job file under src/sync/jobs/; they share the BaseSyncJob lifecycle (lock acquisition, error handling, lock release).
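
The shared lifecycle described above (acquire the lock, run, handle errors, release the lock) might look roughly like the sketch below. It is not the actual BaseSyncJob: SketchSyncJob, SyncLock, and RunResult are hypothetical names, the ":" key separator is assumed, and the code is synchronous only to keep the example short. A real job would be asynchronous.

```typescript
// Hypothetical lock interface; the real one lives in sync-lock.service.ts.
interface SyncLock {
  acquire(key: string): boolean; // true when the caller now holds the lock
  release(key: string): void;
}

type RunResult = "completed" | "skipped" | "failed";

abstract class SketchSyncJob {
  constructor(
    protected readonly jobName: string,
    private readonly lock: SyncLock,
  ) {}

  // Platform-specific work: read connected accounts, call the vendor API,
  // write results to the activity store, emit events.
  protected abstract sync(orgId: string): void;

  run(orgId: string): RunResult {
    const key = `${orgId}:${this.jobName}`; // assumed key format
    if (!this.lock.acquire(key)) {
      return "skipped"; // another worker is already running this job for this org
    }
    try {
      this.sync(orgId);
      return "completed";
    } catch {
      return "failed"; // a real job would log and record the failure here
    } finally {
      this.lock.release(key); // always release, even after an error
    }
  }
}
```

Putting the release in a finally block is what keeps a failing vendor call from leaving the lock held until its timeout.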

The aggregated activity feeds two surfaces:

  • CRM inbox — incoming comments and DMs as conversation threads.
  • Marketing analytics — per-post engagement metrics.

External email IMAP

The external mail sync (external.mail.sync.job.ts) connects to configured IMAP/Gmail accounts (see Google integration) and ingests new messages. Messages are threaded by their In-Reply-To header, which matches each email to an existing ticket or contact.
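
To make In-Reply-To threading concrete, here is a minimal sketch. The InboundEmail and ThreadIndex types and the resolveThread function are hypothetical; the actual job may use additional signals when matching messages.

```typescript
interface InboundEmail {
  messageId: string;  // value of the Message-ID header
  inReplyTo?: string; // value of the In-Reply-To header, if present
  subject: string;
}

// Maps a known Message-ID to the thread (for example a ticket) it belongs to.
type ThreadIndex = Map<string, string>;

function resolveThread(
  email: InboundEmail,
  index: ThreadIndex,
  newThreadId: () => string,
): string {
  // Look up the message this email replies to, if any.
  const parentThread = email.inReplyTo ? index.get(email.inReplyTo) : undefined;
  // Unknown or missing parent: start a new thread.
  const threadId = parentThread ?? newThreadId();
  // Remember this message so later replies to it land in the same thread.
  index.set(email.messageId, threadId);
  return threadId;
}
```

Recording every ingested Message-ID, not just the first in a thread, is what lets a reply to a reply still resolve to the original ticket.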

Use cases:

  • Support email → ticket creation
  • Sales rep email → CRM activity log
  • Inbound forwarding of customer replies

Notification processor

The notification processor (notification.processor.ts) consumes a queue of "send notification" requests emitted by automations, alerts, mentions, and DB triggers. It fans each request out to the right channel: email (via SendGrid or Mailgun), SMS (via Twilio), push (FCM/APNs), in-app (real-time WebSocket), or Slack/Discord webhook.

Application code emits notifications by writing to the queue or via the broadcast module — the processor handles delivery, retries, and tracking.
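
The fan-out step can be pictured with the sketch below. The channel names follow the list above, but the NotificationRequest shape, the ChannelSender signature, and the fanOut function are assumptions for illustration. Real senders would call the vendor SDKs and would be asynchronous.

```typescript
type Channel = "email" | "sms" | "push" | "in-app" | "webhook";

// Hypothetical request shape; the real queue payload is not documented here.
interface NotificationRequest {
  recipient: string;
  message: string;
  channels: Channel[];
}

type ChannelSender = (recipient: string, message: string) => void;

interface DeliveryResult {
  channel: Channel;
  ok: boolean;
  error?: string;
}

// Tries every requested channel and records the outcome of each one,
// so a failure on one channel does not block the others.
function fanOut(
  request: NotificationRequest,
  senders: Record<Channel, ChannelSender>,
): DeliveryResult[] {
  return request.channels.map((channel): DeliveryResult => {
    try {
      senders[channel](request.recipient, request.message);
      return { channel, ok: true };
    } catch (err) {
      const error = err instanceof Error ? err.message : String(err);
      return { channel, ok: false, error };
    }
  });
}
```

The per-channel results are the kind of record a processor could use for retries and delivery tracking.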

Escalation processor

For SLA-driven flows (support tickets, lead follow-ups), the escalation processor (escalation.processor.ts) runs every minute and:

  1. Reads records flagged for SLA tracking.
  2. Compares last-update timestamps against the configured SLA windows.
  3. Emits escalation events when thresholds are crossed.
  4. Increments the priority and re-routes if rules say to.

Tickets and the merchant-customer dunning flow both rely on this processor.
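
One pass of the evaluation steps listed above could be sketched as follows. The record and event shapes are hypothetical, and the sketch assumes a numeric priority where higher means more urgent, capped at maxPriority. Re-routing is omitted.

```typescript
interface TrackedRecord {
  id: string;
  priority: number;      // assumption: higher number = more urgent
  lastUpdatedAt: number; // epoch milliseconds
  slaWindowMs: number;   // configured SLA window for this record
}

interface EscalationEvent {
  recordId: string;
  overdueByMs: number;
  newPriority: number;
}

// Compares each record's last update against its SLA window and returns
// one escalation event per record whose window has been exceeded.
function evaluateEscalations(
  records: TrackedRecord[],
  now: number,
  maxPriority = 5,
): EscalationEvent[] {
  const escalations: EscalationEvent[] = [];
  for (const record of records) {
    const overdueByMs = now - record.lastUpdatedAt - record.slaWindowMs;
    if (overdueByMs <= 0) continue; // still inside the SLA window
    escalations.push({
      recordId: record.id,
      overdueByMs,
      newPriority: Math.min(record.priority + 1, maxPriority),
    });
  }
  return escalations;
}
```

Passing now in as a parameter keeps the function deterministic, which makes the per-minute run easy to test.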

Billing processor

The billing processor (billing.processor.ts) runs on a daily schedule and:

  • Closes the previous day's usage window.
  • Aggregates per-org usage from the Usage module.
  • Generates invoices for orgs on metered plans.
  • Triggers Stripe charges for orgs with auto-pay enabled.
  • Emits billing.cycle_closed events.

Schedulers

Cron-driven workers under src/sync/schedulers/:

Scheduler                     Cadence           What it does
purge.scheduler.ts            Daily             Removes records past the org's retention policy
analytics-sync.scheduler.ts   Every 5 minutes   Rolls aggregated metrics into the analytics store
frequent-sync.scheduler.ts    Every 1 minute    High-cadence syncs (active campaign metrics)
realtime-sync.scheduler.ts    Every 10 seconds  Near-real-time syncs (messaging, presence)

The Automation module has its own scheduler for time-triggered flows. That scheduler lives in the Automation module rather than in Sync, but it shares the same lock subsystem.

Queue consumers

Three core consumers under src/sync/queue-consumer/:

  • Datatype consumer — listens to data-layer change streams and fans events out to subscribers (automations, broadcasts, sync jobs).
  • One-off consumer — runs ad-hoc tasks queued via POST /sync/queue or via the Automation module's "send webhook" / "run job" actions.
  • Schedule consumer — runs scheduled jobs at their runAt time. Different from the cron schedulers — these are dynamically scheduled (e.g. "send this email on May 1").
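
To show how dynamically scheduled jobs differ from cron schedulers, the sketch below picks the jobs whose runAt time has passed. ScheduledJob and takeDueJobs are hypothetical; the real consumer reads from its queue rather than from an in-memory array.

```typescript
interface ScheduledJob {
  id: string;
  runAt: number; // epoch milliseconds at which the job becomes due
}

interface DueSplit {
  due: ScheduledJob[];       // ready to run now, oldest first
  remaining: ScheduledJob[]; // still waiting for their runAt time
}

// Splits pending jobs into those that are due at `now` and those that are not.
// The input array is left untouched.
function takeDueJobs(pending: ScheduledJob[], now: number): DueSplit {
  const due = pending
    .filter((job) => job.runAt <= now)
    .sort((a, b) => a.runAt - b.runAt);
  const remaining = pending.filter((job) => job.runAt > now);
  return { due, remaining };
}
```

Each job carries its own runAt, so a one-time "send this email on May 1" entry needs no cron expression at all.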

Locks

Sync jobs use a Redis-backed lock (sync-lock.service.ts) keyed by orgId + jobName. Two processes can't run the same Facebook sync for the same org at the same time. Locks expire after a configurable timeout to recover from crashed workers.
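
The locking behaviour can be sketched with an in-memory stand-in. This is not the code in sync-lock.service.ts: the class name, the owner token, and the ":" key separator are assumptions. With Redis, the same acquire-with-expiry semantics are commonly obtained with SET using the NX and PX options.

```typescript
interface LockEntry {
  token: string;     // identifies the worker holding the lock
  expiresAt: number; // epoch milliseconds after which the lock is stale
}

class InMemorySyncLock {
  private readonly held = new Map<string, LockEntry>();

  constructor(
    private readonly ttlMs: number,
    private readonly now: () => number = Date.now,
  ) {}

  // Succeeds only when no unexpired lock exists for this org and job.
  acquire(orgId: string, jobName: string, token: string): boolean {
    const key = `${orgId}:${jobName}`; // assumed key format
    const current = this.held.get(key);
    if (current && current.expiresAt > this.now()) return false;
    this.held.set(key, { token, expiresAt: this.now() + this.ttlMs });
    return true;
  }

  // Releases the lock only if the caller still owns it, so a worker whose
  // lock expired cannot remove a lock taken over by another worker.
  release(orgId: string, jobName: string, token: string): boolean {
    const key = `${orgId}:${jobName}`;
    const current = this.held.get(key);
    if (!current || current.token !== token) return false;
    this.held.delete(key);
    return true;
  }
}
```

The expiry is the recovery path for crashed workers: nothing has to clean up after them, because the next acquire simply overwrites the stale entry.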

Social activity surface

POST/social-activity/webhookNo auth

External webhooks from social platforms post here. Each payload is parsed and written to the social store, and the corresponding events are then emitted.

Failure handling

Each job:

  1. Logs to the standard logging pipeline (configurable destination).
  2. Writes a failure record on the job's run log if it fails.
  3. Retries with backoff up to the per-job retry limit.
  4. Emits an alert to the monitoring channel after the final failure.

Permanent failures (revoked OAuth, deleted account) move the integration to expired status — the job stops trying until reconnect.
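
The retry and permanent-failure rules above can be illustrated as follows. The source only says "backoff", so the exponential schedule, the default delays, and every name in this sketch are assumptions. The wait function is injected and the code is synchronous to keep the example small; a real job would await a timer.

```typescript
// Delay before retry number `attempt` (1-based): base, 2x base, 4x base, ... up to the cap.
function backoffDelayMs(attempt: number, baseMs = 1000, capMs = 60000): number {
  return Math.min(capMs, baseMs * Math.pow(2, attempt - 1));
}

type RetryOutcome<T> =
  | { status: "ok"; value: T; attempts: number }
  | { status: "failed"; attempts: number; permanent: boolean };

function runWithRetry<T>(
  task: () => T,
  retryLimit: number,                    // retries allowed after the first attempt
  isPermanent: (err: unknown) => boolean, // e.g. revoked OAuth, deleted account
  wait: (ms: number) => void,
): RetryOutcome<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return { status: "ok", value: task(), attempts: attempt };
    } catch (err) {
      const permanent = isPermanent(err);
      // Stop at once on a permanent failure, or when the retries are used up.
      if (permanent || attempt > retryLimit) {
        return { status: "failed", attempts: attempt, permanent };
      }
      wait(backoffDelayMs(attempt));
    }
  }
}
```

A caller could use the permanent flag to decide between alerting the monitoring channel and moving the integration to expired status.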

Don't query the data layer from inside a sync job for "what changed" — listen to data-change events instead. The datatype consumer is built specifically to fan these events out so consumers don't poll.
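
The event-driven alternative to polling can be sketched like this. ChangeFanOut and the DataChangeEvent shape are hypothetical stand-ins for the datatype consumer's fan-out, not its actual interface.

```typescript
// Hypothetical shape of a data-change event.
interface DataChangeEvent {
  datatype: string;
  recordId: string;
  op: "create" | "update" | "delete";
}

type ChangeHandler = (change: DataChangeEvent) => void;

class ChangeFanOut {
  private readonly handlers = new Map<string, ChangeHandler[]>();

  // Registers interest in changes to one datatype.
  subscribe(datatype: string, handler: ChangeHandler): void {
    const list = this.handlers.get(datatype) ?? [];
    list.push(handler);
    this.handlers.set(datatype, list);
  }

  // Delivers the change to every subscriber of its datatype and
  // returns how many subscribers were notified.
  publish(change: DataChangeEvent): number {
    const list = this.handlers.get(change.datatype) ?? [];
    for (const handler of list) handler(change);
    return list.length;
  }
}
```

A sync job that subscribes this way does work only when a record actually changes, instead of scanning the data layer on every cycle.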

Operational notes

  • Pause a problematic consumer with GET /sync/pause/:name rather than restarting the process. Resume when ready.
  • Use POST /sync/trigger to backfill data after fixing a bug — pass a startDate to scope the re-run.
  • The queue depth panel on the monitoring dashboard is the fastest way to spot a stuck consumer.