?
本文主要演示使用TPL 数据流库从数据流块(dataflow block)读写消息。
提供了同步方法和异步方法。
主要使用BufferBlock,其既能作为message source,有能作为message target。
Install-Package Microsoft.Tpl.Dataflow
?
using System.Threading.Tasks.Dataflow;
?
1.同步方法读写消息
?
1.1 分别使用Post和Receive方法进行读写。
public
void SynchronouslyPostAndReceive()
{
????var bufferBlock = new BufferBlock<int>();
?
????for (int i = 0; i < 3; i++)
????{
????????bufferBlock.Post(i);
????}
?
????for (int i = 0; i < 3; i++)
????{
????????Console.WriteLine(bufferBlock.Receive());
????}
}
输出:
0
1
2
解释:
?
从Post方法可以看出,此时BufferBlock扮演着ITargetBlock的角色;
同理,Receive方法调用时,BufferBlock扮演着ISourceBlock的角色;
这也就证明了BufferBlock同时用作Target和Source。已可以看出,同步的时候,BufferBlock实际上是FIFO。
?
1.2 使用TryReceive来读取
public
void SynchronouslyPostAndReceive()
{
????var bufferBlock = new BufferBlock<int>();
?
????for (int i = 0; i < 3; i++)
????{
????????bufferBlock.Post(i);
????}
?
????//for (int i = 0; i < 3; i++)
????//{
????// Console.WriteLine(bufferBlock.Receive());
????//}
?
????int
value;
????while (bufferBlock.TryReceive(out
value))
????{
????????Console.WriteLine(value);
????}
}
输出:
0
1
2
1.3并发读写
使用Task,将读写放在不同的线程。
当Receive,且无数据时,则会阻塞。
public
void ConcurrentlyPostAndReceive()
{
????var bufferBlock = new BufferBlock<int>();
????var post01 = Task.Run(() =>
????{
????????bufferBlock.Post(1);
????????bufferBlock.Post(2);
????????bufferBlock.Post(3);
????????bufferBlock.Post(4);
????});
?
????var receive = Task.Run(() =>
????{
????????for (int i = 0; i < 6; i++)
????????{
????????????Console.WriteLine(bufferBlock.Receive());
????????}
????});
?
????var post02 = Task.Run(() =>
????{
????????bufferBlock.Post(5);
????????bufferBlock.Post(6);
????});
?
????Task.WaitAll(post01, receive, post02);
}
输出:
5
1
2
3
4
6
2.异步方式读写消息
?
主要是TAP的一种套用。使用async 和await来实现。
?
static async Task AsyncSendReceive(BufferBlock<int> bufferBlock)
{
????// Post more messages to the block asynchronously.
????for (int i = 0; i < 3; i++)
????{
????????await bufferBlock.SendAsync(i);
????}
?
????// Asynchronously receive the messages back from the block.
????for (int i = 0; i < 3; i++)
????{
????????Console.WriteLine(await bufferBlock.ReceiveAsync());
????}
?
}
?
public
void Start()
{
????AsyncSendReceive(new BufferBlock<int>()).Wait();
}
HowTo:使用数据流读写消息
原文:http://www.cnblogs.com/pengzhen/p/4837434.html