Files
makemd/docs/02_Backend/05_Data_Consistency.md
wurenzhi eafa1bbe94 feat: 添加货币和汇率管理功能
refactor: 重构前端路由和登录逻辑

docs: 更新业务闭环、任务和架构文档

style: 调整代码格式和文件结构

chore: 更新依赖项和配置文件
2026-03-19 19:08:15 +08:00

442 lines
9.9 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 数据一致性文档 (Crawlful Hub)
> **定位**Crawlful Hub 数据一致性设计文档 - 确保系统数据的准确性和可靠性。
> **更新日期**: 2026-03-18
> **最高优先级参考**: [Service_Design.md](./Service_Design.md)
---
## 1. 数据一致性概述
### 1.1 重要性
数据一致性是系统的核心要求,特别是对于涉及资金、库存、订单等关键业务数据的系统。数据不一致可能导致:
- 财务损失
- 库存管理混乱
- 订单处理错误
- 商户信任度下降
- 系统不可用
### 1.2 核心原则
- **原子性**:操作要么全部成功,要么全部失败
- **一致性**:操作前后数据状态保持一致
- **隔离性**:并发操作互不干扰
- **持久性**:数据一旦提交,就永久保存
---
## 2. 事务边界
### 2.1 定义
事务边界是指一组操作的范围,这些操作必须作为一个整体执行,要么全部成功,要么全部失败。
### 2.2 实现方法
#### 2.2.1 数据库事务
**适用场景**:涉及数据库操作的业务流程
**实现**
- 使用 `@Transactional` 注解Java/Spring
- 使用 `transaction` 方法Node.js/Sequelize
- 使用数据库原生事务
**示例**
```typescript
// Node.js/Sequelize 示例
async createOrder(orderData: OrderCreateDto): Promise<Order> {
return await this.sequelize.transaction(async (t) => {
// 1. 创建订单
const order = await Order.create(orderData, { transaction: t });
// 2. 扣减库存
await Inventory.decrement('quantity', {
by: orderData.quantity,
where: { productId: orderData.productId },
transaction: t
});
// 3. 记录交易
await Transaction.create({
orderId: order.id,
amount: order.totalAmount,
type: 'ORDER_CREATED'
}, { transaction: t });
return order;
});
}
```
#### 2.2.2 分布式事务
**适用场景**:跨多个服务或数据源的业务流程
**实现**
- 两阶段提交2PC
- 补偿事务TCC
- 消息队列 + 最终一致性
**示例**
```typescript
// 补偿事务示例
async processOrder(orderId: string): Promise<void> {
// 1. 尝试处理订单
const order = await this.orderRepository.findById(orderId);
try {
// 2. 扣减库存
await this.inventoryService.deductInventory(
order.productId,
order.quantity
);
// 3. 安排物流
const shippingId = await this.logisticsService.createShipping(order);
order.shippingId = shippingId;
// 4. 标记订单为已处理
order.status = 'PROCESSED';
await this.orderRepository.save(order);
} catch (error) {
// 5. 补偿操作
await this.compensateOrder(order);
throw error;
}
}
async compensateOrder(order: Order): Promise<void> {
// 恢复库存
await this.inventoryService.restoreInventory(
order.productId,
order.quantity
);
// 取消物流
if (order.shippingId) {
await this.logisticsService.cancelShipping(order.shippingId);
}
// 标记订单为失败
order.status = 'FAILED';
await this.orderRepository.save(order);
}
```
### 2.3 最佳实践
- **明确事务边界**:只包含必要的操作
- **保持事务简短**:减少锁持有时间
- **合理设置隔离级别**:根据业务需求选择适当的隔离级别
- **处理事务异常**:确保异常情况下能够正确回滚
---
## 3. 幂等性
### 3.1 定义
幂等性是指同一个请求执行多次,结果应该相同。
### 3.2 实现方法
#### 3.2.1 请求ID
**适用场景**:所有外部 API 调用、支付回调等
**实现**
- 生成唯一的 `requestId`
- 记录已处理的 `requestId`
- 检查重复请求
**示例**
```typescript
async processPayment(paymentData: PaymentDto): Promise<Payment> {
// 检查是否已处理
const existingPayment = await this.paymentRepository.findByRequestId(paymentData.requestId);
if (existingPayment) {
return existingPayment;
}
// 处理支付
const payment = await this.paymentRepository.create({
...paymentData,
status: 'PROCESSING'
});
try {
// 调用支付网关
const result = await this.paymentGateway.process(paymentData);
// 更新支付状态
payment.status = result.status;
payment.transactionId = result.transactionId;
await this.paymentRepository.save(payment);
} catch (error) {
// 更新支付状态为失败
payment.status = 'FAILED';
await this.paymentRepository.save(payment);
throw error;
}
return payment;
}
```
#### 3.2.2 乐观锁
**适用场景**:并发更新操作
**实现**
- 使用版本号或时间戳
- 更新时检查版本号
- 版本号不匹配则重试或失败
**示例**
```typescript
async updateProduct(productId: string, updates: Partial<Product>): Promise<Product> {
const product = await this.productRepository.findById(productId);
const currentVersion = product.version;
// 尝试更新
const updated = await this.productRepository.update(
{
...updates,
version: currentVersion + 1
},
{
where: {
id: productId,
version: currentVersion
}
}
);
if (updated[0] === 0) {
// 版本不匹配,说明被其他线程修改
throw new ConflictException('Product has been updated by another process');
}
return await this.productRepository.findById(productId);
}
```
### 3.3 最佳实践
- **为所有外部请求生成唯一ID**
- **存储请求ID和处理结果**
- **设置合理的过期时间**
- **处理重复请求时返回相同的结果**
---
## 4. 状态机
### 4.1 定义
状态机是一种用于描述对象状态及其转换规则的模型。
### 4.2 实现方法
#### 4.2.1 枚举状态
**适用场景**:简单的状态流转
**实现**
- 使用枚举定义状态
- 使用条件判断处理状态转换
**示例**
```typescript
enum OrderStatus {
PENDING = 'PENDING',
PROCESSING = 'PROCESSING',
SHIPPED = 'SHIPPED',
DELIVERED = 'DELIVERED',
CANCELLED = 'CANCELLED',
FAILED = 'FAILED'
}
class OrderStateMachine {
static canTransition(from: OrderStatus, to: OrderStatus): boolean {
const transitions = {
[OrderStatus.PENDING]: [OrderStatus.PROCESSING, OrderStatus.CANCELLED],
[OrderStatus.PROCESSING]: [OrderStatus.SHIPPED, OrderStatus.FAILED, OrderStatus.CANCELLED],
[OrderStatus.SHIPPED]: [OrderStatus.DELIVERED, OrderStatus.FAILED],
[OrderStatus.DELIVERED]: [],
[OrderStatus.CANCELLED]: [],
[OrderStatus.FAILED]: []
};
return transitions[from].includes(to);
}
}
async updateOrderStatus(orderId: string, newStatus: OrderStatus): Promise<Order> {
const order = await this.orderRepository.findById(orderId);
if (!OrderStateMachine.canTransition(order.status, newStatus)) {
throw new BadRequestException(`Cannot transition from ${order.status} to ${newStatus}`);
}
order.status = newStatus;
return await this.orderRepository.save(order);
}
```
#### 4.2.2 状态机库
**适用场景**:复杂的状态流转
**实现**
- 使用专门的状态机库
- 定义状态、事件和转换
- 处理副作用
**示例**
```typescript
// 使用 xstate 库
import { createMachine, interpret } from 'xstate';
const orderMachine = createMachine({
id: 'order',
initial: 'PENDING',
states: {
PENDING: {
on: {
PROCESS: 'PROCESSING',
CANCEL: 'CANCELLED'
}
},
PROCESSING: {
on: {
SHIP: 'SHIPPED',
FAIL: 'FAILED',
CANCEL: 'CANCELLED'
}
},
SHIPPED: {
on: {
DELIVER: 'DELIVERED',
FAIL: 'FAILED'
}
},
DELIVERED: {
type: 'final'
},
CANCELLED: {
type: 'final'
},
FAILED: {
type: 'final'
}
}
});
const orderService = interpret(orderMachine)
.onTransition(state => console.log('Order state:', state.value));
orderService.start();
orderService.send('PROCESS'); // 从 PENDING 到 PROCESSING
orderService.send('SHIP'); // 从 PROCESSING 到 SHIPPED
orderService.send('DELIVER'); // 从 SHIPPED 到 DELIVERED
```
### 4.3 最佳实践
- **明确定义状态和转换规则**
- **使用状态机管理所有状态流转**
- **记录状态转换历史**
- **处理状态转换的副作用**
- **验证状态转换的合法性**
---
## 5. 数据一致性保障措施
### 5.1 数据库层面
- **使用事务**:确保数据操作的原子性
- **设置约束**:使用唯一约束、外键约束等
- **合理索引**:提高查询性能,减少锁竞争
- **定期备份**:防止数据丢失
### 5.2 应用层面
- **实现幂等性**:处理重复请求
- **使用状态机**:管理状态流转
- **异步处理**:使用消息队列处理非实时操作
- **补偿机制**:处理失败的操作
### 5.3 监控和告警
- **数据一致性检查**:定期检查数据一致性
- **异常监控**:监控数据操作异常
- **告警机制**:及时发现和处理数据不一致问题
---
## 6. 常见问题及解决方案
### 6.1 并发更新冲突
**问题**:多个用户同时更新同一条数据
**解决方案**
- 使用乐观锁
- 使用悲观锁
- 实现队列机制
### 6.2 分布式事务
**问题**:跨多个服务或数据源的事务
**解决方案**
- 使用消息队列 + 最终一致性
- 使用 Saga 模式
- 使用 TCC 模式
### 6.3 数据同步延迟
**问题**:不同系统之间的数据同步存在延迟
**解决方案**
- 使用事件驱动架构
- 实现增量同步
- 设置合理的同步频率
---
## 7. 测试策略
### 7.1 单元测试
- 测试单个组件的逻辑
- 模拟依赖
- 验证边界情况
### 7.2 集成测试
- 测试多个组件的交互
- 测试事务边界
- 测试状态转换
### 7.3 压力测试
- 测试并发场景
- 测试系统稳定性
- 测试数据一致性
---
## 8. 相关文档
- [Service_Design.md](./Service_Design.md)
- [Database_Design.md](./Database_Design.md)
- [API_Specs](./API_Specs/)
---
*本文档基于服务设计文档,最后更新: 2026-03-18*