一个网站的域名突然换了,网站建设制作合同模板,局域网创建网站,怎样在拼多多上卖自己的产品今天用一个简单例子说说异步的多路径终止。我尽可能写得容易理解吧#xff0c;但今天的内容需要有一定的编程能力。今天这个话题#xff0c;来自于最近对gRPC的一些技术研究。话题本身跟gRPC没有太大关系。应用中#xff0c;我用到了全双工数据管道这样一个相对复杂的概念。… 今天用一个简单例子说说异步的多路径终止。我尽可能写得容易理解吧但今天的内容需要有一定的编程能力。 今天这个话题来自于最近对gRPC的一些技术研究。话题本身跟gRPC没有太大关系。应用中我用到了全双工数据管道这样一个相对复杂的概念。我们知道全双工连接是两个节点之间的连接但不是简单的“请求-响应”连接。任何一个节点都可以在任何时间发送消息。概念上还是有客户端和服务端的区分但这仅仅是概念上只是为了区分谁在监听连接尝试谁在建立连接。实际上做一个双工的API比做一个“请求-响应”式的API要复杂得多。由此延伸出了另一个想法做个类库在库内部构建双工管道供给消费者时只暴露简单的内容和熟悉的方式。 一、开始假设我们有这样一个API客户端建立连接有一个SendAsync消息从客户端发送到服务器有一个TryReceiveAsync消息试图等待来自服务器的消息服务器有消息发送为True返之为False服务器控制数据流终止如果服务器发送完最后一条消息则客户端不再发送任何消息。 接口代码可以写成这样interface ITransportTRequest, TResponse : IAsyncDisposable
{ValueTask SendAsync(TRequest request, CancellationToken cancellationToken);ValueTask(bool Success, TResponse Message) TryReceiveAsync(CancellationToken cancellationToken);
}
忽略连接的部分代码看起来并不复杂。下面我们创建两个循环并通过枚举器公开数据ITransportTRequest, TResponse transport;
public async IAsyncEnumerableTResponse ReceiveAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{while (true){var (success, message) await transport.TryReceiveAsync(cancellationToken);if (!success) break;yield return message;}
}public async ValueTask SendAsync(IAsyncEnumerableTRequest data, CancellationToken cancellationToken)
{await foreach (var message in data.WithCancellation(cancellationToken)){await transport.SendAsync(message, cancellationToken);}
}
这里面用到了异步迭代器相关的概念。如果不明白可以去看我的另一篇专门讨论异步迭代器的文章【传送门】。 二、解决终止标志好像做好了我们用循环接收和发送并传递了外部的终止标志给这两个方法。真的做好了吗还没有。问题出在终止标志上。我们没有考虑到这两个流是相互依赖的特别是我们不希望生产者使用SendAsync的代码在任何连接失败的场景中仍然运行。实际上会有比我们想像中更多的终止路径我们可能已经为这两个方法提供了一个外部的终止令牌并且这个令牌可能已经被触发ReceiveAsync的消费者可能已经通过WithCancellation提供了一个终止令牌给GetAsyncEnumerator并且这个令牌可能已经被触发我们的发送/接收代码可能出错了ReceiveAsync的消费者在数据获取到中途要终止获取了 - 一个简单的原因是处理收到的数据时出错了SendAsync中的生产者可能发生了错误这只是一些可能的例子但实际的可能会更多。本质上这些都表示连接终止因此我们需要以某种方式包含所有这些场景进而允许发送和接收路径之间传达问题。换句话说我们需要自己的CancellationTokenSource。 显然这种需求用库来解决是比较完美的。我们可以把这些复杂的内容放在一个消费者可以访问的单一API中public IAsyncEnumerableTResponse Duplex(IAsyncEnumerableTRequest request, CancellationToken cancellationToken default);
这个方法允许它传入一个生产者通话它传入一个外部的终止令牌有一个异步的响应返回使用时我们可以这样做await foreach (MyResponse item in client.Duplex(ProducerAsync()))
{// ... todo
}
async IAsyncEnumerableMyRequest ProducerAsync([EnumeratorCancellation] CancellationToken cancellationToken default)
{for (int i 0; i 100; i){yield return new MyRequest(i);await Task.Delay(100, cancellationToken);}
}上面这段代码中我们ProducerAsync还没有实现太多内容目前只是传递了一个占位符。稍后我们可以枚举它而枚举行为实际上调用了代码。回到Duplex。这个方法至少需要考虑两种不同的终止方式通过cancellationToken传入的外部令牌使用过程中可能传递给GetAsyncEnumerator()的潜在的令牌这儿为什么不是之前列出的更多种终止方式呢这儿要考虑到编译器的组合方式。我们需要的不是一个CancellationToken而是一个CancellationTokenSource。public IAsyncEnumerableTResponse Duplex(IAsyncEnumerableTRequest request, CancellationToken cancellationToken default) DuplexImpl(transport, request, cancellationToken);private async static IAsyncEnumerableTResponse DuplexImpl(ITransportTRequest, TResponse transport, IAsyncEnumerableTRequest request, CancellationToken externalToken, [EnumeratorCancellation] CancellationToken enumeratorToken default)
{using var allDone CancellationTokenSource.CreateLinkedTokenSource(externalToken, enumeratorToken);// ... todo
}
这里DuplexImpl方法允许枚举终止但又与外部终止标记保持分离。这样在编译器层面不会被合并。在里面CreateLinkedTokenSource反倒像编译器的处理。现在我们有一个CancellationTokenSource需要时我们可能通过它来终止循环的运行。using var allDone CancellationTokenSource.CreateLinkedTokenSource(externalToken, enumeratorToken);
try
{// ... todo
}
finally
{allDone.Cancel();
}
通过这种方式我们可以处理这样的场景消费者没有获取所有数据而我们想要触发allDone但是我们退出了DuplexImpl。这时候迭代器的作用就很大了它让程序变得更简单因为用了using最终里面的任何内容都会定位到Dispose/DisposeAsync。 下一个是生产者也就是SendAsync。它也是双工的对传入的消息没有影响所以可以用Task.Run作为一个独立的代码路径开始运行而如果生产者出现错误则终止发送。上边的todo部分可以加入var send Task.Run(async ()
{try{await foreach (var message in request.WithCancellation(allDone.Token)){await transport.SendAsync(message, allDone.Token);}}catch{allDone.Cancel();throw;}
}, allDone.Token);// ... todo: receiveawait send;
这里启动了一个生产者的并行操作SendAsync。注意这里我们用标记allDone.Token把组合的终止标记传递给生产者。延迟await是为了允许ProducerAsync方法里可以使用终止令牌以满足复合双工操作的生命周期要求。 这样接收代码就变成了while (true)
{var (success, message) await transport.TryReceiveAsync(allDone.Token);if (!success) break;yield return message;
}allDone.Cancel();最后把这部分代码合在一起看看private async static IAsyncEnumerableTResponse DuplexImpl(ITransportTRequest, TResponse transport, IAsyncEnumerableTRequest request, CancellationToken externalToken, [EnumeratorCancellation] CancellationToken enumeratorToken default)
{using var allDone CancellationTokenSource.CreateLinkedTokenSource(externalToken, enumeratorToken);try{var send Task.Run(async () {try{await foreach (var message in request.WithCancellation(allDone.Token)){await transport.SendAsync(message, allDone.Token);}}catch{allDone.Cancel();throw;}}, allDone.Token);while (true){var (success, message) await transport.TryReceiveAsync(allDone.Token);if (!success) break;yield return message;}allDone.Cancel();await send;}finally{allDone.Cancel();}
}三、总结相关的处理就这么多。这里实现的关键点是外部令牌和枚举器令牌都对allDone有贡献传输中发送和接收代码使用allDone.Token生产者枚举使用allDone.Token任何情况下退出枚举器allDone都会被终止如果传输接收错误则allDone被终止如果消费者提前终止则allDone被终止当我们收到来自服务器的最后一条消息后allDone被终止如果生产者或传输发送错误allDone被终止 最后多说一点关于ConfigureAwait(false)默认情况下await包含一个对SynchronizationContext.Current的检查。除了表示额外的上下文切换之外在UI应用程序的情况下它也意味着在UI线程上运行不需要在UI线程上运行的代码。库代码通常不需要这样做。因此在库代码中通常应该在所有用到await的地方使用. configureawait (false)来绕过这个检查。而在一般应用程序的代码中应该默认只使用await而不使用ConfigureAwait除非你知道你在做什么。喜欢就来个三连让更多人因你而受益