import { RedisService } from '../../utils/RedisService'; import { logger } from '../../utils/logger'; export type EventCallback = (payload: any) => Promise; /** * [BIZ_KER_115] 核心 Domain 间异步解耦建议 (EventBusOptimizationService) * @description 核心逻辑:提供高性能的进程内/跨进程事件分发机制。 * 支持基于 Redis Pub/Sub 的分布式事件同步,以及内存中的快速分发。 * 用于解耦 Order, Product, Logistics 等核心领域,降低同步调用链深度。 */ export class EventBusOptimizationService { private static handlers: Map = new Map(); private static readonly REDIS_EVENT_CHANNEL = 'core:event_bus:broadcast'; /** * 初始化分布式事件监听 */ static async init() { const redis = RedisService.getClient(); // 订阅 Redis 广播通道 await RedisService.subscribeToStateSync('EVENT_BUS', async (msg) => { if (msg.type === 'BROADCAST_EVENT') { await this.emitLocal(msg.event, msg.payload); } }); logger.info(`[EventBus] Distributed event bus initialized`); } /** * 订阅事件 */ static subscribe(event: string, callback: EventCallback) { const callbacks = this.handlers.get(event) || []; callbacks.push(callback); this.handlers.set(event, callbacks); logger.info(`[EventBus] Subscribed to event: ${event}`); } /** * 发布本地事件 (进程内) */ static async emitLocal(event: string, payload: any) { const callbacks = this.handlers.get(event); if (callbacks) { for (const cb of callbacks) { try { await cb(payload); } catch (err: any) { logger.error(`[EventBus] Handler for ${event} failed: ${err.message}`); } } } } /** * 发布全局事件 (跨进程/分布式) */ static async emitGlobal(event: string, payload: any) { // 1. 先触发本地监听 await this.emitLocal(event, payload); // 2. 通过 Redis 广播到其它节点 try { await RedisService.broadcastStateChange('EVENT_BUS', { type: 'BROADCAST_EVENT', event, payload, timestamp: Date.now() }); } catch (err: any) { logger.error(`[EventBus] Global emit failed: ${err.message}`); } } }