Spring Boot Kafka Streams example



Hey guys, I want to work with Kafka Streams real-time processing in my Spring Boot project. I did the producer and consumer; now I want to stream in real time.

Let me start by saying that if you are new to Kafka Streams, adding Spring Boot on top of it is adding another level of complexity, and Kafka Streams has a big learning curve as it is. Here are the basics to get you going: first the pom, then the configuration object.
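As a rough sketch, the pom needs something like the following dependencies, assuming versions are managed by the spring-boot-starter-parent BOM:

```xml
<!-- Spring for Apache Kafka plus the Kafka Streams library itself -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
```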

The code below assumes you are creating two stream apps, and keep in mind that each app represents its own processing topology. One approach uses a KafkaStreams bean directly, calling kafkaStreams.start() yourself; the other is to define a KStream bean in your app. As an example, this is a very basic consumer application: it simply consumes data and logs records from the KStream to the standard output.
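A minimal sketch of such a consumer, assuming the Spring Cloud Stream Kafka Streams binder and its functional programming model; the bean name process is what produces the process-in-0 binding name discussed next:

```java
import java.util.function.Consumer;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StreamConsumerConfig {

    // The bean name "process" determines the binding name "process-in-0".
    @Bean
    public Consumer<KStream<String, String>> process() {
        // Log every record that flows through the stream to standard output.
        return stream -> stream.foreach(
                (key, value) -> System.out.println("key=" + key + ", value=" + value));
    }
}
```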

In this application, we defined a single input binding. Spring will create this binding with the name process-in-0. You use this binding name to set other properties such as the topic name. For example, spring.cloud.stream.bindings.process-in-0.destination selects the Kafka topic the binding consumes from.

Now for the configuration object mentioned above; first, the imports:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import java.util.HashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
```
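A minimal sketch of what a streams configuration built from these imports can look like, assuming a recent Spring Kafka version; the application id, topic sizing and property values are assumptions, not the original author's code:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

@Configuration
public class KafkaStreamsConfig {

    // Assumed property name; point it at your broker.
    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String bootstrapServers;

    // Registering the bean under this well-known name lets Spring Kafka
    // build and manage the StreamsBuilderFactoryBean for us.
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        // Each stream app needs its own application id: one id per topology.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app-1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // FailOnInvalidTimestamp (the default) rejects records with broken timestamps.
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
        return new KafkaStreamsConfiguration(props);
    }
}
```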


Based on this configuration, you could also switch your Kafka producer from sending JSON to other serialization methods. This sample application also demonstrates the usage of three Kafka consumers within the same consumer group, so the messages are load-balanced between the three.

Each consumer implements a different deserialization approach. To better understand the configuration, consider the overall layout: we create a Kafka topic with three partitions.

On the consumer side, there is only one application, but it implements three Kafka consumers sharing the same group id, which is the configuration needed for having them in the same Kafka consumer group. When we start the application, Kafka assigns each consumer a different partition.

This consumer group will receive the messages in a load-balanced manner.


The logic we are going to build is simple. Each time we call a given REST endpoint, hello, the app will produce a configurable number of messages and send them to the same topic, using a sequence number as the Kafka key.

It will wait, using a CountDownLatch, for all messages to be consumed before returning a response, Hello Kafka! There will be three consumers, each using a different deserialization mechanism, that will decrement the latch count when they receive a new message. First, you need a running Kafka cluster to connect to. For this application, I will use docker-compose with Kafka running on a single node. This is clearly far from being a production configuration, but it is good enough for the goal of this post.

Note that I configured Kafka not to create topics automatically; we will create our topic from the Spring Boot application, since we want to pass some custom configuration anyway (see the sketch below). If you want to play around with these Docker images, feel free to adjust the docker-compose configuration. The easiest way to get a skeleton for our app is to navigate to start.spring.io, then download the zip file and use your favorite IDE to load the sources.
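Coming back to topic creation: a minimal sketch of doing it programmatically, assuming Spring Kafka 2.3+ (which provides TopicBuilder); the topic name and sizing are assumptions. Spring Boot registers any NewTopic bean with its auto-configured KafkaAdmin:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    @Bean
    public NewTopic exampleTopic() {
        return TopicBuilder.name("example-topic")  // assumed topic name
                .partitions(3)   // three partitions, matching the three consumers
                .replicas(1)     // single-node cluster, so one replica
                .build();
    }
}
```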

You may need to rename the application properties file. There is also a plain Java class that we will use as the Kafka message and, to keep the application simple, we will add the configuration in the main Spring Boot class. These are the configuration values we are going to use for this sample application; the second block is application-specific:
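A sketch of those values; the spring.kafka entries are standard Spring Boot properties, while the tpd.* names and all values are assumptions for this example:

```properties
# First block: standard Spring Boot / Spring Kafka properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=kafka-example-group

# Second block: application-specific values (names are assumptions)
tpd.topic-name=example-topic
tpd.messages-per-request=10
```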

Eventually, we want to include here both producer and consumer configuration, and use three different variations for deserialization. Remember that you can find the complete source code in the GitHub repository.
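A sketch of the producer side, assuming String keys and JSON values; it is shown here as a standalone class for clarity, and the bootstrap address is an assumption:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class ProducerConfiguration {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // JsonSerializer from Spring Kafka turns the payload object into JSON.
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```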

There are a few basic serializers available in the core Kafka library (see its Javadoc) for Strings, all kinds of number classes and byte arrays, plus the JSON ones provided by Spring Kafka (see the Spring Kafka Javadoc). That gives you a lot of flexibility to optimize the amount of data traveling through Kafka, in case you need to do so, although you have to weigh that against the extra work of maintaining a custom format. This is the first implementation of the controller, containing only the logic producing the messages.
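A sketch of that first controller version; the endpoint path matches the hello endpoint described above, while the class name and the tpd.* property names are assumptions:

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloKafkaController {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final String topicName;
    private final int messagesPerRequest;

    public HelloKafkaController(
            KafkaTemplate<String, Object> kafkaTemplate,
            @Value("${tpd.topic-name}") String topicName,                  // assumed property
            @Value("${tpd.messages-per-request}") int messagesPerRequest) { // assumed property
        this.kafkaTemplate = kafkaTemplate;
        this.topicName = topicName;
        this.messagesPerRequest = messagesPerRequest;
    }

    @GetMapping("/hello")
    public String hello() {
        // Send a configurable number of messages, using the sequence number as key.
        for (int i = 0; i < messagesPerRequest; i++) {
            kafkaTemplate.send(topicName, String.valueOf(i), "message " + i);
        }
        return "Hello Kafka!";
    }
}
```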

In the constructor, we pass some configuration parameters and the KafkaTemplate that we customized to send String keys and JSON values.

In the following tutorial, we demonstrate how to configure Spring Kafka with Spring Boot. Spring Boot uses sensible defaults to configure Spring Kafka; we can override these defaults using the application properties file. Previously we saw how to create a Spring Kafka consumer and producer by manually configuring the Producer and Consumer.

To download and install Apache Kafka, please read the official documentation. This tutorial assumes that the server is started using the default configuration and that no server ports are changed. We use Apache Maven to manage our project dependencies.

Make sure the following dependencies reside on the classpath. Spring Boot automatically configures and initializes a KafkaTemplate based on the properties configured in the application properties file. By using the @Service annotation we make the Sender class eligible for the Spring container to do auto-discovery. You can optionally configure these beans using the application properties file. By annotating a method with the @KafkaListener annotation, Spring Kafka will automatically create a message listener container.
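A minimal sketch of the two classes just described; the topic name is an assumption:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
class Sender {

    // KafkaTemplate is auto-configured by Spring Boot from the properties file.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    void send(String topic, String payload) {
        kafkaTemplate.send(topic, payload);
    }
}

@Service
class Receiver {

    // Spring Kafka creates a message listener container for this method.
    @KafkaListener(topics = "example-topic")  // assumed topic name
    public void receive(String payload) {
        System.out.println("received payload: " + payload);
    }
}
```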

Spring Boot tries to automatically configure your application with sensible defaults based on the dependencies specified inside your pom.xml file.

These values can be overridden using the application properties file; you can find more information in the Spring Boot Kafka properties documentation. We also create an application properties file of our own, and these properties are injected into the configuration classes by Spring Boot. Finally, we wrote a simple Spring Boot application to demonstrate it all together.
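A sketch of such overrides using standard Spring Boot property names; the values are assumptions:

```properties
# Broker address; Spring Boot wires this into the auto-configured beans
spring.kafka.bootstrap-servers=localhost:9092
# Override the default serializers used by the auto-configured producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```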

In order for this demo to work, we need a Kafka server running on localhost on port 9092, which is the default configuration of Kafka. Download the example code: spring-kafka-spring-boot-configuration-example.

Spring Boot auto-configuration attempts to automatically configure your Spring application based on the JAR dependencies that have been added. In other words, if the spring-kafka JAR is on the classpath, it gets auto-configured. If you want to learn more about Spring Kafka, head on over to the Spring Kafka tutorials page.

The project is built using Maven, and the SpringKafkaApplication remains unchanged. What is important to note is that, in order for the auto-configuration to work, we need to opt in by adding @EnableAutoConfiguration or @SpringBootApplication (which is the same as adding @Configuration, @EnableAutoConfiguration and @ComponentScan) to one of our @Configuration classes.

You should only ever add one @EnableAutoConfiguration annotation; it is recommended to add it to your primary @Configuration class. The only things left to do are auto-wiring the KafkaTemplate and using it in the send method. By annotating the Sender class with @Component, Spring will instantiate this class as a bean that we will use in our test case. In order for this to work, we also need @EnableAutoConfiguration, which was indirectly specified on SpringKafkaApplication by using the @SpringBootApplication annotation.

The @KafkaListener annotation creates a message listener container for the annotated receive method. For the Receiver, Spring Boot takes care of most of the configuration. There are, however, two properties that need to be explicitly set in the application properties file (see the sketch below). In order to verify that our code works, a basic SpringKafkaApplicationTest test case is used. It contains a testReceiver unit test case that uses the Sender to send a message to the test topic. We then use the CountDownLatch from the Receiver to verify that a message was successfully received.
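For a listener set up like this, the two properties that typically need explicit values are the consumer group id and the offset reset policy; a sketch using standard Spring Boot property names, with assumed values:

```properties
# Consumer group the Receiver joins
spring.kafka.consumer.group-id=boot-example-group
# Read from the beginning of the topic when no committed offset exists yet
spring.kafka.consumer.auto-offset-reset=earliest
```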

Note that we have added a dedicated application properties file for the test. Maven will then download the dependencies, compile the code and run the unit test case, during which the test logs are generated. If you would like to run the above code sample, you can get the full source code from the accompanying repository. Hopefully, this example will kick-start your Spring Kafka development. Feel free to leave a comment in case something was not clear, or just to let me know if everything worked.


The application and test classes pull in the following imports:

```java
import org.springframework.boot.SpringApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.concurrent.TimeUnit;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
```

With plain Kafka topics everything is working fine, but I am unable to get Spring Kafka Streams working. I want to create a stream based on the post topic, apply a simple transformation, and send the messages from this stream to the test streamingTopic2 topic.

Right now, when I send a message into the post topic, deserialization fails on the output side. The relevant part of the stack trace is:

```
If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [...] from topic [streamingTopic2]
Caused by: com.
```

You could get around your error by using new StringDeserializer(), or even do no conversion at all with a ByteArrayDeserializer, in your consumerFactory, but then you'll still need to handle how to later parse that event into an object that you want to manipulate and extract fields from.

Please suggest how to make it work. According to the stack trace, you've set a JsonDeserializer somewhere, but that isn't in the code you've shown. Thanks, could you please show an example of how to properly use map for this case?
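A minimal sketch of that approach, assuming String serdes end to end and Jackson for the manual parsing; the topic names follow the question, while the status field and class name are illustrative assumptions:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class PostStreamTopology {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static KStream<String, String> build(StreamsBuilder builder) {
        // Consume the raw JSON as plain Strings: no JsonDeserializer involved.
        KStream<String, String> posts =
                builder.stream("post", Consumed.with(Serdes.String(), Serdes.String()));

        // Parse each value manually inside map, transform it, and re-emit.
        KStream<String, String> transformed = posts.map((key, value) -> {
            try {
                JsonNode node = MAPPER.readTree(value);
                // Illustrative transformation: extract one field as the new value.
                return KeyValue.pair(key, node.path("status").asText());
            } catch (Exception e) {
                // On unparsable input, pass the raw value through unchanged.
                return KeyValue.pair(key, value);
            }
        });

        transformed.to("streamingTopic2", Produced.with(Serdes.String(), Serdes.String()));
        return transformed;
    }
}
```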

The Spring for Apache Kafka (spring-kafka) project applies core Spring concepts to the development of Kafka-based messaging solutions. Spring Kafka is a main Spring project, developed and maintained by Pivotal Software. We will build a sender to produce the message and a receiver to consume the message.

We build and run our example using Maven.


If not already done, download and install Apache Maven. The easiest way to bootstrap the project is the Spring Initializr: make sure to select Kafka as a dependency, then click Generate Project to generate and download the Spring Boot project template.

To avoid having to manage the version compatibility of the different Spring dependencies, we will inherit the defaults from the spring-boot-starter-parent parent POM. The generated project contains Spring Boot Starters that manage the different Spring dependencies. The spring-boot-starter dependency is the core starter; it includes auto-configuration, logging, and YAML support.

The spring-boot-starter-test starter includes the dependencies for testing Spring Boot applications with libraries that include JUnit, Hamcrest and Mockito. A dependency on spring-kafka is added. We also include spring-kafka-test to have access to an embedded Kafka broker when running our unit test.
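A sketch of those two dependencies as they would appear in the pom, assuming versions are managed by the Spring Boot parent:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- Provides the embedded Kafka broker for unit tests -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
```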

Note that the version of Spring Kafka is linked to the version of the Apache Kafka client that is used. You need to align the version of Spring Kafka with the version of the Kafka broker you connect to; for more information, consult the complete Kafka client compatibility list. We also add the spring-boot-maven-plugin, which packages the application as an executable JAR, a convenient way to execute and transport code. The plugin also allows you to start the example via a Maven command. Start by creating a SpringKafkaApplication class. For more information on Spring Boot, check the Spring Boot getting started guide.
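A minimal sketch of that class, following the standard Spring Boot entry-point pattern:

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication {

    public static void main(String[] args) {
        // Bootstraps the application context and triggers auto-configuration.
        SpringApplication.run(SpringKafkaApplication.class, args);
    }
}
```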

The below sections will detail how to create a sender and receiver together with their respective configurations. It is also possible to have Spring Boot auto-configure Spring Kafka using default values, so that the actual code that needs to be written is reduced to a bare minimum. If you would like to send more complex objects you could, for example, use an Avro Kafka serializer or the JsonSerializer that ships with Spring Kafka.


We also create an application properties file. Properties from this file will be injected by Spring Boot into our configuration beans using the @Value annotation. For sending messages we will be using the KafkaTemplate, which wraps a Producer and provides convenience methods to send data to Kafka topics.
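A short sketch of that wiring; the property name kafka.topic.example and the class name are assumptions:

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class ExampleSender {

    // Injected from the application properties file via @Value.
    @Value("${kafka.topic.example}")  // assumed property name
    private String topic;

    private final KafkaTemplate<String, String> kafkaTemplate;

    public ExampleSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String payload) {
        // Convenience method on KafkaTemplate; wraps the underlying Producer.
        kafkaTemplate.send(topic, payload);
    }
}
```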

