跳到主要内容

第3章 Kafka集群架构

1.kafka集群架构

1728196253345-f53f75e2-d56f-4dc0-b219-c3b280698873.png

2.kafka集群部署(三台机器都操作)

软件包

安装jdk

tar zxf jdk-11.0.13+8.tar.gz -C /opt/
ln -s /opt/jdk-11.0.13+8 /opt/jdk
cp /etc/profile /etc/profile.bak
cat >> /etc/profile << 'EOF'
export JAVA_HOME=/opt/jdk
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=$PATH:${JAVA_HOME}/bin
EOF
source /etc/profile
java -version
echo $JAVA_HOME
echo $JRE_HOME

安装zookeeper集群 (注意!三台机器的配置一样,但是id不一样)

配置解释:


# tickTime: ZooKeeper中使用的基本时间单位,以毫秒为单位。这个时间用来控制心跳和超时。默认是2000毫秒(2秒)。

tickTime=2000


# initLimit: 这个配置项用来配置在启动过程中,follower可以与leader进行同步的初始化时间,以tickTime的倍数计算。这里设置为10,意味着允许10个tickTime的时间来完成初始化。

initLimit=10


# syncLimit: 这个配置项用来设置follower与leader同步时的最长等待时间,也是以tickTime的倍数计算。这里设置为5,意味着在5个tickTime时间内,follower需要与leader完成同步。

syncLimit=5


# dataDir: ZooKeeper保存数据的目录。这个配置项必须设置,因为所有的重要数据都存储在这里。

dataDir=/data/zookeeper


# clientPort: 客户端连接ZooKeeper服务器的端口。默认端口是2181。

clientPort=2181


# server.X=<IP地址>:<通讯端口>:<选举端口>


# 这些是集群中各个服务器的配置。每个server.X的X代表一个服务器的唯一标识符。


# 通讯端口用于服务器间的通讯(默认为2888),选举端口用于选举leader(默认为3888)。

server.1=10.0.0.51:2888:3888
server.2=10.0.0.52:2888:3888
server.3=10.0.0.53:2888:3888

db-51:

tar zxf apache-zookeeper-3.7.0-bin.tar.gz -C /opt/
ln -s /opt/apache-zookeeper-3.7.0-bin/ /opt/zookeeper
mkdir -p /data/zookeeper
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
cat >/opt/zookeeper/conf/zoo.cfg<<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=10.0.0.51:2888:3888
server.2=10.0.0.52:2888:3888
server.3=10.0.0.53:2888:3888
EOF
echo "1" > /data/zookeeper/myid
cat /data/zookeeper/myid

db-52:

tar zxf apache-zookeeper-3.7.0-bin.tar.gz -C /opt/
ln -s /opt/apache-zookeeper-3.7.0-bin/ /opt/zookeeper
mkdir -p /data/zookeeper
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
cat >/opt/zookeeper/conf/zoo.cfg<<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=10.0.0.51:2888:3888
server.2=10.0.0.52:2888:3888
server.3=10.0.0.53:2888:3888
EOF
echo "2" > /data/zookeeper/myid
cat /data/zookeeper/myid

db-53:

tar zxf apache-zookeeper-3.7.0-bin.tar.gz -C /opt/
ln -s /opt/apache-zookeeper-3.7.0-bin/ /opt/zookeeper
mkdir -p /data/zookeeper
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
cat >/opt/zookeeper/conf/zoo.cfg<<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=10.0.0.51:2888:3888
server.2=10.0.0.52:2888:3888
server.3=10.0.0.53:2888:3888
EOF
echo "3" > /data/zookeeper/myid
cat /data/zookeeper/myid

安装kafka集群(三台机器不一样)

db-51:

tar zxf kafka_2.13-3.2.0.tgz -C /opt/
ln -s /opt/kafka_2.13-3.2.0/ /opt/kafka
mkdir -p /data/kafka/logs

cat > /opt/kafka/config/server.properties << EOF

# 设置每个broker的唯一ID,必须在集群中是唯一的

broker.id=1


# 配置监听地址,确保与服务器IP一致

listeners=PLAINTEXT://10.0.0.51:9092


# 配置网络线程数,根据服务器核心数调整

num.network.threads=3


# 配置I/O线程数,通常设置为处理器的核心数

num.io.threads=8


# Socket发送缓冲区的大小

socket.send.buffer.bytes=102400


# Socket接收缓冲区的大小

socket.receive.buffer.bytes=102400


# 请求的最大字节数,用于防止内存溢出

socket.request.max.bytes=104857600


# 日志文件存储的目录

log.dirs=/data/kafka/logs


# 分区数,每个主题的默认分区数

num.partitions=3


# 每个数据目录的数据恢复线程数

num.recovery.threads.per.data.dir=1


# 偏移量主题的副本因子,应至少等于集群中的节点数减一,以确保高可用

offsets.topic.replication.factor=3


# 事务状态日志的副本因子,应设置为集群节点数以确保事务的持久性

transaction.state.log.replication.factor=3


# 最小同步副本数,设置为2以确保至少有一个副本同步

transaction.state.log.min.isr=2


# 日志保留时间,24小时

log.retention.hours=24


# 单个日志文件的最大大小

log.segment.bytes=1073741824


# 日志保留检查间隔,单位毫秒

log.retention.check.interval.ms=300000


# Zookeeper集群的连接字符串

zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181


# Zookeeper连接超时时间,单位毫秒

zookeeper.connection.timeout.ms=6000


# 初始重平衡延迟,设置为0可以减少重平衡的延迟

group.initial.rebalance.delay.ms=0


# 请求确认设置,确保数据完整性

request.required.acks=-1


# 默认副本因子,确保高可用性

replication.factor=3


# 最小同步副本数,确保数据不丢失

min.insync.replicas=2


# 禁止不干净的领导者选举

unclean.leader.election.enable=false
EOF

db-52:

tar zxf kafka_2.13-3.2.0.tgz -C /opt/
ln -s /opt/kafka_2.13-3.2.0/ /opt/kafka
mkdir -p /data/kafka/logs

cat > /opt/kafka/config/server.properties << EOF

# 设置每个broker的唯一ID,必须在集群中是唯一的

broker.id=2


# 配置监听地址,确保与服务器IP一致

listeners=PLAINTEXT://10.0.0.52:9092


# 配置网络线程数,根据服务器核心数调整

num.network.threads=3


# 配置I/O线程数,通常设置为处理器的核心数

num.io.threads=8


# Socket发送缓冲区的大小

socket.send.buffer.bytes=102400


# Socket接收缓冲区的大小

socket.receive.buffer.bytes=102400


# 请求的最大字节数,用于防止内存溢出

socket.request.max.bytes=104857600


# 日志文件存储的目录

log.dirs=/data/kafka/logs


# 分区数,每个主题的默认分区数

num.partitions=3


# 每个数据目录的数据恢复线程数

num.recovery.threads.per.data.dir=1


# 偏移量主题的副本因子,应至少等于集群中的节点数减一,以确保高可用

offsets.topic.replication.factor=3


# 事务状态日志的副本因子,应设置为集群节点数以确保事务的持久性

transaction.state.log.replication.factor=3


# 最小同步副本数,设置为2以确保至少有一个副本同步

transaction.state.log.min.isr=2


# 日志保留时间,24小时

log.retention.hours=24


# 单个日志文件的最大大小

log.segment.bytes=1073741824


# 日志保留检查间隔,单位毫秒

log.retention.check.interval.ms=300000


# Zookeeper集群的连接字符串

zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181


# Zookeeper连接超时时间,单位毫秒

zookeeper.connection.timeout.ms=6000


# 初始重平衡延迟,设置为0可以减少重平衡的延迟

group.initial.rebalance.delay.ms=0


# 请求确认设置,确保数据完整性

request.required.acks=-1


# 默认副本因子,确保高可用性

replication.factor=3


# 最小同步副本数,确保数据不丢失

min.insync.replicas=2


# 禁止不干净的领导者选举

unclean.leader.election.enable=false
EOF

db-53:

tar zxf kafka_2.13-3.2.0.tgz -C /opt/
ln -s /opt/kafka_2.13-3.2.0/ /opt/kafka
mkdir -p /data/kafka/logs

cat >/opt/kafka/config/server.properties<<EOF
tar zxf kafka_2.13-3.2.0.tgz -C /opt/
ln -s /opt/kafka_2.13-3.2.0/ /opt/kafka
mkdir -p /data/kafka/logs

cat > /opt/kafka/config/server.properties << EOF

# 设置每个broker的唯一ID,必须在集群中是唯一的

broker.id=3


# 配置监听地址,确保与服务器IP一致

listeners=PLAINTEXT://10.0.0.53:9092


# 配置网络线程数,根据服务器核心数调整

num.network.threads=3


# 配置I/O线程数,通常设置为处理器的核心数

num.io.threads=8


# Socket发送缓冲区的大小

socket.send.buffer.bytes=102400


# Socket接收缓冲区的大小

socket.receive.buffer.bytes=102400


# 请求的最大字节数,用于防止内存溢出

socket.request.max.bytes=104857600


# 日志文件存储的目录

log.dirs=/data/kafka/logs


# 分区数,每个主题的默认分区数

num.partitions=3


# 每个数据目录的数据恢复线程数

num.recovery.threads.per.data.dir=1


# 偏移量主题的副本因子,应至少等于集群中的节点数减一,以确保高可用

offsets.topic.replication.factor=3


# 事务状态日志的副本因子,应设置为集群节点数以确保事务的持久性

transaction.state.log.replication.factor=3


# 最小同步副本数,设置为2以确保至少有一个副本同步

transaction.state.log.min.isr=2


# 日志保留时间,24小时

log.retention.hours=24


# 单个日志文件的最大大小

log.segment.bytes=1073741824


# 日志保留检查间隔,单位毫秒

log.retention.check.interval.ms=300000


# Zookeeper集群的连接字符串

zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181


# Zookeeper连接超时时间,单位毫秒

zookeeper.connection.timeout.ms=6000


# 初始重平衡延迟,设置为0可以减少重平衡的延迟

group.initial.rebalance.delay.ms=0


# 请求确认设置,确保数据完整性

request.required.acks=-1


# 默认副本因子,确保高可用性

replication.factor=3


# 最小同步副本数,确保数据不丢失

min.insync.replicas=2


# 禁止不干净的领导者选举

unclean.leader.election.enable=false
EOF

启动测试

启动zookeeper

/opt/zookeeper/bin/zkServer.sh start
/opt/zookeeper/bin/zkServer.sh status

前台启动kafka

/opt/kafka/bin/kafka-server-start.sh  /opt/kafka/config/server.properties

后台启动kafka

/opt/kafka/bin/kafka-server-start.sh  -daemon /opt/kafka/config/server.properties

检查

jps

systemd配置

zookeeper

cat > /usr/lib/systemd/system/zookeeper.service << EOF
[Unit]
Description=zookeeper service
After=network.target syslog.target remote-fs.target nss-lookup.target

[Service]
Type=forking
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/opt/jdk/bin"
ExecStart=/opt/zookeeper/bin/zkServer.sh start
ExecReload=/opt/zookeeper/bin/zkServer.sh restart
ExecStop=/opt/zookeeper/bin/zkServer.sh stop

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl start zookeeper.service

kafka

cat > /usr/lib/systemd/system/kafka.service << EOF
[Unit]
Description=kafka service
After=network.target syslog.target remote-fs.target nss-lookup.target

[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/opt/jdk/bin"
LimitNOFILE=102400
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl start kafka.service

更新: 2024-10-07 14:35:54