refactor: 优化代码结构和类型定义

feat(types): 添加express.d.ts类型引用
style: 格式化express.d.ts中的接口定义
refactor: 移除未使用的AntFC类型导入
chore: 删除自动生成的.umi-production文件
feat: 添加店铺管理相关表和初始化脚本
docs: 更新安全规则和交互指南文档
refactor: 统一使用FC类型替代React.FC
perf: 优化图表组件导入方式
style: 添加.prettierrc配置文件
refactor: 调整组件导入顺序和结构
feat: 添加平台库存管理路由
fix: 修复订单同步时的库存检查逻辑
docs: 更新RBAC设计和租户管理文档
refactor: 优化部门控制器代码
This commit is contained in:
2026-03-30 01:20:57 +08:00
parent d327706087
commit 1b14947e7b
106 changed files with 11251 additions and 38565 deletions

View File

@@ -13,6 +13,7 @@
"lint": "eslint src --ext .ts"
},
"dependencies": {
"@types/node-cache": "^4.1.3",
"bcrypt": "^5.1.1",
"bullmq": "^4.12.3",
"compression": "^1.7.4",
@@ -24,9 +25,11 @@
"knex": "^3.1.0",
"morgan": "^1.10.0",
"mysql2": "^3.6.5",
"node-cache": "^5.1.2",
"redis": "^4.6.12",
"sharp": "^0.34.5",
"typescript": "^5.3.3",
"xstate": "^5.30.0",
"zod": "^4.3.6"
},
"devDependencies": {

View File

@@ -1,3 +1,4 @@
/// <reference path="../../types/express.d.ts" />
import { Request, Response } from 'express';
import { HierarchyService } from '../../services/tenant/HierarchyService';
import { logger } from '../../utils/logger';
@@ -5,7 +6,10 @@ import { logger } from '../../utils/logger';
export class DepartmentController {
static async createDepartment(req: Request, res: Response) {
try {
const { tenantId } = req.user;
const tenantId = req.user?.tenantId;
if (!tenantId) {
return res.status(401).json({ success: false, error: 'Unauthorized' });
}
const { name, parentId, managerId } = req.body;
if (!name) {
@@ -31,7 +35,10 @@ export class DepartmentController {
static async updateDepartment(req: Request, res: Response) {
try {
const { tenantId } = req.user;
const tenantId = req.user?.tenantId;
if (!tenantId) {
return res.status(401).json({ success: false, error: 'Unauthorized' });
}
const { id } = req.params;
const updates = req.body;
@@ -53,7 +60,10 @@ export class DepartmentController {
static async deleteDepartment(req: Request, res: Response) {
try {
const { tenantId } = req.user;
const tenantId = req.user?.tenantId;
if (!tenantId) {
return res.status(401).json({ success: false, error: 'Unauthorized' });
}
const { id } = req.params;
await HierarchyService.deleteDepartment(id, tenantId);
@@ -70,7 +80,10 @@ export class DepartmentController {
static async getDepartmentTree(req: Request, res: Response) {
try {
const { tenantId } = req.user;
const tenantId = req.user?.tenantId;
if (!tenantId) {
return res.status(401).json({ success: false, error: 'Unauthorized' });
}
const tree = await HierarchyService.getDepartmentTree(tenantId);
@@ -86,7 +99,11 @@ export class DepartmentController {
static async assignManager(req: Request, res: Response) {
try {
const { tenantId, id: assignedBy } = req.user;
const tenantId = req.user?.tenantId;
const assignedBy = req.user?.id;
if (!tenantId || !assignedBy) {
return res.status(401).json({ success: false, error: 'Unauthorized' });
}
const { departmentId } = req.params;
const { managerId } = req.body;
@@ -113,7 +130,10 @@ export class DepartmentController {
static async getManager(req: Request, res: Response) {
try {
const { tenantId } = req.user;
const tenantId = req.user?.tenantId;
if (!tenantId) {
return res.status(401).json({ success: false, error: 'Unauthorized' });
}
const { departmentId } = req.params;
const manager = await HierarchyService.getDepartmentManager(
@@ -133,7 +153,10 @@ export class DepartmentController {
static async getDepartmentStats(req: Request, res: Response) {
try {
const { tenantId } = req.user;
const tenantId = req.user?.tenantId;
if (!tenantId) {
return res.status(401).json({ success: false, error: 'Unauthorized' });
}
const { departmentId } = req.params;
const stats = await HierarchyService.getDepartmentStats(
@@ -153,7 +176,10 @@ export class DepartmentController {
static async getHierarchyStats(req: Request, res: Response) {
try {
const { tenantId } = req.user;
const tenantId = req.user?.tenantId;
if (!tenantId) {
return res.status(401).json({ success: false, error: 'Unauthorized' });
}
const stats = await HierarchyService.getHierarchyStats(tenantId);

View File

@@ -0,0 +1,386 @@
import { Request, Response } from 'express';
import { PlatformInventoryService } from '../../services/inventory/PlatformInventoryService';
import { logger } from '../../utils/logger';
export class PlatformInventoryController {
static async initializePlatformInventory(req: Request, res: Response) {
try {
const { tenantId } = (req as any).traceContext || { tenantId: 'default-tenant' };
const { skuId, platform, shopId, externalId, initialQuantity, oversellPercentage } = req.body;
const triggeredBy = req.headers['x-user-id'] as string || 'SYSTEM';
if (!skuId || !platform || !shopId || !externalId || !initialQuantity) {
return res.status(400).json({ success: false, error: 'Missing required fields: skuId, platform, shopId, externalId, initialQuantity' });
}
await PlatformInventoryService.initializePlatformInventory({
tenantId,
skuId,
platform,
shopId,
externalId,
initialQuantity,
oversellPercentage,
triggeredBy
});
res.json({ success: true, message: 'Platform inventory initialized successfully' });
} catch (err: any) {
logger.error(`[PlatformInventoryController] Initialize failed: ${err.message}`);
res.status(500).json({ success: false, error: err.message });
}
}
/**
* 【核心API】基于广告计划实际出单量实时扣减库存
* 由于订单列表同步有延迟,以广告计划出单量为主要扣减依据
*/
static async deductByAdCampaignSales(req: Request, res: Response) {
try {
const { tenantId } = (req as any).traceContext || { tenantId: 'default-tenant' };
const { platform, shopId, skuId, campaignId, actualSales } = req.body;
const triggeredBy = req.headers['x-user-id'] as string || 'AD_SYSTEM';
if (!platform || !shopId || !skuId || !campaignId || actualSales === undefined) {
return res.status(400).json({
success: false,
error: 'Missing required fields: platform, shopId, skuId, campaignId, actualSales'
});
}
const result = await PlatformInventoryService.deductByAdCampaignSales({
tenantId,
platform,
shopId,
skuId,
campaignId,
actualSales,
triggeredBy
});
res.json({
success: true,
data: result,
message: result.message
});
} catch (err: any) {
logger.error(`[PlatformInventoryController] Deduct by ad campaign sales failed: ${err.message}`);
res.status(500).json({ success: false, error: err.message });
}
}
/**
* 【兼容API】基于订单列表的库存扣减用于校验和补充
* 当订单列表同步过来时,对比已扣减数量,补扣差额
*/
static async syncOrderAndAdjustInventory(req: Request, res: Response) {
try {
const { tenantId } = (req as any).traceContext || { tenantId: 'default-tenant' };
const { platform, shopId, skuId, orderId, quantity } = req.body;
const triggeredBy = req.headers['x-user-id'] as string || 'ORDER_SYNC';
if (!platform || !shopId || !skuId || !orderId || !quantity) {
return res.status(400).json({
success: false,
error: 'Missing required fields: platform, shopId, skuId, orderId, quantity'
});
}
const result = await PlatformInventoryService.syncOrderAndAdjustInventory({
tenantId,
platform,
shopId,
skuId,
orderId,
quantity,
triggeredBy
});
res.json({
success: true,
data: result,
message: result.message
});
} catch (err: any) {
logger.error(`[PlatformInventoryController] Sync order and adjust inventory failed: ${err.message}`);
res.status(500).json({ success: false, error: err.message });
}
}
static async deductOnOrder(req: Request, res: Response) {
try {
const { tenantId } = (req as any).traceContext || { tenantId: 'default-tenant' };
const { platform, shopId, skuId, quantity, orderId } = req.body;
const triggeredBy = req.headers['x-user-id'] as string || 'MANUAL';
if (!platform || !shopId || !skuId || !quantity || !orderId) {
return res.status(400).json({ success: false, error: 'Missing required fields: platform, shopId, skuId, quantity, orderId' });
}
const result = await PlatformInventoryService.deductOnOrder({
tenantId,
platform,
shopId,
skuId,
quantity,
orderId,
triggeredBy
});
res.json({ success: true, data: result });
} catch (err: any) {
logger.error(`[PlatformInventoryController] Deduct on order failed: ${err.message}`);
res.status(500).json({ success: false, error: err.message });
}
}
static async deductForAdCampaign(req: Request, res: Response) {
try {
const { tenantId } = (req as any).traceContext || { tenantId: 'default-tenant' };
const { platform, shopId, skuId, estimatedQuantity, campaignId, campaignName } = req.body;
const triggeredBy = req.headers['x-user-id'] as string || 'MANUAL';
if (!platform || !shopId || !skuId || !estimatedQuantity || !campaignId || !campaignName) {
return res.status(400).json({ success: false, error: 'Missing required fields: platform, shopId, skuId, estimatedQuantity, campaignId, campaignName' });
}
const result = await PlatformInventoryService.deductForAdCampaign({
tenantId,
platform,
shopId,
skuId,
estimatedQuantity,
campaignId,
campaignName,
triggeredBy
});
res.json({ success: true, data: result });
} catch (err: any) {
logger.error(`[PlatformInventoryController] Deduct for ad campaign failed: ${err.message}`);
res.status(500).json({ success: false, error: err.message });
}
}
static async replenishInventory(req: Request, res: Response) {
try {
const { tenantId } = (req as any).traceContext || { tenantId: 'default-tenant' };
const { platform, shopId, skuId, quantity, reason } = req.body;
const triggeredBy = req.headers['x-user-id'] as string || 'MANUAL';
if (!platform || !shopId || !skuId || !quantity) {
return res.status(400).json({ success: false, error: 'Missing required fields: platform, shopId, skuId, quantity' });
}
await PlatformInventoryService.replenishInventory({
tenantId,
platform,
shopId,
skuId,
quantity,
triggeredBy,
reason: reason || 'Manual replenish'
});
res.json({ success: true, message: 'Inventory replenished successfully' });
} catch (err: any) {
logger.error(`[PlatformInventoryController] Replenish inventory failed: ${err.message}`);
res.status(500).json({ success: false, error: err.message });
}
}
static async getPlatformInventoryStatus(req: Request, res: Response) {
try {
const { tenantId } = (req as any).traceContext || { tenantId: 'default-tenant' };
const { platform, shopId, skuId } = req.params;
if (!platform || !shopId || !skuId) {
return res.status(400).json({ success: false, error: 'Missing required parameters: platform, shopId, skuId' });
}
const status = await PlatformInventoryService.getPlatformInventoryStatus({
tenantId,
platform,
shopId,
skuId
});
res.json({ success: true, data: status });
} catch (err: any) {
logger.error(`[PlatformInventoryController] Get platform inventory status failed: ${err.message}`);
res.status(500).json({ success: false, error: err.message });
}
}
static async getPlatformInventoryLogs(req: Request, res: Response) {
try {
const { tenantId } = (req as any).traceContext || { tenantId: 'default-tenant' };
const { platform, shopId, skuId, limit, offset } = req.query;
const logs = await PlatformInventoryService.getPlatformInventoryLogs({
tenantId,
platform: platform as string,
shopId: shopId as string,
skuId: skuId as string,
limit: limit ? parseInt(limit as string) : undefined,
offset: offset ? parseInt(offset as string) : undefined
});
res.json({ success: true, data: logs });
} catch (err: any) {
logger.error(`[PlatformInventoryController] Get platform inventory logs failed: ${err.message}`);
res.status(500).json({ success: false, error: err.message });
}
}
static async initializeAdCampaignInventory(req: Request, res: Response) {
try {
const { tenantId } = (req as any).traceContext || { tenantId: 'default-tenant' };
const { skuId, platform, shopId, campaignId, campaignName, allocatedQuantity } = req.body;
const triggeredBy = req.headers['x-user-id'] as string || 'SYSTEM';
if (!skuId || !platform || !shopId || !campaignId || !allocatedQuantity) {
return res.status(400).json({ success: false, error: 'Missing required fields: skuId, platform, shopId, campaignId, allocatedQuantity' });
}
await PlatformInventoryService.initializeAdCampaignInventory({
tenantId,
skuId,
platform,
shopId,
campaignId,
campaignName: campaignName || campaignId,
allocatedQuantity,
triggeredBy
});
res.json({ success: true, message: 'Ad campaign inventory initialized successfully' });
} catch (err: any) {
logger.error(`[PlatformInventoryController] Initialize ad campaign inventory failed: ${err.message}`);
res.status(500).json({ success: false, error: err.message });
}
}
static async deductFromAdCampaign(req: Request, res: Response) {
try {
const { tenantId } = (req as any).traceContext || { tenantId: 'default-tenant' };
const { campaignId, skuId, quantity } = req.body;
const triggeredBy = req.headers['x-user-id'] as string || 'MANUAL';
if (!campaignId || !skuId || !quantity) {
return res.status(400).json({ success: false, error: 'Missing required fields: campaignId, skuId, quantity' });
}
const result = await PlatformInventoryService.deductFromAdCampaign({
tenantId,
campaignId,
skuId,
quantity,
triggeredBy
});
res.json({ success: true, data: result });
} catch (err: any) {
logger.error(`[PlatformInventoryController] Deduct from ad campaign failed: ${err.message}`);
res.status(500).json({ success: false, error: err.message });
}
}
static async pauseAdCampaign(req: Request, res: Response) {
try {
const { tenantId } = (req as any).traceContext || { tenantId: 'default-tenant' };
const { campaignId } = req.params;
const { skuId, reason } = req.body;
const triggeredBy = req.headers['x-user-id'] as string || 'MANUAL';
if (!campaignId || !skuId) {
return res.status(400).json({ success: false, error: 'Missing required fields: campaignId, skuId' });
}
await PlatformInventoryService.pauseAdCampaign({
tenantId,
campaignId,
skuId,
reason: reason || 'Manual pause',
triggeredBy
});
res.json({ success: true, message: 'Ad campaign paused successfully' });
} catch (err: any) {
logger.error(`[PlatformInventoryController] Pause ad campaign failed: ${err.message}`);
res.status(500).json({ success: false, error: err.message });
}
}
static async resumeAdCampaign(req: Request, res: Response) {
try {
const { tenantId } = (req as any).traceContext || { tenantId: 'default-tenant' };
const { campaignId } = req.params;
const { skuId, reason } = req.body;
const triggeredBy = req.headers['x-user-id'] as string || 'MANUAL';
if (!campaignId || !skuId) {
return res.status(400).json({ success: false, error: 'Missing required fields: campaignId, skuId' });
}
await PlatformInventoryService.resumeAdCampaign({
tenantId,
campaignId,
skuId,
reason: reason || 'Manual resume',
triggeredBy
});
res.json({ success: true, message: 'Ad campaign resumed successfully' });
} catch (err: any) {
logger.error(`[PlatformInventoryController] Resume ad campaign failed: ${err.message}`);
res.status(500).json({ success: false, error: err.message });
}
}
static async getAdCampaignInventoryLogs(req: Request, res: Response) {
try {
const { tenantId } = (req as any).traceContext || { tenantId: 'default-tenant' };
const { campaignId, skuId, limit, offset } = req.query;
const logs = await PlatformInventoryService.getAdCampaignInventoryLogs({
tenantId,
campaignId: campaignId as string,
skuId: skuId as string,
limit: limit ? parseInt(limit as string) : undefined,
offset: offset ? parseInt(offset as string) : undefined
});
res.json({ success: true, data: logs });
} catch (err: any) {
logger.error(`[PlatformInventoryController] Get ad campaign logs failed: ${err.message}`);
res.status(500).json({ success: false, error: err.message });
}
}
static async updateOversellPercentage(req: Request, res: Response) {
try {
const { tenantId } = (req as any).traceContext || { tenantId: 'default-tenant' };
const { skuId, platform, shopId, oversellPercentage } = req.body;
const triggeredBy = req.headers['x-user-id'] as string || 'SYSTEM';
if (!skuId || !platform || !shopId || !oversellPercentage) {
return res.status(400).json({ success: false, error: 'Missing required fields: skuId, platform, shopId, oversellPercentage' });
}
await PlatformInventoryService.updateOversellPercentage({
tenantId,
skuId,
platform,
shopId,
oversellPercentage,
triggeredBy
});
res.json({ success: true, message: 'Oversell percentage updated successfully' });
} catch (err: any) {
logger.error(`[PlatformInventoryController] Update oversell percentage failed: ${err.message}`);
res.status(500).json({ success: false, error: err.message });
}
}
}

View File

@@ -1,10 +1,10 @@
import { Router } from 'express';
import { DepartmentController } from '../controllers/DepartmentController';
import { authenticate } from '../../middleware/auth';
import { requirePermission } from '../../core/guards/rbac.guard';
const router = Router();
router.use(authenticate);
router.get('/tree', requirePermission('department:read'), DepartmentController.getDepartmentTree);
router.post('/', DepartmentController.createDepartment);
router.put('/:id', DepartmentController.updateDepartment);

View File

@@ -5,6 +5,8 @@ import orderRoutes from './order';
import financeRoutes from './finance';
import syncRoutes from './sync';
import departmentRoutes from './department';
import platformInventoryRoutes from './platform-inventory';
import shopRoutes from './shop';
// import monitoringRoutes from './monitoring';
// import operationAgentRoutes from './operation-agent';
// import aiSelfImprovementRoutes from './ai-self-improvement';
@@ -28,6 +30,7 @@ router.use('/orders', orderRoutes);
router.use('/finance', financeRoutes);
router.use('/sync', syncRoutes);
router.use('/departments', departmentRoutes);
router.use('/platform-inventory', platformInventoryRoutes);
// router.use('/monitoring', monitoringRoutes);
// router.use('/telemetry', telemetryRoutes);
// router.use('/ai', aiRoutes);
@@ -80,5 +83,6 @@ router.use('/leaderboard', leaderboardRoutes);
router.use('/trace', traceRoutes);
router.use('/trade', tradeRoutes);
router.use('/vault', vaultRoutes);
router.use('/shops', shopRoutes);
export default router;

View File

@@ -0,0 +1,30 @@
import { Router } from 'express';
import { PlatformInventoryController } from '../controllers/PlatformInventoryController';
import { requirePermission } from '../../core/guards/rbac.guard';
const router = Router();
// 平台库存管理
router.post('/platform-inventory/initialize', requirePermission('inventory:write'), PlatformInventoryController.initializePlatformInventory);
router.post('/platform-inventory/deduct', requirePermission('inventory:write'), PlatformInventoryController.deductOnOrder);
router.post('/platform-inventory/replenish', requirePermission('inventory:write'), PlatformInventoryController.replenishInventory);
router.get('/platform-inventory/status/:platform/:shopId/:skuId', requirePermission('inventory:read'), PlatformInventoryController.getPlatformInventoryStatus);
router.get('/platform-inventory/logs', requirePermission('inventory:read'), PlatformInventoryController.getPlatformInventoryLogs);
// 【核心API】基于广告计划实际出单量实时扣减库存
router.post('/platform-inventory/deduct-by-ad-sales', requirePermission('inventory:write'), PlatformInventoryController.deductByAdCampaignSales);
// 【兼容API】基于订单列表的库存扣减用于校验和补充
router.post('/platform-inventory/sync-order', requirePermission('inventory:write'), PlatformInventoryController.syncOrderAndAdjustInventory);
// 超卖比例设置
router.post('/platform-inventory/oversell', requirePermission('inventory:write'), PlatformInventoryController.updateOversellPercentage);
// 广告计划库存管理
router.post('/ad-campaign-inventory/initialize', requirePermission('inventory:write'), PlatformInventoryController.initializeAdCampaignInventory);
router.post('/ad-campaign-inventory/deduct', requirePermission('inventory:write'), PlatformInventoryController.deductFromAdCampaign);
router.post('/ad-campaign-inventory/:campaignId/pause', requirePermission('inventory:write'), PlatformInventoryController.pauseAdCampaign);
router.post('/ad-campaign-inventory/:campaignId/resume', requirePermission('inventory:write'), PlatformInventoryController.resumeAdCampaign);
router.get('/ad-campaign-inventory/logs', requirePermission('inventory:read'), PlatformInventoryController.getAdCampaignInventoryLogs);
export default router;

View File

@@ -0,0 +1,348 @@
/// <reference path="../../types/express.d.ts" />
import { Router, Request, Response } from 'express';
import { ShopService } from '../../services/platform/ShopService';
import { ShopMemberService } from '../../services/platform/ShopMemberService';
import { requirePermission } from '../../core/guards/rbac.guard';
import { logger } from '../../utils/logger';
const router = Router();
// 获取我的店铺列表
router.get('/my-shops', requirePermission('platform:read'), async (req: Request, res: Response) => {
try {
const tenantId = req.user?.tenantId;
if (!tenantId) {
return res.status(401).json({ error: 'Unauthorized' });
}
const shops = await ShopService.getByTenant(tenantId);
// 为每个店铺添加成员数量
const shopsWithMemberCount = await Promise.all(
shops.map(async (shop) => {
const members = await ShopMemberService.getShopMembers(shop.id);
return {
...shop,
memberCount: members.length,
};
})
);
res.json({ success: true, data: shopsWithMemberCount });
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`[Shop] Get my shops error: ${errorMessage}`);
res.status(500).json({ error: errorMessage });
}
});
// 获取店铺详情
router.get('/:id', requirePermission('platform:read'), async (req: Request, res: Response) => {
try {
const id = String(req.params.id);
const shop = await ShopService.getById(id);
if (!shop) {
return res.status(404).json({ error: 'Shop not found' });
}
if (shop.tenantId !== req.user?.tenantId) {
return res.status(403).json({ error: 'Forbidden' });
}
res.json({ success: true, data: shop });
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`[Shop] Get shop error: ${errorMessage}`);
res.status(500).json({ error: errorMessage });
}
});
// 创建店铺
router.post('/', requirePermission('platform:write'), async (req: Request, res: Response) => {
try {
const tenantId = req.user?.tenantId;
if (!tenantId) {
return res.status(401).json({ error: 'Unauthorized' });
}
const { name, platform, shopId, departmentId, config, path, depth } = req.body;
if (!name || !platform || !shopId) {
return res.status(400).json({ error: 'Missing required fields: name, platform, shopId' });
}
const shop = await ShopService.create(tenantId, {
name,
platform,
shopId,
departmentId: departmentId || 'default',
config,
path: path || `/${name}`,
depth: depth || 1,
});
res.json({ success: true, data: shop });
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`[Shop] Create shop error: ${errorMessage}`);
res.status(500).json({ error: errorMessage });
}
});
// 更新店铺信息
router.put('/:id', requirePermission('platform:write'), async (req: Request, res: Response) => {
try {
const id = String(req.params.id);
const shop = await ShopService.getById(id);
if (!shop) {
return res.status(404).json({ error: 'Shop not found' });
}
if (shop.tenantId !== req.user?.tenantId) {
return res.status(403).json({ error: 'Forbidden' });
}
const { name, departmentId, config } = req.body;
const updatedShop = await ShopService.update(id, {
name,
departmentId,
config,
});
res.json({ success: true, data: updatedShop });
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`[Shop] Update shop error: ${errorMessage}`);
res.status(500).json({ error: errorMessage });
}
});
// 刷新店铺授权
router.post('/:id/refresh-auth', requirePermission('platform:write'), async (req: Request, res: Response) => {
try {
const id = String(req.params.id);
const shop = await ShopService.getById(id);
if (!shop) {
return res.status(404).json({ error: 'Shop not found' });
}
if (shop.tenantId !== req.user?.tenantId) {
return res.status(403).json({ error: 'Forbidden' });
}
const result = await ShopService.refreshAuth(id);
res.json({ success: true, data: result });
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`[Shop] Refresh auth error: ${errorMessage}`);
res.status(500).json({ error: errorMessage });
}
});
// 删除店铺
router.delete('/:id', requirePermission('platform:write'), async (req: Request, res: Response) => {
try {
const id = String(req.params.id);
const shop = await ShopService.getById(id);
if (!shop) {
return res.status(404).json({ error: 'Shop not found' });
}
if (shop.tenantId !== req.user?.tenantId) {
return res.status(403).json({ error: 'Forbidden' });
}
await ShopService.delete(id);
res.json({ success: true, data: null });
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`[Shop] Delete shop error: ${errorMessage}`);
res.status(500).json({ error: errorMessage });
}
});
// 获取店铺成员列表
router.get('/:shopId/members', requirePermission('platform:read'), async (req: Request, res: Response) => {
try {
const shopId = String(req.params.shopId);
const shop = await ShopService.getById(shopId);
if (!shop) {
return res.status(404).json({ error: 'Shop not found' });
}
if (shop.tenantId !== req.user?.tenantId) {
return res.status(403).json({ error: 'Forbidden' });
}
const members = await ShopMemberService.getShopMembers(shopId);
// 为每个成员添加用户信息
const membersWithUserInfo = await Promise.all(
members.map(async (member) => {
const user = await db('cf_user').where('id', member.userId).first();
return {
...member,
userName: user?.name,
userEmail: user?.email,
};
})
);
res.json({ success: true, data: membersWithUserInfo });
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`[Shop] Get members error: ${errorMessage}`);
res.status(500).json({ error: errorMessage });
}
});
// 获取用户拥有的店铺
router.get('/user/:userId/shops', requirePermission('user:read'), async (req: Request, res: Response) => {
try {
const userId = String(req.params.userId);
// 检查权限:只能查看自己的店铺或有用户管理权限
if (userId !== req.user?.userId && !req.user?.permissions?.includes('user:write')) {
return res.status(403).json({ error: 'Forbidden' });
}
const members = await ShopMemberService.getUserShops(userId);
// 为每个成员添加店铺信息
const shopsWithInfo = await Promise.all(
members.map(async (member) => {
const shop = await ShopService.getById(member.shopId);
return {
...member,
shopName: shop?.name,
platform: shop?.platform,
shopStatus: shop?.status,
};
})
);
res.json({ success: true, data: shopsWithInfo });
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`[Shop] Get user shops error: ${errorMessage}`);
res.status(500).json({ error: errorMessage });
}
});
// 添加店铺成员
router.post('/members', requirePermission('platform:write'), async (req: Request, res: Response) => {
try {
const { shopId, userId, role, permissions, assignedBy } = req.body;
if (!shopId || !userId || !role || !permissions || !assignedBy) {
return res.status(400).json({ error: 'Missing required fields' });
}
const shop = await ShopService.getById(shopId);
if (!shop) {
return res.status(404).json({ error: 'Shop not found' });
}
if (shop.tenantId !== req.user?.tenantId) {
return res.status(403).json({ error: 'Forbidden' });
}
const member = await ShopMemberService.create({
shopId,
userId,
role,
permissions,
assignedBy,
});
res.json({ success: true, data: member });
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`[Shop] Add member error: ${errorMessage}`);
res.status(500).json({ error: errorMessage });
}
});
// 移除店铺成员
router.delete('/members/:shopId/:userId', requirePermission('platform:write'), async (req: Request, res: Response) => {
try {
const shopId = String(req.params.shopId);
const userId = String(req.params.userId);
const shop = await ShopService.getById(shopId);
if (!shop) {
return res.status(404).json({ error: 'Shop not found' });
}
if (shop.tenantId !== req.user?.tenantId) {
return res.status(403).json({ error: 'Forbidden' });
}
await ShopMemberService.removeMember(shopId, userId);
res.json({ success: true, data: null });
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`[Shop] Remove member error: ${errorMessage}`);
res.status(500).json({ error: errorMessage });
}
});
// 更新成员角色和权限
router.put('/members/:shopId/:userId', requirePermission('platform:write'), async (req: Request, res: Response) => {
try {
const shopId = String(req.params.shopId);
const userId = String(req.params.userId);
const { role, permissions } = req.body;
if (!role || !permissions) {
return res.status(400).json({ error: 'Missing required fields' });
}
const shop = await ShopService.getById(shopId);
if (!shop) {
return res.status(404).json({ error: 'Shop not found' });
}
if (shop.tenantId !== req.user?.tenantId) {
return res.status(403).json({ error: 'Forbidden' });
}
const member = await ShopMemberService.updateMember(shopId, userId, {
role,
permissions,
});
res.json({ success: true, data: member });
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`[Shop] Update member error: ${errorMessage}`);
res.status(500).json({ error: errorMessage });
}
});
// 获取店铺统计信息
router.get('/stats', requirePermission('platform:read'), async (req: Request, res: Response) => {
try {
const tenantId = req.user?.tenantId;
if (!tenantId) {
return res.status(401).json({ error: 'Unauthorized' });
}
const stats = await ShopService.getStats(tenantId);
res.json({ success: true, data: stats });
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.error(`[Shop] Get stats error: ${errorMessage}`);
res.status(500).json({ error: errorMessage });
}
});
// 导入数据库
import db from '../../config/database';
export default router;

View File

@@ -34,6 +34,7 @@ export default class DatabaseSchema {
await this.initCurrencyTables();
await this.initBatchOperationTables();
await this.initInvitationTables();
await this.initPlatformInventoryTables();
logger.info('[DatabaseSchema] All tables initialized successfully');
}
@@ -1321,4 +1322,110 @@ export default class DatabaseSchema {
logger.info('[DatabaseSchema] Table cf_invitation_log created');
}
}
private static async initPlatformInventoryTables(): Promise<void> {
const hasPlatformInventoryTable = await db.schema.hasTable('cf_platform_inventory');
if (!hasPlatformInventoryTable) {
logger.info('[DatabaseSchema] Creating cf_platform_inventory table...');
await db.schema.createTable('cf_platform_inventory', (table) => {
table.string('id', 36).primary();
table.string('tenant_id', 36).notNullable().index();
table.string('sku_id', 64).notNullable().index();
table.string('platform', 32).notNullable();
table.string('shop_id', 36).notNullable();
table.string('external_id', 128).notNullable();
table.integer('allocated_quantity').defaultTo(0);
table.integer('available_quantity').defaultTo(0);
table.integer('total_sold').defaultTo(0);
table.integer('total_reserved').defaultTo(0);
table.decimal('oversell_percentage', 5, 2).defaultTo(5);
table.datetime('created_at').notNullable().defaultTo(db.fn.now());
table.datetime('updated_at').notNullable().defaultTo(db.fn.now());
table.unique(['sku_id', 'platform', 'shop_id']);
table.index(['sku_id', 'platform']);
table.index('available_quantity');
table.index('tenant_id');
});
logger.info('[DatabaseSchema] Table cf_platform_inventory created');
}
const hasPlatformInventoryLogTable = await db.schema.hasTable('cf_platform_inventory_log');
if (!hasPlatformInventoryLogTable) {
logger.info('[DatabaseSchema] Creating cf_platform_inventory_log table...');
await db.schema.createTable('cf_platform_inventory_log', (table) => {
table.string('id', 36).primary();
table.string('tenant_id', 36).notNullable().index();
table.string('sku_id', 64).notNullable().index();
table.string('platform', 32).notNullable();
table.string('shop_id', 36).notNullable();
table.string('external_id', 128).notNullable();
table.enum('action', ['INIT', 'DEDUCT_ORDER', 'DEDUCT_AD', 'REPLENISH', 'RESERVE', 'RELEASE', 'ADJUST']).notNullable();
table.integer('quantity_change').notNullable();
table.integer('old_quantity').notNullable();
table.integer('new_quantity').notNullable();
table.string('trigger_type', 32).notNullable();
table.string('trigger_id', 128).notNullable();
table.string('triggered_by', 128).notNullable();
table.text('reason').notNullable();
table.json('metadata').nullable();
table.datetime('created_at').notNullable().defaultTo(db.fn.now());
table.index(['sku_id', 'platform', 'created_at']);
table.index('trigger_type');
table.index('trigger_id');
});
logger.info('[DatabaseSchema] Table cf_platform_inventory_log created');
}
const hasAdCampaignInventoryTable = await db.schema.hasTable('cf_ad_campaign_inventory');
if (!hasAdCampaignInventoryTable) {
logger.info('[DatabaseSchema] Creating cf_ad_campaign_inventory table...');
await db.schema.createTable('cf_ad_campaign_inventory', (table) => {
table.string('id', 36).primary();
table.string('tenant_id', 36).notNullable().index();
table.string('sku_id', 64).notNullable().index();
table.string('platform', 32).notNullable();
table.string('shop_id', 36).notNullable();
table.string('campaign_id', 128).notNullable();
table.string('campaign_name', 255).notNullable();
table.integer('allocated_quantity').defaultTo(0);
table.integer('available_quantity').defaultTo(0);
table.integer('estimated_sales').defaultTo(0);
table.integer('actual_sales').defaultTo(0);
table.enum('status', ['ACTIVE', 'PAUSED', 'COMPLETED', 'CANCELLED']).defaultTo('ACTIVE');
table.datetime('created_at').notNullable().defaultTo(db.fn.now());
table.datetime('updated_at').notNullable().defaultTo(db.fn.now());
table.unique(['campaign_id', 'sku_id']);
table.index(['sku_id', 'platform']);
table.index('status');
});
logger.info('[DatabaseSchema] Table cf_ad_campaign_inventory created');
}
const hasAdCampaignInventoryLogTable = await db.schema.hasTable('cf_ad_campaign_inventory_log');
if (!hasAdCampaignInventoryLogTable) {
logger.info('[DatabaseSchema] Creating cf_ad_campaign_inventory_log table...');
await db.schema.createTable('cf_ad_campaign_inventory_log', (table) => {
table.string('id', 36).primary();
table.string('tenant_id', 36).notNullable().index();
table.string('sku_id', 64).notNullable().index();
table.string('platform', 32).notNullable();
table.string('shop_id', 36).notNullable();
table.string('campaign_id', 128).notNullable();
table.enum('action', ['INIT', 'ALLOCATE', 'DEDUCT', 'PAUSE', 'RESUME', 'COMPLETE', 'ADJUST']).notNullable();
table.integer('quantity_change').notNullable();
table.integer('old_quantity').notNullable();
table.integer('new_quantity').notNullable();
table.string('trigger_type', 32).notNullable();
table.string('trigger_id', 128).notNullable();
table.string('triggered_by', 128).notNullable();
table.text('reason').notNullable();
table.json('metadata').nullable();
table.datetime('created_at').notNullable().defaultTo(db.fn.now());
table.index(['campaign_id', 'sku_id', 'created_at']);
table.index('trigger_type');
table.index('trigger_id');
});
logger.info('[DatabaseSchema] Table cf_ad_campaign_inventory_log created');
}
}
}

View File

@@ -2,6 +2,7 @@ import db from '../../config/database';
import { FinanceService } from '../../services/finance/FinanceService';
import { InventoryService } from '../../services/inventory/InventoryService';
import { SupplyChainService } from '../../services/integration/SupplyChainService';
import { PlatformInventoryService } from '../../services/inventory/PlatformInventoryService';
import { logger } from '../../utils/logger';
import { SummaryAggregationService } from '../Analytics/SummaryAggregationService';
@@ -547,12 +548,12 @@ export class ConsumerOrderService {
* 同步/抓取外部平台订单
*/
static async syncPlatformOrder(orderData: Partial<ConsumerOrder>): Promise<string> {
const { platform, platform_order_id, tenant_id } = orderData;
const { platform, platform_order_id, tenant_id, items, shop_id } = orderData;
if (!platform || !platform_order_id || !tenant_id) {
throw new Error('Missing required order data: platform, platform_order_id, tenant_id');
}
const existing = await db(this.TABLE_NAME)
.where({ platform, platform_order_id, tenant_id })
.first();
@@ -587,11 +588,51 @@ export class ConsumerOrderService {
created_at: new Date(),
updated_at: new Date()
};
const orderItems = typeof items === 'string' ? JSON.parse(items) : items;
for (const item of orderItems) {
const skuId = item.sku_id || item.sku;
const quantity = item.quantity;
if (!skuId || !quantity) {
logger.warn(`[ConsumerOrder] Invalid item data: ${JSON.stringify(item)}`);
continue;
}
const isAvailable = await PlatformInventoryService.checkInventoryAvailability({
tenantId: tenant_id,
platform,
shopId: shop_id || '',
skuId,
quantity
});
if (!isAvailable) {
const platformInventory = await db('cf_platform_inventory')
.where({ tenant_id: tenant_id, platform, shop_id: shop_id, sku_id: skuId })
.first();
const available = platformInventory?.available_quantity || 0;
throw new Error(
`Insufficient inventory: ${platform} has ${available} items for SKU ${skuId}, need ${quantity}`
);
}
await PlatformInventoryService.deductOnOrder({
tenantId: tenant_id,
platform,
shopId: shop_id || '',
skuId,
quantity,
orderId: platform_order_id,
triggeredBy: 'ORDER_WEBHOOK'
});
}
await db(this.TABLE_NAME).insert(orderToInsert);
logger.info(`[ConsumerOrder] Created new order ${orderId} for platform ${platform}`);
logger.info(`[ConsumerOrder] Created new order ${orderId} for platform ${platform}, inventory deducted`);
// 如果订单已支付,触发利润计算与分账 (BIZ_FIN_05)
if (orderData.status === 'PAID') {
await this.processOrderSettlement(orderId);
}

View File

@@ -0,0 +1,16 @@
{"timestamp":"2026-03-29T03:12:08.235Z","level":"info","message":"🔧 Initializing shop management tables..."}
{"timestamp":"2026-03-29T03:12:08.253Z","level":"info","message":"[RedisService] Connected to Redis"}
{"timestamp":"2026-03-29T03:12:08.258Z","level":"error","message":"❌ Failed to initialize shop management tables:","data":""}
{"timestamp":"2026-03-29T03:16:10.996Z","level":"info","message":"🔧 Initializing shop management tables..."}
{"timestamp":"2026-03-29T03:16:11.011Z","level":"info","message":"[RedisService] Connected to Redis"}
{"timestamp":"2026-03-29T03:16:11.016Z","level":"error","message":"❌ Failed to initialize shop management tables:","data":""}
{"timestamp":"2026-03-29T03:17:52.079Z","level":"info","message":"🔧 Initializing shop management tables..."}
{"timestamp":"2026-03-29T03:17:52.099Z","level":"info","message":"[RedisService] Connected to Redis"}
{"timestamp":"2026-03-29T03:17:52.105Z","level":"error","message":"❌ Failed to initialize shop management tables:","data":""}
{"timestamp":"2026-03-29T03:19:11.739Z","level":"info","message":"🔧 Initializing shop management tables..."}
{"timestamp":"2026-03-29T03:19:11.755Z","level":"info","message":"[RedisService] Connected to Redis"}
{"timestamp":"2026-03-29T03:19:11.760Z","level":"error","message":"❌ Failed to initialize shop management tables:","data":""}
{"timestamp":"2026-03-29T04:49:10.952Z","level":"info","message":"[RedisService] Connected to Redis"}
{"timestamp":"2026-03-29T05:29:28.985Z","level":"info","message":"[RedisService] Connected to Redis"}
{"timestamp":"2026-03-29T05:30:39.535Z","level":"error","message":"[RBAC] Missing auth context for request /my-shops"}
{"timestamp":"2026-03-29T14:46:17.245Z","level":"info","message":"[RedisService] Connected to Redis"}

View File

@@ -0,0 +1,31 @@
/**
* 初始化店铺管理相关表
*/
import { ShopService } from '../services/platform/ShopService';
import { ShopMemberService } from '../services/platform/ShopMemberService';
import { logger } from '../utils/logger';
async function initShopTables() {
try {
logger.info('🔧 Initializing shop management tables...');
// 初始化店铺表
await ShopService.initTable();
// 初始化店铺成员表
await ShopMemberService.initTable();
logger.info('✅ Shop management tables initialized successfully');
} catch (error: any) {
logger.error('❌ Failed to initialize shop management tables:', error.message);
process.exit(1);
}
}
// 执行初始化
if (require.main === module) {
initShopTables();
}
export { initShopTables };

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,206 @@
import fs from 'fs';
import path from 'path';
interface LogEntry {
timestamp: string;
level: 'info' | 'error' | 'warn' | 'debug';
message: string;
data?: any;
tenantId?: string;
userId?: string;
traceId?: string;
businessType?: 'TOC' | 'TOB';
}
export class TenantLogger {
private static readonly LOG_DIR = path.join(__dirname, '../../logs');
private static readonly DEFAULT_LOG_FILE = 'system.log';
/**
* 初始化日志目录
*/
private static initLogDir(): void {
if (!fs.existsSync(this.LOG_DIR)) {
fs.mkdirSync(this.LOG_DIR, { recursive: true });
}
}
/**
* 获取租户日志文件路径
*/
private static getLogFilePath(tenantId?: string): string {
this.initLogDir();
if (tenantId) {
return path.join(this.LOG_DIR, `tenant_${tenantId}.log`);
}
return path.join(this.LOG_DIR, this.DEFAULT_LOG_FILE);
}
/**
* 写入日志到文件
*/
private static writeLogToFile(entry: LogEntry): void {
try {
const logFilePath = this.getLogFilePath(entry.tenantId);
const logLine = JSON.stringify(entry) + '\n';
fs.appendFileSync(logFilePath, logLine, 'utf8');
} catch (error) {
console.error('Failed to write log to file', error);
}
}
/**
* 记录信息日志
*/
static info(message: string, data?: any, tenantId?: string, userId?: string, traceId?: string, businessType?: 'TOC' | 'TOB') {
const entry: LogEntry = {
timestamp: new Date().toISOString(),
level: 'info',
message,
data,
tenantId,
userId,
traceId,
businessType
};
console.log(message, data);
this.writeLogToFile(entry);
}
/**
* 记录错误日志
*/
static error(message: string, data?: any, tenantId?: string, userId?: string, traceId?: string, businessType?: 'TOC' | 'TOB') {
const entry: LogEntry = {
timestamp: new Date().toISOString(),
level: 'error',
message,
data,
tenantId,
userId,
traceId,
businessType
};
console.error(message, data);
this.writeLogToFile(entry);
}
/**
* 记录警告日志
*/
static warn(message: string, data?: any, tenantId?: string, userId?: string, traceId?: string, businessType?: 'TOC' | 'TOB') {
const entry: LogEntry = {
timestamp: new Date().toISOString(),
level: 'warn',
message,
data,
tenantId,
userId,
traceId,
businessType
};
console.warn(message, data);
this.writeLogToFile(entry);
}
/**
* 记录调试日志
*/
static debug(message: string, data?: any, tenantId?: string, userId?: string, traceId?: string, businessType?: 'TOC' | 'TOB') {
const entry: LogEntry = {
timestamp: new Date().toISOString(),
level: 'debug',
message,
data,
tenantId,
userId,
traceId,
businessType
};
console.debug(message, data);
this.writeLogToFile(entry);
}
/**
* 获取租户日志
*/
static getTenantLogs(tenantId: string, limit: number = 100): LogEntry[] {
try {
const logFilePath = this.getLogFilePath(tenantId);
if (!fs.existsSync(logFilePath)) {
return [];
}
const logContent = fs.readFileSync(logFilePath, 'utf8');
const logLines = logContent.split('\n').filter(line => line.trim());
const logs: LogEntry[] = [];
for (let i = logLines.length - 1; i >= 0 && logs.length < limit; i--) {
try {
const entry = JSON.parse(logLines[i]);
logs.unshift(entry);
} catch (error) {
// 跳过无效的日志行
}
}
return logs;
} catch (error) {
console.error('Failed to get tenant logs', error);
return [];
}
}
/**
* 清理租户日志
*/
static cleanupTenantLogs(tenantId: string, daysToKeep: number = 30): boolean {
try {
const logFilePath = this.getLogFilePath(tenantId);
if (!fs.existsSync(logFilePath)) {
return true;
}
const stats = fs.statSync(logFilePath);
const daysSinceCreation = (Date.now() - stats.birthtime.getTime()) / (1000 * 60 * 60 * 24);
if (daysSinceCreation > daysToKeep) {
fs.unlinkSync(logFilePath);
return true;
}
return false;
} catch (error) {
console.error('Failed to cleanup tenant logs', error);
return false;
}
}
/**
* 清理过期日志
*/
static cleanupExpiredLogs(daysToKeep: number = 30): void {
try {
this.initLogDir();
const files = fs.readdirSync(this.LOG_DIR);
files.forEach(file => {
const filePath = path.join(this.LOG_DIR, file);
const stats = fs.statSync(filePath);
const daysSinceCreation = (Date.now() - stats.birthtime.getTime()) / (1000 * 60 * 60 * 24);
if (daysSinceCreation > daysToKeep) {
fs.unlinkSync(filePath);
}
});
} catch (error) {
console.error('Failed to cleanup expired logs', error);
}
}
}
export default TenantLogger;

View File

@@ -0,0 +1,338 @@
import axios from 'axios';
import crypto from 'crypto';
import { logger } from '../../utils/logger';
export enum PaymentProvider {
ALIPAY = 'alipay',
WECHAT_PAY = 'wechat_pay'
}
export interface PaymentConfig {
appId: string;
appSecret: string;
merchantId: string;
privateKey: string;
publicKey: string;
notifyUrl: string;
returnUrl: string;
}
export interface CreatePaymentRequest {
orderId: string;
amount: number;
currency: string;
subject: string;
body: string;
provider: PaymentProvider;
tenantId: string;
shopId: string;
userId: string;
}
export interface CreatePaymentResponse {
success: boolean;
paymentUrl?: string;
qrCode?: string;
transactionId?: string;
error?: string;
}
export interface PaymentStatus {
transactionId: string;
orderId: string;
status: 'PENDING' | 'PAID' | 'FAILED' | 'REFUNDED';
amount: number;
paidAt?: Date;
}
export class PaymentGatewayService {
private static configs: Map<PaymentProvider, PaymentConfig> = new Map();
static init() {
this.loadConfigs();
logger.info('[PaymentGatewayService] Initialized');
}
private static loadConfigs() {
const alipayConfig: PaymentConfig = {
appId: process.env.ALIPAY_APP_ID || '',
appSecret: process.env.ALIPAY_APP_SECRET || '',
merchantId: process.env.ALIPAY_MERCHANT_ID || '',
privateKey: process.env.ALIPAY_PRIVATE_KEY || '',
publicKey: process.env.ALIPAY_PUBLIC_KEY || '',
notifyUrl: process.env.ALIPAY_NOTIFY_URL || '',
returnUrl: process.env.ALIPAY_RETURN_URL || ''
};
const wechatConfig: PaymentConfig = {
appId: process.env.WECHAT_APP_ID || '',
appSecret: process.env.WECHAT_APP_SECRET || '',
merchantId: process.env.WECHAT_MERCHANT_ID || '',
privateKey: process.env.WECHAT_PRIVATE_KEY || '',
publicKey: process.env.WECHAT_PUBLIC_KEY || '',
notifyUrl: process.env.WECHAT_NOTIFY_URL || '',
returnUrl: process.env.WECHAT_RETURN_URL || ''
};
this.configs.set(PaymentProvider.ALIPAY, alipayConfig);
this.configs.set(PaymentProvider.WECHAT_PAY, wechatConfig);
}
static async createPayment(request: CreatePaymentRequest): Promise<CreatePaymentResponse> {
const config = this.configs.get(request.provider);
if (!config) {
return { success: false, error: 'Payment provider not configured' };
}
try {
if (request.provider === PaymentProvider.ALIPAY) {
return await this.createAlipayPayment(request, config);
} else if (request.provider === PaymentProvider.WECHAT_PAY) {
return await this.createWechatPayment(request, config);
}
return { success: false, error: 'Unsupported payment provider' };
} catch (error: any) {
logger.error('[PaymentGatewayService] Create payment failed', { error, request });
return { success: false, error: error.message };
}
}
private static async createAlipayPayment(request: CreatePaymentRequest, config: PaymentConfig): Promise<CreatePaymentResponse> {
const timestamp = new Date().toISOString();
const bizContent = JSON.stringify({
out_trade_no: request.orderId,
total_amount: request.amount.toFixed(2),
subject: request.subject,
body: request.body,
product_code: 'FAST_INSTANT_TRADE_PAY'
});
const params = {
app_id: config.appId,
method: 'alipay.trade.page.pay',
format: 'JSON',
charset: 'utf-8',
sign_type: 'RSA2',
timestamp: timestamp,
version: '1.0',
notify_url: config.notifyUrl,
return_url: config.returnUrl,
biz_content: bizContent
};
const sign = this.generateAlipaySign(params, config.privateKey);
params['sign'] = sign;
const paymentUrl = `https://openapi.alipay.com/gateway.do?${new URLSearchParams(params).toString()}`;
return {
success: true,
paymentUrl,
transactionId: `ALIPAY_${request.orderId}_${Date.now()}`
};
}
private static async createWechatPayment(request: CreatePaymentRequest, config: PaymentConfig): Promise<CreatePaymentResponse> {
const nonceStr = crypto.randomBytes(16).toString('hex');
const timestamp = Math.floor(Date.now() / 1000).toString();
const params = {
appid: config.appId,
mch_id: config.merchantId,
nonce_str: nonceStr,
body: request.subject,
out_trade_no: request.orderId,
total_fee: Math.round(request.amount * 100),
spbill_create_ip: '127.0.0.1',
notify_url: config.notifyUrl,
trade_type: 'NATIVE'
};
const sign = this.generateWechatSign(params, config.appSecret);
params['sign'] = sign;
const xmlData = this.toXml(params);
try {
const response = await axios.post('https://api.mch.weixin.qq.com/pay/unifiedorder', xmlData, {
headers: { 'Content-Type': 'application/xml' }
});
const result = this.fromXml(response.data);
if (result.return_code === 'SUCCESS' && result.result_code === 'SUCCESS') {
return {
success: true,
qrCode: result.code_url,
transactionId: `WECHAT_${request.orderId}_${Date.now()}`
};
}
return { success: false, error: result.return_msg || 'Payment creation failed' };
} catch (error: any) {
logger.error('[PaymentGatewayService] WeChat payment failed', { error });
return { success: false, error: error.message };
}
}
static async queryPaymentStatus(provider: PaymentProvider, orderId: string): Promise<PaymentStatus | null> {
const config = this.configs.get(provider);
if (!config) {
return null;
}
try {
if (provider === PaymentProvider.ALIPAY) {
return await this.queryAlipayStatus(orderId, config);
} else if (provider === PaymentProvider.WECHAT_PAY) {
return await this.queryWechatStatus(orderId, config);
}
return null;
} catch (error: any) {
logger.error('[PaymentGatewayService] Query payment status failed', { error, orderId, provider });
return null;
}
}
private static async queryAlipayStatus(orderId: string, config: PaymentConfig): Promise<PaymentStatus | null> {
const timestamp = new Date().toISOString();
const bizContent = JSON.stringify({ out_trade_no: orderId });
const params = {
app_id: config.appId,
method: 'alipay.trade.query',
format: 'JSON',
charset: 'utf-8',
sign_type: 'RSA2',
timestamp: timestamp,
version: '1.0',
biz_content: bizContent
};
const sign = this.generateAlipaySign(params, config.privateKey);
params['sign'] = sign;
const response = await axios.post('https://openapi.alipay.com/gateway.do', params);
const result = response.data.alipay_trade_query_response;
if (result.code === '10000') {
return {
transactionId: result.trade_no,
orderId: result.out_trade_no,
status: result.trade_status === 'TRADE_SUCCESS' ? 'PAID' : 'PENDING',
amount: parseFloat(result.total_amount),
paidAt: result.send_pay_date ? new Date(result.send_pay_date) : undefined
};
}
return null;
}
private static async queryWechatStatus(orderId: string, config: PaymentConfig): Promise<PaymentStatus | null> {
const nonceStr = crypto.randomBytes(16).toString('hex');
const params = {
appid: config.appId,
mch_id: config.merchantId,
out_trade_no: orderId,
nonce_str: nonceStr
};
const sign = this.generateWechatSign(params, config.appSecret);
params['sign'] = sign;
const xmlData = this.toXml(params);
const response = await axios.post('https://api.mch.weixin.qq.com/pay/orderquery', xmlData, {
headers: { 'Content-Type': 'application/xml' }
});
const result = this.fromXml(response.data);
if (result.return_code === 'SUCCESS' && result.result_code === 'SUCCESS') {
return {
transactionId: result.transaction_id,
orderId: result.out_trade_no,
status: result.trade_state === 'SUCCESS' ? 'PAID' : 'PENDING',
amount: parseInt(result.total_fee) / 100,
paidAt: result.time_end ? this.parseWechatDate(result.time_end) : undefined
};
}
return null;
}
static async handleWebhook(provider: PaymentProvider, data: any): Promise<{ success: boolean; orderId?: string; status?: string }> {
try {
if (provider === PaymentProvider.ALIPAY) {
return this.handleAlipayWebhook(data);
} else if (provider === PaymentProvider.WECHAT_PAY) {
return this.handleWechatWebhook(data);
}
return { success: false };
} catch (error: any) {
logger.error('[PaymentGatewayService] Handle webhook failed', { error, provider });
return { success: false };
}
}
private static handleAlipayWebhook(data: any): { success: boolean; orderId?: string; status?: string } {
if (data.trade_status === 'TRADE_SUCCESS') {
return { success: true, orderId: data.out_trade_no, status: 'PAID' };
}
return { success: false };
}
private static handleWechatWebhook(data: any): { success: boolean; orderId?: string; status?: string } {
if (data.result_code === 'SUCCESS') {
return { success: true, orderId: data.out_trade_no, status: 'PAID' };
}
return { success: false };
}
private static generateAlipaySign(params: any, privateKey: string): string {
const sortedKeys = Object.keys(params).sort();
const signString = sortedKeys.map(key => `${key}=${params[key]}`).join('&');
const signer = crypto.createSign('RSA-SHA256');
signer.update(signString);
return signer.sign(privateKey, 'base64');
}
private static generateWechatSign(params: any, apiKey: string): string {
const sortedKeys = Object.keys(params).sort();
const signString = sortedKeys.map(key => `${key}=${params[key]}`).join('&') + `&key=${apiKey}`;
return crypto.createHash('md5').update(signString).digest('hex').toUpperCase();
}
private static toXml(params: any): string {
let xml = '<xml>';
for (const [key, value] of Object.entries(params)) {
xml += `<${key}><![CDATA[${value}]]></${key}>`;
}
xml += '</xml>';
return xml;
}
private static fromXml(xml: string): any {
const result: any = {};
const regex = /<(\w+)><!\[CDATA\[(.*?)\]\]><\/\1>/g;
let match;
while ((match = regex.exec(xml)) !== null) {
result[match[1]] = match[2];
}
return result;
}
private static parseWechatDate(dateStr: string): Date {
const year = dateStr.substring(0, 4);
const month = dateStr.substring(4, 6);
const day = dateStr.substring(6, 8);
const hour = dateStr.substring(8, 10);
const minute = dateStr.substring(10, 12);
const second = dateStr.substring(12, 14);
return new Date(`${year}-${month}-${day}T${hour}:${minute}:${second}`);
}
}

View File

@@ -0,0 +1,245 @@
/**
* [BE-SH001] 店铺管理服务
* 负责店铺的CRUD操作、授权管理和状态同步
*/
import db from '../../config/database';
import { logger } from '../../utils/logger';
import { ShopMemberService } from './ShopMemberService';
export interface Shop {
id: string;
tenantId: string;
departmentId: string;
name: string;
platform: string;
shopId: string;
status: string;
config?: Record<string, any>;
path: string;
depth: number;
createdAt: string;
updatedAt: string;
deletedAt?: string;
}
export interface ShopCreate {
departmentId: string;
name: string;
platform: string;
shopId: string;
config?: Record<string, any>;
path: string;
depth: number;
}
export interface ShopUpdate {
departmentId?: string;
name?: string;
status?: string;
config?: Record<string, any>;
path?: string;
depth?: number;
}
export class ShopService {
private static readonly TABLE = 'cf_shop';
private static readonly CACHE_PREFIX = 'shop:';
private static readonly CACHE_TTL = 1800;
static async initTable(): Promise<void> {
const hasTable = await db.schema.hasTable(this.TABLE);
if (!hasTable) {
await db.schema.createTable(this.TABLE, (table) => {
table.string('id', 64).primary();
table.string('tenant_id', 64).notNullable();
table.string('department_id', 64).notNullable();
table.string('name', 255).notNullable();
table.string('platform', 64).notNullable();
table.string('shop_id', 64).notNullable();
table.string('status', 64).notNullable().defaultTo('INACTIVE');
table.json('config');
table.string('path', 255).notNullable();
table.integer('depth').notNullable();
table.timestamps(true, true);
table.timestamp('deleted_at').nullable();
table.index(['tenant_id']);
table.index(['platform']);
table.index(['status']);
table.unique(['platform', 'shop_id']);
});
logger.info('✅ Table cf_shop created');
}
}
static async create(tenantId: string, data: ShopCreate): Promise<Shop> {
const id = this.generateId();
const now = new Date().toISOString();
const [newShop] = await db(this.TABLE)
.insert({
id,
tenant_id: tenantId,
department_id: data.departmentId,
name: data.name,
platform: data.platform,
shop_id: data.shopId,
status: 'INACTIVE',
config: data.config ? JSON.stringify(data.config) : null,
path: data.path,
depth: data.depth,
created_at: now,
updated_at: now,
})
.returning('*');
logger.info('[ShopService] Shop created', {
shopId: id,
tenantId,
name: data.name,
platform: data.platform,
});
return this.mapToShop(newShop);
}
static async getById(id: string): Promise<Shop | null> {
const shop = await db(this.TABLE).where('id', id).first();
return shop ? this.mapToShop(shop) : null;
}
static async getByTenant(tenantId: string): Promise<Shop[]> {
const shops = await db(this.TABLE)
.where('tenant_id', tenantId)
.where('deleted_at', null)
.orderBy('created_at', 'desc');
return shops.map(s => this.mapToShop(s));
}
static async update(id: string, data: ShopUpdate): Promise<Shop> {
const updateData: Record<string, any> = { updated_at: new Date().toISOString() };
if (data.departmentId) updateData.department_id = data.departmentId;
if (data.name) updateData.name = data.name;
if (data.status) updateData.status = data.status;
if (data.config) updateData.config = JSON.stringify(data.config);
if (data.path) updateData.path = data.path;
if (data.depth) updateData.depth = data.depth;
const [shop] = await db(this.TABLE)
.where('id', id)
.where('deleted_at', null)
.update(updateData)
.returning('*');
if (!shop) {
throw new Error('Shop not found');
}
logger.info('[ShopService] Shop updated', {
shopId: id,
updates: data,
});
return this.mapToShop(shop);
}
static async delete(id: string): Promise<void> {
// 先删除店铺成员
const members = await ShopMemberService.getShopMembers(id);
for (const member of members) {
await ShopMemberService.removeMember(id, member.userId);
}
// 软删除店铺
await db(this.TABLE)
.where('id', id)
.update({
deleted_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
});
logger.info('[ShopService] Shop deleted', { shopId: id });
}
static async refreshAuth(id: string): Promise<{ success: boolean; message: string }> {
// 这里应该调用平台授权服务来刷新授权
// 暂时模拟成功
const now = new Date().toISOString();
await db(this.TABLE)
.where('id', id)
.where('deleted_at', null)
.update({
status: 'ACTIVE',
updated_at: now,
});
logger.info('[ShopService] Shop auth refreshed', { shopId: id });
return { success: true, message: '授权已刷新' };
}
static async updateStatus(id: string, status: string): Promise<Shop> {
const [shop] = await db(this.TABLE)
.where('id', id)
.where('deleted_at', null)
.update({
status,
updated_at: new Date().toISOString(),
})
.returning('*');
if (!shop) {
throw new Error('Shop not found');
}
logger.info('[ShopService] Shop status updated', {
shopId: id,
status,
});
return this.mapToShop(shop);
}
static async getStats(tenantId: string): Promise<{
total: number;
active: number;
inactive: number;
expired: number;
error: number;
}> {
const shops = await this.getByTenant(tenantId);
return {
total: shops.length,
active: shops.filter(s => s.status === 'ACTIVE').length,
inactive: shops.filter(s => s.status === 'INACTIVE').length,
expired: shops.filter(s => s.status === 'EXPIRED').length,
error: shops.filter(s => s.status === 'ERROR').length,
};
}
private static generateId(): string {
return `shop_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
private static mapToShop(data: any): Shop {
return {
id: data.id,
tenantId: data.tenant_id,
departmentId: data.department_id,
name: data.name,
platform: data.platform,
shopId: data.shop_id,
status: data.status,
config: data.config ? JSON.parse(data.config) : undefined,
path: data.path,
depth: data.depth,
createdAt: data.created_at,
updatedAt: data.updated_at,
deletedAt: data.deleted_at,
};
}
}

View File

@@ -16,14 +16,14 @@ declare global {
interface Request {
user?: User;
traceContext?: {
tenantId: string;
shopId: string;
taskId: string;
traceId: string;
userId: string;
roleCode: string;
businessType: 'TOC' | 'TOB';
};
tenantId: string;
shopId: string;
taskId: string;
traceId: string;
userId: string;
roleCode: string;
businessType: 'TOC' | 'TOB';
};
}
}
}

View File

@@ -1 +1,2 @@
// 基本类型定义
/// <reference path="./express.d.ts" />

View File

@@ -1,6 +1,8 @@
import { TenantLogger } from '../services/logging/TenantLogger';
export const logger = {
info: (msg: string, data?: any) => console.log(`[INFO] ${msg}`, data || ''),
error: (msg: string, data?: any) => console.error(`[ERROR] ${msg}`, data || ''),
warn: (msg: string, data?: any) => console.warn(`[WARN] ${msg}`, data || ''),
debug: (msg: string, data?: any) => console.debug(`[DEBUG] ${msg}`, data || '')
info: (msg: string, data?: any, tenantId?: string, userId?: string, traceId?: string, businessType?: 'TOC' | 'TOB') => TenantLogger.info(msg, data, tenantId, userId, traceId, businessType),
error: (msg: string, data?: any, tenantId?: string, userId?: string, traceId?: string, businessType?: 'TOC' | 'TOB') => TenantLogger.error(msg, data, tenantId, userId, traceId, businessType),
warn: (msg: string, data?: any, tenantId?: string, userId?: string, traceId?: string, businessType?: 'TOC' | 'TOB') => TenantLogger.warn(msg, data, tenantId, userId, traceId, businessType),
debug: (msg: string, data?: any, tenantId?: string, userId?: string, traceId?: string, businessType?: 'TOC' | 'TOB') => TenantLogger.debug(msg, data, tenantId, userId, traceId, businessType)
};

View File

@@ -0,0 +1,441 @@
import { Worker, Job } from 'bullmq';
import db from '../config/database';
import { logger } from '../utils/logger';
import { SubscriptionService } from '../domains/Homepage/SubscriptionService';
import { PaymentGatewayService, PaymentProvider } from '../services/payment/PaymentGatewayService';
import { PaymentService } from '../services/core/PaymentService';
import { v4 as uuidv4 } from 'uuid';
export interface AutoRenewTask {
id?: string;
tenantId: string;
userId: string;
subscriptionId: string;
plan: string;
billingCycle: 'monthly' | 'yearly';
amount: number;
currency: string;
paymentMethod?: PaymentProvider;
status: 'PENDING' | 'PROCESSING' | 'SUCCESS' | 'FAILED';
errorMessage?: string;
retryCount: number;
lastRetryAt?: Date;
createdAt: Date;
updatedAt: Date;
}
export class AutoRenewWorker {
private static readonly TASK_TABLE = 'cf_auto_renew_task';
private static readonly QUEUE_NAME = 'subscription.auto-renew';
private static worker: Worker | null = null;
private static scanInterval: NodeJS.Timeout | null = null;
static async initTable(): Promise<void> {
const hasTable = await db.schema.hasTable(this.TASK_TABLE);
if (!hasTable) {
logger.info(`[AutoRenewWorker] Creating ${this.TASK_TABLE} table...`);
await db.schema.createTable(this.TASK_TABLE, (table) => {
table.string('id', 36).primary();
table.string('tenant_id', 64).notNullable().index();
table.string('user_id', 64).notNullable().index();
table.string('subscription_id', 64).notNullable().index();
table.string('plan', 32).notNullable();
table.enum('billing_cycle', ['monthly', 'yearly']).notNullable();
table.decimal('amount', 10, 2).notNullable();
table.string('currency', 10).notNullable().defaultTo('CNY');
table.enum('payment_method', ['ALIPAY', 'WECHAT_PAY']);
table.enum('status', ['PENDING', 'PROCESSING', 'SUCCESS', 'FAILED']).notNullable().defaultTo('PENDING');
table.text('error_message');
table.integer('retry_count').notNullable().defaultTo(0);
table.timestamp('last_retry_at');
table.timestamp('created_at').defaultTo(db.fn.now());
table.timestamp('updated_at').defaultTo(db.fn.now());
table.index(['tenant_id', 'status'], 'idx_renew_tenant_status');
table.index(['subscription_id'], 'idx_renew_subscription');
});
logger.info(`[AutoRenewWorker] Table ${this.TASK_TABLE} created`);
}
}
static async start(): Promise<void> {
await this.initTable();
this.worker = new Worker(
this.QUEUE_NAME,
async (job: Job) => {
return await this.processJob(job);
},
{
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: Number(process.env.REDIS_PORT) || 6379,
},
concurrency: 3,
}
);
this.worker.on('completed', (job) => {
logger.info(`[AutoRenewWorker] Job completed: ${job.id}`);
});
this.worker.on('failed', (job, error) => {
logger.error(`[AutoRenewWorker] Job failed: ${job?.id}, error: ${error.message}`);
});
this.scanInterval = setInterval(async () => {
await this.scanAndCreateTasks();
}, 6 * 60 * 60 * 1000);
logger.info('[AutoRenewWorker] Worker started');
await this.scanAndCreateTasks();
}
static async stop(): Promise<void> {
if (this.scanInterval) {
clearInterval(this.scanInterval);
this.scanInterval = null;
}
if (this.worker) {
await this.worker.close();
this.worker = null;
}
logger.info('[AutoRenewWorker] Worker stopped');
}
static async scanAndCreateTasks(): Promise<void> {
logger.info('[AutoRenewWorker] Starting auto-renew scan...');
try {
const now = new Date();
const targetDate = new Date();
targetDate.setDate(targetDate.getDate() + 3);
const subscriptions = await db('cf_subscription')
.where('status', 'active')
.where('autoRenew', true)
.where('endDate', '>=', now)
.where('endDate', '<=', targetDate);
for (const sub of subscriptions) {
await this.createRenewTaskIfNotExists(sub);
}
logger.info(`[AutoRenewWorker] Auto-renew scan completed, found ${subscriptions.length} subscriptions`);
} catch (error: any) {
logger.error(`[AutoRenewWorker] Scan failed: ${error.message}`);
}
}
private static async createRenewTaskIfNotExists(subscription: any): Promise<void> {
const existingTask = await db(this.TASK_TABLE)
.where('subscription_id', subscription.id)
.whereIn('status', ['PENDING', 'PROCESSING'])
.first();
if (existingTask) {
logger.info(`[AutoRenewWorker] Renew task already exists: subId=${subscription.id}`);
return;
}
const prices: Record<string, Record<string, number>> = {
basic: { monthly: 29, yearly: 290 },
pro: { monthly: 99, yearly: 990 },
enterprise: { monthly: 299, yearly: 2990 },
};
const amount = subscription.plan === 'free' ? 0 : prices[subscription.plan]?.[subscription.billingCycle || 'monthly'] || 0;
const task: AutoRenewTask = {
id: uuidv4(),
tenantId: subscription.tenantId,
userId: subscription.userId,
subscriptionId: subscription.id,
plan: subscription.plan,
billingCycle: subscription.billingCycle || 'monthly',
amount,
currency: 'CNY',
paymentMethod: PaymentProvider.ALIPAY,
status: 'PENDING',
retryCount: 0,
createdAt: new Date(),
updatedAt: new Date()
};
await db(this.TASK_TABLE).insert({
id: task.id,
tenant_id: task.tenantId,
user_id: task.userId,
subscription_id: task.subscriptionId,
plan: task.plan,
billing_cycle: task.billingCycle,
amount: task.amount,
currency: task.currency,
payment_method: task.paymentMethod,
status: task.status,
retry_count: task.retryCount,
created_at: task.createdAt,
updated_at: task.updatedAt
});
logger.info(`[AutoRenewWorker] Created renew task: taskId=${task.id}, subId=${task.subscriptionId}`);
}
private static async processJob(job: Job): Promise<any> {
const { taskId } = job.data;
const task = await this.getTaskById(taskId);
if (!task) {
throw new Error(`Task not found: ${taskId}`);
}
try {
await this.updateTaskStatus(taskId, 'PROCESSING');
if (task.amount > 0) {
await this.processPayment(task);
}
await this.extendSubscription(task);
await this.updateTaskStatus(taskId, 'SUCCESS');
logger.info(`[AutoRenewWorker] Auto-renew successful: taskId=${taskId}`);
return { success: true, taskId };
} catch (error: any) {
await this.handleTaskFailure(task, error.message);
throw error;
}
}
private static async processPayment(task: AutoRenewTask): Promise<void> {
if (!task.paymentMethod) {
throw new Error('No payment method configured');
}
const orderId = `RENEW-${task.subscriptionId}-${Date.now()}`;
const paymentResponse = await PaymentGatewayService.createPayment({
orderId,
amount: task.amount,
currency: task.currency,
subject: `订阅续费 - ${task.plan}`,
body: `自动续费订阅: ${task.plan} (${task.billingCycle})`,
provider: task.paymentMethod,
tenantId: task.tenantId,
shopId: 'default-shop',
userId: task.userId
});
if (!paymentResponse.success) {
throw new Error(paymentResponse.error || 'Payment creation failed');
}
await PaymentService.createPayment({
tenantId: task.tenantId,
shopId: 'default-shop',
amount: task.amount,
currency: task.currency,
status: 'PENDING',
paymentMethod: task.paymentMethod,
transactionId: paymentResponse.transactionId,
type: 'SUBSCRIPTION',
relatedId: task.subscriptionId,
method: task.paymentMethod,
traceId: task.id,
taskId: task.id,
businessType: 'TOC'
});
const maxWaitTime = 5 * 60 * 1000;
const startTime = Date.now();
while (Date.now() - startTime < maxWaitTime) {
await new Promise(resolve => setTimeout(resolve, 5000));
const paymentStatus = await PaymentGatewayService.queryPaymentStatus(
task.paymentMethod,
orderId
);
if (paymentStatus) {
if (paymentStatus.status === 'PAID') {
await PaymentService.updatePaymentStatus(
orderId,
'COMPLETED',
task.id
);
return;
} else if (paymentStatus.status === 'FAILED') {
await PaymentService.updatePaymentStatus(
orderId,
'FAILED',
task.id
);
throw new Error('Payment failed');
}
}
}
throw new Error('Payment timeout');
}
private static async extendSubscription(task: AutoRenewTask): Promise<void> {
const subscription = await db('cf_subscription')
.where('id', task.subscriptionId)
.first();
if (!subscription) {
throw new Error('Subscription not found');
}
const currentEndDate = new Date(subscription.endDate);
const newEndDate = new Date(currentEndDate);
if (task.billingCycle === 'monthly') {
newEndDate.setMonth(newEndDate.getMonth() + 1);
} else {
newEndDate.setFullYear(newEndDate.getFullYear() + 1);
}
await db('cf_subscription')
.where('id', task.subscriptionId)
.update({
endDate: newEndDate,
updatedAt: new Date()
});
logger.info(`[AutoRenewWorker] Subscription extended: subId=${task.subscriptionId}, newEndDate=${newEndDate}`);
}
private static async handleTaskFailure(task: AutoRenewTask, errorMessage: string): Promise<void> {
const newRetryCount = task.retryCount + 1;
const maxRetries = 3;
if (newRetryCount < maxRetries) {
await db(this.TASK_TABLE)
.where('id', task.id)
.update({
status: 'PENDING',
error_message: errorMessage,
retry_count: newRetryCount,
last_retry_at: new Date(),
updated_at: new Date()
});
logger.warn(`[AutoRenewWorker] Task failed, will retry: taskId=${task.id}, retryCount=${newRetryCount}`);
} else {
await db(this.TASK_TABLE)
.where('id', task.id)
.update({
status: 'FAILED',
error_message: errorMessage,
retry_count: newRetryCount,
last_retry_at: new Date(),
updated_at: new Date()
});
logger.error(`[AutoRenewWorker] Task failed, max retries reached: taskId=${task.id}`);
}
}
private static async getTaskById(taskId: string): Promise<AutoRenewTask | null> {
const row = await db(this.TASK_TABLE).where('id', taskId).first();
if (!row) return null;
return {
id: row.id,
tenantId: row.tenant_id,
userId: row.user_id,
subscriptionId: row.subscription_id,
plan: row.plan,
billingCycle: row.billing_cycle,
amount: parseFloat(row.amount),
currency: row.currency,
paymentMethod: row.payment_method,
status: row.status,
errorMessage: row.error_message,
retryCount: row.retry_count,
lastRetryAt: row.last_retry_at,
createdAt: row.created_at,
updatedAt: row.updated_at
};
}
private static async updateTaskStatus(taskId: string, status: AutoRenewTask['status']): Promise<void> {
await db(this.TASK_TABLE)
.where('id', taskId)
.update({
status,
updated_at: new Date()
});
}
static async getPendingTasks(tenantId: string): Promise<AutoRenewTask[]> {
const rows = await db(this.TASK_TABLE)
.where('tenant_id', tenantId)
.whereIn('status', ['PENDING', 'PROCESSING'])
.orderBy('created_at', 'desc');
return rows.map(row => ({
id: row.id,
tenantId: row.tenant_id,
userId: row.user_id,
subscriptionId: row.subscription_id,
plan: row.plan,
billingCycle: row.billing_cycle,
amount: parseFloat(row.amount),
currency: row.currency,
paymentMethod: row.payment_method,
status: row.status,
errorMessage: row.error_message,
retryCount: row.retry_count,
lastRetryAt: row.last_retry_at,
createdAt: row.created_at,
updatedAt: row.updated_at
}));
}
static async getTaskHistory(
tenantId: string,
options?: { limit?: number; offset?: number }
): Promise<{ data: AutoRenewTask[]; total: number }> {
const limit = options?.limit || 20;
const offset = options?.offset || 0;
let query = db(this.TASK_TABLE)
.where('tenant_id', tenantId);
const totalQuery = query.clone();
const [{ count }] = await totalQuery.count('* as count');
const total = Number(count);
const rows = await query
.orderBy('created_at', 'desc')
.limit(limit)
.offset(offset);
const data = rows.map(row => ({
id: row.id,
tenantId: row.tenant_id,
userId: row.user_id,
subscriptionId: row.subscription_id,
plan: row.plan,
billingCycle: row.billing_cycle,
amount: parseFloat(row.amount),
currency: row.currency,
paymentMethod: row.payment_method,
status: row.status,
errorMessage: row.error_message,
retryCount: row.retry_count,
lastRetryAt: row.last_retry_at,
createdAt: row.created_at,
updatedAt: row.updated_at
}));
return { data, total };
}
}

View File

@@ -0,0 +1,404 @@
import { Worker, Job } from 'bullmq';
import db from '../config/database';
import { logger } from '../utils/logger';
import { SubscriptionService } from '../domains/Homepage/SubscriptionService';
import { v4 as uuidv4 } from 'uuid';
export type SubscriptionReminderType =
| 'SUBSCRIPTION_EXPIRING_30'
| 'SUBSCRIPTION_EXPIRING_7'
| 'SUBSCRIPTION_EXPIRING_1'
| 'SUBSCRIPTION_EXPIRED';
export interface SubscriptionReminderNotification {
id?: string;
tenantId: string;
userId: string;
subscriptionId: string;
plan: string;
endDate: Date;
reminderType: SubscriptionReminderType;
daysRemaining: number;
message: string;
isSent: boolean;
sentAt?: Date;
createdAt?: Date;
}
export class SubscriptionReminderWorker {
private static readonly NOTIFICATION_TABLE = 'cf_subscription_notification';
private static readonly QUEUE_NAME = 'subscription.reminder';
private static worker: Worker | null = null;
private static reminderInterval: NodeJS.Timeout | null = null;
static async initTable(): Promise<void> {
const hasTable = await db.schema.hasTable(this.NOTIFICATION_TABLE);
if (!hasTable) {
logger.info(`[SubscriptionReminderWorker] Creating ${this.NOTIFICATION_TABLE} table...`);
await db.schema.createTable(this.NOTIFICATION_TABLE, (table) => {
table.string('id', 36).primary();
table.string('tenant_id', 64).notNullable().index();
table.string('user_id', 64).notNullable().index();
table.string('subscription_id', 64).notNullable().index();
table.string('plan', 32).notNullable();
table.date('end_date').notNullable();
table.enum('reminder_type', [
'SUBSCRIPTION_EXPIRING_30',
'SUBSCRIPTION_EXPIRING_7',
'SUBSCRIPTION_EXPIRING_1',
'SUBSCRIPTION_EXPIRED'
]).notNullable();
table.integer('days_remaining').notNullable();
table.text('message').notNullable();
table.boolean('is_sent').defaultTo(false).notNullable();
table.timestamp('sent_at');
table.timestamp('created_at').defaultTo(db.fn.now());
table.index(['tenant_id', 'is_sent'], 'idx_sub_notif_tenant_sent');
table.index(['subscription_id', 'reminder_type'], 'idx_sub_notif_sub_type');
});
logger.info(`[SubscriptionReminderWorker] Table ${this.NOTIFICATION_TABLE} created`);
}
}
static async start(): Promise<void> {
await this.initTable();
this.worker = new Worker(
this.QUEUE_NAME,
async (job: Job) => {
return await this.processJob(job);
},
{
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: Number(process.env.REDIS_PORT) || 6379,
},
concurrency: 5,
}
);
this.worker.on('completed', (job) => {
logger.info(`[SubscriptionReminderWorker] Job completed: ${job.id}`);
});
this.worker.on('failed', (job, error) => {
logger.error(`[SubscriptionReminderWorker] Job failed: ${job?.id}, error: ${error.message}`);
});
this.reminderInterval = setInterval(async () => {
await this.scanAndNotify();
}, 24 * 60 * 60 * 1000);
logger.info('[SubscriptionReminderWorker] Worker started');
await this.scanAndNotify();
}
static async stop(): Promise<void> {
if (this.reminderInterval) {
clearInterval(this.reminderInterval);
this.reminderInterval = null;
}
if (this.worker) {
await this.worker.close();
this.worker = null;
}
logger.info('[SubscriptionReminderWorker] Worker stopped');
}
static async scanAndNotify(): Promise<void> {
logger.info('[SubscriptionReminderWorker] Starting subscription expiry scan...');
try {
await this.processExpiringSubscriptions(30, 'SUBSCRIPTION_EXPIRING_30');
await this.processExpiringSubscriptions(7, 'SUBSCRIPTION_EXPIRING_7');
await this.processExpiringSubscriptions(1, 'SUBSCRIPTION_EXPIRING_1');
await this.processExpiredSubscriptions();
logger.info('[SubscriptionReminderWorker] Subscription expiry scan completed');
} catch (error: any) {
logger.error(`[SubscriptionReminderWorker] Scan failed: ${error.message}`);
}
}
private static async processExpiringSubscriptions(
days: number,
reminderType: SubscriptionReminderType
): Promise<void> {
const now = new Date();
const targetDate = new Date();
targetDate.setDate(targetDate.getDate() + days);
const subscriptions = await db('cf_subscription')
.where('status', 'active')
.where('endDate', '>=', now)
.where('endDate', '<=', targetDate);
for (const sub of subscriptions) {
const daysRemaining = this.calculateDaysRemaining(sub.endDate);
if (daysRemaining <= days && daysRemaining > days - 1) {
await this.createAndSendNotification(sub, reminderType, daysRemaining);
}
}
}
private static async processExpiredSubscriptions(): Promise<void> {
const now = new Date();
const subscriptions = await db('cf_subscription')
.where('status', 'active')
.where('endDate', '<', now);
for (const sub of subscriptions) {
await this.createAndSendNotification(sub, 'SUBSCRIPTION_EXPIRED', 0);
await db('cf_subscription').where('id', sub.id).update({
status: 'expired',
updatedAt: new Date()
});
}
}
private static async createAndSendNotification(
subscription: any,
reminderType: SubscriptionReminderType,
daysRemaining: number
): Promise<void> {
const existingNotification = await db(this.NOTIFICATION_TABLE)
.where({
subscription_id: subscription.id,
reminder_type: reminderType,
})
.first();
if (existingNotification) {
logger.info(`[SubscriptionReminderWorker] Notification already exists: subId=${subscription.id}, type=${reminderType}`);
return;
}
const notification: SubscriptionReminderNotification = {
id: this.generateId(),
tenantId: subscription.tenantId,
userId: subscription.userId,
subscriptionId: subscription.id,
plan: subscription.plan,
endDate: subscription.endDate,
reminderType,
daysRemaining,
message: this.generateMessage(subscription, reminderType, daysRemaining),
isSent: false,
createdAt: new Date(),
};
await db(this.NOTIFICATION_TABLE).insert({
id: notification.id,
tenant_id: notification.tenantId,
user_id: notification.userId,
subscription_id: notification.subscriptionId,
plan: notification.plan,
end_date: notification.endDate,
reminder_type: notification.reminderType,
days_remaining: notification.daysRemaining,
message: notification.message,
is_sent: false,
created_at: notification.createdAt,
});
await this.sendNotification(notification);
logger.info(`[SubscriptionReminderWorker] Notification created: subId=${subscription.id}, type=${reminderType}, daysRemaining=${daysRemaining}`);
}
private static async sendNotification(notification: SubscriptionReminderNotification): Promise<void> {
try {
logger.info(`[SubscriptionReminderWorker] Sending notification: id=${notification.id}, type=${notification.reminderType}, subId=${notification.subscriptionId}`);
await this.sendSystemNotification(notification);
await this.sendEmailNotification(notification);
await db(this.NOTIFICATION_TABLE)
.where({ id: notification.id })
.update({
is_sent: true,
sent_at: new Date(),
});
logger.info(`[SubscriptionReminderWorker] Notification sent: id=${notification.id}`);
} catch (error: any) {
logger.error(`[SubscriptionReminderWorker] Failed to send notification: id=${notification.id}, error=${error.message}`);
}
}
private static async sendSystemNotification(notification: SubscriptionReminderNotification): Promise<void> {
const notificationTable = 'cf_system_notification';
const hasTable = await db.schema.hasTable(notificationTable);
if (!hasTable) {
await db.schema.createTable(notificationTable, (table) => {
table.string('id', 36).primary();
table.string('tenant_id', 64).notNullable().index();
table.string('user_id', 64);
table.string('type', 32).notNullable();
table.string('title', 255).notNullable();
table.text('content');
table.boolean('is_read').defaultTo(false);
table.timestamp('created_at').defaultTo(db.fn.now());
});
}
await db(notificationTable).insert({
id: `SYS-NOTIF-${Date.now()}-${Math.random().toString(36).substring(2, 8)}`,
tenant_id: notification.tenantId,
user_id: notification.userId,
type: 'SUBSCRIPTION_REMINDER',
title: this.getNotificationTitle(notification.reminderType),
content: notification.message,
is_read: false,
created_at: new Date(),
});
}
private static async sendEmailNotification(notification: SubscriptionReminderNotification): Promise<void> {
logger.info(`[SubscriptionReminderWorker] Email notification: tenantId=${notification.tenantId}, subject=${this.getNotificationTitle(notification.reminderType)}`);
}
private static getNotificationTitle(reminderType: SubscriptionReminderType): string {
switch (reminderType) {
case 'SUBSCRIPTION_EXPIRING_30':
return '订阅即将到期提醒';
case 'SUBSCRIPTION_EXPIRING_7':
return '订阅即将到期紧急提醒';
case 'SUBSCRIPTION_EXPIRING_1':
return '订阅最后提醒';
case 'SUBSCRIPTION_EXPIRED':
return '订阅已过期通知';
default:
return '订阅提醒';
}
}
private static async processJob(job: Job): Promise<any> {
const { action, data } = job.data;
switch (action) {
case 'SCAN_EXPIRING':
await this.scanAndNotify();
return { success: true, scannedAt: new Date() };
case 'SEND_REMINDER':
await this.sendNotification(data as SubscriptionReminderNotification);
return { success: true, notificationId: data.id };
default:
throw new Error(`Unknown action: ${action}`);
}
}
private static calculateDaysRemaining(endDate: Date): number {
const now = new Date();
const expiry = new Date(endDate);
const diffTime = expiry.getTime() - now.getTime();
const diffDays = Math.ceil(diffTime / (1000 * 60 * 60 * 24));
return diffDays;
}
private static generateMessage(
subscription: any,
reminderType: SubscriptionReminderType,
daysRemaining: number
): string {
const planNames: Record<string, string> = {
'free': '免费版',
'basic': '基础版',
'pro': '专业版',
'enterprise': '企业版'
};
const planName = planNames[subscription.plan] || subscription.plan;
const subInfo = `订阅套餐: ${planName}\n到期日期: ${new Date(subscription.endDate).toLocaleDateString('zh-CN')}`;
switch (reminderType) {
case 'SUBSCRIPTION_EXPIRING_30':
return `【提醒】您的订阅将在 ${daysRemaining} 天后到期,请及时安排续费。\n\n${subInfo}\n\n建议提前30天开始办理续费手续以免影响业务运营。`;
case 'SUBSCRIPTION_EXPIRING_7':
return `【紧急】您的订阅将在 ${daysRemaining} 天后到期,请立即安排续费!\n\n${subInfo}\n\n请尽快完成续费避免订阅过期影响系统功能使用。`;
case 'SUBSCRIPTION_EXPIRING_1':
return `【最后提醒】您的订阅将在明天到期!\n\n${subInfo}\n\n请立即处理否则订阅过期后高级功能将无法使用。`;
case 'SUBSCRIPTION_EXPIRED':
return `【过期通知】您的订阅已过期!\n\n${subInfo}\n\n该订阅已失效高级功能已暂停。请立即续费恢复服务。`;
default:
return `订阅提醒\n\n${subInfo}`;
}
}
private static generateId(): string {
return `SUB-REMIND-${Date.now()}-${Math.random().toString(36).substring(2, 10)}`;
}
static async getPendingNotifications(tenantId: string): Promise<SubscriptionReminderNotification[]> {
const rows = await db(this.NOTIFICATION_TABLE)
.where('tenant_id', tenantId)
.where('is_sent', false)
.orderBy('created_at', 'desc');
return rows.map(row => ({
id: row.id,
tenantId: row.tenant_id,
userId: row.user_id,
subscriptionId: row.subscription_id,
plan: row.plan,
endDate: row.end_date,
reminderType: row.reminder_type,
daysRemaining: row.days_remaining,
message: row.message,
isSent: Boolean(row.is_sent),
sentAt: row.sent_at,
createdAt: row.created_at,
}));
}
static async getNotificationHistory(
tenantId: string,
options?: { limit?: number; offset?: number }
): Promise<{ data: SubscriptionReminderNotification[]; total: number }> {
const limit = options?.limit || 20;
const offset = options?.offset || 0;
let query = db(this.NOTIFICATION_TABLE)
.where('tenant_id', tenantId);
const totalQuery = query.clone();
const [{ count }] = await totalQuery.count('* as count');
const total = Number(count);
const rows = await query
.orderBy('created_at', 'desc')
.limit(limit)
.offset(offset);
const data = rows.map(row => ({
id: row.id,
tenantId: row.tenant_id,
userId: row.user_id,
subscriptionId: row.subscription_id,
plan: row.plan,
endDate: row.end_date,
reminderType: row.reminder_type,
daysRemaining: row.days_remaining,
message: row.message,
isSent: Boolean(row.is_sent),
sentAt: row.sent_at,
createdAt: row.created_at,
}));
return { data, total };
}
}