首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >ActionBlock、TransformBlock 揭秘:微软流处理管道的核心组件与秘密武器

ActionBlock、TransformBlock 揭秘:微软流处理管道的核心组件与秘密武器

作者头像
郑子铭
发布2025-09-02 17:48:57
发布2025-09-02 17:48:57
2810
举报

你的 .NET 代码可能整洁、异步且完全可测试。

但它能每分钟处理 50,000 条消息... 而不会崩溃吗?

大多数开发者会使用:

代码语言:javascript
复制
foreach (var item in items) await ProcessAsync(item)
Task.WhenAll(...)

甚至原生的 Task.Run

这确实有效 —— 直到失效为止。

微软多年前就通过创建一个生产级的管道库解决了这个问题,它能处理: ✅ 并行处理 ✅ 节流 ✅ 反压 (Backpressure) ✅ 重试 ✅ 有界队列 (Bounded queues) ✅ 优雅关闭 (Graceful shutdown)

它叫做 System.Threading.Tasks.Dataflow

这篇博客将向你展示这个库为何存在,它解决了什么问题,以及如何像微软那样使用它——没有废话,只有经过生产验证的指导。

我们曾在一个每秒处理数千条消息的 Azure Functions IoT 解决方案中使用过 TPL Dataflow —— 对于高吞吐量、实时工作负载来说,它极其有效。

缩放图片将会显示

🤔 TPL Dataflow 为何存在? 来自微软官方文档:

“Dataflow 库使你能够在数据可用时立即处理它们,使用支持并发、有界容量和异步处理的管道。”

Task.RunParallel.ForEach,甚至 Channel<T> 这样的传统构造是:

  • • 低级的
  • • 不处理节流
  • • 没有原生的反压机制
  • • 与异步 I/O 不能很好地扩展

Dataflow 是微软为在 .NET 中构建持久、高吞吐量管道提供的解决方案 —— 尤其是在以下场景:

  • • 消息持续到达
  • • 每个项目需要异步转换
  • • 你需要故障隔离、重试和批处理

🛠️ 它解决的问题

🧱 什么是 TPL Dataflow? System.Threading.Tasks.Dataflow 是一个微软提供的 .NET 库,专为以下目的设计:

✅ 并行处理 ✅ 反压 (Backpressure) ✅ 节流 (Throttling) ✅ 有界容量 (Bounded capacity) ✅ 流处理 (Stream processing)

它将你的工作流建模为一个由块 (blocks) 组成的图 (graph) —— 每个块处理一个处理阶段,并内置了异步和重试语义。

📦 安装:

代码语言:javascript
复制
dotnet add package System.Threading.Tasks.Dataflow

🧱 核心构建块(附微软官方示例)

1️⃣ BufferBlock<T> – 异步队列 充当消息的暂存队列。

代码语言:javascript
复制
```csharp
var buffer = new BufferBlock<string>();
await buffer.SendAsync("msg1");
```
✅ 用于解耦生产者与消费者
✅ 支持反压

2️⃣ TransformBlock<TInput, TOutput> – 并行异步处理 并行应用异步逻辑。

代码语言:javascript
复制
```csharp
var transform = new TransformBlock<string, string>(
    async input => {
        await Task.Delay(100); // 模拟 I/O
        return input.ToUpper();
    },
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = 8,
        BoundedCapacity = 100
    });
```
✅ 非常适合 API 调用、解析、数据丰富 (enrichment)

3️⃣ ActionBlock<T> – 终端接收器 (Terminal Sink) 消费消息但不发出结果。

代码语言:javascript
复制
```csharp
var action = new ActionBlock<string>(
    async msg => {
        await SaveToDatabaseAsync(msg);
    },
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = 4,
        BoundedCapacity = 50
    });
```
✅ 用于持久化、网络调用、日志记录

4️⃣ BatchBlock<T> – 批处理以提高吞吐量 将输入分组为批次。

代码语言:javascript
复制
```csharp
var batch = new BatchBlock<MyDto>(batchSize: 10);
```
✅ 在进行批量插入或 API 调用之前非常有用

5️⃣ BroadcastBlock<T> – 发布到多个目标 csharp var broadcast = new BroadcastBlock<string>(msg => msg); ✅ 用于扇出 (fan-out) 模式 —— 例如,同时记录和处理

6️⃣ JoinBlock<T1, T2> – 合并流 csharp var join = new JoinBlock<string, int>(); 在所有输入可用时,将多个流组合成元组 (tuples)。

🧪 使用 Dataflow 的真实世界管道 这些不仅仅是假设。微软自身就使用这些模式。

示例 1:文件处理管道

代码语言:javascript
复制
var read = new TransformBlock<string, string>(
    async path => await File.ReadAllTextAsync(path));

var enrich = new TransformBlock<string, string>(
    async content => await EnrichWithAIAsync(content));

var save = new ActionBlock<string>(
    async enriched => await SaveToBlobAsync(enriched));

read.LinkTo(enrich, new DataflowLinkOptions { PropagateCompletion = true });
enrich.LinkTo(save, new DataflowLinkOptions { PropagateCompletion = true });

// 启动流程
read.Post("file1.txt");
read.Post("file2.txt");
read.Complete(); // 确保管道优雅关闭
await save.Completion;

💡 示例 2:使用 Polly 的 API 聚合器

代码语言:javascript
复制
var retryPolicy = Policy
    .Handle<Exception>()
    .WaitAndRetryAsync(, i => TimeSpan.FromMilliseconds());

var enrichBlock = new TransformBlock<InputDto, OutputDto>(
    async input => await retryPolicy.ExecuteAsync(async () =>
    {
        var response = await httpClient.GetAsync($"https://api.com/data/{input.Id}");
        var data = await response.Content.ReadAsStringAsync();
        return new OutputDto
        {
            Id = input.Id,
            Data = data
        };
    }));

💡 示例 3:用于并发消息处理的后台工作者

代码语言:javascript
复制
var messageProcessor = new ActionBlock<Message>(
    async message =>
    {
        await HandleAsync(message); // 你的自定义异步处理逻辑
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = ,  // 并行处理最多 10 条消息
        BoundedCapacity =          // 通过限制缓冲区大小防止内存压力
    });

💡 示例 4:Azure Function 中的真实生产管道 用例:

  • • 触发器:Azure 服务总线 (Azure Service Bus)
  • • 步骤:反序列化 → 丰富 (API 调用) → 保存到 Cosmos DB
  • • 性能:负载下每分钟 10K+ 条消息
代码语言:javascript
复制
public classProcessMessagePipeline
{
    privatereadonly ActionBlock<ServiceBusReceivedMessage> _entryBlock;
    privatereadonly TransformBlock<ServiceBusReceivedMessage, MyDto> _transformBlock;
    privatereadonly ActionBlock<MyDto> _saveBlock;

    public ProcessMessagePipeline()
    {
        _transformBlock = new TransformBlock<ServiceBusReceivedMessage, MyDto>(
            async message => await TransformMessageAsync(message),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = ,
                BoundedCapacity = 
            });

        _saveBlock = new ActionBlock<MyDto>(
            async dto => await SaveToCosmosAsync(dto),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = ,
                BoundedCapacity = 
            });

        _transformBlock.LinkTo(_saveBlock, new DataflowLinkOptions { PropagateCompletion = true });

        _entryBlock = new ActionBlock<ServiceBusReceivedMessage>(
            async message => await _transformBlock.SendAsync(message),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = ,
                BoundedCapacity = 
            });
    }

    public async Task ProcessAsync(ServiceBusReceivedMessage message)
    {
        await _entryBlock.SendAsync(message);
    }

    public void Complete() => _entryBlock.Complete();

    publicasync Task Completion => await _saveBlock.Completion;

    private async Task<MyDto> TransformMessageAsync(ServiceBusReceivedMessage message)
    {
        var dto = JsonSerializer.Deserialize<MyDto>(message.Body);
        dto.Enriched = await CallExternalApiAsync(dto);
        return dto;
    }

    private async Task SaveToCosmosAsync(MyDto dto)
    {
        await Task.Yield(); // 如果你的方法不是 CPU 密集型的,请移除此行
        await CosmosRepository.SaveAsync(dto);
    }
}

🧭 示例 4 — 可视化管道架构

代码语言:javascript
复制
[Azure 服务总线触发器]
            |
     [ActionBlock: 接收消息]
            |
[TransformBlock: 反序列化 + 丰富]
            |
 [ActionBlock: 保存到 Cosmos DB]

📉 何时不应使用 TPL Dataflow

  • • 对于极其简单的顺序处理(foreach 可能就够了)。
  • • 当你的处理步骤完全是同步且轻量级时(并行循环可能更简单)。
  • • 当工作流不是基于消息/数据流,而是复杂的、有状态的协调任务时(考虑 Actor 模型或状态机)。
  • • 当需要跨机器分布处理时(考虑分布式流处理框架如 Azure Stream Analytics, Kafka Streams, Spark Streaming)。

📈 在 Azure 中监控你的管道(微软推荐)

“使用块属性来检查吞吐量、队列长度和完成状态。” — TPL 文档

跟踪:

  • InputCount, OutputCount
  • Completion.IsCompleted
  • • 通过 BoundedCapacity 跟踪队列深度

记录指标:

代码语言:javascript
复制
_telemetry.TrackMetric("BlockQueueLength", block.InputCount);

✅ 使用 TrySendAsync() 配合回退机制来检测溢出。

🧯 关闭管道而不丢失数据(也能安心睡觉) 在关闭时:

代码语言:javascript
复制
pipeline.Complete();
await pipeline.Completion;

“这确保所有处理中的消息都能优雅地完成。” — 微软文档

⚖️ 与其他并发工具的比较

工具/构造

主要优点

主要缺点(对比 Dataflow)

适用场景

TPL Dataflow

内置反压、节流、有界容量、块链接、优雅关闭

学习曲线稍陡峭,配置选项多

复杂、高吞吐、多阶段异步数据流管道

Task.Run + async

简单,易于理解

手动管理并发度、无内置反压、队列失控风险高

简单后台任务,少量并发

Parallel.For/ForEach

CPU 密集型并行计算高效

不适用于异步 I/O、阻塞调用线程、无反压

CPU 密集型循环操作

Channel<T>

高效的生产者/消费者队列,比 BlockingCollection 更优

仅提供队列,不提供处理逻辑、并行度、节流或链接

需要解耦生产者/消费者的简单场景,作为基础块

Reactive Extensions (Rx)

强大的事件流组合、时间处理、错误处理

学习曲线陡峭,更偏向事件处理而非数据处理管道

事件驱动编程、复杂的事件流组合与转换

Azure Durable Functions

无服务器编排、状态管理、内置可靠性(重试、超时)

运行在无服务器环境可能有冷启动、状态管理开销

跨函数编排、有状态工作流、无服务器环境

🧠 要点 如果你正在构建:

  • • ETL 处理器
  • • 消息队列
  • • 文件或批处理作业
  • • 异步 API 管道
  • • 并行代理 (agents)

...而你没有使用 System.Threading.Tasks.Dataflow,你可能:

  • • 编写了比实际需要更多的代码
  • • 错失了内置的节流功能
  • • 正在与无界队列 (unbounded queues) 引发的 Bug 作斗争
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-08-30,如有侵权请联系 [email protected] 删除

本文分享自 DotNet NB 微信公众号,前往查看

如有侵权,请联系 [email protected] 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档