feat: 实现前端组件库和API服务基础架构
refactor: 移除废弃的AGI策略演进服务 fix: 修正磁盘I/O指标字段命名 chore: 更新项目依赖版本 test: 添加前后端集成测试用例 docs: 更新AI模块接口文档 style: 统一审计日志字段命名规范 perf: 优化Redis订阅连接错误处理 build: 配置多项目工作区结构 ci: 添加Vite开发服务器CORS支持
This commit is contained in:
@@ -5,11 +5,43 @@ import { logger } from '../utils/logger';
|
||||
const REDIS_HOST = process.env.REDIS_HOST || '127.0.0.1';
|
||||
const REDIS_PORT = Number(process.env.REDIS_PORT) || 6379;
|
||||
|
||||
const connection = new IORedis({
|
||||
host: REDIS_HOST,
|
||||
port: REDIS_PORT,
|
||||
maxRetriesPerRequest: null,
|
||||
});
|
||||
let connection: IORedis | null = null;
|
||||
let redisConnected = false;
|
||||
|
||||
// 尝试连接Redis
|
||||
const connectRedis = () => {
|
||||
try {
|
||||
connection = new IORedis({
|
||||
host: REDIS_HOST,
|
||||
port: REDIS_PORT,
|
||||
maxRetriesPerRequest: null,
|
||||
retryStrategy: (times) => {
|
||||
// 指数退避重试
|
||||
return Math.min(times * 100, 3000);
|
||||
},
|
||||
});
|
||||
|
||||
connection.on('connect', () => {
|
||||
logger.info('[WorkerHub] Redis connected');
|
||||
redisConnected = true;
|
||||
});
|
||||
|
||||
connection.on('error', (err) => {
|
||||
logger.warn(`[WorkerHub] Redis connection error: ${err.message}`);
|
||||
redisConnected = false;
|
||||
});
|
||||
|
||||
connection.on('end', () => {
|
||||
logger.info('[WorkerHub] Redis connection ended');
|
||||
redisConnected = false;
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error(`[WorkerHub] Failed to create Redis connection: ${(error as any).message}`);
|
||||
}
|
||||
};
|
||||
|
||||
// 初始化Redis连接
|
||||
connectRedis();
|
||||
|
||||
/**
|
||||
* [CORE_WORK_01] 容器化采集调度中心 (Worker Hub)
|
||||
@@ -19,52 +51,86 @@ export class WorkerHub {
|
||||
private static queues: Record<string, Queue> = {};
|
||||
private static workers: Record<string, Worker> = {};
|
||||
|
||||
/**
|
||||
* 检查Redis连接状态
|
||||
*/
|
||||
private static isRedisConnected(): boolean {
|
||||
return redisConnected && connection !== null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 初始化队列
|
||||
*/
|
||||
static getQueue(name: string): Queue {
|
||||
if (!this.queues[name]) {
|
||||
this.queues[name] = new Queue(name, { connection: connection as any });
|
||||
logger.info(`[WorkerHub] Queue initialized: ${name}`);
|
||||
static getQueue(name: string): Queue | null {
|
||||
if (!this.isRedisConnected()) {
|
||||
logger.warn(`[WorkerHub] Redis not connected, queue ${name} not initialized`);
|
||||
return null;
|
||||
}
|
||||
return this.queues[name];
|
||||
|
||||
if (!this.queues[name] && connection) {
|
||||
try {
|
||||
this.queues[name] = new Queue(name, { connection: connection as any });
|
||||
logger.info(`[WorkerHub] Queue initialized: ${name}`);
|
||||
} catch (error) {
|
||||
logger.error(`[WorkerHub] Failed to initialize queue ${name}: ${(error as any).message}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return this.queues[name] || null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册 Worker
|
||||
*/
|
||||
static registerWorker(name: string, processor: (job: Job) => Promise<any>, concurrency: number = 5) {
|
||||
if (!this.isRedisConnected()) {
|
||||
logger.warn(`[WorkerHub] Redis not connected, worker ${name} not registered`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.workers[name]) return;
|
||||
|
||||
const worker = new Worker(name, processor, {
|
||||
connection: connection as any,
|
||||
concurrency,
|
||||
removeOnComplete: { count: 100 },
|
||||
removeOnFail: { count: 500 }
|
||||
});
|
||||
if (connection) {
|
||||
try {
|
||||
const worker = new Worker(name, processor, {
|
||||
connection: connection as any,
|
||||
concurrency,
|
||||
removeOnComplete: { count: 100 },
|
||||
removeOnFail: { count: 500 }
|
||||
});
|
||||
|
||||
worker.on('completed', (job) => {
|
||||
logger.info(`[WorkerHub] Job ${name}:${job.id} completed`);
|
||||
});
|
||||
worker.on('completed', (job) => {
|
||||
logger.info(`[WorkerHub] Job ${name}:${job.id} completed`);
|
||||
});
|
||||
|
||||
worker.on('failed', (job, err) => {
|
||||
logger.error(`[WorkerHub] Job ${name}:${job?.id} failed: ${err.message}`);
|
||||
});
|
||||
worker.on('failed', (job, err) => {
|
||||
logger.error(`[WorkerHub] Job ${name}:${job?.id} failed: ${err.message}`);
|
||||
});
|
||||
|
||||
this.workers[name] = worker;
|
||||
logger.info(`[WorkerHub] Worker registered: ${name} (concurrency: ${concurrency})`);
|
||||
this.workers[name] = worker;
|
||||
logger.info(`[WorkerHub] Worker registered: ${name} (concurrency: ${concurrency})`);
|
||||
} catch (error) {
|
||||
logger.error(`[WorkerHub] Failed to register worker ${name}: ${(error as any).message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭所有连接
|
||||
*/
|
||||
static async shutdown() {
|
||||
for (const worker of Object.values(this.workers)) {
|
||||
await worker.close();
|
||||
try {
|
||||
for (const worker of Object.values(this.workers)) {
|
||||
await worker.close();
|
||||
}
|
||||
for (const queue of Object.values(this.queues)) {
|
||||
await queue.close();
|
||||
}
|
||||
if (connection) {
|
||||
await connection.quit();
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn(`[WorkerHub] Error during shutdown: ${(error as any).message}`);
|
||||
}
|
||||
for (const queue of Object.values(this.queues)) {
|
||||
await queue.close();
|
||||
}
|
||||
await connection.quit();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user