Back to list
FlowForge を構築:.NET 上で DAG ベースのワークフローエンジンを実装する
Building FlowForge: Architecting a DAG-Based Workflow Engine in .NET
Translated: 2026/3/14 10:13:59
Japanese Translation
FlowForge を新たに構築しました。これは、ユーザーが画面上のノードをドラッグ&ドロップするだけで、複数の外部アプリケーションを接続し、複雑なフローをオーケストラートできる、視覚的なワークフロー自動化プラットフォームです。一見すると、Zapier などのクローンを作るのは簡単そうに見えます:React ファイアンエンドでボックスを描画し、バックエンドで実行するだけです。しかし、アーキテクチャの詳細に深入りするにつれて、状態管理、並行処理、および障害耐性に対処するプロセスは、大規模な分散システムの問題へと変化しました。無駄なことは一切言わず、私がゼロからワークフロー自動化プラットフォームを構築した実際のアーキテクチャをご覧ください。
システム要件
機能的要件:
- ユーザーは OAuth を介して Google、Slack などの複数の第三者アプリケーションで認証できます。
- ユーザーはドラッグ&ドロップキャンバスを使用してワークフローを構築および設定できます。
非機能的要件:
- 拡張性:新しい統合を追加するには、コアエンジンを変更することなく、単に新しいファイルを追加する程度です。
- 障害耐性(保存):サーバーが実行中の半々に突然クラッシュした場合でも、ワークフローは最初のまま正確に再開する必要があります。
- 並行ブランチ:エンジンでは、複雑な依存関係(例:ノード B とノード C は同時に実行でき、どちらもノード A の完了を待つかかる)をサポートする必要があります。
コアエンティティ
- フロー:ワークフローのブループリント。
- コネクション:外部 OAuth 認証情報と統合メタデータ。
- フローインスタンス:フローの一意の執行実行。
- ノードインスタンス:そのフロー内の個別のステップの実行状態。
技術スタックとアーキテクチャの高レベル
- フロントエンド:React.js
- バックエンド:ASP.NET Core Web API
- データベース:MySQL
- コアサービス:
- クライアント:ユーザーインターフェースのための視覚的ノードベースのエディタ。
- API ゲートウェイ:バックエンドサービスへのルーティングと負荷分散を処理。
- フローサービス:ワークフローの CRUD オペレーションを管理。
- コネクションサービス:外部アプリケーションとの接続とトークン管理を安全に処理。
- フローエンジン:解析およびワークフローを実行するシステムの中核「脳」。
深掘り 1:コアエンジンの進化
バージョン 1:線形実行(素朴なアプローチ)
バージョン 2:トポロジカルソート(カーン's アルゴリズム)
しかし、私は巨大な制限に直面しました:状態の喪失。カーン's アルゴリズムは完全に ASP.NET サーバーのメモリアルで実行されたため、ASP.NET サーバーがリスタートした場合、実行全体が消失して、存在したことがなかったようでした。
バージョン 3:保存可能な DAG スケジューラー(最終形)
以下の実際の C# 実装をPersistent Engine を提示します:
public class FlowEngine {
private readonly IServiceProvider _serviceProvider;
private readonly ILogger _logger;
private readonly IEnumerable _availableNodes;
private readonly DagConvertor _dagConvertor;
private readonly NodeExecutionRepository _nodeExecutionRepository;
private readonly IFlowInstanceRepository _flowInstanceRepository;
public FlowEngine(
IServiceProvider serviceProvider,
ILogger logger,
IEnumerable availableNodes,
DagConvertor dagConvertor,
NodeExecutionRepository nodeExecutionRepository,
IFlowInstanceRepository flowInstanceRepository) {
_serviceProvider = serviceProvider;
_logger = logger;
_availableNodes = availableNodes;
_dagConvertor = dagConvertor;
_nodeExecutionRepository = nodeExecutionRepository;
_flowInstanceRepository = flowInstanceRepository;
}
// サービスが DAG 構築ロジックを再利用できるようにします
public Dag BuildDag(ParsedFlow parsedFlow) {
return _dagConvertor.ParsedFlowToDag(parsedFlow);
}
// ============================================================
// 🚀 保存可能な DAG スケジューラー
// ============================================================
public async Task RunPersistentAsync(
Guid flowInstanceId,
ParsedFlow parsedFlow,
string clerkUserId,
Dictionary? initialPayload = null) {
var payload = initialPayload ?? new Dictionary();
var dag = _dagConvertor.ParsedFlowToDag(parsedFlow);
while (true) {
// 早期失敗(fail fast)
if (await _nodeExecutionRepository.AnyFailedAsync(flowInstanceId)) {
await _flowInstanceRepository.MarkFailedAsync(flowInstanceId);
return;
}
// 完了チェック
// ...
}
Original Content
I recently built FlowForge, a visual workflow automation platform where users can connect multiple external applications and orchestrate complex flows simply by dragging and dropping nodes onto a canvas. At first glance, building a Zapier-like clone seemed straightforward: build a React frontend to draw the boxes, and a backend to execute them. But as I got deeper into the architecture, handling state, concurrency, and fault tolerance turned this into a massive distributed systems challenge. Without any filler, let's dive into the architecture of how I actually built a workflow automation platform from scratch. System Requirements Functional Requirements: Users can authenticate with multiple third-party applications (Google, Slack, etc.) via OAuth. Users can build and configure workflows using a drag-and-drop canvas. Non-Functional Requirements: Pluggability: Adding a new integration must be as simple as adding a new file, without modifying the core engine. Fault Tolerance (Persistence): Even if the server crashes unexpectedly mid-execution, the workflow must be able to resume exactly where it left off. Concurrent Branching: The engine must support complex dependencies (e.g., Node B and Node C can run simultaneously, but both must wait for Node A to finish). Core Entities Flow: The blueprint of the workflow. Connection: External OAuth credentials and integration metadata. FlowInstance: A single, unique execution run of a Flow. NodeInstance: The execution state of an individual step within that flow. Tech Stack & High-Level Architecture Frontend: React.js Backend: ASP.NET Core Web API Database: MySQL Core Services: Client: The visual node-based editor for the user interface. API Gateway: Handles routing and load distribution to the backend services. Flow Service: Manages CRUD operations for the workflows. Connection Service: Securely handles external application connections and token management. Flow Engine: The core "brain" of the system that parses and executes the workflows. Deep Dive 1: The Evolution of the Core Engine V1: Linear Execution (The Naive Approach) V2: Topological Sorting (Kahn's Algorithm) However, I hit a massive limitation: State Loss. Because Kahn's algorithm was running entirely in server memory, if the ASP.NET server restarted, the entire execution vanished like it never existed. V3: The Persistent DAG Scheduler (The Final Form) Here is the actual C# implementation of the persistent engine: public class FlowEngine { private readonly IServiceProvider _serviceProvider; private readonly ILogger _logger; private readonly IEnumerable _availableNodes; private readonly DagConvertor _dagConvertor; private readonly NodeExecutionRepository _nodeExecutionRepository; private readonly IFlowInstanceRepository _flowInstanceRepository; public FlowEngine( IServiceProvider serviceProvider, ILogger logger, IEnumerable availableNodes, DagConvertor dagConvertor, NodeExecutionRepository nodeExecutionRepository, IFlowInstanceRepository flowInstanceRepository) { _serviceProvider = serviceProvider; _logger = logger; _availableNodes = availableNodes; _dagConvertor = dagConvertor; _nodeExecutionRepository = nodeExecutionRepository; _flowInstanceRepository = flowInstanceRepository; } // Allows service to reuse DAG building logic public Dag BuildDag(ParsedFlow parsedFlow) { return _dagConvertor.ParsedFlowToDag(parsedFlow); } // ============================================================ // 🚀 Persistent DAG Scheduler // ============================================================ public async Task RunPersistentAsync( Guid flowInstanceId, ParsedFlow parsedFlow, string clerkUserId, Dictionary? initialPayload = null) { var payload = initialPayload ?? new Dictionary(); var dag = _dagConvertor.ParsedFlowToDag(parsedFlow); while (true) { // Fail fast if (await _nodeExecutionRepository.AnyFailedAsync(flowInstanceId)) { await _flowInstanceRepository.MarkFailedAsync(flowInstanceId); return; } // Complete if done if (await _nodeExecutionRepository.AllCompletedAsync(flowInstanceId)) { await _flowInstanceRepository.MarkCompletedAsync(flowInstanceId); return; } var nodeExecution = await _nodeExecutionRepository.GetNextReadyNodeAsync(flowInstanceId); if (nodeExecution == null) { await Task.Delay(50); continue; } await _nodeExecutionRepository.MarkRunningAsync(nodeExecution.Id); var parsedNode = parsedFlow.Nodes.First(n => n.Id == nodeExecution.NodeId); try { var output = await ExecuteNodeAsync( parsedNode, parsedFlow.Name, clerkUserId, payload); await _nodeExecutionRepository .MarkCompletedAsync(nodeExecution.Id); if (output != null) { foreach (var kvp in output) payload[kvp.Key] = kvp.Value; } await UnlockChildrenAsync( flowInstanceId, dag, parsedNode.Id); } catch (Exception ex) { await _nodeExecutionRepository .MarkFailedAsync(nodeExecution.Id, ex.Message); } } } private async Task?> ExecuteNodeAsync( ParsedNode node, string flowName, string clerkUserId, Dictionary payload) { var executor = _availableNodes.FirstOrDefault(n => n.Type == node.Type); if (executor == null) throw new InvalidOperationException( $"Executor for node type '{node.Type}' not found."); var context = new FlowExecutionContext( clerkUserId, payload, ConvertToJsonElement(node.Data)); return await executor.ExecuteAsync(context, _serviceProvider); } private async Task UnlockChildrenAsync( Guid flowInstanceId, Dag dag, string completedNodeId) { if (!dag.AdjList.TryGetValue(completedNodeId, out var children)) return; foreach (var child in children) { if (!dag.ReverseAdjList.TryGetValue(child.Id, out var parents)) continue; var allParentsCompleted = await _nodeExecutionRepository .AreAllParentsCompletedAsync(flowInstanceId, parents); if (allParentsCompleted) { await _nodeExecutionRepository .MarkReadyAsync(flowInstanceId, child.Id); } } } private JsonElement ConvertToJsonElement(object? obj) { if (obj == null) return default; var json = JsonSerializer.Serialize(obj); return JsonSerializer.Deserialize(json); } } Deep Dive 2: Building a Pluggable Architecture Instead of hardcoding a massive switch statement or storing the node executors in a static list, I made every node type (Action, Trigger, Conditional) inherit from an INode interface. At startup, the application scans the assembly to find all implementations and registers them dynamically in the Dependency Injection container. public static class ServiceCollectionExtenctions { public static IServiceCollection AddNodeServices(this IServiceCollection services) { var NodeType = typeof(INode); var implementations = AppDomain.CurrentDomain.GetAssemblies() .SelectMany(s => s.GetTypes()) .Where(t => NodeType.IsAssignableFrom(t) && t is { IsClass: true, IsAbstract: false }); foreach (var implementation in implementations) { services.AddScoped(typeof(INode), implementation); } return services; } } Now, adding a new Slack integration is as simple as creating a new class that implements INode. The engine handles the rest automatically. Deep Dive 3: Ensuring Concurrent Execution If a parent node completes its execution and unlocks two distinct child nodes, both of those children will independently transition to the Ready state. Because the engine processes ready nodes asynchronously, both child branches will execute concurrently without blocking each other. Trade-Offs & Scaling Bottlenecks ThreadPool Exhaustion: Right now, I am executing nodes asynchronously in memory. If the platform scales to 100,000 concurrent users running workflows, we will hit the ASP.NET ThreadPool limits. To solve this, I would need to decouple the execution by introducing a Message Queue (like RabbitMQ) and dedicated background worker services. Database Write Limits: Flow execution is incredibly write-heavy (updating status to Running, Completed, Failed every few milliseconds). MySQL is great, but at massive scale, it will hit write-throughput bottlenecks. Shifting the execution state storage to a Key-Value database (like DynamoDB or Redis) would be necessary for global scalability. Conclusion You can explore the complete architecture and source code here: FlowForge on GitHub. What are your thoughts on using database polling for the DAG scheduler versus an event-driven message queue? Let me know in the comments!