做网站选择什么相机,wordpress后台切换中文,基于cms设计网站的背景意义,培训网站平台怎样做有一段时间没有写文章#xff0c;techempower的测试规则评分竟然发生了变化#xff0c;只能忘着补充一下占比权重最多的数据更新示例了和深入设计一下组件模块化加载的设计。但在不久前有用户问了一下组件是否支持redis的Stream功能#xff0c;看了一样相关资料后把功能实现… 有一段时间没有写文章techempower的测试规则评分竟然发生了变化只能忘着补充一下占比权重最多的数据更新示例了和深入设计一下组件模块化加载的设计。但在不久前有用户问了一下组件是否支持redis的Stream功能看了一样相关资料后把功能实现之接下来就介绍一下如何用Beetlex.Redis来调用redis的Stream功能。什么是Stream是Redis5.0的Stream是一个新的强大的支持多播的可持久化的消息队列,它提供了消息添加多组和多消费者一致性读取和ack确认等功能更详细的介绍就不多说了可以通过网络找到更多详细描述。创建Stream组件通过RedisDB对象的GetStream访求来创建一个Stream访问对象对象创建后就可以进行一系列的 XACK| XADD| XDEL| XGROUP| XLEN| XRANGE| XREAD| XREADGROUP| XREVRANGE|XTRIM等指令操作。创建代码如下:RedisStreamEmployee stream DB.GetStreamEmployee(employees_stream);
XADD在介绍这个操作前先说一下Stream里存储的格式默认Stream消息是K-V的格式从基础指令上可以了解到这种结构XADD mystream * sensor-id 1234 temperature 19.8
但这种格式操作起来并不友好所以组件除了支持这种K-V的方式外还支持以对象的方式进行Stream消息处理。接下来看一下插入对象的调用RedisStreamEmployee stream DB.GetStreamEmployee(employees_stream);
var id await stream.Add(DataHelper.Defalut.Employees[0]);
id await stream.Add(DataHelper.Defalut.Employees[1]);
id await stream.Add(DataHelper.Defalut.Employees[2]);
var len await stream.Len();
组件支持直接入插对象其基础指令就是XADD employees_stream * date employeejson
组件直接采用一个K-V的方式来存储对象对于原则多个K-V的方式组件同样也支持只是在构建Stream指定类型用Dictionarystring,string即可接下其他就不多说了直接上指令用例了。XLENRedisStreamEmployee stream DB.GetStreamEmployee(employees_stream);
var len await stream.Len();
XDELRedisStreamEmployee stream DB.GetStreamEmployee(employees_stream);
var items await stream.Read(null, null, 0-0);
await stream.Del((from item in items select item.ID).ToArray());
XRANGERedisStreamEmployee stream DB.GetStreamEmployee(employees_stream);
var items await stream.Range();
items await stream.RangeAll();
XREVRANGERedisStreamEmployee stream DB.GetStreamEmployee(employees_stream);
var items await stream.RevRange();
items await stream.RevRangeAll();
XREADRedisStreamEmployee stream DB.GetStreamEmployee(employees_stream);
var items await stream.Read(0, null, 0-0);
items await stream.Read();
Stream的消费组前面介绍的指令感觉列表结构都能满足其实Stream重要的功能是在组消费这一块Redis可以针对Stream创建多个消费组和消费者而消息会做一致性消费处理。XGROUPRedisStreamEmployee stream DB.GetStreamEmployee(employees_stream);
var group await stream.GetGroup(henry);
XREADRedisStreamEmployee stream DB.GetStreamEmployee(employees_stream);
var group await stream.GetGroup(g1);
var items await group.Read(henry, 0);
实际XRead提供了是否等待和起始读已取参数 public async ValueTaskListStreamDataItemT ReadWait(string consumer,int timeout0public ValueTaskListStreamDataItemT Read(string consumer,string start null) public async ValueTaskListStreamDataItemT Read(string consumer, int? block, int? count, string start null)
一般情况下可以通过readwait来不停地消息新的消息while(true)
{items await group.ReadWait(henry);//处理消息foreach(var item in items){await item.Ack();}
}
XACKRedisStreamEmployee stream DB.GetStreamEmployee(employees_stream);
var group await stream.GetGroup(g1);
var items await group.Read(henry, 0);
foreach (var item in items)await item.Ack();
以上是BeetleX.Redis组件提供操作Stream的基础指令实际上Stream还有一些和运维相关的指令只是这些在实际业务上用不上所以就没有去实现了。