chore: 清理归档文件和文档模板

删除不再需要的归档文件和过时的文档模板,包括多个README、安全策略、前端集成蓝图等文件,同时移除了未使用的业务文档和项目结构文件。

优化项目结构,移除冗余文件,保持代码库整洁。主要删除archive/handover目录下的多个文件及doc目录下的部分文档模板。
This commit is contained in:
2026-03-18 01:21:15 +08:00
parent 56b8a2e2f8
commit 72cd7f6f45
147 changed files with 5982 additions and 16716 deletions

View File

@@ -1,326 +0,0 @@
import { exec } from 'child_process';
import { BrowserContext, chromium, Page } from 'playwright';
import { promisify } from 'util';
import { Product, Sku } from '../models/Product';
import { logger } from '../utils/logger';
import { AIService } from './AIService';
import { ConfigService } from './ConfigService';
import { SelfHealingService } from './SelfHealingService';
const execAsync = promisify(exec);
export interface CrawlerOptions {
useSandbox?: boolean;
cpuLimit?: string;
memoryLimit?: string;
}
export class CrawlerService {
private static PROXY_LIST = [
'http://proxy1.crawlful.com:8080',
'http://proxy2.crawlful.com:8080',
];
/**
* @description 获取自动调度的代理配置 (CORE_EXT_07)
*/
private static getProxyConfig() {
const proxy = this.PROXY_LIST[Math.floor(Math.random() * this.PROXY_LIST.length)];
return {
server: proxy,
};
}
/**
* [CORE_DEV_05] 容器化隔离采集 (Sandbox Crawler)
* @description 将采集任务分发至隔离的 Docker 容器中,确保 IP 隔离与资源限制
*/
private static async dispatchToSandbox(url: string, options: CrawlerOptions): Promise<Partial<Product>> {
const cpu = options.cpuLimit || '0.5';
const memory = options.memoryLimit || '512m';
const containerName = `crawler-${Date.now()}`;
logger.info(`[Crawler] Dispatching ${url} to sandbox ${containerName} (CPU: ${cpu}, Mem: ${memory})...`);
try {
// 生产环境下应调用 Docker API 或 K8s Job
// 此处通过命令行模拟docker run --rm --cpus=0.5 --memory=512m crawler-image npm run crawl --url="..."
const cmd = `docker run --rm --name ${containerName} --cpus=${cpu} --memory=${memory} crawler-image npm run crawl --url="${url}"`;
if (process.env.NODE_ENV === 'production') {
const { stdout } = await execAsync(cmd);
return JSON.parse(stdout);
} else {
// 开发模式下模拟容器延迟
await new Promise(resolve => setTimeout(resolve, 2000));
return this.crawlProductDirect(url);
}
} catch (error: any) {
logger.error(`[Crawler] Sandbox dispatch failed: ${error.message}`);
throw error;
}
}
/**
* @description 抓取商品详情,集成指纹混淆与拟人化模拟
* @param {string} url 商品详情页 URL
* @param {CrawlerOptions} options 采集配置
*/
static async crawlProduct(url: string, options: CrawlerOptions = {}): Promise<Partial<Product>> {
if (options.useSandbox) {
return this.dispatchToSandbox(url, options);
}
return this.crawlProductDirect(url);
}
private static async crawlProductDirect(url: string): Promise<Partial<Product>> {
const proxy = this.getProxyConfig();
const browser = await chromium.launch({
headless: true,
proxy,
args: [
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-blink-features=AutomationControlled',
'--js-flags="--max-old-space-size=512"' // 资源限制 (CORE_DEV_05)
]
});
const context = await browser.newContext({
userAgent: this.getRandomUserAgent(),
viewport: { width: 1920, height: 1080 },
deviceScaleFactor: 1,
});
// 1. 注入指纹混淆脚本 (CORE_EXT_07)
await this.injectFingerprintObfuscator(context);
const page = await context.newPage();
try {
logger.info(`[Crawler] Navigating to ${url}...`);
// 2. 模拟真实人类行为 (CORE_EXT_07)
await this.simulateHumanBehavior(page, url);
const platform = this.detectPlatform(url);
let title = '';
let price = 0;
let mainImage = '';
let images: string[] = [];
let skus: Sku[] = [];
let attributes: Record<string, string> = {};
if (platform === '1688') {
// 1688 深度解析逻辑
title = await this.getTextWithSelfHealing(page, platform, '.title-text, .d-title, h1', 'Product Title');
mainImage = (await page.locator('.prop-img, .main-image img, .mod-detail-gallery img').first().getAttribute('src').catch(() => '')) || '';
const priceText = await this.getTextWithSelfHealing(page, platform, '.price-text, .value, .price-now', 'Price');
price = parseFloat(priceText.replace(/[^\d.]/g, '')) || 0;
images = await page.locator('.tab-trigger img, .vertical-img img').evaluateAll(imgs =>
imgs.map(img => (img as HTMLImageElement).src).filter(src => src && !src.includes('video'))
);
const attrKeys = await page.locator('.attributes-list .obj-title').evaluateAll(els => els.map(el => el.textContent?.trim() || ''));
const attrValues = await page.locator('.attributes-list .obj-content').evaluateAll(els => els.map(el => el.textContent?.trim() || ''));
attrKeys.forEach((key, i) => {
if (key && attrValues[i]) attributes[key] = attrValues[i];
});
} else if (platform === 'Amazon') {
title = await this.getTextWithSelfHealing(page, platform, '#productTitle', 'Product Title');
mainImage = (await page.locator('#landingImage, #imgBlkFront, #ebooksImgBlkFront').getAttribute('src').catch(() => '')) || '';
const priceWhole = (await page.locator('.a-price-whole').first().innerText().catch(() => '0')) || '0';
const priceFraction = (await page.locator('.a-price-fraction').first().innerText().catch(() => '00')) || '00';
price = parseFloat(`${priceWhole}.${priceFraction}`.replace(/[^\d.]/g, '')) || 0;
images = await page.locator('#altImages img').evaluateAll(imgs =>
imgs.map(img => (img as HTMLImageElement).src.replace(/\._.*_\./, '.'))
.filter(src => src && !src.includes('video') && !src.includes('play-button'))
);
const features = await page.locator('#feature-bullets li span').evaluateAll(els => els.map(el => el.textContent?.trim() || ''));
if (features.length > 0) attributes['features'] = features.join('; ');
} else if (platform === 'Temu') {
title = await this.getTextWithSelfHealing(page, platform, 'h1[data-test="product-title"]', 'Product Title');
mainImage = (await page.locator('img[data-test="main-image"]').getAttribute('src').catch(() => '')) || '';
const priceStr = await this.getTextWithSelfHealing(page, platform, 'div[data-test="product-price"]', 'Price');
price = parseFloat(priceStr.replace(/[^\d.]/g, '')) || 0;
}
const product: Partial<Product> = {
platform,
productId: this.extractId(url),
title: title.trim(),
originalTitle: title.trim(),
mainImage: mainImage || (images.length > 0 ? images[0] : ''),
detailUrl: url,
price,
originalPrice: price,
currency: platform === '1688' ? 'CNY' : 'USD',
skus,
attributes,
images: images.length > 0 ? images : [mainImage].filter(Boolean) as string[]
};
return product;
} catch (error: any) {
logger.error(`[Crawler] Failed to crawl ${url}: ${error.message}`);
throw error;
} finally {
await browser.close();
}
}
/**
* @description 注入指纹混淆脚本,重写 Canvas/WebGL 属性
*/
private static async injectFingerprintObfuscator(context: BrowserContext) {
await context.addInitScript(() => {
// 1. 重写 Canvas 指纹
const originalGetContext = HTMLCanvasElement.prototype.getContext;
(HTMLCanvasElement.prototype as any).getContext = function (type: any, ...args: any[]) {
const context = originalGetContext.apply(this, [type, ...args] as any);
if (type === '2d' && context) {
const originalFillText = (context as any).fillText;
(context as any).fillText = function (...args: any[]) {
// 在绘制文字时加入极其微小的扰动
(this as any).fillStyle = `rgba(${Math.random()}, 0, 0, 0.01)`;
return originalFillText.apply(this, args);
};
}
return context;
};
// 2. 模拟 WebGL 渲染器信息
const originalGetParameter = WebGLRenderingContext.prototype.getParameter;
WebGLRenderingContext.prototype.getParameter = function (parameter: number) {
if (parameter === 37445) return 'Intel Inc.'; // UNMASKED_VENDOR_WEBGL
if (parameter === 37446) return 'Intel(R) Iris(R) Xe Graphics'; // UNMASKED_RENDERER_WEBGL
return originalGetParameter.apply(this, [parameter]);
};
// 3. 隐藏 WebDriver 标记
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
});
}
/**
* @description 模拟人类浏览行为:随机滚动、移动鼠标与停顿
*/
private static async simulateHumanBehavior(page: Page, url: string) {
await page.goto(url, { waitUntil: 'domcontentloaded', timeout: 60000 });
// 随机停顿 1-3s
await page.waitForTimeout(1000 + Math.random() * 2000);
// 1. 模拟分段平滑滚动 (CORE_EXT_07)
const viewportHeight = page.viewportSize()?.height || 1080;
const totalScrolls = 3 + Math.floor(Math.random() * 3);
for (let i = 0; i < totalScrolls; i++) {
const scrollStep = 300 + Math.random() * 500;
await page.evaluate((step) => {
window.scrollBy({ top: step, behavior: 'smooth' });
}, scrollStep);
await page.waitForTimeout(1000 + Math.random() * 1500);
// 2. 在滚动间隙模拟随机鼠标移动 (CORE_EXT_07)
const targetX = Math.random() * 800;
const targetY = Math.random() * viewportHeight;
await page.mouse.move(targetX, targetY, { steps: 10 + Math.floor(Math.random() * 20) });
}
// 3. 随机移动到可能感兴趣的区域 (如图片/详情)
await page.mouse.move(Math.random() * 500, Math.random() * 500, { steps: 25 });
// 等待网络空闲
await page.waitForLoadState('networkidle').catch(() => {});
}
private static getRandomUserAgent(): string {
const uas = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36'
];
return uas[Math.floor(Math.random() * uas.length)];
}
private static detectPlatform(url: string): string {
if (url.includes('1688.com')) return '1688';
if (url.includes('amazon.com')) return 'Amazon';
if (url.includes('temu.com')) return 'Temu';
if (url.includes('aliexpress.com')) return 'AliExpress';
return 'Unknown';
}
private static extractId(url: string): string {
try {
const u = new URL(url);
if (u.hostname.includes('1688.com')) {
const match = url.match(/offer\/(\d+)\.html/);
return match ? match[1] : '1688-' + Date.now();
}
if (u.hostname.includes('amazon.com')) {
const match = url.match(/dp\/(\w+)/);
return match ? match[1] : 'amz-' + Date.now();
}
return 'prod-' + Math.random().toString(36).substring(7);
} catch {
return 'unknown-' + Date.now();
}
}
/**
* [CORE_AI_09] 自愈式采集辅助
* @description 尝试获取元素文本,若失败则调用 AI 进行选择器修复
*/
private static async getTextWithSelfHealing(
page: Page,
platform: string,
selector: string,
targetField: string
): Promise<string> {
// 0. 优先检查缓存中是否有已修复的选择器
const healed = await SelfHealingService.getHealedSelector(platform, targetField);
const activeSelector = healed || selector;
try {
// 1. 尝试活动选择器 (带超时)
const text = await page.locator(activeSelector).first().innerText({ timeout: 5000 });
if (text && text.trim()) return text.trim();
throw new Error('Element found but text is empty');
} catch (err) {
logger.warn(`[Crawler] Selector failed: ${activeSelector}. Triggering self-healing...`);
// 2. 获取 DOM 片段
const domSnippet = await page.evaluate(() => {
return document.body.innerHTML.substring(0, 10000);
});
// 3. 调用 AI 修复中心
try {
const repair = await SelfHealingService.repairSelector({
platform,
targetField,
oldSelector: activeSelector,
htmlContext: domSnippet
});
if (repair.success && repair.newSelector) {
logger.info(`[Crawler] AI found new selector: ${repair.newSelector} (Confidence: ${repair.confidence})`);
const repairedText = await page.locator(repair.newSelector).first().innerText({ timeout: 5000 });
if (repairedText && repairedText.trim()) return repairedText.trim();
}
} catch (aiErr) {
logger.error(`[Crawler] AI repair failed: ${aiErr}`);
}
return ''; // 最终失败返回空
}
}
}

View File

@@ -0,0 +1,293 @@
import { logger } from '../utils/logger';
import { ConfigService } from './ConfigService';
/**
* PlatformApiService - 平台API对接服务
*
* 功能定位:
* - 仅处理有API平台的对接Amazon MWS, eBay API, Shopee Open API等
* - 无API平台TikTok Shop, Temu等的采集由浏览器插件处理
*
* 安全约束:
* - 严禁在后端进行网页爬取避免服务器IP被封
* - 所有API调用需携带traceId和tenantId
*
* @author AI-Backend-1
* @taskId BE-P005, BE-P006, BE-P007
*/
export interface PlatformApiConfig {
platform: 'AMAZON' | 'EBAY' | 'SHOPEE' | 'ALIEXPRESS';
apiKey: string;
apiSecret: string;
accessToken?: string;
refreshToken?: string;
marketplaceId?: string;
}
export interface SyncOptions {
tenantId: string;
shopId: string;
traceId: string;
businessType: 'TOC' | 'TOB';
startDate?: Date;
endDate?: Date;
limit?: number;
}
export interface ProductSyncResult {
platformProductId: string;
title: string;
price: number;
currency: string;
stock: number;
status: string;
syncedAt: Date;
}
export interface OrderSyncResult {
platformOrderId: string;
status: string;
totalAmount: number;
currency: string;
items: Array<{
sku: string;
quantity: number;
unitPrice: number;
}>;
syncedAt: Date;
}
export class PlatformApiService {
private static readonly API_ENDPOINTS = {
AMAZON: 'https://sellingpartnerapi-na.amazon.com',
EBAY: 'https://api.ebay.com/sell',
SHOPEE: 'https://partner.shopeemobile.com/api/v2',
ALIEXPRESS: 'https://openapi.aliexpress.com',
};
private static readonly RATE_LIMITS = {
AMAZON: { requests: 10, window: 1000 }, // 10 req/s
EBAY: { requests: 100, window: 60000 }, // 100 req/min
SHOPEE: { requests: 100, window: 60000 }, // 100 req/min
ALIEXPRESS: { requests: 50, window: 60000 }, // 50 req/min
};
/**
* 同步商品数据 - 从平台API获取商品信息
* @param config 平台API配置
* @param options 同步选项(包含五元组追踪信息)
* @returns 同步结果
*/
static async syncProducts(
config: PlatformApiConfig,
options: SyncOptions
): Promise<ProductSyncResult[]> {
const { tenantId, shopId, traceId, businessType } = options;
logger.info(`[PlatformApiService] Starting product sync`, {
platform: config.platform,
tenantId,
shopId,
traceId,
businessType,
});
try {
switch (config.platform) {
case 'AMAZON':
return await this.syncAmazonProducts(config, options);
case 'EBAY':
return await this.syncEbayProducts(config, options);
case 'SHOPEE':
return await this.syncShopeeProducts(config, options);
case 'ALIEXPRESS':
return await this.syncAliexpressProducts(config, options);
default:
throw new Error(`Unsupported platform: ${config.platform}`);
}
} catch (error: any) {
logger.error(`[PlatformApiService] Product sync failed`, {
platform: config.platform,
tenantId,
shopId,
traceId,
error: error.message,
});
throw error;
}
}
/**
* 同步订单数据 - 从平台API获取订单信息
* @param config 平台API配置
* @param options 同步选项(包含五元组追踪信息)
* @returns 同步结果
*/
static async syncOrders(
config: PlatformApiConfig,
options: SyncOptions
): Promise<OrderSyncResult[]> {
const { tenantId, shopId, traceId, businessType } = options;
logger.info(`[PlatformApiService] Starting order sync`, {
platform: config.platform,
tenantId,
shopId,
traceId,
businessType,
});
try {
switch (config.platform) {
case 'AMAZON':
return await this.syncAmazonOrders(config, options);
case 'EBAY':
return await this.syncEbayOrders(config, options);
case 'SHOPEE':
return await this.syncShopeeOrders(config, options);
case 'ALIEXPRESS':
return await this.syncAliexpressOrders(config, options);
default:
throw new Error(`Unsupported platform: ${config.platform}`);
}
} catch (error: any) {
logger.error(`[PlatformApiService] Order sync failed`, {
platform: config.platform,
tenantId,
shopId,
traceId,
error: error.message,
});
throw error;
}
}
/**
* 更新商品库存 - 通过平台API更新库存
* @param config 平台API配置
* @param sku SKU编码
* @param quantity 库存数量
* @param options 同步选项(包含五元组追踪信息)
*/
static async updateInventory(
config: PlatformApiConfig,
sku: string,
quantity: number,
options: SyncOptions
): Promise<void> {
const { tenantId, shopId, traceId, businessType } = options;
logger.info(`[PlatformApiService] Updating inventory`, {
platform: config.platform,
sku,
quantity,
tenantId,
shopId,
traceId,
businessType,
});
// 实际实现需调用各平台API
// 此处为框架代码具体实现根据平台API文档补充
throw new Error('Not implemented - requires platform-specific API integration');
}
// ==================== Private Methods ====================
private static async syncAmazonProducts(
config: PlatformApiConfig,
options: SyncOptions
): Promise<ProductSyncResult[]> {
// TODO: 实现Amazon SP-API商品同步
// 参考: https://developer-docs.amazon.com/sp-api/docs
logger.info('[PlatformApiService] Amazon product sync - placeholder');
return [];
}
private static async syncAmazonOrders(
config: PlatformApiConfig,
options: SyncOptions
): Promise<OrderSyncResult[]> {
// TODO: 实现Amazon SP-API订单同步
logger.info('[PlatformApiService] Amazon order sync - placeholder');
return [];
}
private static async syncEbayProducts(
config: PlatformApiConfig,
options: SyncOptions
): Promise<ProductSyncResult[]> {
// TODO: 实现eBay API商品同步
// 参考: https://developer.ebay.com/api-docs
logger.info('[PlatformApiService] eBay product sync - placeholder');
return [];
}
private static async syncEbayOrders(
config: PlatformApiConfig,
options: SyncOptions
): Promise<OrderSyncResult[]> {
// TODO: 实现eBay API订单同步
logger.info('[PlatformApiService] eBay order sync - placeholder');
return [];
}
private static async syncShopeeProducts(
config: PlatformApiConfig,
options: SyncOptions
): Promise<ProductSyncResult[]> {
// TODO: 实现Shopee Open API商品同步
// 参考: https://open.shopee.com/documents
logger.info('[PlatformApiService] Shopee product sync - placeholder');
return [];
}
private static async syncShopeeOrders(
config: PlatformApiConfig,
options: SyncOptions
): Promise<OrderSyncResult[]> {
// TODO: 实现Shopee Open API订单同步
logger.info('[PlatformApiService] Shopee order sync - placeholder');
return [];
}
private static async syncAliexpressProducts(
config: PlatformApiConfig,
options: SyncOptions
): Promise<ProductSyncResult[]> {
// TODO: 实现AliExpress API商品同步
logger.info('[PlatformApiService] AliExpress product sync - placeholder');
return [];
}
private static async syncAliexpressOrders(
config: PlatformApiConfig,
options: SyncOptions
): Promise<OrderSyncResult[]> {
// TODO: 实现AliExpress API订单同步
logger.info('[PlatformApiService] AliExpress order sync - placeholder');
return [];
}
/**
* 检查API限流
* @param platform 平台名称
*/
private static checkRateLimit(platform: string): boolean {
const limit = this.RATE_LIMITS[platform as keyof typeof this.RATE_LIMITS];
if (!limit) return true;
// TODO: 实现基于Redis的分布式限流检查
return true;
}
/**
* 刷新访问令牌
* @param config 平台API配置
*/
private static async refreshAccessToken(config: PlatformApiConfig): Promise<string> {
// TODO: 实现令牌刷新逻辑
throw new Error('Token refresh not implemented');
}
}

View File

@@ -1,115 +0,0 @@
import { Job } from 'bullmq';
import { WorkerHub } from './WorkerHub';
import { CrawlerService } from '../services/CrawlerService';
import { AIService } from '../services/AIService';
import { FingerprintEngine } from '../core/ai/FingerprintEngine';
import { ProductService } from '../services/ProductService';
import { AuditService } from '../services/AuditService';
import { logger } from '../utils/logger';
/**
* [CORE_WORK_01] 采集 Worker (Crawler Worker)
* @description 异步执行产品抓取、多模态解析、指纹生成并入库,支持任务追踪与审计
*/
export class CrawlerWorker {
private static QUEUE_NAME = 'crawler-tasks';
/**
* 初始化并注册 Worker
*/
static init() {
WorkerHub.registerWorker(this.QUEUE_NAME, async (job: Job) => {
const { url, sandbox, traceContext } = job.data;
const { tenantId, shopId, taskId, traceId, userId } = traceContext;
logger.info(`[CrawlerWorker] Starting task ${job.id} for URL: ${url}`);
try {
// 1. 抓取
let productData = await CrawlerService.crawlProduct(url, { useSandbox: sandbox });
// 2. 多模态优化
const optimized = await AIService.analyzeMultiModalProduct({
title: productData.title || '',
description: productData.description,
attributes: productData.attributes || {},
imageUrls: productData.images || []
});
productData.title = optimized.optimizedTitle;
productData.description = optimized.optimizedDescription;
productData.attributes = { ...productData.attributes, ...optimized.validatedAttributes };
// 3. 指纹生成
const fingerprint = await FingerprintEngine.generateCompositeFingerprint({
title: productData.title,
description: productData.description,
mainImage: productData.mainImage || ''
});
// 4. 入库
const id = await ProductService.create({
...productData,
phash: fingerprint.phash,
semanticHash: fingerprint.semanticHash,
vectorEmbedding: JSON.stringify(fingerprint.vectorEmbedding),
status: 'draft'
});
// 5. 审计日志
await AuditService.log({
tenantId,
shopId,
taskId,
traceId,
userId,
module: 'SYNC',
action: 'CRAWLER_ASYNC_COMPLETE',
resourceType: 'product',
resourceId: String(id),
afterSnapshot: { url, id },
result: 'success',
source: 'node'
});
return { id, url, status: 'completed' };
} catch (err: any) {
logger.error(`[CrawlerWorker] Task ${job.id} failed: ${err.message}`);
// 错误审计
await AuditService.log({
tenantId,
shopId,
taskId,
traceId,
userId,
module: 'SYNC',
action: 'CRAWLER_ASYNC_FAILED',
resourceType: 'product',
resourceId: url,
result: 'failed',
errorCode: 'CRAWLER_WORKER_ERROR',
errorMessage: err.message,
source: 'node'
});
throw err;
}
}, 10); // 并发数限制为 10
}
/**
* 提交采集任务到队列
*/
static async submit(data: {
url: string;
sandbox?: boolean;
traceContext: any;
}) {
const queue = WorkerHub.getQueue(this.QUEUE_NAME);
return await queue.add(`crawl-${Date.now()}`, data, {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 }
});
}
}

View File

@@ -0,0 +1,310 @@
import { Job } from 'bullmq';
import { WorkerHub } from './WorkerHub';
import { PlatformApiService, PlatformApiConfig, SyncOptions } from '../services/PlatformApiService';
import { ProductService } from '../services/ProductService';
import { OrderService } from '../services/OrderService';
import { AuditService } from '../services/AuditService';
import { logger } from '../utils/logger';
/**
* PlatformSyncWorker - 平台数据同步Worker
*
* 功能定位:
* - 异步执行有API平台的数据同步Amazon, eBay, Shopee等
* - 支持商品同步、订单同步、库存更新
* - 无API平台的采集由浏览器插件处理不经过此Worker
*
* 安全约束:
* - 并发数限制 ≤ 10符合资源保护要求
* - 所有操作携带五元组追踪信息
* - 支持限流和错误重试
*
* @author AI-Backend-1
* @taskId BE-P008, BE-O007
*/
interface SyncJobData {
syncType: 'PRODUCT' | 'ORDER' | 'INVENTORY';
platformConfig: PlatformApiConfig;
syncOptions: SyncOptions;
retryCount?: number;
}
export class PlatformSyncWorker {
private static readonly QUEUE_NAME = 'platform-sync-tasks';
private static readonly MAX_RETRIES = 3;
private static readonly CONCURRENCY = 10; // 符合资源限制
/**
* 初始化并注册Worker
*/
static init() {
WorkerHub.registerWorker(
this.QUEUE_NAME,
async (job: Job<SyncJobData>) => {
const { syncType, platformConfig, syncOptions, retryCount = 0 } = job.data;
const { tenantId, shopId, taskId, traceId, businessType } = syncOptions;
logger.info(`[PlatformSyncWorker] Starting ${syncType} sync task`, {
jobId: job.id,
platform: platformConfig.platform,
tenantId,
shopId,
taskId,
traceId,
businessType,
retryCount,
});
const startTime = Date.now();
try {
let result: any;
switch (syncType) {
case 'PRODUCT':
result = await this.syncProducts(platformConfig, syncOptions);
break;
case 'ORDER':
result = await this.syncOrders(platformConfig, syncOptions);
break;
case 'INVENTORY':
result = await this.syncInventory(platformConfig, syncOptions);
break;
default:
throw new Error(`Unknown sync type: ${syncType}`);
}
const duration = Date.now() - startTime;
// 审计日志 - 成功
await AuditService.log({
tenantId,
shopId,
taskId,
traceId,
businessType,
module: 'PLATFORM_SYNC',
action: `${syncType}_SYNC_SUCCESS`,
resourceType: 'sync_job',
resourceId: String(job.id),
afterSnapshot: {
platform: platformConfig.platform,
syncType,
duration,
resultCount: result?.length || 0,
},
result: 'success',
source: 'node',
});
logger.info(`[PlatformSyncWorker] ${syncType} sync completed`, {
jobId: job.id,
duration,
resultCount: result?.length || 0,
});
return {
success: true,
syncType,
platform: platformConfig.platform,
duration,
resultCount: result?.length || 0,
data: result,
};
} catch (error: any) {
const duration = Date.now() - startTime;
logger.error(`[PlatformSyncWorker] ${syncType} sync failed`, {
jobId: job.id,
platform: platformConfig.platform,
error: error.message,
duration,
retryCount,
});
// 审计日志 - 失败
await AuditService.log({
tenantId,
shopId,
taskId,
traceId,
businessType,
module: 'PLATFORM_SYNC',
action: `${syncType}_SYNC_FAILED`,
resourceType: 'sync_job',
resourceId: String(job.id),
result: 'failed',
errorCode: 'PLATFORM_SYNC_ERROR',
errorMessage: error.message,
source: 'node',
});
// 重试逻辑
if (retryCount < this.MAX_RETRIES) {
logger.info(`[PlatformSyncWorker] Retrying task ${job.id}`, {
retryCount: retryCount + 1,
});
throw error; // 抛出错误触发BullMQ重试
}
// 超过重试次数,返回失败结果
return {
success: false,
syncType,
platform: platformConfig.platform,
duration,
error: error.message,
retryCount,
};
}
},
this.CONCURRENCY
);
logger.info('[PlatformSyncWorker] Worker registered successfully');
}
// ==================== Private Methods ====================
/**
* 同步商品数据
*/
private static async syncProducts(
config: PlatformApiConfig,
options: SyncOptions
): Promise<any[]> {
const products = await PlatformApiService.syncProducts(config, options);
// 保存到数据库
const savedProducts = [];
for (const product of products) {
try {
const productId = await ProductService.create({
tenantId: options.tenantId,
shopId: options.shopId,
platform: config.platform,
platformProductId: product.platformProductId,
title: product.title,
price: product.price,
currency: product.currency,
status: product.status,
traceId: options.traceId,
businessType: options.businessType,
syncedAt: product.syncedAt,
});
savedProducts.push({ ...product, internalId: productId });
} catch (error: any) {
logger.error('[PlatformSyncWorker] Failed to save product', {
platformProductId: product.platformProductId,
error: error.message,
});
// 继续处理其他商品
}
}
return savedProducts;
}
/**
* 同步订单数据
*/
private static async syncOrders(
config: PlatformApiConfig,
options: SyncOptions
): Promise<any[]> {
const orders = await PlatformApiService.syncOrders(config, options);
// 保存到数据库
const savedOrders = [];
for (const order of orders) {
try {
const orderId = await OrderService.create({
tenantId: options.tenantId,
shopId: options.shopId,
platform: config.platform,
platformOrderId: order.platformOrderId,
status: order.status,
totalAmount: order.totalAmount,
currency: order.currency,
items: order.items,
traceId: options.traceId,
taskId: options.taskId,
businessType: options.businessType,
syncedAt: order.syncedAt,
});
savedOrders.push({ ...order, internalId: orderId });
} catch (error: any) {
logger.error('[PlatformSyncWorker] Failed to save order', {
platformOrderId: order.platformOrderId,
error: error.message,
});
// 继续处理其他订单
}
}
return savedOrders;
}
/**
* 同步库存数据
*/
private static async syncInventory(
config: PlatformApiConfig,
options: SyncOptions
): Promise<any[]> {
// 先同步商品获取最新库存
const products = await PlatformApiService.syncProducts(config, options);
// 更新库存
const updatedInventory = [];
for (const product of products) {
try {
// TODO: 调用库存服务更新库存
// await InventoryService.updateStock(...);
updatedInventory.push({
platformProductId: product.platformProductId,
stock: product.stock,
updatedAt: new Date(),
});
} catch (error: any) {
logger.error('[PlatformSyncWorker] Failed to update inventory', {
platformProductId: product.platformProductId,
error: error.message,
});
}
}
return updatedInventory;
}
/**
* 提交同步任务
*/
static async submitSyncTask(
syncType: 'PRODUCT' | 'ORDER' | 'INVENTORY',
platformConfig: PlatformApiConfig,
syncOptions: SyncOptions
): Promise<string> {
const job = await WorkerHub.addJob(this.QUEUE_NAME, {
syncType,
platformConfig,
syncOptions,
retryCount: 0,
}, {
attempts: this.MAX_RETRIES,
backoff: {
type: 'exponential',
delay: 5000, // 5秒初始延迟
},
});
logger.info(`[PlatformSyncWorker] Sync task submitted`, {
jobId: job.id,
syncType,
platform: platformConfig.platform,
});
return job.id as string;
}
}