Documentation

Events and async

How AppEngine fans out domain events through NestJS EventEmitter and queue consumers.

AppEngine doesn't ship a webhooks module. Instead, every domain event flows through an internal event bus — NestJS EventEmitter2 — and queue consumers run asynchronous work without blocking the request. Anything that needs to react to a domain change subscribes there.

The system events

The repository service emits a small, stable set of system events on every CRUD operation:

// src/enums/system-events-enum.ts
export enum SystemEvents {
  COLLECTION_CREATE  = 'collection-create',
  COLLECTION_UPDATE  = 'collection-update',
  COLLECTION_DELETE  = 'collection-delete',
  COLLECTION_PUBLISH = 'collection-publish',
  COLLECTION_APPROVE = 'collection-approve',
  USER_CREATE = 'user-create',
  USER_UPDATE = 'user-update',
  USER_DELETE = 'user-delete',
  ORG_CREATE  = 'org-create',
  ORG_UPDATE  = 'org-update',
  ORG_DELETE  = 'org-delete',
  SITE_CREATE = 'site-create',
  SITE_UPDATE = 'site-update',
  SITE_DELETE = 'site-delete',
}

Every payload carries the orgid and the affected BaseModel\<T\>. Listeners discriminate by data.datatype:

@OnEvent(SystemEvents.COLLECTION_CREATE)
async handleCollectionCreate(payload: { orgid: string; data: BaseModel<any> }) {
  if (payload.data.datatype === 'contact') {
    // contact-specific reaction
  }
}

This is exactly what the EventService, the CommunityPageService, the Automation engine, and the social-sync queues do internally.

Domain-specific events

On top of the generic CRUD events, modules emit higher-level events when something semantic happens. A non-exhaustive list:

ModuleEventFires when
CRMcontact.createdA contact is created via API or form
CRMemail.opened / email.clickedA tracked email pixel/link is hit
CRMappointment.bookedA reservation is confirmed
CRMtag.addedA tag is applied to a contact
Storefrontorder.placedA cart converts to an order
Storefrontpayment.succeeded / payment.failedStripe/PayPal webhook lands
Chatchat-queue-notificationA chat needs a human agent
Eventsevent.reminderAn event reminder schedule fires
Bankingtransfer.initiated / transfer.settledACH/RTP lifecycle

These names match the strings emitted in source — search the codebase for eventEmitter.emit( to find more.

Subscribing in your own code

If you're extending AppEngine with a new module, hook into events with @OnEvent. Listeners can be sync (return a value) or async (return a promise — the emitter awaits it).

import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { SystemEvents } from '../enums/system-events-enum';

@Injectable()
export class MyAuditService {
  @OnEvent(SystemEvents.COLLECTION_PUBLISH)
  async logPublish(payload: { orgid: string; data: BaseModel<any> }) {
    // write to audit log, push to webhook, etc.
  }
}

For listeners that should not block the originating request, prefix the handler name and return immediately, or push the work into a queue (next section).

Queues and background work

For anything that takes more than a few hundred milliseconds — sending email, calling a third-party API, indexing a document — the work goes through the Sync module's queue system rather than running in-process.

The queue layer (SyncModule, SyncQueueModule) ships consumers for:

  • Datatype — generic record-level processing
  • OneOff — single fire-and-forget jobs
  • Schedule — cron-style scheduled jobs (AutomationSchedulerService, PurgeScheduler)
  • Automation / AutomationTrigger — automation flow execution
  • Notification — email, SMS, push delivery
  • Escalation — ticket/SLA escalation
  • Billing — usage rollup and invoicing
  • Social — Facebook, TikTok, LinkedIn, Twitter, Pinterest, Snapchat, Discord, Twitch, Slack, Reddit, Google Ads sync

Producers post jobs, consumers pull and process. If a job fails, the queue retries with backoff; persistent failures land in a dead-letter store you can inspect through the monitoring endpoints.

The queue is internal — there's no public API to enqueue jobs from outside AppEngine. External callers should hit the appropriate domain endpoint (e.g. POST /broadcast/campaign/send) and let the controller enqueue the work.

Why no webhooks module?

Webhooks are typically a way for a system to notify external listeners. AppEngine handles outbound integrations through specific modules:

  • Upstream / Connect — outbound HTTP to vendors (Stripe, Twilio, SendGrid). Configured per-org.
  • Broadcast — outbound email/SMS/push. Templated per-channel.
  • Automation actionssend-webhook is one of the action types in the automation engine. If you want "on contact created, POST to this URL", build a one-step automation flow.

There's a single send-webhook action type and the configuration lives in your automation flow, not in a global webhook subscriptions table. This is intentional — it forces you to think about retries, error handling, and payload shape per-integration rather than globally.

Inbound webhooks from vendors

The reverse direction — vendor webhooks landing on AppEngine — is implemented per-vendor as a public POST endpoint under /upstream/*, /storefront/stripe/*, /banking/*, etc. Each one validates the vendor's signature, normalizes the payload, and emits a domain event for the rest of AppEngine to consume.

Stream-based async (AI, voice, chat)

A separate async pattern handles long-running streams — model responses, voice transcription, agent reasoning. Two shapes:

  • WebSocket gatewaysChatGateway, CommunityChatGateway, SimpleVoiceGateway, AIVoiceGateway. Bidirectional streams with Redis as the distributed coordinator.
  • SSE streamsPOST /ai/agent/stream returns a streamId; the client then GET /ai/stream/:streamId and consumes Server-Sent Events.

These don't go through the event bus or the queue — they're synchronous request/response with a streaming body. See the AI and Chat sections for details.