Spring Boot整合Kafka
This commit is contained in:
parent
adc5715072
commit
e1a7c76eff
|
|
@ -0,0 +1,41 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-parent</artifactId>
|
||||
<version>2.1.3.RELEASE</version>
|
||||
<relativePath/> <!-- lookup parent from repository -->
|
||||
</parent>
|
||||
<groupId>com.example</groupId>
|
||||
<artifactId>demo</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>demo</name>
|
||||
<description>Demo project for Spring Boot</description>
|
||||
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
|
|
@ -0,0 +1,13 @@
|
|||
package com.example.demo;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class KafkaApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(KafkaApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,69 @@
|
|||
package com.example.demo.config;
|
||||
|
||||
import com.example.demo.domain.Message;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.annotation.EnableKafka;
|
||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
||||
import org.springframework.kafka.support.serializer.JsonSerializer;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author MrBird
|
||||
*/
|
||||
@EnableKafka
|
||||
@Configuration
|
||||
public class KafkaConsumerConfig {
|
||||
|
||||
@Value("${spring.kafka.bootstrap-servers}")
|
||||
private String bootstrapServers;
|
||||
|
||||
@Value("${spring.kafka.consumer.group-id}")
|
||||
private String consumerGroupId;
|
||||
|
||||
@Value("${spring.kafka.consumer.auto-offset-reset}")
|
||||
private String autoOffsetReset;
|
||||
|
||||
@Bean
|
||||
public ConsumerFactory<String, Message> consumerFactory() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(
|
||||
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||
bootstrapServers);
|
||||
props.put(
|
||||
ConsumerConfig.GROUP_ID_CONFIG,
|
||||
consumerGroupId);
|
||||
props.put(
|
||||
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
|
||||
autoOffsetReset);
|
||||
// props.put(
|
||||
// ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
|
||||
// StringDeserializer.class);
|
||||
// props.put(
|
||||
// ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
|
||||
// StringDeserializer.class);
|
||||
return new DefaultKafkaConsumerFactory<>(
|
||||
props,
|
||||
new StringDeserializer(),
|
||||
new JsonDeserializer<>(Message.class));
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
|
||||
ConcurrentKafkaListenerContainerFactory<String, Message> factory
|
||||
= new ConcurrentKafkaListenerContainerFactory<>();
|
||||
factory.setConsumerFactory(consumerFactory());
|
||||
// factory.setRecordFilterStrategy(
|
||||
// r -> r.value().contains("fuck")
|
||||
// );
|
||||
return factory;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,45 @@
|
|||
package com.example.demo.config;
|
||||
|
||||
import com.example.demo.domain.Message;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.support.serializer.JsonSerializer;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author MrBird
|
||||
*/
|
||||
@Configuration
|
||||
public class KafkaProducerConfig {
|
||||
|
||||
@Value("${spring.kafka.bootstrap-servers}")
|
||||
private String bootstrapServers;
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, Message> producerFactory() {
|
||||
Map<String, Object> configProps = new HashMap<>();
|
||||
configProps.put(
|
||||
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||
bootstrapServers);
|
||||
configProps.put(
|
||||
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
|
||||
StringSerializer.class);
|
||||
configProps.put(
|
||||
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
|
||||
JsonSerializer.class);
|
||||
return new DefaultKafkaProducerFactory<>(configProps);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, Message> kafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,48 @@
|
|||
package com.example.demo.controller;
|
||||
|
||||
import com.example.demo.domain.Message;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.support.SendResult;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* @author MrBird
|
||||
*/
|
||||
@RestController
|
||||
public class SendMessageController {
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Autowired
|
||||
// private KafkaTemplate<String, String> kafkaTemplate;
|
||||
private KafkaTemplate<String, Message> kafkaTemplate;
|
||||
|
||||
// @GetMapping("send/{message}")
|
||||
// public void send(@PathVariable String message) {
|
||||
// this.kafkaTemplate.send("test", message);
|
||||
// ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send("test", message);
|
||||
// future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
|
||||
// @Override
|
||||
// public void onSuccess(SendResult<String, String> result) {
|
||||
// logger.info("成功发送消息:{},offset=[{}]", message, result.getRecordMetadata().offset());
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void onFailure(Throwable ex) {
|
||||
// logger.error("消息:{} 发送失败,原因:{}", message, ex.getMessage());
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
|
||||
@GetMapping("send/{message}")
|
||||
public void sendMessage(@PathVariable String message) {
|
||||
this.kafkaTemplate.send("test", new Message("mrbird", message));
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
package com.example.demo.domain;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.time.LocalTime;
|
||||
|
||||
/**
|
||||
* @author MrBird
|
||||
*/
|
||||
public class Message implements Serializable {
|
||||
private static final long serialVersionUID = 6678420965611108427L;
|
||||
|
||||
private String from;
|
||||
|
||||
private String message;
|
||||
|
||||
public Message() {
|
||||
}
|
||||
|
||||
public Message(String from, String message) {
|
||||
this.from = from;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Message{" +
|
||||
"from='" + from + '\'' +
|
||||
", message='" + message + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
||||
public String getFrom() {
|
||||
return from;
|
||||
}
|
||||
|
||||
public void setFrom(String from) {
|
||||
this.from = from;
|
||||
}
|
||||
|
||||
public String getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
public void setMessage(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,38 @@
|
|||
package com.example.demo.listener;
|
||||
|
||||
import com.example.demo.domain.Message;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.annotation.PartitionOffset;
|
||||
import org.springframework.kafka.annotation.TopicPartition;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.messaging.handler.annotation.Header;
|
||||
import org.springframework.messaging.handler.annotation.Payload;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author MrBird
|
||||
*/
|
||||
@Component
|
||||
public class KafkaMessageListener {
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
// @KafkaListener(topics = "test", groupId = "test-consumer")
|
||||
// @KafkaListener(groupId = "test-consumer",
|
||||
// topicPartitions = @TopicPartition(topic = "test",
|
||||
// partitionOffsets = {
|
||||
// @PartitionOffset(partition = "0", initialOffset = "0")
|
||||
// }))
|
||||
// public void listen(@Payload String message,
|
||||
// @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
|
||||
// logger.info("接收消息: {},partition:{}", message, partition);
|
||||
// }
|
||||
|
||||
@KafkaListener(topics = "test", groupId = "test-consumer")
|
||||
public void listen(Message message) {
|
||||
logger.info("接收消息: {}", message);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
spring:
|
||||
kafka:
|
||||
bootstrap-servers: localhost:9092
|
||||
consumer:
|
||||
group-id: test-consumer
|
||||
auto-offset-reset: latest
|
||||
Loading…
Reference in New Issue