TL;DR: Have you ever wondered how features like Google Maps' live traffic work? These systems have to gather and process data in real time. Their architecture generally involves a data pipeline in which each stage processes data and passes it on for further processing until it reaches the clients. In this article, we will see something similar with a simple example using Kafka Streams. The sample app can be found here.
Introduction to Spring Cloud Stream
Spring Cloud Stream is a framework designed to support stream processing provided by various messaging systems like Apache Kafka, RabbitMQ, etc. The framework allows you to create processing logic without having to deal with any specific platform. It helps you build highly scalable event-driven microservices connected using these messaging systems.
The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices. The way it works is simple: you provide an implementation (called a Binder implementation) for the messaging system that you are using. Spring Cloud Stream supports:
And a few others. The links above will take you to the binder implementations. In this article, we will look into a simple application that uses Kafka Streams as a stream processor listening to events on a topic, processing the data, and publishing it to the outgoing topic.
Introduction to Apache Kafka
Apache Kafka is a distributed publish-subscribe messaging system. It lets applications publish and subscribe to streams of records, similar to a message queue. Kafka is suitable for both offline and online message consumption. It is fault-tolerant, robust, and has a high throughput. Kafka runs as a cluster on one or more servers that can span multiple data centers. The Kafka cluster stores streams of records in categories called topics. Each record consists of a key, a value, and a timestamp. For more information on topics, Producer API, Consumer API, and event streaming, please visit this link.
Introduction to Kafka Streams
Kafka Streams is a library that can be used to consume data, process it, and produce new data, all in real time. It works on a continuous, never-ending stream of data. Consider the stock market as an example. Stock prices fluctuate every second, and to provide real-time value to the customer, you would use something like Kafka Streams.
Pre-requisites:
- Basic knowledge of Java 11.
- Basic knowledge of Spring Boot.
- A basic understanding of Apache Kafka.
- Docker and Docker Compose for running Kafka locally.
Setting up Spring Boot App
Let us first create a Spring Boot project with the help of the Spring Initializr, and then open the project in our favorite IDE. Select Gradle project and Java language. Last but not least, select Spring Boot version 2.5.4. Fill in the project metadata and click generate.
For Spring Cloud, we need to configure Spring Kafka and Kafka Streams in our build.gradle:
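dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.apache.kafka:kafka-streams'
    implementation 'org.springframework.kafka:spring-kafka'
    // Lombok provides @AllArgsConstructor and @Slf4j used in the classes below
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
}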
Let's set up the config for Kafka. We need to define a few parameters on how we want to serialize and deserialize the data. The config is easy to set up and understand. Since our application will be listening to a topic and producing the output to a different topic, it is both a producer and a consumer. So, we need to define config for both.
In the application.yml file, we need to add these entries.
# Spring Boot reads these keys under the spring.kafka prefix
spring:
  kafka:
    bootstrap-servers: localhost:9092
    properties:
      schema.registry.url: http://localhost:8081
    producer:
      client-id: ${spring.application.name}
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      client-id: ${spring.application.name}
      group-id: ${spring.application.name}-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
    streams:
      client-id: ${spring.application.name}-stream
      application-id: ${spring.application.name}
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
If you look at the config carefully, we are setting up serializers and deserializers for the producer, the consumer, and the streams (serde is just short for serializer-deserializer). This is the only setup we need for the Spring Boot project.
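To make the serializer and deserializer pairing concrete, here is a minimal plain-JDK sketch of what a String serde round trip amounts to. StringSerdeSketch and its two methods are hypothetical names used only for illustration; they are not Kafka's classes. The sketch assumes UTF-8, which is the default encoding of Kafka's StringSerializer and StringDeserializer.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a String serde; not part of the Kafka API.
final class StringSerdeSketch {

    // Serializer half: turn the in-memory value into the bytes sent to the broker.
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Deserializer half: turn bytes read from the broker back into a value.
    static String deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("510");
        System.out.println(wire.length + " bytes -> " + deserialize(wire));
    }
}
```

The producer only needs the serializing half and the consumer only the deserializing half, while a stream processor reads and writes and therefore needs both, which is why the streams section configures a serde.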
Let's jump into creating the producer, the consumer, and the stream processor. I have taken a simple example here. We are producing random numbers every 2 seconds using a scheduler.
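import java.util.Random;
import lombok.AllArgsConstructor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@AllArgsConstructor
public class NumberProducer {
    private final NumberPublisher numberPublisher;

    // Runs every 2 seconds; scheduling must be enabled with @EnableScheduling
    // on a configuration class (for example the main application class).
    @Scheduled(fixedRate = 2000)
    public void produceIntStream() {
        Random random = new Random();
        numberPublisher.produce(random.nextInt(1000));
    }
}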
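If fixed-rate scheduling is unfamiliar, the behaviour of the producer above can be approximated with the JDK's own scheduler. This is only a rough analogue under stated assumptions, not how Spring implements @Scheduled: FixedRateSketch and its produce method are hypothetical names, and the sketch stops after a fixed number of ticks so that it terminates.

```java
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

// Hypothetical plain-JDK analogue of the scheduled producer; not Spring code.
final class FixedRateSketch {

    // Passes a random number in [0, 1000) to sink every periodMillis until
    // `ticks` numbers were produced. Returns true if all ticks ran in time.
    static boolean produce(int ticks, long periodMillis, IntConsumer sink) {
        Random random = new Random();
        CountDownLatch remaining = new CountDownLatch(ticks);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            if (remaining.getCount() > 0) {
                sink.accept(random.nextInt(1000));
                remaining.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            return remaining.await(ticks * periodMillis + 5_000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Same 2-second rate as the article's producer, limited to three numbers.
        produce(3, 2000, n -> System.out.println("Produced number: " + n));
    }
}
```

In the real application the sink is the number publisher, and Spring keeps the schedule running for the lifetime of the app instead of stopping after a few ticks.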
The number publisher is the actual publisher that puts the data on a topic. We set a key for the message and the data (which is a random number in our case).
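import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// INPUT_TOPIC_NAME is a constant of the KafkaStream class shown later; import it statically.
@Slf4j
@Component
@AllArgsConstructor
public class NumberPublisher {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void produce(Integer randomNumber) {
        // The message key records the parity of the number.
        String key = (randomNumber % 2 == 0) ? "Even" : "Odd";
        System.out.println("Produced number: " + randomNumber);
        kafkaTemplate.send(INPUT_TOPIC_NAME, key, String.valueOf(randomNumber));
    }
}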
The key is defined as a String, which is either "Even" or "Odd" depending on the number. We use the Kafka template to send the message; this comes from the spring-kafka library. It abstracts away the logic for publishing and consuming messages.
Next up, we set up our stream processor, which listens to the topic on which the publisher is putting the messages. This is where it gets interesting. We listen to INPUT_TOPIC_NAME and then process the data. In this case, the job of the stream processor is to filter out the odd numbers and send only the even numbers to OUTPUT_TOPIC_NAME.
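import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@Configuration
@EnableKafkaStreams
public class KafkaStream {
    public static final String OUTPUT_TOPIC_NAME = "even-number-topic";
    public static final String INPUT_TOPIC_NAME = "number-topic";

    @Bean
    public KStream<String, String> evenNumbersStream(StreamsBuilder kStreamBuilder) {
        // Read every record published to the input topic.
        KStream<String, String> input = kStreamBuilder.stream(INPUT_TOPIC_NAME);
        // Keep only records whose key marks the number as even.
        KStream<String, String> output = input.filter((key, value) -> key.equals("Even"));
        // Write the remaining records to the output topic.
        output.to(OUTPUT_TOPIC_NAME);
        return output;
    }
}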
You might be wondering about that KStream in the return type of our method. I will give only a brief overview here, as a detailed treatment is outside the scope of this article.
KStream: an append-only stream of records. When a new record arrives with the same key as an earlier one, it does not update the previous record; both are kept. It provides several operations that are very useful for data processing, such as filter, map, and flatMap. You can read more about KStreams here.
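The append-only behaviour is easiest to see next to its opposite. The sketch below is a hypothetical in-memory model, not Kafka Streams code: AppendOnlySketch, asStream, and asLatestByKey are illustrative names. It feeds the same three records into a list, where every record survives, and into a map, where a later record with the same key overwrites the earlier one.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory contrast; these are not Kafka Streams classes.
final class AppendOnlySketch {

    // Record-stream view: every record is kept, even when the key repeats.
    static List<Map.Entry<String, String>> asStream(String[][] records) {
        List<Map.Entry<String, String>> log = new ArrayList<>();
        for (String[] r : records) {
            log.add(new SimpleEntry<>(r[0], r[1]));
        }
        return log;
    }

    // Update view, for contrast: a later record replaces the earlier one with the same key.
    static Map<String, String> asLatestByKey(String[][] records) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] r : records) {
            latest.put(r[0], r[1]);
        }
        return latest;
    }

    public static void main(String[] args) {
        String[][] records = {{"Even", "510"}, {"Odd", "897"}, {"Even", "996"}};
        System.out.println("stream view: " + asStream(records));
        System.out.println("latest by key: " + asLatestByKey(records));
    }
}
```

With these inputs the stream view holds three records and the keyed view holds two. Our example relies on the stream view: many numbers share the key "Even", and each of them must reach the output topic.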
Finally, when we have processed the data, we put it on the outgoing topic, OUTPUT_TOPIC_NAME. For the sake of simplicity and completeness, I am listening to that topic in the same application. This will generally not be the case, as another application would usually consume from that topic, which is why it is called the outgoing topic.
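import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// OUTPUT_TOPIC_NAME is statically imported from the KafkaStream class above.
@Component
@EnableKafka
public class EvenNumberConsumer {
    @KafkaListener(topics = OUTPUT_TOPIC_NAME)
    public void receive(String value) {
        System.out.println("Received number: " + value);
    }
}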
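Taken together, the three pieces above form a small pipeline: the publisher keys each number, the stream processor keeps the records keyed "Even", and the consumer prints what is left. A minimal in-memory sketch of that flow, with no Kafka involved, can help when reasoning about what should arrive on the outgoing topic. EvenFilterSketch, toRecord, and evenOnly are hypothetical names, and a plain list stands in for each topic.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical in-memory model of the pipeline; no Kafka involved.
final class EvenFilterSketch {

    // Mirrors the publisher: key is "Even" or "Odd", value is the number as text.
    static Map.Entry<String, String> toRecord(int number) {
        String key = number % 2 == 0 ? "Even" : "Odd";
        return new SimpleEntry<>(key, String.valueOf(number));
    }

    // Mirrors the stream processor's filter: keeps records keyed "Even", in order.
    static List<Map.Entry<String, String>> evenOnly(List<Map.Entry<String, String>> input) {
        return input.stream()
                .filter(record -> record.getKey().equals("Even"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> inputTopic = new ArrayList<>();
        for (int n : new int[] {510, 996, 897}) {
            inputTopic.add(toRecord(n));
        }
        // Mirrors the consumer: print whatever survived the filter.
        for (Map.Entry<String, String> record : evenOnly(inputTopic)) {
            System.out.println("Received number: " + record.getValue());
        }
    }
}
```

Note that the filter looks only at the key, so it trusts the publisher to have keyed each number correctly; it never parses the value.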
The application code is complete. Let's set up Kafka locally.
Setting up Kafka Locally
Setting up Kafka is easy, but it requires some dependencies to run. You just need to use the docker-compose file below, and it will start the Kafka server locally. Add docker-compose.yml to the repository's root directory, then start the required dependencies using: docker-compose up.
version: '3.7'
services:
  kafka:
    image: confluentinc/cp-kafka:5.5.0
    container_name: kafka
    hostname: kafka
    restart: always
    environment:
      KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://:29092,LISTENER_DOCKER_EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:29092,LISTENER_DOCKER_EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9092:9092
      - 9999:9999
    depends_on:
      - zookeeper
  zookeeper:
    container_name: zookeeper
    hostname: zookeeper
    image: confluentinc/cp-zookeeper:5.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - 2181:2181
  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.0
    hostname: schema-registry
    container_name: schema-registry
    restart: always
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka:29092"
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
    ports:
      - 8081:8081
    depends_on:
      - zookeeper
That's it!
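Before starting the Spring Boot app, it can be handy to confirm that the containers are actually accepting connections. The following is a small optional sketch that assumes the port mappings from the compose file above. BrokerPortCheck and isReachable are hypothetical names, and the check only proves that a TCP port is open, not that the service behind it is fully ready.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper; it only checks that a TCP port accepts connections.
final class BrokerPortCheck {

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports from the compose file: 9092 (Kafka), 2181 (ZooKeeper), 8081 (Schema Registry).
        for (int port : new int[] {9092, 2181, 8081}) {
            System.out.println("localhost:" + port + " reachable: "
                    + isReachable("localhost", port, 1000));
        }
    }
}
```

If port 9092 is not reachable, the Spring Boot app will not be able to connect to the broker at localhost:9092, so it is worth checking the container logs first.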
Verify Sending, Processing, and Receiving of Events
Run the Spring Boot app:
- Clone the sample code from the repo. Go to the root directory.
- Run your Spring Boot app with the Gradle wrapper, using this command in the project directory.
./gradlew bootRun
Once the application is running, you should see logs like this.
Received number: 910
Received number: 320
Received number: 16
Received number: 526
Received number: 76
Received number: 936
Received number: 642
Produced number: 510
Received number: 510
Produced number: 996
Received number: 996
Produced number: 897
Conclusion
Spring Cloud Stream provides a simple and convenient way to create apps that can process streams and publish data to different topics. You can build microservices that talk to each other using Kafka messages and process data as you would in a single application.
In this article, we have learned how to build a Spring Cloud Stream app that uses Kafka Streams. We saw how Spring Cloud Stream provides an easy way to set up and run an application that can consume, process, and publish messages to Kafka topics without the hassle of configuring each. With so little code, we could do so much.
You can refer to the repository used in the article on GitHub.