import { Queue } from 'bullmq';
import db from '../config/database';
import { logger } from '../utils/logger';
import { DomainEventBus, DomainEvent } from '../core/runtime/DomainEventBus';

/**
 * One audit record as it is placed on the `audit-log` queue.
 *
 * `timestamp` is filled in by {@link AuditService.log} from `Date.now()`;
 * callers supply every other field.
 */
export interface AuditLogEntry {
  tenantId: string;
  shopId?: string;
  taskId?: string;
  traceId: string;
  userId: string;
  roleCode?: string;
  module: string;
  action: string;
  resourceType: string;
  resourceId?: string;
  // Snapshots are free-form payloads; `any` is kept so existing callers
  // that pass or read arbitrary shapes keep compiling.
  beforeSnapshot?: any;
  afterSnapshot?: any;
  result: 'success' | 'failed';
  errorCode?: string;
  errorMessage?: string;
  clientIp?: string;
  userAgent?: string;
  source: 'console' | 'extension' | 'node';
  metadata?: Record<string, any>;
  timestamp: number;
}

/** Port used when `REDIS_PORT` is unset or not a valid TCP port. */
const DEFAULT_REDIS_PORT = 6379;

/** Replacement written over every value stored under a sensitive key. */
const MASK_PLACEHOLDER = '***MASKED***';

/**
 * Markers matched as substrings of the key (case-insensitive, separators
 * ignored), e.g. `userPassword`, `api_key`, `refreshToken`.
 */
const SENSITIVE_SUBSTRINGS = ['password', 'token', 'secret', 'costprice', 'apikey', 'encrypteddata'];

/**
 * Short markers matched only as a whole word of the key (`iv`, `authTag`,
 * `iv_hex`). Substring matching would also hit unrelated fields such as
 * `active`, `deliveryStatus` or `stage`.
 */
const SENSITIVE_WORDS = ['iv', 'tag'];

/** Parses a port number from an environment value, falling back to the default. */
function resolveRedisPort(raw: string | undefined): number {
  const parsed = Number.parseInt(raw ?? '', 10);
  return Number.isInteger(parsed) && parsed > 0 && parsed <= 65535 ? parsed : DEFAULT_REDIS_PORT;
}

/** Extracts a printable message from a value caught in a `catch` clause. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/** Decides whether the value stored under `key` must be masked. */
function isSensitiveKey(key: string): boolean {
  // Drop separators so `api_key` / `cost-price` match `apikey` / `costprice`.
  const compact = key.toLowerCase().replace(/[^a-z0-9]/g, '');
  if (SENSITIVE_SUBSTRINGS.some((marker) => compact.includes(marker))) {
    return true;
  }

  // Split camelCase / PascalCase / snake_case / kebab-case into words.
  const words = key
    .replace(/([a-z0-9])([A-Z])/g, '$1 $2')
    .replace(/([A-Z]+)([A-Z][a-z])/g, '$1 $2')
    .split(/[^A-Za-z0-9]+/)
    .map((word) => word.toLowerCase());
  return words.some((word) => SENSITIVE_WORDS.includes(word));
}

/**
 * Returns a copy of `value` in which every property whose key is sensitive
 * is replaced by the mask placeholder. The input is never modified.
 *
 * Objects exposing `toJSON` (e.g. `Date`) are returned as-is so that their
 * own serialisation is used when the job is stringified. `seen` maps already
 * visited objects to their copies, which keeps cyclic input from recursing
 * forever.
 */
function maskedCopy(value: unknown, seen: WeakMap<object, unknown> = new WeakMap()): unknown {
  if (value === null || typeof value !== 'object') {
    return value;
  }
  if (seen.has(value)) {
    return seen.get(value);
  }
  if (typeof (value as { toJSON?: unknown }).toJSON === 'function') {
    return value;
  }

  if (Array.isArray(value)) {
    const copiedItems: unknown[] = [];
    seen.set(value, copiedItems);
    for (const item of value) {
      copiedItems.push(maskedCopy(item, seen));
    }
    return copiedItems;
  }

  const copiedFields: Record<string, unknown> = {};
  seen.set(value, copiedFields);
  for (const [key, fieldValue] of Object.entries(value)) {
    copiedFields[key] = isSensitiveKey(key) ? MASK_PLACEHOLDER : maskedCopy(fieldValue, seen);
  }
  return copiedFields;
}

/**
 * @description Cross-border risk-control audit core (CORE_DEV_04 / BIZ_GOV_20).
 * Asynchronous audit logging on top of a BullMQ queue, wired to the
 * DomainEventBus so that every published business event is audited.
 */
export class AuditService {
  /** Queue that receives one `log` job per audit entry. */
  private static auditQueue = new Queue('audit-log', {
    connection: {
      host: process.env.REDIS_HOST || '127.0.0.1',
      port: resolveRedisPort(process.env.REDIS_PORT),
    },
  });

  /** Set once the DomainEventBus subscription exists, so it is registered only once. */
  private static listenerStarted = false;

  /**
   * Starts the event listener and creates the audit tables when they are missing (BIZ_GOV_20).
   *
   * Calling it more than once is safe: the bus subscription is registered a
   * single time and each table is only created if `hasTable` reports it absent.
   */
  static async initTable(): Promise<void> {
    // Start the full-pipeline event listener (BIZ_GOV_20).
    AuditService.startEventListener();

    // 1. Audit log table.
    const hasAuditLogTable = await db.schema.hasTable('cf_audit_log');
    if (!hasAuditLogTable) {
      console.log('📦 Creating cf_audit_log table...');
      await db.schema.createTable('cf_audit_log', (table) => {
        table.increments('id').primary();
        table.string('user_id', 100);
        table.string('action', 100).notNullable();
        table.string('target_type', 100).notNullable();
        table.string('target_id', 100).notNullable();
        table.string('tenant_id', 100).index();
        table.string('shop_id', 100).index();
        table.string('trace_id', 100).index();
        table.text('old_data');
        table.text('new_data');
        table.text('metadata');
        table.timestamp('created_at').defaultTo(db.fn.now());
        table.index(['action', 'target_type', 'target_id'], 'idx_audit_action_target');
        table.index(['tenant_id', 'shop_id'], 'idx_audit_tenant_shop');
      });
      console.log('✅ Table cf_audit_log created');
    }

    // 2. Operation log table.
    const hasOpLogTable = await db.schema.hasTable('cf_operation_log');
    if (!hasOpLogTable) {
      console.log('📦 Creating cf_operation_log table...');
      await db.schema.createTable('cf_operation_log', (table) => {
        table.bigIncrements('id').primary();
        table.string('tenant_id', 64).notNullable();
        table.string('shop_id', 64);
        table.string('task_id', 64);
        table.string('trace_id', 128).notNullable();
        table.string('user_id', 64).notNullable();
        table.string('role_code', 32);
        table.string('module', 64).notNullable();
        table.string('action', 64).notNullable();
        table.string('resource_type', 64).notNullable();
        table.string('resource_id', 128);
        table.json('before_snapshot');
        table.json('after_snapshot');
        table.string('result', 16).notNullable();
        table.string('error_code', 64);
        table.string('error_message', 512);
        table.string('client_ip', 64);
        table.string('user_agent', 512);
        table.string('source', 16).notNullable();
        table.timestamp('created_at').defaultTo(db.fn.now());
        table.index(['tenant_id', 'created_at'], 'idx_oplog_tenant_time');
        table.index(['trace_id'], 'idx_oplog_trace');
        table.index(['user_id', 'created_at'], 'idx_oplog_user_time');
        table.index(['task_id'], 'idx_oplog_task');
        table.index(['module', 'action', 'created_at'], 'idx_oplog_module_action_time');
      });
      console.log('✅ Table cf_operation_log created');
    }
  }

  /**
   * Subscribes to every event on the DomainEventBus and turns each one into
   * an audit log entry. Subsequent calls are no-ops.
   */
  private static startEventListener(): void {
    if (AuditService.listenerStarted) {
      return;
    }

    logger.info('[AuditService] Starting DomainEventBus listener for full-pipeline auditing...');
    DomainEventBus.getInstance().subscribeAll(async (event: DomainEvent) => {
      try {
        await AuditService.log({
          tenantId: event.tenantId,
          // Events without a trace id get a synthetic, time-based one.
          traceId: event.traceId || `bus-${Date.now()}`,
          userId: event.userId || 'EVENT_BUS',
          module: event.module,
          action: event.action,
          resourceType: event.resourceType,
          resourceId: event.resourceId,
          afterSnapshot: event.data,
          result: 'success',
          source: 'node',
          metadata: { eventTimestamp: event.timestamp },
        });
      } catch (err: unknown) {
        logger.error(`[AuditService] Failed to process bus event: ${describeError(err)}`);
      }
    });
    // Only mark as started once the subscription call has returned normally.
    AuditService.listenerStarted = true;
  }

  /**
   * [ERP_MST_02] Records the audit trail of a configuration change (granular config audit).
   * @description Audits fine-grained changes to core ERP configuration
   * (e.g. tax codes, finance accounts, rate templates) for traceability.
   * @param params Identifies the tenant, user and trace, the changed
   *   `configKey`, its old and new values, and an optional reason/metadata.
   *   `changeReason` is stored inside the entry's metadata.
   */
  static async logConfigChange(params: {
    tenantId: string;
    userId: string;
    traceId: string;
    configKey: string;
    oldValue: any;
    newValue: any;
    changeReason?: string;
    metadata?: any;
  }): Promise<void> {
    logger.info(`[AuditService] Recording config change for ${params.configKey} (Tenant: ${params.tenantId})`);
    await AuditService.log({
      tenantId: params.tenantId,
      userId: params.userId,
      traceId: params.traceId,
      module: 'ERP_CONFIG',
      action: 'CONFIG_CHANGE',
      resourceType: 'SYSTEM_CONFIG',
      resourceId: params.configKey,
      beforeSnapshot: params.oldValue,
      afterSnapshot: params.newValue,
      result: 'success',
      source: 'node',
      metadata: { ...params.metadata, changeReason: params.changeReason },
    });
  }

  /**
   * @description Records an audit log entry asynchronously.
   *
   * The entry is stamped with the current time, its snapshots are replaced by
   * masked copies, and the result is added to the `audit-log` queue. Failures
   * are logged and swallowed: this method never rejects.
   * @param entry Audit entry without `timestamp`. The objects it references
   *   are not modified.
   */
  static async log(entry: Omit<AuditLogEntry, 'timestamp'>): Promise<void> {
    try {
      const fullEntry: AuditLogEntry = {
        ...entry,
        timestamp: Date.now(),
      };

      // 1. Mask sensitive data automatically.
      AuditService.maskSensitiveData(fullEntry);

      // 2. Enqueue for asynchronous processing.
      await AuditService.auditQueue.add('log', fullEntry, {
        removeOnComplete: true,
        removeOnFail: 1000,
      });

      logger.info(`[Audit] Job added to queue: ${entry.action} on ${entry.resourceType}:${entry.resourceId}`);
    } catch (err: unknown) {
      logger.error(`[Audit] Failed to queue audit log: ${describeError(err)}`);
    }
  }

  /**
   * @description Audit log for system background tasks (no request context).
   *
   * Tenant, user and module are fixed to system values and the trace id is
   * generated from the current time; `params` is spread last.
   * @param params Action, resource and optional snapshots/metadata.
   */
  static async logSystem(params: {
    action: string;
    resourceType: string;
    resourceId: string;
    beforeSnapshot?: any;
    afterSnapshot?: any;
    metadata?: any;
  }): Promise<void> {
    await AuditService.log({
      tenantId: 'SYSTEM',
      traceId: `sys-${Date.now()}`,
      userId: 'SYSTEM_BOT',
      module: 'SYSTEM',
      source: 'node',
      result: 'success',
      ...params,
    });
  }

  /**
   * @description Compares two objects and records only their differences.
   *
   * Nothing is logged when no top-level key differs. Otherwise the entry's
   * snapshots hold just the differing keys and `metadata.diffKeys` lists them.
   * @param params Audit context plus the full before/after snapshots. A
   *   `null`/`undefined` snapshot is treated as an empty object.
   */
  static async logDiff(params: {
    tenantId: string;
    shopId?: string;
    taskId?: string;
    traceId: string;
    userId: string;
    module: string;
    action: string;
    resourceType: string;
    resourceId: string;
    beforeSnapshot: any;
    afterSnapshot: any;
    source: 'console' | 'extension' | 'node';
  }): Promise<void> {
    const { beforeSnapshot, afterSnapshot, ...rest } = params;
    const diff = AuditService.getDiff(beforeSnapshot, afterSnapshot);
    const diffKeys = Object.keys(diff.changed);

    if (diffKeys.length === 0) {
      return;
    }

    await AuditService.log({
      ...rest,
      beforeSnapshot: diff.old,
      afterSnapshot: diff.new,
      result: 'success',
      metadata: { diffKeys },
    });
  }

  /**
   * Replaces the entry's snapshots with masked copies.
   *
   * Only the `beforeSnapshot` / `afterSnapshot` fields of `entry` itself are
   * reassigned; the snapshot objects supplied by the caller stay untouched.
   */
  private static maskSensitiveData(entry: AuditLogEntry): void {
    if (entry.beforeSnapshot !== undefined) {
      entry.beforeSnapshot = maskedCopy(entry.beforeSnapshot);
    }
    if (entry.afterSnapshot !== undefined) {
      entry.afterSnapshot = maskedCopy(entry.afterSnapshot);
    }
  }

  /**
   * Computes a shallow, per-key diff between two snapshots.
   *
   * Values are compared through their `JSON.stringify` output, so nested
   * objects with the same content but a different key order count as changed.
   * Keys present only in `oldData` (removed keys) are reported as well, with
   * an `undefined` new value.
   * @returns `old` / `new` holding the differing values and `changed`
   *   flagging each differing key.
   */
  private static getDiff(
    oldData: any,
    newData: any,
  ): { old: Record<string, any>; new: Record<string, any>; changed: Record<string, boolean> } {
    const diff: { old: Record<string, any>; new: Record<string, any>; changed: Record<string, boolean> } = {
      old: {},
      new: {},
      changed: {},
    };

    // A missing snapshot (e.g. on create/delete) behaves like an empty object.
    const before = oldData ?? {};
    const after = newData ?? {};

    // New-data keys first, then keys that only exist in the old data.
    const keys = new Set<string>([...Object.keys(after), ...Object.keys(before)]);
    for (const key of keys) {
      if (JSON.stringify(before[key]) !== JSON.stringify(after[key])) {
        diff.old[key] = before[key];
        diff.new[key] = after[key];
        diff.changed[key] = true;
      }
    }

    return diff;
  }
}