前言
项目最近 kafka 又崩了…
心态炸了…
总是 nobroker,自己也总是没有彻底解决这个问题,所以,受不了了,我要开始深入理解一下 kafka 了…
由于之前一篇文章都是几万字,导致可读性比较拉胯,所以从现在开始,我每一章出一篇文章…这样可读性强一些…
第一章 初识 kafka
Kafka起初是由LinkedIn公司采用Scala语言开发的一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
Kafka主要扮演三大角色:
- 消息系统。提供传统的消息中间件的功能:系统解耦、冗余存储、流量削峰、异步通信等,同时还提供了消息的顺序性保障以及回溯消费的功能。
- 存储系统。Kafka有多副本机制,所以可以用来当数据存储系统使用。
- 流式处理平台。为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库。
基本概念
组成
- Producer。生产者,负责创建消息发送到 broker 中。
- Consumer。从 broker 中接收消息。
- Zookeeper。负责集群元数据的管理、控制器的选举等等。
- Broker。负责将收到的消息存储到磁盘中,可以是一个集群。
重要的概念
Topic 和 Partition
Kafka中的消息以 topic 为单位进行归类,生产者负责将消息发送到特定的主题,而消费者负责订阅主题并进行消费。
topic 仅仅是一个逻辑上的概念,实际的实现还是分区,一个主题可以有多个分区,每个分区都可以分散到不同 broker 中,分区在存储层面可以看作是一个可追加的 log 文件,而 offset 是消息在分区中的唯一标识,Kafka 通过它来保证分区内的消息的顺序性,所以 Kafka 仅保证分区有序,不保证主题有序。
多副本机制
Kafka 为分区引入了多副本机制,保证高可用。在这里和 Mysql 一样,同样是一主多从,所以关键就是主从同步的问题,同时还会涉及到主备切换、主从延迟等通用问题。
分区中所有副本统称为 AR(Assigned Replicas),AR = ISR(In-Sync Replicas) + OSR(Out-of-Sync Replicas),当 leader副本发生故障时,只有在 ISR 中的 副本才有资格被选取为新的 leader。
在读取 offset 的过程中,也引入了 HW(高水位) 和 LEO(Log End Offset)的概念,在 Mysql 中实现 mvcc 中,也同样采用了 高水位的概念,不过在那里是在事务的视图中出现的。这里的 HW 和 LEO 同样让我想到了 NIO 中的 buffer 也是拥有同样的概念的。
Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。这里我们可以类比事务处理。同步意味着所有都完成才提交事务「即要求所有 follower 副本全部复制完才可以进行读取消息」,而异步则意味着只要leader读入成功则直接提交事务「Mysql 是异步复制」。前者影响效率,后者则存在不可靠性,因为一旦leader宕机而follower没有同步完成,数据就会出现问题。所以这里采用折中的方案,只要同步完 ISR 集合中的副本就提交事务。安装和配置
博主使用的 macOS 系统,所以接下来对在 macOS 下如何安装运行 kafka 做一个详细的叙述。
JDK 的安装和配置
Kafka是建立在 jvm 之上的,所以需要先安装 jvm,这个很简单啦…
- 下载
- 配环境
- 查看配置是否成功
这里我不详细叙述,直接看该网址就行,下面的我会详细叙述。
Zookeeper 的安装和配置
首先再简单介绍一下 Zookeeper,其实 kafka 现在已经内置了 Zookeeper,这里我们就暂时不使用内置的 zk 了,因为内置的只能是单机版,但是在生产环境中 zk 一般都是集群,所以这里我们再单独进行 zk 的安装和配置。
ZooKeeper是一个开源的分布式协调服务,是Google Chubby的一个开源实现。分布式应用程序可以基于ZooKeeper实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master选举、配置维护等功能。在ZooKeeper中共有3个角色:leader、follower和observer,同一时刻 ZooKeeper集群中只会有一个leader,其他的都是follower和observer。observer不参与投票,默认情况下 ZooKeeper 中只有 leader 和 follower 两个角色。
下载 Zookeeper
brew install zookeeper
这里需要注意的是,下载之后,Zookeeper 中的 bin 目录和配置文件不在一块。
bin 目录的位置:”/usr/local/Cellar/zookeeper/3.4.14”
配置文件的位置:”/usr/local/etc/zookeeper”
Zookeeper 的 启动停止日志:/usr/local/var/log/zookeeper
配环境
命令:open -e .bash_profile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15MONGODB_HOME=/usr/local/mongodb
KAFKA_HOME=/usr/local/Cellar/kafka/2.4.0
ZOOKEEPER_HOME=/usr/local/Cellar/zookeeper/3.4.14
PATH="/Library/Frameworks/Python.framework/Versions/3.7/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin:$MONGODB_HOME/bin:${PATH}:"
export PATH
export PATH="/Users/yangweijie/anaconda3/bin:$PATH"
export JAVA_HOME=/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home
export CLASSPAHT=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH:命令:source .bash_profile
让配置文件生效
修改 Zookeeper 配置文件
进入到 /usr/local/etc/zookeeper,然后将 zoo_sample.cfg 文件修改为 zoo.cfg
cd /usr/local/etc/zookeeper
cp zoo_sample.cfg zoo.cfg修改 zoo.cfg 配置文件,主要是创建数据目录和日志目录,因为默认是没有的,我们需要自己创建
open -e zoo.cfg
mkdir -p tmp/data
mkdir -p tmp/log
1 | # zoo.cfg 文件内容 |
dataDir:”/usr/local/etc/zookeeper/tmp/data”
Log: “/usr/local/etc/zookeeper/tmp/log”
启动 Zookeeper
启动 zookeeper:
zkServer start
查看 zookeeper 状态:
zkServer status
这个其实看不出来 Zookeeper 是否真的运行起来了,我在下面的问题模块也有讲到,如果想看 Zookeeper 是否真的运行起来了,可以查看 “/usr/local/etc/zookeeper/tmp/data/zookeeper_server.pid” 是否存在,当 zk stop时该文件就会消失。
Kafka 的安装和配置
下载 kafka
brew install kafka
同样的,bin 和 配置文件位置不在一起。
bin 目录的位置:”/usr/local/Cellar/kafka/2.4.0”
配置文件的位置:”/usr/local/etc/kafka”
Kafka 的启动停止日志:/usr/local/var/log/kafka
配环境
见上面的 bash_profile
修改 Kafka 配置文件
将 /usr/local/etc/kafka/server.properties 进行相应的修改,主要是对日志文件的地址进行修改,然后修改完之后自己创建好相应的目录。
这里我的 kafka 的消息 log 是 /usr/local/etc/kafka/tmp/kafka-logs
。
启动 Kafka
启动 kafka
1
kafka-server-start /usr/local/etc/kafka/server.properties
常用指令:
创建 topic:
1 | kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test |
查看 topic :
1 | kafka-topics --list --zookeeper localhost:2181 |
发送消息:
1 | kafka-console-producer --broker-list localhost:9092 --topic test |
消费消息:
1 | kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning |
查看 java 有关的进程:
1 | jps -l |
遇到的问题
Q1
问题
系统是 macOS,kafka 版本为 2.4.0,zookeeper 版本为3.4.14,在安装启动遇到这样一个问题:
用 “kafka-server-stop /usr/local/etc/kafka/server.properties” 无法杀死 kafka 进程…
使用完这个命令之后 用命令 “jps -l” 可以看到,原先的Kafka.kafka 对应的进程的确没了,但是莫名其妙又起了一个进程来运行 kafka,这是啥情况… 为啥会杀不死。
「从后文可以知道,jps 实际上无法准备的确认 kafka服务 是否正常启动。」
尝试解决方案
- 重写kafka-server-stop.sh 脚本
1 | PIDS=$(ps -ef |grep java|grep kafka | grep -v grep | awk '{print $2}') |
原 kafka-server-stop.sh 内容:
1 | SIGNAL=${SIGNAL:-TERM} |
参考: https://blog.csdn.net/dengjili/article/details/95041267
依旧是修改 kafka-server-stop.sh 的内容
1 | PIDS=$(ps ax | grep -i ‘kafka.Kafka’ | grep java | grep -v grep | awk ‘{print $1}’) |
修改为:
1 | PIDS=$(jps -lm | grep -i 'kafka.Kafka' | awk '{print $1}') |
在解决第三个问题的时候发现,发现并不能仅仅通过 jps 去看 kafka 是否停止运行,因为其非常的不准确,其实 kafka 可能已经停止了,也就是说,”kafka-server-stop /usr/local/etc/kafka/server.properties” 其实已经杀死 kafka 服务进程,只是我们不知道而已。。。。
我们可以通过生产消息:
kafka-console-producer --broker-list localhost:9092 --topic test
来判断 kafka 是否是正常启动的。
Q2
问题
使用命令 “zkServer stop” 无法停止 zookeeper,报错:
1 | ZooKeeper JMX enabled by default |
尝试解决方案
原来是因为我根本没有把 Zookeeper 起来,使用 zkServer status并不能显示 zk 是否真的起来了…
在我再次启动 zk 时,的确在 “/usr/local/etc/zookeeper/tmp/data 下 出现了 zookeeper_server.pid” 这个文件。
所以说,想看 zk 到底是否真的起来了,可以查看 “/usr/local/etc/zookeeper/tmp/data/zookeeper_server.pid” 是否存在,当 zk stop时该文件就会消失。
结果:该问题解决!!!Q3
问题
在我一直无法关闭 kafka 之后,我进行了电脑重启,重启之后,使用 “jps -l” 的确没有再出现 “kafka.Kafka”,说明这时 kafka 的确停止了,但是新的问题又出现了…
在使用 kafka-server-start /usr/local/etc/kafka/server.properties
命令之后,再也无法启动 kafka 了… 我太难了…
1 | [2020-04-24 23:16:58,980] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) |
通过 “jps -l” 的结果看,的确是没有启动起来。
1 | yangweijieMacBook-Pro:tmp yangweijie$ jps -l |
然后我就去洗澡了.. 回来 “jps -l”,我擦,kafka 起来了,自己起来了???我的妈,受不了了…
1 | yangweijieMacBook-Pro:~ yangweijie$ jps -l |
然后我不信,我就试着去生产一个消息
1 | kafka-console-producer --broker-list localhost:9092 --topic test |
的确,报错了
1 | WARN [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) |
这我就想不通了… 这个 kafka.Kafka 难道不能代表 kafka 服务端活着?
然后我又去尝试的 “jps -l” ,妈的,那个进程又死了…
1 | yangweijieMacBook-Pro:~ yangweijie$ jps -l |
为了证明我眼睛没花,我又尝试的一遍,妈的,又出现了一个新的 kafka 进程。。。
1 | yangweijieMacBook-Pro:~ yangweijie$ jps -l |
然后我又试了一遍,kafka 进程又没了…
然后我接着试了很多遍,kafka进程一直没有… 这次,我决定再次启动一下 kafka 试试… 还是启动不了…
然后我无聊的测试下 “jsp -l” 这个命令,发现其执行 5-6 次就会有一次 “kafka.Kafka” 的进程产生,然后又会自己消失… 玄学??????崩了…
尝试解决方案
我们回到第三个最初的问题,也就是一直启动不了 kafka,也就是上面的报错:
1 | kafka.common.InconsistentClusterIdException: The Cluster ID I5sBHS6MSMG4MmUddyviFQ doesn't match stored clusterId Some(Lz_cYIXrTryPr_06DLt6hQ) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong. |
于是,我就尝试去找 meta.properties,原来这个 meta.properties 在/usr/local/etc/kafka/tmp/kafka-logs/
,然后将其里面的 meta.properties 中的 Cluster ID 改为报错中需要的。这应该就是 2.4.0 的bug了,在我上次暴力关闭 kafka 时,不会主动的与新的zk实例建立连接。
Q4
问题
最近想查一下 kafka 的启动日志,结果不查不知道,一查吓一跳,一个启动日志占了4个多g的存储…
看了下,全是启动日志,10分钟就更新一次…
吓得我赶紧删了,得研究一下如何不让他更新的这么频繁,我的256g的电脑瑟瑟发抖…
尝试解决方案
看了下,应该是以前把 Zookeeper 关闭了,但是 kafka 一直没关闭,然后 kafka 一直失败重连,日志疯狂增加…
再次尝试的使用命令 “kafka-server-stop /usr/local/etc/kafka/server.properties”,然后看 “/usr/local/var/log/kafka” 下的日志。
首先,的确有 shutdown,这是我搜索 shutdown 后过滤得到的日志,如下:
1 | controlled.shutdown.enable = true |
但是,使用 jps -l 看的确 kafka 进程还没死,收发消息也正常,所以我们再看一下完整的日志,如下:
1 | [2020-04-29 18:21:17,883] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) |
最终,我在 https://www.orchome.com/32 找到一个看上去很不错的答案,但是很遗憾,依旧失败了,我强制 kill 也没用,依然会自动重新创建一个 kafka 服务…
至于文章中提到的 controlled.shutdown.enable
,开启(true)后,执行shutdown时,broker主动将自己有leader身份的partition转移给ISR里的其他broker,但是如果是默认值false,对关闭依然不会有影响,按照默认的规则切换只是会慢一点点而已…
Kafka 在 java 中使用
实现
maven 依赖
1 | <dependency> |
生产者客户端
1 | package Kafka.Demo1; |
消费者客户端
1 | package Kafka.Demo1; |
遇到的问题
在启动生产者和消费者后,报错如下:「但是程序可以正常运行…」
1 | SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". |
解决方案:
- 采用 其提示的 url 中的解决方案进行解决,失败。。。还是会有上面的错误…
1 | <dependencies> |
参考:https://www.cnblogs.com/felixzh/p/12487644.html
直接上slf4j官网找到相应的maven包添加到pom.xml
网址:https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12/1.8.0-alpha2
好吧… 依旧不好使
- 最终得以解决,以下是最终解决方案:
参考:https://www.shuzhiduo.com/A/pRdBO486zn/
在添加了以下依赖,还是会报错…「这里的版本我是通过查询 cd /usr/local/Cellar/kafka/2.4.0/libexec/lib/ 得到的 slf4j 和 log4j 的版本」
1
2
3
4
5
6
7
8
9
10
11
12 > <dependency>
> <groupId>org.slf4j</groupId>
> <artifactId>slf4j-log4j12</artifactId>
> <version>1.7.28</version>
> <scope>test</scope>
> </dependency>
> <dependency>
> <groupId>log4j</groupId>
> <artifactId>log4j</artifactId>
> <version>1.2.17</version>
> </dependency>
>
之后,看到了上面的参考文章,再次添加
1
2
3
4
5
6 > <dependency>
> <groupId>org.slf4j</groupId>
> <artifactId>slf4j-nop</artifactId>
> <version>1.7.2</version>
> </dependency>
>
得以解决该问题…