兔展在线制作网站,网站的推广方式有哪些,长沙网络公司营销推广,wordpress 文学付费。System.Threading.Channels是.Net Core基础类库中实现的一个多线程相关的库#xff0c;专门处理数据流相关的操作#xff0c;用来在生产者和订阅者之间传递数据#xff08;不知道可不可以理解为线程间传递数据#xff0c;我把它类比成了Go语言中的Channel#xff09;专门处理数据流相关的操作用来在生产者和订阅者之间传递数据不知道可不可以理解为线程间传递数据我把它类比成了Go语言中的Channel使用时需要通过NuGet安装。这个库的前身是System.Threading.Tasks.Channels来自实验性质的核心类库项目https://github.com/dotnet/corefxlab但是在2017年9月就不再更新了目前使用的话需要用到最新的System.Threading.Channels库如果你也是第一次接触的话就直接上手研究System.Threading.Channels就可以了。Channel API操作基于Channel对象其操作主要由ChannelReader和ChannelWriter两部分组成由Channelt提供的工厂方法创建一个有容量限制或者无限制、最大容量限制的channel。这点类似于Go语言中的chan的容量二者在这里有很多的类似的地方也有不同的地方。1.1. 和Go语言channel的一些比较Go语言中的channel默认是没有容量的在使用这个没有容量的channel时生产者和消费者必须“流动”起来否则将会阻塞也就是当生产者写入channel一个数据时必须同时有一个接收者接收否则写入操作会停止等待有一个消费者取走channel中的数据写入操作才会继续。在System.Threading.Channels库中没有类似Go语言的默认容量的机制需要按需调用不同的Channel对象public static ChannelT CreateBoundedT(int capacity); 可以创建一个带有容量限制的Channel实例对象。public static ChannelT CreateBoundedT(BoundedChannelOptions options) 创建一个自定义配置的Channel实例对象可配置容量、以及在接收到新数据时的操作模式等等BoundedChannelFullMode.Wait等待当前写入完成BoundedChannelFullMode.DropNewest删除并忽略管道中写入的最新的数据BoundedChannelFullMode.DropOldest删除并忽略管道中最旧的数据BoundedChannelFullMode.DropWrite删除当前正在写的数据以写入管道中的新数据public static ChannelT CreateUnboundedT(); 创建一个没有容量限制的Channel实例对象在实际使用时应当谨慎使用该创建方式因为可能会发生OutOfMemoryException。public static ChannelT CreateUnboundedT(UnboundedChannelOptions options)创建一个自定义配置的没有容量限制的Channel实例对象。该配置选项因为没有容量限制所以不会有写入等待操作模式只有默认的一些配置public bool SingleWriter { get; set; }是否需要一个一个读public bool SingleReader { get; set; }是否需要一个一个写public bool AllowSynchronousContinuations { get; set; }是否需要异步连续操作我个人理解为异步操作时同时进行读写Go语言的channel机制和System.Threading.Channels的不同之处有两个Go语言没有无限容量的channel而且就我个人的想法而言无限容量并不“无限”因为内存是有限的。System.Threading.Channels没有单向的channel类型。在Go中可以创建“只读”或者“只写”的channel但是System.Threading.Channels中没有提供这种操作。1.2. 生产者、消费者需要的方法生产者需要使用的一些方法TryWrite/WriteAsync/WaitToWriteAsync/Complete 消费者需要使用的一些方法TryRead/ReadAsync/WaitToReadAsync/Completion方法介绍TryRead/TryWrite尝试使用同步方式读取或写入一项数据返回读取或者写入是否成功。TryRead同时会以out的形式返回读取到的数据。ReadAsync/WriteAsync使用异步方式写入或者读取一项数据。TryComplete/Completion可以将channel标记为完成状态这样就不会写入多余的错误数据如果从已完成状态的channel中ReadAsync时会抛出异常所以在不需要异步读取时建议经常使用TryRead。WaitToReadAsync/WaitToWriteAsync在尝试读取或者写入数据之前调用该方法可获得一个Taskbool表示读取或者写入操作能否进行。创建一个控制台程序演示channel的用法 1 2 3 4 5 6 7 8 9101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384using System;using System.Collections.Generic;using System.Threading.Channels;using System.Threading.Tasks;namespace ConsoleApp1{ class Program { static void Main(string[] args) { Task.Run(async () { await ChannelRun(0,0, 1, 50, 5); }); Console.WriteLine(运行开始...); Console.ReadLine(); } /// summary /// channel运行 /// /summary /// param namereadDelayMs读取器每次读取完等待时间/param /// param namewriteDelayMs写入器每次写入完等待时间/param /// param namefinalNumberOfReaders几个读取器同时读取/param /// param namehowManyMessages写入器总共写入多少消息/param /// param namemaxCapacitychannel最大容量/param /// returns/returns public static async Task ChannelRun(int readDelayMs, int writeDelayMs, int finalNumberOfReaders,int howManyMessages, int maxCapacity ) { // 创建channel var channel Channel.CreateBoundedstring(maxCapacity); var reader channel.Reader; var writer channel.Writer; var tasks new ListTask(); // 读取器执行读取任务可以设置多个读取器同时读取 for (var i 0; i finalNumberOfReaders; i) { var idx i; tasks.Add(Task.Run(() Read(reader, idx 1,readDelayMs))); } // 写入器执行写入操作 for (var i 0; i howManyMessages; i) { Console.WriteLine($写入器在{DateTime.Now.ToLongTimeString()}写入{i}); await writer.WriteAsync($发布消息{i}); // 写入完等待片刻 await Task.Delay(writeDelayMs); } // 写入器标记完成状态 writer.Complete(); // 等待读取器读取完成 await reader.Completion; // 等待读取器所有的Task完成 await Task.WhenAll(tasks); } /// summary /// 读取数据任务 /// /summary /// param nametheReader读取器/param /// param namereaderNumber读取器编号/param /// param namedelayMs读取完等待时间/param /// returns任务/returns public static async Task Read(ChannelReaderstring theReader, int readerNumber, int delayMs) { // 循环判断读取器是否完成状态 while (await theReader.WaitToReadAsync()) { // 尝试读取数据 while (theReader.TryRead(out var theMessage)) { Console.WriteLine($线程{readerNumber}号读取器在{DateTime.Now.ToLongTimeString()}读取到了消息{theMessage}); // 读取完等待片刻 await Task.Delay(delayMs); } } } }}借助代码中的注释应当可以理解示例代码的作用对其中的关键点做个说明写入器只有一个写入的容量由channel的容量控制。读取器可以设置多个由Task调度同时读取。2.1. 写入器、读取器无等待写入器和读取器不等待不停的读写数据有一个读取器总共写入50个数据channel的容量为5调用传参如下1234Task.Run(async () { await ChannelRun(0,0, 1, 50, 5);});结果 写入读取操作在一秒内完成了观察输出可以发现写入和读取交替进行写入的数据会立刻被读取器读取出来打印在终端内。2.2. 读取器阻塞(等待)将读取器的等待时间设置长一些观察一下写入器是否会被阻塞调用传参如下1234Task.Run(async () { await ChannelRun(10000,0, 1, 50, 5);});结果 从输出的结果可以看见在程序开始时写入器写入了6个数据(但是调试的时候capacity的值时5这里的机制有待考证)然后每过10秒读取器读取一个数据后写入器才能写入一个数据由于读取器的速度限制相当于将写入器也进行了阻塞。2.3. 多个读取器同时读取读取器还是每读取一次暂停10秒但是有5个Task同时读取调用传参如下1234Task.Run(async () { await ChannelRun(10000,0, 1, 50, 5);});结果 从输出可以看出来5个读取器Task可以每10秒钟同时读取5个数据而写入器也同样的几乎是每次写入5个数据。System.Threading.Channels作为一个线程间通信的库用来当作发布者/订阅者组件使用非常方便。但是比起Go语言中的channel还是有些区别的因为c#的Async/Await从某中意义上讲并不是真正的多线程。 1 2 3 4 5 6 7 8 91011121314151617181920212223242526272829303132333435363738394041424344454647package mainimport ( fmt time)func main() { fmt.Println(运行开始...) channelRun(2000, 0, 5, 50, 10)}// channel运行实例// readDelayMs, writeDelayMs分别是读取器需要暂停的时间和写入器需要暂停的时间// finalNumberOfReaders是读取器个个数// howManyMessages是消息的总数// maxCapacity是channel的容量func channelRun(readDelayMs, writeDelayMs, finalNumberOfReaders, howManyMessages, maxCapacity int) { // 创建channel channel : make(chan string, maxCapacity) for i : 0; i finalNumberOfReaders; i { go func(i int) { read(channel, i, readDelayMs) }(i) } for i : 0; i howManyMessages; i { fmt.Printf(写入器在%v写入%v\n, time.Now().Format(2006-01-02 15:04:05.0000), i) channel - fmt.Sprintf(发布消息%v, i) time.Sleep(time.Duration(writeDelayMs) * time.Millisecond) }}// 读取器从channel中读取数据// channel时chan的实例// readerNumner代表了当前Goroutine的编号// delayMs表示当前的Goroutine读取完需要等待的时间func read(channel chan string, readerNumber, delayMs int) { // 不断循环读取 for { select { case msg : -channel: fmt.Printf(%v号Gouroutine 读取器在%v读取到了消息%s\n, readerNumber, time.Now().Format(2006-01-02 15:04:05.0000), msg) time.Sleep(time.Duration(delayMs) * time.Millisecond) } }}结果 https://medium.com/alexyakunin/go-vs-c-part-1-goroutines-vs-async-await-ac909c651c11https://github.com/dotnet/corefx/blob/master/src/System.Threading.Channels/tests/ChannelTests.cshttps://sachabarbs.wordpress.com/2018/11/28/system-threading-channels/