feat: 添加货币和汇率管理功能

refactor: 重构前端路由和登录逻辑

docs: 更新业务闭环、任务和架构文档

style: 调整代码格式和文件结构

chore: 更新依赖项和配置文件
This commit is contained in:
2026-03-19 19:08:15 +08:00
parent 8de9ea0aaa
commit eafa1bbe94
203 changed files with 20240 additions and 39580 deletions

View File

@@ -0,0 +1,457 @@
import { DomainEventBus } from '../runtime/DomainEventBus';
// 数据分类
export interface DataClassification {
id: string;
name: string;
description: string;
sensitivity: 'public' | 'internal' | 'confidential' | 'restricted';
retentionPeriod: number; // 天数
createdAt: Date;
lastUpdated: Date;
}
// 数据访问策略
export interface DataAccessPolicy {
id: string;
name: string;
description: string;
classificationId: string;
roles: string[];
permissions: 'read' | 'write' | 'delete' | 'admin';
conditions?: string; // 访问条件
createdAt: Date;
lastUpdated: Date;
}
// 数据审计日志
export interface DataAuditLog {
id: string;
userId: string;
action: 'read' | 'write' | 'delete' | 'access' | 'export';
dataClassification: string;
dataEntity: string;
recordId?: string;
ipAddress: string;
timestamp: Date;
status: 'success' | 'failed';
errorMessage?: string;
}
// 数据隐私设置
export interface DataPrivacySetting {
id: string;
name: string;
description: string;
type: 'anonymization' | 'masking' | 'encryption' | 'retention';
configuration: Record<string, any>;
enabled: boolean;
createdAt: Date;
lastUpdated: Date;
}
// 数据治理
export class DataGovernance {
private static instance: DataGovernance;
private dataClassifications: Map<string, DataClassification> = new Map();
private accessPolicies: Map<string, DataAccessPolicy> = new Map();
private auditLogs: Map<string, DataAuditLog> = new Map();
private privacySettings: Map<string, DataPrivacySetting> = new Map();
private eventBus: DomainEventBus;
private constructor() {
this.eventBus = DomainEventBus.getInstance();
}
static getInstance(): DataGovernance {
if (!DataGovernance.instance) {
DataGovernance.instance = new DataGovernance();
}
return DataGovernance.instance;
}
// 创建数据分类
async createDataClassification(classification: Omit<DataClassification, 'id' | 'createdAt' | 'lastUpdated'>): Promise<DataClassification> {
const id = `class_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const newClassification: DataClassification = {
...classification,
id,
createdAt: new Date(),
lastUpdated: new Date()
};
this.dataClassifications.set(id, newClassification);
this.eventBus.publish('data.classification.created', newClassification);
return newClassification;
}
// 获取数据分类
getDataClassification(classificationId: string): DataClassification | undefined {
return this.dataClassifications.get(classificationId);
}
// 获取所有数据分类
getAllDataClassifications(): DataClassification[] {
return Array.from(this.dataClassifications.values());
}
// 更新数据分类
async updateDataClassification(classificationId: string, updates: Partial<DataClassification>): Promise<DataClassification | null> {
const classification = this.dataClassifications.get(classificationId);
if (!classification) {
return null;
}
const updatedClassification = {
...classification,
...updates,
lastUpdated: new Date()
};
this.dataClassifications.set(classificationId, updatedClassification);
this.eventBus.publish('data.classification.updated', updatedClassification);
return updatedClassification;
}
// 创建数据访问策略
async createAccessPolicy(policy: Omit<DataAccessPolicy, 'id' | 'createdAt' | 'lastUpdated'>): Promise<DataAccessPolicy> {
const id = `policy_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const newPolicy: DataAccessPolicy = {
...policy,
id,
createdAt: new Date(),
lastUpdated: new Date()
};
this.accessPolicies.set(id, newPolicy);
this.eventBus.publish('data.access.policy.created', newPolicy);
return newPolicy;
}
// 获取数据访问策略
getAccessPolicy(policyId: string): DataAccessPolicy | undefined {
return this.accessPolicies.get(policyId);
}
// 获取所有数据访问策略
getAllAccessPolicies(filters?: {
classificationId?: string;
role?: string;
permissions?: string;
}): DataAccessPolicy[] {
let result = Array.from(this.accessPolicies.values());
if (filters) {
if (filters.classificationId) {
result = result.filter(p => p.classificationId === filters.classificationId);
}
if (filters.role) {
result = result.filter(p => p.roles.includes(filters.role!));
}
if (filters.permissions) {
result = result.filter(p => p.permissions === filters.permissions);
}
}
return result;
}
// 更新数据访问策略
async updateAccessPolicy(policyId: string, updates: Partial<DataAccessPolicy>): Promise<DataAccessPolicy | null> {
const policy = this.accessPolicies.get(policyId);
if (!policy) {
return null;
}
const updatedPolicy = {
...policy,
...updates,
lastUpdated: new Date()
};
this.accessPolicies.set(policyId, updatedPolicy);
this.eventBus.publish('data.access.policy.updated', updatedPolicy);
return updatedPolicy;
}
// 检查数据访问权限
async checkAccessPermission(userId: string, role: string, dataClassification: string, action: 'read' | 'write' | 'delete' | 'admin'): Promise<{
allowed: boolean;
policyId?: string;
reason?: string;
}> {
// 这里应该有实际的权限检查逻辑
// 暂时模拟检查
await new Promise(resolve => setTimeout(resolve, 100));
const policies = Array.from(this.accessPolicies.values()).filter(
p => p.classificationId === dataClassification && p.roles.includes(role)
);
if (policies.length === 0) {
return { allowed: false, reason: 'No policy found for this role and classification' };
}
const policy = policies.find(p => {
switch (action) {
case 'admin':
return p.permissions === 'admin';
case 'delete':
return p.permissions === 'admin' || p.permissions === 'delete';
case 'write':
return p.permissions === 'admin' || p.permissions === 'delete' || p.permissions === 'write';
case 'read':
return true;
default:
return false;
}
});
if (policy) {
return { allowed: true, policyId: policy.id };
} else {
return { allowed: false, reason: 'Insufficient permissions' };
}
}
// 记录数据审计日志
async logAuditEvent(log: Omit<DataAuditLog, 'id' | 'timestamp'>): Promise<DataAuditLog> {
const id = `log_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const newLog: DataAuditLog = {
...log,
id,
timestamp: new Date()
};
this.auditLogs.set(id, newLog);
this.eventBus.publish('data.audit.log.created', newLog);
return newLog;
}
// 获取审计日志
getAuditLog(logId: string): DataAuditLog | undefined {
return this.auditLogs.get(logId);
}
// 获取审计日志
getAuditLogs(filters?: {
userId?: string;
action?: string;
dataClassification?: string;
startDate?: Date;
endDate?: Date;
}): DataAuditLog[] {
let result = Array.from(this.auditLogs.values());
if (filters) {
if (filters.userId) {
result = result.filter(l => l.userId === filters.userId);
}
if (filters.action) {
result = result.filter(l => l.action === filters.action);
}
if (filters.dataClassification) {
result = result.filter(l => l.dataClassification === filters.dataClassification);
}
if (filters.startDate) {
result = result.filter(l => l.timestamp >= filters.startDate!);
}
if (filters.endDate) {
result = result.filter(l => l.timestamp <= filters.endDate!);
}
}
return result.sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime());
}
// 创建数据隐私设置
async createPrivacySetting(setting: Omit<DataPrivacySetting, 'id' | 'createdAt' | 'lastUpdated'>): Promise<DataPrivacySetting> {
const id = `privacy_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const newSetting: DataPrivacySetting = {
...setting,
id,
createdAt: new Date(),
lastUpdated: new Date()
};
this.privacySettings.set(id, newSetting);
this.eventBus.publish('data.privacy.setting.created', newSetting);
return newSetting;
}
// 获取数据隐私设置
getPrivacySetting(settingId: string): DataPrivacySetting | undefined {
return this.privacySettings.get(settingId);
}
// 获取所有数据隐私设置
getAllPrivacySettings(filters?: {
type?: string;
enabled?: boolean;
}): DataPrivacySetting[] {
let result = Array.from(this.privacySettings.values());
if (filters) {
if (filters.type) {
result = result.filter(s => s.type === filters.type);
}
if (filters.enabled !== undefined) {
result = result.filter(s => s.enabled === filters.enabled);
}
}
return result;
}
// 更新数据隐私设置
async updatePrivacySetting(settingId: string, updates: Partial<DataPrivacySetting>): Promise<DataPrivacySetting | null> {
const setting = this.privacySettings.get(settingId);
if (!setting) {
return null;
}
const updatedSetting = {
...setting,
...updates,
lastUpdated: new Date()
};
this.privacySettings.set(settingId, updatedSetting);
this.eventBus.publish('data.privacy.setting.updated', updatedSetting);
return updatedSetting;
}
// 启用/禁用数据隐私设置
async togglePrivacySetting(settingId: string, enabled: boolean): Promise<boolean> {
const setting = this.privacySettings.get(settingId);
if (!setting) {
return false;
}
setting.enabled = enabled;
setting.lastUpdated = new Date();
this.privacySettings.set(settingId, setting);
this.eventBus.publish('data.privacy.setting.toggled', setting);
return true;
}
// 应用数据隐私处理
async applyPrivacyTreatment(data: any, classificationId: string): Promise<any> {
// 这里应该有实际的隐私处理逻辑
// 暂时模拟处理
await new Promise(resolve => setTimeout(resolve, 500));
const classification = this.dataClassifications.get(classificationId);
if (!classification) {
return data;
}
// 根据数据分类应用不同的隐私处理
switch (classification.sensitivity) {
case 'restricted':
// 应用最严格的隐私处理
return this.maskSensitiveData(data);
case 'confidential':
// 应用中等隐私处理
return this.anonymizeData(data);
case 'internal':
// 应用轻度隐私处理
return this.removePII(data);
case 'public':
default:
// 无需处理
return data;
}
}
// 掩码敏感数据
private maskSensitiveData(data: any): any {
// 这里应该有实际的掩码逻辑
return { ...data, masked: true };
}
// 匿名化数据
private anonymizeData(data: any): any {
// 这里应该有实际的匿名化逻辑
return { ...data, anonymized: true };
}
// 移除个人身份信息
private removePII(data: any): any {
// 这里应该有实际的PII移除逻辑
return { ...data, piiRemoved: true };
}
// 生成数据治理报告
async generateGovernanceReport(): Promise<{
summary: {
totalClassifications: number;
totalPolicies: number;
totalAuditLogs: number;
totalPrivacySettings: number;
};
classifications: DataClassification[];
policies: DataAccessPolicy[];
recentAuditLogs: DataAuditLog[];
privacySettings: DataPrivacySetting[];
}> {
const classifications = Array.from(this.dataClassifications.values());
const policies = Array.from(this.accessPolicies.values());
const auditLogs = Array.from(this.auditLogs.values()).sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime()).slice(0, 100);
const privacySettings = Array.from(this.privacySettings.values());
return {
summary: {
totalClassifications: classifications.length,
totalPolicies: policies.length,
totalAuditLogs: this.auditLogs.size,
totalPrivacySettings: privacySettings.length
},
classifications,
policies,
recentAuditLogs: auditLogs,
privacySettings
};
}
// 获取数据治理统计信息
getGovernanceStats(): {
totalClassifications: number;
totalPolicies: number;
totalAuditLogs: number;
totalPrivacySettings: number;
enabledPrivacySettings: number;
auditLogsToday: number;
} {
const today = new Date();
today.setHours(0, 0, 0, 0);
const auditLogsToday = Array.from(this.auditLogs.values())
.filter(log => log.timestamp >= today)
.length;
const enabledPrivacySettings = Array.from(this.privacySettings.values())
.filter(setting => setting.enabled)
.length;
return {
totalClassifications: this.dataClassifications.size,
totalPolicies: this.accessPolicies.size,
totalAuditLogs: this.auditLogs.size,
totalPrivacySettings: this.privacySettings.size,
enabledPrivacySettings,
auditLogsToday
};
}
}

View File

@@ -0,0 +1,392 @@
import { DomainEventBus } from '../runtime/DomainEventBus';
// 数据源配置
export interface DataSourceConfig {
id: string;
name: string;
type: 'database' | 'api' | 'file' | 'stream';
connectionString: string;
credentials?: Record<string, string>;
options?: Record<string, any>;
status: 'active' | 'inactive' | 'error';
createdAt: Date;
lastUpdated: Date;
}
// 数据转换规则
export interface DataTransformationRule {
id: string;
name: string;
sourceField: string;
targetField: string;
transformation: string; // 转换表达式
conditions?: string; // 条件表达式
createdAt: Date;
lastUpdated: Date;
}
// 数据集成任务
export interface IntegrationTask {
id: string;
name: string;
sourceDataSourceId: string;
targetDataSourceId: string;
transformationRules: string[]; // 规则ID列表
schedule: string; // cron表达式
status: 'pending' | 'running' | 'success' | 'failed';
lastRun: Date | null;
nextRun: Date | null;
createdAt: Date;
lastUpdated: Date;
}
// 集成任务执行日志
export interface IntegrationLog {
id: string;
taskId: string;
status: 'success' | 'failed';
startTime: Date;
endTime: Date;
recordsProcessed: number;
recordsFailed: number;
errorMessage?: string;
metadata?: Record<string, any>;
}
// 数据集成平台
export class DataIntegrationPlatform {
private static instance: DataIntegrationPlatform;
private dataSources: Map<string, DataSourceConfig> = new Map();
private transformationRules: Map<string, DataTransformationRule> = new Map();
private integrationTasks: Map<string, IntegrationTask> = new Map();
private integrationLogs: Map<string, IntegrationLog> = new Map();
private eventBus: DomainEventBus;
private constructor() {
this.eventBus = DomainEventBus.getInstance();
}
static getInstance(): DataIntegrationPlatform {
if (!DataIntegrationPlatform.instance) {
DataIntegrationPlatform.instance = new DataIntegrationPlatform();
}
return DataIntegrationPlatform.instance;
}
// 注册数据源
async registerDataSource(dataSource: Omit<DataSourceConfig, 'id' | 'status' | 'createdAt' | 'lastUpdated'>): Promise<DataSourceConfig> {
const id = `ds_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const newDataSource: DataSourceConfig = {
...dataSource,
id,
status: 'inactive',
createdAt: new Date(),
lastUpdated: new Date()
};
this.dataSources.set(id, newDataSource);
this.eventBus.publish('datasource.registered', newDataSource);
return newDataSource;
}
// 获取数据源信息
getDataSource(dataSourceId: string): DataSourceConfig | undefined {
return this.dataSources.get(dataSourceId);
}
// 获取所有数据源
getAllDataSources(filters?: {
type?: string;
status?: string;
}): DataSourceConfig[] {
let result = Array.from(this.dataSources.values());
if (filters) {
if (filters.type) {
result = result.filter(ds => ds.type === filters.type);
}
if (filters.status) {
result = result.filter(ds => ds.status === filters.status);
}
}
return result;
}
// 更新数据源
async updateDataSource(dataSourceId: string, updates: Partial<DataSourceConfig>): Promise<DataSourceConfig | null> {
const dataSource = this.dataSources.get(dataSourceId);
if (!dataSource) {
return null;
}
const updatedDataSource = {
...dataSource,
...updates,
lastUpdated: new Date()
};
this.dataSources.set(dataSourceId, updatedDataSource);
this.eventBus.publish('datasource.updated', updatedDataSource);
return updatedDataSource;
}
// 测试数据源连接
async testDataSourceConnection(dataSourceId: string): Promise<{ success: boolean; message: string }> {
const dataSource = this.dataSources.get(dataSourceId);
if (!dataSource) {
return { success: false, message: 'DataSource not found' };
}
try {
// 这里应该有实际的连接测试逻辑
// 暂时模拟测试
await new Promise(resolve => setTimeout(resolve, 1000));
const success = Math.random() > 0.1; // 90% 成功率
if (success) {
dataSource.status = 'active';
this.dataSources.set(dataSourceId, dataSource);
return { success: true, message: 'Connection successful' };
} else {
dataSource.status = 'error';
this.dataSources.set(dataSourceId, dataSource);
return { success: false, message: 'Connection failed' };
}
} catch (error) {
dataSource.status = 'error';
this.dataSources.set(dataSourceId, dataSource);
return { success: false, message: error instanceof Error ? error.message : 'Unknown error' };
}
}
// 创建数据转换规则
async createTransformationRule(rule: Omit<DataTransformationRule, 'id' | 'createdAt' | 'lastUpdated'>): Promise<DataTransformationRule> {
const id = `rule_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const newRule: DataTransformationRule = {
...rule,
id,
createdAt: new Date(),
lastUpdated: new Date()
};
this.transformationRules.set(id, newRule);
this.eventBus.publish('transformation.rule.created', newRule);
return newRule;
}
// 获取转换规则
getTransformationRule(ruleId: string): DataTransformationRule | undefined {
return this.transformationRules.get(ruleId);
}
// 获取所有转换规则
getAllTransformationRules(): DataTransformationRule[] {
return Array.from(this.transformationRules.values());
}
// 更新转换规则
async updateTransformationRule(ruleId: string, updates: Partial<DataTransformationRule>): Promise<DataTransformationRule | null> {
const rule = this.transformationRules.get(ruleId);
if (!rule) {
return null;
}
const updatedRule = {
...rule,
...updates,
lastUpdated: new Date()
};
this.transformationRules.set(ruleId, updatedRule);
this.eventBus.publish('transformation.rule.updated', updatedRule);
return updatedRule;
}
// 创建集成任务
async createIntegrationTask(task: Omit<IntegrationTask, 'id' | 'status' | 'lastRun' | 'nextRun' | 'createdAt' | 'lastUpdated'>): Promise<IntegrationTask> {
const id = `task_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const newTask: IntegrationTask = {
...task,
id,
status: 'pending',
lastRun: null,
nextRun: this.calculateNextRun(task.schedule),
createdAt: new Date(),
lastUpdated: new Date()
};
this.integrationTasks.set(id, newTask);
this.eventBus.publish('integration.task.created', newTask);
return newTask;
}
// 获取集成任务
getIntegrationTask(taskId: string): IntegrationTask | undefined {
return this.integrationTasks.get(taskId);
}
// 获取所有集成任务
getAllIntegrationTasks(filters?: {
status?: string;
sourceDataSourceId?: string;
targetDataSourceId?: string;
}): IntegrationTask[] {
let result = Array.from(this.integrationTasks.values());
if (filters) {
if (filters.status) {
result = result.filter(t => t.status === filters.status);
}
if (filters.sourceDataSourceId) {
result = result.filter(t => t.sourceDataSourceId === filters.sourceDataSourceId);
}
if (filters.targetDataSourceId) {
result = result.filter(t => t.targetDataSourceId === filters.targetDataSourceId);
}
}
return result;
}
// 更新集成任务
async updateIntegrationTask(taskId: string, updates: Partial<IntegrationTask>): Promise<IntegrationTask | null> {
const task = this.integrationTasks.get(taskId);
if (!task) {
return null;
}
const updatedTask = {
...task,
...updates,
lastUpdated: new Date()
};
if (updates.schedule) {
updatedTask.nextRun = this.calculateNextRun(updates.schedule);
}
this.integrationTasks.set(taskId, updatedTask);
this.eventBus.publish('integration.task.updated', updatedTask);
return updatedTask;
}
// 执行集成任务
async executeIntegrationTask(taskId: string): Promise<IntegrationLog> {
const task = this.integrationTasks.get(taskId);
if (!task) {
throw new Error('Task not found');
}
task.status = 'running';
this.integrationTasks.set(taskId, task);
this.eventBus.publish('integration.task.started', task);
const startTime = new Date();
let recordsProcessed = 0;
let recordsFailed = 0;
let errorMessage: string | undefined;
try {
// 这里应该有实际的任务执行逻辑
// 暂时模拟执行
await new Promise(resolve => setTimeout(resolve, 3000));
recordsProcessed = Math.floor(Math.random() * 1000) + 100;
recordsFailed = Math.floor(Math.random() * 10);
if (recordsFailed > 5) {
throw new Error('Integration failed');
}
task.status = 'success';
task.lastRun = new Date();
task.nextRun = this.calculateNextRun(task.schedule);
} catch (error) {
task.status = 'failed';
errorMessage = error instanceof Error ? error.message : 'Unknown error';
}
this.integrationTasks.set(taskId, task);
const endTime = new Date();
const log: IntegrationLog = {
id: `log_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
taskId,
status: task.status === 'success' ? 'success' : 'failed',
startTime,
endTime,
recordsProcessed,
recordsFailed,
errorMessage
};
this.integrationLogs.set(log.id, log);
this.eventBus.publish('integration.task.completed', { task, log });
return log;
}
// 获取任务执行日志
getIntegrationLog(logId: string): IntegrationLog | undefined {
return this.integrationLogs.get(logId);
}
// 获取任务的执行日志
getTaskLogs(taskId: string): IntegrationLog[] {
return Array.from(this.integrationLogs.values()).filter(log => log.taskId === taskId);
}
// 计算下次执行时间
private calculateNextRun(cronExpression: string): Date {
// 这里应该有实际的cron表达式解析逻辑
// 暂时返回当前时间后1小时
const nextRun = new Date();
nextRun.setHours(nextRun.getHours() + 1);
return nextRun;
}
// 启动所有定时任务
async startScheduledTasks(): Promise<void> {
// 这里应该有实际的定时任务启动逻辑
// 暂时模拟启动
this.eventBus.publish('integration.tasks.started', { timestamp: new Date() });
}
// 停止所有定时任务
async stopScheduledTasks(): Promise<void> {
// 这里应该有实际的定时任务停止逻辑
// 暂时模拟停止
this.eventBus.publish('integration.tasks.stopped', { timestamp: new Date() });
}
// 获取平台状态
getPlatformStatus(): {
totalDataSources: number;
activeDataSources: number;
totalTasks: number;
runningTasks: number;
successTasks: number;
failedTasks: number;
totalLogs: number;
} {
const dataSources = Array.from(this.dataSources.values());
const tasks = Array.from(this.integrationTasks.values());
const logs = Array.from(this.integrationLogs.values());
return {
totalDataSources: dataSources.length,
activeDataSources: dataSources.filter(ds => ds.status === 'active').length,
totalTasks: tasks.length,
runningTasks: tasks.filter(t => t.status === 'running').length,
successTasks: tasks.filter(t => t.status === 'success').length,
failedTasks: tasks.filter(t => t.status === 'failed').length,
totalLogs: logs.length
};
}
}

View File

@@ -0,0 +1,399 @@
import { DomainEventBus } from '../runtime/DomainEventBus';
// 数据质量规则
export interface DataQualityRule {
id: string;
name: string;
description: string;
type: 'validation' | 'completeness' | 'accuracy' | 'consistency' | 'timeliness';
expression: string; // 规则表达式
severity: 'low' | 'medium' | 'high';
threshold: number;
enabled: boolean;
createdAt: Date;
lastUpdated: Date;
}
// 数据质量检查结果
export interface QualityCheckResult {
id: string;
ruleId: string;
dataSourceId: string;
entity: string; // 表名或实体名
recordsChecked: number;
recordsFailed: number;
failureRate: number;
status: 'pass' | 'warning' | 'fail';
timestamp: Date;
details?: Record<string, any>;
}
// 数据质量指标
export interface DataQualityMetrics {
dataSourceId: string;
entity: string;
completeness: number; // 完整性百分比
accuracy: number; // 准确性百分比
consistency: number; // 一致性百分比
timeliness: number; // 及时性百分比
overallScore: number; // 总体得分
lastUpdated: Date;
}
// 数据质量服务
export class DataQualityService {
private static instance: DataQualityService;
private qualityRules: Map<string, DataQualityRule> = new Map();
private checkResults: Map<string, QualityCheckResult> = new Map();
private qualityMetrics: Map<string, DataQualityMetrics> = new Map();
private eventBus: DomainEventBus;
private constructor() {
this.eventBus = DomainEventBus.getInstance();
}
static getInstance(): DataQualityService {
if (!DataQualityService.instance) {
DataQualityService.instance = new DataQualityService();
}
return DataQualityService.instance;
}
// 创建数据质量规则
async createQualityRule(rule: Omit<DataQualityRule, 'id' | 'createdAt' | 'lastUpdated'>): Promise<DataQualityRule> {
const id = `rule_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const newRule: DataQualityRule = {
...rule,
id,
createdAt: new Date(),
lastUpdated: new Date()
};
this.qualityRules.set(id, newRule);
this.eventBus.publish('quality.rule.created', newRule);
return newRule;
}
// 获取质量规则
getQualityRule(ruleId: string): DataQualityRule | undefined {
return this.qualityRules.get(ruleId);
}
// 获取所有质量规则
getAllQualityRules(filters?: {
type?: string;
severity?: string;
enabled?: boolean;
}): DataQualityRule[] {
let result = Array.from(this.qualityRules.values());
if (filters) {
if (filters.type) {
result = result.filter(r => r.type === filters.type);
}
if (filters.severity) {
result = result.filter(r => r.severity === filters.severity);
}
if (filters.enabled !== undefined) {
result = result.filter(r => r.enabled === filters.enabled);
}
}
return result;
}
// 更新质量规则
async updateQualityRule(ruleId: string, updates: Partial<DataQualityRule>): Promise<DataQualityRule | null> {
const rule = this.qualityRules.get(ruleId);
if (!rule) {
return null;
}
const updatedRule = {
...rule,
...updates,
lastUpdated: new Date()
};
this.qualityRules.set(ruleId, updatedRule);
this.eventBus.publish('quality.rule.updated', updatedRule);
return updatedRule;
}
// 启用/禁用质量规则
async toggleQualityRule(ruleId: string, enabled: boolean): Promise<boolean> {
const rule = this.qualityRules.get(ruleId);
if (!rule) {
return false;
}
rule.enabled = enabled;
rule.lastUpdated = new Date();
this.qualityRules.set(ruleId, rule);
this.eventBus.publish('quality.rule.toggled', rule);
return true;
}
// 执行数据质量检查
async runQualityCheck(dataSourceId: string, entity: string): Promise<QualityCheckResult[]> {
const enabledRules = this.getAllQualityRules({ enabled: true });
const results: QualityCheckResult[] = [];
for (const rule of enabledRules) {
const result = await this.checkRule(dataSourceId, entity, rule);
results.push(result);
}
// 更新数据质量指标
await this.updateQualityMetrics(dataSourceId, entity, results);
this.eventBus.publish('quality.check.completed', { dataSourceId, entity, results, timestamp: new Date() });
return results;
}
// 检查单个规则
private async checkRule(dataSourceId: string, entity: string, rule: DataQualityRule): Promise<QualityCheckResult> {
// 这里应该有实际的规则检查逻辑
// 暂时模拟检查
await new Promise(resolve => setTimeout(resolve, 500));
const recordsChecked = Math.floor(Math.random() * 1000) + 100;
const recordsFailed = Math.floor(Math.random() * 50);
const failureRate = recordsFailed / recordsChecked;
let status: 'pass' | 'warning' | 'fail';
if (failureRate === 0) {
status = 'pass';
} else if (failureRate < rule.threshold) {
status = 'warning';
} else {
status = 'fail';
}
const result: QualityCheckResult = {
id: `result_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
ruleId: rule.id,
dataSourceId,
entity,
recordsChecked,
recordsFailed,
failureRate,
status,
timestamp: new Date()
};
this.checkResults.set(result.id, result);
return result;
}
// 更新数据质量指标
private async updateQualityMetrics(dataSourceId: string, entity: string, results: QualityCheckResult[]): Promise<void> {
const key = `${dataSourceId}:${entity}`;
// 计算各项指标
let completeness = 100;
let accuracy = 100;
let consistency = 100;
let timeliness = 100;
for (const result of results) {
const rule = this.qualityRules.get(result.ruleId);
if (rule) {
const score = (1 - result.failureRate) * 100;
switch (rule.type) {
case 'completeness':
completeness = score;
break;
case 'accuracy':
accuracy = score;
break;
case 'consistency':
consistency = score;
break;
case 'timeliness':
timeliness = score;
break;
}
}
}
const overallScore = (completeness + accuracy + consistency + timeliness) / 4;
const metrics: DataQualityMetrics = {
dataSourceId,
entity,
completeness,
accuracy,
consistency,
timeliness,
overallScore,
lastUpdated: new Date()
};
this.qualityMetrics.set(key, metrics);
this.eventBus.publish('quality.metrics.updated', metrics);
}
// 获取数据质量指标
getQualityMetrics(dataSourceId: string, entity: string): DataQualityMetrics | undefined {
const key = `${dataSourceId}:${entity}`;
return this.qualityMetrics.get(key);
}
// 获取所有数据质量指标
getAllQualityMetrics(filters?: {
dataSourceId?: string;
minScore?: number;
}): DataQualityMetrics[] {
let result = Array.from(this.qualityMetrics.values());
if (filters) {
if (filters.dataSourceId) {
result = result.filter(m => m.dataSourceId === filters.dataSourceId);
}
if (filters.minScore !== undefined) {
result = result.filter(m => m.overallScore >= filters.minScore!);
}
}
return result;
}
// 获取质量检查结果
getQualityCheckResult(resultId: string): QualityCheckResult | undefined {
return this.checkResults.get(resultId);
}
// 获取数据源的质量检查结果
getDataSourceCheckResults(dataSourceId: string): QualityCheckResult[] {
return Array.from(this.checkResults.values()).filter(r => r.dataSourceId === dataSourceId);
}
// 获取规则的质量检查结果
getRuleCheckResults(ruleId: string): QualityCheckResult[] {
return Array.from(this.checkResults.values()).filter(r => r.ruleId === ruleId);
}
// 生成数据质量报告
async generateQualityReport(dataSourceId?: string): Promise<{
summary: {
totalChecks: number;
passedChecks: number;
warningChecks: number;
failedChecks: number;
averageScore: number;
};
details: QualityCheckResult[];
metrics: DataQualityMetrics[];
}> {
let results = Array.from(this.checkResults.values());
let metrics = Array.from(this.qualityMetrics.values());
if (dataSourceId) {
results = results.filter(r => r.dataSourceId === dataSourceId);
metrics = metrics.filter(m => m.dataSourceId === dataSourceId);
}
const passedChecks = results.filter(r => r.status === 'pass').length;
const warningChecks = results.filter(r => r.status === 'warning').length;
const failedChecks = results.filter(r => r.status === 'fail').length;
const averageScore = metrics.length > 0
? metrics.reduce((sum, m) => sum + m.overallScore, 0) / metrics.length
: 0;
return {
summary: {
totalChecks: results.length,
passedChecks,
warningChecks,
failedChecks,
averageScore
},
details: results,
metrics
};
}
// 自动修复数据质量问题
async autoFixQualityIssues(dataSourceId: string, entity: string, ruleId: string): Promise<{
success: boolean;
fixedRecords: number;
message: string;
}> {
// 这里应该有实际的自动修复逻辑
// 暂时模拟修复
await new Promise(resolve => setTimeout(resolve, 2000));
const fixedRecords = Math.floor(Math.random() * 50);
const success = Math.random() > 0.2; // 80% 成功率
return {
success,
fixedRecords,
message: success
? `Fixed ${fixedRecords} records`
: 'Auto-fix failed'
};
}
// 获取数据质量趋势
getQualityTrend(dataSourceId: string, entity: string, days: number = 30): {
dates: string[];
scores: number[];
} {
// 这里应该有实际的趋势分析逻辑
// 暂时返回模拟数据
const dates: string[] = [];
const scores: number[] = [];
for (let i = days - 1; i >= 0; i--) {
const date = new Date();
date.setDate(date.getDate() - i);
dates.push(date.toISOString().split('T')[0]);
scores.push(Math.random() * 20 + 80); // 80-100 之间的随机数
}
return {
dates,
scores
};
}
// 获取数据质量统计信息
getQualityStats(): {
totalRules: number;
enabledRules: number;
totalChecks: number;
passedChecks: number;
warningChecks: number;
failedChecks: number;
averageScore: number;
} {
const rules = Array.from(this.qualityRules.values());
const results = Array.from(this.checkResults.values());
const metrics = Array.from(this.qualityMetrics.values());
const passedChecks = results.filter(r => r.status === 'pass').length;
const warningChecks = results.filter(r => r.status === 'warning').length;
const failedChecks = results.filter(r => r.status === 'fail').length;
const averageScore = metrics.length > 0
? metrics.reduce((sum, m) => sum + m.overallScore, 0) / metrics.length
: 0;
return {
totalRules: rules.length,
enabledRules: rules.filter(r => r.enabled).length,
totalChecks: results.length,
passedChecks,
warningChecks,
failedChecks,
averageScore
};
}
}