Files
makemd/docs/ARCHIVE/02_Backend/03_Event_Driven.md
wurenzhi 2b86715c09 refactor: 优化代码结构并修复类型问题
- 移除未使用的TabPane组件
- 修复类型定义和导入方式
- 优化mock数据源的环境变量判断逻辑
- 更新文档结构并归档旧文件
- 添加新的UI组件和Memo组件
- 调整API路径和响应处理
2026-03-23 12:41:35 +08:00

14 KiB
Raw Permalink Blame History

事件驱动文档 (Crawlful Hub)

定位Crawlful Hub 事件驱动设计文档 - 定义核心事件和处理流程,实现系统解耦和可扩展性。 更新日期: 2026-03-18 最高优先级参考: Service_Design.md


1. 事件驱动概述

1.1 定义

事件驱动是一种架构模式,通过事件的产生、发布、订阅和处理来实现系统组件之间的通信和协作。

1.2 重要性

事件驱动架构的好处:

  • 解耦:组件之间通过事件通信,减少直接依赖
  • 可扩展性:可以轻松添加新的事件处理逻辑
  • 异步处理:事件处理可以异步执行,提高系统性能
  • 可靠性:事件可以持久化,确保消息不丢失
  • 可观测性:可以追踪事件的产生和处理

2. 核心事件

2.1 事件定义

事件名称 描述 触发条件 相关服务
OrderCreated 订单创建 用户提交订单 订单服务、库存服务、通知服务
OrderUpdated 订单更新 订单状态变更 订单服务、通知服务
OrderCompleted 订单完成 订单状态变为已完成 订单服务、结算服务、通知服务
OrderCancelled 订单取消 订单状态变为已取消 订单服务、库存服务、通知服务
ProductCreated 商品创建 创建新商品 商品服务、搜索服务
ProductUpdated 商品更新 更新商品信息 商品服务、搜索服务
ProductPriceChanged 商品价格变更 商品价格更新 商品服务、定价服务、搜索服务
InventoryLow 库存不足 库存低于预警值 库存服务、通知服务
InventoryUpdated 库存更新 库存数量变更 库存服务、商品服务
PaymentProcessed 支付处理 支付完成 支付服务、订单服务
SettlementCreated 结算创建 创建结算单 结算服务、通知服务
MerchantRegistered 商户注册 新商户注册 商户服务、通知服务
MerchantVerified 商户验证 商户资质验证通过 商户服务、通知服务

2.2 事件结构

示例

interface Event {
  id: string;           // 事件ID
  type: string;         // 事件类型
  timestamp: number;    // 事件时间戳
  data: any;            // 事件数据
  metadata: {
    tenantId: string;   // 租户ID
    shopId: string;     // 店铺ID
    traceId: string;    // 追踪ID
    source: string;     // 事件源
  };
}

// 订单创建事件示例
const orderCreatedEvent: Event = {
  id: 'event-001',
  type: 'OrderCreated',
  timestamp: Date.now(),
  data: {
    orderId: 'order-001',
    userId: 'user-001',
    items: [
      {
        productId: 'product-001',
        quantity: 2,
        price: 99.99
      }
    ],
    totalAmount: 199.98,
    status: 'PENDING'
  },
  metadata: {
    tenantId: 'tenant-001',
    shopId: 'shop-001',
    traceId: 'trace-001',
    source: 'OrderService'
  }
};

3. 事件处理流程

3.1 事件发布

实现

  • 使用事件总线或消息队列
  • 事件发布者将事件发送到事件总线
  • 事件总线确保事件的传递

示例

// 事件总线
class EventBus {
  private listeners: Map<string, Function[]> = new Map();
  
  publish(event: Event): void {
    console.log(`Publishing event: ${event.type}`, event);
    
    if (this.listeners.has(event.type)) {
      this.listeners.get(event.type)?.forEach(listener => {
        try {
          listener(event);
        } catch (error) {
          console.error(`Error handling event ${event.type}:`, error);
        }
      });
    }
  }
  
  subscribe(eventType: string, listener: Function): void {
    if (!this.listeners.has(eventType)) {
      this.listeners.set(eventType, []);
    }
    this.listeners.get(eventType)?.push(listener);
  }
  
  unsubscribe(eventType: string, listener: Function): void {
    if (this.listeners.has(eventType)) {
      const listeners = this.listeners.get(eventType)?.filter(l => l !== listener);
      this.listeners.set(eventType, listeners || []);
    }
  }
}

// 订单服务发布事件
class OrderService {
  constructor(private eventBus: EventBus) {}
  
  async createOrder(orderData: OrderCreateDto): Promise<Order> {
    // 创建订单
    const order = await this.orderRepository.create(orderData);
    
    // 发布订单创建事件
    const event: Event = {
      id: uuidv4(),
      type: 'OrderCreated',
      timestamp: Date.now(),
      data: order,
      metadata: {
        tenantId: order.tenantId,
        shopId: order.shopId,
        traceId: orderData.traceId,
        source: 'OrderService'
      }
    };
    
    this.eventBus.publish(event);
    
    return order;
  }
}

3.2 事件订阅

实现

  • 服务订阅感兴趣的事件
  • 当事件发生时,执行相应的处理逻辑

示例

// 库存服务订阅事件
class InventoryService {
  constructor(private eventBus: EventBus) {
    // 订阅订单创建事件
    this.eventBus.subscribe('OrderCreated', this.handleOrderCreated.bind(this));
    
    // 订阅订单取消事件
    this.eventBus.subscribe('OrderCancelled', this.handleOrderCancelled.bind(this));
  }
  
  private async handleOrderCreated(event: Event): Promise<void> {
    const order = event.data;
    
    // 扣减库存
    for (const item of order.items) {
      await this.deductInventory(item.productId, item.quantity);
    }
  }
  
  private async handleOrderCancelled(event: Event): Promise<void> {
    const order = event.data;
    
    // 恢复库存
    for (const item of order.items) {
      await this.restoreInventory(item.productId, item.quantity);
    }
  }
}

// 通知服务订阅事件
class NotificationService {
  constructor(private eventBus: EventBus) {
    // 订阅订单相关事件
    this.eventBus.subscribe('OrderCreated', this.sendOrderConfirmation.bind(this));
    this.eventBus.subscribe('OrderUpdated', this.sendOrderUpdate.bind(this));
    this.eventBus.subscribe('OrderCompleted', this.sendOrderCompletion.bind(this));
    this.eventBus.subscribe('OrderCancelled', this.sendOrderCancellation.bind(this));
  }
  
  private async sendOrderConfirmation(event: Event): Promise<void> {
    const order = event.data;
    // 发送订单确认通知
    await this.sendEmail(order.userId, 'Order Confirmation', `Your order ${order.id} has been created`);
  }
  
  private async sendOrderUpdate(event: Event): Promise<void> {
    const order = event.data;
    // 发送订单更新通知
    await this.sendEmail(order.userId, 'Order Update', `Your order ${order.id} has been updated`);
  }
  
  private async sendOrderCompletion(event: Event): Promise<void> {
    const order = event.data;
    // 发送订单完成通知
    await this.sendEmail(order.userId, 'Order Completed', `Your order ${order.id} has been completed`);
  }
  
  private async sendOrderCancellation(event: Event): Promise<void> {
    const order = event.data;
    // 发送订单取消通知
    await this.sendEmail(order.userId, 'Order Cancelled', `Your order ${order.id} has been cancelled`);
  }
}

3.3 事件持久化

实现

  • 使用消息队列(如 RabbitMQ、Kafka持久化事件
  • 确保事件不丢失
  • 支持事件的重试和重放

示例

// 使用 Kafka
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'crawlful-hub',
  brokers: ['localhost:9092']
});

const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'inventory-service' });

// 发布事件
async function publishEvent(event: Event): Promise<void> {
  await producer.connect();
  await producer.send({
    topic: 'events',
    messages: [
      {
        key: event.type,
        value: JSON.stringify(event)
      }
    ]
  });
  await producer.disconnect();
}

// 订阅事件
async function subscribeEvents(): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({ topic: 'events', fromBeginning: false });
  
  await consumer.run({
    eachMessage: async ({ message }) => {
      const event = JSON.parse(message.value?.toString() || '{}');
      console.log('Received event:', event);
      
      // 处理事件
      if (event.type === 'OrderCreated') {
        await handleOrderCreated(event);
      } else if (event.type === 'OrderCancelled') {
        await handleOrderCancelled(event);
      }
    }
  });
}

4. 事件驱动架构

4.1 架构图

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│             │     │             │     │             │
│  服务 A     │────>│  事件总线   │────>│  服务 B     │
│             │     │             │     │             │
└─────────────┘     └─────────────┘     └─────────────┘
        ^                   │                   ^
        │                   │                   │
        │                   v                   │
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│             │     │             │     │             │
│  服务 D     │<────│  消息队列   │<────│  服务 C     │
│             │     │             │     │             │
└─────────────┘     └─────────────┘     └─────────────┘

4.2 核心组件

  • 事件发布者:产生事件的服务
  • 事件总线:传递事件的中间件
  • 消息队列:持久化事件的存储
  • 事件订阅者:处理事件的服务
  • 事件处理器:执行具体的事件处理逻辑

4.3 优势

  • 松耦合:服务之间通过事件通信,减少直接依赖
  • 可扩展性:可以轻松添加新的服务和事件处理逻辑
  • 可靠性:事件可以持久化,确保消息不丢失
  • 灵活性:事件处理可以异步执行,提高系统性能
  • 可观测性:可以追踪事件的产生和处理

5. 最佳实践

5.1 事件设计

  • 事件命名:使用清晰、语义化的事件名称
  • 事件结构:统一事件结构,包含必要的字段
  • 事件数据:只包含必要的数据,避免过大
  • 事件版本:考虑事件的版本管理

5.2 事件处理

  • 幂等性:确保事件处理的幂等性
  • 错误处理:妥善处理事件处理过程中的错误
  • 重试机制:实现事件处理的重试机制
  • 死信队列:处理无法正常处理的事件

5.3 性能优化

  • 批量处理:批量处理事件,减少网络开销
  • 异步处理:使用异步处理提高性能
  • 缓存:合理使用缓存,减少重复计算
  • 限流:实现事件处理的限流机制

5.4 监控和告警

  • 事件监控:监控事件的产生和处理
  • 事件延迟:监控事件处理的延迟
  • 错误率:监控事件处理的错误率
  • 告警机制:设置合理的告警阈值

6. 实现示例

6.1 事件总线

// 事件总线实现
class EventBus {
  private listeners: Map<string, Function[]> = new Map();
  private queue: Event[] = [];
  private processing = false;
  
  publish(event: Event): void {
    this.queue.push(event);
    if (!this.processing) {
      this.processQueue();
    }
  }
  
  private async processQueue(): Promise<void> {
    this.processing = true;
    
    while (this.queue.length > 0) {
      const event = this.queue.shift();
      if (event) {
        await this.processEvent(event);
      }
    }
    
    this.processing = false;
  }
  
  private async processEvent(event: Event): Promise<void> {
    console.log(`Processing event: ${event.type}`, event);
    
    if (this.listeners.has(event.type)) {
      const listeners = this.listeners.get(event.type) || [];
      
      for (const listener of listeners) {
        try {
          await listener(event);
        } catch (error) {
          console.error(`Error handling event ${event.type}:`, error);
        }
      }
    }
  }
  
  subscribe(eventType: string, listener: Function): void {
    if (!this.listeners.has(eventType)) {
      this.listeners.set(eventType, []);
    }
    this.listeners.get(eventType)?.push(listener);
  }
  
  unsubscribe(eventType: string, listener: Function): void {
    if (this.listeners.has(eventType)) {
      const listeners = this.listeners.get(eventType)?.filter(l => l !== listener);
      this.listeners.set(eventType, listeners || []);
    }
  }
}

// 全局事件总线
export const eventBus = new EventBus();

6.2 事件处理器

// 订单事件处理器
class OrderEventHandler {
  constructor(private inventoryService: InventoryService, private notificationService: NotificationService) {
    eventBus.subscribe('OrderCreated', this.handleOrderCreated.bind(this));
    eventBus.subscribe('OrderCancelled', this.handleOrderCancelled.bind(this));
  }
  
  private async handleOrderCreated(event: Event): Promise<void> {
    const order = event.data;
    
    // 扣减库存
    for (const item of order.items) {
      await this.inventoryService.deductInventory(item.productId, item.quantity);
    }
    
    // 发送通知
    await this.notificationService.sendOrderConfirmation(order);
  }
  
  private async handleOrderCancelled(event: Event): Promise<void> {
    const order = event.data;
    
    // 恢复库存
    for (const item of order.items) {
      await this.inventoryService.restoreInventory(item.productId, item.quantity);
    }
    
    // 发送通知
    await this.notificationService.sendOrderCancellation(order);
  }
}

// 库存事件处理器
class InventoryEventHandler {
  constructor(private notificationService: NotificationService) {
    eventBus.subscribe('InventoryLow', this.handleInventoryLow.bind(this));
  }
  
  private async handleInventoryLow(event: Event): Promise<void> {
    const { productId, quantity, alertLevel } = event.data;
    
    // 发送库存预警通知
    await this.notificationService.sendInventoryAlert(productId, quantity, alertLevel);
  }
}

7. 相关文档


本文档基于服务设计文档,最后更新: 2026-03-18