Tổng quan
Đối với kafka thì Spring không cần phải làm thêm gì nhiều ngoài việc wrap lại các lớp của thư viện kafka clients, còn lại việc giao tiếp với kafka sẽ do thư viện này lo. Mặc dù đã cố gắng đơn giản hoá việc cấu hình cho Dev nhưng có vẻ mọi thứ vẫn tương đối phức tạp.
Cài đặt
Để có thể sử dụng spring kafka, chúng ta sẽ cần thêm dependecies vào dự án, ví dụ với gradle:
implementation 'org.springframework.boot:spring-boot-starter:' + springBootVersion
implementation 'org.springframework.kafka:spring-kafka:' + springKafkaVersion
Ví dụ
Giả sử chúng ta cần định kỳ gửi (publish) và tiêu thụ (Consume) (Subscribe) thông qua Kafka Greet message kiểu này:
package com.example.spring_kafka.message;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Greeting {
private String message;
private String name;
}
Việc đầu tiên chúng ta cần làm đó cấu hình kafka trong application.properties
kiểu thế này:
kafka.bootstrap_address=localhost:9092
kafka.topic.greeting.name=greeting
Tiếp theo chúng ta cần tạo ra lớp GreetingMessageProducer
để gửi message:
package com.example.spring_kafka.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import com.example.spring_kafka.message.Greeting;
@Component
public class GreetingMessageProducer {
@Autowired
private KafkaTemplate<String, Greeting> greetingKafkaTemplate;
@Value("${kafka.topic.greeting.name}")
private String greetingTopicName;
public void sendGreetingMessage(Greeting greeting) {
greetingKafkaTemplate.send(greetingTopicName, greeting);
}
}
Với cấu hình thế này:
package com.example.spring_kafka.config;
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.example.spring_kafka.message.Greeting;
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.bootstrap_address}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, String> producerFactory() {
final Map<String, Object> configProps = new HashMap<>();
configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(MAX_REQUEST_SIZE_CONFIG, "20971520");
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
final Map<String, Object> configProps = new HashMap<>();
configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}
}
Tiếp theo là lớp KafkaConsumerConfig
để tiêu thụ message:
package com.example.spring_kafka.consumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.example.spring_kafka.message.Greeting;
@Component
public class GreetingMessageConsumer {
@KafkaListener(
topics = "${kafka.topic.greeting.name}",
containerFactory = "greetingKafkaListenerContainerFactory"
)
public void greetingListener(Greeting greeting) {
System.out.println("Received greeting message: " + greeting);
}
}
Và cuối cùng là lớp startup để khởi chạy chương trình:
package com.example.spring_kafka;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import com.example.spring_kafka.message.Greeting;
import com.example.spring_kafka.producer.GreetingMessageProducer;
@SpringBootApplication
public class SpringKafkaExample {
public static void main(String[] args) throws Exception {
final ConfigurableApplicationContext context =
SpringApplication.run(SpringKafkaExample.class, args);
final GreetingMessageProducer producer = context.getBean(GreetingMessageProducer.class);
final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() ->
producer.sendGreetingMessage(
new Greeting("Greetings", "World!")
), 0, 1, TimeUnit.SECONDS
);
}
}
Sau khi đã start kafka và chạy chương trình chúng ta sẽ thấy in ra logs:
Received greeting message: Greeting(message=Greetings, name=World!)
Received greeting message: Greeting(message=Greetings, name=World!)
Received greeting message: Greeting(message=Greetings, name=World!)