Tutorial · 4 min read

Routing events dynamically using Kafka Streams

Implementing an Event Router pattern with Spring Boot, Kafka Streams & TopicNameExtractor 🚰

In a previous blog post, we saw how to use the CDC pattern to turn a SQL database into a stream of events.

Today we will see how to route events from one Kafka topic to multiple topics using Kafka Streams and Spring Boot.

This Event Router pattern is particularly useful when an external system publishes all its events into a single Kafka topic.

Step 1: Docker services

We will start our journey by running a Confluent stack on Docker Compose with:

  • Zookeeper
  • Confluent Server (a.k.a Kafka)
  • Schema Registry
  • Confluent Control Center
  • AKHQ

services:

  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  control-center:
    image: confluentinc/cp-enterprise-control-center
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      PORT: 9021
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'http://connect:8083'
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1

  akhq:
    image: tchiotludo/akhq
    hostname: akhq
    container_name: akhq
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8080:8080"
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            docker-kafka-server:
              properties:
                bootstrap.servers: "broker:29092"
              schema-registry:
                url: "http://schema-registry:8081"

Step 2: Spring Boot boilerplate

We will use Spring Initializr to generate a Spring Boot project with the following settings:

  • Maven
  • Java 23
  • Spring Boot 3.4.0

and the following dependencies:

  • Spring for Apache Kafka
  • Spring for Apache Kafka Streams

After generating the project, I manually added two other dependencies:

  • jackson-databind for JsonNode
  • connect-json for JsonSerializer & JsonDeserializer

You should end up with a similar pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.4.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.vspiewak</groupId>
    <artifactId>kstream-events-router</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kstream-events-router</name>
    <description>Kafka Stream Events Router</description>
    <properties>
        <java.version>23</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-json</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.18.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Step 3: application.yaml

Our Kafka Streams application will consume an input topic and route events to different topics based on the country field of each event:

spring:
  application:
    name: kstream-events-router
  kafka:
    bootstrap-servers: localhost:9092
    properties:
      default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
    consumer:
      auto-offset-reset: earliest
    topics:
      inputTopic: input-events
      franceTopic: france-events
      spainTopic: spain-events
      otherTopic: other-events

Note: We use the default.deserialization.exception.handler property to configure a log-and-continue behavior on deserialization exceptions. This is useful for handling malformed events (a.k.a. poison pills).
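If you prefer to keep this setting in code rather than YAML, spring-kafka lets you supply the Streams configuration yourself. A minimal sketch, assuming you override Spring Boot's defaultKafkaStreamsConfig bean (which then replaces the auto-configured settings, so treat it as an either/or with the YAML approach):

import java.util.Map;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

@Configuration
public class StreamsConfigSketch {

    // same effect as the default.deserialization.exception.handler YAML property
    @Bean(name = "defaultKafkaStreamsConfig")
    public KafkaStreamsConfiguration kStreamsConfig() {
        return new KafkaStreamsConfiguration(Map.<String, Object>of(
                StreamsConfig.APPLICATION_ID_CONFIG, "kstream-events-router",
                StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class
        ));
    }
}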

We will also use a TopicProperties record to load our custom properties:

@ConfigurationProperties(prefix = "spring.kafka.topics")
public record TopicProperties(
        String inputTopic,
        String franceTopic,
        String spainTopic,
        String otherTopic
) {
}

Note: Use @EnableConfigurationProperties(value = {TopicProperties.class}) to register the custom properties, as in the sketch below 😉
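For instance, the annotation can sit on the main application class. A minimal sketch, assuming the class name generated by Spring Initializr:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

@SpringBootApplication
@EnableConfigurationProperties(TopicProperties.class)
public class KstreamEventsRouterApplication {

    public static void main(String[] args) {
        SpringApplication.run(KstreamEventsRouterApplication.class, args);
    }
}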

Step 4: Insert some events on startup

We need to implement a simple KafkaProducer service:

@Service
public class KafkaProducer {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topicName, String key, String message) {
        kafkaTemplate
                .send(topicName, key, message)
                .whenComplete((_, ex) -> {
                    if (ex == null) {
                        log.info("message sent to topic {}: {}", topicName, message);
                    } else {
                        log.error("error sending message: {}", ex.getMessage());
                    }
                });
    }
}

Then we can easily insert a few events on startup using the Spring ApplicationReadyEvent as follows:

@Component
public class OnStartup {

    @Value("${spring.kafka.topics.inputTopic}")
    String topicName;

    @Autowired
    private KafkaProducer kafkaProducer;

    @EventListener(ApplicationReadyEvent.class)
    public void onStartup() {
        payloads.forEach(payload -> kafkaProducer.sendMessage(topicName, payload[0], payload[1]));
    }

    // the first payload is malformed JSON on purpose (our poison pill)
    public static final List<String[]> payloads = List.of(
            new String[]{"poison pill", "poison pill"},
            new String[]{
                    "event-id-1",
                    """
                    { "country": "france", "message": "Bonjour ! 🇫🇷" }
                    """
            },
            new String[]{
                    "event-id-2",
                    """
                    { "country": "spain", "message": "¡Buen día! 🇪🇸" }
                    """
            },
            new String[]{
                    "event-id-3",
                    """
                    { "country": "japan", "message": "こんにちは 🇯🇵" }
                    """
            }
    );
}

Step 5: Implementing TopicNameExtractor<K,V>

We will implement TopicNameExtractor<String, JsonNode> as follows:

public class CountryTopicExtractor implements TopicNameExtractor<String, JsonNode> {

    private final TopicProperties topicProperties;

    public CountryTopicExtractor(TopicProperties topicProperties) {
        this.topicProperties = topicProperties;
    }

    @Override
    public String extract(String key, JsonNode value, RecordContext recordContext) {
        // path() avoids a NullPointerException when the country field is missing
        var country = value.path("country").asText();
        return switch (country) {
            case "france" -> topicProperties.franceTopic();
            case "spain" -> topicProperties.spainTopic();
            default -> topicProperties.otherTopic();
        };
    }
}
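Since the extractor is a plain class with no broker dependency, it is easy to unit-test. A quick sketch (the test values are mine, matching the topics from Step 3):

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;

class CountryTopicExtractorTest {

    private final TopicProperties topics =
            new TopicProperties("input-events", "france-events", "spain-events", "other-events");
    private final CountryTopicExtractor extractor = new CountryTopicExtractor(topics);
    private final ObjectMapper mapper = new ObjectMapper();

    @Test
    void routesEventsByCountry() throws Exception {
        // our implementation ignores the RecordContext, so null is fine here
        assertEquals("france-events", extractor.extract("k1", mapper.readTree("{\"country\":\"france\"}"), null));
        assertEquals("spain-events", extractor.extract("k2", mapper.readTree("{\"country\":\"spain\"}"), null));
        assertEquals("other-events", extractor.extract("k3", mapper.readTree("{\"country\":\"japan\"}"), null));
    }
}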

Step 6: Topology definition

We can finally implement our Kafka Streams topology as follows:

// JSON Serde built from the connect-json serializer & deserializer
Serializer<JsonNode> jsonNodeSerializer = new JsonSerializer();
Deserializer<JsonNode> jsonNodeDeserializer = new JsonDeserializer();
Serde<JsonNode> jsonNodeSerde = Serdes.serdeFrom(jsonNodeSerializer, jsonNodeDeserializer);

// consume the input topic and let the extractor pick the destination topic per record
builder
        .stream(
                topicProperties.inputTopic(),
                Consumed.with(Serdes.String(), jsonNodeSerde)
                        .withName("input-source")
        )
        .to(new CountryTopicExtractor(topicProperties));

return builder.build();
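For context, the snippet above returns builder.build(), so it is meant to live in a method that produces the Topology. One way to wire it with spring-kafka is to enable Kafka Streams and let Spring inject the StreamsBuilder; the class and bean names below are mine, not part of the original project:

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@Configuration
@EnableKafkaStreams
public class TopologyConfig {

    // Spring injects the StreamsBuilder managed by the auto-configured StreamsBuilderFactoryBean
    @Bean
    public KStream<String, JsonNode> routingStream(StreamsBuilder builder, TopicProperties topicProperties) {
        Serde<JsonNode> jsonNodeSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());

        KStream<String, JsonNode> stream = builder.stream(
                topicProperties.inputTopic(),
                Consumed.with(Serdes.String(), jsonNodeSerde).withName("input-source"));

        // the TopicNameExtractor decides the destination topic for each record
        stream.to(new CountryTopicExtractor(topicProperties));
        return stream;
    }
}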

Running the application

Start the Docker services with docker compose up -d, then launch the Spring Boot application with ./mvnw clean spring-boot:run.
AKHQ will be available at http://localhost:8080

[Screenshot: AKHQ]

Messages from the input-events topic have been routed to the other topics based on their country field value 🎉

Source Code

You can find the complete sources on this GitHub repository 🫡
