Files
makemd/docs/ARCHIVE/02_Backend/05_Data_Consistency.md
wurenzhi 2b86715c09 refactor: 优化代码结构并修复类型问题
- 移除未使用的TabPane组件
- 修复类型定义和导入方式
- 优化mock数据源的环境变量判断逻辑
- 更新文档结构并归档旧文件
- 添加新的UI组件和Memo组件
- 调整API路径和响应处理
2026-03-23 12:41:35 +08:00

605 lines
20 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 数据一致性文档 (Crawlful Hub)
> **定位**Crawlful Hub 数据一致性设计文档 - 确保系统数据的准确性和可靠性。
> **更新日期**: 2026-03-18
> **最高优先级参考**: [Service_Design.md](./Service_Design.md)
---
## 1. 数据一致性概述
### 1.1 重要性
数据一致性是系统的核心要求,特别是对于涉及资金、库存、订单等关键业务数据的系统。数据不一致可能导致:
- 财务损失
- 库存管理混乱
- 订单处理错误
- 商户信任度下降
- 系统不可用
### 1.2 核心原则
- **原子性**:操作要么全部成功,要么全部失败
- **一致性**:操作前后数据状态保持一致
- **隔离性**:并发操作互不干扰
- **持久性**:数据一旦提交,就永久保存
---
## 2. 事务边界
### 2.1 定义
事务边界是指一组操作的范围,这些操作必须作为一个整体执行,要么全部成功,要么全部失败。
### 2.2 实现方法
#### 2.2.1 数据库事务
**适用场景**:涉及数据库操作的业务流程
**实现**
- 使用 `@Transactional` 注解Java/Spring
- 使用 `transaction` 方法Node.js/Sequelize
- 使用数据库原生事务
**示例**
```typescript
// Node.js/Sequelize 示例
async createOrder(orderData: OrderCreateDto): Promise<Order> {
return await this.sequelize.transaction(async (t) => {
// 1. 创建订单
const order = await Order.create(orderData, { transaction: t });
// 2. 扣减库存
await Inventory.decrement('quantity', {
by: orderData.quantity,
where: { productId: orderData.productId },
transaction: t
});
// 3. 记录交易
await Transaction.create({
orderId: order.id,
amount: order.totalAmount,
type: 'ORDER_CREATED'
}, { transaction: t });
return order;
});
}
```
#### 2.2.2 分布式事务
**适用场景**:跨多个服务或数据源的业务流程
**实现**
- 两阶段提交2PC
- 补偿事务TCC
- 消息队列 + 最终一致性
**示例**
```typescript
// 补偿事务示例
async processOrder(orderId: string): Promise<void> {
// 1. 尝试处理订单
const order = await this.orderRepository.findById(orderId);
try {
// 2. 扣减库存
await this.inventoryService.deductInventory(
order.productId,
order.quantity
);
// 3. 安排物流
const shippingId = await this.logisticsService.createShipping(order);
order.shippingId = shippingId;
// 4. 标记订单为已处理
order.status = 'PROCESSED';
await this.orderRepository.save(order);
} catch (error) {
// 5. 补偿操作
await this.compensateOrder(order);
throw error;
}
}
async compensateOrder(order: Order): Promise<void> {
// 恢复库存
await this.inventoryService.restoreInventory(
order.productId,
order.quantity
);
// 取消物流
if (order.shippingId) {
await this.logisticsService.cancelShipping(order.shippingId);
}
// 标记订单为失败
order.status = 'FAILED';
await this.orderRepository.save(order);
}
```
### 2.3 最佳实践
- **明确事务边界**:只包含必要的操作
- **保持事务简短**:减少锁持有时间
- **合理设置隔离级别**:根据业务需求选择适当的隔离级别
- **处理事务异常**:确保异常情况下能够正确回滚
---
## 3. 幂等性
### 3.1 定义
幂等性是指同一个请求执行多次,结果应该相同。
### 3.2 实现方法
#### 3.2.1 请求ID
**适用场景**:所有外部 API 调用、支付回调等
**实现**
- 生成唯一的 `requestId`
- 记录已处理的 `requestId`
- 检查重复请求
**示例**
```typescript
async processPayment(paymentData: PaymentDto): Promise<Payment> {
// 检查是否已处理
const existingPayment = await this.paymentRepository.findByRequestId(paymentData.requestId);
if (existingPayment) {
return existingPayment;
}
// 处理支付
const payment = await this.paymentRepository.create({
...paymentData,
status: 'PROCESSING'
});
try {
// 调用支付网关
const result = await this.paymentGateway.process(paymentData);
// 更新支付状态
payment.status = result.status;
payment.transactionId = result.transactionId;
await this.paymentRepository.save(payment);
} catch (error) {
// 更新支付状态为失败
payment.status = 'FAILED';
await this.paymentRepository.save(payment);
throw error;
}
return payment;
}
```
#### 3.2.2 乐观锁
**适用场景**:并发更新操作
**实现**
- 使用版本号或时间戳
- 更新时检查版本号
- 版本号不匹配则重试或失败
**示例**
```typescript
async updateProduct(productId: string, updates: Partial<Product>): Promise<Product> {
const product = await this.productRepository.findById(productId);
const currentVersion = product.version;
// 尝试更新
const updated = await this.productRepository.update(
{
...updates,
version: currentVersion + 1
},
{
where: {
id: productId,
version: currentVersion
}
}
);
if (updated[0] === 0) {
// 版本不匹配,说明被其他线程修改
throw new ConflictException('Product has been updated by another process');
}
return await this.productRepository.findById(productId);
}
```
### 3.3 最佳实践
- **为所有外部请求生成唯一ID**
- **存储请求ID和处理结果**
- **设置合理的过期时间**
- **处理重复请求时返回相同的结果**
---
## 4. 状态机
### 4.1 定义
状态机是一种用于描述对象状态及其转换规则的模型。
### 4.2 实现方法
#### 4.2.1 枚举状态
**适用场景**:简单的状态流转
**实现**
- 使用枚举定义状态
- 使用条件判断处理状态转换
**示例**
```typescript
enum OrderStatus {
PENDING = 'PENDING',
PROCESSING = 'PROCESSING',
SHIPPED = 'SHIPPED',
DELIVERED = 'DELIVERED',
CANCELLED = 'CANCELLED',
FAILED = 'FAILED'
}
class OrderStateMachine {
static canTransition(from: OrderStatus, to: OrderStatus): boolean {
const transitions = {
[OrderStatus.PENDING]: [OrderStatus.PROCESSING, OrderStatus.CANCELLED],
[OrderStatus.PROCESSING]: [OrderStatus.SHIPPED, OrderStatus.FAILED, OrderStatus.CANCELLED],
[OrderStatus.SHIPPED]: [OrderStatus.DELIVERED, OrderStatus.FAILED],
[OrderStatus.DELIVERED]: [],
[OrderStatus.CANCELLED]: [],
[OrderStatus.FAILED]: []
};
return transitions[from].includes(to);
}
}
async updateOrderStatus(orderId: string, newStatus: OrderStatus): Promise<Order> {
const order = await this.orderRepository.findById(orderId);
if (!OrderStateMachine.canTransition(order.status, newStatus)) {
throw new BadRequestException(`Cannot transition from ${order.status} to ${newStatus}`);
}
order.status = newStatus;
return await this.orderRepository.save(order);
}
```
#### 4.2.2 状态机库
**适用场景**:复杂的状态流转
**实现**
- 使用专门的状态机库
- 定义状态、事件和转换
- 处理副作用
**示例**
```typescript
// 使用 xstate 库
import { createMachine, interpret } from 'xstate';
const orderMachine = createMachine({
id: 'order',
initial: 'PENDING',
states: {
PENDING: {
on: {
PROCESS: 'PROCESSING',
CANCEL: 'CANCELLED'
}
},
PROCESSING: {
on: {
SHIP: 'SHIPPED',
FAIL: 'FAILED',
CANCEL: 'CANCELLED'
}
},
SHIPPED: {
on: {
DELIVER: 'DELIVERED',
FAIL: 'FAILED'
}
},
DELIVERED: {
type: 'final'
},
CANCELLED: {
type: 'final'
},
FAILED: {
type: 'final'
}
}
});
const orderService = interpret(orderMachine)
.onTransition(state => console.log('Order state:', state.value));
orderService.start();
orderService.send('PROCESS'); // 从 PENDING 到 PROCESSING
orderService.send('SHIP'); // 从 PROCESSING 到 SHIPPED
orderService.send('DELIVER'); // 从 SHIPPED 到 DELIVERED
```
### 4.3 最佳实践
- **明确定义状态和转换规则**
- **使用状态机管理所有状态流转**
- **记录状态转换历史**
- **处理状态转换的副作用**
- **验证状态转换的合法性**
---
## 5. 数据一致性保障措施
### 5.1 数据库层面
- **使用事务**:确保数据操作的原子性
- **设置约束**:使用唯一约束、外键约束等
- **合理索引**:提高查询性能,减少锁竞争
- **定期备份**:防止数据丢失
### 5.2 应用层面
- **实现幂等性**:处理重复请求
- **使用状态机**:管理状态流转
- **异步处理**:使用消息队列处理非实时操作
- **补偿机制**:处理失败的操作
### 5.3 监控和告警
- **数据一致性检查**:定期检查数据一致性
- **异常监控**:监控数据操作异常
- **告警机制**:及时发现和处理数据不一致问题
---
## 6. 常见问题及解决方案
### 6.1 并发更新冲突
**问题**:多个用户同时更新同一条数据
**解决方案**
- 使用乐观锁
- 使用悲观锁
- 实现队列机制
### 6.2 分布式事务
**问题**:跨多个服务或数据源的事务
**解决方案**
- 使用消息队列 + 最终一致性
- 使用 Saga 模式
- 使用 TCC 模式
### 6.3 数据同步延迟
**问题**:不同系统之间的数据同步存在延迟
**解决方案**
- 使用事件驱动架构
- 实现增量同步
- 设置合理的同步频率
---
## 7. 测试策略
### 7.1 单元测试
- 测试单个组件的逻辑
- 模拟依赖
- 验证边界情况
### 7.2 集成测试
- 测试多个组件的交互
- 测试事务边界
- 测试状态转换
### 7.3 压力测试
- 测试并发场景
- 测试系统稳定性
- 测试数据一致性
---
## 8. 相关文档
- [Service_Design.md](./Service_Design.md)
- [Database_Design.md](./Database_Design.md)
- [API_Specs](./API_Specs/)
---
*本文档基于服务设计文档,最后更新: 2026-03-18*
---
## 9. 商品中心数据流转Product Center Data Flow
> **设计原则**: 确保商品、价格、库存、授权等核心数据的流转一致性
### 9.1 商品数据生命周期
```
数据采集 → 数据清洗 → 商品主数据 → SKU生成 → 定价计算 → 上架任务 → 平台同步 → 商品状态更新 → 销售数据反馈
```
### 9.2 三层商品模型数据流转
```
┌─────────────────────────────────────────────────────────────┐
│ SPU层产品层
│ 数据来源: 数据采集、手动录入、供应链导入 │
│ 数据内容: 名称、品牌、类目、通用属性 │
│ 一致性要求: 全局唯一,多平台共享 │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ SKU层库存单元层
│ 数据来源: SPU拆分、变体生成、规格组合 │
│ 数据内容: 变体属性、成本价、基准价、重量、库存 │
│ 一致性要求: 租户内唯一,库存实时同步 │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Listing层平台商品层
│ 数据来源: SKU映射、平台刊登、API同步 │
│ 数据内容: 标题、最终价格、库存、平台状态、平台ID │
│ 一致性要求: 平台内唯一,状态实时同步 │
└─────────────────────────────────────────────────────────────┘
```
### 9.3 价格数据流转
```
┌─────────────────────────────────────────────────────────────┐
│ 基准价层Base Price
│ 来源: 成本核算、采购价、人工设定 │
│ 存储: cf_sku.base_price │
│ 一致性: SKU层唯一锚点 │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 策略层Strategy Layer
│ 来源: 价格策略规则、AI定价建议 │
│ 存储: cf_price_strategy │
│ 一致性: 策略优先级、冲突检测 │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 最终价层Final Price
│ 来源: 策略计算、人工覆盖 │
│ 存储: cf_platform_listing.price │
│ 一致性: 平台同步、价格校验 │
└─────────────────────────────────────────────────────────────┘
```
### 9.4 库存数据流转
```
┌─────────────────────────────────────────────────────────────┐
│ 系统库存System Inventory
│ 来源: 采购入库、退货入库、库存调整 │
│ 存储: cf_sku.inventory │
│ 一致性: 实时更新、事务保证 │
└────────────────────────┬────────────────────────────────────┘
┌────────────┼────────────┐
↓ ↓ ↓
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ 平台A库存 │ │ 平台B库存 │ │ 平台C库存 │
│ 同步策略A │ │ 同步策略B │ │ 同步策略C │
└───────────────┘ └───────────────┘ └───────────────┘
```
### 9.5 授权数据流转
```
┌─────────────────────────────────────────────────────────────┐
│ 店铺授权Shop Authorization
│ 来源: OAuth授权、Agent授权、API Key授权 │
│ 存储: cf_shop_authorization │
│ 一致性: Token刷新、过期检测 │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 授权状态Auth Status
│ 状态: ACTIVE → EXPIRING → EXPIRED → REVOKED │
│ 触发: 定时检测、API调用失败、手动撤销 │
└─────────────────────────────────────────────────────────────┘
```
### 9.6 数据一致性保障机制
#### 9.6.1 商品数据一致性
| 场景 | 一致性要求 | 保障机制 |
|------|----------|----------|
| **SPU创建** | 全局唯一 | 唯一约束 (tenant_id, name, brand) |
| **SKU生成** | 租户内唯一 | 唯一约束 (tenant_id, spu_id, attributes_hash) |
| **Listing创建** | 平台内唯一 | 唯一约束 (platform, platform_product_id) |
| **SKU映射** | 一对多关系 | 外键约束 + 级联更新 |
#### 9.6.2 价格数据一致性
| 场景 | 一致性要求 | 保障机制 |
|------|----------|----------|
| **基准价更新** | 原子操作 | 事务 + 乐观锁 |
| **策略计算** | 幂等性 | 请求ID + 结果缓存 |
| **价格同步** | 最终一致 | 消息队列 + 重试机制 |
| **价格校验** | 利润红线 | 业务规则校验 |
#### 9.6.3 库存数据一致性
| 场景 | 一致性要求 | 保障机制 |
|------|----------|----------|
| **库存扣减** | 原子操作 | 事务 + 行锁 |
| **库存同步** | 最终一致 | 事件驱动 + 补偿机制 |
| **超卖防护** | 强一致 | 预扣库存 + 回滚机制 |
| **库存预警** | 实时检测 | 定时任务 + 告警 |
#### 9.6.4 授权数据一致性
| 场景 | 一致性要求 | 保障机制 |
|------|----------|----------|
| **Token刷新** | 原子操作 | 事务 + 分布式锁 |
| **授权过期** | 实时检测 | 定时任务 + 预警 |
| **授权撤销** | 级联更新 | 事务 + 事件通知 |
### 9.7 数据流转监控
```typescript
// 数据流转监控指标
interface DataFlowMetrics {
// 商品数据
spuSyncLatency: number; // SPU同步延迟
skuSyncLatency: number; // SKU同步延迟
listingSyncLatency: number; // Listing同步延迟
// 价格数据
priceSyncLatency: number; // 价格同步延迟
priceConsistencyRate: number; // 价格一致性率
// 库存数据
inventorySyncLatency: number; // 库存同步延迟
inventoryDiscrepancyRate: number; // 库存差异率
// 授权数据
authRefreshLatency: number; // 授权刷新延迟
authExpiringCount: number; // 即将过期授权数
}
```