Tổng quan

Đối với kafka thì Spring không cần phải làm thêm gì nhiều ngoài việc wrap lại các lớp của thư viện kafka clients, còn lại việc giao tiếp với kafka sẽ do thư viện này lo. Mặc dù đã cố gắng đơn giản hoá việc cấu hình cho Dev nhưng có vẻ mọi thứ vẫn tương đối phức tạp.

Cài đặt

Để có thể sử dụng spring kafka, chúng ta sẽ cần thêm dependecies vào dự án, ví dụ với gradle:

 implementation 'org.springframework.boot:spring-boot-starter:' + springBootVersion
 implementation 'org.springframework.kafka:spring-kafka:' + springKafkaVersion

Ví dụ

Giả sử chúng ta cần định kỳ gửi (publish) và tiêu thụ (Consume) (Subscribe) thông qua Kafka Greet message kiểu này:

package com.example.spring_kafka.message;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Greeting {
    private String message;
    private String name;
}

Việc đầu tiên chúng ta cần làm đó cấu hình kafka trong application.properties kiểu thế này:

kafka.bootstrap_address=localhost:9092
kafka.topic.greeting.name=greeting

Tiếp theo chúng ta cần tạo ra lớp GreetingMessageProducer để gửi message:

package com.example.spring_kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.example.spring_kafka.message.Greeting;

@Component
public class GreetingMessageProducer {

    @Autowired
    private KafkaTemplate<String, Greeting> greetingKafkaTemplate;

    @Value("${kafka.topic.greeting.name}")
    private String greetingTopicName;

    public void sendGreetingMessage(Greeting greeting) {
        greetingKafkaTemplate.send(greetingTopicName, greeting);
    }
}

Với cấu hình thế này:

package com.example.spring_kafka.config;

import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.example.spring_kafka.message.Greeting;

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap_address}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        final Map<String, Object> configProps = new HashMap<>();
        configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(MAX_REQUEST_SIZE_CONFIG, "20971520");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, Greeting> greetingProducerFactory() {
        final Map<String, Object> configProps = new HashMap<>();
        configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
        return new KafkaTemplate<>(greetingProducerFactory());
    }
}

Tiếp theo là lớp KafkaConsumerConfig để tiêu thụ message:

package com.example.spring_kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.example.spring_kafka.message.Greeting;

@Component
public class GreetingMessageConsumer {

    @KafkaListener(
        topics = "${kafka.topic.greeting.name}",
        containerFactory = "greetingKafkaListenerContainerFactory"
    )
    public void greetingListener(Greeting greeting) {
        System.out.println("Received greeting message: " + greeting);
    }
}

Và cuối cùng là lớp startup để khởi chạy chương trình:

package com.example.spring_kafka;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

import com.example.spring_kafka.message.Greeting;
import com.example.spring_kafka.producer.GreetingMessageProducer;

@SpringBootApplication
public class SpringKafkaExample {

    public static void main(String[] args) throws Exception {

        final ConfigurableApplicationContext context =
            SpringApplication.run(SpringKafkaExample.class, args);

        final GreetingMessageProducer producer = context.getBean(GreetingMessageProducer.class);
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() ->
                producer.sendGreetingMessage(
                    new Greeting("Greetings", "World!")
                ), 0, 1, TimeUnit.SECONDS
        );
    }
}

Sau khi đã start kafka và chạy chương trình chúng ta sẽ thấy in ra logs:

Received greeting message: Greeting(message=Greetings, name=World!)
Received greeting message: Greeting(message=Greetings, name=World!)
Received greeting message: Greeting(message=Greetings, name=World!)