spark-kafka-writer v0.4.0 released!

July 22, 2017 - Spark, Kafka, tutorial

We’re pleased to announce version 0.4.0 of Spark Kafka Writer.

Spark Kafka Writer is a library that lets you seamlessly save your Spark data (RDDs, DStreams, Datasets and DataFrames) to Kafka.

The repository is on GitHub and you can find the latest version on Maven Central.

In this post, we’ll walk through the new support for writing DataFrames and Datasets to Kafka.

Writing a DataFrame to Kafka

Starting with version 0.4.0, you can write DataFrames to Kafka. This differs from writing the output of batch queries to Kafka with the Structured Streaming API in that you control how each Row is serialized and you have access to the callback API.

import com.github.benfradet.spark.kafka.writer._
import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.sql.{DataFrame, Dataset}

@transient lazy val log = org.apache.log4j.Logger.getLogger("spark-kafka-writer")

val producerConfig = Map(
  "bootstrap.servers" -> "127.0.0.1:9092",
  "key.serializer" -> classOf[StringSerializer].getName,
  "value.serializer" -> classOf[StringSerializer].getName
)

val topic = "my-topic" // placeholder topic name, replace with your own
val dataFrame: DataFrame = ...
dataFrame.writeToKafka(
  producerConfig,
  row => new ProducerRecord[String, String](topic, row.toString),
  Some(new Callback with Serializable {
    override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
      if (Option(e).isDefined) {
        log.warn("error sending message", e)
      } else {
        log.info(s"everything went fine, record offset was ${metadata.offset()}")
      }
    }
  })
)
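
For comparison, here is a minimal sketch of the built-in batch write that the paragraph above contrasts with. It assumes Spark 2.2 or later with the spark-sql-kafka-0-10 module on the classpath. The names `BuiltInKafkaSink`, `kafkaSinkOptions` and `writeWithBuiltInSink` are hypothetical, and turning every row into a JSON string is only one possible choice.

```scala
import org.apache.spark.sql.DataFrame

object BuiltInKafkaSink {
  // Hypothetical helper: the options Spark's built-in Kafka sink reads.
  def kafkaSinkOptions(bootstrapServers: String, topic: String): Map[String, String] =
    Map(
      "kafka.bootstrap.servers" -> bootstrapServers,
      "topic" -> topic
    )

  // Hypothetical helper: the built-in sink expects a `value` column
  // (and optionally a `key` column), so each row is serialized to JSON here.
  def writeWithBuiltInSink(df: DataFrame, bootstrapServers: String, topic: String): Unit =
    df.selectExpr("to_json(struct(*)) AS value")
      .write
      .format("kafka")
      .options(kafkaSinkOptions(bootstrapServers, topic))
      .save()
}
```

With the built-in sink, serialization is expressed as column expressions, whereas `writeToKafka` takes a plain function from `Row` to `ProducerRecord` plus an optional callback.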

Writing a Dataset to Kafka

Likewise, you can now write Datasets to Kafka:

case class Foo(a: Int, b: String)

val dataset: Dataset[Foo] = ...
dataset.writeToKafka(
  producerConfig,
  foo => new ProducerRecord[String, String](topic, foo.toString),
  Some(new Callback with Serializable {
    override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
      if (Option(e).isDefined) {
        log.warn("error sending message", e)
      } else {
        log.info(s"everything went fine, record offset was ${metadata.offset()}")
      }
    }
  })
)
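
Because the second argument is an ordinary function, the record does not have to be built from `toString`. The sketch below is one assumed variation rather than something taken from the library: it uses the field `a` as the record key and `b` as the value, and the names `KeyedRecords` and `toKeyedRecord` are hypothetical.

```scala
import org.apache.kafka.clients.producer.ProducerRecord

object KeyedRecords {
  // Same shape as the Foo case class used above.
  case class Foo(a: Int, b: String)

  // Hypothetical helper: builds a record whose key is `a` (as a string)
  // and whose value is `b`, for the given topic.
  def toKeyedRecord(topic: String)(foo: Foo): ProducerRecord[String, String] =
    new ProducerRecord[String, String](topic, foo.a.toString, foo.b)
}
```

A function such as `KeyedRecords.toKeyedRecord(topic)` could then take the place of the `foo => ...` lambda passed to `writeToKafka`. With Kafka's default partitioner, records that share a key are sent to the same partition.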

Other updates

Version 0.4.0 also brings other changes:

Roadmap

For version 0.5.0, we’re aiming to provide a native API for Java and Python.

If you’d like to get involved, there are different ways you can contribute to the project:

You can also ask questions and discuss the project on the Gitter channel and check out the Scaladoc.