锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

基于Kafka的nginx日志收集分析平台

时间:2023-12-27 13:07:02 sc连接器挂掉的原因

文章目录

  • 项目描述
  • 项目目标
  • 项目架构
  • 项目涉及知识详解
    • Nginx
      • 反向代理
      • 负载均衡
      • 项目中nginx起的作用
    • Filebeat
      • filebeat是什么?
      • filebeat的工作方式
      • 项目中filebeat起的作用
      • Filebeat如何保证事件至少被输出一次
      • Filebeat如何记录文件状态?
    • Kafka
      • Kafka是什么
      • 新闻中间件的作用
      • 消息队列
      • Kafka的优点
      • Kafka如何保证高可用性?
      • Kafka为什么要做分区,做多个partition呢
      • 如果某个topic有多个partition,producer如何知道数据发送到哪里?partition呢?
      • Kafka一些基本概念
      • 为什么使用Kafka统一收集日志?
      • producer发送数据给Kafka集群的流程
      • 分成多个segment的好处
    • Zookeeper
      • Zookeeper在Kafka中的作用
  • 一键安装nginx、filebeat、Kafka、zookeeper的脚本
  • pykafka生成消费脚本

项目描述

这个项目是基于的Kafka日志收集分析平台使用的主要技术有nginx、filebeat、Kafka、zookeeper、python、mysql等。完成这个项目后,您将对技术的整合和使用有更进一步的了解。一般的过程就是使用nginx集群模拟用户访问网站的行为,使其产生nginx的access日志,然后由filebeat收集这个日志,然后吐出来Kafka中,然后使用python的pykafka对Kafka清理数据,分析日志IP省份和运营商以及带宽和时间,通过设置阈值,实现每分钟的监控和电子邮件报警,最后将分析保存到mysql数据库中。

项目目标

项目目标主要是防止某省某运营商流量突然增加,导致服务器异常,起到监控和预警的作用。

项目架构

该项目分为以下几个模块:用户—>代理集群—>web集群—>kafka和zookeeper集群—>消费者—>数据库
项目架构

  1. 用户可以通过域名或代理集群IP访问我们的地址web集群中的web服务。(域名也可用于测试。IP地址)
  2. 代理集群:用户通过域名访问我们的代理集群中的服务器,nginx的域名解析会随机解析成代理集群中的服务器。然后代理集群中的两个服务器nginx负载平衡将按照某种算法转发用户发送的请求web集群上。我们还使用了这个代理集群keepalived,使集群中的1.1和1.两台服务器互为主备,提高资源利用率(服务器1是1.1的master,是1.2的backup,服务器2是1.2的master,是1.1的backup,如果master挂掉了,那么这个IP的master会切换到backup服务器上)
  3. web集群:web三个集群服务器将产生nginx日志,我们需要使用这三个服务器nginx统一收集日志,放入filebeat所以,我们在web安装在集群的每个服务器上filebeat,指定了filebeat的输入为nginx日志的存储路径输出为Kafka的同一个topic中。
  4. Kafka集群:filebeat发送数据给kafka集群时,跟随任何一个broker互动可以,因为如果互动的话broker里面不是partition的leader,那么follower将返回当前请求副本leader信息,然后filebeat再跟leader交互。这里所有的partition都加了同一个名字nginxlog的topic中。filebeat随机发送数据给一个partition的leader之后(该partition的ISR列表中的follower会主动从leader中pull信息),leader会返回一个ack信息,然后filebeat然后发送下一个数据,以确保数据发送的一致性。那么如何保证数据消费的一致性呢?引入消费者消费数据时,High Water Mark机制,即木桶效应。Kafka数据存储在配置文件的制定目录中,该项目存储在/data/nginxlog-0/00000000.log下。
    zookeeper是用来管理Kafka的,Kafka许多元信息(例如partition、topic、replica)都是交给zookeeper统一管理,但数据统一,data是由Kafka自己保存。zookeeper还会选举Kafka集群的controller,这个controller用来协调partition的leader跟follower的选举。注意,zookeeper本身也有leader跟follower是的,它的选举方法是一致性算法(zab),少数服从多数,当选票数超过一半leader,所以在zk在集群中,机器的存活量必须超过一半,才能正常使用,所以我们也会通过zk为了方便选举,集群节点数设为奇数。跟filebeat给Kafka传递数据一样,Kafka连接任何一个zookeeper可以操作,但必须操作新的数据修改和其他事务leader如果客户端连接到上运行follower上进行事务操作,follower会返回给leader的IP,最终客户端还在leader上操作,但是可以直接连接follower查询操作。
  5. 我只在这里使用消费者组pykafka模拟消费者消费数据,所以没有消费群体,但如果有消费群体,需要注意的是,同一消费群体的消费者不能消费一个topic中一个partition数据,否则会导致数据混乱,降低消费者的处理效率。当消费者消费时,他们会记录自己的偏移,消费偏移可以保存在当地或提交Kafka的__consumer_offset保存主题。
    我使用pykafka模块创建消费者,然后清理数据,筛选出来IP地址、时间、带宽等字段,然后通过淘宝接口IP地址分析成省份和运营商,最后通过pymysql存储在数据库中的模块。
  6. 到目前为止,我的项目已经结束了,使用了很多知识点,整合知识点并不容易,这需要我们非常小心,非常熟练,否则很容易犯错误。

项目涉及知识详解

Nginx

nginx高性能web但由于服务器只能显示静态页面,我们通常不会部署一个完整的项目nginx上,但通常使用nginx做负载平衡和反向代理。

反向代理

反向代理是代理服务器,作为服务器的代理,站在服务器一侧,屏蔽服务器信息,常用场景是多个服务器分布式部署,如一些大网站,由于访问者多,需要多个服务器来解决问题,然后这些服务器由反向代理服务器代理,客户发送请求,首先由反向代理服务器,然后按照一定的规则分发给明确的服务器,客户不知道是哪个服务器。常常用nginx做反向代理。常常用nginx作为反向代理。(想象一下每个人都在访问。www.baidu.com,但百度绝对不可能只接受一个服务器的访问)

负载均衡

负载:就是Nginx接受请求
均衡:Nginx按照一定的规则,将收到的请求分发给不同的服务器进行处理
nginx支持轮询、ip_hash、url_hash等

关于nginx更多知识可以查看我的另一个博客:nginx相关内容及配置文件详细说明

项目中nginx起的作用

我们使用第一个集群nginx做负载平衡和反向代理。
负载平衡是指我们根据客户端的访问请求nginx指定算法分配到web在集群中的服务器上,可以减轻服务器的负担,避免所有要求都在服务器上分散。如果服务器挂断了,nginx负载平衡策略也将帮助我们向服务器转发。
反向代理是指我们使用它nginx转发功能能,防止客户端请求直接发送到web服务器上,这样可以提高我们服务器的安全性,不让客户端知道我们真实服务器的IP地址可以避免一些黑客攻击

之前也有使用DS做负载均衡的,就是将一个域名(类似于www.sc.com)解析成多个IP地址,一般来说会轮询的方式去解析成各个IP。但是如果其中一个服务器挂了,DNS不会立马将这个IP地址从解析列表中去掉,还是会解析成挂掉的IP,可能会造成访问失败。虽然客户端有重试,但是还是会影响用户体验。

在第二个集群中,我们使用nginx来展示静态页面,并且产生访问日志

Filebeat

filebeat是什么?

filebeat是用于转发和集中日志数据的轻量级传送工具。Filebeat监视您指定的日志文件或位置,收集日志事件,并将它们转发到Kafka中进行索引。

filebeat的工作方式

启动filebeat时,它将启动一个或多个输入,这些输入将在为日志数据指定的位置中查找。对于Filebeat所找到的每个日志,Filebeat都会启动收集器。每个收集器都读取单个日志以获取新内容,并将新日志数据发送到libbeat,libbeat将聚集事件,并将聚集的数据发送到Filebeat配置的输出。

项目中filebeat起的作用

filebeat在整个项目中扮演的是生产者的角色,它主要的作用是用来收集nginx产生的数据。你肯定会想,nginx产生的数据为什么不直接发送到kafka中,而是要经过filebeat这个步骤呢?这是因为,我们有多个web服务器,每个web服务器产生的日志都不相同,如果一个一个的发送给kafka,效率非常低下,而且不方便统一管理,这对于kafka取数据来说也是非常不方便的。

Filebeat如何保证事件至少被输出一次

Filebeat之所以能保证事件至少被传递到配置的输出一次,没有数据丢失,是因为filebeat将每个事件的传递状态保存在文件中。在未得到输出方确认时,filebeat会尝试一直发送,直到得到回应。若filebeat在传输过程中被关闭,则不会再关闭之前确认所有事件。任何在filebeat关闭之前未确认的事件,都会在filebeat重启之后重新发送。这可确保至少发送一次,但有可能会重复。可通过设置shutdown_timeout 参数来设置关闭之前的等待事件回应的时间(默认禁用)。

Filebeat如何记录文件状态

将文件状态记录在文件中(默认在/var/lib/filebeat/registry)。此状态可以记住Harvester收集文件的偏移量。若连接不上输出设备,如ES等,filebeat会记录发送前的最后一行,并再可以连接的时候继续发送。Filebeat在运行的时候,Prospector状态会被记录在内存中。Filebeat重启的时候,利用registry记录的状态来进行重建,用来还原到重启之前的状态。每个Prospector会为每个找到的文件记录一个状态,对于每个文件,Filebeat存储唯一标识符以检测文件是否先前被收集。

Kafka

Kafka是什么

kafka又称为消息中间件:消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
Kafka是一个分布式的基于发布订阅的消息系统,它是一种消息队列,一般用来做日志的处理。

消息中间件的作用

  • 业务的解耦(将模块间的RPC(远程过程调用)改为通过消息队列中转,解除系统间的耦合)
  • 日志的收集(通过日志可以跟踪调试信息、定位问题,利用消息队列产品在接收和持久化消息方面的高性能,引入消息队列快速接收日志消息,避免因为写入日志时的某些故障导致业务系统访问阻塞、请求延迟等问题)
  • 流量削峰(系统的吞吐量往往取决于底层存储服务的处理能力,数据访问层可以调整消费速度缓解存储服务压力,避免短暂的高峰将系统压垮;使用消息队列先将短时间高并发的请求持久化,然后逐步处理,从而削平高峰期的并发流量,改善系统的性能)
  • 异步调用(对于无需关注调用结果的场景,可以通过消息队列异步处理)

消息队列

  • 一个典型意义上的消息队列至少需要包含消息的生产者(filebeat)、中间件(kafka)、消费者(python程序)
  • 消息队列的通信模式(也就是消息中间件的通信模式)
    • 点对点模式(也就是一对一模式,一一对应,不能够重复消费,并且消费完了之后,消息中间件中数据就被删了)
    • 发布订阅模式(kafka使用,多对多模式,通过设置偏移量可以重复消费,消费完了数据不会被删除)

Kafka的优点

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,他的延迟最低 只有几毫秒
  • 可扩展性:kafka集群支持热扩展
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份放至数据丢失
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败
  • 高并发:支持数千个客户端同时读写

Kafka如何保证高可用呢?

多个broker,多个partition,多个replica

Kafka为什么要做分区,弄多个partition呢

  1. 方便扩展,因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松应对日益增长的数据量
  2. 提高并发,以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率

如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?

  1. partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
  2. 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
  3. 如果既没指定partition,又没有设置key,则会轮询选出一个partition。

Kafka的一些基本概念

  • Producer:Producer即生产者,消息的产生者,是消息的入口,项目中的生产者是filebeat,收集nginx服务产生的数据
  • Broker:是kafka的实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。 每个kaka集群内的broker都有一 个不重复的编号,如broker-0,broker-1等…
  • Topic:消息的主题,也就是消息的类别,每个topic可以有多个分区,, kafka的数据就保存在topic. 在每个broker,上都可以创建多个topic。Topic进行消息的分类,比如nginx、mysql,日志给不同的主题,就是不同的类型
  • Partition:Topic的分区,每个Topic可以有多个分区。分区的作用是做负载,提高kafka的吞吐量,提高效率,提高并发。同一个topic在不同的分区的数据是不重复的, parttion的表现形式就是一个一个的文件夹!(一般来说,partition的数量跟broker的数量保持一致)。但是多个partition会造成消息顺序混乱,如果对消息顺序有要求就只设置一个partition就可以了,如果对并发有要求就设置多个partition(日志设置多个partition)
  • Replica:副本,每一 个分区都有多个副本,副本的作用是做备胎,是Kafka里的高可用。当主分区( Leader )故障的时候会选择一个备胎(Follower工位,成为Leader,在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量, follower和leader绝对是在不同的机器,并且一个partition中leader只能有一个,follower可以有多个,同一机器对同一个分区也只可能存放一个副本 (包括自己)。)如果broker数量和replication数量一致,那么可以坏掉n-1台机器
  • Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象都是Leader
  • Follower:每个分区多个副本的从角色,实时的从Leader中同步数据,保持和leader数据的同步,Leader发生故障的时候,某个Follower会成为新的Leader(controller管理选举,一般从ISR中顺延产生)
  • Segment:partition物理上由多个segment组成,Partition的结构:Partition在服务器上的表现形式就是一个一个的文件夹,每个Partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件这三个文件,log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。
  • Message:每一条发送的消息主体,是通信的基本单位(存放在log中),每个producer可以向一个topic发布消息,partition中的每条message包含以下三个属性:offset、MessageSize、data
  • offset:偏移量,它可以唯一确定每条消息在partition内的位置。
    • 生产者Offset:消息写入的时候,每一个分区都有一个offset,这个offset就是生产者的offset,同时也是这个分区的最新最大的offset。有些时候没有指定某一个分区的offset,这个工作kafka帮我们完成。
    • 消费者Offset:记录消费者消费到哪里了,也可能由于Consumer在消费过程中可能会出现断电宕机等故障,Consumer恢复后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费到了哪个offset,以便故障复后继续消费。
  • Consumer :消费者,即消息的消费方,是消息的出口。一般来说,消费者的数量跟broker的数量保持一致,consumer的数量要大于或者等于partition的数量最好
  • Consumer Group我们可以将多个消费者组成一个消费者组 ,在kafka的设计中同一个分区(partition)的数据只能被消费者组中的某一个消费者消费,如果同一个消费组内的两个消费者同时消费,会出问题;但是不同的消费组可以同时消费一个partition。 同一个消费者组的消费者可以消费同一个topic的不同分区(partition)的数据,这也是为了提高kafka的吞吐量!
  • Zookeeper : kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
    其中,partition和broker和replica和leader和follower的关系可以如图所示:

    replica的含义包括follower和leader,以上broker1并不是绝对包含leader,broker2和broker3中绝对包含的是follower

为什么使用Kafka做日志统一收集?

  1. 故障发生时方便定位问题(如果直接收集nginx上的日志,如果中途发生故障,那么还需要一台一台登录nginx机器才能排查错误,而kafka集群可以统一管理)
  2. 日志集中管理,后续需要日志的程序直接从Kafka获取日志即可,尽可能的减少日志处理对nginx的影响(如果将日志收集的程序跟产生日志的nginx绑定在一个服务器上,那么一旦程序躲起来,很可能对服务器上的nginx服务造成影响)

producer发送数据给Kafka集群的流程

  1. Producer先从kafka集群中获取分区的Leader
  2. Producer将消息发送给Leader
  3. Leader将消息写入本地文件
  4. Followers从Leader上Pull消息
  5. Followers将消息写入本地后,向Leader发送ACK来确认
  6. Leader收到所有副本的ACK后向Producer发送ACK

分成多个segment的好处

方便数据清理,Kafka可以按时间或者按数据大小两个维度清理数据(任意一个条件满足,都可以触发日志清理),不会永久保存,定期保存,默认是保存七天(可修改Kafka的配置文件server.properties 中的log.retention.hours这个参数)

[root@nginx-kafka01 data]# ls
nginxlog-0
__consumer_offsets-11     
[root@nginx-kafka01 data]# cd nginxlog-0/
[root@nginx-kafka01 nginxlog-0]# ls
00000000000000000000.index  00000000000000000000.timeindex
00000000000000000000.log    leader-epoch-checkpoint

*.log文件存放的真正的kafka的数据,*.index存放的是索引
log.segment.bytes=1073741824 (可以通过Kafka配置文件server.properties中的log.segment.bytes设置的每个段的大小)

Zookeeper

Zookeeper在Kafka中的作用

  1. zookeeper是用来管理Kafka集群的,它可以保存Kafka的元信息,topic、partition、replica信息都保存在zookeeper中,但是Kafka不会将topic本身的数据发送到zookeeper上的。
  2. 选举Kafka集群的controller,这个controller来协调副本的leader、follower选举(现在3.0Kafka自己选举controller,我们使用的2.12),通过抢占的方式来选出controller
    选举出的Kafka的controller管理Kafka副本的leader和follower,同步、选举(leader挂掉了顺延)
    Kafka可以挂掉n-1台(broker、partition、replica数量一致的情况下)
    zookeeper必须存活集群节点数的一半以上
  3. Broker端使用zookeeper来注册broker信息,以及监测partition分区leader的存活性。

Kafka3.0版本已经脱离zookeeper,Kafka自己实现zookeeper功能,通过KRaft进行了自己的集群管理
目前我使用的zookeeper版本是3.6.3版本,Kafka是2.12版本

一键安装nginx、filebeat、Kafka、zookeeper的脚本

一键安装脚本

pykafka生成消费脚本

使用pykafka模拟消费者,将数据过滤并入库

mysql数据库中的表需要自己先建好

锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章