refactor: 优化代码结构并修复类型问题
- 移除未使用的TabPane组件 - 修复类型定义和导入方式 - 优化mock数据源的环境变量判断逻辑 - 更新文档结构并归档旧文件 - 添加新的UI组件和Memo组件 - 调整API路径和响应处理
This commit is contained in:
492
docs/ARCHIVE/02_Backend/03_Event_Driven.md
Normal file
492
docs/ARCHIVE/02_Backend/03_Event_Driven.md
Normal file
@@ -0,0 +1,492 @@
|
||||
# 事件驱动文档 (Crawlful Hub)
|
||||
|
||||
> **定位**:Crawlful Hub 事件驱动设计文档 - 定义核心事件和处理流程,实现系统解耦和可扩展性。
|
||||
> **更新日期**: 2026-03-18
|
||||
> **最高优先级参考**: [Service_Design.md](./Service_Design.md)
|
||||
|
||||
---
|
||||
|
||||
## 1. 事件驱动概述
|
||||
|
||||
### 1.1 定义
|
||||
|
||||
事件驱动是一种架构模式,通过事件的产生、发布、订阅和处理来实现系统组件之间的通信和协作。
|
||||
|
||||
### 1.2 重要性
|
||||
|
||||
事件驱动架构的好处:
|
||||
- **解耦**:组件之间通过事件通信,减少直接依赖
|
||||
- **可扩展性**:可以轻松添加新的事件处理逻辑
|
||||
- **异步处理**:事件处理可以异步执行,提高系统性能
|
||||
- **可靠性**:事件可以持久化,确保消息不丢失
|
||||
- **可观测性**:可以追踪事件的产生和处理
|
||||
|
||||
---
|
||||
|
||||
## 2. 核心事件
|
||||
|
||||
### 2.1 事件定义
|
||||
|
||||
| 事件名称 | 描述 | 触发条件 | 相关服务 |
|
||||
|----------|------|----------|----------|
|
||||
| **OrderCreated** | 订单创建 | 用户提交订单 | 订单服务、库存服务、通知服务 |
|
||||
| **OrderUpdated** | 订单更新 | 订单状态变更 | 订单服务、通知服务 |
|
||||
| **OrderCompleted** | 订单完成 | 订单状态变为已完成 | 订单服务、结算服务、通知服务 |
|
||||
| **OrderCancelled** | 订单取消 | 订单状态变为已取消 | 订单服务、库存服务、通知服务 |
|
||||
| **ProductCreated** | 商品创建 | 创建新商品 | 商品服务、搜索服务 |
|
||||
| **ProductUpdated** | 商品更新 | 更新商品信息 | 商品服务、搜索服务 |
|
||||
| **ProductPriceChanged** | 商品价格变更 | 商品价格更新 | 商品服务、定价服务、搜索服务 |
|
||||
| **InventoryLow** | 库存不足 | 库存低于预警值 | 库存服务、通知服务 |
|
||||
| **InventoryUpdated** | 库存更新 | 库存数量变更 | 库存服务、商品服务 |
|
||||
| **PaymentProcessed** | 支付处理 | 支付完成 | 支付服务、订单服务 |
|
||||
| **SettlementCreated** | 结算创建 | 创建结算单 | 结算服务、通知服务 |
|
||||
| **MerchantRegistered** | 商户注册 | 新商户注册 | 商户服务、通知服务 |
|
||||
| **MerchantVerified** | 商户验证 | 商户资质验证通过 | 商户服务、通知服务 |
|
||||
|
||||
### 2.2 事件结构
|
||||
|
||||
**示例**:
|
||||
```typescript
|
||||
interface Event {
|
||||
id: string; // 事件ID
|
||||
type: string; // 事件类型
|
||||
timestamp: number; // 事件时间戳
|
||||
data: any; // 事件数据
|
||||
metadata: {
|
||||
tenantId: string; // 租户ID
|
||||
shopId: string; // 店铺ID
|
||||
traceId: string; // 追踪ID
|
||||
source: string; // 事件源
|
||||
};
|
||||
}
|
||||
|
||||
// 订单创建事件示例
|
||||
const orderCreatedEvent: Event = {
|
||||
id: 'event-001',
|
||||
type: 'OrderCreated',
|
||||
timestamp: Date.now(),
|
||||
data: {
|
||||
orderId: 'order-001',
|
||||
userId: 'user-001',
|
||||
items: [
|
||||
{
|
||||
productId: 'product-001',
|
||||
quantity: 2,
|
||||
price: 99.99
|
||||
}
|
||||
],
|
||||
totalAmount: 199.98,
|
||||
status: 'PENDING'
|
||||
},
|
||||
metadata: {
|
||||
tenantId: 'tenant-001',
|
||||
shopId: 'shop-001',
|
||||
traceId: 'trace-001',
|
||||
source: 'OrderService'
|
||||
}
|
||||
};
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 3. 事件处理流程
|
||||
|
||||
### 3.1 事件发布
|
||||
|
||||
**实现**:
|
||||
- 使用事件总线或消息队列
|
||||
- 事件发布者将事件发送到事件总线
|
||||
- 事件总线确保事件的传递
|
||||
|
||||
**示例**:
|
||||
```typescript
|
||||
// 事件总线
|
||||
class EventBus {
|
||||
private listeners: Map<string, Function[]> = new Map();
|
||||
|
||||
publish(event: Event): void {
|
||||
console.log(`Publishing event: ${event.type}`, event);
|
||||
|
||||
if (this.listeners.has(event.type)) {
|
||||
this.listeners.get(event.type)?.forEach(listener => {
|
||||
try {
|
||||
listener(event);
|
||||
} catch (error) {
|
||||
console.error(`Error handling event ${event.type}:`, error);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
subscribe(eventType: string, listener: Function): void {
|
||||
if (!this.listeners.has(eventType)) {
|
||||
this.listeners.set(eventType, []);
|
||||
}
|
||||
this.listeners.get(eventType)?.push(listener);
|
||||
}
|
||||
|
||||
unsubscribe(eventType: string, listener: Function): void {
|
||||
if (this.listeners.has(eventType)) {
|
||||
const listeners = this.listeners.get(eventType)?.filter(l => l !== listener);
|
||||
this.listeners.set(eventType, listeners || []);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 订单服务发布事件
|
||||
class OrderService {
|
||||
constructor(private eventBus: EventBus) {}
|
||||
|
||||
async createOrder(orderData: OrderCreateDto): Promise<Order> {
|
||||
// 创建订单
|
||||
const order = await this.orderRepository.create(orderData);
|
||||
|
||||
// 发布订单创建事件
|
||||
const event: Event = {
|
||||
id: uuidv4(),
|
||||
type: 'OrderCreated',
|
||||
timestamp: Date.now(),
|
||||
data: order,
|
||||
metadata: {
|
||||
tenantId: order.tenantId,
|
||||
shopId: order.shopId,
|
||||
traceId: orderData.traceId,
|
||||
source: 'OrderService'
|
||||
}
|
||||
};
|
||||
|
||||
this.eventBus.publish(event);
|
||||
|
||||
return order;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 3.2 事件订阅
|
||||
|
||||
**实现**:
|
||||
- 服务订阅感兴趣的事件
|
||||
- 当事件发生时,执行相应的处理逻辑
|
||||
|
||||
**示例**:
|
||||
```typescript
|
||||
// 库存服务订阅事件
|
||||
class InventoryService {
|
||||
constructor(private eventBus: EventBus) {
|
||||
// 订阅订单创建事件
|
||||
this.eventBus.subscribe('OrderCreated', this.handleOrderCreated.bind(this));
|
||||
|
||||
// 订阅订单取消事件
|
||||
this.eventBus.subscribe('OrderCancelled', this.handleOrderCancelled.bind(this));
|
||||
}
|
||||
|
||||
private async handleOrderCreated(event: Event): Promise<void> {
|
||||
const order = event.data;
|
||||
|
||||
// 扣减库存
|
||||
for (const item of order.items) {
|
||||
await this.deductInventory(item.productId, item.quantity);
|
||||
}
|
||||
}
|
||||
|
||||
private async handleOrderCancelled(event: Event): Promise<void> {
|
||||
const order = event.data;
|
||||
|
||||
// 恢复库存
|
||||
for (const item of order.items) {
|
||||
await this.restoreInventory(item.productId, item.quantity);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 通知服务订阅事件
|
||||
class NotificationService {
|
||||
constructor(private eventBus: EventBus) {
|
||||
// 订阅订单相关事件
|
||||
this.eventBus.subscribe('OrderCreated', this.sendOrderConfirmation.bind(this));
|
||||
this.eventBus.subscribe('OrderUpdated', this.sendOrderUpdate.bind(this));
|
||||
this.eventBus.subscribe('OrderCompleted', this.sendOrderCompletion.bind(this));
|
||||
this.eventBus.subscribe('OrderCancelled', this.sendOrderCancellation.bind(this));
|
||||
}
|
||||
|
||||
private async sendOrderConfirmation(event: Event): Promise<void> {
|
||||
const order = event.data;
|
||||
// 发送订单确认通知
|
||||
await this.sendEmail(order.userId, 'Order Confirmation', `Your order ${order.id} has been created`);
|
||||
}
|
||||
|
||||
private async sendOrderUpdate(event: Event): Promise<void> {
|
||||
const order = event.data;
|
||||
// 发送订单更新通知
|
||||
await this.sendEmail(order.userId, 'Order Update', `Your order ${order.id} has been updated`);
|
||||
}
|
||||
|
||||
private async sendOrderCompletion(event: Event): Promise<void> {
|
||||
const order = event.data;
|
||||
// 发送订单完成通知
|
||||
await this.sendEmail(order.userId, 'Order Completed', `Your order ${order.id} has been completed`);
|
||||
}
|
||||
|
||||
private async sendOrderCancellation(event: Event): Promise<void> {
|
||||
const order = event.data;
|
||||
// 发送订单取消通知
|
||||
await this.sendEmail(order.userId, 'Order Cancelled', `Your order ${order.id} has been cancelled`);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 3.3 事件持久化
|
||||
|
||||
**实现**:
|
||||
- 使用消息队列(如 RabbitMQ、Kafka)持久化事件
|
||||
- 确保事件不丢失
|
||||
- 支持事件的重试和重放
|
||||
|
||||
**示例**:
|
||||
```typescript
|
||||
// 使用 Kafka
|
||||
import { Kafka } from 'kafkajs';
|
||||
|
||||
const kafka = new Kafka({
|
||||
clientId: 'crawlful-hub',
|
||||
brokers: ['localhost:9092']
|
||||
});
|
||||
|
||||
const producer = kafka.producer();
|
||||
const consumer = kafka.consumer({ groupId: 'inventory-service' });
|
||||
|
||||
// 发布事件
|
||||
async function publishEvent(event: Event): Promise<void> {
|
||||
await producer.connect();
|
||||
await producer.send({
|
||||
topic: 'events',
|
||||
messages: [
|
||||
{
|
||||
key: event.type,
|
||||
value: JSON.stringify(event)
|
||||
}
|
||||
]
|
||||
});
|
||||
await producer.disconnect();
|
||||
}
|
||||
|
||||
// 订阅事件
|
||||
async function subscribeEvents(): Promise<void> {
|
||||
await consumer.connect();
|
||||
await consumer.subscribe({ topic: 'events', fromBeginning: false });
|
||||
|
||||
await consumer.run({
|
||||
eachMessage: async ({ message }) => {
|
||||
const event = JSON.parse(message.value?.toString() || '{}');
|
||||
console.log('Received event:', event);
|
||||
|
||||
// 处理事件
|
||||
if (event.type === 'OrderCreated') {
|
||||
await handleOrderCreated(event);
|
||||
} else if (event.type === 'OrderCancelled') {
|
||||
await handleOrderCancelled(event);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 4. 事件驱动架构
|
||||
|
||||
### 4.1 架构图
|
||||
|
||||
```
|
||||
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
|
||||
│ │ │ │ │ │
|
||||
│ 服务 A │────>│ 事件总线 │────>│ 服务 B │
|
||||
│ │ │ │ │ │
|
||||
└─────────────┘ └─────────────┘ └─────────────┘
|
||||
^ │ ^
|
||||
│ │ │
|
||||
│ v │
|
||||
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
|
||||
│ │ │ │ │ │
|
||||
│ 服务 D │<────│ 消息队列 │<────│ 服务 C │
|
||||
│ │ │ │ │ │
|
||||
└─────────────┘ └─────────────┘ └─────────────┘
|
||||
```
|
||||
|
||||
### 4.2 核心组件
|
||||
|
||||
- **事件发布者**:产生事件的服务
|
||||
- **事件总线**:传递事件的中间件
|
||||
- **消息队列**:持久化事件的存储
|
||||
- **事件订阅者**:处理事件的服务
|
||||
- **事件处理器**:执行具体的事件处理逻辑
|
||||
|
||||
### 4.3 优势
|
||||
|
||||
- **松耦合**:服务之间通过事件通信,减少直接依赖
|
||||
- **可扩展性**:可以轻松添加新的服务和事件处理逻辑
|
||||
- **可靠性**:事件可以持久化,确保消息不丢失
|
||||
- **灵活性**:事件处理可以异步执行,提高系统性能
|
||||
- **可观测性**:可以追踪事件的产生和处理
|
||||
|
||||
---
|
||||
|
||||
## 5. 最佳实践
|
||||
|
||||
### 5.1 事件设计
|
||||
|
||||
- **事件命名**:使用清晰、语义化的事件名称
|
||||
- **事件结构**:统一事件结构,包含必要的字段
|
||||
- **事件数据**:只包含必要的数据,避免过大
|
||||
- **事件版本**:考虑事件的版本管理
|
||||
|
||||
### 5.2 事件处理
|
||||
|
||||
- **幂等性**:确保事件处理的幂等性
|
||||
- **错误处理**:妥善处理事件处理过程中的错误
|
||||
- **重试机制**:实现事件处理的重试机制
|
||||
- **死信队列**:处理无法正常处理的事件
|
||||
|
||||
### 5.3 性能优化
|
||||
|
||||
- **批量处理**:批量处理事件,减少网络开销
|
||||
- **异步处理**:使用异步处理提高性能
|
||||
- **缓存**:合理使用缓存,减少重复计算
|
||||
- **限流**:实现事件处理的限流机制
|
||||
|
||||
### 5.4 监控和告警
|
||||
|
||||
- **事件监控**:监控事件的产生和处理
|
||||
- **事件延迟**:监控事件处理的延迟
|
||||
- **错误率**:监控事件处理的错误率
|
||||
- **告警机制**:设置合理的告警阈值
|
||||
|
||||
---
|
||||
|
||||
## 6. 实现示例
|
||||
|
||||
### 6.1 事件总线
|
||||
|
||||
```typescript
|
||||
// 事件总线实现
|
||||
class EventBus {
|
||||
private listeners: Map<string, Function[]> = new Map();
|
||||
private queue: Event[] = [];
|
||||
private processing = false;
|
||||
|
||||
publish(event: Event): void {
|
||||
this.queue.push(event);
|
||||
if (!this.processing) {
|
||||
this.processQueue();
|
||||
}
|
||||
}
|
||||
|
||||
private async processQueue(): Promise<void> {
|
||||
this.processing = true;
|
||||
|
||||
while (this.queue.length > 0) {
|
||||
const event = this.queue.shift();
|
||||
if (event) {
|
||||
await this.processEvent(event);
|
||||
}
|
||||
}
|
||||
|
||||
this.processing = false;
|
||||
}
|
||||
|
||||
private async processEvent(event: Event): Promise<void> {
|
||||
console.log(`Processing event: ${event.type}`, event);
|
||||
|
||||
if (this.listeners.has(event.type)) {
|
||||
const listeners = this.listeners.get(event.type) || [];
|
||||
|
||||
for (const listener of listeners) {
|
||||
try {
|
||||
await listener(event);
|
||||
} catch (error) {
|
||||
console.error(`Error handling event ${event.type}:`, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
subscribe(eventType: string, listener: Function): void {
|
||||
if (!this.listeners.has(eventType)) {
|
||||
this.listeners.set(eventType, []);
|
||||
}
|
||||
this.listeners.get(eventType)?.push(listener);
|
||||
}
|
||||
|
||||
unsubscribe(eventType: string, listener: Function): void {
|
||||
if (this.listeners.has(eventType)) {
|
||||
const listeners = this.listeners.get(eventType)?.filter(l => l !== listener);
|
||||
this.listeners.set(eventType, listeners || []);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 全局事件总线
|
||||
export const eventBus = new EventBus();
|
||||
```
|
||||
|
||||
### 6.2 事件处理器
|
||||
|
||||
```typescript
|
||||
// 订单事件处理器
|
||||
class OrderEventHandler {
|
||||
constructor(private inventoryService: InventoryService, private notificationService: NotificationService) {
|
||||
eventBus.subscribe('OrderCreated', this.handleOrderCreated.bind(this));
|
||||
eventBus.subscribe('OrderCancelled', this.handleOrderCancelled.bind(this));
|
||||
}
|
||||
|
||||
private async handleOrderCreated(event: Event): Promise<void> {
|
||||
const order = event.data;
|
||||
|
||||
// 扣减库存
|
||||
for (const item of order.items) {
|
||||
await this.inventoryService.deductInventory(item.productId, item.quantity);
|
||||
}
|
||||
|
||||
// 发送通知
|
||||
await this.notificationService.sendOrderConfirmation(order);
|
||||
}
|
||||
|
||||
private async handleOrderCancelled(event: Event): Promise<void> {
|
||||
const order = event.data;
|
||||
|
||||
// 恢复库存
|
||||
for (const item of order.items) {
|
||||
await this.inventoryService.restoreInventory(item.productId, item.quantity);
|
||||
}
|
||||
|
||||
// 发送通知
|
||||
await this.notificationService.sendOrderCancellation(order);
|
||||
}
|
||||
}
|
||||
|
||||
// 库存事件处理器
|
||||
class InventoryEventHandler {
|
||||
constructor(private notificationService: NotificationService) {
|
||||
eventBus.subscribe('InventoryLow', this.handleInventoryLow.bind(this));
|
||||
}
|
||||
|
||||
private async handleInventoryLow(event: Event): Promise<void> {
|
||||
const { productId, quantity, alertLevel } = event.data;
|
||||
|
||||
// 发送库存预警通知
|
||||
await this.notificationService.sendInventoryAlert(productId, quantity, alertLevel);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 7. 相关文档
|
||||
|
||||
- [Service_Design.md](./Service_Design.md)
|
||||
- [Data_Consistency.md](./Data_Consistency.md)
|
||||
- [Observability.md](./Observability.md)
|
||||
|
||||
---
|
||||
|
||||
*本文档基于服务设计文档,最后更新: 2026-03-18*
|
||||
Reference in New Issue
Block a user