# 数据一致性文档 (Crawlful Hub) > **定位**:Crawlful Hub 数据一致性设计文档 - 确保系统数据的准确性和可靠性。 > **更新日期**: 2026-03-18 > **最高优先级参考**: [Service_Design.md](./Service_Design.md) --- ## 1. 数据一致性概述 ### 1.1 重要性 数据一致性是系统的核心要求,特别是对于涉及资金、库存、订单等关键业务数据的系统。数据不一致可能导致: - 财务损失 - 库存管理混乱 - 订单处理错误 - 商户信任度下降 - 系统不可用 ### 1.2 核心原则 - **原子性**:操作要么全部成功,要么全部失败 - **一致性**:操作前后数据状态保持一致 - **隔离性**:并发操作互不干扰 - **持久性**:数据一旦提交,就永久保存 --- ## 2. 事务边界 ### 2.1 定义 事务边界是指一组操作的范围,这些操作必须作为一个整体执行,要么全部成功,要么全部失败。 ### 2.2 实现方法 #### 2.2.1 数据库事务 **适用场景**:涉及数据库操作的业务流程 **实现**: - 使用 `@Transactional` 注解(Java/Spring) - 使用 `transaction` 方法(Node.js/Sequelize) - 使用数据库原生事务 **示例**: ```typescript // Node.js/Sequelize 示例 async createOrder(orderData: OrderCreateDto): Promise { return await this.sequelize.transaction(async (t) => { // 1. 创建订单 const order = await Order.create(orderData, { transaction: t }); // 2. 扣减库存 await Inventory.decrement('quantity', { by: orderData.quantity, where: { productId: orderData.productId }, transaction: t }); // 3. 记录交易 await Transaction.create({ orderId: order.id, amount: order.totalAmount, type: 'ORDER_CREATED' }, { transaction: t }); return order; }); } ``` #### 2.2.2 分布式事务 **适用场景**:跨多个服务或数据源的业务流程 **实现**: - 两阶段提交(2PC) - 补偿事务(TCC) - 消息队列 + 最终一致性 **示例**: ```typescript // 补偿事务示例 async processOrder(orderId: string): Promise { // 1. 尝试处理订单 const order = await this.orderRepository.findById(orderId); try { // 2. 扣减库存 await this.inventoryService.deductInventory( order.productId, order.quantity ); // 3. 安排物流 const shippingId = await this.logisticsService.createShipping(order); order.shippingId = shippingId; // 4. 标记订单为已处理 order.status = 'PROCESSED'; await this.orderRepository.save(order); } catch (error) { // 5. 补偿操作 await this.compensateOrder(order); throw error; } } async compensateOrder(order: Order): Promise { // 恢复库存 await this.inventoryService.restoreInventory( order.productId, order.quantity ); // 取消物流 if (order.shippingId) { await this.logisticsService.cancelShipping(order.shippingId); } // 标记订单为失败 order.status = 'FAILED'; await this.orderRepository.save(order); } ``` ### 2.3 最佳实践 - **明确事务边界**:只包含必要的操作 - **保持事务简短**:减少锁持有时间 - **合理设置隔离级别**:根据业务需求选择适当的隔离级别 - **处理事务异常**:确保异常情况下能够正确回滚 --- ## 3. 幂等性 ### 3.1 定义 幂等性是指同一个请求执行多次,结果应该相同。 ### 3.2 实现方法 #### 3.2.1 请求ID **适用场景**:所有外部 API 调用、支付回调等 **实现**: - 生成唯一的 `requestId` - 记录已处理的 `requestId` - 检查重复请求 **示例**: ```typescript async processPayment(paymentData: PaymentDto): Promise { // 检查是否已处理 const existingPayment = await this.paymentRepository.findByRequestId(paymentData.requestId); if (existingPayment) { return existingPayment; } // 处理支付 const payment = await this.paymentRepository.create({ ...paymentData, status: 'PROCESSING' }); try { // 调用支付网关 const result = await this.paymentGateway.process(paymentData); // 更新支付状态 payment.status = result.status; payment.transactionId = result.transactionId; await this.paymentRepository.save(payment); } catch (error) { // 更新支付状态为失败 payment.status = 'FAILED'; await this.paymentRepository.save(payment); throw error; } return payment; } ``` #### 3.2.2 乐观锁 **适用场景**:并发更新操作 **实现**: - 使用版本号或时间戳 - 更新时检查版本号 - 版本号不匹配则重试或失败 **示例**: ```typescript async updateProduct(productId: string, updates: Partial): Promise { const product = await this.productRepository.findById(productId); const currentVersion = product.version; // 尝试更新 const updated = await this.productRepository.update( { ...updates, version: currentVersion + 1 }, { where: { id: productId, version: currentVersion } } ); if (updated[0] === 0) { // 版本不匹配,说明被其他线程修改 throw new ConflictException('Product has been updated by another process'); } return await this.productRepository.findById(productId); } ``` ### 3.3 最佳实践 - **为所有外部请求生成唯一ID** - **存储请求ID和处理结果** - **设置合理的过期时间** - **处理重复请求时返回相同的结果** --- ## 4. 状态机 ### 4.1 定义 状态机是一种用于描述对象状态及其转换规则的模型。 ### 4.2 实现方法 #### 4.2.1 枚举状态 **适用场景**:简单的状态流转 **实现**: - 使用枚举定义状态 - 使用条件判断处理状态转换 **示例**: ```typescript enum OrderStatus { PENDING = 'PENDING', PROCESSING = 'PROCESSING', SHIPPED = 'SHIPPED', DELIVERED = 'DELIVERED', CANCELLED = 'CANCELLED', FAILED = 'FAILED' } class OrderStateMachine { static canTransition(from: OrderStatus, to: OrderStatus): boolean { const transitions = { [OrderStatus.PENDING]: [OrderStatus.PROCESSING, OrderStatus.CANCELLED], [OrderStatus.PROCESSING]: [OrderStatus.SHIPPED, OrderStatus.FAILED, OrderStatus.CANCELLED], [OrderStatus.SHIPPED]: [OrderStatus.DELIVERED, OrderStatus.FAILED], [OrderStatus.DELIVERED]: [], [OrderStatus.CANCELLED]: [], [OrderStatus.FAILED]: [] }; return transitions[from].includes(to); } } async updateOrderStatus(orderId: string, newStatus: OrderStatus): Promise { const order = await this.orderRepository.findById(orderId); if (!OrderStateMachine.canTransition(order.status, newStatus)) { throw new BadRequestException(`Cannot transition from ${order.status} to ${newStatus}`); } order.status = newStatus; return await this.orderRepository.save(order); } ``` #### 4.2.2 状态机库 **适用场景**:复杂的状态流转 **实现**: - 使用专门的状态机库 - 定义状态、事件和转换 - 处理副作用 **示例**: ```typescript // 使用 xstate 库 import { createMachine, interpret } from 'xstate'; const orderMachine = createMachine({ id: 'order', initial: 'PENDING', states: { PENDING: { on: { PROCESS: 'PROCESSING', CANCEL: 'CANCELLED' } }, PROCESSING: { on: { SHIP: 'SHIPPED', FAIL: 'FAILED', CANCEL: 'CANCELLED' } }, SHIPPED: { on: { DELIVER: 'DELIVERED', FAIL: 'FAILED' } }, DELIVERED: { type: 'final' }, CANCELLED: { type: 'final' }, FAILED: { type: 'final' } } }); const orderService = interpret(orderMachine) .onTransition(state => console.log('Order state:', state.value)); orderService.start(); orderService.send('PROCESS'); // 从 PENDING 到 PROCESSING orderService.send('SHIP'); // 从 PROCESSING 到 SHIPPED orderService.send('DELIVER'); // 从 SHIPPED 到 DELIVERED ``` ### 4.3 最佳实践 - **明确定义状态和转换规则** - **使用状态机管理所有状态流转** - **记录状态转换历史** - **处理状态转换的副作用** - **验证状态转换的合法性** --- ## 5. 数据一致性保障措施 ### 5.1 数据库层面 - **使用事务**:确保数据操作的原子性 - **设置约束**:使用唯一约束、外键约束等 - **合理索引**:提高查询性能,减少锁竞争 - **定期备份**:防止数据丢失 ### 5.2 应用层面 - **实现幂等性**:处理重复请求 - **使用状态机**:管理状态流转 - **异步处理**:使用消息队列处理非实时操作 - **补偿机制**:处理失败的操作 ### 5.3 监控和告警 - **数据一致性检查**:定期检查数据一致性 - **异常监控**:监控数据操作异常 - **告警机制**:及时发现和处理数据不一致问题 --- ## 6. 常见问题及解决方案 ### 6.1 并发更新冲突 **问题**:多个用户同时更新同一条数据 **解决方案**: - 使用乐观锁 - 使用悲观锁 - 实现队列机制 ### 6.2 分布式事务 **问题**:跨多个服务或数据源的事务 **解决方案**: - 使用消息队列 + 最终一致性 - 使用 Saga 模式 - 使用 TCC 模式 ### 6.3 数据同步延迟 **问题**:不同系统之间的数据同步存在延迟 **解决方案**: - 使用事件驱动架构 - 实现增量同步 - 设置合理的同步频率 --- ## 7. 测试策略 ### 7.1 单元测试 - 测试单个组件的逻辑 - 模拟依赖 - 验证边界情况 ### 7.2 集成测试 - 测试多个组件的交互 - 测试事务边界 - 测试状态转换 ### 7.3 压力测试 - 测试并发场景 - 测试系统稳定性 - 测试数据一致性 --- ## 8. 相关文档 - [Service_Design.md](./Service_Design.md) - [Database_Design.md](./Database_Design.md) - [API_Specs](./API_Specs/) --- *本文档基于服务设计文档,最后更新: 2026-03-18* --- ## 9. 商品中心数据流转(Product Center Data Flow) > **设计原则**: 确保商品、价格、库存、授权等核心数据的流转一致性 ### 9.1 商品数据生命周期 ``` 数据采集 → 数据清洗 → 商品主数据 → SKU生成 → 定价计算 → 上架任务 → 平台同步 → 商品状态更新 → 销售数据反馈 ``` ### 9.2 三层商品模型数据流转 ``` ┌─────────────────────────────────────────────────────────────┐ │ SPU层(产品层) │ │ 数据来源: 数据采集、手动录入、供应链导入 │ │ 数据内容: 名称、品牌、类目、通用属性 │ │ 一致性要求: 全局唯一,多平台共享 │ └────────────────────────┬────────────────────────────────────┘ │ ↓ ┌─────────────────────────────────────────────────────────────┐ │ SKU层(库存单元层) │ │ 数据来源: SPU拆分、变体生成、规格组合 │ │ 数据内容: 变体属性、成本价、基准价、重量、库存 │ │ 一致性要求: 租户内唯一,库存实时同步 │ └────────────────────────┬────────────────────────────────────┘ │ ↓ ┌─────────────────────────────────────────────────────────────┐ │ Listing层(平台商品层) │ │ 数据来源: SKU映射、平台刊登、API同步 │ │ 数据内容: 标题、最终价格、库存、平台状态、平台ID │ │ 一致性要求: 平台内唯一,状态实时同步 │ └─────────────────────────────────────────────────────────────┘ ``` ### 9.3 价格数据流转 ``` ┌─────────────────────────────────────────────────────────────┐ │ 基准价层(Base Price) │ │ 来源: 成本核算、采购价、人工设定 │ │ 存储: cf_sku.base_price │ │ 一致性: SKU层唯一锚点 │ └────────────────────────┬────────────────────────────────────┘ │ ↓ ┌─────────────────────────────────────────────────────────────┐ │ 策略层(Strategy Layer) │ │ 来源: 价格策略规则、AI定价建议 │ │ 存储: cf_price_strategy │ │ 一致性: 策略优先级、冲突检测 │ └────────────────────────┬────────────────────────────────────┘ │ ↓ ┌─────────────────────────────────────────────────────────────┐ │ 最终价层(Final Price) │ │ 来源: 策略计算、人工覆盖 │ │ 存储: cf_platform_listing.price │ │ 一致性: 平台同步、价格校验 │ └─────────────────────────────────────────────────────────────┘ ``` ### 9.4 库存数据流转 ``` ┌─────────────────────────────────────────────────────────────┐ │ 系统库存(System Inventory) │ │ 来源: 采购入库、退货入库、库存调整 │ │ 存储: cf_sku.inventory │ │ 一致性: 实时更新、事务保证 │ └────────────────────────┬────────────────────────────────────┘ │ ┌────────────┼────────────┐ ↓ ↓ ↓ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ 平台A库存 │ │ 平台B库存 │ │ 平台C库存 │ │ 同步策略A │ │ 同步策略B │ │ 同步策略C │ └───────────────┘ └───────────────┘ └───────────────┘ ``` ### 9.5 授权数据流转 ``` ┌─────────────────────────────────────────────────────────────┐ │ 店铺授权(Shop Authorization) │ │ 来源: OAuth授权、Agent授权、API Key授权 │ │ 存储: cf_shop_authorization │ │ 一致性: Token刷新、过期检测 │ └────────────────────────┬────────────────────────────────────┘ │ ↓ ┌─────────────────────────────────────────────────────────────┐ │ 授权状态(Auth Status) │ │ 状态: ACTIVE → EXPIRING → EXPIRED → REVOKED │ │ 触发: 定时检测、API调用失败、手动撤销 │ └─────────────────────────────────────────────────────────────┘ ``` ### 9.6 数据一致性保障机制 #### 9.6.1 商品数据一致性 | 场景 | 一致性要求 | 保障机制 | |------|----------|----------| | **SPU创建** | 全局唯一 | 唯一约束 (tenant_id, name, brand) | | **SKU生成** | 租户内唯一 | 唯一约束 (tenant_id, spu_id, attributes_hash) | | **Listing创建** | 平台内唯一 | 唯一约束 (platform, platform_product_id) | | **SKU映射** | 一对多关系 | 外键约束 + 级联更新 | #### 9.6.2 价格数据一致性 | 场景 | 一致性要求 | 保障机制 | |------|----------|----------| | **基准价更新** | 原子操作 | 事务 + 乐观锁 | | **策略计算** | 幂等性 | 请求ID + 结果缓存 | | **价格同步** | 最终一致 | 消息队列 + 重试机制 | | **价格校验** | 利润红线 | 业务规则校验 | #### 9.6.3 库存数据一致性 | 场景 | 一致性要求 | 保障机制 | |------|----------|----------| | **库存扣减** | 原子操作 | 事务 + 行锁 | | **库存同步** | 最终一致 | 事件驱动 + 补偿机制 | | **超卖防护** | 强一致 | 预扣库存 + 回滚机制 | | **库存预警** | 实时检测 | 定时任务 + 告警 | #### 9.6.4 授权数据一致性 | 场景 | 一致性要求 | 保障机制 | |------|----------|----------| | **Token刷新** | 原子操作 | 事务 + 分布式锁 | | **授权过期** | 实时检测 | 定时任务 + 预警 | | **授权撤销** | 级联更新 | 事务 + 事件通知 | ### 9.7 数据流转监控 ```typescript // 数据流转监控指标 interface DataFlowMetrics { // 商品数据 spuSyncLatency: number; // SPU同步延迟 skuSyncLatency: number; // SKU同步延迟 listingSyncLatency: number; // Listing同步延迟 // 价格数据 priceSyncLatency: number; // 价格同步延迟 priceConsistencyRate: number; // 价格一致性率 // 库存数据 inventorySyncLatency: number; // 库存同步延迟 inventoryDiscrepancyRate: number; // 库存差异率 // 授权数据 authRefreshLatency: number; // 授权刷新延迟 authExpiringCount: number; // 即将过期授权数 } ```