import { MessageConfig, MessageStatus, MessageQueue, Message } from '../types/message'; import { logger } from '../utils/logger'; import { v4 as uuidv4 } from 'uuid'; /** * 消息处理服务 * 负责管理系统中的消息,包括消息的发送、接收、处理等 */ export class MessageProcessingService { private static messageQueues: Map = new Map(); private static messages: Map = new Map(); private static messageStatus: Map = new Map(); /** * 创建消息队列 * @param config 消息队列配置 * @returns 创建结果 */ static async createMessageQueue(config: MessageConfig): Promise<{ success: boolean; queueId: string; message: string }> { try { if (this.messageQueues.has(config.id)) { return { success: false, queueId: '', message: 'Message queue already exists' }; } const queue: MessageQueue = { id: config.id, name: config.name, type: config.type, maxSize: config.maxSize || 10000, retentionPeriod: config.retentionPeriod || 86400000, // 24 hours created: new Date(), messages: [] }; this.messageQueues.set(config.id, queue); logger.info(`Message queue ${config.id} created successfully`); return { success: true, queueId: config.id, message: 'Message queue created successfully' }; } catch (error) { logger.error(`Error creating message queue: ${error}`); return { success: false, queueId: '', message: `Error creating message queue: ${error}` }; } } /** * 发送消息 * @param queueId 队列ID * @param payload 消息内容 * @returns 发送结果 */ static async sendMessage(queueId: string, payload: any): Promise<{ success: boolean; messageId: string; message: string }> { try { const queue = this.messageQueues.get(queueId); if (!queue) { return { success: false, messageId: '', message: 'Message queue not found' }; } // 检查队列大小 if (queue.messages.length >= queue.maxSize) { return { success: false, messageId: '', message: 'Message queue is full' }; } const messageId = uuidv4(); const message: Message = { id: messageId, queueId, payload, status: 'PENDING', created: new Date(), updated: new Date() }; // 添加到队列 queue.messages.push(messageId); this.messages.set(messageId, message); this.messageStatus.set(messageId, { status: 'PENDING', lastUpdated: new Date() }); logger.info(`Message ${messageId} sent to queue ${queueId}`); return { success: true, messageId, message: 'Message sent successfully' }; } catch (error) { logger.error(`Error sending message: ${error}`); return { success: false, messageId: '', message: `Error sending message: ${error}` }; } } /** * 接收消息 * @param queueId 队列ID * @returns 消息 */ static async receiveMessage(queueId: string): Promise { try { const queue = this.messageQueues.get(queueId); if (!queue) { return null; } if (queue.messages.length === 0) { return null; } // 取出第一个消息 const messageId = queue.messages.shift(); if (!messageId) { return null; } const message = this.messages.get(messageId); if (!message) { return null; } // 更新消息状态 message.status = 'PROCESSING'; message.updated = new Date(); this.messages.set(messageId, message); this.messageStatus.set(messageId, { status: 'PROCESSING', lastUpdated: new Date() }); logger.info(`Message ${messageId} received from queue ${queueId}`); return message; } catch (error) { logger.error(`Error receiving message: ${error}`); return null; } } /** * 确认消息处理完成 * @param messageId 消息ID * @returns 确认结果 */ static async acknowledgeMessage(messageId: string): Promise<{ success: boolean; message: string }> { try { const message = this.messages.get(messageId); if (!message) { return { success: false, message: 'Message not found' }; } // 更新消息状态 message.status = 'COMPLETED'; message.updated = new Date(); this.messages.set(messageId, message); this.messageStatus.set(messageId, { status: 'COMPLETED', lastUpdated: new Date() }); logger.info(`Message ${messageId} acknowledged`); return { success: true, message: 'Message acknowledged successfully' }; } catch (error) { logger.error(`Error acknowledging message: ${error}`); return { success: false, message: `Error acknowledging message: ${error}` }; } } /** * 拒绝消息 * @param messageId 消息ID * @param requeue 是否重新入队 * @returns 拒绝结果 */ static async rejectMessage(messageId: string, requeue: boolean = false): Promise<{ success: boolean; message: string }> { try { const message = this.messages.get(messageId); if (!message) { return { success: false, message: 'Message not found' }; } // 更新消息状态 message.status = 'REJECTED'; message.updated = new Date(); this.messages.set(messageId, message); this.messageStatus.set(messageId, { status: 'REJECTED', lastUpdated: new Date() }); // 如果需要重新入队 if (requeue) { const queue = this.messageQueues.get(message.queueId); if (queue) { queue.messages.push(messageId); } } logger.info(`Message ${messageId} rejected${requeue ? ' and requeued' : ''}`); return { success: true, message: 'Message rejected successfully' }; } catch (error) { logger.error(`Error rejecting message: ${error}`); return { success: false, message: `Error rejecting message: ${error}` }; } } /** * 获取消息状态 * @param messageId 消息ID * @returns 消息状态 */ static async getMessageStatus(messageId: string): Promise { try { const status = this.messageStatus.get(messageId); if (!status) { return null; } return status; } catch (error) { logger.error(`Error getting message status: ${error}`); return null; } } /** * 获取队列状态 * @param queueId 队列ID * @returns 队列状态 */ static async getQueueStatus(queueId: string): Promise<{ queueId: string; size: number; maxSize: number; messages: Message[] } | null> { try { const queue = this.messageQueues.get(queueId); if (!queue) { return null; } // 获取队列中的消息 const queueMessages: Message[] = []; for (const messageId of queue.messages) { const message = this.messages.get(messageId); if (message) { queueMessages.push(message); } } return { queueId, size: queue.messages.length, maxSize: queue.maxSize, messages: queueMessages }; } catch (error) { logger.error(`Error getting queue status: ${error}`); return null; } } /** * 列出所有队列 * @returns 队列列表 */ static async listQueues(): Promise { try { return Array.from(this.messageQueues.values()); } catch (error) { logger.error(`Error listing queues: ${error}`); return []; } } /** * 删除队列 * @param queueId 队列ID * @returns 删除结果 */ static async deleteQueue(queueId: string): Promise<{ success: boolean; message: string }> { try { const queue = this.messageQueues.get(queueId); if (!queue) { return { success: false, message: 'Message queue not found' }; } // 删除队列中的消息 for (const messageId of queue.messages) { this.messages.delete(messageId); this.messageStatus.delete(messageId); } // 删除队列 this.messageQueues.delete(queueId); logger.info(`Message queue ${queueId} deleted successfully`); return { success: true, message: 'Message queue deleted successfully' }; } catch (error) { logger.error(`Error deleting queue: ${error}`); return { success: false, message: `Error deleting queue: ${error}` }; } } /** * 清空队列 * @param queueId 队列ID * @returns 清空结果 */ static async clearQueue(queueId: string): Promise<{ success: boolean; clearedCount: number; message: string }> { try { const queue = this.messageQueues.get(queueId); if (!queue) { return { success: false, clearedCount: 0, message: 'Message queue not found' }; } const clearedCount = queue.messages.length; // 删除队列中的消息 for (const messageId of queue.messages) { this.messages.delete(messageId); this.messageStatus.delete(messageId); } // 清空队列 queue.messages = []; this.messageQueues.set(queueId, queue); logger.info(`Message queue ${queueId} cleared, removed ${clearedCount} messages`); return { success: true, clearedCount, message: 'Message queue cleared successfully' }; } catch (error) { logger.error(`Error clearing queue: ${error}`); return { success: false, clearedCount: 0, message: `Error clearing queue: ${error}` }; } } }