CC00084.kafka——|Hadoop&kafka.V69|——|kafka.v69|稳定性|重试队列.v01|
时间:2023-02-20 08:00:00
### --- 重试队列 ~~~ kafka没有重试机制不支持新闻重试,也没有死信队列,所以使用kafka做消息队列时, ~~~ 需要实现新闻重试的功能。
### --- 实现:创新kafka主题作为重试队列: ~~~ 创建一个topic作为重试topic,接收等待重试的消息。 ~~~ 普通topic消费者设置下一次重试新闻topic。 ~~~ 从重试topic获取待重试消息存储redis的zset以下消费时间排序 ~~~ 定时任务从redis获取到达消费事件的消息,并把消息发送到对应的topic ~~~ 同一消息重试次数过多,不再重试
### --- 创建springboot项目: ~~~ ——>New Module——>Spring Initializr:配置信息如下——>NExt——> ~~~ ——>Spring Boot:2.2.8——> ~~~ ——>添加依赖:web、kafka、redis——>Next——>END

### --- 更给springboot版本2.5.5为2.2.8 4.0.0 org.springframework.boot spring-boot-starter-parent 2.2.8.RELEASE
### --- 标准pom.xml依赖文件 4.0.0 org.springframework.boot spring-boot-starter-parent 2.2.8.RELEASE com.yanqi.kafka.demo demo-retryqueue 0.0.1-SNAPSHOT demo-retryqueue Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-web org.springframework.kafka spring-kafka org.springframework.boot spring-boot-starter-test test org.junit.vintage junit-vintage-engine
org.springframework.kafka
spring-kafka-test
test
org.springframework.boot
spring-boot-starter-data-redis
com.alibaba
fastjson
1.2.73
org.springframework.boot
spring-boot-maven-plugin
### --- 添加application.properties:src——>main——>resources——>application.properties
# bootstrap.servers
spring.kafka.bootstrap-servers=hadoop01:9092
# key序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# value序列化器
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消费组id:group.id
spring.kafka.consumer.group-id=retryGroup
# key反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# value反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# redis数据库编号
spring.redis.database=0
# redis主机地址
spring.redis.host=192.168.1.111
# redis端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=20
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=10
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.timeout=1000
# Kafka主题名称,业务主题
spring.kafka.topics.test=tp_demo_retry_01
# 重试队列,重试主题
spring.kafka.topics.retry=tp_demo_retry_02
### --- RetryqueueApplication.java
package com.yanqi.kafka.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RetryqueueApplication {
public static void main(String[] args) {
SpringApplication.run(RetryqueueApplication.class, args);
}
}