Introduction to Kafka Streams in Kotlin

1. Overview

In this article, we will learn how to use Kafka Streams in Kotlin.

2. Creating a Kafka Stream

Kafka Streams combines the simplicity of writing and deploying standard Kotlin applications with the benefits of Kafka’s server-side cluster technology.

It is a library for building applications that store their input and output data in Kafka clusters. In this section, we will see how to create a Kafka Stream using Kotlin.

2.1 Gradle Dependencies

First, let us add the following dependencies to our build.gradle. Since we use Confluent's SpecificAvroSerde later on, we also need the kafka-streams-avro-serde artifact, which is published in the Confluent Maven repository (https://packages.confluent.io/maven/):

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.4.1'
    implementation 'org.apache.kafka:kafka-streams:2.4.1'
    implementation 'io.confluent:kafka-streams-avro-serde:5.4.1'
}

2.2 Building a KafkaStreams

Once we have configured our dependencies, let us define a stream that reads records from the preferences-authorization topic, transforms them using the PreferencesTransformer, and sends the output records to either the preferences topic or the preferences-authorization topic.

Kafka Streams Transformer

2.2.1 Configuring KafkaStreams

We need to provide a set of properties for the configuration of our KafkaStreams:

val properties = Properties()

The identifier for the stream processing application:

properties[StreamsConfig.APPLICATION_ID_CONFIG] = config.applicationId

Default serializer/deserializer classes for keys and values:

properties[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String()::class.java
properties[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = SpecificAvroSerde::class.java

Next, the URLs of our local Kafka cluster and Schema Registry, as well as the strategy for constructing the subject name under which the value schema is registered with the Schema Registry. Setting TopicRecordNameStrategy allows us to publish different Avro record types to the same topic:

properties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = config.bootstrapServersConfig
properties[KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG] = config.schemaRegistryUrlConfig
properties["value.subject.name.strategy"] = TopicRecordNameStrategy::class.java
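Under the hood, the StreamsConfig and KafkaAvroSerializerConfig constants resolve to plain string keys. As a quick sanity check, here is a sketch that builds the same configuration using the literal keys (the serde class settings are omitted for brevity, and the function name and placeholder values are our own):

```kotlin
import java.util.Properties

// Builds the string-valued part of the configuration using the literal
// property keys that the config constants resolve to.
fun buildStreamsProperties(
    applicationId: String,
    bootstrapServers: String,
    schemaRegistryUrl: String
): Properties {
    val properties = Properties()
    properties["application.id"] = applicationId          // StreamsConfig.APPLICATION_ID_CONFIG
    properties["bootstrap.servers"] = bootstrapServers    // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
    properties["schema.registry.url"] = schemaRegistryUrl // KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG
    properties["value.subject.name.strategy"] =
        "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
    return properties
}
```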

2.2.2 Building a KafkaStreams Topology

We need to create a KafkaStreams topology that defines the processing logic of our stream, that is, how records are consumed, transformed, and produced.

In our example, the stream consumes records from the preferences-authorization topic and transforms them by using the PreferencesTransformer. Then we use branching to route records to different downstream topics based on the supplied predicates.

val builder = StreamsBuilder()

val (preferences, preferencesAuthorization) = builder
    .stream<String, SpecificRecord>(config.topics.preferencesAuthorization)
    .transform(TransformerSupplier { PreferencesTransformer(clock) })
    .branch(
        Predicate { _, v -> v is PreferencesCreated },
        Predicate { _, v -> v is CreatePreferencesDenied }
    )

The predicates are evaluated in order. On the first match, the record is placed into the corresponding KStream. In our example, if the record is a PreferencesCreated, it is placed into the preferences KStream and routed to the preferences topic.

preferences
    .peek { _, record -> logger.info("Sent ${record.schema.name} to topic: ${config.topics.preferences}\n\trecord: $record") }
    .to(config.topics.preferences)

And if the record is a CreatePreferencesDenied, it is placed into the preferencesAuthorization KStream and routed to the preferences-authorization topic.

preferencesAuthorization
    .peek { _, record -> logger.info("Sent ${record.schema.name} to topic: ${config.topics.preferencesAuthorization}\n\trecord: $record") }
    .to(config.topics.preferencesAuthorization)

If no predicate matches, the record is dropped.
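These ordered-predicate semantics can be illustrated with a plain-Kotlin sketch, independent of Kafka Streams: each record is tested against the predicates in order, placed into the branch of the first match, and dropped if nothing matches. This is a simplified model for illustration, not the actual Kafka Streams implementation:

```kotlin
// Simplified model of branch(): route each value to the branch of the
// first matching predicate; values matching no predicate are dropped.
fun <V> branch(values: List<V>, vararg predicates: (V) -> Boolean): List<List<V>> {
    val branches = List(predicates.size) { mutableListOf<V>() }
    for (value in values) {
        val index = predicates.indexOfFirst { it(value) }
        if (index >= 0) branches[index].add(value) // first match wins
        // no match: the value is dropped
    }
    return branches
}
```

For example, branching the list [1, 2, 3, 4] on { it % 2 == 0 } and { it > 2 } places 2 and 4 in the first branch and 3 in the second, while 1 matches no predicate and is dropped.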

Next, we need to build the topology:

val topology = builder.build()

2.2.3 Starting the KafkaStreams Service

Finally, we need to start our KafkaStreams service by calling the start() method.

val streams = KafkaStreams(topology, properties)

streams.start()

2.3 Building a KafkaStreams Transformer

We need to create the PreferencesTransformer that processes the records consumed from the preferences-authorization topic. For each record of type CreatePreferencesCommand, it performs a validation: if the validation succeeds, it returns a record of type PreferencesCreated; otherwise, it returns a record of type CreatePreferencesDenied.

To keep the example simple, we added a dummy validation that checks whether the currency is USD. In a real application, the validations would be more complex.

if (record is CreatePreferencesCommand) {
    val eventId = UUID.randomUUID().toString()
    val event = if (record.currency == "USD") {
        PreferencesCreated(eventId, clock.instant(), record.customerId, record.currency, record.country)
    } else {
        CreatePreferencesDenied(eventId, clock.instant(), record.customerId, record.currency, record.country)
    }
    return KeyValue(record.customerId, event)
}
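To make this decision logic testable in isolation, it can be sketched as a pure function. The data classes below are simplified stand-ins for the Avro-generated record types used in the article, and the decide function name is our own:

```kotlin
import java.time.Clock
import java.time.Instant
import java.util.UUID

// Simplified stand-ins for the Avro-generated record classes.
data class CreatePreferencesCommand(val customerId: String, val currency: String, val country: String)
sealed interface PreferencesEvent
data class PreferencesCreated(
    val eventId: String, val createdAt: Instant,
    val customerId: String, val currency: String, val country: String
) : PreferencesEvent
data class CreatePreferencesDenied(
    val eventId: String, val deniedAt: Instant,
    val customerId: String, val currency: String, val country: String
) : PreferencesEvent

// Pure decision logic extracted from the transformer: USD passes the
// dummy validation, every other currency is denied.
fun decide(command: CreatePreferencesCommand, clock: Clock): PreferencesEvent {
    val eventId = UUID.randomUUID().toString()
    return if (command.currency == "USD") {
        PreferencesCreated(eventId, clock.instant(), command.customerId, command.currency, command.country)
    } else {
        CreatePreferencesDenied(eventId, clock.instant(), command.customerId, command.currency, command.country)
    }
}
```

The PreferencesTransformer would call this logic from its transform method and return KeyValue(record.customerId, event), as shown in the fragment above.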

3. Conclusion

In this article, we saw how to create Kafka Streams in Kotlin and how to take advantage of Kafka’s server-side cluster technology to consume records from Kafka topics, transform them, and route them to different downstream topics.

The full source code is available on GitHub.
