建设银行网站的目的是什么意思,二手网站信用体系建设,我有域名怎么建网站,西固网站建设storm框架中的kafkaspout类实现的是BaseRichSpout#xff0c;它里面已经重写了fail和ack方法#xff0c;所以我们的bolt必须实现ack机制#xff0c;就可以保证消息的重新发送#xff1b;如果不实现ack机制#xff0c;那么kafkaspout就无法得到消息的处理响应#xff0c;就…storm框架中的kafkaspout类实现的是BaseRichSpout它里面已经重写了fail和ack方法所以我们的bolt必须实现ack机制就可以保证消息的重新发送如果不实现ack机制那么kafkaspout就无法得到消息的处理响应就会在超时以后再次发送消息导致消息的重复发送。 但是回想一下我们自己写一个spout类实现BaseRichSpout并让他具备消息重发那么我们是会在我们的spout类里面定义一个map集合并以msgId作为key。
public class MySpout extends BaseRichSpout {private static final long serialVersionUID 5028304756439810609L;// key:messageId,Dataprivate HashMapString, String waitAck new HashMapString, String();private SpoutOutputCollector collector;public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields(sentence));}public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector collector;}public void nextTuple() {String sentence the cow jumped over the moon;String messageId UUID.randomUUID().toString().replaceAll(-, );waitAck.put(messageId, sentence);//指定messageId开启ackfail机制collector.emit(new Values(sentence), messageId);}Overridepublic void ack(Object msgId) {System.out.println(消息处理成功: msgId);System.out.println(删除缓存中的数据...);waitAck.remove(msgId);}Overridepublic void fail(Object msgId) {System.out.println(消息处理失败: msgId);System.out.println(重新发送失败的信息...);//重发如果不开启ackfail机制那么spout的map对象中的该数据不会被删除的,而且下游collector.emit(new Values(waitAck.get(msgId)),msgId);}
}
那么kafkaspout会不会也是这样还保存这已发送未收到bolt响应的消息呢如果这样如果消息处理不断失败不断重发消息不断积累在kafkaspout节点上kafkaspout端会不就会出现内存溢出 其实并没有回想kafka的原理Kafka会为每一个consumergroup保留一些metadata信息–当前消费的消息的position也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然consumer也可将offset设成一个较小的值重新消费一些消息。也就是说kafkaspot在消费kafka的数据是通过offset读取到消息并发送给bolt后kafkaspot只是保存者当前的offset值。
当失败或成功根据msgId查询offset值然后再去kafka消费该数据来确保消息的重新发送。 那么虽然offset数据小但是当offset的数据量上去了还是会内存溢出的
其实并没有kafkaspout发现缓存的数据超过限制了会把某端的数据清理掉的。
kafkaspot中发送数据的代码
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
可以看到msgID里面包装了offset参数。
它不缓存已经发送出去的数据信息。 当他接收到来至bolt的响应后会从接收到的msgId中得到offset。以下是从源码中折取的关键代码
public void ack(Object msgId) {KafkaMessageId id (KafkaMessageId) msgId;PartitionManager m _coordinator.getManager(id.partition);if (m ! null) {m.ack(id.offset);}}m.ack(id.offset);public void ack(Long offset) {_pending.remove(offset);//处理成功移除offsetnumberAcked;}public void fail(Object msgId) {KafkaMessageId id (KafkaMessageId) msgId;PartitionManager m _coordinator.getManager(id.partition);if (m ! null) {m.fail(id.offset);}}m.fail(id.offset);public void fail(Long offset) {failed.add(offset);//处理失败添加offsetnumberFailed;}SortedSetLong _pending new TreeSetLong();SortedSetLong failed new TreeSetLong();
关于kafkaspot的源码解析大家可以看这边博客http://www.cnblogs.com/cruze/p/4241181.html
源码解析中涉及了很多kafka的概念所以仅仅理解kafka的概念想完全理解kafkaspot源码是很难的如果不理解kafka概念那么就只需要在理解storm的ack机制上明白kafkaspot做了上面的两件事就可以了。
转发:http://www.cnblogs.com/intsmaze/p/5947078.html