Event-Driven Architecture (EDA)
VENI-AI uses Redis Streams to propagate domain events from the Shell to remote satellite apps. Each satellite maintains its own local read model — no synchronous gRPC calls needed for user or organization data.
Overview
┌─ Shell SCS ─────────────────────────────────┐
│ UserService ──► XADD │
│ OrganizationService ──► XADD │
└─────────────────────────────────────────────┘
│
┌─ Platform Event Bus (not owned by any SCS) ─┐
│ Redis Streams (persistent log) │
│ shell:stream:users │
│ shell:stream:organizations │
└──────────────────────────────────────────────┘
│
┌───────────────┴───────────────┐
▼ ▼
┌─ Drive SCS ──────────┐ ┌─ HRM SCS ──────────────────┐
│ EventSubscriberSvc │ │ EventSubscriberSvc │
│ group: drive-api │ │ group: hrm-api │
│ │ │ │
│ shell:stream:users │ │ shell:stream:users │
│ │ │ │ shell:stream:organizations │
│ ▼ │ │ │ │ │
│ UserRepository │ │ UserRepository OrgRepo │
│ upsert(shell_user_id)│ │ upsert(ref_id) upsert(id) │
│ [Drive own DB] │ │ [HRM own DB] │
└──────────────────────┘ └─────────────────────────────┘Shell is the single source of truth for users and organizations. Each SCS receives events asynchronously and upserts its own local read model — it never queries Shell at runtime for this data.
Platform Event Bus: Redis Streams is platform-level infrastructure, not owned by Shell or any satellite. Shell publishes to it; each SCS subscribes independently. This is analogous to how SCSes share a network — infrastructure-level coupling, not application-level coupling.
Per-SCS scope: Drive only subscribes to
shell:stream:users. HRM subscribes to both. Each SCS only consumes what it needs.
Event Registry
All events and their target streams are defined in one place:
shell/api/src/events/registry.ts
export const SHELL_STREAMS = {
USERS: 'shell:stream:users',
ORGANIZATIONS: 'shell:stream:organizations',
} as const;
export const EVENT_STREAM_MAP = {
'user.created': SHELL_STREAMS.USERS,
'user.updated': SHELL_STREAMS.USERS,
'user.deleted': SHELL_STREAMS.USERS,
'organization.created': SHELL_STREAMS.ORGANIZATIONS,
'organization.updated': SHELL_STREAMS.ORGANIZATIONS,
'organization.deleted': SHELL_STREAMS.ORGANIZATIONS,
} as const;
export type EventName = keyof typeof EVENT_STREAM_MAP;Publishing Events (Shell API)
EventPublisherService is a singleton. Call publish(event, payload) from any domain service:
// In UserService, OrganizationService, etc.
await this.eventPublisher.publish('user.created', {
id: createdUser.id,
email: createdUser.email,
firstName: createdUser.firstName,
lastName: createdUser.lastName,
status: createdUser.status,
organizationId: createdUser.organizationId,
});- Events are best-effort — failures are logged but never thrown
- Publisher failures never block the primary user operation
- The stream is resolved automatically from
EVENT_STREAM_MAP
Consuming Events (Satellite Apps)
Each remote app has an EventSubscriberService. Handlers are registered declaratively in setupHandlers(). The subscriber derives which Redis streams to listen to automatically from registered handler keys — no hardcoded stream list needed.
HRM (users + organizations):
private setupHandlers(): void {
this
.on('user.created', (p) => this.upsertUser(p))
.on('user.updated', (p) => this.upsertUser(p))
.on('user.deleted', (p) => this.deleteUser(p))
.on('organization.created', (p) => this.upsertOrg(p))
.on('organization.updated', (p) => this.upsertOrg(p))
.on('organization.deleted', (p) => this.deleteOrg(p));
}Drive (users only — no organizations table):
private setupHandlers(): void {
this
.on('user.created', (p) => this.upsertUser(p))
.on('user.updated', (p) => this.upsertUser(p))
.on('user.deleted', (p) => this.deleteUser(p));
}Handlers call injected repositories — no raw SQL:
// HRM: UserRepository.upsertByRefId() / markInactiveByRefId()
// Drive: UserRepository.upsertFromShellEvent() / markInactiveByShellId()
private async upsertUser(payload: Record<string, unknown>): Promise<void> {
if (!payload.id) return;
await this.userRepository.upsertByRefId({ refId: payload.id as string, ... });
}The subscriber is started in application.ts:
override async start() {
await super.start();
if (env.APP_ENV_REDIS_URL) {
try {
const subscriber = this.get<EventSubscriberService>({ key: 'services.EventSubscriberService' });
await subscriber.start();
} catch (err) {
console.warn('EventSubscriberService failed to start (Redis unavailable?)', err);
}
}
}Reliability
| Property | Mechanism |
|---|---|
| At-least-once delivery | XREADGROUP — unacknowledged messages stay in PEL, retried on restart |
| Consumer isolation | Drive (drive-api) and HRM (hrm-api) read independently; one slow consumer does not affect the other |
| Redis outage | Publisher: best-effort, logged. Subscriber: RedisHelper reconnects, poll loop backs off 2s |
| Idempotency | All writes use INSERT ... ON CONFLICT DO UPDATE — safe to replay |
| Startup bootstrap | XGROUP CREATE MKSTREAM — creates stream + group if missing; BUSYGROUP error ignored |
Adding a New Event
1. Register the event (Shell API)
// shell/api/src/events/registry.ts
export const SHELL_STREAMS = {
USERS: 'shell:stream:users',
ORGANIZATIONS: 'shell:stream:organizations',
SUBSCRIPTIONS: 'shell:stream:subscriptions', // ← add stream
} as const;
export const EVENT_STREAM_MAP = {
// ...existing...
'subscription.created': SHELL_STREAMS.SUBSCRIPTIONS, // ← add event
'subscription.updated': SHELL_STREAMS.SUBSCRIPTIONS,
} as const;2. Publish in the domain service (Shell API)
// shell/api/src/services/subscription.service.ts
await this.eventPublisher.publish('subscription.created', {
id: result.id,
plan: result.plan,
// ...
});3. Add a handler in each consumer
// apps/<app>/api/src/services/event-subscriber.service.ts
// 1. Add to EVENT_STREAM_MAP (local mirror)
'subscription.created': 'shell:stream:subscriptions',
'subscription.updated': 'shell:stream:subscriptions',
// 2. Register in setupHandlers()
.on('subscription.created', (p) => this.upsertSubscription(p))
.on('subscription.updated', (p) => this.upsertSubscription(p))
// 3. Inject the repository in constructor
constructor(
@inject({ key: 'repositories.SubscriptionRepository' })
private subscriptionRepository: SubscriptionRepository,
// ...
)
// 4. Write the handler — use the repository, not raw SQL
private async upsertSubscription(payload: Record<string, unknown>): Promise<void> {
if (!payload.id) return;
await this.subscriptionRepository.upsertById({
id: payload.id as string,
plan: payload.plan as string,
});
}4. Add a migration
-- apps/drive/api/drizzle/0005_subscription_sync.sql
CREATE TABLE IF NOT EXISTS "subscriptions" (
"id" UUID PRIMARY KEY,
"plan" VARCHAR(50),
"synced_at" TIMESTAMP WITH TIME ZONE,
"updated_at" TIMESTAMP WITH TIME ZONE
);Checklist
| Step | File |
|---|---|
| Add stream + event to registry | shell/api/src/events/registry.ts |
Call publish() in domain service | shell/api/src/services/<domain>.service.ts |
| Mirror event in consumer registry | apps/<app>/api/src/services/event-subscriber.service.ts |
Register handler in setupHandlers() | same file |
| Write handler — inject + call repository | same file |
| Create/update repository with upsert method | apps/<app>/api/src/repositories/ |
Register repository in application.ts | apps/<app>/api/src/application.ts |
| Add migration | apps/<app>/api/drizzle/ |
Per-app Identity Mapping
Each app stores Shell's user UUID differently:
| App | Local PK | Shell UUID column | Upsert target |
|---|---|---|---|
| HRM | id (own UUID) | ref_id | ON CONFLICT (ref_id) |
| Drive | id (own UUID) | shell_user_id | ON CONFLICT (shell_user_id) |
HRM — Shell UUID stored in ref_id:
INSERT INTO "users" ("ref_id", "email", ...)
ON CONFLICT ("ref_id") DO UPDATE SET ...Drive — Shell UUID stored in shell_user_id; name is derived from firstName + lastName:
INSERT INTO "users" ("shell_user_id", "email", "name", ...)
ON CONFLICT ("shell_user_id") DO UPDATE SET ...Environment Variable
APP_ENV_REDIS_URL=redis://:password@redis-host:6379If APP_ENV_REDIS_URL is not set, the subscriber does not start — the app runs normally without event sync.