Files
makemd/docs/ARCHIVE/02_Backend/03_Event_Driven.md

492 lines
14 KiB
Markdown
Raw Permalink Normal View History

# 事件驱动文档 (Crawlful Hub)
> **定位**Crawlful Hub 事件驱动设计文档 - 定义核心事件和处理流程,实现系统解耦和可扩展性。
> **更新日期**: 2026-03-18
> **最高优先级参考**: [Service_Design.md](./Service_Design.md)
---
## 1. 事件驱动概述
### 1.1 定义
事件驱动是一种架构模式,通过事件的产生、发布、订阅和处理来实现系统组件之间的通信和协作。
### 1.2 重要性
事件驱动架构的好处:
- **解耦**:组件之间通过事件通信,减少直接依赖
- **可扩展性**:可以轻松添加新的事件处理逻辑
- **异步处理**:事件处理可以异步执行,提高系统性能
- **可靠性**:事件可以持久化,确保消息不丢失
- **可观测性**:可以追踪事件的产生和处理
---
## 2. 核心事件
### 2.1 事件定义
| 事件名称 | 描述 | 触发条件 | 相关服务 |
|----------|------|----------|----------|
| **OrderCreated** | 订单创建 | 用户提交订单 | 订单服务、库存服务、通知服务 |
| **OrderUpdated** | 订单更新 | 订单状态变更 | 订单服务、通知服务 |
| **OrderCompleted** | 订单完成 | 订单状态变为已完成 | 订单服务、结算服务、通知服务 |
| **OrderCancelled** | 订单取消 | 订单状态变为已取消 | 订单服务、库存服务、通知服务 |
| **ProductCreated** | 商品创建 | 创建新商品 | 商品服务、搜索服务 |
| **ProductUpdated** | 商品更新 | 更新商品信息 | 商品服务、搜索服务 |
| **ProductPriceChanged** | 商品价格变更 | 商品价格更新 | 商品服务、定价服务、搜索服务 |
| **InventoryLow** | 库存不足 | 库存低于预警值 | 库存服务、通知服务 |
| **InventoryUpdated** | 库存更新 | 库存数量变更 | 库存服务、商品服务 |
| **PaymentProcessed** | 支付处理 | 支付完成 | 支付服务、订单服务 |
| **SettlementCreated** | 结算创建 | 创建结算单 | 结算服务、通知服务 |
| **MerchantRegistered** | 商户注册 | 新商户注册 | 商户服务、通知服务 |
| **MerchantVerified** | 商户验证 | 商户资质验证通过 | 商户服务、通知服务 |
### 2.2 事件结构
**示例**
```typescript
interface Event {
id: string; // 事件ID
type: string; // 事件类型
timestamp: number; // 事件时间戳
data: any; // 事件数据
metadata: {
tenantId: string; // 租户ID
shopId: string; // 店铺ID
traceId: string; // 追踪ID
source: string; // 事件源
};
}
// 订单创建事件示例
const orderCreatedEvent: Event = {
id: 'event-001',
type: 'OrderCreated',
timestamp: Date.now(),
data: {
orderId: 'order-001',
userId: 'user-001',
items: [
{
productId: 'product-001',
quantity: 2,
price: 99.99
}
],
totalAmount: 199.98,
status: 'PENDING'
},
metadata: {
tenantId: 'tenant-001',
shopId: 'shop-001',
traceId: 'trace-001',
source: 'OrderService'
}
};
```
---
## 3. 事件处理流程
### 3.1 事件发布
**实现**
- 使用事件总线或消息队列
- 事件发布者将事件发送到事件总线
- 事件总线确保事件的传递
**示例**
```typescript
// 事件总线
class EventBus {
private listeners: Map<string, Function[]> = new Map();
publish(event: Event): void {
console.log(`Publishing event: ${event.type}`, event);
if (this.listeners.has(event.type)) {
this.listeners.get(event.type)?.forEach(listener => {
try {
listener(event);
} catch (error) {
console.error(`Error handling event ${event.type}:`, error);
}
});
}
}
subscribe(eventType: string, listener: Function): void {
if (!this.listeners.has(eventType)) {
this.listeners.set(eventType, []);
}
this.listeners.get(eventType)?.push(listener);
}
unsubscribe(eventType: string, listener: Function): void {
if (this.listeners.has(eventType)) {
const listeners = this.listeners.get(eventType)?.filter(l => l !== listener);
this.listeners.set(eventType, listeners || []);
}
}
}
// 订单服务发布事件
class OrderService {
constructor(private eventBus: EventBus) {}
async createOrder(orderData: OrderCreateDto): Promise<Order> {
// 创建订单
const order = await this.orderRepository.create(orderData);
// 发布订单创建事件
const event: Event = {
id: uuidv4(),
type: 'OrderCreated',
timestamp: Date.now(),
data: order,
metadata: {
tenantId: order.tenantId,
shopId: order.shopId,
traceId: orderData.traceId,
source: 'OrderService'
}
};
this.eventBus.publish(event);
return order;
}
}
```
### 3.2 事件订阅
**实现**
- 服务订阅感兴趣的事件
- 当事件发生时,执行相应的处理逻辑
**示例**
```typescript
// 库存服务订阅事件
class InventoryService {
constructor(private eventBus: EventBus) {
// 订阅订单创建事件
this.eventBus.subscribe('OrderCreated', this.handleOrderCreated.bind(this));
// 订阅订单取消事件
this.eventBus.subscribe('OrderCancelled', this.handleOrderCancelled.bind(this));
}
private async handleOrderCreated(event: Event): Promise<void> {
const order = event.data;
// 扣减库存
for (const item of order.items) {
await this.deductInventory(item.productId, item.quantity);
}
}
private async handleOrderCancelled(event: Event): Promise<void> {
const order = event.data;
// 恢复库存
for (const item of order.items) {
await this.restoreInventory(item.productId, item.quantity);
}
}
}
// 通知服务订阅事件
class NotificationService {
constructor(private eventBus: EventBus) {
// 订阅订单相关事件
this.eventBus.subscribe('OrderCreated', this.sendOrderConfirmation.bind(this));
this.eventBus.subscribe('OrderUpdated', this.sendOrderUpdate.bind(this));
this.eventBus.subscribe('OrderCompleted', this.sendOrderCompletion.bind(this));
this.eventBus.subscribe('OrderCancelled', this.sendOrderCancellation.bind(this));
}
private async sendOrderConfirmation(event: Event): Promise<void> {
const order = event.data;
// 发送订单确认通知
await this.sendEmail(order.userId, 'Order Confirmation', `Your order ${order.id} has been created`);
}
private async sendOrderUpdate(event: Event): Promise<void> {
const order = event.data;
// 发送订单更新通知
await this.sendEmail(order.userId, 'Order Update', `Your order ${order.id} has been updated`);
}
private async sendOrderCompletion(event: Event): Promise<void> {
const order = event.data;
// 发送订单完成通知
await this.sendEmail(order.userId, 'Order Completed', `Your order ${order.id} has been completed`);
}
private async sendOrderCancellation(event: Event): Promise<void> {
const order = event.data;
// 发送订单取消通知
await this.sendEmail(order.userId, 'Order Cancelled', `Your order ${order.id} has been cancelled`);
}
}
```
### 3.3 事件持久化
**实现**
- 使用消息队列(如 RabbitMQ、Kafka持久化事件
- 确保事件不丢失
- 支持事件的重试和重放
**示例**
```typescript
// 使用 Kafka
import { Kafka } from 'kafkajs';
const kafka = new Kafka({
clientId: 'crawlful-hub',
brokers: ['localhost:9092']
});
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'inventory-service' });
// 发布事件
async function publishEvent(event: Event): Promise<void> {
await producer.connect();
await producer.send({
topic: 'events',
messages: [
{
key: event.type,
value: JSON.stringify(event)
}
]
});
await producer.disconnect();
}
// 订阅事件
async function subscribeEvents(): Promise<void> {
await consumer.connect();
await consumer.subscribe({ topic: 'events', fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const event = JSON.parse(message.value?.toString() || '{}');
console.log('Received event:', event);
// 处理事件
if (event.type === 'OrderCreated') {
await handleOrderCreated(event);
} else if (event.type === 'OrderCancelled') {
await handleOrderCancelled(event);
}
}
});
}
```
---
## 4. 事件驱动架构
### 4.1 架构图
```
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ │ │ │ │ │
│ 服务 A │────>│ 事件总线 │────>│ 服务 B │
│ │ │ │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
^ │ ^
│ │ │
│ v │
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ │ │ │ │ │
│ 服务 D │<────│ 消息队列 │<────│ 服务 C │
│ │ │ │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
```
### 4.2 核心组件
- **事件发布者**:产生事件的服务
- **事件总线**:传递事件的中间件
- **消息队列**:持久化事件的存储
- **事件订阅者**:处理事件的服务
- **事件处理器**:执行具体的事件处理逻辑
### 4.3 优势
- **松耦合**:服务之间通过事件通信,减少直接依赖
- **可扩展性**:可以轻松添加新的服务和事件处理逻辑
- **可靠性**:事件可以持久化,确保消息不丢失
- **灵活性**:事件处理可以异步执行,提高系统性能
- **可观测性**:可以追踪事件的产生和处理
---
## 5. 最佳实践
### 5.1 事件设计
- **事件命名**:使用清晰、语义化的事件名称
- **事件结构**:统一事件结构,包含必要的字段
- **事件数据**:只包含必要的数据,避免过大
- **事件版本**:考虑事件的版本管理
### 5.2 事件处理
- **幂等性**:确保事件处理的幂等性
- **错误处理**:妥善处理事件处理过程中的错误
- **重试机制**:实现事件处理的重试机制
- **死信队列**:处理无法正常处理的事件
### 5.3 性能优化
- **批量处理**:批量处理事件,减少网络开销
- **异步处理**:使用异步处理提高性能
- **缓存**:合理使用缓存,减少重复计算
- **限流**:实现事件处理的限流机制
### 5.4 监控和告警
- **事件监控**:监控事件的产生和处理
- **事件延迟**:监控事件处理的延迟
- **错误率**:监控事件处理的错误率
- **告警机制**:设置合理的告警阈值
---
## 6. 实现示例
### 6.1 事件总线
```typescript
// 事件总线实现
class EventBus {
private listeners: Map<string, Function[]> = new Map();
private queue: Event[] = [];
private processing = false;
publish(event: Event): void {
this.queue.push(event);
if (!this.processing) {
this.processQueue();
}
}
private async processQueue(): Promise<void> {
this.processing = true;
while (this.queue.length > 0) {
const event = this.queue.shift();
if (event) {
await this.processEvent(event);
}
}
this.processing = false;
}
private async processEvent(event: Event): Promise<void> {
console.log(`Processing event: ${event.type}`, event);
if (this.listeners.has(event.type)) {
const listeners = this.listeners.get(event.type) || [];
for (const listener of listeners) {
try {
await listener(event);
} catch (error) {
console.error(`Error handling event ${event.type}:`, error);
}
}
}
}
subscribe(eventType: string, listener: Function): void {
if (!this.listeners.has(eventType)) {
this.listeners.set(eventType, []);
}
this.listeners.get(eventType)?.push(listener);
}
unsubscribe(eventType: string, listener: Function): void {
if (this.listeners.has(eventType)) {
const listeners = this.listeners.get(eventType)?.filter(l => l !== listener);
this.listeners.set(eventType, listeners || []);
}
}
}
// 全局事件总线
export const eventBus = new EventBus();
```
### 6.2 事件处理器
```typescript
// 订单事件处理器
class OrderEventHandler {
constructor(private inventoryService: InventoryService, private notificationService: NotificationService) {
eventBus.subscribe('OrderCreated', this.handleOrderCreated.bind(this));
eventBus.subscribe('OrderCancelled', this.handleOrderCancelled.bind(this));
}
private async handleOrderCreated(event: Event): Promise<void> {
const order = event.data;
// 扣减库存
for (const item of order.items) {
await this.inventoryService.deductInventory(item.productId, item.quantity);
}
// 发送通知
await this.notificationService.sendOrderConfirmation(order);
}
private async handleOrderCancelled(event: Event): Promise<void> {
const order = event.data;
// 恢复库存
for (const item of order.items) {
await this.inventoryService.restoreInventory(item.productId, item.quantity);
}
// 发送通知
await this.notificationService.sendOrderCancellation(order);
}
}
// 库存事件处理器
class InventoryEventHandler {
constructor(private notificationService: NotificationService) {
eventBus.subscribe('InventoryLow', this.handleInventoryLow.bind(this));
}
private async handleInventoryLow(event: Event): Promise<void> {
const { productId, quantity, alertLevel } = event.data;
// 发送库存预警通知
await this.notificationService.sendInventoryAlert(productId, quantity, alertLevel);
}
}
```
---
## 7. 相关文档
- [Service_Design.md](./Service_Design.md)
- [Data_Consistency.md](./Data_Consistency.md)
- [Observability.md](./Observability.md)
---
*本文档基于服务设计文档,最后更新: 2026-03-18*