· Tutorial · 4 min read
Routing events dynamically using Kafka Streams
Implementing an Event Router pattern with Spring Boot, Kafka Streams & TopicNameExtractor 🚰

In a previous blog post, we saw how to use CDC pattern to turn a SQL database into a stream of events.
Today we will see how to route events from one Kafka topic to multiple topics using Kafka Streams and Spring Boot.
This Event Router pattern might be particularly useful when an external system publish into a single Kafka topic.
Step 1 : Docker services
We will start our journey by running a Confluent stack on Docker Compose with :
- Zookeeper
- Confluent Server (a.k.a Kafka)
- Schema Registry
- Confluent Control Center
- AKHQ
services:
zookeeper: image: confluentinc/cp-zookeeper hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000
broker: image: confluentinc/cp-server hostname: broker container_name: broker depends_on: - zookeeper ports: - "9092:9092" - "9101:9101" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1 KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_JMX_PORT: 9101 KAFKA_JMX_HOSTNAME: localhost KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081 CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092 CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 CONFLUENT_METRICS_ENABLE: 'true' CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
schema-registry: image: confluentinc/cp-schema-registry hostname: schema-registry container_name: schema-registry depends_on: - broker ports: - "8081:8081" environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
control-center: image: confluentinc/cp-enterprise-control-center hostname: control-center container_name: control-center depends_on: - broker - schema-registry ports: - "9021:9021" environment: PORT: 9021 CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092' CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'http://connect:8083' CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors' CONTROL_CENTER_REPLICATION_FACTOR: 1 CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 CONFLUENT_METRICS_TOPIC_REPLICATION: 1
akhq: image: tchiotludo/akhq hostname: akhq container_name: akhq depends_on: - broker - schema-registry ports: - "8080:8080" environment: AKHQ_CONFIGURATION: | akhq: connections: docker-kafka-server: properties: bootstrap.servers: "broker:29092" schema-registry: url: "http://schema-registry:8081"
Step 2 : Spring Boot boilerplate
We will use Spring Initializr to generate a Spring Boot project with the following settings :
- Maven
- Java 23
- Spring 3.4.0
and the following dependencies :
- Spring for Apache Kafka
- Spring for Apache Kafka Streams
After generating the project, I manually added two other dependencies :
jackson-databind
for JsonNodeconnect-json
for JsonSerializer & JsonDeserializer
You should end up with a similarly pom.xml
file :
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.4.0</version> <relativePath/> <!-- lookup parent from repository --> </parent>
<groupId>com.vspiewak</groupId> <artifactId>kstream-events-router</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kstream-events-router</name> <description>Kafka Stream Events Router</description>
<properties> <java.version>23</java.version> </properties>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>connect-json</artifactId> <version>3.9.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.18.1</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
</project>
Step 3 : application.yaml
Our Kafka stream application will consume an input topic and route events to different topics based on the country field of the event :
spring: application: name: kstream-events-router
kafka: bootstrap-servers: localhost:9092 properties: default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler consumer: auto-offset-reset: earliest topics: inputTopic: input-events franceTopic: france-events spainTopic: spain-events otherTopic: other-events
Note : We use default.deserialization.exception.handler
property to configure a Log & Continue behavior on deserialization exceptions. This is useful for malformed events (a.k.a Poison Pills)
We will also use a TopicProperties
record to load our custom properties :
@ConfigurationProperties(prefix = "spring.kafka.topics")public record TopicProperties( String inputTopic, String franceTopic, String spainTopic, String otherTopic) {}
Note : Use @EnableConfigurationProperties(value={TopicProperties.class})
to configure the custom properties 😉
Step 4 : Insert some events on start
We need to implement a simple KafkaProducer
service :
@Servicepublic class KafkaProducer {
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topicName, String key, String message) {
kafkaTemplate .send(topicName, key, message) .whenComplete((_, ex) -> { if (ex == null) { log.info("message sent to topic: {}", message); } else { log.error("error sending message: {}", ex.getMessage()); } });
}
}
Then we can easily insert few events on start using the Spring ApplicationReady Event as follow :
@Componentpublic class OnStartup {
@Value("${spring.kafka.topics.inputTopic}") String topicName;
@Autowired private KafkaProducer kafkaProducer;
@EventListener(ApplicationReadyEvent.class) public void onStartup() {
payloads.forEach(payload -> { kafkaProducer.sendMessage(topicName, payload[0], payload[1]); });
}
public static final List<String[]> payloads = List.of( new String[]{"poison pill", "poison pill"}, new String[]{ "event-id-1", """ { "country": "france", "message": "Bonjour ! 🇫🇷" } """ }, new String[]{ "event-id-2", """ { "country": "spain", "message": "¡Buen día! 🇪🇸" } """ }, new String[]{ "event-id-3", """ { "country": "japan", "message": "こんにちは 🇯🇵" } """ } );}
Step 5 : Implementing TopicNameExtractor<K,V>
We will implement TopicNameExtractor<String,JsonNode>
as follow :
public class CountryTopicExtractor implements TopicNameExtractor<String, JsonNode> {
private final TopicProperties topicProperties;
public CountryTopicExtractor(TopicProperties topicProperties) { this.topicProperties = topicProperties; }
@Override public String extract(String key, JsonNode value, RecordContext recordContext) {
var country = value.get("country").asText();
return switch (country) { case "france" -> topicProperties.franceTopic(); case "spain" -> topicProperties.spainTopic(); default -> topicProperties.otherTopic(); };
}}
Step 6 : Topology Definition
We can finally implement our Kafka Streams Topology as follow :
Serializer<JsonNode> jsonNodeSerializer = new JsonSerializer();Deserializer<JsonNode> jsonNodeDeserializer = new JsonDeserializer();Serde<JsonNode> jsonNodeSerde = Serdes.serdeFrom(jsonNodeSerializer, jsonNodeDeserializer);
builder .stream( topicProperties.inputTopic(), Consumed.with(Serdes.String(), jsonNodeSerde) .withName("input-source") ) .to(new CountryTopicExtractor(topicProperties));
return builder.build();
Running the application
You need to start docker services using docker compose up -d
and then launch the Spring Boot application using ./mvnw clean spring-boot:run
AKHQ will be available at http://localhost:8080
Messages from the input-events
topic have been routed to the others based on their country
field value 🎉
Source Code
You can find the complete sources on this Github repository 🫡