- 移除未使用的TabPane组件 - 修复类型定义和导入方式 - 优化mock数据源的环境变量判断逻辑 - 更新文档结构并归档旧文件 - 添加新的UI组件和Memo组件 - 调整API路径和响应处理
20 KiB
20 KiB
数据一致性文档 (Crawlful Hub)
定位:Crawlful Hub 数据一致性设计文档 - 确保系统数据的准确性和可靠性。 更新日期: 2026-03-18 最高优先级参考: Service_Design.md
1. 数据一致性概述
1.1 重要性
数据一致性是系统的核心要求,特别是对于涉及资金、库存、订单等关键业务数据的系统。数据不一致可能导致:
- 财务损失
- 库存管理混乱
- 订单处理错误
- 商户信任度下降
- 系统不可用
1.2 核心原则
- 原子性:操作要么全部成功,要么全部失败
- 一致性:操作前后数据状态保持一致
- 隔离性:并发操作互不干扰
- 持久性:数据一旦提交,就永久保存
2. 事务边界
2.1 定义
事务边界是指一组操作的范围,这些操作必须作为一个整体执行,要么全部成功,要么全部失败。
2.2 实现方法
2.2.1 数据库事务
适用场景:涉及数据库操作的业务流程
实现:
- 使用
@Transactional注解(Java/Spring) - 使用
transaction方法(Node.js/Sequelize) - 使用数据库原生事务
示例:
// Node.js/Sequelize 示例
async createOrder(orderData: OrderCreateDto): Promise<Order> {
return await this.sequelize.transaction(async (t) => {
// 1. 创建订单
const order = await Order.create(orderData, { transaction: t });
// 2. 扣减库存
await Inventory.decrement('quantity', {
by: orderData.quantity,
where: { productId: orderData.productId },
transaction: t
});
// 3. 记录交易
await Transaction.create({
orderId: order.id,
amount: order.totalAmount,
type: 'ORDER_CREATED'
}, { transaction: t });
return order;
});
}
2.2.2 分布式事务
适用场景:跨多个服务或数据源的业务流程
实现:
- 两阶段提交(2PC)
- 补偿事务(TCC)
- 消息队列 + 最终一致性
示例:
// 补偿事务示例
async processOrder(orderId: string): Promise<void> {
// 1. 尝试处理订单
const order = await this.orderRepository.findById(orderId);
try {
// 2. 扣减库存
await this.inventoryService.deductInventory(
order.productId,
order.quantity
);
// 3. 安排物流
const shippingId = await this.logisticsService.createShipping(order);
order.shippingId = shippingId;
// 4. 标记订单为已处理
order.status = 'PROCESSED';
await this.orderRepository.save(order);
} catch (error) {
// 5. 补偿操作
await this.compensateOrder(order);
throw error;
}
}
async compensateOrder(order: Order): Promise<void> {
// 恢复库存
await this.inventoryService.restoreInventory(
order.productId,
order.quantity
);
// 取消物流
if (order.shippingId) {
await this.logisticsService.cancelShipping(order.shippingId);
}
// 标记订单为失败
order.status = 'FAILED';
await this.orderRepository.save(order);
}
2.3 最佳实践
- 明确事务边界:只包含必要的操作
- 保持事务简短:减少锁持有时间
- 合理设置隔离级别:根据业务需求选择适当的隔离级别
- 处理事务异常:确保异常情况下能够正确回滚
3. 幂等性
3.1 定义
幂等性是指同一个请求执行多次,结果应该相同。
3.2 实现方法
3.2.1 请求ID
适用场景:所有外部 API 调用、支付回调等
实现:
- 生成唯一的
requestId - 记录已处理的
requestId - 检查重复请求
示例:
async processPayment(paymentData: PaymentDto): Promise<Payment> {
// 检查是否已处理
const existingPayment = await this.paymentRepository.findByRequestId(paymentData.requestId);
if (existingPayment) {
return existingPayment;
}
// 处理支付
const payment = await this.paymentRepository.create({
...paymentData,
status: 'PROCESSING'
});
try {
// 调用支付网关
const result = await this.paymentGateway.process(paymentData);
// 更新支付状态
payment.status = result.status;
payment.transactionId = result.transactionId;
await this.paymentRepository.save(payment);
} catch (error) {
// 更新支付状态为失败
payment.status = 'FAILED';
await this.paymentRepository.save(payment);
throw error;
}
return payment;
}
3.2.2 乐观锁
适用场景:并发更新操作
实现:
- 使用版本号或时间戳
- 更新时检查版本号
- 版本号不匹配则重试或失败
示例:
async updateProduct(productId: string, updates: Partial<Product>): Promise<Product> {
const product = await this.productRepository.findById(productId);
const currentVersion = product.version;
// 尝试更新
const updated = await this.productRepository.update(
{
...updates,
version: currentVersion + 1
},
{
where: {
id: productId,
version: currentVersion
}
}
);
if (updated[0] === 0) {
// 版本不匹配,说明被其他线程修改
throw new ConflictException('Product has been updated by another process');
}
return await this.productRepository.findById(productId);
}
3.3 最佳实践
- 为所有外部请求生成唯一ID
- 存储请求ID和处理结果
- 设置合理的过期时间
- 处理重复请求时返回相同的结果
4. 状态机
4.1 定义
状态机是一种用于描述对象状态及其转换规则的模型。
4.2 实现方法
4.2.1 枚举状态
适用场景:简单的状态流转
实现:
- 使用枚举定义状态
- 使用条件判断处理状态转换
示例:
enum OrderStatus {
PENDING = 'PENDING',
PROCESSING = 'PROCESSING',
SHIPPED = 'SHIPPED',
DELIVERED = 'DELIVERED',
CANCELLED = 'CANCELLED',
FAILED = 'FAILED'
}
class OrderStateMachine {
static canTransition(from: OrderStatus, to: OrderStatus): boolean {
const transitions = {
[OrderStatus.PENDING]: [OrderStatus.PROCESSING, OrderStatus.CANCELLED],
[OrderStatus.PROCESSING]: [OrderStatus.SHIPPED, OrderStatus.FAILED, OrderStatus.CANCELLED],
[OrderStatus.SHIPPED]: [OrderStatus.DELIVERED, OrderStatus.FAILED],
[OrderStatus.DELIVERED]: [],
[OrderStatus.CANCELLED]: [],
[OrderStatus.FAILED]: []
};
return transitions[from].includes(to);
}
}
async updateOrderStatus(orderId: string, newStatus: OrderStatus): Promise<Order> {
const order = await this.orderRepository.findById(orderId);
if (!OrderStateMachine.canTransition(order.status, newStatus)) {
throw new BadRequestException(`Cannot transition from ${order.status} to ${newStatus}`);
}
order.status = newStatus;
return await this.orderRepository.save(order);
}
4.2.2 状态机库
适用场景:复杂的状态流转
实现:
- 使用专门的状态机库
- 定义状态、事件和转换
- 处理副作用
示例:
// 使用 xstate 库
import { createMachine, interpret } from 'xstate';
const orderMachine = createMachine({
id: 'order',
initial: 'PENDING',
states: {
PENDING: {
on: {
PROCESS: 'PROCESSING',
CANCEL: 'CANCELLED'
}
},
PROCESSING: {
on: {
SHIP: 'SHIPPED',
FAIL: 'FAILED',
CANCEL: 'CANCELLED'
}
},
SHIPPED: {
on: {
DELIVER: 'DELIVERED',
FAIL: 'FAILED'
}
},
DELIVERED: {
type: 'final'
},
CANCELLED: {
type: 'final'
},
FAILED: {
type: 'final'
}
}
});
const orderService = interpret(orderMachine)
.onTransition(state => console.log('Order state:', state.value));
orderService.start();
orderService.send('PROCESS'); // 从 PENDING 到 PROCESSING
orderService.send('SHIP'); // 从 PROCESSING 到 SHIPPED
orderService.send('DELIVER'); // 从 SHIPPED 到 DELIVERED
4.3 最佳实践
- 明确定义状态和转换规则
- 使用状态机管理所有状态流转
- 记录状态转换历史
- 处理状态转换的副作用
- 验证状态转换的合法性
5. 数据一致性保障措施
5.1 数据库层面
- 使用事务:确保数据操作的原子性
- 设置约束:使用唯一约束、外键约束等
- 合理索引:提高查询性能,减少锁竞争
- 定期备份:防止数据丢失
5.2 应用层面
- 实现幂等性:处理重复请求
- 使用状态机:管理状态流转
- 异步处理:使用消息队列处理非实时操作
- 补偿机制:处理失败的操作
5.3 监控和告警
- 数据一致性检查:定期检查数据一致性
- 异常监控:监控数据操作异常
- 告警机制:及时发现和处理数据不一致问题
6. 常见问题及解决方案
6.1 并发更新冲突
问题:多个用户同时更新同一条数据
解决方案:
- 使用乐观锁
- 使用悲观锁
- 实现队列机制
6.2 分布式事务
问题:跨多个服务或数据源的事务
解决方案:
- 使用消息队列 + 最终一致性
- 使用 Saga 模式
- 使用 TCC 模式
6.3 数据同步延迟
问题:不同系统之间的数据同步存在延迟
解决方案:
- 使用事件驱动架构
- 实现增量同步
- 设置合理的同步频率
7. 测试策略
7.1 单元测试
- 测试单个组件的逻辑
- 模拟依赖
- 验证边界情况
7.2 集成测试
- 测试多个组件的交互
- 测试事务边界
- 测试状态转换
7.3 压力测试
- 测试并发场景
- 测试系统稳定性
- 测试数据一致性
8. 相关文档
本文档基于服务设计文档,最后更新: 2026-03-18
9. 商品中心数据流转(Product Center Data Flow)
设计原则: 确保商品、价格、库存、授权等核心数据的流转一致性
9.1 商品数据生命周期
数据采集 → 数据清洗 → 商品主数据 → SKU生成 → 定价计算 → 上架任务 → 平台同步 → 商品状态更新 → 销售数据反馈
9.2 三层商品模型数据流转
┌─────────────────────────────────────────────────────────────┐
│ SPU层(产品层) │
│ 数据来源: 数据采集、手动录入、供应链导入 │
│ 数据内容: 名称、品牌、类目、通用属性 │
│ 一致性要求: 全局唯一,多平台共享 │
└────────────────────────┬────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ SKU层(库存单元层) │
│ 数据来源: SPU拆分、变体生成、规格组合 │
│ 数据内容: 变体属性、成本价、基准价、重量、库存 │
│ 一致性要求: 租户内唯一,库存实时同步 │
└────────────────────────┬────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ Listing层(平台商品层) │
│ 数据来源: SKU映射、平台刊登、API同步 │
│ 数据内容: 标题、最终价格、库存、平台状态、平台ID │
│ 一致性要求: 平台内唯一,状态实时同步 │
└─────────────────────────────────────────────────────────────┘
9.3 价格数据流转
┌─────────────────────────────────────────────────────────────┐
│ 基准价层(Base Price) │
│ 来源: 成本核算、采购价、人工设定 │
│ 存储: cf_sku.base_price │
│ 一致性: SKU层唯一锚点 │
└────────────────────────┬────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 策略层(Strategy Layer) │
│ 来源: 价格策略规则、AI定价建议 │
│ 存储: cf_price_strategy │
│ 一致性: 策略优先级、冲突检测 │
└────────────────────────┬────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 最终价层(Final Price) │
│ 来源: 策略计算、人工覆盖 │
│ 存储: cf_platform_listing.price │
│ 一致性: 平台同步、价格校验 │
└─────────────────────────────────────────────────────────────┘
9.4 库存数据流转
┌─────────────────────────────────────────────────────────────┐
│ 系统库存(System Inventory) │
│ 来源: 采购入库、退货入库、库存调整 │
│ 存储: cf_sku.inventory │
│ 一致性: 实时更新、事务保证 │
└────────────────────────┬────────────────────────────────────┘
│
┌────────────┼────────────┐
↓ ↓ ↓
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ 平台A库存 │ │ 平台B库存 │ │ 平台C库存 │
│ 同步策略A │ │ 同步策略B │ │ 同步策略C │
└───────────────┘ └───────────────┘ └───────────────┘
9.5 授权数据流转
┌─────────────────────────────────────────────────────────────┐
│ 店铺授权(Shop Authorization) │
│ 来源: OAuth授权、Agent授权、API Key授权 │
│ 存储: cf_shop_authorization │
│ 一致性: Token刷新、过期检测 │
└────────────────────────┬────────────────────────────────────┘
│
↓
┌─────────────────────────────────────────────────────────────┐
│ 授权状态(Auth Status) │
│ 状态: ACTIVE → EXPIRING → EXPIRED → REVOKED │
│ 触发: 定时检测、API调用失败、手动撤销 │
└─────────────────────────────────────────────────────────────┘
9.6 数据一致性保障机制
9.6.1 商品数据一致性
| 场景 | 一致性要求 | 保障机制 |
|---|---|---|
| SPU创建 | 全局唯一 | 唯一约束 (tenant_id, name, brand) |
| SKU生成 | 租户内唯一 | 唯一约束 (tenant_id, spu_id, attributes_hash) |
| Listing创建 | 平台内唯一 | 唯一约束 (platform, platform_product_id) |
| SKU映射 | 一对多关系 | 外键约束 + 级联更新 |
9.6.2 价格数据一致性
| 场景 | 一致性要求 | 保障机制 |
|---|---|---|
| 基准价更新 | 原子操作 | 事务 + 乐观锁 |
| 策略计算 | 幂等性 | 请求ID + 结果缓存 |
| 价格同步 | 最终一致 | 消息队列 + 重试机制 |
| 价格校验 | 利润红线 | 业务规则校验 |
9.6.3 库存数据一致性
| 场景 | 一致性要求 | 保障机制 |
|---|---|---|
| 库存扣减 | 原子操作 | 事务 + 行锁 |
| 库存同步 | 最终一致 | 事件驱动 + 补偿机制 |
| 超卖防护 | 强一致 | 预扣库存 + 回滚机制 |
| 库存预警 | 实时检测 | 定时任务 + 告警 |
9.6.4 授权数据一致性
| 场景 | 一致性要求 | 保障机制 |
|---|---|---|
| Token刷新 | 原子操作 | 事务 + 分布式锁 |
| 授权过期 | 实时检测 | 定时任务 + 预警 |
| 授权撤销 | 级联更新 | 事务 + 事件通知 |
9.7 数据流转监控
// 数据流转监控指标
interface DataFlowMetrics {
// 商品数据
spuSyncLatency: number; // SPU同步延迟
skuSyncLatency: number; // SKU同步延迟
listingSyncLatency: number; // Listing同步延迟
// 价格数据
priceSyncLatency: number; // 价格同步延迟
priceConsistencyRate: number; // 价格一致性率
// 库存数据
inventorySyncLatency: number; // 库存同步延迟
inventoryDiscrepancyRate: number; // 库存差异率
// 授权数据
authRefreshLatency: number; // 授权刷新延迟
authExpiringCount: number; // 即将过期授权数
}