# Data Consistency (Crawlful Hub)

> **Purpose**: Data consistency design document for Crawlful Hub, ensuring that system data stays accurate and reliable.

> **Last updated**: 2026-03-18

> **Highest-priority reference**: [Service_Design.md](./Service_Design.md)

---

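## 1. Data Consistency Overview

### 1.1 Why It Matters

Data consistency is a core system requirement, especially for systems that handle critical business data such as funds, inventory, and orders. Inconsistent data can lead to:
- Financial loss
- Inventory management errors
- Order processing errors
- Loss of merchant trust
- System unavailability

### 1.2 Core Principles

- **Atomicity**: an operation either succeeds completely or fails completely
- **Consistency**: every operation moves the data from one valid state to another
- **Isolation**: concurrent operations do not interfere with one another
- **Durability**: once data is committed, it is stored permanently

---
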
## 2. Transaction Boundaries

### 2.1 Definition

A transaction boundary delimits a group of operations that must execute as a single unit: either all of them succeed or all of them fail.
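
To make the all-or-nothing behavior concrete, here is a minimal in-memory sketch. It is not how a database implements transactions; `runAtomically`, `reserve`, and the `Stock` type are hypothetical names introduced only for illustration. The work runs against a copy of the state, and the copy replaces the original only if every step succeeds.

```typescript
// Hypothetical in-memory stock table, used only for illustration.
type Stock = Map<string, number>;

// Runs `work` against a copy of the state. The copy is returned (committed)
// only if `work` finishes without throwing; otherwise the error propagates
// and the caller keeps the original, untouched state.
function runAtomically(stock: Stock, work: (draft: Stock) => void): Stock {
  const draft = new Map(stock);
  work(draft);
  return draft;
}

// Reserves stock for one product, or throws if there is not enough.
function reserve(draft: Stock, productId: string, quantity: number): void {
  const available = draft.get(productId) ?? 0;
  if (available < quantity) {
    throw new Error(`Insufficient stock for ${productId}`);
  }
  draft.set(productId, available - quantity);
}

let stock: Stock = new Map([["sku-1", 5], ["sku-2", 1]]);

// Both reservations are inside the boundary and succeed together.
stock = runAtomically(stock, (draft) => {
  reserve(draft, "sku-1", 2);
  reserve(draft, "sku-2", 1);
});

// The second reservation fails, so the first one is not applied either.
let failed = false;
try {
  stock = runAtomically(stock, (draft) => {
    reserve(draft, "sku-1", 1);
    reserve(draft, "sku-2", 1);
  });
} catch {
  failed = true;
}
```

After the second call, `stock` still holds the values committed by the first one: the partial reservation of `sku-1` was discarded together with the failed step.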

### 2.2 Implementation

#### 2.2.1 Database Transactions

**When to use**: business flows that involve database operations

**Implementation options**:
- The `@Transactional` annotation (Java/Spring)
- The `transaction` method (Node.js/Sequelize)
- Native database transactions

**Example**:

```typescript
// Node.js/Sequelize example.
// Excerpt from a service class: `this.sequelize` is the Sequelize instance, and
// Order, Inventory, and Transaction are Sequelize models defined elsewhere.
async createOrder(orderData: OrderCreateDto): Promise<Order> {
  // A managed transaction commits when the callback resolves
  // and rolls back when it throws.
  return await this.sequelize.transaction(async (t) => {
    // 1. Create the order
    const order = await Order.create(orderData, { transaction: t });

    // 2. Deduct inventory
    // Note: this does not prevent the quantity from going negative; a CHECK
    // constraint or a `quantity >= n` condition is needed for that.
    await Inventory.decrement('quantity', {
      by: orderData.quantity,
      where: { productId: orderData.productId },
      transaction: t
    });

    // 3. Record the transaction
    await Transaction.create({
      orderId: order.id,
      amount: order.totalAmount,
      type: 'ORDER_CREATED'
    }, { transaction: t });

    return order;
  });
}
```

#### 2.2.2 Distributed Transactions

**When to use**: business flows that span multiple services or data sources

**Implementation options**:
- Two-phase commit (2PC)
- Compensating transactions (TCC, Try-Confirm-Cancel)
- Message queue + eventual consistency

**Example**:

```typescript
// Compensating transaction example (excerpt from a service class).
async processOrder(orderId: string): Promise<void> {
  // 1. Load the order to process
  const order = await this.orderRepository.findById(orderId);
  // Tracks whether step 2 completed, so that compensation does not
  // restore inventory that was never deducted.
  let inventoryDeducted = false;

  try {
    // 2. Deduct inventory
    await this.inventoryService.deductInventory(
      order.productId,
      order.quantity
    );
    inventoryDeducted = true;

    // 3. Arrange shipping
    const shippingId = await this.logisticsService.createShipping(order);
    order.shippingId = shippingId;

    // 4. Mark the order as processed
    order.status = 'PROCESSED';
    await this.orderRepository.save(order);
  } catch (error) {
    // 5. Compensate only the steps that actually completed
    await this.compensateOrder(order, inventoryDeducted);
    throw error;
  }
}

async compensateOrder(order: Order, inventoryDeducted: boolean): Promise<void> {
  // Restore inventory only if it was deducted
  if (inventoryDeducted) {
    await this.inventoryService.restoreInventory(
      order.productId,
      order.quantity
    );
  }

  // Cancel shipping if it was created
  if (order.shippingId) {
    await this.logisticsService.cancelShipping(order.shippingId);
  }

  // Mark the order as failed
  order.status = 'FAILED';
  await this.orderRepository.save(order);
}
```
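
The hand-written compensation above can be generalized into a small runner that pairs each step with its undo action. The following is a sketch under stated assumptions: `SagaStep` and `runSaga` are hypothetical names, the steps are synchronous for brevity, and plain variables stand in for the remote inventory and logistics services.

```typescript
// A step pairs a forward action with the compensation that undoes it.
interface SagaStep {
  name: string;
  execute: () => void;
  compensate: () => void;
}

// Runs the steps in order. If one throws, the steps that already completed
// are compensated in reverse order and the original error is rethrown.
function runSaga(steps: SagaStep[]): void {
  const completed: SagaStep[] = [];
  try {
    for (const step of steps) {
      step.execute();
      completed.push(step);
    }
  } catch (error) {
    for (const step of completed.reverse()) {
      step.compensate();
    }
    throw error;
  }
}

// Hypothetical in-memory state standing in for remote services.
const log: string[] = [];
let inventory = 10;

let sagaError: unknown;
try {
  runSaga([
    {
      name: "deductInventory",
      execute: () => { inventory -= 3; log.push("deducted"); },
      compensate: () => { inventory += 3; log.push("restored"); },
    },
    {
      name: "createShipping",
      execute: () => { throw new Error("logistics unavailable"); },
      compensate: () => { log.push("shipping cancelled"); },
    },
  ]);
} catch (error) {
  sagaError = error;
}
```

Only the inventory step is compensated here, because the shipping step never completed. In a real system the steps would be asynchronous calls to other services, and each compensation should itself be safe to retry.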
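
### 2.3 Best Practices

- **Define transaction boundaries explicitly**: include only the operations that must succeed or fail together
- **Keep transactions short**: this reduces how long locks are held
- **Choose an appropriate isolation level**: pick the level that the business requirement calls for
- **Handle transaction failures**: make sure that every error path rolls back correctly

---

## 3. Idempotency

### 3.1 Definition

An operation is idempotent if executing the same request multiple times produces the same result as executing it once.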
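
The difference is easiest to see side by side. In this illustrative sketch (the `Account` type and both functions are hypothetical), a retried request doubles the balance in the first version and leaves it unchanged in the second.

```typescript
interface Account {
  balance: number;
  appliedRequests: Set<string>;
}

// Not idempotent: every retry adds the amount again.
function creditUnsafe(account: Account, amount: number): void {
  account.balance += amount;
}

// Idempotent: a request ID that was already applied is ignored.
function creditOnce(account: Account, requestId: string, amount: number): void {
  if (account.appliedRequests.has(requestId)) {
    return;
  }
  account.appliedRequests.add(requestId);
  account.balance += amount;
}

const unsafeAccount: Account = { balance: 0, appliedRequests: new Set() };
creditUnsafe(unsafeAccount, 100);
creditUnsafe(unsafeAccount, 100); // a retry of the same request is applied twice

const safeAccount: Account = { balance: 0, appliedRequests: new Set() };
creditOnce(safeAccount, "req-1", 100);
creditOnce(safeAccount, "req-1", 100); // the retry has no further effect
```

Retries are common in practice (client timeouts, redelivered callbacks), which is why the request ID check matters for operations that move money or stock.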

### 3.2 Implementation

#### 3.2.1 Request ID

**When to use**: all external API calls, payment callbacks, and similar entry points

**Implementation**:
- Generate a unique `requestId`
- Record each `requestId` that has been processed
- Check incoming requests for duplicates

**Example**:

```typescript
// Excerpt from a payment service class.
async processPayment(paymentData: PaymentDto): Promise<Payment> {
  // Return the stored result if this request was already handled
  const existingPayment = await this.paymentRepository.findByRequestId(paymentData.requestId);
  if (existingPayment) {
    return existingPayment;
  }

  // Record the payment before calling the gateway.
  // Note: two concurrent requests can both pass the check above, so a unique
  // constraint on requestId is still needed to reject the second insert.
  const payment = await this.paymentRepository.create({
    ...paymentData,
    status: 'PROCESSING'
  });

  try {
    // Call the payment gateway
    const result = await this.paymentGateway.process(paymentData);

    // Update the payment status
    payment.status = result.status;
    payment.transactionId = result.transactionId;
    await this.paymentRepository.save(payment);
  } catch (error) {
    // Mark the payment as failed
    payment.status = 'FAILED';
    await this.paymentRepository.save(payment);
    throw error;
  }

  return payment;
}
```

#### 3.2.2 Optimistic Locking

**When to use**: concurrent updates to the same record

**Implementation**:
- Store a version number or timestamp on each record
- Check the version as part of the update
- Retry or fail when the version does not match

**Example**:

```typescript
// Excerpt from a product service class. `update` is assumed to resolve to
// `[affectedCount]`, as Sequelize's `Model.update` does.
async updateProduct(productId: string, updates: Partial<Product>): Promise<Product> {
  const product = await this.productRepository.findById(productId);
  const currentVersion = product.version;

  // Attempt the update; the WHERE clause only matches if the version is unchanged
  const [affectedCount] = await this.productRepository.update(
    {
      ...updates,
      version: currentVersion + 1
    },
    {
      where: {
        id: productId,
        version: currentVersion
      }
    }
  );

  if (affectedCount === 0) {
    // Version mismatch: another process modified the record in the meantime
    throw new ConflictException('Product has been updated by another process');
  }

  return await this.productRepository.findById(productId);
}
```
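
The example above fails on the first conflict. The "retry" alternative can be sketched as follows, using a hypothetical in-memory table in place of the database; `VersionedProduct`, `updateIfVersionMatches`, and `updatePriceWithRetry` are illustrative names, not part of any library.

```typescript
interface VersionedProduct {
  id: string;
  price: number;
  version: number;
}

// Hypothetical in-memory table standing in for the database.
const products = new Map<string, VersionedProduct>([
  ["p1", { id: "p1", price: 100, version: 1 }],
]);

// Compare-and-set: applies the change only if the stored version still
// matches, mirroring `UPDATE ... WHERE id = ? AND version = ?`.
function updateIfVersionMatches(id: string, expectedVersion: number, price: number): boolean {
  const row = products.get(id);
  if (!row || row.version !== expectedVersion) {
    return false;
  }
  products.set(id, { ...row, price, version: expectedVersion + 1 });
  return true;
}

// Re-reads the record and retries a bounded number of times on conflicts.
function updatePriceWithRetry(
  id: string,
  computePrice: (current: VersionedProduct) => number,
  maxAttempts = 3,
): VersionedProduct {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const current = products.get(id);
    if (!current) {
      throw new Error(`Product ${id} not found`);
    }
    if (updateIfVersionMatches(id, current.version, computePrice(current))) {
      return products.get(id)!;
    }
  }
  throw new Error(`Update of ${id} still conflicted after ${maxAttempts} attempts`);
}

// Simulate a competing writer that commits during the first attempt.
let interfered = false;
const result = updatePriceWithRetry("p1", (current) => {
  if (!interfered) {
    interfered = true;
    updateIfVersionMatches("p1", current.version, 120);
  }
  return current.price + 10;
});
```

The first attempt loses to the competing writer, so the second attempt recomputes the price from the fresh record instead of overwriting the competing change. Bounding the number of attempts keeps a heavily contended record from retrying forever.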
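
### 3.3 Best Practices

- **Generate a unique ID for every external request**
- **Store each request ID together with its processing result**
- **Set a reasonable expiry time for stored request IDs**
- **Return the same result when a duplicate request arrives**
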
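
These practices can be combined in a small result store. The sketch below is illustrative only: `IdempotencyStore` is a hypothetical in-memory class (a production system would more likely use a database table or a cache with expiry), and the clock is injected so that expiry can be exercised without waiting.

```typescript
interface StoredResult<T> {
  result: T;
  expiresAt: number;
}

// Remembers the result of each request ID for `ttlMs` milliseconds.
class IdempotencyStore<T> {
  private readonly entries = new Map<string, StoredResult<T>>();
  private readonly ttlMs: number;
  private readonly now: () => number;

  constructor(ttlMs: number, now: () => number = Date.now) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  // Runs `handler` at most once per request ID within the TTL window and
  // returns the stored result for duplicates.
  execute(requestId: string, handler: () => T): T {
    const existing = this.entries.get(requestId);
    if (existing && existing.expiresAt > this.now()) {
      return existing.result;
    }
    const result = handler();
    this.entries.set(requestId, { result, expiresAt: this.now() + this.ttlMs });
    return result;
  }
}

// Usage with a fake clock and a handler that counts its invocations.
let currentTime = 0;
let handlerCalls = 0;
const store = new IdempotencyStore<string>(1000, () => currentTime);
const charge = () => `charge-${++handlerCalls}`;

const first = store.execute("req-1", charge);       // handler runs
const duplicate = store.execute("req-1", charge);   // served from the store
currentTime = 1500;                                 // move past the TTL
const afterExpiry = store.execute("req-1", charge); // handler runs again
```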

---

## 4. State Machines

### 4.1 Definition

A state machine is a model that describes the states an object can be in and the rules for moving between them.

### 4.2 Implementation

#### 4.2.1 Enum-Based States

**When to use**: simple state flows

**Implementation**:
- Define the states with an enum
- Handle state transitions with conditional checks

**Example**:

```typescript
enum OrderStatus {
  PENDING = 'PENDING',
  PROCESSING = 'PROCESSING',
  SHIPPED = 'SHIPPED',
  DELIVERED = 'DELIVERED',
  CANCELLED = 'CANCELLED',
  FAILED = 'FAILED'
}

class OrderStateMachine {
  static canTransition(from: OrderStatus, to: OrderStatus): boolean {
    // Allowed target states for each source state; terminal states map to an
    // empty list. The explicit type keeps `includes` callable for every entry.
    const transitions: Record<OrderStatus, OrderStatus[]> = {
      [OrderStatus.PENDING]: [OrderStatus.PROCESSING, OrderStatus.CANCELLED],
      [OrderStatus.PROCESSING]: [OrderStatus.SHIPPED, OrderStatus.FAILED, OrderStatus.CANCELLED],
      [OrderStatus.SHIPPED]: [OrderStatus.DELIVERED, OrderStatus.FAILED],
      [OrderStatus.DELIVERED]: [],
      [OrderStatus.CANCELLED]: [],
      [OrderStatus.FAILED]: []
    };

    return transitions[from].includes(to);
  }
}

// Excerpt from an order service class.
async updateOrderStatus(orderId: string, newStatus: OrderStatus): Promise<Order> {
  const order = await this.orderRepository.findById(orderId);

  // Reject any transition that the state machine does not allow
  if (!OrderStateMachine.canTransition(order.status, newStatus)) {
    throw new BadRequestException(`Cannot transition from ${order.status} to ${newStatus}`);
  }

  order.status = newStatus;
  return await this.orderRepository.save(order);
}
```

#### 4.2.2 State Machine Libraries

**When to use**: complex state flows

**Implementation**:
- Use a dedicated state machine library
- Define states, events, and transitions
- Handle side effects

**Example**:

```typescript
// Uses the xstate library (written against the v4 API;
// v5 uses `createActor` instead of `interpret`)
import { createMachine, interpret } from 'xstate';

const orderMachine = createMachine({
  id: 'order',
  initial: 'PENDING',
  states: {
    PENDING: {
      on: {
        PROCESS: 'PROCESSING',
        CANCEL: 'CANCELLED'
      }
    },
    PROCESSING: {
      on: {
        SHIP: 'SHIPPED',
        FAIL: 'FAILED',
        CANCEL: 'CANCELLED'
      }
    },
    SHIPPED: {
      on: {
        DELIVER: 'DELIVERED',
        FAIL: 'FAILED'
      }
    },
    DELIVERED: {
      type: 'final'
    },
    CANCELLED: {
      type: 'final'
    },
    FAILED: {
      type: 'final'
    }
  }
});

// Log every state change
const orderService = interpret(orderMachine)
  .onTransition(state => console.log('Order state:', state.value));

orderService.start();
orderService.send('PROCESS'); // PENDING -> PROCESSING
orderService.send('SHIP');    // PROCESSING -> SHIPPED
orderService.send('DELIVER'); // SHIPPED -> DELIVERED
```
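
### 4.3 Best Practices

- **Define states and transition rules explicitly**
- **Route every state change through the state machine**
- **Record the history of state transitions**
- **Handle the side effects of state transitions**
- **Validate every transition before applying it**
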
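
Validation and history recording fit naturally into a single transition function. The sketch below reuses the order states from section 4.2.1 as a string union; `TrackedOrder`, `TransitionRecord`, and `transition` are hypothetical names, and the history is kept in memory rather than in an audit table.

```typescript
type OrderStatus =
  | "PENDING" | "PROCESSING" | "SHIPPED"
  | "DELIVERED" | "CANCELLED" | "FAILED";

// Allowed target states for each source state.
const allowedTransitions: Record<OrderStatus, OrderStatus[]> = {
  PENDING: ["PROCESSING", "CANCELLED"],
  PROCESSING: ["SHIPPED", "FAILED", "CANCELLED"],
  SHIPPED: ["DELIVERED", "FAILED"],
  DELIVERED: [],
  CANCELLED: [],
  FAILED: [],
};

interface TransitionRecord {
  from: OrderStatus;
  to: OrderStatus;
  at: Date;
  reason: string;
}

interface TrackedOrder {
  id: string;
  status: OrderStatus;
  history: TransitionRecord[];
}

// Validates the transition first, then appends an audit record and applies it.
function transition(order: TrackedOrder, to: OrderStatus, reason: string): void {
  if (!allowedTransitions[order.status].includes(to)) {
    throw new Error(`Cannot transition from ${order.status} to ${to}`);
  }
  order.history.push({ from: order.status, to, at: new Date(), reason });
  order.status = to;
}

const trackedOrder: TrackedOrder = { id: "o-1", status: "PENDING", history: [] };
transition(trackedOrder, "PROCESSING", "payment confirmed");
transition(trackedOrder, "SHIPPED", "handed to carrier");

// An illegal transition is rejected and leaves both status and history unchanged.
let rejected = false;
try {
  transition(trackedOrder, "PENDING", "manual rollback attempt");
} catch {
  rejected = true;
}
```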
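
---

## 5. Consistency Safeguards

### 5.1 Database Level

- **Use transactions**: guarantee that data operations are atomic
- **Define constraints**: use unique constraints, foreign key constraints, and similar rules
- **Index appropriately**: faster queries hold locks for less time, which reduces lock contention
- **Back up regularly**: guard against data loss

### 5.2 Application Level

- **Implement idempotency**: handle duplicate requests safely
- **Use state machines**: manage state transitions
- **Process asynchronously**: use message queues for operations that do not need a real-time response
- **Provide compensation mechanisms**: undo or repair operations that fail

### 5.3 Monitoring and Alerting

- **Consistency checks**: verify data consistency on a regular schedule
- **Error monitoring**: monitor failures in data operations
- **Alerting**: detect and resolve data inconsistencies promptly
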
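
One possible shape for a periodic consistency check is a reconciliation job that recomputes a value from its source records and compares it with the stored value. The sketch below assumes, purely for illustration, that every inventory change is also written to a movement log; `StockMovement`, `Discrepancy`, and `findDiscrepancies` are hypothetical names.

```typescript
interface StockMovement {
  productId: string;
  delta: number;
}

interface Discrepancy {
  productId: string;
  recorded: number;
  expected: number;
}

// Recomputes each product's quantity from its movement log and reports every
// product whose recorded quantity disagrees with the recomputed one.
function findDiscrepancies(
  recorded: Record<string, number>,
  movements: StockMovement[],
): Discrepancy[] {
  const expected: Record<string, number> = {};
  for (const movement of movements) {
    expected[movement.productId] = (expected[movement.productId] ?? 0) + movement.delta;
  }

  const productIds = new Set([...Object.keys(recorded), ...Object.keys(expected)]);
  const discrepancies: Discrepancy[] = [];
  for (const productId of Array.from(productIds).sort()) {
    const recordedQty = recorded[productId] ?? 0;
    const expectedQty = expected[productId] ?? 0;
    if (recordedQty !== expectedQty) {
      discrepancies.push({ productId, recorded: recordedQty, expected: expectedQty });
    }
  }
  return discrepancies;
}

// sku-1 matches its log (10 - 3 = 7); sku-2 does not (recorded 4, log says 5).
const discrepancies = findDiscrepancies(
  { "sku-1": 7, "sku-2": 4 },
  [
    { productId: "sku-1", delta: 10 },
    { productId: "sku-1", delta: -3 },
    { productId: "sku-2", delta: 5 },
  ],
);

// A scheduled job could raise an alert whenever the list is not empty.
const shouldAlert = discrepancies.length > 0;
```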

---

## 6. Common Problems and Solutions

### 6.1 Concurrent Update Conflicts

**Problem**: multiple users update the same record at the same time

**Solutions**:
- Use optimistic locking
- Use pessimistic locking
- Serialize updates through a queue

### 6.2 Distributed Transactions

**Problem**: a transaction spans multiple services or data sources

**Solutions**:
- Use a message queue with eventual consistency
- Use the Saga pattern
- Use the TCC pattern

### 6.3 Data Synchronization Lag

**Problem**: data synchronization between systems is delayed

**Solutions**:
- Use an event-driven architecture
- Implement incremental synchronization
- Choose a reasonable synchronization frequency

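
Incremental synchronization is often built around a cursor that marks how far the previous run got. The following is a minimal sketch under stated assumptions: every source record carries an `updatedAt` value that increases on each change, and `SourceRecord`, `SyncState`, and `syncIncrementally` are hypothetical names. Records that share the cursor's exact timestamp but arrive later would be missed, so a real implementation needs a tie-breaker such as a sequence number.

```typescript
interface SourceRecord {
  id: string;
  value: string;
  updatedAt: number;
}

interface SyncState {
  cursor: number;
  target: Map<string, SourceRecord>;
}

// Copies only the records changed since the last cursor, then advances the
// cursor to the newest `updatedAt` that was copied. Writing by id (upsert)
// keeps a repeated run harmless. Returns the number of records copied.
function syncIncrementally(source: SourceRecord[], state: SyncState): number {
  const changed = source
    .filter((record) => record.updatedAt > state.cursor)
    .sort((a, b) => a.updatedAt - b.updatedAt);

  for (const record of changed) {
    state.target.set(record.id, record);
    state.cursor = record.updatedAt;
  }
  return changed.length;
}

const source: SourceRecord[] = [
  { id: "a", value: "v1", updatedAt: 100 },
  { id: "b", value: "v1", updatedAt: 200 },
];
const state: SyncState = { cursor: 0, target: new Map() };

const firstRun = syncIncrementally(source, state);    // copies a and b
source[0] = { id: "a", value: "v2", updatedAt: 300 }; // a changes at the source
const secondRun = syncIncrementally(source, state);   // copies only a
const thirdRun = syncIncrementally(source, state);    // nothing new to copy
```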

---

## 7. Testing Strategy

### 7.1 Unit Tests

- Test the logic of individual components
- Mock dependencies
- Verify edge cases

### 7.2 Integration Tests

- Test interactions between components
- Test transaction boundaries
- Test state transitions

### 7.3 Stress Tests

- Test concurrent scenarios
- Test system stability
- Test data consistency under load

---

## 8. Related Documents

- [Service_Design.md](./Service_Design.md)
- [Database_Design.md](./Database_Design.md)
- [API_Specs](./API_Specs/)

---

*This document is based on the service design document. Last updated: 2026-03-18*