Spark Streaming: consuming Kafka with offsets manually maintained in Redis
1. Redis utility class
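The idea: on startup the job reads the last committed offset of each topic partition from a Redis hash and uses those positions to assign the Kafka consumer's starting offsets; after every processed batch it writes each partition's end offset back to the same hash, so progress survives restarts without relying on Kafka's auto commit.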
```scala
package com.qupojie.kafka_offset
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
import java.util
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.collection.mutable
object RedisUtils {
  private val config = new JedisPoolConfig
  private val redisHost = "hadoop110"
  private val redisPort = 6379
  config.setMaxTotal(30)
  config.setMaxIdle(10)
  private val pool = new JedisPool(config, redisHost, redisPort, 10000)
  // Offsets are stored under the key "kafka:topic:<topic>:<groupId>"
  private val topicPrefix = "kafka:topic"
  private val offsetMap: mutable.Map[TopicPartition, Long] = mutable.Map()
  private def getKey(topic: String, groupId: String, prefix: String = topicPrefix): String = s"$prefix:$topic:$groupId"
  private def getRedisConnection: Jedis = pool.getResource
  // Read the offsets for the given topics and consumer group from Redis (DB 1)
  def getOffsetsFromRedis(topics: Array[String], groupId: String): mutable.Map[TopicPartition, Long] = {
    val jedis: Jedis = getRedisConnection
    jedis.select(1)
    topics.foreach(topic => {
      val resultOffsetMap: util.Map[String, String] = jedis.hgetAll(getKey(topic, groupId))
      if (resultOffsetMap.size() == 0) {
        // Nothing stored yet: initialize every partition (the topic has 3 partitions here) at offset 0
        offsetMap.put(new TopicPartition(topic, 0), 0L)
        offsetMap.put(new TopicPartition(topic, 1), 0L)
        offsetMap.put(new TopicPartition(topic, 2), 0L)
      } else {
        // Stored hash: field = partition id, value = offset
        resultOffsetMap.asScala.foreach(offset => {
          offsetMap.put(new TopicPartition(topic, offset._1.toInt), offset._2.toLong)
        })
      }
    })
    // Return the connection to the pool only after all topics have been read
    jedis.close()
    offsetMap
  }
  // Persist the end offset of every processed OffsetRange back to Redis (DB 1)
  def saveOffsetsToRedis(ranges: Array[OffsetRange], groupId: String): Unit = {
    val jedis: Jedis = getRedisConnection
    jedis.select(1)
    // Group the ranges by topic: topic -> Array((partition, untilOffset))
    val offsetList: Map[String, Array[(String, (Int, Long))]] = ranges
      .map(range => (range.topic, range.partition -> range.untilOffset))
      .groupBy(_._1)
    offsetList.map {
      case (topic, buffer) => (topic, buffer.map(_._2))
    }.foreach {
      case (topic, partitionAndOffset) =>
        val offsets: Array[(String, String)] = partitionAndOffset.map(elem => (elem._1.toString, elem._2.toString))
        import scala.collection.JavaConverters._
        // Write the hash: field = partition id, value = untilOffset
        jedis.hmset(getKey(topic, groupId), offsets.toMap.asJava)
    }
    jedis.close()
  }
}
```
2. Spark Streaming implementation class
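With this utility, each topic/group pair maps to one Redis hash in DB 1: the key is `kafka:topic:<topic>:<groupId>`, the fields are partition ids, and the values are the last saved end offsets. A minimal sketch of how that stored state could be inspected with Jedis, reusing the host, topic, and group id that appear in this post (the object name `InspectOffsets` is only for illustration):
```scala
import redis.clients.jedis.Jedis
import scala.collection.JavaConverters.mapAsScalaMapConverter

// Minimal sketch: print the partition -> offset hash that RedisUtils maintains.
// Host, port, DB index, and key format mirror RedisUtils above; adjust as needed.
object InspectOffsets {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("hadoop110", 6379)
    jedis.select(1)
    val key = "kafka:topic:spark_test02:SparkConsumerKafka01"
    jedis.hgetAll(key).asScala.foreach {
      case (partition, offset) => println(s"partition $partition -> offset $offset")
    }
    jedis.close()
  }
}
```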
```scala
package com.qupojie.kafka_offset
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}
import scala.collection.mutable
object SparkConsumerKafka01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkConsumerKafka01").setMaster("local[*]")
    // maxRatePerPartition is a Spark setting, so it belongs on SparkConf rather than in the Kafka params
    conf.set("spark.streaming.kafka.maxRatePerPartition", "10")
    val ssc: StreamingContext = new StreamingContext(conf = conf, batchDuration = Durations.seconds(5))
    val kafkaParams: mutable.HashMap[String, String] = new mutable.HashMap[String, String]()
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop110:9092,hadoop112:9092,hadoop112:9092")
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    // Offsets are committed manually, so disable Kafka's auto commit
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    val groupId = "SparkConsumerKafka01"
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") // start from the latest offset when none is stored
    val topics = Array("spark_test02")
    // Restore the last committed offsets from Redis and start the consumer from those positions
    val fromOffsets: mutable.Map[TopicPartition, Long] = RedisUtils.getOffsetsFromRedis(topics, groupId)
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
    )
    stream.foreachRDD(rdd => {
      println("-------- batch at " + System.currentTimeMillis() + " --------")
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition(iter => {
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        println(s"${o.topic} partition: ${o.partition} from offset: ${o.fromOffset} until offset: ${o.untilOffset}")
      })
      // Commit the offsets once per batch on the driver, after the partitions have been processed
      RedisUtils.saveOffsetsToRedis(offsetRanges, groupId)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
```
3. Run output
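To build the two classes above, the project needs Spark Streaming, the Kafka 0-10 integration, and the Jedis client on the classpath. A rough build.sbt sketch, assuming Scala 2.12 with Spark 2.4.x and Jedis 3.x; the original post does not state versions, so adjust them to match your cluster:
```scala
// build.sbt sketch; the version numbers are assumptions, not taken from the original post
name := "kafka-offset-redis"
scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.4.8",
  "org.apache.spark" %% "spark-streaming"            % "2.4.8",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8",
  "redis.clients"    %  "jedis"                      % "3.7.0"
)
```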
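When the job runs, each 5-second batch prints a separator line with the current time, followed by one line per partition of spark_test02 showing the topic, partition id, starting offset, and ending offset; at the same time the hash kafka:topic:spark_test02:SparkConsumerKafka01 in Redis DB 1 is updated with each partition's untilOffset.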
Original article: https://blog.csdn.net/qq_42890382/article/details/143870681