Kiến trúc Hướng sự kiện (EDA)
VENI-AI sử dụng Redis Streams để truyền tải domain event từ Shell sang các satellite app. Mỗi satellite duy trì read model cục bộ của riêng mình — không cần gọi gRPC đồng bộ để lấy dữ liệu user hoặc tổ chức.
Tổng quan
┌─ Shell SCS ─────────────────────────────────┐
│ UserService ──► XADD │
│ OrganizationService ──► XADD │
└─────────────────────────────────────────────┘
│
┌─ Platform Event Bus (không thuộc SCS nào) ──┐
│ Redis Streams (persistent log) │
│ shell:stream:users │
│ shell:stream:organizations │
└──────────────────────────────────────────────┘
│
┌───────────────┴───────────────┐
▼ ▼
┌─ Drive SCS ──────────┐ ┌─ HRM SCS ──────────────────┐
│ EventSubscriberSvc │ │ EventSubscriberSvc │
│ group: drive-api │ │ group: hrm-api │
│ │ │ │
│ shell:stream:users │ │ shell:stream:users │
│ │ │ │ shell:stream:organizations │
│ ▼ │ │ │ │ │
│ UserRepository │ │ UserRepository OrgRepo │
│ upsert(shell_user_id)│ │ upsert(ref_id) upsert(id) │
│ [Drive DB riêng] │ │ [HRM DB riêng] │
└──────────────────────┘ └─────────────────────────────┘Shell là nguồn dữ liệu duy nhất (single source of truth) cho user và tổ chức. Mỗi SCS nhận event bất đồng bộ và upsert read model cục bộ của riêng mình — không bao giờ query Shell ở runtime để lấy dữ liệu này.
Platform Event Bus: Redis Streams là hạ tầng cấp nền tảng, không thuộc sở hữu của Shell hay bất kỳ satellite nào. Shell publish lên bus; mỗi SCS subscribe độc lập. Tương tự như các SCS dùng chung một mạng lưới — coupling ở tầng hạ tầng, không phải tầng ứng dụng.
Phạm vi theo SCS: Drive chỉ subscribe
shell:stream:users. HRM subscribe cả hai. Mỗi SCS chỉ tiêu thụ những gì nó cần.
Event Registry
Tất cả event và stream đích được định nghĩa tập trung tại một nơi:
shell/api/src/events/registry.ts
export const SHELL_STREAMS = {
USERS: 'shell:stream:users',
ORGANIZATIONS: 'shell:stream:organizations',
} as const;
export const EVENT_STREAM_MAP = {
'user.created': SHELL_STREAMS.USERS,
'user.updated': SHELL_STREAMS.USERS,
'user.deleted': SHELL_STREAMS.USERS,
'organization.created': SHELL_STREAMS.ORGANIZATIONS,
'organization.updated': SHELL_STREAMS.ORGANIZATIONS,
'organization.deleted': SHELL_STREAMS.ORGANIZATIONS,
} as const;
export type EventName = keyof typeof EVENT_STREAM_MAP;Publish Event (Shell API)
EventPublisherService là singleton. Gọi publish(event, payload) từ bất kỳ domain service nào:
// Trong UserService, OrganizationService, v.v.
await this.eventPublisher.publish('user.created', {
id: createdUser.id,
email: createdUser.email,
firstName: createdUser.firstName,
lastName: createdUser.lastName,
status: createdUser.status,
organizationId: createdUser.organizationId,
});- Event là best-effort — lỗi được log nhưng không bao giờ throw
- Lỗi publisher không làm ảnh hưởng đến operation chính của user
- Stream được resolve tự động từ
EVENT_STREAM_MAP
Consume Event (Satellite App)
Mỗi remote app có một EventSubscriberService. Handler được đăng ký khai báo trong setupHandlers(). Subscriber tự động suy ra Redis stream cần lắng nghe từ các handler key — không cần danh sách stream hardcode.
HRM (user + tổ chức):
private setupHandlers(): void {
this
.on('user.created', (p) => this.upsertUser(p))
.on('user.updated', (p) => this.upsertUser(p))
.on('user.deleted', (p) => this.deleteUser(p))
.on('organization.created', (p) => this.upsertOrg(p))
.on('organization.updated', (p) => this.upsertOrg(p))
.on('organization.deleted', (p) => this.deleteOrg(p));
}Drive (chỉ user — không có bảng organizations):
private setupHandlers(): void {
this
.on('user.created', (p) => this.upsertUser(p))
.on('user.updated', (p) => this.upsertUser(p))
.on('user.deleted', (p) => this.deleteUser(p));
}Handler gọi repository đã inject — không dùng raw SQL:
// HRM: UserRepository.upsertByRefId() / markInactiveByRefId()
// Drive: UserRepository.upsertFromShellEvent() / markInactiveByShellId()
private async upsertUser(payload: Record<string, unknown>): Promise<void> {
if (!payload.id) return;
await this.userRepository.upsertByRefId({ refId: payload.id as string, ... });
}Độ tin cậy
| Thuộc tính | Cơ chế |
|---|---|
| At-least-once delivery | XREADGROUP — message chưa ack ở lại PEL, retry khi restart |
| Cô lập consumer | Drive (drive-api) và HRM (hrm-api) đọc độc lập |
| Redis outage | Publisher: best-effort, log lỗi. Subscriber: RedisHelper tự reconnect, poll loop backoff 2s |
| Idempotency | Tất cả write dùng INSERT ... ON CONFLICT DO UPDATE — an toàn khi replay |
| Khởi động | XGROUP CREATE MKSTREAM — tạo stream + group nếu chưa có; lỗi BUSYGROUP được bỏ qua |
Thêm Event Mới
1. Đăng ký event (Shell API)
// shell/api/src/events/registry.ts
export const SHELL_STREAMS = {
// ...
SUBSCRIPTIONS: 'shell:stream:subscriptions', // ← thêm stream
} as const;
export const EVENT_STREAM_MAP = {
// ...
'subscription.created': SHELL_STREAMS.SUBSCRIPTIONS, // ← thêm event
} as const;2. Publish trong domain service (Shell API)
await this.eventPublisher.publish('subscription.created', {
id: result.id, plan: result.plan,
});3. Thêm handler trong consumer
// apps/<app>/api/src/services/event-subscriber.service.ts
// 1. Thêm vào EVENT_STREAM_MAP cục bộ
'subscription.created': 'shell:stream:subscriptions',
'subscription.updated': 'shell:stream:subscriptions',
// 2. Đăng ký trong setupHandlers()
.on('subscription.created', (p) => this.upsertSubscription(p))
.on('subscription.updated', (p) => this.upsertSubscription(p))
// 3. Inject repository vào constructor
constructor(
@inject({ key: 'repositories.SubscriptionRepository' })
private subscriptionRepository: SubscriptionRepository,
// ...
)
// 4. Viết handler — dùng repository, không dùng raw SQL
private async upsertSubscription(payload: Record<string, unknown>): Promise<void> {
if (!payload.id) return;
await this.subscriptionRepository.upsertById({
id: payload.id as string,
plan: payload.plan as string,
});
}4. Thêm migration
CREATE TABLE IF NOT EXISTS "subscriptions" (
"id" UUID PRIMARY KEY, "plan" VARCHAR(50),
"synced_at" TIMESTAMP WITH TIME ZONE, "updated_at" TIMESTAMP WITH TIME ZONE
);Checklist
| Bước | File |
|---|---|
| Thêm stream + event vào registry | shell/api/src/events/registry.ts |
Gọi publish() trong domain service | shell/api/src/services/<domain>.service.ts |
| Mirror event trong consumer registry | apps/<app>/api/src/services/event-subscriber.service.ts |
Đăng ký handler trong setupHandlers() | cùng file trên |
| Viết handler — inject + gọi repository | cùng file trên |
| Tạo/cập nhật repository với method upsert | apps/<app>/api/src/repositories/ |
Đăng ký repository trong application.ts | apps/<app>/api/src/application.ts |
| Thêm migration | apps/<app>/api/drizzle/ |
Ánh xạ identity theo từng app
Mỗi app lưu UUID user của Shell theo cách riêng:
| App | PK nội bộ | Cột chứa UUID Shell | Upsert target |
|---|---|---|---|
| HRM | id (UUID nội bộ) | ref_id | ON CONFLICT (ref_id) |
| Drive | id (UUID nội bộ) | shell_user_id | ON CONFLICT (shell_user_id) |
HRM — UUID Shell lưu ở ref_id:
INSERT INTO "users" ("ref_id", "email", ...)
ON CONFLICT ("ref_id") DO UPDATE SET ...Drive — UUID Shell lưu ở shell_user_id; name được ghép từ firstName + lastName:
INSERT INTO "users" ("shell_user_id", "email", "name", ...)
ON CONFLICT ("shell_user_id") DO UPDATE SET ...Biến môi trường
APP_ENV_REDIS_URL=redis://:password@redis-host:6379Nếu APP_ENV_REDIS_URL không được set, subscriber không khởi động — app vẫn chạy bình thường mà không có event sync.