Kafka 作为一个优秀的消息中间件,一直被大量开发者使用。本文以 Spring Boot 2.1.6.RELEASE 为例。
PS:此文是在服务端已安装好 Kafka 并创建好 Topic 的前提下进行的。(Kafka 的安装及 Topic 的创建方法请自行查找。)
引入Maven依赖
1 2 3 4
| <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
|
配置文件配置Kafka
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
kafka.bootstrap.servers=xxxxxxx:9092
kafka.producer.retries=2
kafka.producer.batch.size=16384 kafka.producer.buffer.memory=33554432 kafka.producer.linger=1 kafka.producer.acks=all
producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.consumer.group.id=dev-consumer-group
kafka.consumer.auto.offset.reset=earliest kafka.consumer.enable.auto.commit=true kafka.consumer.session.timeout=60000 kafka.consumer.auto.commit.interval=1000
kafka.consumer.concurrency=2
kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
|
其中 kafka.bootstrap.servers 是 Kafka 的地址。如果是服务器地址,要确保打开对应端口的外网访问;如果有多个地址,可以用逗号隔开。
Kafka生产者的配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| @Configuration @EnableKafka public class KafkaProducerConfig {
@Value("${kafka.bootstrap.servers}") private String servers; @Value("${kafka.producer.retries}") private int retries; @Value("${kafka.producer.batch.size}") private int batchSize; @Value("${kafka.producer.linger}") private int linger; @Value("${kafka.producer.buffer.memory}") private int bufferMemory; @Value("${kafka.producer.acks}") private String acks;
public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.ACKS_CONFIG, acks); return props; }
public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); }
@Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); }
}
|
Kafka消费者的配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| @Configuration @EnableKafka public class KafkaConsumerConfig {
@Value("${kafka.bootstrap.servers}") private String servers; @Value("${kafka.consumer.enable.auto.commit}") private boolean enableAutoCommit; @Value("${kafka.consumer.session.timeout}") private String sessionTimeout; @Value("${kafka.consumer.auto.commit.interval}") private String autoCommitInterval; @Value("${kafka.consumer.group.id}") private String groupId; @Value("${kafka.consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency;
@Bean(name = "kafkaListenerContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setPollTimeout(1500); return factory; }
public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); }
public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + HostUtil.getLocalHost().getHostAddress().replace(".","")); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return propsMap; }
}
|
配置都完成了,先往 Topic 里发送几条消息。
PS:这里的 KafkaMessage 是自己创建的类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Resource private KafkaTemplate<String, String> kafkaTemplate;
private Gson gson = new GsonBuilder().create();
public void send() { KafkaMessage message = new KafkaMessage(); message.setId(System.currentTimeMillis()); message.setMsg(UUID.randomUUID().toString()); message.setSendTime(new Date()); log.info("++ message = {}", gson.toJson(message)); ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send("test", gson.toJson(message)); sendCallBack(listenableFuture); }
private void sendCallBack(ListenableFuture> listenableFuture) { try { SendResult sendResult = listenableFuture.get(3, TimeUnit.SECONDS); listenableFuture.addCallback( successCallBack -> log.info("kafka Producer发送消息成功!topic=" + sendResult.getRecordMetadata().topic() \+ ",partition=" + sendResult.getRecordMetadata().partition() \+ ",offset=" + sendResult.getRecordMetadata().offset()), failureCallBack -> log.error("kafka Producer发送消息失败!sendResult=" + gson.toJson(sendResult.getProducerRecord())));
} catch (Exception e) { log.error("获取producer返回值失败", e); } }
|
发送完成了,还要有消费方消费消息:
| @KafkaListener(topics = {"test"}, containerFactory = "kafkaListenerContainerFactory") public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("----------------- record =" + record); log.info("------------------ message =" + message); } }
|
这样就可以接收 test 这个 topic 的消息了。
注意这里的 Topic test 是已经在 Kafka 里面创建好的;如果没有创建 Topic test,消息是发送不到这个 Topic 里面的。怎么创建 Topic 这里不再多做介绍。