目录

来玩一玩消息队列的王者——kafka

Kafka

概念

基于发布、订阅模式的消息中间件

没有消息队列的话,两个进程通信是直接连接

https://cdn.cjpa.top/cdnimages/image-20210306213555530.png

https://cdn.cjpa.top/cdnimages/image-20210306213655510.png

同步的效率会比较低,异步处理在执行之后还可以执行其他的。

缓冲也叫削峰。

使用消息队列的好处

  • 解耦

    允许独立的拓展或修改两边的处理过程,只要确保他们遵守同样的接口约束

  • 可恢复性

    系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程之间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理

  • 缓冲

    有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况

  • 灵活性&峰值处理能力

    在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命,无疑是一个巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃

消息队列的两种模式

  • 点对点模式

    一对一,消费者主动拉取数据,消息收到后消息清除,消息只能给一个人。

    消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费之后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

    https://cdn.cjpa.top/cdnimages/image-20210306235109052.png

  • 发布/订阅模式

    一对多,消费者消费数据之后不会清除消息

    消息生产者(发布)将消息发布到topic中,同时有豆哥消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有的订阅者消费

https://cdn.cjpa.top/cdnimages/image-20210306235431183.png

​ 这个保留也是有一定的期限的,因为它只是一个消息队列,不是一个数据库系统。kafka是一个发布/订阅的模式。

​ 可能会有这么一种情况,不想要的数据也会收到。

https://cdn.cjpa.top/cdnimages/image-20210306235807017.png

如果消息队列的速度和消费者的速度不太一样,可能会造成资源浪费。所以订阅/发布的这种模式,一般有两种实现方式

  • 消费者主动去拉消息
  • 消息队列主动推

kafka是消费者主动拉消息的模式。

kafka的缺点:

消费者自己需要轮询来查看消息队列中有没有数据。有时会比较浪费consumer的资源。

基础架构

https://cdn.cjpa.top/image-20210323205911929.png

Producer

消费生产者,想kafka broker发消息的客户端

Consumer

消息消费者,向kafka broker取消息的客户端

Consumer Group(CG)

消费者组,由一个或者多个consumer组成。消费者组内每个消费者负责消费不同分取的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者

Broker

一台kafka服务器就是一个broker,一个集群由多个broker组成。一个broker可以容纳多个topic

Topic

可以理解为一个队列,生产者和消费者面相的都是一个topic

Partion

为了实现拓展性,一个非常大的topic可以分布到多个broker(也就是服务器)上,一个topic可以分为多个partion,每个partion是一个有序的队列

Replica

副本,为了保证集群中某个节点发生故障时,该节点上的partion数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower

Leader

每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader

follower

每个分区多个副本的“从”,实时从leader中同步数据,保持和leader数据的同步,leader发生故障时,某个follower会成为新的leader

生产者生产消息

kafka集群管理消息(数据)

消费者消费消息

zookeeper注册消息

0.9之前消费者记录数据的位置是靠存放到zk里面的偏移量,0.9之后是存到了kafka本地的系统的topic中(kafka消息是存到磁盘中)

为什么这样做?

原来的方式给zk的要有点太大了,在高并发情况下边取数据边记录位置,会很麻烦

安装

docker 安装

下载镜像

sudo docker pull wurstmeister/zookeeper 
sudo docker pull wurstmeister/kafka

启动zookeeper

sudo docker run -d --name zookeeper-container -p 2181 -t wurstmeister/zookeeper

启动kafka

sudo docker run -d --name kafka --publish 9092:9092 --link zookeeper-container --env KAFKA_ZOOKEEPER_CONNECT=zookeeper-container:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka 

进入到kafka

sudo docker exec -it kafka /bin/bash 

进入到kafka默认目录

cd /opt/kafka

下载

https://cdn.cjpa.top/image-20210323211846617.png

➜  apps ls
kafka_2.11-0.11.0.0.tgz
➜  apps tar -zxvf kafka_2.11-0.11.0.0.tgz	# 解压
➜  apps ls
kafka_2.11-0.11.0.0     kafka_2.11-0.11.0.0.tgz
➜  mv kafka_2.11-0.11.0.0 kafka # 改名
cd kafka/config
➜  config ll
total 128
-rw-r--r--@ 1 cjp  staff   906B  6 23  2017 connect-console-sink.properties
-rw-r--r--@ 1 cjp  staff   909B  6 23  2017 connect-console-source.properties
-rw-r--r--@ 1 cjp  staff   5.7K  6 23  2017 connect-distributed.properties
-rw-r--r--@ 1 cjp  staff   883B  6 23  2017 connect-file-sink.properties
-rw-r--r--@ 1 cjp  staff   881B  6 23  2017 connect-file-source.properties
-rw-r--r--@ 1 cjp  staff   1.1K  6 23  2017 connect-log4j.properties
-rw-r--r--@ 1 cjp  staff   2.7K  6 23  2017 connect-standalone.properties
-rw-r--r--@ 1 cjp  staff   1.2K  6 23  2017 consumer.properties
-rw-r--r--@ 1 cjp  staff   4.6K  6 23  2017 log4j.properties
-rw-r--r--@ 1 cjp  staff   1.9K  6 23  2017 producer.properties
-rw-r--r--@ 1 cjp  staff   6.8K  6 23  2017 server.properties# 主要改这个
-rw-r--r--@ 1 cjp  staff   1.0K  6 23  2017 tools-log4j.properties
-rw-r--r--@ 1 cjp  staff   1.0K  6 23  2017 zookeeper.properties

配置文件

server.properties

# broker的全局唯一编号,不能重复
broker.id=0
# 删除topic功能使能
delete.topic.enable=true
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘io的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.bffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# kafka里面暂存数据的目录,和日志的名字很像,但是不是日志
log.dirs=/tmp/kafka-logs
# topic在定钱broker上的分区个数
num.partitions=1
# 用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 数据保存的时间,过期将被清除
log.retention.hours=168
# log文件的大小
log.segment.bytes=1073741824
# 配置连接zookeeper集群地址
zookeeper.connect=localhost:2181

配置环境变量

$ vim ./etc/profile
# KAFKA_HOME
export KAFKA_HOME=/Users/cjp/apps/kafka
export PATH=$PATH:$KAFKA_HOME/bin
$ source /etc/profile

启动kafka

查看bin目录

➜  bin ll
# 这两个主要用在测试环境中,一般不怎么用
-rwxr-xr-x@  1 cjp  staff   945B  6 23  2017 kafka-console-consumer.sh	# 控制台的消费者
-rwxr-xr-x@  1 cjp  staff   944B  6 23  2017 kafka-console-producer.sh	# 控制台的生产者
-rwxr-xr-x@  1 cjp  staff   1.3K  6 23  2017 kafka-server-start.sh	# 开始
-rwxr-xr-x@  1 cjp  staff   975B  6 23  2017 kafka-server-stop.sh·	# 停止
-rwxr-xr-x@  1 cjp  staff   863B  6 23  2017 kafka-topics.sh	# topic的增删改查 

执行命令

bin/kafka-server-start.sh config/server.properties

但是这样直接启动会有一个问题就是kafka会一直占用终端,还有一种方法就是把kafka设置为守护进程(仅在hadoop环境中有效)

bin/kafka-server-start.sh -daemon config/server.properties

hadoop启动脚本

https://cdn.cjpa.top/image-20210324102848336.png

kafka命令操作

1)查看当前服务器中所有的topic

bin/kafka-topics.sh --zookeeper localhost:2181 --list

2)创建topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic first

选项说明

–topic 定义topic名

–replication-factor 定义副本数(副本数不能超过集群的数量)

–partitions 定义分区数

3)删除topic

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic first

需要server.properties中方设置delete.topic.enable=true否则只是标记删除

4)发送消息

➜  kafka bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first
>hello kafka, i am cjp, your master.

5)消费消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first
  • 建议用–bootstrap-server代替–zookeeper

  • 控制台这边比较特殊,要用–from-beginning

6)查看分区详情

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic first
  1. 修改分区数
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic first --partitions 6