- 移除未使用的TabPane组件 - 修复类型定义和导入方式 - 优化mock数据源的环境变量判断逻辑 - 更新文档结构并归档旧文件 - 添加新的UI组件和Memo组件 - 调整API路径和响应处理
14 KiB
14 KiB
事件驱动文档 (Crawlful Hub)
定位:Crawlful Hub 事件驱动设计文档 - 定义核心事件和处理流程,实现系统解耦和可扩展性。 更新日期: 2026-03-18 最高优先级参考: Service_Design.md
1. 事件驱动概述
1.1 定义
事件驱动是一种架构模式,通过事件的产生、发布、订阅和处理来实现系统组件之间的通信和协作。
1.2 重要性
事件驱动架构的好处:
- 解耦:组件之间通过事件通信,减少直接依赖
- 可扩展性:可以轻松添加新的事件处理逻辑
- 异步处理:事件处理可以异步执行,提高系统性能
- 可靠性:事件可以持久化,确保消息不丢失
- 可观测性:可以追踪事件的产生和处理
2. 核心事件
2.1 事件定义
| 事件名称 | 描述 | 触发条件 | 相关服务 |
|---|---|---|---|
| OrderCreated | 订单创建 | 用户提交订单 | 订单服务、库存服务、通知服务 |
| OrderUpdated | 订单更新 | 订单状态变更 | 订单服务、通知服务 |
| OrderCompleted | 订单完成 | 订单状态变为已完成 | 订单服务、结算服务、通知服务 |
| OrderCancelled | 订单取消 | 订单状态变为已取消 | 订单服务、库存服务、通知服务 |
| ProductCreated | 商品创建 | 创建新商品 | 商品服务、搜索服务 |
| ProductUpdated | 商品更新 | 更新商品信息 | 商品服务、搜索服务 |
| ProductPriceChanged | 商品价格变更 | 商品价格更新 | 商品服务、定价服务、搜索服务 |
| InventoryLow | 库存不足 | 库存低于预警值 | 库存服务、通知服务 |
| InventoryUpdated | 库存更新 | 库存数量变更 | 库存服务、商品服务 |
| PaymentProcessed | 支付处理 | 支付完成 | 支付服务、订单服务 |
| SettlementCreated | 结算创建 | 创建结算单 | 结算服务、通知服务 |
| MerchantRegistered | 商户注册 | 新商户注册 | 商户服务、通知服务 |
| MerchantVerified | 商户验证 | 商户资质验证通过 | 商户服务、通知服务 |
2.2 事件结构
示例:
interface Event {
id: string; // 事件ID
type: string; // 事件类型
timestamp: number; // 事件时间戳
data: any; // 事件数据
metadata: {
tenantId: string; // 租户ID
shopId: string; // 店铺ID
traceId: string; // 追踪ID
source: string; // 事件源
};
}
// 订单创建事件示例
const orderCreatedEvent: Event = {
id: 'event-001',
type: 'OrderCreated',
timestamp: Date.now(),
data: {
orderId: 'order-001',
userId: 'user-001',
items: [
{
productId: 'product-001',
quantity: 2,
price: 99.99
}
],
totalAmount: 199.98,
status: 'PENDING'
},
metadata: {
tenantId: 'tenant-001',
shopId: 'shop-001',
traceId: 'trace-001',
source: 'OrderService'
}
};
3. 事件处理流程
3.1 事件发布
实现:
- 使用事件总线或消息队列
- 事件发布者将事件发送到事件总线
- 事件总线确保事件的传递
示例:
// 事件总线
class EventBus {
private listeners: Map<string, Function[]> = new Map();
publish(event: Event): void {
console.log(`Publishing event: ${event.type}`, event);
if (this.listeners.has(event.type)) {
this.listeners.get(event.type)?.forEach(listener => {
try {
listener(event);
} catch (error) {
console.error(`Error handling event ${event.type}:`, error);
}
});
}
}
subscribe(eventType: string, listener: Function): void {
if (!this.listeners.has(eventType)) {
this.listeners.set(eventType, []);
}
this.listeners.get(eventType)?.push(listener);
}
unsubscribe(eventType: string, listener: Function): void {
if (this.listeners.has(eventType)) {
const listeners = this.listeners.get(eventType)?.filter(l => l !== listener);
this.listeners.set(eventType, listeners || []);
}
}
}
// 订单服务发布事件
class OrderService {
constructor(private eventBus: EventBus) {}
async createOrder(orderData: OrderCreateDto): Promise<Order> {
// 创建订单
const order = await this.orderRepository.create(orderData);
// 发布订单创建事件
const event: Event = {
id: uuidv4(),
type: 'OrderCreated',
timestamp: Date.now(),
data: order,
metadata: {
tenantId: order.tenantId,
shopId: order.shopId,
traceId: orderData.traceId,
source: 'OrderService'
}
};
this.eventBus.publish(event);
return order;
}
}
3.2 事件订阅
实现:
- 服务订阅感兴趣的事件
- 当事件发生时,执行相应的处理逻辑
示例:
// 库存服务订阅事件
class InventoryService {
constructor(private eventBus: EventBus) {
// 订阅订单创建事件
this.eventBus.subscribe('OrderCreated', this.handleOrderCreated.bind(this));
// 订阅订单取消事件
this.eventBus.subscribe('OrderCancelled', this.handleOrderCancelled.bind(this));
}
private async handleOrderCreated(event: Event): Promise<void> {
const order = event.data;
// 扣减库存
for (const item of order.items) {
await this.deductInventory(item.productId, item.quantity);
}
}
private async handleOrderCancelled(event: Event): Promise<void> {
const order = event.data;
// 恢复库存
for (const item of order.items) {
await this.restoreInventory(item.productId, item.quantity);
}
}
}
// 通知服务订阅事件
class NotificationService {
constructor(private eventBus: EventBus) {
// 订阅订单相关事件
this.eventBus.subscribe('OrderCreated', this.sendOrderConfirmation.bind(this));
this.eventBus.subscribe('OrderUpdated', this.sendOrderUpdate.bind(this));
this.eventBus.subscribe('OrderCompleted', this.sendOrderCompletion.bind(this));
this.eventBus.subscribe('OrderCancelled', this.sendOrderCancellation.bind(this));
}
private async sendOrderConfirmation(event: Event): Promise<void> {
const order = event.data;
// 发送订单确认通知
await this.sendEmail(order.userId, 'Order Confirmation', `Your order ${order.id} has been created`);
}
private async sendOrderUpdate(event: Event): Promise<void> {
const order = event.data;
// 发送订单更新通知
await this.sendEmail(order.userId, 'Order Update', `Your order ${order.id} has been updated`);
}
private async sendOrderCompletion(event: Event): Promise<void> {
const order = event.data;
// 发送订单完成通知
await this.sendEmail(order.userId, 'Order Completed', `Your order ${order.id} has been completed`);
}
private async sendOrderCancellation(event: Event): Promise<void> {
const order = event.data;
// 发送订单取消通知
await this.sendEmail(order.userId, 'Order Cancelled', `Your order ${order.id} has been cancelled`);
}
}
3.3 事件持久化
实现:
- 使用消息队列(如 RabbitMQ、Kafka)持久化事件
- 确保事件不丢失
- 支持事件的重试和重放
示例:
// 使用 Kafka
import { Kafka } from 'kafkajs';
const kafka = new Kafka({
clientId: 'crawlful-hub',
brokers: ['localhost:9092']
});
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'inventory-service' });
// 发布事件
async function publishEvent(event: Event): Promise<void> {
await producer.connect();
await producer.send({
topic: 'events',
messages: [
{
key: event.type,
value: JSON.stringify(event)
}
]
});
await producer.disconnect();
}
// 订阅事件
async function subscribeEvents(): Promise<void> {
await consumer.connect();
await consumer.subscribe({ topic: 'events', fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const event = JSON.parse(message.value?.toString() || '{}');
console.log('Received event:', event);
// 处理事件
if (event.type === 'OrderCreated') {
await handleOrderCreated(event);
} else if (event.type === 'OrderCancelled') {
await handleOrderCancelled(event);
}
}
});
}
4. 事件驱动架构
4.1 架构图
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ │ │ │ │ │
│ 服务 A │────>│ 事件总线 │────>│ 服务 B │
│ │ │ │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
^ │ ^
│ │ │
│ v │
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ │ │ │ │ │
│ 服务 D │<────│ 消息队列 │<────│ 服务 C │
│ │ │ │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
4.2 核心组件
- 事件发布者:产生事件的服务
- 事件总线:传递事件的中间件
- 消息队列:持久化事件的存储
- 事件订阅者:处理事件的服务
- 事件处理器:执行具体的事件处理逻辑
4.3 优势
- 松耦合:服务之间通过事件通信,减少直接依赖
- 可扩展性:可以轻松添加新的服务和事件处理逻辑
- 可靠性:事件可以持久化,确保消息不丢失
- 灵活性:事件处理可以异步执行,提高系统性能
- 可观测性:可以追踪事件的产生和处理
5. 最佳实践
5.1 事件设计
- 事件命名:使用清晰、语义化的事件名称
- 事件结构:统一事件结构,包含必要的字段
- 事件数据:只包含必要的数据,避免过大
- 事件版本:考虑事件的版本管理
5.2 事件处理
- 幂等性:确保事件处理的幂等性
- 错误处理:妥善处理事件处理过程中的错误
- 重试机制:实现事件处理的重试机制
- 死信队列:处理无法正常处理的事件
5.3 性能优化
- 批量处理:批量处理事件,减少网络开销
- 异步处理:使用异步处理提高性能
- 缓存:合理使用缓存,减少重复计算
- 限流:实现事件处理的限流机制
5.4 监控和告警
- 事件监控:监控事件的产生和处理
- 事件延迟:监控事件处理的延迟
- 错误率:监控事件处理的错误率
- 告警机制:设置合理的告警阈值
6. 实现示例
6.1 事件总线
// 事件总线实现
class EventBus {
private listeners: Map<string, Function[]> = new Map();
private queue: Event[] = [];
private processing = false;
publish(event: Event): void {
this.queue.push(event);
if (!this.processing) {
this.processQueue();
}
}
private async processQueue(): Promise<void> {
this.processing = true;
while (this.queue.length > 0) {
const event = this.queue.shift();
if (event) {
await this.processEvent(event);
}
}
this.processing = false;
}
private async processEvent(event: Event): Promise<void> {
console.log(`Processing event: ${event.type}`, event);
if (this.listeners.has(event.type)) {
const listeners = this.listeners.get(event.type) || [];
for (const listener of listeners) {
try {
await listener(event);
} catch (error) {
console.error(`Error handling event ${event.type}:`, error);
}
}
}
}
subscribe(eventType: string, listener: Function): void {
if (!this.listeners.has(eventType)) {
this.listeners.set(eventType, []);
}
this.listeners.get(eventType)?.push(listener);
}
unsubscribe(eventType: string, listener: Function): void {
if (this.listeners.has(eventType)) {
const listeners = this.listeners.get(eventType)?.filter(l => l !== listener);
this.listeners.set(eventType, listeners || []);
}
}
}
// 全局事件总线
export const eventBus = new EventBus();
6.2 事件处理器
// 订单事件处理器
class OrderEventHandler {
constructor(private inventoryService: InventoryService, private notificationService: NotificationService) {
eventBus.subscribe('OrderCreated', this.handleOrderCreated.bind(this));
eventBus.subscribe('OrderCancelled', this.handleOrderCancelled.bind(this));
}
private async handleOrderCreated(event: Event): Promise<void> {
const order = event.data;
// 扣减库存
for (const item of order.items) {
await this.inventoryService.deductInventory(item.productId, item.quantity);
}
// 发送通知
await this.notificationService.sendOrderConfirmation(order);
}
private async handleOrderCancelled(event: Event): Promise<void> {
const order = event.data;
// 恢复库存
for (const item of order.items) {
await this.inventoryService.restoreInventory(item.productId, item.quantity);
}
// 发送通知
await this.notificationService.sendOrderCancellation(order);
}
}
// 库存事件处理器
class InventoryEventHandler {
constructor(private notificationService: NotificationService) {
eventBus.subscribe('InventoryLow', this.handleInventoryLow.bind(this));
}
private async handleInventoryLow(event: Event): Promise<void> {
const { productId, quantity, alertLevel } = event.data;
// 发送库存预警通知
await this.notificationService.sendInventoryAlert(productId, quantity, alertLevel);
}
}
7. 相关文档
本文档基于服务设计文档,最后更新: 2026-03-18