In a previous blog post, we saw how to use the CDC pattern to turn a SQL database into a stream of events.
Today we will see how to route events from one Kafka topic to multiple topics using Kafka Streams and Spring Boot.
This Event Router pattern is particularly useful when an external system publishes everything into a single Kafka topic.
Step 1 : Docker services

We will start our journey by running a Confluent stack on Docker Compose with:

- Zookeeper
- Confluent Server (a.k.a Kafka)
- Schema Registry
- Confluent Control Center
- AKHQ

```yaml
services:

  zookeeper:
    image: confluentinc/cp-zookeeper
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092" # exposed for the Spring Boot app (localhost:9092)
    environment:
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry
    hostname: schema-registry
    container_name: schema-registry
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  control-center:
    image: confluentinc/cp-enterprise-control-center
    container_name: control-center
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'http://connect:8083'
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1

  akhq:
    # image and connection name are assumptions; only the two connection
    # settings below come from the original configuration
    image: tchiotludo/akhq
    ports:
      - "8080:8080"
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            local:
              properties:
                bootstrap.servers: "broker:29092"
              schema-registry:
                url: "http://schema-registry:8081"
```
Step 2 : Spring Boot boilerplate

We will use Spring Initializr to generate a Spring Boot project (group com.vspiewak, artifact kstream-events-router, Java 23) with the following dependencies:

- Spring for Apache Kafka
- Spring for Apache Kafka Streams

After generating the project, I manually added two other dependencies:

- jackson-databind for JsonNode
- connect-json for JsonSerializer & JsonDeserializer

You should end up with a similar pom.xml file:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <!-- version as generated by Spring Initializr -->
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.vspiewak</groupId>
    <artifactId>kstream-events-router</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kstream-events-router</name>
    <description>Kafka Stream Events Router</description>
    <properties>
        <java.version>23</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-json</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.18.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```
Step 3 : application.yaml

Our Kafka Streams application will consume an input topic and route events to different topics based on the country field of the event:

```yaml
spring:
  application:
    name: kstream-events-router
  kafka:
    bootstrap-servers: localhost:9092
    streams:
      properties:
        default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
    consumer:
      auto-offset-reset: earliest
    topics:
      inputTopic: input-events
      franceTopic: france-events
      spainTopic: spain-events
      otherTopic: other-events
```
Note : We use the default.deserialization.exception.handler property to configure a log-and-continue behavior on deserialization exceptions. This is useful for malformed events (a.k.a. poison pills).
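If plain logging is not enough (for example, you want to count or dead-letter poison pills), you can plug in your own handler. A minimal sketch using the classic kafka-streams 3.x handler API, reproducing the same log-and-continue behavior (the class name is hypothetical):

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical custom handler, equivalent to the built-in
// LogAndContinueExceptionHandler; shown only to illustrate the extension point.
public class MyLogAndContinueHandler implements DeserializationExceptionHandler {

    private static final Logger log = LoggerFactory.getLogger(MyLogAndContinueHandler.class);

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        log.warn("skipping record at offset {} of topic {}: {}",
                record.offset(), record.topic(), exception.getMessage());
        // CONTINUE drops the poison pill; FAIL would stop the stream thread
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed
    }
}
```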
We will also use a TopicProperties record to load our custom properties:

```java
@ConfigurationProperties(prefix = "spring.kafka.topics")
public record TopicProperties(
        String inputTopic,
        String franceTopic,
        String spainTopic,
        String otherTopic) {
}
```
Note : Use @EnableConfigurationProperties(value={TopicProperties.class}) to register the custom properties 😉
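For reference, the main class might look like this (a minimal sketch; the class name simply follows the artifact name):

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

@SpringBootApplication
@EnableConfigurationProperties(value = {TopicProperties.class})
public class KstreamEventsRouterApplication {

    public static void main(String[] args) {
        SpringApplication.run(KstreamEventsRouterApplication.class, args);
    }
}
```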
Step 4 : Insert some events on start

We need to implement a simple KafkaProducer service:

```java
@Service
public class KafkaProducer {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topicName, String key, String message) {
        kafkaTemplate
                .send(topicName, key, message)
                .whenComplete((_, ex) -> {
                    if (ex == null) {
                        log.info("message sent to topic: {}", message);
                    } else {
                        log.error("error sending message: {}", ex.getMessage());
                    }
                });
    }
}
```
Then we can easily insert a few events on start using the Spring ApplicationReadyEvent, as follows:

```java
// minimal host component for the startup logic; the class name is illustrative
@Component
public class EventsLoader {

    @Value("${spring.kafka.topics.inputTopic}")
    private String topicName;

    @Autowired
    private KafkaProducer kafkaProducer;

    // keys for the JSON events are illustrative; any key will do here
    public static final List<String[]> payloads = List.of(
            new String[]{"poison pill", "poison pill"},
            new String[]{"fr", "{ \"country\": \"france\", \"message\": \"Bonjour ! 🇫🇷\" }"},
            new String[]{"es", "{ \"country\": \"spain\", \"message\": \"¡Buen día! 🇪🇸\" }"},
            new String[]{"jp", "{ \"country\": \"japan\", \"message\": \"こんにちは 🇯🇵\" }"}
    );

    @EventListener(ApplicationReadyEvent.class)
    public void onStartup() {
        payloads.forEach(payload ->
                kafkaProducer.sendMessage(topicName, payload[0], payload[1]));
    }
}
```
Step 5 : Country TopicNameExtractor

We will implement TopicNameExtractor<String, JsonNode> as follows:

```java
public class CountryTopicExtractor implements TopicNameExtractor<String, JsonNode> {

    private final TopicProperties topicProperties;

    public CountryTopicExtractor(TopicProperties topicProperties) {
        this.topicProperties = topicProperties;
    }

    @Override
    public String extract(String key, JsonNode value, RecordContext recordContext) {
        var country = value.get("country").asText();
        return switch (country) {
            case "france" -> topicProperties.franceTopic();
            case "spain" -> topicProperties.spainTopic();
            default -> topicProperties.otherTopic();
        };
    }
}
```
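A minimal JUnit test to sanity-check the routing logic might look like this (a sketch, assuming the topic names from the application.yaml above):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class CountryTopicExtractorTest {

    private final TopicProperties topicProperties =
            new TopicProperties("input-events", "france-events", "spain-events", "other-events");

    private final CountryTopicExtractor extractor = new CountryTopicExtractor(topicProperties);

    @Test
    void routesEventsByCountryField() throws Exception {
        var mapper = new ObjectMapper();

        // known countries get their dedicated topic
        assertEquals("france-events",
                extractor.extract("k1", mapper.readTree("{\"country\":\"france\"}"), null));
        assertEquals("spain-events",
                extractor.extract("k2", mapper.readTree("{\"country\":\"spain\"}"), null));

        // anything else falls through to the default topic
        assertEquals("other-events",
                extractor.extract("k3", mapper.readTree("{\"country\":\"japan\"}"), null));
    }
}
```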
Step 6 : Topology Definition

We can finally implement our Kafka Streams topology, here wired through a @Configuration class with @EnableKafkaStreams:

```java
@Configuration
@EnableKafkaStreams
public class TopologyConfig {

    @Bean
    public KStream<String, JsonNode> eventsRouterStream(StreamsBuilder streamsBuilder,
                                                        TopicProperties topicProperties) {

        // JSON serde built from the connect-json serializer/deserializer
        Serializer<JsonNode> jsonNodeSerializer = new JsonSerializer();
        Deserializer<JsonNode> jsonNodeDeserializer = new JsonDeserializer();
        Serde<JsonNode> jsonNodeSerde = Serdes.serdeFrom(jsonNodeSerializer, jsonNodeDeserializer);

        KStream<String, JsonNode> stream = streamsBuilder.stream(
                topicProperties.inputTopic(),
                Consumed.with(Serdes.String(), jsonNodeSerde)
                        .withName("input-source"));

        // route each event to a topic chosen at runtime by the extractor
        stream.to(new CountryTopicExtractor(topicProperties));

        return stream;
    }
}
```
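You can also verify the whole topology without a broker using TopologyTestDriver. A minimal sketch, assuming you add the kafka-streams-test-utils test dependency (not in the pom.xml above):

```java
import java.util.Properties;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class EventsRouterTopologyTest {

    @Test
    void routesFrenchEventsToFranceTopic() {
        var topicProperties =
                new TopicProperties("input-events", "france-events", "spain-events", "other-events");

        // rebuild the same topology as in the configuration above
        Serde<JsonNode> jsonNodeSerde =
                Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
        var builder = new StreamsBuilder();
        builder.stream(topicProperties.inputTopic(),
                        Consumed.with(Serdes.String(), jsonNodeSerde))
                .to(new CountryTopicExtractor(topicProperties));

        var props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (var driver = new TopologyTestDriver(builder.build(), props)) {
            var input = driver.createInputTopic(topicProperties.inputTopic(),
                    new StringSerializer(), new StringSerializer());
            var france = driver.createOutputTopic(topicProperties.franceTopic(),
                    new StringDeserializer(), new StringDeserializer());

            input.pipeInput("k1", "{\"country\":\"france\",\"message\":\"Bonjour !\"}");

            // the event lands on france-events
            assertEquals(1, france.getQueueSize());
        }
    }
}
```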
Running the application

You need to start the Docker services using docker compose up -d, then launch the Spring Boot application using ./mvnw clean spring-boot:run. AKHQ will be available at http://localhost:8080.
Messages from the input-events topic have been routed to the other topics based on their country field value 🎉
Source Code

You can find the complete sources in this GitHub repository 🫡