Files
makemd/docs/02_Backend/05_Data_Consistency.md
wurenzhi eafa1bbe94 feat: 添加货币和汇率管理功能
refactor: 重构前端路由和登录逻辑

docs: 更新业务闭环、任务和架构文档

style: 调整代码格式和文件结构

chore: 更新依赖项和配置文件
2026-03-19 19:08:15 +08:00

9.9 KiB
Raw Blame History

数据一致性文档 (Crawlful Hub)

定位Crawlful Hub 数据一致性设计文档 - 确保系统数据的准确性和可靠性。 更新日期: 2026-03-18 最高优先级参考: Service_Design.md


1. 数据一致性概述

1.1 重要性

数据一致性是系统的核心要求,特别是对于涉及资金、库存、订单等关键业务数据的系统。数据不一致可能导致:

  • 财务损失
  • 库存管理混乱
  • 订单处理错误
  • 商户信任度下降
  • 系统不可用

1.2 核心原则

  • 原子性:操作要么全部成功,要么全部失败
  • 一致性:操作前后数据状态保持一致
  • 隔离性:并发操作互不干扰
  • 持久性:数据一旦提交,就永久保存

2. 事务边界

2.1 定义

事务边界是指一组操作的范围,这些操作必须作为一个整体执行,要么全部成功,要么全部失败。

2.2 实现方法

2.2.1 数据库事务

适用场景:涉及数据库操作的业务流程

实现

  • 使用 @Transactional 注解Java/Spring
  • 使用 transaction 方法Node.js/Sequelize
  • 使用数据库原生事务

示例

// Node.js/Sequelize 示例
async createOrder(orderData: OrderCreateDto): Promise<Order> {
  return await this.sequelize.transaction(async (t) => {
    // 1. 创建订单
    const order = await Order.create(orderData, { transaction: t });
    
    // 2. 扣减库存
    await Inventory.decrement('quantity', {
      by: orderData.quantity,
      where: { productId: orderData.productId },
      transaction: t
    });
    
    // 3. 记录交易
    await Transaction.create({
      orderId: order.id,
      amount: order.totalAmount,
      type: 'ORDER_CREATED'
    }, { transaction: t });
    
    return order;
  });
}

2.2.2 分布式事务

适用场景:跨多个服务或数据源的业务流程

实现

  • 两阶段提交2PC
  • 补偿事务TCC
  • 消息队列 + 最终一致性

示例

// 补偿事务示例
async processOrder(orderId: string): Promise<void> {
  // 1. 尝试处理订单
  const order = await this.orderRepository.findById(orderId);
  
  try {
    // 2. 扣减库存
    await this.inventoryService.deductInventory(
      order.productId, 
      order.quantity
    );
    
    // 3. 安排物流
    const shippingId = await this.logisticsService.createShipping(order);
    order.shippingId = shippingId;
    
    // 4. 标记订单为已处理
    order.status = 'PROCESSED';
    await this.orderRepository.save(order);
  } catch (error) {
    // 5. 补偿操作
    await this.compensateOrder(order);
    throw error;
  }
}

async compensateOrder(order: Order): Promise<void> {
  // 恢复库存
  await this.inventoryService.restoreInventory(
    order.productId, 
    order.quantity
  );
  
  // 取消物流
  if (order.shippingId) {
    await this.logisticsService.cancelShipping(order.shippingId);
  }
  
  // 标记订单为失败
  order.status = 'FAILED';
  await this.orderRepository.save(order);
}

2.3 最佳实践

  • 明确事务边界:只包含必要的操作
  • 保持事务简短:减少锁持有时间
  • 合理设置隔离级别:根据业务需求选择适当的隔离级别
  • 处理事务异常:确保异常情况下能够正确回滚

3. 幂等性

3.1 定义

幂等性是指同一个请求执行多次,结果应该相同。

3.2 实现方法

3.2.1 请求ID

适用场景:所有外部 API 调用、支付回调等

实现

  • 生成唯一的 requestId
  • 记录已处理的 requestId
  • 检查重复请求

示例

async processPayment(paymentData: PaymentDto): Promise<Payment> {
  // 检查是否已处理
  const existingPayment = await this.paymentRepository.findByRequestId(paymentData.requestId);
  if (existingPayment) {
    return existingPayment;
  }
  
  // 处理支付
  const payment = await this.paymentRepository.create({
    ...paymentData,
    status: 'PROCESSING'
  });
  
  try {
    // 调用支付网关
    const result = await this.paymentGateway.process(paymentData);
    
    // 更新支付状态
    payment.status = result.status;
    payment.transactionId = result.transactionId;
    await this.paymentRepository.save(payment);
  } catch (error) {
    // 更新支付状态为失败
    payment.status = 'FAILED';
    await this.paymentRepository.save(payment);
    throw error;
  }
  
  return payment;
}

3.2.2 乐观锁

适用场景:并发更新操作

实现

  • 使用版本号或时间戳
  • 更新时检查版本号
  • 版本号不匹配则重试或失败

示例

async updateProduct(productId: string, updates: Partial<Product>): Promise<Product> {
  const product = await this.productRepository.findById(productId);
  const currentVersion = product.version;
  
  // 尝试更新
  const updated = await this.productRepository.update(
    {
      ...updates,
      version: currentVersion + 1
    },
    {
      where: {
        id: productId,
        version: currentVersion
      }
    }
  );
  
  if (updated[0] === 0) {
    // 版本不匹配,说明被其他线程修改
    throw new ConflictException('Product has been updated by another process');
  }
  
  return await this.productRepository.findById(productId);
}

3.3 最佳实践

  • 为所有外部请求生成唯一ID
  • 存储请求ID和处理结果
  • 设置合理的过期时间
  • 处理重复请求时返回相同的结果

4. 状态机

4.1 定义

状态机是一种用于描述对象状态及其转换规则的模型。

4.2 实现方法

4.2.1 枚举状态

适用场景:简单的状态流转

实现

  • 使用枚举定义状态
  • 使用条件判断处理状态转换

示例

enum OrderStatus {
  PENDING = 'PENDING',
  PROCESSING = 'PROCESSING',
  SHIPPED = 'SHIPPED',
  DELIVERED = 'DELIVERED',
  CANCELLED = 'CANCELLED',
  FAILED = 'FAILED'
}

class OrderStateMachine {
  static canTransition(from: OrderStatus, to: OrderStatus): boolean {
    const transitions = {
      [OrderStatus.PENDING]: [OrderStatus.PROCESSING, OrderStatus.CANCELLED],
      [OrderStatus.PROCESSING]: [OrderStatus.SHIPPED, OrderStatus.FAILED, OrderStatus.CANCELLED],
      [OrderStatus.SHIPPED]: [OrderStatus.DELIVERED, OrderStatus.FAILED],
      [OrderStatus.DELIVERED]: [],
      [OrderStatus.CANCELLED]: [],
      [OrderStatus.FAILED]: []
    };
    
    return transitions[from].includes(to);
  }
}

async updateOrderStatus(orderId: string, newStatus: OrderStatus): Promise<Order> {
  const order = await this.orderRepository.findById(orderId);
  
  if (!OrderStateMachine.canTransition(order.status, newStatus)) {
    throw new BadRequestException(`Cannot transition from ${order.status} to ${newStatus}`);
  }
  
  order.status = newStatus;
  return await this.orderRepository.save(order);
}

4.2.2 状态机库

适用场景:复杂的状态流转

实现

  • 使用专门的状态机库
  • 定义状态、事件和转换
  • 处理副作用

示例

// 使用 xstate 库
import { createMachine, interpret } from 'xstate';

const orderMachine = createMachine({
  id: 'order',
  initial: 'PENDING',
  states: {
    PENDING: {
      on: {
        PROCESS: 'PROCESSING',
        CANCEL: 'CANCELLED'
      }
    },
    PROCESSING: {
      on: {
        SHIP: 'SHIPPED',
        FAIL: 'FAILED',
        CANCEL: 'CANCELLED'
      }
    },
    SHIPPED: {
      on: {
        DELIVER: 'DELIVERED',
        FAIL: 'FAILED'
      }
    },
    DELIVERED: {
      type: 'final'
    },
    CANCELLED: {
      type: 'final'
    },
    FAILED: {
      type: 'final'
    }
  }
});

const orderService = interpret(orderMachine)
  .onTransition(state => console.log('Order state:', state.value));

orderService.start();
orderService.send('PROCESS'); // 从 PENDING 到 PROCESSING
orderService.send('SHIP'); // 从 PROCESSING 到 SHIPPED
orderService.send('DELIVER'); // 从 SHIPPED 到 DELIVERED

4.3 最佳实践

  • 明确定义状态和转换规则
  • 使用状态机管理所有状态流转
  • 记录状态转换历史
  • 处理状态转换的副作用
  • 验证状态转换的合法性

5. 数据一致性保障措施

5.1 数据库层面

  • 使用事务:确保数据操作的原子性
  • 设置约束:使用唯一约束、外键约束等
  • 合理索引:提高查询性能,减少锁竞争
  • 定期备份:防止数据丢失

5.2 应用层面

  • 实现幂等性:处理重复请求
  • 使用状态机:管理状态流转
  • 异步处理:使用消息队列处理非实时操作
  • 补偿机制:处理失败的操作

5.3 监控和告警

  • 数据一致性检查:定期检查数据一致性
  • 异常监控:监控数据操作异常
  • 告警机制:及时发现和处理数据不一致问题

6. 常见问题及解决方案

6.1 并发更新冲突

问题:多个用户同时更新同一条数据

解决方案

  • 使用乐观锁
  • 使用悲观锁
  • 实现队列机制

6.2 分布式事务

问题:跨多个服务或数据源的事务

解决方案

  • 使用消息队列 + 最终一致性
  • 使用 Saga 模式
  • 使用 TCC 模式

6.3 数据同步延迟

问题:不同系统之间的数据同步存在延迟

解决方案

  • 使用事件驱动架构
  • 实现增量同步
  • 设置合理的同步频率

7. 测试策略

7.1 单元测试

  • 测试单个组件的逻辑
  • 模拟依赖
  • 验证边界情况

7.2 集成测试

  • 测试多个组件的交互
  • 测试事务边界
  • 测试状态转换

7.3 压力测试

  • 测试并发场景
  • 测试系统稳定性
  • 测试数据一致性

8. 相关文档


本文档基于服务设计文档,最后更新: 2026-03-18