NOTE: This Kafka Spark Consumer code is derived from the Kafka spout of the Apache Storm project (https://github.com/apache/storm/tree/master/external/storm-kafka), which was originally created by wurstmeister (https://github.com/wurstmeister/storm-kafka-0.8-plus). The original Storm Kafka Spout code has been modified to work with Spark Streaming.
This utility helps pull messages from Kafka using Spark Streaming, with better handling of Kafka offsets and of failures.
This Consumer implements a custom Reliable Receiver which uses the Kafka Consumer API to fetch messages from Kafka and stores every received block in the Spark BlockManager. The logic automatically detects the number of partitions of a topic and spawns the configured number of Receivers. Each Receiver can fetch messages from one or more Kafka partitions.
e.g. if a Kafka topic has 100 partitions and the Spark Consumer is configured with 20 Receivers, each Receiver will handle 5 partitions.
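The split described above can be sketched in a few lines of Java. This is only an illustration of an even distribution: the class and method names are hypothetical, and the round-robin strategy is an assumption, not necessarily how the Consumer assigns partitions internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of kafka-spark-consumer.
final class PartitionAssignmentSketch {

    // Distributes partition ids 0..partitionCount-1 over the receivers round-robin
    // (an assumed strategy, chosen because it gives an even split).
    static List<List<Integer>> assign(int partitionCount, int receiverCount) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int r = 0; r < receiverCount; r++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            assignment.get(p % receiverCount).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 100 partitions over 20 Receivers: every Receiver ends up with 5 partitions.
        List<List<Integer>> assignment = assign(100, 20);
        System.out.println("Receiver 0 handles " + assignment.get(0).size() + " partitions");
    }
}
```

If the partition count is not a multiple of the Receiver count, some Receivers in this sketch simply get one partition more than others.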
This consumer can commit the offsets of a processed batch once the Spark Streaming batch is completed.
In the Spark driver code, the Receivers are launched by calling ReceiverLauncher.launch.
Please see the Java or Scala code examples on how to use this Low Level Consumer.
Version 2.1.0 : Spark version 2.x and above. Kafka version 0.10 and above. Support for handling multiple topics.
Version 2.0.0 : Spark version 2.x and above. Kafka version 0.10 and above. Support for Kafka Security. Uses the new Kafka Consumer API.
Version 1.0.9 : Spark versions earlier than 2.0 (i.e. 1.6 and prior). All Kafka versions (0.8.x, 0.9.x, 0.10.x, 0.11.x). No support for Kafka Security. Uses the Low Level SimpleConsumer API.
- Uses the latest Kafka Consumer API. Support for Kafka Security
- Support for consuming from multiple topics
- Zookeeper is used to store the offset of each Kafka partition, which helps recovery in case of failure
- A Spark Streaming job using this Consumer does not require WAL for recovery from Driver or Executor failures. As this Consumer can store the processed offset after every Batch interval, in case of any failure it can restart from the correct offset, right after the last processed offset.
- This Consumer implements a PID (Proportional, Integral, Derivative) based Rate Controller for controlling Back Pressure.
- This Consumer can use a Message Interceptor to preprocess Kafka messages before they are written to the Spark BlockManager
- Support for a Consumer Lag Checker tool (like ConsumerOffsetChecker) to find the Consumer Lag
- This Consumer is a Receiver-based, fault-tolerant, reliable consumer. The Receiver is designed to recover from any underlying failure and does not require the WAL feature in case of Driver failure. Please refer to the Consumer Recovery from Driver/Executor Crash section below for more details.
- This Consumer has a mechanism to create Blocks from the Kafka stream and write them to the Spark BlockManager (see the Consumer Tuning Options section below for more details).
- This Consumer has a built-in PID (Proportional, Integral, Derivative) Controller to control Spark Back Pressure. The PID Controller's rate feedback loop is built using Zookeeper. Back Pressure is controlled by altering the number of messages consumed during every poll. Please refer to the Spark Consumer Properties section on how to enable back pressure, and to the Consumer Tuning Options section on how to tune the PID Controller.
- The number of partitions in the RDD generated by this Consumer is decoupled from the number of Kafka partitions. One can control the RDD partitions by controlling the Block creation interval and the Block size. Let us assume you have a Kafka Topic with 10 Partitions, a Block Interval of 200 ms and a Batch Interval of 5 seconds. This Consumer will generate 5 Blocks every second (1 second / Block Interval) for each Partition, which is 5 x 10 x 5 = 250 Blocks for every Batch. As every Block written to the Spark BlockManager within the Batch Interval creates one Partition of the underlying RDD, every RDD created per batch will have 250 Partitions, which increases processing parallelism. If the RDD partitions were instead the same as the Kafka partitions, every RDD would have only 10 partitions (the same as the Kafka topic) and processing parallelism would be limited.
- The Consumer provides an end-to-end No Data Loss guarantee without needing the Spark WAL feature. Refer to the Consumer Recovery from Driver/Executor Crash section for more details.
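The RDD partition arithmetic from the list above (10 Kafka partitions, 200 ms Block Interval, 5 second Batch Interval) can be checked with a small sketch. The class and method names are hypothetical, and the sketch assumes exactly one Block per Kafka partition per Block Interval, as described in the text.

```java
// Hypothetical helper for illustration only; not part of kafka-spark-consumer.
final class RddPartitionEstimate {

    // Blocks per batch = (batch interval / block interval) * number of Kafka partitions.
    // Assumes each Kafka partition yields one Block per block interval.
    static long blocksPerBatch(long batchIntervalMs, long blockIntervalMs, int kafkaPartitions) {
        long blocksPerPartition = batchIntervalMs / blockIntervalMs;
        return blocksPerPartition * kafkaPartitions;
    }

    public static void main(String[] args) {
        // 5 second batch, 200 ms block interval, 10 Kafka partitions
        System.out.println(blocksPerBatch(5000, 200, 10)); // prints 250
    }
}
```

Since every Block becomes one RDD partition, the returned value is also the expected number of RDD partitions per batch under these assumptions.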
git clone https://github.com/dibbhatt/kafka-spark-consumer
cd kafka-spark-consumer
mvn install
Then use the dependency below in your Maven pom.xml:
<dependency>
<groupId>dibbhatt</groupId>
<artifactId>kafka-spark-consumer</artifactId>
<version>2.1.0</version>
</dependency>
This Consumer is now part of Spark Packages : http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
Include this package in your Spark Applications using:
- spark-shell, pyspark, or spark-submit
$SPARK_HOME/bin/spark-shell --packages dibbhatt:kafka-spark-consumer:2.1.0
- sbt
If you use the sbt-spark-package plugin, in your sbt build file, add:
spDependencies += "dibbhatt/kafka-spark-consumer:2.1.0"
Otherwise,
resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"
libraryDependencies += "dibbhatt" % "kafka-spark-consumer" % "2.1.0"
- Maven
In your pom.xml, add:
<dependencies>
<!-- list of dependencies -->
<dependency>
<groupId>dibbhatt</groupId>
<artifactId>kafka-spark-consumer</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
<repositories>
<!-- list of other repositories -->
<repository>
<id>SparkPackagesRepo</id>
<url>http://dl.bintray.com/spark-packages/maven</url>
</repository>
</repositories>
This consumer supports Kafka versions 0.8, 0.9, 0.10 and 0.11, and works with all Spark versions. One needs to include the respective Spark and Kafka versions in the application pom.xml. Please refer to the version compatibility section above.
e.g. the example below includes the dependencies for Spark 2.2.0 and Kafka 1.1.0 (the values of spark.version and kafka.version):
<properties>
<spark.version>2.2.0</spark.version>
<kafka.version>1.1.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.0</version>
</dependency>
</dependencies>
These are the Consumer Properties that need to be set in your Driver code (see the Java and Scala code examples on how to use these properties).
- ZK quorum details of Kafka Cluster.
- zookeeper.hosts=host1,host2
- Kafka ZK Port
- zookeeper.port=2181
- Kafka Topic to consume (comma separated list of Kafka Topics for Multi topic fetch)
- kafka.topic=topicA
- Kafka Consumer ID. Identifier of the Consumer
- kafka.consumer.id=consumer-id
- Kafka Bootstrap Servers.
- bootstrap.servers=x.x.x.x:9092
- OPTIONAL - Force From Start. By default the Consumer starts from the latest offset.
- consumer.forcefromstart=true
- OPTIONAL - Maximum messages fetched in one Poll. Default 500
- max.poll.records=100
- OPTIONAL - Fill Frequency in ms. Default is 1 second
- consumer.fillfreqms=500
- OPTIONAL - Consumer Back Pressure Support. Default is true
- consumer.backpressure.enabled=false
- OPTIONAL - Number of Blocks fetched from Kafka to merge before writing to the Spark BlockManager. This can be used to further control the RDD partitions. Default is 1
- consumer.num_fetch_to_buffer=10
- OPTIONAL - ZK used for committing consumer offsets. It can be a different ZK cluster than the one used by Kafka.
- zookeeper.consumer.connection=host1:2181,host2:2181
Properties props = new Properties();
props.put("zookeeper.hosts", "x.x.x.x");
props.put("zookeeper.port", "2181");
props.put("kafka.topic", "topicA");
props.put("kafka.consumer.id", "kafka-consumer");
props.put("bootstrap.servers", "x.x.x.x:9092");
// Optional Properties
props.put("max.poll.records", "250");
props.put("consumer.fillfreqms", "1000");
SparkConf _sparkConf = new SparkConf();
JavaStreamingContext jsc = new JavaStreamingContext(_sparkConf, Durations.seconds(30));
// Specify number of Receivers you need.
int numberOfReceivers = 3;
JavaDStream<MessageAndMetadata<byte[]>> unionStreams = ReceiverLauncher.launch(
jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());
unionStreams.foreachRDD(new VoidFunction<JavaRDD<MessageAndMetadata<byte[]>>>() {
@Override
public void call(JavaRDD<MessageAndMetadata<byte[]>> rdd) throws Exception {
//Start Application Logic
rdd.foreachPartition(new VoidFunction<Iterator<MessageAndMetadata<byte[]>>>() {
@Override
public void call(Iterator<MessageAndMetadata<byte[]>> mmItr) throws Exception {
while(mmItr.hasNext()) {
MessageAndMetadata<byte[]> mm = mmItr.next();
byte[] key = mm.getKey();
byte[] value = mm.getPayload();
if(key != null)
System.out.println(" key :" + new String(key));
if(value != null)
System.out.println(" Value :" + new String(value));
}
}
});
//End Application Logic
//commit offset
ProcessedOffsetManager.persistsPartition(rdd, props);
}
});
try {
jsc.start();
jsc.awaitTermination();
}catch (Exception ex ) {
jsc.ssc().sc().cancelAllJobs();
jsc.stop(true, false);
System.exit(-1);
}
A complete example is available in src/main/java/consumer/kafka/client/SampleConsumer.java. It is the sample Java code which uses ReceiverLauncher to generate DStreams from Kafka and applies an output operation to every message of the RDD.
val conf = new SparkConf()
.setMaster("spark://x.x.x.x:7077")
.setAppName("LowLevelKafkaConsumer")
.set("spark.executor.memory", "1g")
.set("spark.rdd.compress","true")
.set("spark.storage.memoryFraction", "1")
.set("spark.streaming.unpersist", "true")
val sc = new SparkContext(conf)
//Might want to uncomment and add the jars if you are running on standalone mode.
sc.addJar("/home/kafka-spark-consumer/target/kafka-spark-consumer-2.1.0-jar-with-dependencies.jar")
val ssc = new StreamingContext(sc, Seconds(10))
val topic = "topicA"
val zkhosts = "x.x.x.x"
val zkports = "2181"
//Specify number of Receivers you need.
val numberOfReceivers = 1
val kafkaProperties: Map[String, String] =
  Map("zookeeper.hosts" -> zkhosts,
    "zookeeper.port" -> zkports,
    "kafka.topic" -> topic,
    "kafka.consumer.id" -> "kafka-consumer",
    "bootstrap.servers" -> "x.x.x.x:9092",
    //optional properties
    "max.poll.records" -> "250",
    "consumer.fillfreqms" -> "1000")
val props = new java.util.Properties()
kafkaProperties foreach { case (key,value) => props.put(key, value)}
val unionStreams = ReceiverLauncher.launch(ssc, props, numberOfReceivers,StorageLevel.MEMORY_ONLY)
unionStreams.foreachRDD(rdd => {
//Start Application Logic e.g. rdd.foreachPartition
println("\n\nNumber of records in this batch : " + rdd.count())
//End Application Logic
//Persists the Max Offset of given Kafka Partition to ZK
ProcessedOffsetManager.persistsPartition(rdd, props)
} )
try{
ssc.start()
ssc.awaitTermination()
} catch {
case e: Exception => {
sc.cancelAllJobs()
ssc.stop(stopSparkContext = true, stopGracefully = false)
System.exit(-1)
}
}
A complete example is available in examples/scala/LowLevelKafkaConsumer.scala, which is sample Scala code showing how to use this utility.
To consume from multiple topics, list them comma-separated in the kafka.topic property:
props.put("kafka.topic", "topicA,topicB,topicC");
Sample Multi Topic Fetch example. The code is exactly the same as for a Single Topic Fetch.
Properties props = new Properties();
props.put("zookeeper.hosts", "zkhost");
props.put("zookeeper.port", "2181");
props.put("kafka.topic", "topicA,topicB,topicC");
props.put("kafka.consumer.id", "kafka-consumer");
// Optional Properties
props.put("zookeeper.broker.path", "/brokers");
props.put("zookeeper.consumer.path", "/consumers");
props.put("consumer.forcefromstart", "false");
props.put("max.poll.records", "10");
props.put("consumer.fillfreqms", "500");
props.put("consumer.backpressure.enabled", "true");
//Kafka properties
props.put("bootstrap.servers", "x.x.x.x:9092");
SparkConf _sparkConf = new SparkConf();
JavaStreamingContext jsc = new JavaStreamingContext(_sparkConf, Durations.seconds(30));
//Specify number of Receivers you need.
int numberOfReceivers = 6;
JavaDStream<MessageAndMetadata<byte[]>> unionStreams = ReceiverLauncher.launch(
jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());
unionStreams.foreachRDD(new VoidFunction<JavaRDD<MessageAndMetadata<byte[]>>>() {
@Override
public void call(JavaRDD<MessageAndMetadata<byte[]>> rdd) throws Exception {
//Start Application Logic
rdd.foreachPartition(new VoidFunction<Iterator<MessageAndMetadata<byte[]>>>() {
@Override
public void call(Iterator<MessageAndMetadata<byte[]>> mmItr) throws Exception {
int countTopicA = 0;
int countTopicB = 0;
int countTopicC = 0;
while(mmItr.hasNext()) {
MessageAndMetadata<byte[]> mm = mmItr.next();
if(mm.getTopic().equals("topicA")) {
countTopicA++;
}
else if (mm.getTopic().equals("topicB")) {
countTopicB++;
}
else if (mm.getTopic().equals("topicC")) {
countTopicC++;
}
}
System.out.println("topicA count " + countTopicA);
System.out.println("topicB count " + countTopicB);
System.out.println("topicC count " + countTopicC);
}
});
System.out.println("RDD count " + rdd.count());
//End Application Logic
//commit offset
System.out.println("Committing Offset");
ProcessedOffsetManager.persistsPartition(rdd, props);
}
});
As shown in client/SampleConsumer.java and examples/scala/LowLevelKafkaConsumer.scala, you need to add the line marked **1** to your Driver code to commit offsets.
For Java
//Get the stream
JavaDStream<MessageAndMetadata<byte[]>> unionStreams = ReceiverLauncher.launch(
jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());
unionStreams.foreachRDD(new VoidFunction<JavaRDD<MessageAndMetadata<byte[]>>>() {
@Override
public void call(JavaRDD<MessageAndMetadata<byte[]>> rdd) throws Exception {
//Start Application Logic
System.out.println("Number of records in this batch : " + rdd.count());
//rdd.foreachPartition { ..}
//End Application Logic
//**1** commit offset
ProcessedOffsetManager.persistsPartition(rdd, props);
}
});
For Scala
//Get the Stream
val unionStreams = ReceiverLauncher.launch(ssc, props, numberOfReceivers,StorageLevel.MEMORY_ONLY)
unionStreams.foreachRDD(rdd => {
//Start Application Logic e.g. rdd.foreachPartition {..}
println("\n\nNumber of records in this batch : " + rdd.count())
//End Application Logic
//**1** Commit offset
ProcessedOffsetManager.persistsPartition(rdd, props)
} )
This consumer supports Kafka Security. One just needs to add the necessary Kafka security properties to pull messages from a secured Kafka cluster, e.g.
props.put("bootstrap.servers", "x.x.x.x:9093");
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location","kafka.server.truststore.jks");
props.put("ssl.truststore.password", "test1234");
This consumer uses the properties below by default, and they cannot be changed. If you want a different key/value deserializer, you can apply it either in the Spark job or by using the Custom Message Handler concept described below.
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "none");
Please refer to this blog which explains why WAL is needed for Zero Data Loss.
The primary reason for WAL is that a Receiver commits the offset of a consumed block to ZK once the block has been written to the Spark BlockManager (and possibly replicated). If blocks that are already consumed and committed are not processed, the Receiver can lose data. This is because of how Spark applications operate in a distributed manner. When the driver process fails, all the executors running in a standalone/yarn/mesos cluster are killed as well, along with any data in their memory. In the case of Spark Streaming, all the data received from sources like Kafka is buffered in the memory of the executors until its processing has completed. This buffered data cannot be recovered even if the driver is restarted.
Hence there is a need for the WAL feature, to recover already received (but not processed) blocks from a WAL written to a persistent store like HDFS.
But this Consumer has a different mechanism for handling Driver failure. The Consumer maintains the offset of the processed blocks. This means it can commit the offsets of already processed blocks to ZK and, in case of Driver failure, start from the offset next to the last processed offset (instead of the last consumed offset) for every Kafka partition. Thus the Consumer resumes exactly where the last successfully processed batch ended, and hence there is no data loss.
A Receiver receives a Block of data equal to the configurable FetchSize from Kafka every FillFrequency. A given Block fetched from Kafka may contain many messages. Every Receiver spawns a dedicated thread for every Kafka partition it handles, e.g. if there are 5 Receivers for 20 Kafka Partitions, each Receiver will spawn 4 threads. Each thread fetches messages from one and only one Kafka partition and writes Blocks of data to the Spark BlockManager.
Every write to the Spark BlockManager creates one Partition of the underlying RDD. If, for a given Batch Interval, N blocks are written by all receiving threads, the RDD for that batch will have N Partitions.
A Receiver can write one Block of data pulled from Kafka during every Fetch, or it can merge multiple Fetches together. This can be used to further control the RDD partitions, and is controlled by the consumer.num_fetch_to_buffer property (default is 1). The Receiver wraps every message of a given Block with some additional metadata, such as the message offset and the Kafka partition ID.
As one Receiver thread can write multiple blocks to the BlockManager, the Consumer needs to find the highest offset in every RDD Partition which belongs to the same Kafka partition, and repeat this for all Kafka partitions. It can then find the highest offset for a Kafka partition amongst all RDD partitions. e.g. if RDD Partitions 4, 8 and 12 are generated by Receiver Thread X for Kafka Partition Y, and the highest offset for 4 is 100, for 8 is 400 and for 12 is 800, then the highest offset for Kafka Partition Y for this Batch is 800.
This Consumer performs a very simple Map Reduce logic to get the highest offset for every Kafka partition belonging to a given RDD for a Batch. This <Partition, Offset> tuple is written back to ZK as the already processed offset after the Batch completes.
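The reduce step described above can be illustrated without Spark. The sketch below is a plain-Java stand-in, not the Consumer's actual implementation: the class and method names are hypothetical, and each input row represents the highest offset already found in one RDD partition for a given Kafka partition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the per-batch "highest offset per Kafka partition" reduce.
final class MaxOffsetSketch {

    // Each row is {kafkaPartitionId, highestOffsetSeenInOneRddPartition}.
    static Map<Integer, Long> maxOffsetPerKafkaPartition(long[][] rddPartitionOffsets) {
        Map<Integer, Long> maxOffsets = new HashMap<>();
        for (long[] entry : rddPartitionOffsets) {
            int kafkaPartition = (int) entry[0];
            long offset = entry[1];
            // Keep the larger of the stored offset and the new one.
            maxOffsets.merge(kafkaPartition, offset, Long::max);
        }
        return maxOffsets;
    }

    public static void main(String[] args) {
        // Three RDD partitions for Kafka partition 7 with highest offsets 100, 400 and 800,
        // plus one RDD partition for Kafka partition 3.
        long[][] observed = { {7, 100L}, {7, 400L}, {7, 800L}, {3, 250L} };
        System.out.println(maxOffsetPerKafkaPartition(observed).get(7)); // prints 800
    }
}
```

The resulting map corresponds to the <Partition, Offset> tuples that are written to ZK once the batch completes.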
This requires a few lines to be added to the Spark Driver code. Please refer to the Offset Commit Mechanism section.
This consumer polls Kafka every consumer.fillfreqms and, during every poll, tries to fetch max.poll.records messages.
Hence, let us assume your Batch Duration is 30 seconds, you have 10 Kafka Partitions, and max.poll.records is 50. The total number of messages the consumer can fetch per batch is Total = (BatchDuration / FillFrequency) * MaxPollRecords * Number of Kafka Partitions
e.g. for the above example, with the default FillFrequency of 1 second, it can fetch 30 * 10 * 50 = 15000 messages every batch.
If you need a higher rate, you can increase the max.poll.records property, or you can increase the poll frequency by lowering the consumer.fillfreqms property.
These two parameters need to be tuned carefully, keeping in mind your downstream processing rate and your memory settings.
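The formula above is easy to evaluate for different settings before changing a job. The following sketch uses hypothetical class and method names and computes only the upper bound given by the formula; the actual rate also depends on how many messages are available in Kafka and on back pressure.

```java
// Hypothetical helper for illustration only; not part of kafka-spark-consumer.
final class FetchRateEstimate {

    // Total = (BatchDuration / FillFrequency) * MaxPollRecords * Number of Kafka Partitions
    static long maxMessagesPerBatch(long batchDurationMs, long fillFreqMs,
                                    int maxPollRecords, int kafkaPartitions) {
        long pollsPerBatch = batchDurationMs / fillFreqMs;
        return pollsPerBatch * maxPollRecords * kafkaPartitions;
    }

    public static void main(String[] args) {
        // 30 second batch, consumer.fillfreqms=1000, max.poll.records=50, 10 partitions
        System.out.println(maxMessagesPerBatch(30000, 1000, 50, 10)); // prints 15000
        // Halving consumer.fillfreqms to 500 doubles the upper bound
        System.out.println(maxMessagesPerBatch(30000, 500, 50, 10));  // prints 30000
    }
}
```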
You can enable the Back Pressure mechanism by setting consumer.backpressure.enabled to "true" in the Properties used for ReceiverLauncher.
The default PID settings are as below.
- Proportional = 1.0
- Integral = 0.0
- Derivative = 0.0
If you increase any or all of these, your damping factor will be higher. So if you want a lower Consumer rate than what is calculated with the default PID settings, you can increase these values.
You can control the PID values by setting the properties below.
- consumer.backpressure.proportional
- consumer.backpressure.integral
- consumer.backpressure.derivative
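As a minimal sketch, the three properties above can be set alongside the other Consumer properties before calling ReceiverLauncher.launch. The helper class and the gain values used here are hypothetical and purely illustrative, not recommended settings; the sketch assumes the values are passed as strings, like the other properties in this document.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of kafka-spark-consumer.
final class BackPressureConfigSketch {

    // Returns a copy of the base properties with back pressure enabled and the PID gains set.
    static Properties withBackPressure(Properties base, double proportional,
                                       double integral, double derivative) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("consumer.backpressure.enabled", "true");
        props.put("consumer.backpressure.proportional", String.valueOf(proportional));
        props.put("consumer.backpressure.integral", String.valueOf(integral));
        props.put("consumer.backpressure.derivative", String.valueOf(derivative));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("kafka.topic", "topicA");
        // Illustrative gains only: higher than the defaults to damp the rate more.
        Properties props = withBackPressure(base, 2.0, 0.1, 0.0);
        System.out.println(props.getProperty("consumer.backpressure.proportional"));
    }
}
```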
This Consumer supports writing a custom message handler, which can pre-process every message consumed from Kafka before it is written to the Spark BlockManager. This can be used for Filter/Map type logic applied to every message.
To implement a custom MessageHandler, one needs to extend
public abstract class KafkaMessageHandler<E> implements Cloneable, Serializable
and provide an implementation of protected abstract E process(byte[] payload)
The default MessageHandler is IdentityMessageHandler, which simply passes through the exact byte[] fetched from Kafka:
public class IdentityMessageHandler extends KafkaMessageHandler<byte[]> {
@Override
protected byte[] process(byte[] payload) {
return payload;
}
}
E.g. one can implement MyMessageHandler, which transforms the byte[] consumed from Kafka into some type E.
Java example
public class MyMessageHandler extends KafkaMessageHandler<E> {
@Override
protected E process(byte[] payload) {
//do something
//return object of type E
}
}
MyMessageHandler myHandler = new MyMessageHandler();
JavaDStream<MessageAndMetadata<E>> unionStreams = ReceiverLauncher.launch(
jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY(), myHandler);
scala example
import consumer.kafka.KafkaMessageHandler
object MyMessageHandler extends KafkaMessageHandler[Array[Byte]] {
override def process(payload: Array[Byte]): Array[Byte] = {
val s = new String(payload)
// Pass the message through unless it contains "test"; otherwise replace it with "NA"
if (!s.contains("test")) s.getBytes else "NA".getBytes
}
}
val unionStreams = ReceiverLauncher.launch(ssc, props, numberOfReceivers,StorageLevel.MEMORY_ONLY, MyMessageHandler)
val stream = unionStreams.map(x => { val s = new String(x.getPayload); s })
You can use the standard OffsetChecker utility from Kafka.
For this to work, the Zookeeper Consumer Path property should be zookeeper.consumer.path=/consumers
This is set by default in this version, so the offset checker works even if you do not specify this property.
One can run the offset checker like this:
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --broker-info x.x.x.x:9092 --group <kafka.consumer.id> --topic <kafka.topic> --zookeeper y.y.y.y:2181
With the new Kafka API, the command is
bin/kafka-consumer-groups --describe --group <kafka.consumer.id> --zookeeper y.y.y.y:2181
Let us assume your Driver code is in xyz.jar, which is built using kafka-spark-consumer as a dependency.
Launch it using spark-submit:
./bin/spark-submit --class x.y.z.YourDriver --master spark://x.x.x.x:7077 --executor-memory 1G /<Path_To>/xyz.jar
This will start the Spark Receivers, fetch Kafka messages for every partition of the given topic and generate the DStream.
e.g. to test the SampleConsumer provided in the package with your Kafka settings, modify it to point to your Kafka cluster and use the spark-submit command below. You may need to change the Spark and Kafka versions in pom.xml.
./bin/spark-submit --class consumer.kafka.client.SampleConsumer --master spark://x.x.x.x:7077 --executor-memory 1G /<Path_To>/kafka-spark-consumer-2.1.0-jar-with-dependencies.jar