Skip to content

FlowEngineLib 架构设计文档

FlowEngineLib 的核心架构设计和技术实现详解

📋 目录

架构概览

总体架构

FlowEngineLib 采用分层架构设计,主要分为以下几层:

技术栈层次

┌─────────────────────────────────────┐
│     UI Layer (WinForms)             │
│  ST.Library.UI.NodeEditor           │
├─────────────────────────────────────┤
│     Business Logic Layer            │
│  FlowEngineControl, NodeManager     │
├─────────────────────────────────────┤
│     Domain Model Layer              │
│  Node Types, Data Models, Events    │
├─────────────────────────────────────┤
│     Infrastructure Layer            │
│  MQTT, Logging, Serialization       │
├─────────────────────────────────────┤
│     Framework Layer                 │
│  .NET 8.0 / .NET Framework 4.7.2    │
└─────────────────────────────────────┘

核心设计模式

1. 模板方法模式

节点基类定义了执行流程的模板,子类实现具体步骤。

csharp
// 抽象基类定义模板
public abstract class CVBaseServerNode : CVCommonNode
{
    // 模板方法
    public void Execute(CVStartCFC cfc)
    {
        OnBeforeExecute(cfc);
        DoServerWork(cfc);        // 抽象方法,子类实现
        OnAfterExecute(cfc);
        DoTransferData(m_op_data_out, cfc);
    }
    
    // 抽象方法
    protected abstract void DoServerWork(CVStartCFC cfc);
    
    // 钩子方法
    protected virtual void OnBeforeExecute(CVStartCFC cfc) { }
    protected virtual void OnAfterExecute(CVStartCFC cfc) { }
}

优势

  • 统一执行流程
  • 灵活扩展具体实现
  • 易于维护和测试

2. 策略模式

用于算法节点的算法选择。

csharp
public interface IAlgorithmStrategy
{
    AlgorithmResult Execute(ImageData image, Dictionary\\<string, object\> parameters);
}

public class EdgeDetectionStrategy : IAlgorithmStrategy
{
    public AlgorithmResult Execute(ImageData image, Dictionary\\<string, object\> parameters)
    {
        // 边缘检测实现
    }
}

public class AlgorithmNode : CVBaseServerNode
{
    private Dictionary\\<string, IAlgorithmStrategy\> _strategies;
    
    protected override void DoServerWork(CVStartCFC cfc)
    {
        var strategy = _strategies[AlgorithmType];
        var result = strategy.Execute(imageData, parameters);
    }
}

3. 观察者模式

用于事件通知机制。

csharp
// 事件发布者
public class BaseStartNode : CVCommonNode
{
    public event FlowStartEventHandler Finished;
    
    protected void RaiseFinished(FlowStartEventArgs args)
    {
        Finished?.Invoke(this, args);
    }
}

// 事件订阅者
flowEngine.Finished += (sender, args) => {
    Console.WriteLine($"Flow {args.FlowName} completed");
};

4. 单例模式

用于全局管理器。

csharp
public class FlowNodeManager
{
    private static FlowNodeManager _instance;
    private static readonly object _lock = new object();
    
    public static FlowNodeManager Instance
    {
        get
        {
            if (_instance == null)
            {
                lock (_lock)
                {
                    if (_instance == null)
                    {
                        _instance = new FlowNodeManager();
                    }
                }
            }
            return _instance;
        }
    }
}

5. 工厂模式

用于节点创建。

csharp
public class NodeFactory
{
    private Dictionary\\<string, Type\> _nodeTypes;
    
    public STNode CreateNode(string nodeTypeName)
    {
        if (_nodeTypes.TryGetValue(nodeTypeName, out Type nodeType))
        {
            return (STNode)Activator.CreateInstance(nodeType);
        }
        throw new NotSupportedException($"Node type {nodeTypeName} not found");
    }
    
    public void RegisterNodeType\<T\>() where T : STNode
    {
        var attr = typeof(T).GetCustomAttribute\<STNodeAttribute\>();
        if (attr != null)
        {
            _nodeTypes[attr.Path] = typeof(T);
        }
    }
}

6. 责任链模式

用于节点间的数据传递。

模块划分

1. 核心模块

FlowEngineControl

  • 职责: 流程引擎主控制器
  • 功能:
    • 管理流程生命周期
    • 节点编辑器集成
    • 流程执行控制

FlowNodeManager

  • 职责: 节点管理
  • 功能:
    • 节点注册和发现
    • 设备节点管理
    • 节点状态同步

FlowServiceManager

  • 职责: 服务管理
  • 功能:
    • MQTT服务管理
    • 服务状态维护
    • 服务查找

2. 节点模块

节点类层次

STNode (ST.Library.UI)
└── CVCommonNode
    ├── BaseStartNode
    │   ├── MQTTStartNode
    │   └── ModbusStartNode
    ├── CVBaseServerNode
    │   ├── CVCameraNode
    │   ├── AlgorithmNode
    │   ├── SMUNode
    │   ├── MotorNode
    │   └── CVBaseLoopServerNode
    │       └── PGLoopNode
    ├── CVEndNode
    └── LoopNode

节点职责分离

节点类型职责特点
StartNode流程入口管理流程上下文,触发流程
ServerNode业务执行执行具体业务逻辑
LoopNode循环控制管理循环状态和迭代
EndNode流程结束收集结果,清理资源

3. 通信模块

MQTT通信架构

主题设计

服务类型/{设备代码}/消息类型
├── Camera/CAM001/cmd          # 相机命令
├── Camera/CAM001/resp         # 相机响应
├── Camera/CAM001/data         # 相机数据
├── SMU/SMU001/cmd            # SMU命令
├── SMU/SMU001/resp           # SMU响应
└── Algorithm/ALG001/result   # 算法结果

4. 数据模块

数据流对象

csharp
// 流程控制对象
public class CVStartCFC
{
    public string NodeName { get; set; }
    public string SerialNumber { get; set; }
    public string FlowName { get; set; }
    public object Params { get; set; }
    public string EventName { get; set; }
}

// 传输动作对象
public class CVTransAction
{
    public ActionTypeEnum ActionType { get; set; }
    public object Data { get; set; }
    public StatusTypeEnum Status { get; set; }
}

// 循环控制对象
public class CVLoopCFC
{
    public string NodeName { get; set; }
    public string SerialNumber { get; set; }
    public int LoopIndex { get; set; }
    public object LoopData { get; set; }
}

数据模型设计原则

  1. 不可变性: 关键数据对象采用不可变设计
  2. 序列化友好: 支持JSON序列化
  3. 类型安全: 强类型定义
  4. 扩展性: 使用Dictionary\<string, object>支持动态属性

数据流设计

1. 流程执行数据流

2. 循环执行数据流

3. MQTT数据流

通信架构

1. MQTT通信层次

应用层 (Node)

抽象层 (MQTTHelper)

协议层 (MQTTnet)

传输层 (TCP/TLS)

2. 消息格式

命令消息

json
{
  "Version": "1.0",
  "MsgID": "12345",
  "Command": "Capture",
  "Params": {
    "ExposureTime": 1000,
    "Gain": 10
  },
  "Timestamp": "2024-01-01T12:00:00"
}

响应消息

json
{
  "Version": "1.0",
  "MsgID": "12345",
  "Code": 0,
  "Message": "Success",
  "Data": {
    "ImagePath": "/path/to/image.png",
    "Width": 1920,
    "Height": 1080
  },
  "Timestamp": "2024-01-01T12:00:01"
}

3. QoS策略

QoS级别使用场景特点
0状态上报不保证送达,性能最高
1一般命令至少送达一次
2关键命令确保送达且仅一次

执行引擎

1. 执行流程

csharp
public class FlowExecutor
{
    public async Task\<FlowResult\> ExecuteAsync(
        string flowName,
        CVStartCFC startCFC,
        CancellationToken ct)
    {
        try
        {
            // 1. 查找启动节点
            var startNode = FindStartNode(flowName);
            
            // 2. 构建执行上下文
            var context = BuildContext(startCFC);
            
            // 3. 执行流程
            await ExecuteFlowAsync(startNode, context, ct);
            
            // 4. 收集结果
            return CollectResults(context);
        }
        catch (Exception ex)
        {
            logger.Error($"Flow {flowName} execution failed", ex);
            return FlowResult.Failed(ex.Message);
        }
    }
    
    private async Task ExecuteFlowAsync(
        BaseStartNode startNode,
        FlowContext context,
        CancellationToken ct)
    {
        // 递归执行节点链
        var currentNode = startNode;
        while (currentNode != null && !ct.IsCancellationRequested)
        {
            await currentNode.ExecuteAsync(context);
            currentNode = GetNextNode(currentNode, context);
        }
    }
}

2. 并发控制

csharp
public class ConcurrentFlowManager
{
    private readonly SemaphoreSlim _semaphore;
    private readonly ConcurrentDictionary\\<string, FlowContext\> _runningFlows;
    
    public ConcurrentFlowManager(int maxConcurrent)
    {
        _semaphore = new SemaphoreSlim(maxConcurrent, maxConcurrent);
        _runningFlows = new ConcurrentDictionary\\<string, FlowContext\>();
    }
    
    public async Task\<FlowResult\> RunFlowAsync(
        string flowName,
        CancellationToken ct)
    {
        await _semaphore.WaitAsync(ct);
        try
        {
            var context = CreateContext(flowName);
            _runningFlows[flowName] = context;
            
            return await ExecuteFlowAsync(context, ct);
        }
        finally
        {
            _runningFlows.TryRemove(flowName, out _);
            _semaphore.Release();
        }
    }
}

3. 状态机设计

扩展机制

1. 节点注册机制

csharp
public class NodeRegistry
{
    private readonly Dictionary\\<string, NodeDescriptor\> _nodes;
    
    // 自动发现并注册节点
    public void ScanAndRegisterNodes(Assembly assembly)
    {
        var nodeTypes = assembly.GetTypes()
            .Where(t => t.IsSubclassOf(typeof(STNode)) && !t.IsAbstract);
            
        foreach (var type in nodeTypes)
        {
            var attr = type.GetCustomAttribute\<STNodeAttribute\>();
            if (attr != null)
            {
                RegisterNode(new NodeDescriptor
                {
                    Type = type,
                    Path = attr.Path,
                    Category = GetCategory(attr.Path)
                });
            }
        }
    }
    
    // 手动注册节点
    public void RegisterNode\<T\>() where T : STNode
    {
        // 注册逻辑
    }
}

2. 插件系统

csharp
public interface IFlowPlugin
{
    string Name { get; }
    string Version { get; }
    void Initialize(IFlowEngine engine);
    void RegisterNodes(NodeRegistry registry);
}

public class PluginLoader
{
    public void LoadPlugin(string pluginPath)
    {
        var assembly = Assembly.LoadFrom(pluginPath);
        var pluginTypes = assembly.GetTypes()
            .Where(t => typeof(IFlowPlugin).IsAssignableFrom(t));
            
        foreach (var type in pluginTypes)
        {
            var plugin = (IFlowPlugin)Activator.CreateInstance(type);
            plugin.Initialize(flowEngine);
            plugin.RegisterNodes(nodeRegistry);
        }
    }
}

3. 扩展点

扩展点接口说明
节点类型STNode自定义节点
算法策略IAlgorithmStrategy自定义算法
数据源IDataSource自定义数据源
通信协议IProtocol自定义协议
序列化器ISerializer自定义序列化

性能设计

1. 内存优化

对象池

csharp
public class ObjectPoolManager
{
    private static readonly ObjectPool\<CVStartCFC\> CFCPool = 
        ObjectPool.Create\<CVStartCFC\>();
        
    private static readonly ObjectPool<byte[]> BufferPool = 
        ObjectPool.Create(() => new byte[8192]);
    
    public static CVStartCFC RentCFC() => CFCPool.Get();
    public static void ReturnCFC(CVStartCFC cfc) => CFCPool.Return(cfc);
}

大对象处理

csharp
public class ImageProcessor
{
    public unsafe void ProcessImage(Span\<byte\> imageData)
    {
        // 使用Span避免数组复制
        fixed (byte* ptr = imageData)
        {
            // 处理图像
        }
    }
}

2. 并发优化

任务调度

csharp
public class TaskScheduler
{
    private readonly TaskScheduler _scheduler;
    private readonly int _maxDegreeOfParallelism;
    
    public async Task ExecuteParallelAsync(
        IEnumerable\<Func\<Task>\> tasks)
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = _maxDegreeOfParallelism,
            TaskScheduler = _scheduler
        };
        
        await Parallel.ForEachAsync(tasks, options, async (task, ct) =>
        {
            await task();
        });
    }
}

3. 缓存策略

csharp
public class CacheManager
{
    private readonly MemoryCache _cache;
    private readonly CacheOptions _options;
    
    public T GetOrCreate\<T\>(string key, Func\<T\> factory)
    {
        if (!_cache.TryGetValue(key, out T value))
        {
            value = factory();
            _cache.Set(key, value, _options.Expiration);
        }
        return value;
    }
}

4. 性能监控

csharp
public class PerformanceMonitor
{
    public void MeasureExecution(string operation, Action action)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            action();
        }
        finally
        {
            sw.Stop();
            RecordMetric(operation, sw.ElapsedMilliseconds);
        }
    }
    
    private void RecordMetric(string operation, long duration)
    {
        // 记录到监控系统
        Metrics.Histogram(operation, duration);
    }
}

总结

FlowEngineLib 的架构设计遵循以下原则:

  1. 分层设计 - 清晰的层次结构,职责分离
  2. 模式应用 - 合理使用设计模式,提高代码质量
  3. 扩展性 - 支持插件和自定义节点
  4. 性能优化 - 内存管理、并发控制、缓存策略
  5. 可维护性 - 模块化设计,易于理解和维护

文档版本: 1.0
最后更新: 2024年
维护团队: ColorVision 开发团队

Released under the MIT License.