Streaming data with Apache Kafka and Akka Streams

Gökhan Gürgeç
cloudnesil
Published in
5 min readAug 6, 2019

--

This post will demonstrate how data can be streamed with Akka streams and connected to Apacha Kafka with Alpakka Kafka Connector.

Streaming data, can stream from various rivers, seas and change structure in streaming journey. Alpakka is a good initiative to ease streaming journey for various rivers and structures when we are using Akka Streams.

Official definition of Alpakka is “The Alpakka project is an open source initiative to implement stream-aware and reactive integration pipelines for Java and Scala. It is built on top of Akka Streams, and has been designed from the ground up to understand streaming natively and provide a DSL for reactive and stream-oriented programming, with built-in support for backpressure.”

We will use Alpakka Kafka connector that lets us connect Akka Streams with Apache Kafka.

We will try to demonstrate both publishing message to Apache Kafka and consuming message from Apache Kafka in this post.

Implementation was done in Scala and available in github.

The implementation consists of two parts.

  1. Producer part: This part reads a csv file downloaded from kaggle. The data consists of goodreads book data with various language. Via producer flow each line in csv converted to json and send to Apache Kafka topic.
  2. Consumer part: Consumes Apache Kafka messages and via Filter Flow filters German language books and export to a new csv file.
  1. Dependencies & Configurations:

In this example our Scala Version is 2.13.0 and Kafka version is 2.12.

a. build.sbt

We need to add akka-stream-kafka dependency for kafka operations, akka-stream-alpakka-csv for reading and writing csv as streams, akka-http-spray-json for json processing.

b. application.conf

In order to run the application you need a running Kafka. You can configure Kafka Bootstrap Server Address in application.conf. You need a created Kafka topic. You can configure book source and filtering language and output file also. Kafka consumer and producer configuration is also in application.conf. You can look the details of configuration of for Consumer from https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#settings and for Producer from https://doc.akka.io/docs/alpakka-kafka/current/producer.html#settings . We used default settings for this project but we can configure them according to project needs.

2. Main object and Running the application:

running the application is simple:

> sbt run

We have a Main object and we first call Kafka Producer and after it completes we call Kafka Consumer.

3. Kafka Producer:

Akka and Akka streams are hard to understand but when you understand them you can do lots of work with few lines of code.

With 5 lines of code we read books.csv as streams, parse the lines, convert into Json and send to Kafka.

Steps:

a. First step is configuring the Kafka producer.

private val kafkaProducerSettings = ProducerSettings(actorSystem, new StringSerializer, new StringSerializer)
.withBootstrapServers(bootstrapServer)

Configuration is straightforward. We need an actor system to run Producer, define key and value serializer (We use String serializer since we use Json string as key and value) and bootstrap server address. We get bootstrap server address from application.conf.

b. Second step is reading file as stream. Akka Streams has lots of factory structures for getting data from source and we are using FileIO.

val fileSource: Source[ByteString, Future[IOResult]] = FileIO.fromPath(Paths.get(booksSource))

The fileSource emits file content as ByteString and produces Future[IOResult] auxiliary value.

c. Third step is handling the csv and thanks to Akka Streams again. Alpakka contains akka-stream-alpakka-csv library for transforming csv data as streams.

val csvFlow: Source[Map[String, String], Future[IOResult]] = fileSource.via(CsvParsing.lineScanner(delimiter = CsvParsing.SemiColon)).via(CsvToMap.toMapAsStrings())

via method of source creates a new source by applying Flow to source. Here we use CsvParsing.lineScanner and CsvToMap.toMapAsStrings flows are used for parsing data as csv and transforming to Map.

d. Fourth step is converting parsed csv data to Json string. We used akka-http-spray-json library for Json processing.

val jsonFlow: Source[String, Future[IOResult]] = csvFlow.map(toJson).map(_.compactPrint)

e. Last step is sending the Json data to Apache Kafka.

val producerFlow: Source[ProducerRecord[String, String], Future[IOResult]] = jsonFlow.map { item: String =>
new ProducerRecord[String, String](topic, item,item)
}

producerFlow.runWith(Producer.plainSink(kafkaProducerSettings))

It is pretty straightforward we create ProducerRecord from Json string with key and value same. We can set key a different value for each record.

and finally we run the producer flow as Sink and bingo the stream we created run and all the lines of Csv send to Apache Kafka.

4. Kafka Consumer:

Kafka Consumer has reverse steps of Kafka producer. We listen the Kafka topic we get the messages, decode Json message to Book object, filter the language and write the filtered data to csv.

a. Again at first we need to configure consumer.

val kafkaConsumerSettings: ConsumerSettings[String, String] =
ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServer)
.withGroupId(groupId)

We are adding consumer group id. Usage of consumer group is told in this article good. https://dzone.com/articles/dont-use-apache-kafka-consumer-groups-the-wrong-wa

b. Second step is creating a Kafka Source.

val kafkaSource: Source[ConsumerMessage.CommittableMessage[String, String], Consumer.Control] = Consumer      .committableSource(kafkaConsumerSettings, Subscriptions.topics(topic))

According to your Apache Kafka committing and partitioning needs, Alpakka Kafka presents different Source factories.

Since we want at least one strategy and commit Apache Kafka that we processed the message manually we used committableSource.

c. Third step is decoding the Json message and converting to Book object.

val decodeMessageFlow: Source[(CommittableOffset, Book), Consumer.Control] = kafkaSource.via(decodeValueFlow)

Have a look JsonParser util class. We do parsing there.

d. After decoding the message we send Apache Kafka Server that we processes this message offset. We can do this in any step.

val commitFlow: Source[Book, Consumer.Control] = decodeMessageFlow.map { case (offset, data) =>
offset.commitScaladsl()
data
}

e. Now we are ready to filter items that belongs to language we defined in application.conf

val filterFlow: Source[Book, Consumer.Control] = commitFlow.filter(book => languageList.contains(book.languageCode))

f. The next step is converting our Book object to a csv item. We convert the item to string list in dataFlow. We create a headerCSVSource for adding header row to csv and prepend data flow with header. Akka streams has lots of methods to help you combining sources, flows

val headerCSVSource: Source[Seq[String], NotUsed] = Source.single(Seq("average_rating", "bookID", "ratings_count", "authors", "text_reviews_count", "title", "# num_pages", "isbn13", "language_code", "isbn"))

val dataFlow: Source[Seq[String], Consumer.Control] =filterFlow
.map { book =>
getFieldValuesAsStringList(book)
}

val csvWriteFlow: Source[ByteString, Consumer.Control] = dataFlow.prepend(headerCSVSource).via(CsvFormatting.format(delimiter = CsvFormatting.SemiColon))

g. And the last step is creating a File Sink and run the sink.

val fileSink: RunnableGraph[Consumer.Control] = csvWriteFlow.to(FileIO.toPath(Paths.get(outputFile)))

fileSink.run()

and now we have a books_ger.csv that contains books in German language. We can define any language in application.conf

--

--

Gökhan Gürgeç
cloudnesil

IT professional worked on various positions(test engineer, developer, project manager) of software development, passionate to good quality software development