# 事件驱动文档 (Crawlful Hub) > **定位**:Crawlful Hub 事件驱动设计文档 - 定义核心事件和处理流程,实现系统解耦和可扩展性。 > **更新日期**: 2026-03-18 > **最高优先级参考**: [Service_Design.md](./Service_Design.md) --- ## 1. 事件驱动概述 ### 1.1 定义 事件驱动是一种架构模式,通过事件的产生、发布、订阅和处理来实现系统组件之间的通信和协作。 ### 1.2 重要性 事件驱动架构的好处: - **解耦**:组件之间通过事件通信,减少直接依赖 - **可扩展性**:可以轻松添加新的事件处理逻辑 - **异步处理**:事件处理可以异步执行,提高系统性能 - **可靠性**:事件可以持久化,确保消息不丢失 - **可观测性**:可以追踪事件的产生和处理 --- ## 2. 核心事件 ### 2.1 事件定义 | 事件名称 | 描述 | 触发条件 | 相关服务 | |----------|------|----------|----------| | **OrderCreated** | 订单创建 | 用户提交订单 | 订单服务、库存服务、通知服务 | | **OrderUpdated** | 订单更新 | 订单状态变更 | 订单服务、通知服务 | | **OrderCompleted** | 订单完成 | 订单状态变为已完成 | 订单服务、结算服务、通知服务 | | **OrderCancelled** | 订单取消 | 订单状态变为已取消 | 订单服务、库存服务、通知服务 | | **ProductCreated** | 商品创建 | 创建新商品 | 商品服务、搜索服务 | | **ProductUpdated** | 商品更新 | 更新商品信息 | 商品服务、搜索服务 | | **ProductPriceChanged** | 商品价格变更 | 商品价格更新 | 商品服务、定价服务、搜索服务 | | **InventoryLow** | 库存不足 | 库存低于预警值 | 库存服务、通知服务 | | **InventoryUpdated** | 库存更新 | 库存数量变更 | 库存服务、商品服务 | | **PaymentProcessed** | 支付处理 | 支付完成 | 支付服务、订单服务 | | **SettlementCreated** | 结算创建 | 创建结算单 | 结算服务、通知服务 | | **MerchantRegistered** | 商户注册 | 新商户注册 | 商户服务、通知服务 | | **MerchantVerified** | 商户验证 | 商户资质验证通过 | 商户服务、通知服务 | ### 2.2 事件结构 **示例**: ```typescript interface Event { id: string; // 事件ID type: string; // 事件类型 timestamp: number; // 事件时间戳 data: any; // 事件数据 metadata: { tenantId: string; // 租户ID shopId: string; // 店铺ID traceId: string; // 追踪ID source: string; // 事件源 }; } // 订单创建事件示例 const orderCreatedEvent: Event = { id: 'event-001', type: 'OrderCreated', timestamp: Date.now(), data: { orderId: 'order-001', userId: 'user-001', items: [ { productId: 'product-001', quantity: 2, price: 99.99 } ], totalAmount: 199.98, status: 'PENDING' }, metadata: { tenantId: 'tenant-001', shopId: 'shop-001', traceId: 'trace-001', source: 'OrderService' } }; ``` --- ## 3. 事件处理流程 ### 3.1 事件发布 **实现**: - 使用事件总线或消息队列 - 事件发布者将事件发送到事件总线 - 事件总线确保事件的传递 **示例**: ```typescript // 事件总线 class EventBus { private listeners: Map = new Map(); publish(event: Event): void { console.log(`Publishing event: ${event.type}`, event); if (this.listeners.has(event.type)) { this.listeners.get(event.type)?.forEach(listener => { try { listener(event); } catch (error) { console.error(`Error handling event ${event.type}:`, error); } }); } } subscribe(eventType: string, listener: Function): void { if (!this.listeners.has(eventType)) { this.listeners.set(eventType, []); } this.listeners.get(eventType)?.push(listener); } unsubscribe(eventType: string, listener: Function): void { if (this.listeners.has(eventType)) { const listeners = this.listeners.get(eventType)?.filter(l => l !== listener); this.listeners.set(eventType, listeners || []); } } } // 订单服务发布事件 class OrderService { constructor(private eventBus: EventBus) {} async createOrder(orderData: OrderCreateDto): Promise { // 创建订单 const order = await this.orderRepository.create(orderData); // 发布订单创建事件 const event: Event = { id: uuidv4(), type: 'OrderCreated', timestamp: Date.now(), data: order, metadata: { tenantId: order.tenantId, shopId: order.shopId, traceId: orderData.traceId, source: 'OrderService' } }; this.eventBus.publish(event); return order; } } ``` ### 3.2 事件订阅 **实现**: - 服务订阅感兴趣的事件 - 当事件发生时,执行相应的处理逻辑 **示例**: ```typescript // 库存服务订阅事件 class InventoryService { constructor(private eventBus: EventBus) { // 订阅订单创建事件 this.eventBus.subscribe('OrderCreated', this.handleOrderCreated.bind(this)); // 订阅订单取消事件 this.eventBus.subscribe('OrderCancelled', this.handleOrderCancelled.bind(this)); } private async handleOrderCreated(event: Event): Promise { const order = event.data; // 扣减库存 for (const item of order.items) { await this.deductInventory(item.productId, item.quantity); } } private async handleOrderCancelled(event: Event): Promise { const order = event.data; // 恢复库存 for (const item of order.items) { await this.restoreInventory(item.productId, item.quantity); } } } // 通知服务订阅事件 class NotificationService { constructor(private eventBus: EventBus) { // 订阅订单相关事件 this.eventBus.subscribe('OrderCreated', this.sendOrderConfirmation.bind(this)); this.eventBus.subscribe('OrderUpdated', this.sendOrderUpdate.bind(this)); this.eventBus.subscribe('OrderCompleted', this.sendOrderCompletion.bind(this)); this.eventBus.subscribe('OrderCancelled', this.sendOrderCancellation.bind(this)); } private async sendOrderConfirmation(event: Event): Promise { const order = event.data; // 发送订单确认通知 await this.sendEmail(order.userId, 'Order Confirmation', `Your order ${order.id} has been created`); } private async sendOrderUpdate(event: Event): Promise { const order = event.data; // 发送订单更新通知 await this.sendEmail(order.userId, 'Order Update', `Your order ${order.id} has been updated`); } private async sendOrderCompletion(event: Event): Promise { const order = event.data; // 发送订单完成通知 await this.sendEmail(order.userId, 'Order Completed', `Your order ${order.id} has been completed`); } private async sendOrderCancellation(event: Event): Promise { const order = event.data; // 发送订单取消通知 await this.sendEmail(order.userId, 'Order Cancelled', `Your order ${order.id} has been cancelled`); } } ``` ### 3.3 事件持久化 **实现**: - 使用消息队列(如 RabbitMQ、Kafka)持久化事件 - 确保事件不丢失 - 支持事件的重试和重放 **示例**: ```typescript // 使用 Kafka import { Kafka } from 'kafkajs'; const kafka = new Kafka({ clientId: 'crawlful-hub', brokers: ['localhost:9092'] }); const producer = kafka.producer(); const consumer = kafka.consumer({ groupId: 'inventory-service' }); // 发布事件 async function publishEvent(event: Event): Promise { await producer.connect(); await producer.send({ topic: 'events', messages: [ { key: event.type, value: JSON.stringify(event) } ] }); await producer.disconnect(); } // 订阅事件 async function subscribeEvents(): Promise { await consumer.connect(); await consumer.subscribe({ topic: 'events', fromBeginning: false }); await consumer.run({ eachMessage: async ({ message }) => { const event = JSON.parse(message.value?.toString() || '{}'); console.log('Received event:', event); // 处理事件 if (event.type === 'OrderCreated') { await handleOrderCreated(event); } else if (event.type === 'OrderCancelled') { await handleOrderCancelled(event); } } }); } ``` --- ## 4. 事件驱动架构 ### 4.1 架构图 ``` ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ │ │ │ │ 服务 A │────>│ 事件总线 │────>│ 服务 B │ │ │ │ │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ ^ │ ^ │ │ │ │ v │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ │ │ │ │ 服务 D │<────│ 消息队列 │<────│ 服务 C │ │ │ │ │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ ``` ### 4.2 核心组件 - **事件发布者**:产生事件的服务 - **事件总线**:传递事件的中间件 - **消息队列**:持久化事件的存储 - **事件订阅者**:处理事件的服务 - **事件处理器**:执行具体的事件处理逻辑 ### 4.3 优势 - **松耦合**:服务之间通过事件通信,减少直接依赖 - **可扩展性**:可以轻松添加新的服务和事件处理逻辑 - **可靠性**:事件可以持久化,确保消息不丢失 - **灵活性**:事件处理可以异步执行,提高系统性能 - **可观测性**:可以追踪事件的产生和处理 --- ## 5. 最佳实践 ### 5.1 事件设计 - **事件命名**:使用清晰、语义化的事件名称 - **事件结构**:统一事件结构,包含必要的字段 - **事件数据**:只包含必要的数据,避免过大 - **事件版本**:考虑事件的版本管理 ### 5.2 事件处理 - **幂等性**:确保事件处理的幂等性 - **错误处理**:妥善处理事件处理过程中的错误 - **重试机制**:实现事件处理的重试机制 - **死信队列**:处理无法正常处理的事件 ### 5.3 性能优化 - **批量处理**:批量处理事件,减少网络开销 - **异步处理**:使用异步处理提高性能 - **缓存**:合理使用缓存,减少重复计算 - **限流**:实现事件处理的限流机制 ### 5.4 监控和告警 - **事件监控**:监控事件的产生和处理 - **事件延迟**:监控事件处理的延迟 - **错误率**:监控事件处理的错误率 - **告警机制**:设置合理的告警阈值 --- ## 6. 实现示例 ### 6.1 事件总线 ```typescript // 事件总线实现 class EventBus { private listeners: Map = new Map(); private queue: Event[] = []; private processing = false; publish(event: Event): void { this.queue.push(event); if (!this.processing) { this.processQueue(); } } private async processQueue(): Promise { this.processing = true; while (this.queue.length > 0) { const event = this.queue.shift(); if (event) { await this.processEvent(event); } } this.processing = false; } private async processEvent(event: Event): Promise { console.log(`Processing event: ${event.type}`, event); if (this.listeners.has(event.type)) { const listeners = this.listeners.get(event.type) || []; for (const listener of listeners) { try { await listener(event); } catch (error) { console.error(`Error handling event ${event.type}:`, error); } } } } subscribe(eventType: string, listener: Function): void { if (!this.listeners.has(eventType)) { this.listeners.set(eventType, []); } this.listeners.get(eventType)?.push(listener); } unsubscribe(eventType: string, listener: Function): void { if (this.listeners.has(eventType)) { const listeners = this.listeners.get(eventType)?.filter(l => l !== listener); this.listeners.set(eventType, listeners || []); } } } // 全局事件总线 export const eventBus = new EventBus(); ``` ### 6.2 事件处理器 ```typescript // 订单事件处理器 class OrderEventHandler { constructor(private inventoryService: InventoryService, private notificationService: NotificationService) { eventBus.subscribe('OrderCreated', this.handleOrderCreated.bind(this)); eventBus.subscribe('OrderCancelled', this.handleOrderCancelled.bind(this)); } private async handleOrderCreated(event: Event): Promise { const order = event.data; // 扣减库存 for (const item of order.items) { await this.inventoryService.deductInventory(item.productId, item.quantity); } // 发送通知 await this.notificationService.sendOrderConfirmation(order); } private async handleOrderCancelled(event: Event): Promise { const order = event.data; // 恢复库存 for (const item of order.items) { await this.inventoryService.restoreInventory(item.productId, item.quantity); } // 发送通知 await this.notificationService.sendOrderCancellation(order); } } // 库存事件处理器 class InventoryEventHandler { constructor(private notificationService: NotificationService) { eventBus.subscribe('InventoryLow', this.handleInventoryLow.bind(this)); } private async handleInventoryLow(event: Event): Promise { const { productId, quantity, alertLevel } = event.data; // 发送库存预警通知 await this.notificationService.sendInventoryAlert(productId, quantity, alertLevel); } } ``` --- ## 7. 相关文档 - [Service_Design.md](./Service_Design.md) - [Data_Consistency.md](./Data_Consistency.md) - [Observability.md](./Observability.md) --- *本文档基于服务设计文档,最后更新: 2026-03-18*