feat: 新增多模块功能与服务实现
新增广告计划、用户资产、B2B交易、合规规则等核心模型 实现爬虫工作器、贸易服务、现金流预测等业务服务 添加RBAC权限测试、压力测试等测试用例 完善扩展程序的消息处理与内容脚本功能 重构应用入口与文档生成器 更新项目规则与业务闭环分析文档
This commit is contained in:
473
server/src/workers/CertificateReminderWorker.ts
Normal file
473
server/src/workers/CertificateReminderWorker.ts
Normal file
@@ -0,0 +1,473 @@
|
||||
import { Worker, Job } from 'bullmq';
|
||||
import db from '../config/database';
|
||||
import { logger } from '../utils/logger';
|
||||
import { CertificateService } from '../services/CertificateService';
|
||||
import { Certificate } from '../models/Certificate';
|
||||
|
||||
/**
|
||||
* ReminderType - 提醒类型
|
||||
*/
|
||||
export type ReminderType =
|
||||
| 'CERTIFICATE_EXPIRING_30'
|
||||
| 'CERTIFICATE_EXPIRING_7'
|
||||
| 'CERTIFICATE_EXPIRING_1'
|
||||
| 'CERTIFICATE_EXPIRED';
|
||||
|
||||
/**
|
||||
* ReminderNotification - 提醒通知
|
||||
*/
|
||||
export interface ReminderNotification {
|
||||
id?: string;
|
||||
tenantId: string;
|
||||
shopId: string;
|
||||
certificateId: string;
|
||||
certificateName: string;
|
||||
certificateType: string;
|
||||
expiryDate: Date;
|
||||
reminderType: ReminderType;
|
||||
daysRemaining: number;
|
||||
message: string;
|
||||
isSent: boolean;
|
||||
sentAt?: Date;
|
||||
createdAt?: Date;
|
||||
}
|
||||
|
||||
/**
|
||||
* CertificateReminderWorker - 证书更新提醒 Worker
|
||||
*
|
||||
* [BE-COM003] 证书更新提醒
|
||||
* @description 定期检查即将到期的证书并发送提醒通知
|
||||
*
|
||||
* 功能定位:
|
||||
* - 定时扫描即将到期的证书
|
||||
* - 发送多级提醒(30天、7天、1天、已过期)
|
||||
* - 支持多种通知渠道
|
||||
*
|
||||
* 安全约束:
|
||||
* - 所有操作必须携带五元组追踪信息
|
||||
* - 通知发送必须记录日志
|
||||
*
|
||||
* @author AI-Backend-7
|
||||
* @taskId BE-COM003
|
||||
*/
|
||||
export class CertificateReminderWorker {
|
||||
private static readonly NOTIFICATION_TABLE = 'cf_certificate_notification';
|
||||
private static readonly QUEUE_NAME = 'certificate.reminder';
|
||||
|
||||
private static worker: Worker | null = null;
|
||||
private static reminderInterval: NodeJS.Timeout | null = null;
|
||||
|
||||
/**
|
||||
* 初始化数据库表
|
||||
*/
|
||||
static async initTable(): Promise<void> {
|
||||
const hasTable = await db.schema.hasTable(this.NOTIFICATION_TABLE);
|
||||
if (!hasTable) {
|
||||
logger.info(`[CertificateReminderWorker] Creating ${this.NOTIFICATION_TABLE} table...`);
|
||||
await db.schema.createTable(this.NOTIFICATION_TABLE, (table) => {
|
||||
table.string('id', 36).primary();
|
||||
table.string('tenant_id', 64).notNullable().index();
|
||||
table.string('shop_id', 64).notNullable().index();
|
||||
table.string('certificate_id', 64).notNullable().index();
|
||||
table.string('certificate_name', 255).notNullable();
|
||||
table.string('certificate_type', 32).notNullable();
|
||||
table.date('expiry_date').notNullable();
|
||||
table.enum('reminder_type', [
|
||||
'CERTIFICATE_EXPIRING_30',
|
||||
'CERTIFICATE_EXPIRING_7',
|
||||
'CERTIFICATE_EXPIRING_1',
|
||||
'CERTIFICATE_EXPIRED'
|
||||
]).notNullable();
|
||||
table.integer('days_remaining').notNullable();
|
||||
table.text('message').notNullable();
|
||||
table.boolean('is_sent').defaultTo(false).notNullable();
|
||||
table.timestamp('sent_at');
|
||||
table.timestamp('created_at').defaultTo(db.fn.now());
|
||||
|
||||
table.index(['tenant_id', 'is_sent'], 'idx_notif_tenant_sent');
|
||||
table.index(['certificate_id', 'reminder_type'], 'idx_notif_cert_type');
|
||||
});
|
||||
logger.info(`[CertificateReminderWorker] Table ${this.NOTIFICATION_TABLE} created`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动 Worker
|
||||
*/
|
||||
static async start(): Promise<void> {
|
||||
await this.initTable();
|
||||
|
||||
this.worker = new Worker(
|
||||
this.QUEUE_NAME,
|
||||
async (job: Job) => {
|
||||
return await this.processJob(job);
|
||||
},
|
||||
{
|
||||
connection: {
|
||||
host: process.env.REDIS_HOST || 'localhost',
|
||||
port: Number(process.env.REDIS_PORT) || 6379,
|
||||
},
|
||||
concurrency: 5,
|
||||
}
|
||||
);
|
||||
|
||||
this.worker.on('completed', (job) => {
|
||||
logger.info(`[CertificateReminderWorker] Job completed: ${job.id}`);
|
||||
});
|
||||
|
||||
this.worker.on('failed', (job, error) => {
|
||||
logger.error(`[CertificateReminderWorker] Job failed: ${job?.id}, error: ${error.message}`);
|
||||
});
|
||||
|
||||
this.reminderInterval = setInterval(async () => {
|
||||
await this.scanAndNotify();
|
||||
}, 24 * 60 * 60 * 1000);
|
||||
|
||||
logger.info('[CertificateReminderWorker] Worker started');
|
||||
|
||||
await this.scanAndNotify();
|
||||
}
|
||||
|
||||
/**
|
||||
* 停止 Worker
|
||||
*/
|
||||
static async stop(): Promise<void> {
|
||||
if (this.reminderInterval) {
|
||||
clearInterval(this.reminderInterval);
|
||||
this.reminderInterval = null;
|
||||
}
|
||||
|
||||
if (this.worker) {
|
||||
await this.worker.close();
|
||||
this.worker = null;
|
||||
}
|
||||
|
||||
logger.info('[CertificateReminderWorker] Worker stopped');
|
||||
}
|
||||
|
||||
/**
|
||||
* 扫描并发送提醒
|
||||
*/
|
||||
static async scanAndNotify(): Promise<void> {
|
||||
logger.info('[CertificateReminderWorker] Starting certificate expiry scan...');
|
||||
|
||||
try {
|
||||
await this.processExpiringCertificates(30, 'CERTIFICATE_EXPIRING_30');
|
||||
await this.processExpiringCertificates(7, 'CERTIFICATE_EXPIRING_7');
|
||||
await this.processExpiringCertificates(1, 'CERTIFICATE_EXPIRING_1');
|
||||
await this.processExpiredCertificates();
|
||||
|
||||
await CertificateService.markExpiredCertificates();
|
||||
|
||||
logger.info('[CertificateReminderWorker] Certificate expiry scan completed');
|
||||
} catch (error: any) {
|
||||
logger.error(`[CertificateReminderWorker] Scan failed: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理即将到期的证书
|
||||
* @param days 天数
|
||||
* @param reminderType 提醒类型
|
||||
*/
|
||||
private static async processExpiringCertificates(
|
||||
days: number,
|
||||
reminderType: ReminderType
|
||||
): Promise<void> {
|
||||
const certificates = await CertificateService.getExpiringCertificates(days);
|
||||
|
||||
for (const cert of certificates) {
|
||||
const daysRemaining = this.calculateDaysRemaining(cert.expiryDate);
|
||||
|
||||
if (daysRemaining <= days && daysRemaining > days - 1) {
|
||||
await this.createAndSendNotification(cert, reminderType, daysRemaining);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理已过期的证书
|
||||
*/
|
||||
private static async processExpiredCertificates(): Promise<void> {
|
||||
const now = new Date();
|
||||
const rows = await db('cf_certificate')
|
||||
.where('status', 'APPROVED')
|
||||
.where('expiry_date', '<', now);
|
||||
|
||||
for (const row of rows) {
|
||||
const cert: Certificate = {
|
||||
id: row.id,
|
||||
tenantId: row.tenant_id,
|
||||
shopId: row.shop_id,
|
||||
taskId: row.task_id,
|
||||
traceId: row.trace_id,
|
||||
businessType: row.business_type,
|
||||
certificateName: row.certificate_name,
|
||||
certificateType: row.certificate_type,
|
||||
certificateNo: row.certificate_no,
|
||||
productId: row.product_id,
|
||||
productName: row.product_name,
|
||||
issuer: row.issuer,
|
||||
issueDate: row.issue_date,
|
||||
expiryDate: row.expiry_date,
|
||||
status: row.status,
|
||||
fileUrl: row.file_url,
|
||||
fileHash: row.file_hash,
|
||||
remarks: row.remarks,
|
||||
createdAt: row.created_at,
|
||||
updatedAt: row.updated_at,
|
||||
};
|
||||
|
||||
await this.createAndSendNotification(cert, 'CERTIFICATE_EXPIRED', 0);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建并发送通知
|
||||
*/
|
||||
private static async createAndSendNotification(
|
||||
cert: Certificate,
|
||||
reminderType: ReminderType,
|
||||
daysRemaining: number
|
||||
): Promise<void> {
|
||||
const existingNotification = await db(this.NOTIFICATION_TABLE)
|
||||
.where({
|
||||
certificate_id: cert.id,
|
||||
reminder_type: reminderType,
|
||||
})
|
||||
.first();
|
||||
|
||||
if (existingNotification) {
|
||||
logger.info(`[CertificateReminderWorker] Notification already exists: certId=${cert.id}, type=${reminderType}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const notification: ReminderNotification = {
|
||||
id: this.generateId(),
|
||||
tenantId: cert.tenantId,
|
||||
shopId: cert.shopId,
|
||||
certificateId: cert.id!,
|
||||
certificateName: cert.certificateName,
|
||||
certificateType: cert.certificateType,
|
||||
expiryDate: cert.expiryDate,
|
||||
reminderType,
|
||||
daysRemaining,
|
||||
message: this.generateMessage(cert, reminderType, daysRemaining),
|
||||
isSent: false,
|
||||
createdAt: new Date(),
|
||||
};
|
||||
|
||||
await db(this.NOTIFICATION_TABLE).insert({
|
||||
id: notification.id,
|
||||
tenant_id: notification.tenantId,
|
||||
shop_id: notification.shopId,
|
||||
certificate_id: notification.certificateId,
|
||||
certificate_name: notification.certificateName,
|
||||
certificate_type: notification.certificateType,
|
||||
expiry_date: notification.expiryDate,
|
||||
reminder_type: notification.reminderType,
|
||||
days_remaining: notification.daysRemaining,
|
||||
message: notification.message,
|
||||
is_sent: false,
|
||||
created_at: notification.createdAt,
|
||||
});
|
||||
|
||||
await this.sendNotification(notification);
|
||||
|
||||
logger.info(`[CertificateReminderWorker] Notification created: certId=${cert.id}, type=${reminderType}, daysRemaining=${daysRemaining}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送通知
|
||||
*/
|
||||
private static async sendNotification(notification: ReminderNotification): Promise<void> {
|
||||
try {
|
||||
logger.info(`[CertificateReminderWorker] Sending notification: id=${notification.id}, type=${notification.reminderType}, certId=${notification.certificateId}`);
|
||||
|
||||
await this.sendSystemNotification(notification);
|
||||
await this.sendEmailNotification(notification);
|
||||
|
||||
await db(this.NOTIFICATION_TABLE)
|
||||
.where({ id: notification.id })
|
||||
.update({
|
||||
is_sent: true,
|
||||
sent_at: new Date(),
|
||||
});
|
||||
|
||||
logger.info(`[CertificateReminderWorker] Notification sent: id=${notification.id}`);
|
||||
} catch (error: any) {
|
||||
logger.error(`[CertificateReminderWorker] Failed to send notification: id=${notification.id}, error=${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送系统通知
|
||||
*/
|
||||
private static async sendSystemNotification(notification: ReminderNotification): Promise<void> {
|
||||
const notificationTable = 'cf_system_notification';
|
||||
|
||||
const hasTable = await db.schema.hasTable(notificationTable);
|
||||
if (!hasTable) {
|
||||
await db.schema.createTable(notificationTable, (table) => {
|
||||
table.string('id', 36).primary();
|
||||
table.string('tenant_id', 64).notNullable().index();
|
||||
table.string('user_id', 64);
|
||||
table.string('type', 32).notNullable();
|
||||
table.string('title', 255).notNullable();
|
||||
table.text('content');
|
||||
table.boolean('is_read').defaultTo(false);
|
||||
table.timestamp('created_at').defaultTo(db.fn.now());
|
||||
});
|
||||
}
|
||||
|
||||
await db(notificationTable).insert({
|
||||
id: `SYS-NOTIF-${Date.now()}-${Math.random().toString(36).substring(2, 8)}`,
|
||||
tenant_id: notification.tenantId,
|
||||
type: 'CERTIFICATE_REMINDER',
|
||||
title: `证书到期提醒: ${notification.certificateName}`,
|
||||
content: notification.message,
|
||||
is_read: false,
|
||||
created_at: new Date(),
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送邮件通知
|
||||
*/
|
||||
private static async sendEmailNotification(notification: ReminderNotification): Promise<void> {
|
||||
logger.info(`[CertificateReminderWorker] Email notification: tenantId=${notification.tenantId}, subject=证书到期提醒: ${notification.certificateName}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理任务
|
||||
*/
|
||||
private static async processJob(job: Job): Promise<any> {
|
||||
const { action, data } = job.data;
|
||||
|
||||
switch (action) {
|
||||
case 'SCAN_EXPIRING':
|
||||
await this.scanAndNotify();
|
||||
return { success: true, scannedAt: new Date() };
|
||||
|
||||
case 'SEND_REMINDER':
|
||||
await this.sendNotification(data as ReminderNotification);
|
||||
return { success: true, notificationId: data.id };
|
||||
|
||||
default:
|
||||
throw new Error(`Unknown action: ${action}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 计算剩余天数
|
||||
*/
|
||||
private static calculateDaysRemaining(expiryDate: Date): number {
|
||||
const now = new Date();
|
||||
const expiry = new Date(expiryDate);
|
||||
const diffTime = expiry.getTime() - now.getTime();
|
||||
const diffDays = Math.ceil(diffTime / (1000 * 60 * 60 * 24));
|
||||
return diffDays;
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成提醒消息
|
||||
*/
|
||||
private static generateMessage(
|
||||
cert: Certificate,
|
||||
reminderType: ReminderType,
|
||||
daysRemaining: number
|
||||
): string {
|
||||
const certInfo = `证书名称: ${cert.certificateName}\n证书类型: ${cert.certificateType}\n证书编号: ${cert.certificateNo}\n到期日期: ${new Date(cert.expiryDate).toLocaleDateString('zh-CN')}`;
|
||||
|
||||
switch (reminderType) {
|
||||
case 'CERTIFICATE_EXPIRING_30':
|
||||
return `【提醒】您的证书将在 ${daysRemaining} 天后到期,请及时安排续期。\n\n${certInfo}\n\n建议提前30天开始办理续期手续,以免影响业务运营。`;
|
||||
|
||||
case 'CERTIFICATE_EXPIRING_7':
|
||||
return `【紧急】您的证书将在 ${daysRemaining} 天后到期,请立即安排续期!\n\n${certInfo}\n\n请尽快联系发证机构办理续期,避免证书过期影响产品销售。`;
|
||||
|
||||
case 'CERTIFICATE_EXPIRING_1':
|
||||
return `【最后提醒】您的证书将在明天到期!\n\n${certInfo}\n\n请立即处理,否则证书过期后将无法继续销售相关产品。`;
|
||||
|
||||
case 'CERTIFICATE_EXPIRED':
|
||||
return `【过期通知】您的证书已过期!\n\n${certInfo}\n\n该证书已失效,相关产品可能面临下架风险。请立即办理续期或更新证书。`;
|
||||
|
||||
default:
|
||||
return `证书提醒\n\n${certInfo}`;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成唯一ID
|
||||
*/
|
||||
private static generateId(): string {
|
||||
return `REMIND-${Date.now()}-${Math.random().toString(36).substring(2, 10)}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取待发送通知列表
|
||||
*/
|
||||
static async getPendingNotifications(tenantId: string): Promise<ReminderNotification[]> {
|
||||
const rows = await db(this.NOTIFICATION_TABLE)
|
||||
.where('tenant_id', tenantId)
|
||||
.where('is_sent', false)
|
||||
.orderBy('created_at', 'desc');
|
||||
|
||||
return rows.map(row => ({
|
||||
id: row.id,
|
||||
tenantId: row.tenant_id,
|
||||
shopId: row.shop_id,
|
||||
certificateId: row.certificate_id,
|
||||
certificateName: row.certificate_name,
|
||||
certificateType: row.certificate_type,
|
||||
expiryDate: row.expiry_date,
|
||||
reminderType: row.reminder_type,
|
||||
daysRemaining: row.days_remaining,
|
||||
message: row.message,
|
||||
isSent: Boolean(row.is_sent),
|
||||
sentAt: row.sent_at,
|
||||
createdAt: row.created_at,
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取通知历史
|
||||
*/
|
||||
static async getNotificationHistory(
|
||||
tenantId: string,
|
||||
options?: { limit?: number; offset?: number }
|
||||
): Promise<{ data: ReminderNotification[]; total: number }> {
|
||||
const limit = options?.limit || 20;
|
||||
const offset = options?.offset || 0;
|
||||
|
||||
let query = db(this.NOTIFICATION_TABLE)
|
||||
.where('tenant_id', tenantId);
|
||||
|
||||
const totalQuery = query.clone();
|
||||
const [{ count }] = await totalQuery.count('* as count');
|
||||
const total = Number(count);
|
||||
|
||||
const rows = await query
|
||||
.orderBy('created_at', 'desc')
|
||||
.limit(limit)
|
||||
.offset(offset);
|
||||
|
||||
const data = rows.map(row => ({
|
||||
id: row.id,
|
||||
tenantId: row.tenant_id,
|
||||
shopId: row.shop_id,
|
||||
certificateId: row.certificate_id,
|
||||
certificateName: row.certificate_name,
|
||||
certificateType: row.certificate_type,
|
||||
expiryDate: row.expiry_date,
|
||||
reminderType: row.reminder_type,
|
||||
daysRemaining: row.days_remaining,
|
||||
message: row.message,
|
||||
isSent: Boolean(row.is_sent),
|
||||
sentAt: row.sent_at,
|
||||
createdAt: row.created_at,
|
||||
}));
|
||||
|
||||
return { data, total };
|
||||
}
|
||||
}
|
||||
15
server/src/workers/CrawlerWorker.ts
Normal file
15
server/src/workers/CrawlerWorker.ts
Normal file
@@ -0,0 +1,15 @@
|
||||
import { logger } from '../utils/logger';
|
||||
|
||||
/**
|
||||
* Crawler Worker
|
||||
* @description 爬虫工作器,负责管理爬虫任务的执行
|
||||
*/
|
||||
export class CrawlerWorker {
|
||||
/**
|
||||
* 初始化爬虫工作器
|
||||
*/
|
||||
static init() {
|
||||
logger.info('🚀 Crawler Worker initialized');
|
||||
// 这里可以添加初始化逻辑,比如启动爬虫任务队列
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user