Spark Streaming No Receivers 方式的createDirectStream 方法不使用接收器，而是创建输入流直接从Kafka 集群节点拉取消息。输入流保证每个消息从Kafka 集群拉取以后只完全转换一次，保证语义一致性。但是当作业发生故障或重启时，要保障从当前的消费位点去处理数据（即Exactly Once语义），单纯依靠Spark Streaming本身的机制是不太理想的，生产环境中通常借助手动管理offset的方式来维护Kafka的消费位点。本文将介绍如何手动管理Kafka的Offset，希望对你有所帮助。本文主要包括以下内容：如何使用MySQL管理Kafka的Offset；如何使用Redis管理Kafka的Offset。
如何使用MySQL管理Kafka的Offset
我们可以在Spark Streaming 应用程序中编写代码来手动管理Kafka偏移量。偏移量可以从每一批流处理生成的RDD的偏移量范围（offsetRanges）中获取，获取方式为：
KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
  // 获取偏移量
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ...
}
-- Once the offsets have been obtained they can be saved to an external store
-- (MySQL, Redis, ZooKeeper, HBase, ...).
--
-- Example: the MySQL table used to persist the offsets. One row is kept per
-- (topic, partition, consumer group); `offset` holds the saved position.
-- `partition` is a reserved word in MySQL and `offset` is a keyword, so the
-- identifiers are backtick-quoted.
CREATE TABLE `topic_par_group_offset` (
  `topic`     varchar(255) NOT NULL,
  `partition` int(11)      NOT NULL,
  `groupid`   varchar(255) NOT NULL,
  `offset`    bigint(20)   DEFAULT NULL,
  PRIMARY KEY (`topic`, `partition`, `groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/**
 * Constants configuration class: the Kafka, Spark, MySQL, checkpoint and Redis
 * settings shared by the example objects.
 *
 * NOTE(review): hosts and credentials are hard-coded demo values; load them
 * from external configuration before using this outside a sandbox.
 */
object ConfigConstants {
  // Kafka settings
  val kafkaBrokers: String = "kms-2:9092,kms-3:9092,kms-4:9092"
  val groupId: String = "group_test"
  val kafkaTopics: String = "test"
  val batchInterval = Seconds(5)
  val streamingStorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  val kafkaKeySer: String = "org.apache.kafka.common.serialization.StringSerializer"
  val kafkaValueSer: String = "org.apache.kafka.common.serialization.StringSerializer"
  val sparkSerializer: String = "org.apache.spark.serializer.KryoSerializer"
  val batchSize: Int = 16384
  val lingerMs: Int = 1
  val bufferMemory: Int = 33554432

  // MySQL settings
  val user: String = "root"
  val password: String = "123qwe"
  val url: String = "jdbc:mysql://localhost:3306/kafka_offset"
  val driver: String = "com.mysql.jdbc.Driver"

  // Checkpoint settings
  val checkpointDir: String = "file:///e:/checkpoint"
  val checkpointInterval = Seconds(10)

  // Redis settings
  val redisAddress: String = "192.168.10.203"
  val redisPort: Int = 6379
  val redisAuth: String = "123qwe"
  val redisTimeout: Int = 3000
}
/**
 * JDBC connection utility: lazily builds one `BasicDataSource` from the MySQL
 * settings in `ConfigConstants` and hands out connections from it.
 */
object JDBCConnPool {
  import scala.util.control.NonFatal

  val log: Logger = Logger.getLogger(JDBCConnPool.getClass)

  // Created on first use by getDataSource(); reset to null by closeDataSource().
  var dataSource: BasicDataSource = null

  /**
   * Returns the shared data source, creating and configuring it on first call.
   *
   * @return the configured data source
   */
  def getDataSource(): BasicDataSource = {
    if (dataSource == null) {
      val ds = new BasicDataSource()
      ds.setDriverClassName(ConfigConstants.driver)
      ds.setUrl(ConfigConstants.url)
      ds.setUsername(ConfigConstants.user)
      ds.setPassword(ConfigConstants.password)
      ds.setMaxTotal(50)
      ds.setInitialSize(3)
      ds.setMinIdle(3)
      ds.setMaxIdle(10)
      ds.setMaxWaitMillis(2 * 10000)
      ds.setRemoveAbandonedTimeout(180)
      ds.setRemoveAbandonedOnBorrow(true)
      ds.setRemoveAbandonedOnMaintenance(true)
      ds.setTestOnReturn(true)
      ds.setTestOnBorrow(true)
      // Publish only after configuration is complete.
      dataSource = ds
    }
    dataSource
  }

  /**
   * Closes the data source, if one was created, and forgets it so that the
   * next getDataSource()/getConnection() call builds a fresh one instead of
   * reusing a closed instance.
   */
  def closeDataSource(): Unit = {
    if (dataSource != null) {
      dataSource.close()
      dataSource = null
    }
  }

  /**
   * Borrows a connection from the data source.
   *
   * @return a connection, or `null` when it could not be obtained (the error
   *         is logged); callers must check for `null`
   */
  def getConnection(): Connection =
    try {
      getDataSource().getConnection()
    } catch {
      case NonFatal(e) =>
        log.error(e.getMessage, e)
        null
    }

  /**
   * Closes the statement and then the connection. Either argument may be
   * `null`; a failure to close one is logged and does not prevent closing
   * the other.
   *
   * @param ps   statement to close, may be `null`
   * @param conn connection to close, may be `null`
   */
  def closeConnection(ps: PreparedStatement, conn: Connection): Unit = {
    if (ps != null) {
      try {
        ps.close()
      } catch {
        case NonFatal(e) =>
          log.error("预编译SQL语句对象PreparedStatement关闭异常" + e.getMessage, e)
      }
    }
    if (conn != null) {
      try {
        conn.close()
      } catch {
        case NonFatal(e) =>
          log.error("关闭连接对象Connection异常" + e.getMessage, e)
      }
    }
  }
}
/**
 * Kafka producer used to feed the examples: sends the numbers 1 to 100 as
 * string values, all with key "Spark", to the topic named by
 * `ConfigConstants.kafkaTopics`, then prints the elapsed time.
 */
object KafkaProducerTest {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", ConfigConstants.kafkaBrokers)
    // Properties.put takes Objects, so the Int settings are boxed explicitly.
    props.put("batch.size", Int.box(ConfigConstants.batchSize))
    props.put("linger.ms", Int.box(ConfigConstants.lingerMs))
    props.put("buffer.memory", Int.box(ConfigConstants.bufferMemory))
    props.put("key.serializer", ConfigConstants.kafkaKeySer)
    props.put("value.serializer", ConfigConstants.kafkaValueSer)

    val producer: Producer[String, String] = new KafkaProducer[String, String](props)
    val startTime: Long = System.currentTimeMillis()
    try {
      for (i <- 1 to 100) {
        producer.send(
          new ProducerRecord[String, String](ConfigConstants.kafkaTopics, "Spark", Integer.toString(i)))
      }
      println("消耗时间：" + (System.currentTimeMillis() - startTime))
    } finally {
      // Always close the producer, even when a send fails.
      producer.close()
    }
  }
}
/**
 * Reading and saving offsets: loads Kafka offsets from, and writes them to,
 * the external stores used by the examples (MySQL and Redis).
 */
object OffsetReadAndSave {

  /**
   * Runs `body` with a prepared statement for `sql`, then closes the statement
   * and its connection whether or not `body` throws.
   */
  private def withStatement[T](sql: String)(body: java.sql.PreparedStatement => T): T = {
    val conn = JDBCConnPool.getConnection()
    if (conn == null) {
      // getConnection() logs and returns null on failure; fail with a clear message.
      throw new IllegalStateException("Could not obtain a JDBC connection from JDBCConnPool")
    }
    var ppst: java.sql.PreparedStatement = null
    try {
      ppst = conn.prepareStatement(sql)
      body(ppst)
    } finally {
      JDBCConnPool.closeConnection(ppst, conn)
    }
  }

  /**
   * Reads the saved offsets from MySQL.
   *
   * @param groupid consumer group id
   * @param topic   topic name
   * @return partition -> offset for every row stored for the group and topic;
   *         empty when there is none
   */
  def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {
    // `partition` is a reserved word in MySQL, hence the backticks.
    val selectSql =
      "select topic, `partition`, `offset` from topic_par_group_offset where groupid = ? and topic = ?"
    withStatement(selectSql) { ppst =>
      ppst.setString(1, groupid)
      ppst.setString(2, topic)
      val result: ResultSet = ppst.executeQuery()
      // Topic partition -> offset
      val topicPartitionOffset = mutable.Map[TopicPartition, Long]()
      while (result.next()) {
        val topicPartition = new TopicPartition(result.getString("topic"), result.getInt("partition"))
        topicPartitionOffset += (topicPartition -> result.getLong("offset"))
      }
      topicPartitionOffset
    }
  }

  /**
   * Reads the saved offsets from Redis. They are stored in the hash
   * `<topic>_<groupid>`, with the partition as field and the offset as value.
   *
   * @param groupid consumer group id
   * @param topic   topic name
   * @return partition -> offset; empty when the hash has no fields
   */
  def getOffsetFromRedis(groupid: String, topic: String): Map[TopicPartition, Long] = {
    import scala.collection.JavaConverters._
    val jedis: Jedis = JedisConnPool.getConnection()
    try {
      val fields: java.util.Map[String, String] = jedis.hgetAll(s"${topic}_${groupid}")
      fields.asScala.map { case (partition, offset) =>
        new TopicPartition(topic, partition.toInt) -> offset.toLong
      }.toMap
    } finally {
      // Hand the connection back to the pool.
      jedis.close()
    }
  }

  /**
   * Writes offsets to MySQL, storing `untilOffset` of each range with
   * `replace into` so an existing row for the same key is overwritten.
   *
   * @param groupid     consumer group id
   * @param offsetRange offset ranges of the processed batch
   */
  def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]): Unit = {
    val insertSql =
      "replace into topic_par_group_offset(topic, `partition`, groupid, `offset`) values(?,?,?,?)"
    withStatement(insertSql) { ppst =>
      for (range <- offsetRange) {
        ppst.setString(1, range.topic)
        ppst.setInt(2, range.partition)
        ppst.setString(3, groupid)
        ppst.setLong(4, range.untilOffset)
        ppst.executeUpdate()
      }
    }
  }

  /**
   * Writes offsets to Redis: key is `<topic>_<groupid>`, field is the
   * partition and value is `untilOffset` of the range.
   *
   * @param groupid     consumer group id
   * @param offsetRange offset ranges of the processed batch
   */
  def saveOffsetToRedis(groupid: String, offsetRange: Array[OffsetRange]): Unit = {
    val jedis: Jedis = JedisConnPool.getConnection()
    try {
      for (range <- offsetRange) {
        jedis.hset(s"${range.topic}_${groupid}", range.partition.toString, range.untilOffset.toString)
      }
    } finally {
      // This runs once per batch; without returning the connection the pool runs dry.
      jedis.close()
    }
  }
}

/**
 * Business-logic class: consumes Kafka data, processes it, and then saves the
 * batch offsets to MySQL manually.
 *
 * On start-up it checks whether offsets already exist in the external store.
 * If there are none, it subscribes without explicit offsets and
 * `auto.offset.reset` is set to "earliest"; otherwise it passes the stored
 * offsets to the subscription.
 *
 * What to observe: the first run consumes the data from the beginning; after
 * stopping the program by hand and starting it again, consumption continues
 * from the offsets that were saved.
 */
object ManualCommitOffset {
  def main(args: Array[String]): Unit = {
    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      .setAppName(ManualCommitOffset.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer", ConfigConstants.sparkSerializer)
    val ssc = new StreamingContext(conf, batchInterval)
    // Checkpointing must be enabled, otherwise the job fails.
    ssc.checkpoint(ConfigConstants.checkpointDir)
    ssc.sparkContext.setLogLevel("OFF")

    // Topics are given as a space-separated list.
    val topicSet = topics.split(" ").toSet

    // Kafka connection parameters; auto-commit is off because offsets are stored manually.
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    // Read the saved offsets of this consumer group from MySQL, one lookup per
    // topic so that a multi-topic list is handled as well.
    val offsetMap: Map[TopicPartition, Long] =
      topicSet.flatMap(topic => OffsetReadAndSave.getOffsetMap(groupId, topic)).toMap

    val consumerStrategy =
      if (offsetMap.nonEmpty) {
        // Offsets already exist in MySQL: start consuming from them.
        println("存在偏移量，从该偏移量处进行消费")
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap)
      } else {
        // No offsets in MySQL: consume from the earliest position.
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)
      }
    val inputDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc, LocationStrategies.PreferConsistent, consumerStrategy)

    // The checkpoint interval must be a multiple of the batch interval.
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    /**
     * State update function: adds the values of the current batch to the state.
     *
     * @param newValues  values that arrived in this batch
     * @param stateValue previous state, `None` on first use
     * @return the updated state
     */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] =
      Some(stateValue.getOrElse(0) + newValues.sum)

    // Business logic: keeps a running sum of the message values under the fixed
    // key "spark", which shows whether consumption resumed from the saved offsets.
    // NOTE(review): `toInt` throws on a non-numeric value; the test producer only sends integers.
    inputDStream
      .map(meg => ("spark", meg.value().toInt))
      .updateStateByKey[Int](updateFunc _)
      .print()

    // Print the records and offsets of each batch, then persist the offsets.
    inputDStream.foreachRDD { (rdd, time) =>
      // The cast is valid because this RDD comes straight from the direct stream.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")
      }
      // Save the offsets to MySQL after the batch has been processed.
      OffsetReadAndSave.saveOffsetRanges(groupId, offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
// ---------------------------------------------------------------------------
// How to manage Kafka offsets with Redis
// ---------------------------------------------------------------------------

/**
 * Redis connection class: builds one `JedisPool` from the Redis settings in
 * `ConfigConstants` and hands out connections from it.
 */
object JedisConnPool {
  val config = new JedisPoolConfig
  // Maximum number of connections
  config.setMaxTotal(60)
  // Maximum number of idle connections
  config.setMaxIdle(10)
  config.setTestOnBorrow(true)

  // Server address
  val redisAddress: String = ConfigConstants.redisAddress.toString
  // Port
  val redisPort: Int = ConfigConstants.redisPort.toInt
  // Password
  val redisAuth: String = ConfigConstants.redisAuth.toString
  // Timeout handed to the JedisPool constructor.
  // NOTE(review): presumably the connection/socket timeout in milliseconds — confirm against the Jedis version in use.
  val redisTimeout: Int = ConfigConstants.redisTimeout.toInt

  val pool = new JedisPool(config, redisAddress, redisPort, redisTimeout, redisAuth)

  /**
   * Borrows a connection from the pool.
   *
   * @return a pooled `Jedis` instance; the caller should `close()` it when done
   */
  def getConnection(): Jedis = pool.getResource
}
/**
 * Business-logic class: much the same as `ManualCommitOffset`, except that the
 * offsets are stored in Redis.
 *
 * The data is kept in a Redis hash laid out as
 * `[key field value] -> [topic_groupid partition offset]`: the key is
 * `<topic>_<groupid>`, the field is the partition and the value is the offset.
 */
object ManualCommitOffsetToRedis {
  def main(args: Array[String]): Unit = {
    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      // Use this object's own name; the original reused ManualCommitOffset's name.
      .setAppName(ManualCommitOffsetToRedis.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer", ConfigConstants.sparkSerializer)
    val ssc = new StreamingContext(conf, batchInterval)
    // Checkpointing must be enabled, otherwise the job fails.
    ssc.checkpoint(ConfigConstants.checkpointDir)
    ssc.sparkContext.setLogLevel("OFF")

    // Topics are given as a space-separated list.
    val topicSet = topics.split(" ").toSet

    // Kafka connection parameters; auto-commit is off because offsets are stored manually.
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    // Read the saved offsets of this consumer group from Redis, one lookup per
    // topic so that a multi-topic list is handled as well.
    val offsetMap: Map[TopicPartition, Long] =
      topicSet.flatMap(topic => OffsetReadAndSave.getOffsetFromRedis(groupId, topic)).toMap

    val consumerStrategy =
      if (offsetMap.nonEmpty) {
        // Offsets already exist in Redis: start consuming from them.
        println("存在偏移量，从该偏移量处进行消费")
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap)
      } else {
        // No offsets in Redis: consume from the earliest position.
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)
      }
    val inputDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc, LocationStrategies.PreferConsistent, consumerStrategy)

    // The checkpoint interval must be a multiple of the batch interval.
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    /**
     * State update function: adds the values of the current batch to the state.
     *
     * @param newValues  values that arrived in this batch
     * @param stateValue previous state, `None` on first use
     * @return the updated state
     */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] =
      Some(stateValue.getOrElse(0) + newValues.sum)

    // Business logic: keeps a running sum of the message values under the fixed
    // key "spark", which shows whether consumption resumed from the saved offsets.
    // NOTE(review): `toInt` throws on a non-numeric value; the test producer only sends integers.
    inputDStream
      .map(meg => ("spark", meg.value().toInt))
      .updateStateByKey[Int](updateFunc _)
      .print()

    // Print the records and offsets of each batch, then persist the offsets.
    inputDStream.foreachRDD { (rdd, time) =>
      // The cast is valid because this RDD comes straight from the direct stream.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")
      }
      // Save the offsets to Redis after the batch has been processed.
      OffsetReadAndSave.saveOffsetToRedis(groupId, offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
总结
本文介绍了如何使用外部存储设备来保存Kafka的消费位点，并通过详细的代码示例说明了使用MySQL和Redis管理消费位点的方式。当然，外部存储设备有很多，用户也可以使用其他存储设备来管理Offset，比如Zookeeper和HBase等，其基本处理思路都十分相似。大数据技术与数仓