锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

CC00084.kafka——|Hadoop&kafka.V69|——|kafka.v69|稳定性|重试队列.v01|

时间:2023-02-20 08:00:00 1v69连接器

一、重试队列
### --- 重试队列  ~~~     kafka没有重试机制不支持新闻重试,也没有死信队列,所以使用kafka做消息队列时, ~~~     需要实现新闻重试的功能。
### --- 实现:创新kafka主题作为重试队列:  ~~~     创建一个topic作为重试topic,接收等待重试的消息。 ~~~     普通topic消费者设置下一次重试新闻topic。 ~~~     从重试topic获取待重试消息存储redis的zset以下消费时间排序 ~~~     定时任务从redis获取到达消费事件的消息,并把消息发送到对应的topic ~~~     同一消息重试次数过多,不再重试
二、创造一个springboot项目
### --- 创建springboot项目:  ~~~     ——>New Module——>Spring Initializr:配置信息如下——>NExt——> ~~~     ——>Spring Boot:2.2.8——> ~~~     ——>添加依赖:web、kafka、redis——>Next——>END
### --- 更给springboot版本2.5.5为2.2.8      4.0.0              org.springframework.boot         spring-boot-starter-parent         2.2.8.RELEASE               
### --- 标准pom.xml依赖文件        4.0.0              org.springframework.boot         spring-boot-starter-parent         2.2.8.RELEASE                    com.yanqi.kafka.demo     demo-retryqueue     0.0.1-SNAPSHOT     demo-retryqueue     Demo project for Spring Boot              1.8                                org.springframework.boot             spring-boot-starter                               org.springframework.boot             spring-boot-starter-web                               org.springframework.kafka             spring-kafka                               org.springframework.boot             spring-boot-starter-test             test                                                   org.junit.vintage                     junit-vintage-engine                              
        
        
            org.springframework.kafka
            spring-kafka-test
            test
        
        
            org.springframework.boot
            spring-boot-starter-data-redis
        
        
            com.alibaba
            fastjson
            1.2.73
        
    
    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    
### --- 添加application.properties:src——>main——>resources——>application.properties

# bootstrap.servers
spring.kafka.bootstrap-servers=hadoop01:9092
# key序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# value序列化器
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# 消费组id:group.id
spring.kafka.consumer.group-id=retryGroup
# key反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# value反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# redis数据库编号
spring.redis.database=0
# redis主机地址
spring.redis.host=192.168.1.111
# redis端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=20
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=10
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.timeout=1000

# Kafka主题名称,业务主题
spring.kafka.topics.test=tp_demo_retry_01
# 重试队列,重试主题
spring.kafka.topics.retry=tp_demo_retry_02
### --- RetryqueueApplication.java

package com.yanqi.kafka.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RetryqueueApplication {

    public static void main(String[] args) {
        SpringApplication.run(RetryqueueApplication.class, args);
    }

}
锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章