Pipelines - .NET中的新IO API指引(一)
Pipelines - .NET中的新IO API指引(二)
关于System.IO.Pipelines的一篇说明
System.IO.Pipelines: .NET高性能IO
System.IO.Pipelines 是对 IO 的统一抽象(文件、COM 口、网络等),重点在于让调用者把注意力集中在读、写缓冲区上,典型的例子就是 IDuplexPipe 中的 Input 和 Output。
可以理解为将IO类抽象为读、写两个缓冲区。
目前官方实现还处于preview状态,作者使用Socket和NetworkStream 实现了一个 Pipelines.Sockets.Unofficial
作者在前两篇中提到使用System.IO.Pipelines 改造StackExchange.Redis,在本篇中作者采用了改造现有的SimplSockets库来说明System.IO.Pipelines的使用。
文章中的代码(SimplPipelines,KestrelServer )
/// <summary>
/// A disposable object that exposes a <see cref="Memory{T}"/> block.
/// </summary>
/// <typeparam name="T">Element type of the exposed memory.</typeparam>
public interface IMemoryOwner<T> : IDisposable
{
/// <summary>Gets the memory exposed by this owner.</summary>
Memory<T> Memory { get; }
}
/// <summary>
/// An <see cref="IMemoryOwner{T}"/> backed by an array that is handed back to
/// <see cref="ArrayPool{T}.Shared"/> when the owner is disposed.
/// </summary>
/// <typeparam name="T">Element type of the backing array.</typeparam>
private sealed class ArrayPoolOwner<T> : IMemoryOwner<T>
{
    // Number of leading elements of the array that are exposed through Memory.
    private readonly int _length;

    // The backing array; set to null once Dispose has run.
    private T[] _oversized;

    /// <summary>Wraps <paramref name="oversized"/>, exposing its first <paramref name="length"/> elements.</summary>
    internal ArrayPoolOwner(T[] oversized, int length)
    {
        _oversized = oversized;
        _length = length;
    }

    /// <summary>Gets the first <c>length</c> elements of the backing array.</summary>
    /// <exception cref="ObjectDisposedException">The owner has already been disposed.</exception>
    public Memory<T> Memory
    {
        get
        {
            T[] backing = GetArray();
            return new Memory<T>(backing, 0, _length);
        }
    }

    // Reads the current array reference through an interlocked no-op exchange;
    // a null reference means Dispose has already taken the array.
    private T[] GetArray()
    {
        T[] current = Interlocked.CompareExchange(ref _oversized, null, null);
        if (current == null)
        {
            throw new ObjectDisposedException(ToString());
        }
        return current;
    }

    /// <summary>
    /// Returns the backing array to the shared pool. The reference is swapped out
    /// atomically, so only the first call returns the array; later calls do nothing.
    /// </summary>
    public void Dispose()
    {
        T[] taken = Interlocked.Exchange(ref _oversized, null);
        if (taken != null)
        {
            ArrayPool<T>.Shared.Return(taken);
        }
    }
}
// Sample consumer that takes an IMemoryOwner<byte> and disposes it itself.
void DoSomething(IMemoryOwner<byte> data){
// data is disposed when this using block exits, so data.Memory is only read inside it.
using(data){
// ... other things here ...
DoTheThing(data.Memory);
}
// ... more things here ...
}
通过ArrayPool的借、还机制避免频繁分配。
/// <summary>
/// Copies <paramref name="source"/> into an array rented from
/// <see cref="ArrayPool{T}.Shared"/> and wraps it in an owner that returns
/// the array to the pool when disposed.
/// </summary>
/// <param name="source">The sequence to copy.</param>
/// <returns>
/// An owner whose memory holds a copy of <paramref name="source"/>; for an empty
/// sequence, the result of <c>Empty&lt;T&gt;()</c>.
/// </returns>
/// <exception cref="OverflowException">The sequence length does not fit in an <see cref="int"/>.</exception>
public static IMemoryOwner<T> Lease<T>(this ReadOnlySequence<T> source)
{
    if (source.IsEmpty)
    {
        return Empty<T>();
    }

    int length = checked((int)source.Length);

    // Borrow from the pool; the rented array may be longer than requested.
    T[] rented = ArrayPool<T>.Shared.Rent(length);
    source.CopyTo(rented);

    // The owner hands the array back to the pool on Dispose.
    return new ArrayPoolOwner<T>(rented, length);
}
/// <summary>
/// Base class that holds an <see cref="IDuplexPipe"/> and exposes
/// <see cref="Close"/> / <see cref="Dispose"/> to shut it down.
/// </summary>
public abstract class SimplPipeline : IDisposable
{
// The duplex pipe supplied to the constructor.
private IDuplexPipe _pipe;
/// <summary>Stores <paramref name="pipe"/> for use by this instance.</summary>
protected SimplPipeline(IDuplexPipe pipe)
=> _pipe = pipe;
/// <summary>Disposes this instance by calling <see cref="Close"/>.</summary>
public void Dispose() => Close();
/// <summary>Closes the pipeline; the body is a placeholder in this excerpt.</summary>
public void Close() {/* burn the pipe*/}
}
/// <summary>
/// Writes <paramref name="payload"/> under <paramref name="messageId"/> and disposes the payload afterwards.
/// </summary>
/// <param name="payload">The message body; the caller must not use it after this call because it is disposed here.</param>
/// <param name="messageId">Identifier passed through to the memory-based overload.</param>
protected async ValueTask WriteAsync(IMemoryOwner<byte> payload, int messageId)// the caller no longer uses payload, so we are responsible for cleaning it up
{
// The await sits inside the using block, so payload is disposed only after the write has completed.
using (payload)
{
await WriteAsync(payload.Memory, messageId);
}
}
protected ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId);// the caller cleans up payload itself
messageId标识一条消息,写入消息头部, 用于之后处理响应回复信息。
// Semaphore with an initial count of 1, so at most one caller writes a frame at a time.
private readonly SemaphoreSlim _singleWriter= new SemaphoreSlim(1);
/// <summary>
/// Writes one frame (header followed by payload) while holding <c>_singleWriter</c>.
/// </summary>
/// <param name="payload">The message body written after the header.</param>
/// <param name="messageId">Identifier written into the frame header.</param>
protected async ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId)
{
await _singleWriter.WaitAsync();
try
{
// Header and payload are written under the same lock so frames from different callers do not interleave.
WriteFrameHeader(writer, payload.Length, messageId);
await writer.WriteAsync(payload);
}
finally
{
// Released on both success and failure.
_singleWriter.Release();
}
}
/// <summary>
/// Writes one frame (header followed by payload), avoiding the async state machine
/// when the semaphore is free and the write completes synchronously.
/// </summary>
/// <param name="payload">The message body written after the header.</param>
/// <param name="messageId">Identifier written into the frame header.</param>
/// <returns>A completed <see cref="ValueTask"/> on the synchronous path; otherwise a task that completes when the write does.</returns>
protected ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId)
{
// try to get the conch; if not, switch to async
// the writer is already in use, so take the asynchronous path
if (!_singleWriter.Wait(0))
return WriteAsyncSlowPath(payload, messageId);
// Tracks whether this method still has to release the semaphore in the finally block.
bool release = true;
try
{
WriteFrameHeader(writer, payload.Length, messageId);
var write = writer.WriteAsync(payload);
if (write.IsCompletedSuccessfully) return default;
// The write is still pending (or did not succeed): AwaitFlushAndRelease now owns the release.
release = false;
return AwaitFlushAndRelease(write);
}
finally
{
if (release) _singleWriter.Release();
}
}
/// <summary>
/// Awaits <paramref name="flush"/> and then releases <c>_singleWriter</c>, whether the await succeeds or throws.
/// </summary>
/// <param name="flush">The pending write/flush operation to await.</param>
async ValueTask AwaitFlushAndRelease(ValueTask<FlushResult> flush)
{
try { await flush; }
finally { _singleWriter.Release(); }
}
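上面的代码有三个地方值得注意:(1) `_singleWriter.Wait(0)` 尝试同步获取信号量,获取失败时转入 `WriteAsyncSlowPath`;(2) 如果 `write.IsCompletedSuccessfully` 为 true,则直接同步返回;(3) 否则由 `AwaitFlushAndRelease` 在写入完成后释放信号量。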
/// <summary>
/// Writes the 8-byte frame header to <paramref name="writer"/>: the payload length
/// followed by the message id, each as a little-endian 32-bit integer.
/// </summary>
/// <param name="writer">The pipe writer that receives the header.</param>
/// <param name="length">Payload length stored in the first four bytes.</param>
/// <param name="messageId">Message id stored in the last four bytes.</param>
void WriteFrameHeader(PipeWriter writer, int length, int messageId)
{
    const int HeaderSize = 8;

    Span<byte> header = writer.GetSpan(HeaderSize);
    Span<byte> idField = header.Slice(4);

    BinaryPrimitives.WriteInt32LittleEndian(header, length);
    BinaryPrimitives.WriteInt32LittleEndian(idField, messageId);

    // Commit exactly the header bytes that were just written.
    writer.Advance(HeaderSize);
}
/// <summary>
/// Request/response client: each outgoing message is tagged with a non-zero
/// message id, and the matching reply completes the task returned to the caller.
/// </summary>
/// <remarks>
/// NOTE(review): <c>_awaitingResponses</c> and <c>_nextMessageId</c> are not declared in
/// this excerpt; they are assumed to be a dictionary keyed by message id and an
/// <see cref="int"/> counter respectively. Confirm against the full source.
/// </remarks>
public class SimplPipelineClient : SimplPipeline
{
    /// <summary>
    /// Sends <paramref name="message"/> and waits for the reply that carries the same message id.
    /// </summary>
    /// <param name="message">The request payload; the caller keeps ownership of it.</param>
    /// <returns>The reply payload.</returns>
    public async Task<IMemoryOwner<byte>> SendReceiveAsync(ReadOnlyMemory<byte> message)
    {
        // Run the caller's continuation asynchronously so that completing the task
        // does not execute caller code inline on the thread that sets the result.
        var tcs = new TaskCompletionSource<IMemoryOwner<byte>>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        int messageId;
        lock (_awaitingResponses)
        {
            // Id 0 is treated as "unsolicited" by the receiver, so skip it. The counter itself
            // is advanced past 0; otherwise the next call would hand out id 1 a second time.
            if (++_nextMessageId == 0)
            {
                _nextMessageId = 1;
            }
            messageId = _nextMessageId;
            _awaitingResponses.Add(messageId, tcs);
        }

        try
        {
            await WriteAsync(message, messageId);
        }
        catch
        {
            // The request never went out, so no reply will arrive: drop the pending
            // entry instead of leaving it registered forever.
            lock (_awaitingResponses)
            {
                _awaitingResponses.Remove(messageId);
            }
            throw;
        }

        return await tcs.Task;
    }

    /// <summary>
    /// Sends <paramref name="message"/>, waits for the reply, and disposes the message afterwards.
    /// </summary>
    /// <param name="message">The request payload; it is disposed by this method.</param>
    /// <returns>The reply payload.</returns>
    public async Task<IMemoryOwner<byte>> SendReceiveAsync(IMemoryOwner<byte> message)
    {
        // The await sits inside the using block, so the message stays alive until the exchange finishes.
        using (message)
        {
            return await SendReceiveAsync(message.Memory);
        }
    }
}
- _awaitingResponses 是个字典,保存已经发送的消息,用于将来处理对某条(messageId)消息的回复。
/// <summary>
/// Reads from <c>reader</c> in a loop, parses every complete frame in each read,
/// and passes each one to <see cref="OnReceiveAsync"/>.
/// </summary>
/// <param name="cancellationToken">Checked before each read and passed to the read itself.</param>
/// <returns>A task that completes when the loop has ended and the reader has been completed.</returns>
protected async Task StartReceiveLoopAsync(CancellationToken cancellationToken = default)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
var readResult = await reader.ReadAsync(cancellationToken);
if (readResult.IsCanceled) break;
var buffer = readResult.Buffer;
var makingProgress = false;
// TryParseFrame slices each parsed frame off the front of buffer, so the loop drains every complete frame.
while (TryParseFrame(ref buffer, out var payload, out var messageId))
{
makingProgress = true;
await OnReceiveAsync(payload, messageId);
}
// buffer.Start is the first byte not yet parsed (consumed position); buffer.End marks everything as examined.
reader.AdvanceTo(buffer.Start, buffer.End);
// The reader is completed and no frame could be parsed from what is left: stop.
if (!makingProgress && readResult.IsCompleted) break;
}
// Best-effort completion: any exception thrown by Complete is deliberately ignored.
try { reader.Complete(); } catch { }
}
catch (Exception ex)
{
// Hand the failure to the reader; any exception thrown by Complete is deliberately ignored.
try { reader.Complete(ex); } catch { }
}
}
/// <summary>Called by the receive loop once for each complete frame that was parsed.</summary>
/// <param name="payload">The frame body, without the 8-byte header.</param>
/// <param name="messageId">The message id read from the frame header.</param>
protected abstract ValueTask OnReceiveAsync(ReadOnlySequence<byte> payload, int messageId);
/// <summary>
/// Tries to read one complete frame (8-byte header plus payload) from the front of
/// <paramref name="input"/>.
/// </summary>
/// <param name="input">
/// The buffered data. On success it is advanced past the parsed frame; on failure it is left unchanged.
/// </param>
/// <param name="payload">On success, the frame body; otherwise <c>default</c>.</param>
/// <param name="messageId">
/// The message id from the header. It is <c>default</c> when the header itself is incomplete.
/// </param>
/// <returns><c>true</c> if a complete frame was parsed; <c>false</c> if more data is needed.</returns>
/// <exception cref="System.IO.InvalidDataException">The header declares a negative payload length.</exception>
private bool TryParseFrame(
    ref ReadOnlySequence<byte> input,
    out ReadOnlySequence<byte> payload, out int messageId)
{
    const int HeaderSize = 8;

    if (input.Length < HeaderSize)
    { // not enough data for the header
        payload = default;
        messageId = default;
        return false;
    }

    int length;
    if (input.First.Length >= HeaderSize)
    { // the whole header is in the first segment
        length = ParseFrameHeader(input.First.Span, out messageId);
    }
    else
    { // the header straddles segments: copy it into a local span
        Span<byte> local = stackalloc byte[HeaderSize];
        input.Slice(0, HeaderSize).CopyTo(local);
        length = ParseFrameHeader(local, out messageId);
    }

    // The length comes from the wire. Reject a negative value explicitly instead of
    // letting Slice fail with an unrelated-looking ArgumentOutOfRangeException.
    if (length < 0)
    {
        throw new System.IO.InvalidDataException(
            "Frame header declares a negative payload length: " + length);
    }

    // do we have the "length" bytes? Compare as long so that length + HeaderSize cannot overflow int.
    if (input.Length < (long)length + HeaderSize)
    {
        payload = default;
        return false;
    }

    // success!
    payload = input.Slice(HeaderSize, length);
    input = input.Slice(payload.End);
    return true;
}
/// <summary>
/// Decodes a frame header: a little-endian 32-bit payload length followed by a
/// little-endian 32-bit message id.
/// </summary>
/// <param name="input">At least 8 bytes of header data.</param>
/// <param name="messageId">Receives the message id stored at offset 4.</param>
/// <returns>The payload length stored at offset 0.</returns>
static int ParseFrameHeader(ReadOnlySpan<byte> input, out int messageId)
{
    int payloadLength = BinaryPrimitives.ReadInt32LittleEndian(input);

    ReadOnlySpan<byte> idField = input.Slice(4);
    messageId = BinaryPrimitives.ReadInt32LittleEndian(idField);

    return payloadLength;
}
/// <summary>
/// Routes a received frame: id 0 is raised through <c>MessageReceived</c>; any other id
/// completes the pending request registered under that id, if one exists.
/// </summary>
/// <param name="payload">The frame body; it is copied with <c>Lease()</c> only when there is a recipient.</param>
/// <param name="messageId">The message id read from the frame header.</param>
/// <returns>An already-completed <see cref="ValueTask"/>.</returns>
protected override ValueTask OnReceiveAsync(
    ReadOnlySequence<byte> payload, int messageId)
{
    if (messageId == 0)
    { // unsolicited
        MessageReceived?.Invoke(payload.Lease());
        return default;
    }

    // request/response: claim the pending entry under the lock, complete it outside the lock
    TaskCompletionSource<IMemoryOwner<byte>> pending;
    lock (_awaitingResponses)
    {
        bool found = _awaitingResponses.TryGetValue(messageId, out pending);
        if (found)
        {
            _awaitingResponses.Remove(messageId);
        }
    }

    if (pending != null)
    {
        pending.TrySetResult(payload.Lease());
    }
    return default;
}
原文:https://www.cnblogs.com/yswenli/p/11937246.html