Streamline Kafka Streams with Spring Boot

Writing Kafka Streams applications doesn't have to be painful. In this blog, Benjamin explains how you can use Spring Boot to simplify your Kafka Streams development workflow. Discover configuration tips, custom annotations for streamlined testing, and how to leverage Spring's magic for smoother Avro integration.

Key takeaway #1

Spring Boot significantly simplifies Kafka Streams development by autoconfiguring a StreamsBuilder with minimal setup in application.yaml.

Key takeaway #2

The StreamsBuilderFactoryBeanConfigurer offers a clean way to customize advanced features like exception handling and state listeners.

Key takeaway #3

Creating a @TopologyTest streamlines unit testing by using Spring's test framework to automatically configure a TopologyTestDriver and mock dependencies.

I still remember the first Kafka Streams application I wrote. Not because it’s precious to me, but because the people who maintain the project keep telling me how much they “enjoy” working on it. It was a Spring Boot application, but I didn’t really use the framework; I just worked alongside it. I’ve seen this pattern in many projects since. The good news is that it can be simple, and I want to show you how.

For people who want to skip reading and jump right to the code, the source code used in this blog can be found here.

Start.spring.io

These are the only dependencies required to implement a Kafka Stream.

I have also included the web and actuator dependencies in the demo project to enable a long-running application.
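Assuming a Maven build (a Gradle setup works just as well), those dependencies boil down to a pom.xml fragment along these lines — spring-kafka brings in the Spring support, while kafka-streams has to be added explicitly because spring-kafka only declares it as an optional dependency; versions are managed by the Spring Boot parent:

```xml
<dependencies>
    <!-- Spring for Apache Kafka: StreamsBuilderFactoryBean, KafkaProperties, etc. -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- The Kafka Streams library itself (optional in spring-kafka, so declare it) -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
</dependencies>
```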

Components

KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME

To autoconfigure a StreamsBuilder component in the Spring Boot context, we need to tell Spring Boot how to connect to Kafka. That’s done through the KafkaStreamsConfiguration bean alongside the KafkaProperties.

Let’s start with the application yaml:

spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
    streams:
      application-id: kafka-streams-demo
      properties:
        auto.offset.reset: earliest
        processing.guarantee: exactly_once_v2
    properties:
      # Kafka auth
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='${KAFKA_USERNAME}' password='${KAFKA_PASSWORD}';
      security.protocol: SASL_SSL

KafkaProperties maps to the spring.kafka prefix, and when building the stream properties, it will join the values in spring.kafka.streams with spring.kafka.properties into a map. That map can then be used to instantiate the KafkaStreamsConfiguration.
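To make that merge concrete, here is a plain-Java sketch of the precedence — this is illustrative only, not Spring’s actual implementation; the class and method names here are made up for the example:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of how KafkaProperties.buildStreamsProperties() combines
// the common client properties (spring.kafka.properties) with the
// streams-specific ones (spring.kafka.streams.properties): both land in one
// map, and the streams-specific entries win when the same key appears twice.
public class StreamsPropertiesSketch {

    public static Map<String, Object> merge(
            Map<String, Object> commonProperties,
            Map<String, Object> streamsProperties) {
        var merged = new HashMap<String, Object>(commonProperties);
        merged.putAll(streamsProperties); // streams-specific entries override
        return merged;
    }

    public static void main(String[] args) {
        var common = Map.<String, Object>of("security.protocol", "SASL_SSL");
        var streams = Map.<String, Object>of("processing.guarantee", "exactly_once_v2");
        System.out.println(merge(common, streams));
    }
}
```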

@Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfig(
        KafkaProperties kafkaProperties,
        ObjectProvider<SslBundles> sslBundles) {
    var properties = new HashMap<String, Object>();
    properties.putAll(kafkaProperties.buildStreamsProperties(sslBundles.getIfAvailable()));
    return new KafkaStreamsConfiguration(properties);
}

With just this configuration, we can start building our stream:

@Component
public class StreamTopology {

    @Autowired
    public void configure(StreamsBuilder builder) {
        builder.stream("input")
            // and so on
            .to("output");
    }
}

StreamsBuilderFactoryBeanConfigurer

The StreamsBuilderFactoryBeanConfigurer from Spring for Apache Kafka is used to customize the StreamsBuilderFactoryBean, the factory bean that creates and manages the lifecycle of the KafkaStreams instance.

This component allows you to fine-tune and configure various aspects of your Kafka Streams application, such as setting up custom properties, interceptors, and error handling strategies.

Here are some examples of how to use it:

@Bean
StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer(ApplicationContext ctx) {
    return factoryBean -> {
        // exception handler
        // (REPLACE_THREAD and SHUTDOWN_CLIENT are statically imported from
        // StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse)
        factoryBean.setStreamsUncaughtExceptionHandler(ex -> {
            if (ex instanceof RecoverableException) {
                return REPLACE_THREAD;
            }
            return SHUTDOWN_CLIENT;
        });

        // state listener; note the parameter order of
        // KafkaStreams.StateListener: onChange(newState, oldState)
        // (ERROR is statically imported from KafkaStreams.State)
        factoryBean.setStateListener((newState, oldState) -> {
            if (newState == ERROR) {
                SpringApplication.exit(ctx);
                System.exit(1);
            }
        });
    };
}

Avro Serializer/deserializer (Serde)

Projects often require data governance or compact, efficient data serialization. That’s where serialization systems such as Apache Avro come in handy. Setting up the Serdes for this can be facilitated by using the KafkaProperties supplied by Spring Boot.

@Component
public class AvroSerdeFactory {
    private final KafkaProperties kafkaProperties;
    private final ObjectProvider<SslBundles> sslBundles;

    public AvroSerdeFactory(KafkaProperties kafkaProperties, ObjectProvider<SslBundles> sslBundles) {
        this.kafkaProperties = kafkaProperties;
        this.sslBundles = sslBundles;
    }

    public GenericAvroSerde genericAvroSerde(boolean isKey) {
        var serde = new GenericAvroSerde();
        serde.configure(kafkaProperties.buildStreamsProperties(sslBundles.getIfAvailable()), isKey);
        return serde;
    }
}

To use the Avro Serdes with the KafkaProperties, the application’s YAML file needs the Schema Registry connection information:

spring:
  kafka:
    properties:
      # schema registry
      schema.registry.url: ${SCHEMA_REGISTRY_URL}
      schema.registry.basic.auth.credentials.source: USER_INFO
      schema.registry.basic.auth.user.info: "${SCHEMA_REGISTRY_USERNAME}:${SCHEMA_REGISTRY_PASSWORD}"

@TopologyTest

The kafka-streams-test-utils library makes testing Kafka Streams simple and fast with the TopologyTestDriver component.

To avoid repeating boilerplate code and mocking components, we can create a custom annotation that sets up a SpringBootTest including all the Kafka components.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@ExtendWith(SpringExtension.class)
@TypeExcludeFilters(TopologyTypeExcludeFilter.class)
@BootstrapWith(TopologyTestContextBootstrapper.class)
@Import(value = {
  TopologyTestDriverConfiguration.class, MockAvroSerdeFactory.class })
@ActiveProfiles(value = { "test" })
public @interface TopologyTest {
  ComponentScan.Filter[] includeFilters() default {};
 
  ComponentScan.Filter[] excludeFilters() default {};
}

TopologyTypeExcludeFilter

By extending StandardAnnotationCustomizableTypeExcludeFilter, we can use ComponentScan.Filter to define what to include in or exclude from the Spring context when applying the TopologyTest annotation to a test class.

This way, it’s possible to define what package to include when setting up the SpringBootTest. For this demo project, we are only interested in autowiring a subset of the packages:

@TopologyTest(
        includeFilters = {
                @ComponentScan.Filter(type = REGEX, pattern = { "eu.cymo.kafka_streams_demo.adapter.kafka.*" })})

By defining a custom TypeExcludeFilter, a fixed set of classes can be excluded from the Spring context as well. For example, we could exclude the AvroSerdeFactory, which would otherwise try to connect to an actual SchemaRegistryClient.

TopologyTestDriverConfiguration

The test context needs a StreamsBuilder, so it is created here. The TopologyTestDriver is then defined as a bean so that it can be autowired into unit tests.
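A minimal sketch of what that configuration class could look like — the exact wiring and the dummy property values here are assumptions based on the description above; the real class lives in the demo repository:

```java
@TestConfiguration
public class TopologyTestDriverConfiguration {

    // Provide the StreamsBuilder that the topology components autowire;
    // no real KafkaStreams instance runs in this test context.
    @Bean
    public StreamsBuilder streamsBuilder() {
        return new StreamsBuilder();
    }

    // Build the topology that the @Autowired configure(...) methods registered
    // on the builder, and wrap it in a TopologyTestDriver. The bootstrap server
    // is never contacted, so a dummy value suffices.
    @Bean
    public TopologyTestDriver topologyTestDriver(StreamsBuilder builder) {
        var properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        return new TopologyTestDriver(builder.build(), properties);
    }
}
```

Spring closes the TopologyTestDriver when the test context shuts down, since its close() method is picked up as the inferred destroy method.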

After the setup is done, a test can be something like this:

@TopologyTest(
      includeFilters = {
              @ComponentScan.Filter(type = REGEX, pattern = { "eu.cymo.kafka_streams_demo.adapter.kafka.*" })})
class StreamTopologyTest {
  @Autowired
  private AvroSerdeFactory avroSerdes;
 
  @Autowired
  private TopologyTestDriver driver;

  @Test
  void verifySomeFeature() { }
}

Work with the framework

Spring Boot is a very impressive framework that can handle a lot of complex tasks if we use it well. I often advise junior developers to investigate the source code of Spring Boot to demystify how it works. The more we understand the framework and its inner workings, the more we can leverage it. The same applies to Spring for Apache Kafka: reduce the boilerplate so you can focus on the implementation itself.
