Files
makemd/docs/ARCHIVE/02_Backend/05_Data_Consistency.md
wurenzhi 2b86715c09 refactor: 优化代码结构并修复类型问题
- 移除未使用的TabPane组件
- 修复类型定义和导入方式
- 优化mock数据源的环境变量判断逻辑
- 更新文档结构并归档旧文件
- 添加新的UI组件和Memo组件
- 调整API路径和响应处理
2026-03-23 12:41:35 +08:00

20 KiB
Raw Permalink Blame History

数据一致性文档 (Crawlful Hub)

定位Crawlful Hub 数据一致性设计文档 - 确保系统数据的准确性和可靠性。 更新日期: 2026-03-18 最高优先级参考: Service_Design.md


1. 数据一致性概述

1.1 重要性

数据一致性是系统的核心要求,特别是对于涉及资金、库存、订单等关键业务数据的系统。数据不一致可能导致:

  • 财务损失
  • 库存管理混乱
  • 订单处理错误
  • 商户信任度下降
  • 系统不可用

1.2 核心原则

  • 原子性:操作要么全部成功,要么全部失败
  • 一致性:操作前后数据状态保持一致
  • 隔离性:并发操作互不干扰
  • 持久性:数据一旦提交,就永久保存

2. 事务边界

2.1 定义

事务边界是指一组操作的范围,这些操作必须作为一个整体执行,要么全部成功,要么全部失败。

2.2 实现方法

2.2.1 数据库事务

适用场景:涉及数据库操作的业务流程

实现

  • 使用 @Transactional 注解Java/Spring
  • 使用 transaction 方法Node.js/Sequelize
  • 使用数据库原生事务

示例

// Node.js/Sequelize 示例
async createOrder(orderData: OrderCreateDto): Promise<Order> {
  return await this.sequelize.transaction(async (t) => {
    // 1. 创建订单
    const order = await Order.create(orderData, { transaction: t });
    
    // 2. 扣减库存
    await Inventory.decrement('quantity', {
      by: orderData.quantity,
      where: { productId: orderData.productId },
      transaction: t
    });
    
    // 3. 记录交易
    await Transaction.create({
      orderId: order.id,
      amount: order.totalAmount,
      type: 'ORDER_CREATED'
    }, { transaction: t });
    
    return order;
  });
}

2.2.2 分布式事务

适用场景:跨多个服务或数据源的业务流程

实现

  • 两阶段提交2PC
  • 补偿事务TCC
  • 消息队列 + 最终一致性

示例

// 补偿事务示例
async processOrder(orderId: string): Promise<void> {
  // 1. 尝试处理订单
  const order = await this.orderRepository.findById(orderId);
  
  try {
    // 2. 扣减库存
    await this.inventoryService.deductInventory(
      order.productId, 
      order.quantity
    );
    
    // 3. 安排物流
    const shippingId = await this.logisticsService.createShipping(order);
    order.shippingId = shippingId;
    
    // 4. 标记订单为已处理
    order.status = 'PROCESSED';
    await this.orderRepository.save(order);
  } catch (error) {
    // 5. 补偿操作
    await this.compensateOrder(order);
    throw error;
  }
}

async compensateOrder(order: Order): Promise<void> {
  // 恢复库存
  await this.inventoryService.restoreInventory(
    order.productId, 
    order.quantity
  );
  
  // 取消物流
  if (order.shippingId) {
    await this.logisticsService.cancelShipping(order.shippingId);
  }
  
  // 标记订单为失败
  order.status = 'FAILED';
  await this.orderRepository.save(order);
}

2.3 最佳实践

  • 明确事务边界:只包含必要的操作
  • 保持事务简短:减少锁持有时间
  • 合理设置隔离级别:根据业务需求选择适当的隔离级别
  • 处理事务异常:确保异常情况下能够正确回滚

3. 幂等性

3.1 定义

幂等性是指同一个请求执行多次,结果应该相同。

3.2 实现方法

3.2.1 请求ID

适用场景:所有外部 API 调用、支付回调等

实现

  • 生成唯一的 requestId
  • 记录已处理的 requestId
  • 检查重复请求

示例

async processPayment(paymentData: PaymentDto): Promise<Payment> {
  // 检查是否已处理
  const existingPayment = await this.paymentRepository.findByRequestId(paymentData.requestId);
  if (existingPayment) {
    return existingPayment;
  }
  
  // 处理支付
  const payment = await this.paymentRepository.create({
    ...paymentData,
    status: 'PROCESSING'
  });
  
  try {
    // 调用支付网关
    const result = await this.paymentGateway.process(paymentData);
    
    // 更新支付状态
    payment.status = result.status;
    payment.transactionId = result.transactionId;
    await this.paymentRepository.save(payment);
  } catch (error) {
    // 更新支付状态为失败
    payment.status = 'FAILED';
    await this.paymentRepository.save(payment);
    throw error;
  }
  
  return payment;
}

3.2.2 乐观锁

适用场景:并发更新操作

实现

  • 使用版本号或时间戳
  • 更新时检查版本号
  • 版本号不匹配则重试或失败

示例

async updateProduct(productId: string, updates: Partial<Product>): Promise<Product> {
  const product = await this.productRepository.findById(productId);
  const currentVersion = product.version;
  
  // 尝试更新
  const updated = await this.productRepository.update(
    {
      ...updates,
      version: currentVersion + 1
    },
    {
      where: {
        id: productId,
        version: currentVersion
      }
    }
  );
  
  if (updated[0] === 0) {
    // 版本不匹配,说明被其他线程修改
    throw new ConflictException('Product has been updated by another process');
  }
  
  return await this.productRepository.findById(productId);
}

3.3 最佳实践

  • 为所有外部请求生成唯一ID
  • 存储请求ID和处理结果
  • 设置合理的过期时间
  • 处理重复请求时返回相同的结果

4. 状态机

4.1 定义

状态机是一种用于描述对象状态及其转换规则的模型。

4.2 实现方法

4.2.1 枚举状态

适用场景:简单的状态流转

实现

  • 使用枚举定义状态
  • 使用条件判断处理状态转换

示例

enum OrderStatus {
  PENDING = 'PENDING',
  PROCESSING = 'PROCESSING',
  SHIPPED = 'SHIPPED',
  DELIVERED = 'DELIVERED',
  CANCELLED = 'CANCELLED',
  FAILED = 'FAILED'
}

class OrderStateMachine {
  static canTransition(from: OrderStatus, to: OrderStatus): boolean {
    const transitions = {
      [OrderStatus.PENDING]: [OrderStatus.PROCESSING, OrderStatus.CANCELLED],
      [OrderStatus.PROCESSING]: [OrderStatus.SHIPPED, OrderStatus.FAILED, OrderStatus.CANCELLED],
      [OrderStatus.SHIPPED]: [OrderStatus.DELIVERED, OrderStatus.FAILED],
      [OrderStatus.DELIVERED]: [],
      [OrderStatus.CANCELLED]: [],
      [OrderStatus.FAILED]: []
    };
    
    return transitions[from].includes(to);
  }
}

async updateOrderStatus(orderId: string, newStatus: OrderStatus): Promise<Order> {
  const order = await this.orderRepository.findById(orderId);
  
  if (!OrderStateMachine.canTransition(order.status, newStatus)) {
    throw new BadRequestException(`Cannot transition from ${order.status} to ${newStatus}`);
  }
  
  order.status = newStatus;
  return await this.orderRepository.save(order);
}

4.2.2 状态机库

适用场景:复杂的状态流转

实现

  • 使用专门的状态机库
  • 定义状态、事件和转换
  • 处理副作用

示例

// 使用 xstate 库
import { createMachine, interpret } from 'xstate';

const orderMachine = createMachine({
  id: 'order',
  initial: 'PENDING',
  states: {
    PENDING: {
      on: {
        PROCESS: 'PROCESSING',
        CANCEL: 'CANCELLED'
      }
    },
    PROCESSING: {
      on: {
        SHIP: 'SHIPPED',
        FAIL: 'FAILED',
        CANCEL: 'CANCELLED'
      }
    },
    SHIPPED: {
      on: {
        DELIVER: 'DELIVERED',
        FAIL: 'FAILED'
      }
    },
    DELIVERED: {
      type: 'final'
    },
    CANCELLED: {
      type: 'final'
    },
    FAILED: {
      type: 'final'
    }
  }
});

const orderService = interpret(orderMachine)
  .onTransition(state => console.log('Order state:', state.value));

orderService.start();
orderService.send('PROCESS'); // 从 PENDING 到 PROCESSING
orderService.send('SHIP'); // 从 PROCESSING 到 SHIPPED
orderService.send('DELIVER'); // 从 SHIPPED 到 DELIVERED

4.3 最佳实践

  • 明确定义状态和转换规则
  • 使用状态机管理所有状态流转
  • 记录状态转换历史
  • 处理状态转换的副作用
  • 验证状态转换的合法性

5. 数据一致性保障措施

5.1 数据库层面

  • 使用事务:确保数据操作的原子性
  • 设置约束:使用唯一约束、外键约束等
  • 合理索引:提高查询性能,减少锁竞争
  • 定期备份:防止数据丢失

5.2 应用层面

  • 实现幂等性:处理重复请求
  • 使用状态机:管理状态流转
  • 异步处理:使用消息队列处理非实时操作
  • 补偿机制:处理失败的操作

5.3 监控和告警

  • 数据一致性检查:定期检查数据一致性
  • 异常监控:监控数据操作异常
  • 告警机制:及时发现和处理数据不一致问题

6. 常见问题及解决方案

6.1 并发更新冲突

问题:多个用户同时更新同一条数据

解决方案

  • 使用乐观锁
  • 使用悲观锁
  • 实现队列机制

6.2 分布式事务

问题:跨多个服务或数据源的事务

解决方案

  • 使用消息队列 + 最终一致性
  • 使用 Saga 模式
  • 使用 TCC 模式

6.3 数据同步延迟

问题:不同系统之间的数据同步存在延迟

解决方案

  • 使用事件驱动架构
  • 实现增量同步
  • 设置合理的同步频率

7. 测试策略

7.1 单元测试

  • 测试单个组件的逻辑
  • 模拟依赖
  • 验证边界情况

7.2 集成测试

  • 测试多个组件的交互
  • 测试事务边界
  • 测试状态转换

7.3 压力测试

  • 测试并发场景
  • 测试系统稳定性
  • 测试数据一致性

8. 相关文档


本文档基于服务设计文档,最后更新: 2026-03-18


9. 商品中心数据流转Product Center Data Flow

设计原则: 确保商品、价格、库存、授权等核心数据的流转一致性

9.1 商品数据生命周期

数据采集 → 数据清洗 → 商品主数据 → SKU生成 → 定价计算 → 上架任务 → 平台同步 → 商品状态更新 → 销售数据反馈

9.2 三层商品模型数据流转

┌─────────────────────────────────────────────────────────────┐
│                     SPU层产品层                          │
│  数据来源: 数据采集、手动录入、供应链导入                      │
│  数据内容: 名称、品牌、类目、通用属性                          │
│  一致性要求: 全局唯一,多平台共享                              │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ↓
┌─────────────────────────────────────────────────────────────┐
│                     SKU层库存单元层                       │
│  数据来源: SPU拆分、变体生成、规格组合                        │
│  数据内容: 变体属性、成本价、基准价、重量、库存                │
│  一致性要求: 租户内唯一,库存实时同步                          │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ↓
┌─────────────────────────────────────────────────────────────┐
│                   Listing层平台商品层                     │
│  数据来源: SKU映射、平台刊登、API同步                         │
│  数据内容: 标题、最终价格、库存、平台状态、平台ID              │
│  一致性要求: 平台内唯一,状态实时同步                          │
└─────────────────────────────────────────────────────────────┘

9.3 价格数据流转

┌─────────────────────────────────────────────────────────────┐
│                  基准价层Base Price                       │
│  来源: 成本核算、采购价、人工设定                              │
│  存储: cf_sku.base_price                                    │
│  一致性: SKU层唯一锚点                                        │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ↓
┌─────────────────────────────────────────────────────────────┐
│                  策略层Strategy Layer                     │
│  来源: 价格策略规则、AI定价建议                               │
│  存储: cf_price_strategy                                    │
│  一致性: 策略优先级、冲突检测                                 │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ↓
┌─────────────────────────────────────────────────────────────┐
│                  最终价层Final Price                      │
│  来源: 策略计算、人工覆盖                                     │
│  存储: cf_platform_listing.price                            │
│  一致性: 平台同步、价格校验                                   │
└─────────────────────────────────────────────────────────────┘

9.4 库存数据流转

┌─────────────────────────────────────────────────────────────┐
│                    系统库存System Inventory               │
│  来源: 采购入库、退货入库、库存调整                            │
│  存储: cf_sku.inventory                                     │
│  一致性: 实时更新、事务保证                                   │
└────────────────────────┬────────────────────────────────────┘
                         │
            ┌────────────┼────────────┐
            ↓            ↓            ↓
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│  平台A库存    │ │  平台B库存    │ │  平台C库存    │
│  同步策略A    │ │  同步策略B    │ │  同步策略C    │
└───────────────┘ └───────────────┘ └───────────────┘

9.5 授权数据流转

┌─────────────────────────────────────────────────────────────┐
│                    店铺授权Shop Authorization             │
│  来源: OAuth授权、Agent授权、API Key授权                     │
│  存储: cf_shop_authorization                                │
│  一致性: Token刷新、过期检测                                 │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ↓
┌─────────────────────────────────────────────────────────────┐
│                    授权状态Auth Status                    │
│  状态: ACTIVE → EXPIRING → EXPIRED → REVOKED               │
│  触发: 定时检测、API调用失败、手动撤销                        │
└─────────────────────────────────────────────────────────────┘

9.6 数据一致性保障机制

9.6.1 商品数据一致性

场景 一致性要求 保障机制
SPU创建 全局唯一 唯一约束 (tenant_id, name, brand)
SKU生成 租户内唯一 唯一约束 (tenant_id, spu_id, attributes_hash)
Listing创建 平台内唯一 唯一约束 (platform, platform_product_id)
SKU映射 一对多关系 外键约束 + 级联更新

9.6.2 价格数据一致性

场景 一致性要求 保障机制
基准价更新 原子操作 事务 + 乐观锁
策略计算 幂等性 请求ID + 结果缓存
价格同步 最终一致 消息队列 + 重试机制
价格校验 利润红线 业务规则校验

9.6.3 库存数据一致性

场景 一致性要求 保障机制
库存扣减 原子操作 事务 + 行锁
库存同步 最终一致 事件驱动 + 补偿机制
超卖防护 强一致 预扣库存 + 回滚机制
库存预警 实时检测 定时任务 + 告警

9.6.4 授权数据一致性

场景 一致性要求 保障机制
Token刷新 原子操作 事务 + 分布式锁
授权过期 实时检测 定时任务 + 预警
授权撤销 级联更新 事务 + 事件通知

9.7 数据流转监控

// 数据流转监控指标
interface DataFlowMetrics {
  // 商品数据
  spuSyncLatency: number;        // SPU同步延迟
  skuSyncLatency: number;        // SKU同步延迟
  listingSyncLatency: number;    // Listing同步延迟
  
  // 价格数据
  priceSyncLatency: number;      // 价格同步延迟
  priceConsistencyRate: number;  // 价格一致性率
  
  // 库存数据
  inventorySyncLatency: number;  // 库存同步延迟
  inventoryDiscrepancyRate: number; // 库存差异率
  
  // 授权数据
  authRefreshLatency: number;    // 授权刷新延迟
  authExpiringCount: number;     // 即将过期授权数
}