做网站好公司哪家好,网站流量少怎么做,wordpress对接公众号,网站设计公司网站协程#xff1a;Channel 热流
1、Channel是什么#xff1f;
生产者#xff1a;多个协程消费者#xff1a;多个协程中间#xff1a;Channel 管道 并发安全队列发送send接收recv
协程间通信
1、Channel可以用于协程间通信 // 通道Channelval channel ChannelIntChannel 热流
1、Channel是什么
生产者多个协程消费者多个协程中间Channel 管道 并发安全队列发送send接收recv
协程间通信
1、Channel可以用于协程间通信 // 通道Channelval channel ChannelInt()// 生产者launch{(1..6).forEach {delay(1000L)println(我生产了一个:$it)channel.send(it)}}// 消费者launch{(1..6).forEach {val r channel.receive()println(消费了一个:$r)}}capacity
1、生产速度消费速度
如果缓冲区满了send会挂起消费完后再生产capacity默认容量0
UNLIMITEDsend不再挂起
容量接近于无限容量不满就不会挂起 // 通道Channelval channel ChannelInt(Channel.UNLIMITED)消费方式 // 第一种发方式 消费(1..8).forEach {delay(2000L)val r channel.receive()println(消费了一个:$r)}iterator // 第二种发方式 消费val it channel.iterator()while (it.hasNext()) {val item it.next()delay(2000L)println(消费了一个:$item)}item in channel // 第三种发方式 消费for (item in channel) {delay(2000L)println(消费了一个:$item)}快捷方式
produce和ReceiveChannel
produce快速构建消费者
// 生产者的快捷方式val produce produce {(1..20).forEach { delay(2000L) ; send(it) }}// 普通的消费launch {for (item in produce) {println(消费了一个:$item)}}// receive()接收数据有数据没有消费send会一直阻塞launch {println(消费了一个:${produce.receive()})delay(2000)println(消费了一个:${produce.receive()})println(消费了一个:${produce.receive()})println(消费了一个:${produce.receive()})println(消费了一个:${produce.receive()})println(消费了一个:${produce.receive()})}produce(capacity 100)会增加缓冲区只要没有放满send不会再阻塞。 actor和SendChannel
actor快速构建消费者 // 消费者的快捷方式val consumer actorInt {(1..20).forEach {println(消费了一个:${receive()})}}// 普通的生成launch {(1..20).forEach { delay(2000L) ; consumer.send(it) }}close
1、channel.close
关闭一般是生产者去close
isClosedForSend
channel.close() 之前 isClosedForSend false channel.close() 之后 isClosedForSend true // 生产者launch {(1..6).forEach {if (!channel.isClosedForSend) {channel.send(it)println(我生产了一个$it)// if (it 3) channel.close() // 大部分情况下是生产者 去close}}println(close前 isClosedForSend:${channel.isClosedForSend} isClosedForReceive:${channel.isClosedForReceive})channel.close()println(close后 isClosedForSend:${channel.isClosedForSend} isClosedForReceive:${channel.isClosedForReceive})}isClosedForReceive
如果消费完了 isClosedForReceive true 否则就是false 如果缓冲区里面还有内容没有消费完 也是 false // 消费者launch {try {for (i in channel) {delay(2000L)println(我消费了一个:$i)}}finally {println(finally isClosedForSend:${channel.isClosedForSend} isClosedForReceive:${channel.isClosedForReceive})}}BroadcastChannel
1、广播给所有消费者多个地方可以接收到
创建 val channel ChannelInt()val broadcastChannel channel.broadcast(Channel.BUFFERED)生产 // 生产者launch {repeat(8) {delay(1000L)broadcastChannel.send(it 100001) // 发送}broadcastChannel.close() // 关闭}openSubscription
消费 repeat(8) {// 消费者launch {val r broadcastChannel.openSubscription()for (i in r) {println(协程$it ---- 消费者 ${i})}}}select
1、select: 择优选择数据谁先返回用谁的
加载首页数据可以作缓存缓存有用缓存缓存不存在去请求慢的不会再执行会被cancel
2、select 是一个用于多路选择的结构可以同时等待多个挂起函数或通道的操作完成。它类似于 switch 或 if-else 的多路分支语句但是它是用于协程的异步操作。
suspend fun selectExample() {selectUnit {someChannel.onReceive { value -// 处理从通道接收到的值}someDeferred.onAwait { result -// 处理异步操作完成后的返回值}onTimeout(1000) {// 在指定时间内没有任何操作完成时执行}}
}
3、select可以用于上游也可以用于下游
onAwait
async有onAwait
data class Home(val info1: String, val info2: String)data class HomeRequestResponseResultData(val code: Int, val msg: String, val home: Home)// 请求本地加载首页数据
fun CoroutineScope.getHomeLocalData() async (Dispatchers.IO) {delay(3000)Home(数据1..., 数据1...)
}// 请求网络服务器加载首页数据
fun CoroutineScope.getHomeRemoteData() async (Dispatchers.IO) {delay(6000)Home(数据3..., 数据4...)
}launch {val localRequestAction getHomeLocalData()val remoteRequestAction getHomeRemoteData()val resultResponse selectHomeRequestResponseResultData {localRequestAction.onAwait {// 做校验 工作// ...// 省略1000行代码HomeRequestResponseResultData(200, 恭喜你请求成功, it) // 最后一行作为返回值}remoteRequestAction.onAwait {// 做校验 工作// ...// 省略1000行代码HomeRequestResponseResultData(200, 恭喜你请求成功, it) // 最后一行作为返回值}}println(resultResponse:$resultResponse)}2、async需要在调用的CoroutineScope中执行
fun CoroutineScope.getHomeLocalData() async (Dispatchers.IO) {delay(3000)Home(数据1..., 数据1...)
}
// 对CoroutineScope扩展channel数组
哪个更快选择哪个Channel
onReceive
onReceive: 接收数据后的回调 val channels arrayOf(ChannelString?(), ChannelString?())launch {delay(6000)channels[0].send(login successful)}launch {delay(8000)channels[1].send(register successful)}val receiveResult selectString ? {for (channel in channels) {channel.onReceive {// 做校验 工作// ...// 省略1000行代码[$it] // 最后一行作为返回值}}}println(receiveResult)onJoin
launch无返回值但想看谁执行的最快 val job1 launch {println(launch1 run)} // 无返回值val job2 launch {println(launch2 run)} // 无返回值selectUnit {job1.onJoin { println(launch1 执行完成了 很快) }job2.onJoin { println(launch2 执行完成了 很快) }}onSend
发送数据并且显示回调的内容上游 // 准备Channel数组val channels arrayOf(ChannelChar(), ChannelChar())// 协程一Channel 的 发射源launch(Dispatchers.Default) {selectUnit {// 并行干活sendlaunch {channels[0].onSend(女) {println(channels[0].onSend(女) { $it })}}// 并行干活sendlaunch {channels[1].onSend(男) {println(channels[1].onSend(男) { $it })}}}}// 协程二下游 接收阶段launch { println(channel1 下游接收 ${channels[0].receive()}) }launch { println(channel2 下游接收 ${channels[1].receive()}) }输出
channel1 下游接收 女
channels[0].onSend(女) { RendezvousChannel34206005{EmptyQueue} }
// 1. onSend先发送消息
// 2. 下游接收到
// 3. onSend回调打印消息await
复用Channel