当前位置: 首页 > news >正文

建设银行网站的目的是什么意思二手网站信用体系建设

建设银行网站的目的是什么意思,二手网站信用体系建设,我有域名怎么建网站,西固网站建设storm框架中的kafkaspout类实现的是BaseRichSpout#xff0c;它里面已经重写了fail和ack方法#xff0c;所以我们的bolt必须实现ack机制#xff0c;就可以保证消息的重新发送#xff1b;如果不实现ack机制#xff0c;那么kafkaspout就无法得到消息的处理响应#xff0c;就…storm框架中的kafkaspout类实现的是BaseRichSpout它里面已经重写了fail和ack方法所以我们的bolt必须实现ack机制就可以保证消息的重新发送如果不实现ack机制那么kafkaspout就无法得到消息的处理响应就会在超时以后再次发送消息导致消息的重复发送。 但是回想一下我们自己写一个spout类实现BaseRichSpout并让他具备消息重发那么我们是会在我们的spout类里面定义一个map集合并以msgId作为key。 public class MySpout extends BaseRichSpout {private static final long serialVersionUID 5028304756439810609L;// key:messageId,Dataprivate HashMapString, String waitAck new HashMapString, String();private SpoutOutputCollector collector;public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields(sentence));}public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector collector;}public void nextTuple() {String sentence the cow jumped over the moon;String messageId UUID.randomUUID().toString().replaceAll(-, );waitAck.put(messageId, sentence);//指定messageId开启ackfail机制collector.emit(new Values(sentence), messageId);}Overridepublic void ack(Object msgId) {System.out.println(消息处理成功: msgId);System.out.println(删除缓存中的数据...);waitAck.remove(msgId);}Overridepublic void fail(Object msgId) {System.out.println(消息处理失败: msgId);System.out.println(重新发送失败的信息...);//重发如果不开启ackfail机制那么spout的map对象中的该数据不会被删除的,而且下游collector.emit(new Values(waitAck.get(msgId)),msgId);} } 那么kafkaspout会不会也是这样还保存这已发送未收到bolt响应的消息呢如果这样如果消息处理不断失败不断重发消息不断积累在kafkaspout节点上kafkaspout端会不就会出现内存溢出 其实并没有回想kafka的原理Kafka会为每一个consumergroup保留一些metadata信息–当前消费的消息的position也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然consumer也可将offset设成一个较小的值重新消费一些消息。也就是说kafkaspot在消费kafka的数据是通过offset读取到消息并发送给bolt后kafkaspot只是保存者当前的offset值。 当失败或成功根据msgId查询offset值然后再去kafka消费该数据来确保消息的重新发送。 那么虽然offset数据小但是当offset的数据量上去了还是会内存溢出的 其实并没有kafkaspout发现缓存的数据超过限制了会把某端的数据清理掉的。 kafkaspot中发送数据的代码 collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset)); 可以看到msgID里面包装了offset参数。 它不缓存已经发送出去的数据信息。 当他接收到来至bolt的响应后会从接收到的msgId中得到offset。以下是从源码中折取的关键代码 public void ack(Object msgId) {KafkaMessageId id (KafkaMessageId) msgId;PartitionManager m _coordinator.getManager(id.partition);if (m ! null) {m.ack(id.offset);}}m.ack(id.offset);public void ack(Long offset) {_pending.remove(offset);//处理成功移除offsetnumberAcked;}public void fail(Object msgId) {KafkaMessageId id (KafkaMessageId) msgId;PartitionManager m _coordinator.getManager(id.partition);if (m ! null) {m.fail(id.offset);}}m.fail(id.offset);public void fail(Long offset) {failed.add(offset);//处理失败添加offsetnumberFailed;}SortedSetLong _pending new TreeSetLong();SortedSetLong failed new TreeSetLong(); 关于kafkaspot的源码解析大家可以看这边博客http://www.cnblogs.com/cruze/p/4241181.html 源码解析中涉及了很多kafka的概念所以仅仅理解kafka的概念想完全理解kafkaspot源码是很难的如果不理解kafka概念那么就只需要在理解storm的ack机制上明白kafkaspot做了上面的两件事就可以了。 转发:http://www.cnblogs.com/intsmaze/p/5947078.html
http://www.zqtcl.cn/news/266788/

相关文章:

  • 网站改版方案案例入门级网页设计培训学员
  • 安徽优化网站运营平台
  • 小型企业网站设计教程面备案网站建设
  • 重庆业务外包网站建设办公室装修一般多少钱一个平方
  • 网站查询域名ip解析手机短视频网站的建设
  • 甘肃机械化建设工程有限公司网站微小店网站建设价格
  • 个人空间网站建设报告网络游戏交易平台
  • 深圳医疗网站建设中小企业网站功能
  • 汕头集团做网站方案建设网站要买空间吗
  • 宁波搭建网站专业展馆展厅设计公司深圳
  • 山东省建设工程电子信息网站广州开发区第一小学
  • 网站建设推广重要性河北高端网站建设
  • 网站的seo方案怎么做wordpress自动转内链
  • 番禺手机网站制作推广wordpress远程数据库
  • 企业网站seo外包 s深圳国内设计网站
  • 临海高端营销型网站建设地址免费网站alexa排名查询
  • 做企业网站的轻量级cms建设电子商务网站流程图
  • 淘宝网站设计分析国内在线免费服务器
  • wordpress网站文章加密网站建设 博采网络
  • 哪个网站做美食好一点网络运维个人工作总结
  • 做网红用哪个网站教人做策划的网站
  • 百度免费网站怎样建设wordpress模板目录结构
  • 长沙简单的网站建设公司wordpress+手机应用
  • 用spl做网站wordpress不用缓存
  • 微网站模板标签网站被攻击怎么让百度重新蜘蛛自动抓
  • 自己想做一个网站网页背景怎么设置
  • 国外做项目的网站软件定制外包平台
  • 做网站要用什么软件房地产建设网站
  • 龙岗爱联有学网站建设装饰公司简介
  • pc端网站怎么做自适应哪个公司网站备案快