feat: 初始化项目结构并添加核心功能模块
- 新增文档模板和导航结构 - 实现服务器基础API路由和控制器 - 添加扩展插件配置和前端框架 - 引入多租户和权限管理模块 - 集成日志和数据库配置 - 添加核心业务模型和类型定义
This commit is contained in:
115
server/src/workers/CrawlerWorker.ts
Normal file
115
server/src/workers/CrawlerWorker.ts
Normal file
@@ -0,0 +1,115 @@
|
||||
import { Job } from 'bullmq';
|
||||
import { WorkerHub } from './WorkerHub';
|
||||
import { CrawlerService } from '../services/CrawlerService';
|
||||
import { AIService } from '../services/AIService';
|
||||
import { FingerprintEngine } from '../core/ai/FingerprintEngine';
|
||||
import { ProductService } from '../services/ProductService';
|
||||
import { AuditService } from '../services/AuditService';
|
||||
import { logger } from '../utils/logger';
|
||||
|
||||
/**
|
||||
* [CORE_WORK_01] 采集 Worker (Crawler Worker)
|
||||
* @description 异步执行产品抓取、多模态解析、指纹生成并入库,支持任务追踪与审计
|
||||
*/
|
||||
export class CrawlerWorker {
|
||||
private static QUEUE_NAME = 'crawler-tasks';
|
||||
|
||||
/**
|
||||
* 初始化并注册 Worker
|
||||
*/
|
||||
static init() {
|
||||
WorkerHub.registerWorker(this.QUEUE_NAME, async (job: Job) => {
|
||||
const { url, sandbox, traceContext } = job.data;
|
||||
const { tenantId, shopId, taskId, traceId, userId } = traceContext;
|
||||
|
||||
logger.info(`[CrawlerWorker] Starting task ${job.id} for URL: ${url}`);
|
||||
|
||||
try {
|
||||
// 1. 抓取
|
||||
let productData = await CrawlerService.crawlProduct(url, { useSandbox: sandbox });
|
||||
|
||||
// 2. 多模态优化
|
||||
const optimized = await AIService.analyzeMultiModalProduct({
|
||||
title: productData.title || '',
|
||||
description: productData.description,
|
||||
attributes: productData.attributes || {},
|
||||
imageUrls: productData.images || []
|
||||
});
|
||||
|
||||
productData.title = optimized.optimizedTitle;
|
||||
productData.description = optimized.optimizedDescription;
|
||||
productData.attributes = { ...productData.attributes, ...optimized.validatedAttributes };
|
||||
|
||||
// 3. 指纹生成
|
||||
const fingerprint = await FingerprintEngine.generateCompositeFingerprint({
|
||||
title: productData.title,
|
||||
description: productData.description,
|
||||
mainImage: productData.mainImage || ''
|
||||
});
|
||||
|
||||
// 4. 入库
|
||||
const id = await ProductService.create({
|
||||
...productData,
|
||||
phash: fingerprint.phash,
|
||||
semanticHash: fingerprint.semanticHash,
|
||||
vectorEmbedding: JSON.stringify(fingerprint.vectorEmbedding),
|
||||
status: 'draft'
|
||||
});
|
||||
|
||||
// 5. 审计日志
|
||||
await AuditService.log({
|
||||
tenantId,
|
||||
shopId,
|
||||
taskId,
|
||||
traceId,
|
||||
userId,
|
||||
module: 'SYNC',
|
||||
action: 'CRAWLER_ASYNC_COMPLETE',
|
||||
resourceType: 'product',
|
||||
resourceId: String(id),
|
||||
afterSnapshot: { url, id },
|
||||
result: 'success',
|
||||
source: 'node'
|
||||
});
|
||||
|
||||
return { id, url, status: 'completed' };
|
||||
} catch (err: any) {
|
||||
logger.error(`[CrawlerWorker] Task ${job.id} failed: ${err.message}`);
|
||||
|
||||
// 错误审计
|
||||
await AuditService.log({
|
||||
tenantId,
|
||||
shopId,
|
||||
taskId,
|
||||
traceId,
|
||||
userId,
|
||||
module: 'SYNC',
|
||||
action: 'CRAWLER_ASYNC_FAILED',
|
||||
resourceType: 'product',
|
||||
resourceId: url,
|
||||
result: 'failed',
|
||||
errorCode: 'CRAWLER_WORKER_ERROR',
|
||||
errorMessage: err.message,
|
||||
source: 'node'
|
||||
});
|
||||
|
||||
throw err;
|
||||
}
|
||||
}, 10); // 并发数限制为 10
|
||||
}
|
||||
|
||||
/**
|
||||
* 提交采集任务到队列
|
||||
*/
|
||||
static async submit(data: {
|
||||
url: string;
|
||||
sandbox?: boolean;
|
||||
traceContext: any;
|
||||
}) {
|
||||
const queue = WorkerHub.getQueue(this.QUEUE_NAME);
|
||||
return await queue.add(`crawl-${Date.now()}`, data, {
|
||||
attempts: 3,
|
||||
backoff: { type: 'exponential', delay: 1000 }
|
||||
});
|
||||
}
|
||||
}
|
||||
70
server/src/workers/WorkerHub.ts
Normal file
70
server/src/workers/WorkerHub.ts
Normal file
@@ -0,0 +1,70 @@
|
||||
import { Job, Queue, Worker } from 'bullmq';
|
||||
import IORedis from 'ioredis';
|
||||
import { logger } from '../utils/logger';
|
||||
|
||||
const REDIS_HOST = process.env.REDIS_HOST || '127.0.0.1';
|
||||
const REDIS_PORT = Number(process.env.REDIS_PORT) || 6379;
|
||||
|
||||
const connection = new IORedis({
|
||||
host: REDIS_HOST,
|
||||
port: REDIS_PORT,
|
||||
maxRetriesPerRequest: null,
|
||||
});
|
||||
|
||||
/**
|
||||
* [CORE_WORK_01] 容器化采集调度中心 (Worker Hub)
|
||||
* @description 统一管理 BullMQ 队列与 Worker,支持异步采集、发布与同步任务
|
||||
*/
|
||||
export class WorkerHub {
|
||||
private static queues: Record<string, Queue> = {};
|
||||
private static workers: Record<string, Worker> = {};
|
||||
|
||||
/**
|
||||
* 初始化队列
|
||||
*/
|
||||
static getQueue(name: string): Queue {
|
||||
if (!this.queues[name]) {
|
||||
this.queues[name] = new Queue(name, { connection: connection as any });
|
||||
logger.info(`[WorkerHub] Queue initialized: ${name}`);
|
||||
}
|
||||
return this.queues[name];
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册 Worker
|
||||
*/
|
||||
static registerWorker(name: string, processor: (job: Job) => Promise<any>, concurrency: number = 5) {
|
||||
if (this.workers[name]) return;
|
||||
|
||||
const worker = new Worker(name, processor, {
|
||||
connection: connection as any,
|
||||
concurrency,
|
||||
removeOnComplete: { count: 100 },
|
||||
removeOnFail: { count: 500 }
|
||||
});
|
||||
|
||||
worker.on('completed', (job) => {
|
||||
logger.info(`[WorkerHub] Job ${name}:${job.id} completed`);
|
||||
});
|
||||
|
||||
worker.on('failed', (job, err) => {
|
||||
logger.error(`[WorkerHub] Job ${name}:${job?.id} failed: ${err.message}`);
|
||||
});
|
||||
|
||||
this.workers[name] = worker;
|
||||
logger.info(`[WorkerHub] Worker registered: ${name} (concurrency: ${concurrency})`);
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭所有连接
|
||||
*/
|
||||
static async shutdown() {
|
||||
for (const worker of Object.values(this.workers)) {
|
||||
await worker.close();
|
||||
}
|
||||
for (const queue of Object.values(this.queues)) {
|
||||
await queue.close();
|
||||
}
|
||||
await connection.quit();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user