The Sync module is AppEngine's background-processing layer. Anything that doesn't return synchronously to a user request — a Facebook page sync, an inbound email parse, a scheduled email blast, an SLA escalation, a notification email, a usage rollup — runs through here. Application code rarely calls Sync directly; it emits events that the appropriate consumer picks up.
Architecture
Three families of workers:
- Queue consumers — drain queues that other modules push to. Examples: datatype changes, one-off jobs, scheduled actions.
- Sync jobs — vendor-specific pollers that pull data in (Facebook, TikTok, LinkedIn, Twitter, Pinterest, Snapchat, Discord, Slack, Reddit, Telegram, Twitch, Google Ads, external email IMAP).
- Schedulers and processors — cron-driven runners (purge, analytics aggregation, billing close, escalation evaluation, frequent realtime sync).
All three families run inside the same NestJS process. For high-throughput orgs, scale horizontally by running multiple processes; the queue and lock subsystem ensures jobs don't double-execute.
Queue manager surface
- `/sync` — JWT
- `/sync/info` — JWT
- `/sync/status` — JWT
- `/sync/purge` — JWT
- `/sync/queue` — JWT
- `/sync/pause/:name` — JWT
- `/sync/resume/:name` — JWT

`info` returns queue depth, in-flight counts, and last-success timestamps for every consumer. `status` adds the available-platform list. `queue` accepts an arbitrary job spec for testing or manual re-runs. `pause` / `resume` toggle a consumer without restarting the process.
Trigger a sync manually
- `/sync/trigger` — JWT
- `/sync/trigger/:platform/:syncType?` — JWT
- `/sync/platforms` — JWT

Used to force a fresh pull rather than waiting for the next scheduled cycle. Body shape:
```json
{
  "platform": "facebook",
  "syncType": ["posts", "ads-metrics"],
  "options": {
    "startDate": "2026-04-01T00:00:00Z",
    "force": true
  }
}
```
`platforms` returns the catalogue of supported platforms and the sync types each one accepts.
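A small sketch of building that body client-side. The `catalogue` map and the `buildTriggerBody` helper are illustrative assumptions (the real catalogue comes from `/sync/platforms`), shown only to make the body shape and its validation concrete:

```typescript
// Hypothetical helper: build and validate a manual-trigger body for
// POST /sync/trigger against a platform -> allowed-sync-types catalogue.
interface TriggerBody {
  platform: string;
  syncType: string[];
  options?: { startDate?: string; force?: boolean };
}

// Illustrative subset; the real list is served by /sync/platforms.
const catalogue: Record<string, string[]> = {
  facebook: ["posts", "comments", "ads-metrics"],
  linkedin: ["posts", "comments"],
};

function buildTriggerBody(
  platform: string,
  syncType: string[],
  options?: TriggerBody["options"],
): TriggerBody {
  const allowed = catalogue[platform];
  if (!allowed) throw new Error(`unsupported platform: ${platform}`);
  for (const t of syncType) {
    if (!allowed.includes(t)) throw new Error(`unsupported sync type: ${t}`);
  }
  return { platform, syncType, options };
}
```

Validating before POSTing keeps typos from silently queuing a no-op sync.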
Social media sync
Per-platform sync jobs run on a configured cadence (typically every 15-60 minutes for engagement, hourly for ads metrics). Each job:
- Reads the org's connected accounts from the integrations module.
- Calls the vendor API for new content/events since the last successful run.
- Writes the results to the social activity store and emits events.
Implemented platforms: Facebook, TikTok, LinkedIn, Twitter, Pinterest, Snapchat, Discord, Slack, Reddit, Telegram, Twitch, Google Ads. Each has its own job file under src/sync/jobs/; they share the BaseSyncJob lifecycle (lock acquisition, error handling, lock release).
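The shared lifecycle can be sketched roughly as follows. This is not the actual `BaseSyncJob` from `src/sync/jobs/` — the class shape, the in-memory lock map, and the method names are illustrative stand-ins for the lock-acquire / sync / lock-release pattern described above:

```typescript
// Illustrative sketch of the BaseSyncJob lifecycle: acquire a per-org
// lock, run the vendor-specific pull, always release the lock.
abstract class BaseSyncJob {
  constructor(private locks: Map<string, boolean>, readonly jobName: string) {}

  // Vendor-specific pull, implemented per platform.
  protected abstract sync(orgId: string): Promise<void>;

  async run(orgId: string): Promise<boolean> {
    const key = `${orgId}:${this.jobName}`;
    if (this.locks.get(key)) return false; // another process holds the lock
    this.locks.set(key, true);
    try {
      await this.sync(orgId);
      return true;
    } finally {
      this.locks.delete(key); // release even if sync() threw
    }
  }
}

// Example subclass; the real jobs call the vendor API here.
class FacebookSyncJob extends BaseSyncJob {
  ran = 0;
  protected async sync(_orgId: string): Promise<void> {
    this.ran++;
  }
}
```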
The aggregated activity feeds two surfaces:
- CRM inbox — incoming comments and DMs as conversation threads.
- Marketing analytics — per-post engagement metrics.
External email IMAP
The external mail sync (external.mail.sync.job.ts) connects to configured IMAP/Gmail accounts (see Google integration) and ingests new messages. Threading by In-Reply-To matches emails to existing tickets or contacts.
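The threading step can be sketched as a lookup from the `In-Reply-To` header into an index of previously seen `Message-ID`s. The field names and index shape below are assumptions for illustration, not the real `external.mail.sync.job.ts` internals:

```typescript
// Sketch of In-Reply-To threading: an inbound reply carries the
// Message-ID of the mail it answers; look that up to find the ticket.
interface InboundMessage {
  messageId: string;
  inReplyTo?: string; // Message-ID of the message being replied to
}

function resolveTicket(
  msg: InboundMessage,
  messageIdToTicket: Map<string, string>,
): string | undefined {
  if (!msg.inReplyTo) return undefined; // fresh thread -> create a new ticket
  return messageIdToTicket.get(msg.inReplyTo); // existing thread
}
```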
Use cases:
- Support email → ticket creation
- Sales rep email → CRM activity log
- Inbound forwarding of customer replies
Notification processor
The notification processor (notification.processor.ts) consumes a queue of "send notification" requests emitted by automations, alerts, mentions, and DB triggers. It fans out to the right channel: email (via SendGrid or Mailgun), SMS (via Twilio), push (FCM/APNS), in-app (real-time WebSocket), or Slack/Discord webhook.
Application code emits notifications by writing to the queue or via the broadcast module — the processor handles delivery, retries, and tracking.
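The fan-out step reduces to dispatching each request to the sender for its channel. The interfaces below are stand-ins for the real SendGrid/Twilio/FCM/webhook clients, sketched under that assumption:

```typescript
// Sketch of the notification fan-out: pick the sender for the
// request's channel and hand it the recipient and body.
type Channel = "email" | "sms" | "push" | "in-app" | "webhook";

interface NotificationRequest {
  channel: Channel;
  recipient: string;
  body: string;
}

function dispatch(
  req: NotificationRequest,
  senders: Record<Channel, (to: string, body: string) => void>,
): void {
  const send = senders[req.channel];
  if (!send) throw new Error(`no sender for channel: ${req.channel}`);
  send(req.recipient, req.body); // the real processor also records delivery
}
```

Keeping the channel map injectable is what lets the processor add retries and delivery tracking around any channel uniformly.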
Escalation processor
For SLA-driven flows (support tickets, lead follow-ups), the escalation processor (escalation.processor.ts) runs every minute and:
- Reads records flagged for SLA tracking.
- Compares last-update timestamps against the configured SLA windows.
- Emits escalation events when thresholds are crossed.
- Increments the priority and re-routes if rules say to.
Used by tickets and the merchant-customer dunning flow.
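The per-record check above can be sketched as elapsed-time-versus-window plus a priority bump. Record fields are illustrative, not the actual `escalation.processor.ts` schema:

```typescript
// Sketch of the per-record SLA check: has the record gone longer than
// its configured window without an update?
interface SlaRecord {
  lastUpdatedAt: Date;
  slaMinutes: number; // configured SLA window
  priority: number;
}

function slaBreached(rec: SlaRecord, now: Date): boolean {
  const elapsedMin = (now.getTime() - rec.lastUpdatedAt.getTime()) / 60_000;
  return elapsedMin > rec.slaMinutes; // threshold crossed -> emit escalation event
}

function escalate(rec: SlaRecord): SlaRecord {
  // bump priority; re-routing rules would also apply at this point
  return { ...rec, priority: rec.priority + 1 };
}
```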
Billing processor
The billing processor (billing.processor.ts) runs on a daily schedule and:
- Closes the previous day's usage window.
- Aggregates per-org usage from the Usage module.
- Generates invoices for orgs on metered plans.
- Triggers Stripe charges for orgs with auto-pay enabled.
- Emits `billing.cycle_closed` events.
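The aggregation step can be sketched as summing each metered org's usage rows into an invoice total. The row shape, the flat unit price, and the metered-org set are illustrative assumptions, not the real billing schema:

```typescript
// Sketch of the daily close: roll per-org usage rows into invoice
// totals (in cents) for orgs on metered plans.
interface UsageRow {
  orgId: string;
  units: number;
}

function closeCycle(
  rows: UsageRow[],
  meteredOrgs: Set<string>,
  unitPriceCents: number,
): Map<string, number> {
  const totals = new Map<string, number>();
  for (const r of rows) {
    if (!meteredOrgs.has(r.orgId)) continue; // flat-rate orgs get no metered invoice
    totals.set(r.orgId, (totals.get(r.orgId) ?? 0) + r.units * unitPriceCents);
  }
  return totals;
}
```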
Schedulers
Cron-driven workers under src/sync/schedulers/:
| Scheduler | Cadence | What it does |
|---|---|---|
| purge.scheduler.ts | Daily | Removes records past the org's retention policy |
| analytics-sync.scheduler.ts | Every 5 minutes | Rolls aggregated metrics into the analytics store |
| frequent-sync.scheduler.ts | Every 1 minute | High-cadence syncs (active campaign metrics) |
| realtime-sync.scheduler.ts | Every 10 seconds | Near-real-time syncs (messaging, presence) |
The Automation module also has its own scheduler for time-triggered flows; it lives in the Automation module rather than Sync but shares the same lock subsystem.
Queue consumers
Three core consumers under src/sync/queue-consumer/:
- Datatype consumer — listens to data-layer change streams and fans events out to subscribers (automations, broadcasts, sync jobs).
- One-off consumer — runs ad-hoc tasks queued via `POST /sync/queue` or via the Automation module's "send webhook" / "run job" actions.
- Schedule consumer — runs scheduled jobs at their `runAt` time. Different from the cron schedulers — these are dynamically scheduled (e.g. "send this email on May 1").
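The schedule consumer's selection step is a filter over pending jobs by `runAt`. The job shape is an illustrative assumption:

```typescript
// Sketch of the schedule consumer's selection step: pick jobs whose
// runAt time has arrived or passed.
interface ScheduledJob {
  id: string;
  runAt: Date;
}

function dueJobs(jobs: ScheduledJob[], now: Date): ScheduledJob[] {
  return jobs.filter((j) => j.runAt.getTime() <= now.getTime());
}
```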
Locks
Sync jobs use a Redis-backed lock (sync-lock.service.ts) keyed by orgId + jobName. Two processes can't run the same Facebook sync for the same org at the same time. Locks expire after a configurable timeout to recover from crashed workers.
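The lock semantics can be sketched in memory as follows. The real `sync-lock.service.ts` is Redis-backed (conceptually a `SET key NX PX <timeout>`); this class only mimics those semantics so the acquire/expire behaviour is visible:

```typescript
// In-memory sketch of the lock semantics: acquire fails while the key
// is held, and a held lock expires after its TTL so crashed workers
// don't block the job forever.
class SyncLock {
  private held = new Map<string, number>(); // key -> expiry (ms since epoch)

  acquire(orgId: string, jobName: string, ttlMs: number, nowMs: number): boolean {
    const key = `${orgId}:${jobName}`;
    const expiry = this.held.get(key);
    if (expiry !== undefined && expiry > nowMs) return false; // still held
    this.held.set(key, nowMs + ttlMs);
    return true;
  }

  release(orgId: string, jobName: string): void {
    this.held.delete(`${orgId}:${jobName}`);
  }
}
```

Passing `nowMs` explicitly is a testing convenience; a real implementation would let Redis handle expiry.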
Social activity surface
`/social-activity/webhook` — No auth

External webhooks from social platforms post here. The activity is parsed and written to the social store, and events are emitted.
Failure handling
Each job:
- Logs to the standard logging pipeline (configurable destination).
- Writes a failure record on the job's run log if it fails.
- Retries with backoff up to the per-job retry limit.
- Emits an alert to the monitoring channel after the final failure.
Permanent failures (revoked OAuth, deleted account) move the integration to expired status — the job stops trying until reconnect.
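The retry behaviour can be sketched as exponential backoff up to a per-job limit, with the final failure surfaced to the caller. Function names and the doubling schedule are illustrative assumptions; delays are returned rather than slept so the logic is easy to inspect:

```typescript
// Sketch of retry-with-backoff: e.g. base 1000ms, limit 3 -> delays
// of 1000, 2000, 4000 between attempts.
function backoffDelaysMs(baseMs: number, retryLimit: number): number[] {
  return Array.from({ length: retryLimit }, (_, i) => baseMs * 2 ** i);
}

async function runWithRetries<T>(
  attempt: () => Promise<T>,
  retryLimit: number,
): Promise<T> {
  let lastErr: unknown;
  for (let i = 0; i <= retryLimit; i++) {
    try {
      return await attempt();
    } catch (err) {
      lastErr = err; // a real job would log and wait the backoff delay here
    }
  }
  throw lastErr; // final failure -> alert to the monitoring channel
}
```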
Don't query the data layer from inside a sync job for "what changed" — listen to data-change events instead. The datatype consumer is built specifically to fan these events out so consumers don't poll.
Operational notes
- Pause a problematic consumer with `GET /sync/pause/:name` rather than restarting the process. Resume when ready.
- Use `POST /sync/trigger` to backfill data after fixing a bug — pass a `startDate` to scope the re-run.
- The queue depth panel on the monitoring dashboard is the fastest way to spot a stuck consumer.