Skip to content

Commit 87a4001

Browse files
committed
Spring Boot Kafka
1 parent 158b0d4 commit 87a4001

File tree

5 files changed

+399
-0
lines changed

5 files changed

+399
-0
lines changed

spring-boot-study-kafka/README.md

Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
Spring Boot 的便捷,无出其右,仍然是三部曲,创建springboot 项目,配置项目,编写示例代码。
2+
安装 Kafka 测试环境请参加: [https://blog.csdn.net/fishpro/article/details/105761986](https://blog.csdn.net/fishpro/article/details/105761986)
3+
4+
# 1 新建 Spring Boot Maven 示例工程项目
5+
注意:是用来 IDEA 开发工具
6+
1. `File > New > Project`,如下图选择 `Spring Initializr` 然后点击 【Next】下一步
7+
2. 填写 `GroupId`(包名)、`Artifact`(项目名) 即可。点击 下一步
8+
groupId=com.fishpro
9+
artifactId=kafka
10+
3. 选择依赖 `Spring Web Starter` 前面打钩。
11+
4. 项目名设置为 `spring-boot-study-kafka`.
12+
13+
# 2 引入依赖 Pom
14+
15+
```xml
16+
<?xml version="1.0" encoding="UTF-8"?>
17+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
18+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
19+
<modelVersion>4.0.0</modelVersion>
20+
<parent>
21+
<groupId>org.springframework.boot</groupId>
22+
<artifactId>spring-boot-starter-parent</artifactId>
23+
<version>2.3.0.RELEASE</version>
24+
<relativePath/> <!-- lookup parent from repository -->
25+
</parent>
26+
<groupId>com.fishpro</groupId>
27+
<artifactId>kafka</artifactId>
28+
<version>0.0.1-SNAPSHOT</version>
29+
<name>kafka</name>
30+
<description>Demo project for Spring Boot</description>
31+
32+
<properties>
33+
<java.version>1.8</java.version>
34+
</properties>
35+
36+
<dependencies>
37+
<dependency>
38+
<groupId>org.springframework.boot</groupId>
39+
<artifactId>spring-boot-starter-actuator</artifactId>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.springframework.boot</groupId>
43+
<artifactId>spring-boot-starter-web</artifactId>
44+
</dependency>
45+
46+
<dependency>
47+
<groupId>org.springframework.kafka</groupId>
48+
<artifactId>spring-kafka</artifactId>
49+
</dependency>
50+
51+
<dependency>
52+
<groupId>org.springframework.boot</groupId>
53+
<artifactId>spring-boot-starter-test</artifactId>
54+
<scope>test</scope>
55+
<exclusions>
56+
<exclusion>
57+
<groupId>org.junit.vintage</groupId>
58+
<artifactId>junit-vintage-engine</artifactId>
59+
</exclusion>
60+
</exclusions>
61+
</dependency>
62+
<dependency>
63+
<groupId>org.springframework.kafka</groupId>
64+
<artifactId>spring-kafka-test</artifactId>
65+
<scope>test</scope>
66+
</dependency>
67+
</dependencies>
68+
69+
<build>
70+
<plugins>
71+
<plugin>
72+
<groupId>org.springframework.boot</groupId>
73+
<artifactId>spring-boot-maven-plugin</artifactId>
74+
</plugin>
75+
</plugins>
76+
</build>
77+
<repositories>
78+
<repository>
79+
<id>public</id>
80+
<name>aliyun nexus</name>
81+
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
82+
<releases>
83+
<enabled>true</enabled>
84+
</releases>
85+
</repository>
86+
</repositories>
87+
88+
</project>
89+
90+
91+
```
92+
# 3 application 配置
93+
94+
```yml
95+
96+
spring:
97+
kafka:
98+
# 指定kafka server的地址,集群配多个,中间,逗号隔开
99+
bootstrap-servers: 127.0.0.1:9092
100+
# 生产者
101+
producer:
102+
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
103+
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
104+
retries: 0
105+
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
106+
batch-size: 16384
107+
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
108+
buffer-memory: 33554432
109+
# 指定消息key和消息体的编解码方式
110+
key-serializer: org.apache.kafka.common.serialization.StringSerializer
111+
value-serializer: org.apache.kafka.common.serialization.StringSerializer
112+
properties:
113+
linger.ms: 1
114+
# 消费者
115+
consumer:
116+
enable-auto-commit: false
117+
auto-commit-interval: 100ms
118+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
119+
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
120+
properties:
121+
session.timeout.ms: 15000
122+
group-id: group
123+
server:
124+
port: 8081
125+
```
126+
127+
# 4 代码实例
128+
## 4.1 发送消息
129+
建立文件 KafkaDemoController.java 设置为 RestController
130+
```java
131+
package com.fishpro.kafka.controller;
132+
133+
import org.springframework.beans.factory.annotation.Autowired;
134+
import org.springframework.kafka.core.KafkaTemplate;
135+
import org.springframework.web.bind.annotation.GetMapping;
136+
import org.springframework.web.bind.annotation.RequestParam;
137+
import org.springframework.web.bind.annotation.RestController;
138+
139+
@RestController
140+
public class KafkaDemoController {
141+
@Autowired
142+
private KafkaTemplate<String,Object> kafkaTemplate;
143+
144+
@GetMapping("/message/send")
145+
public boolean send(@RequestParam String message){
146+
kafkaTemplate.send("testTopic",message);
147+
return true;
148+
}
149+
}
150+
151+
152+
```
153+
154+
## 4.2 接收消息
155+
新建文件 CustomerListener 设置标签为 @Component
156+
```java
157+
package com.fishpro.kafka.listener;
158+
159+
import org.springframework.kafka.annotation.KafkaListener;
160+
import org.springframework.stereotype.Component;
161+
162+
@Component
163+
public class CustomerListener {
164+
165+
@KafkaListener(topics="testTopic")
166+
public void onMessage(String message){
167+
System.out.println(message);
168+
}
169+
}
170+
171+
```
172+
## 4.3 测试发送与接收
173+
输入发送消息 url http://localhost:8081/message/send?message=abc
174+
此时 CustomerListener 也会实时接收到消息。
175+
176+
## 4.4 问题
177+
- 出现了:springboot整合kafka出现No group.id found in consumer config
178+
- **原因是 未配置消费端**
179+
180+
```yml
181+
# 消费者
182+
consumer:
183+
enable-auto-commit: false
184+
auto-commit-interval: 100ms
185+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
186+
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
187+
properties:
188+
session.timeout.ms: 15000
189+
group-id: group
190+
```
191+
## 4.5 异步同步消息
192+
建立异步消息同步消息发送
193+
194+
```java
195+
196+
package com.fishpro.kafka.service;
197+
198+
import org.apache.kafka.clients.producer.ProducerRecord;
199+
import org.springframework.beans.factory.annotation.Autowired;
200+
import org.springframework.kafka.core.KafkaTemplate;
201+
import org.springframework.kafka.support.SendResult;
202+
import org.springframework.stereotype.Service;
203+
import org.springframework.util.concurrent.ListenableFuture;
204+
import org.springframework.util.concurrent.ListenableFutureCallback;
205+
206+
import java.util.concurrent.ExecutionException;
207+
import java.util.concurrent.TimeUnit;
208+
import java.util.concurrent.TimeoutException;
209+
210+
@Service
211+
public class KafkaSendService {
212+
213+
@Autowired
214+
private KafkaTemplate<String,Object> kafkaTemplate;
215+
216+
/**
217+
* 异步示例
218+
* */
219+
public void sendAnsyc(final String topic,final String message){
220+
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic,message);
221+
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
222+
@Override
223+
public void onSuccess(SendResult<String, Object> result) {
224+
System.out.println("发送消息成功:" + result);
225+
}
226+
227+
@Override
228+
public void onFailure(Throwable ex) {
229+
System.out.println("发送消息失败:"+ ex.getMessage());
230+
}
231+
});
232+
}
233+
234+
/**
235+
* 同步示例
236+
* */
237+
public void sendSync(final String topic,final String message){
238+
ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, message);
239+
try {
240+
kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);
241+
System.out.println("发送成功");
242+
}
243+
catch (ExecutionException e) {
244+
System.out.println("发送消息失败:"+ e.getMessage());
245+
}
246+
catch (TimeoutException | InterruptedException e) {
247+
System.out.println("发送消息失败:"+ e.getMessage());
248+
}
249+
}
250+
}
251+
252+
253+
```
254+
修改 KafkaDemoController 增加异步同步消息测试
255+
256+
```java
257+
/**
258+
* kafka 消息发送
259+
* */
260+
@RestController
261+
public class KafkaDemoController {
262+
@Autowired
263+
private KafkaTemplate<String,Object> kafkaTemplate;
264+
@Autowired
265+
private KafkaSendService kafkaSendService;
266+
267+
@GetMapping("/message/send")
268+
public boolean send(@RequestParam String message){
269+
kafkaTemplate.send("testTopic",message);
270+
return true;
271+
}
272+
273+
//同步
274+
@GetMapping("/message/sendSync")
275+
public boolean sendSync(@RequestParam String message){
276+
kafkaSendService.sendSync("synctopic",message);
277+
return true;
278+
}
279+
280+
//异步示例
281+
@GetMapping("/message/sendAnsyc")
282+
public boolean sendAnsys(@RequestParam String message){
283+
kafkaSendService.sendAnsyc("ansyctopic",message);
284+
return true;
285+
}
286+
287+
//事务消息发送
288+
// @GetMapping("/message/sendTransaction")
289+
// public boolean sendTransaction(){
290+
// kafkaTemplate.executeInTransaction(kafkaTemplate -> {
291+
// kafkaTemplate.send("transactionTopic", "TransactionMessage");
292+
// return true;
293+
// });
294+
// return true;
295+
// }
296+
}
297+
298+
```
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.fishpro.kafka.config;
2+
3+
import org.apache.kafka.clients.admin.NewTopic;
4+
import org.springframework.context.annotation.Bean;
5+
import org.springframework.context.annotation.Configuration;
6+
7+
//@Configuration
8+
public class KafkaInitialConfiguration {
9+
@Bean
10+
public NewTopic initialTopic(){
11+
//创建TopicName为topic.quick.initial的Topic并设置分区数为8以及副本数为1
12+
return new NewTopic("sunday",1,(short)1);
13+
}
14+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.fishpro.kafka.controller;
2+
3+
import org.springframework.beans.factory.annotation.Autowired;
4+
import org.springframework.kafka.core.KafkaTemplate;
5+
import org.springframework.web.bind.annotation.GetMapping;
6+
import org.springframework.web.bind.annotation.RequestParam;
7+
import org.springframework.web.bind.annotation.RestController;
8+
9+
@RestController
10+
public class KafkaDemoController {
11+
@Autowired
12+
private KafkaTemplate<String,Object> kafkaTemplate;
13+
14+
@GetMapping("/message/send")
15+
public boolean send(@RequestParam String message){
16+
kafkaTemplate.send("testTopic",message);
17+
return true;
18+
}
19+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.fishpro.kafka.listener;
2+
3+
import org.springframework.kafka.annotation.KafkaListener;
4+
import org.springframework.stereotype.Component;
5+
6+
@Component
7+
public class CustomerListener {
8+
9+
@KafkaListener(topics="testTopic")
10+
public void onMessage(String message){
11+
System.out.println(message);
12+
}
13+
}

0 commit comments

Comments
 (0)