本机IP
是10.30.6.24
,后面配置过程当中需要依据自己IP
信息配置修改
kafka
默认使用127.0.0.1
访问
配置compose.yaml
文件如下
yaml
services:
zookeeper:
image: zookeeper
container_name: demo-zookeeper
ports:
- "2181:2181"
restart: always
kafka:
image: wurstmeister/kafka
container_name: demo-kafka
ports:
- "9092:9092"
ulimits:
nofile:
soft: 262144
hard: 262144
environment:
DOCKER_API_VERSION: 1.41
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://10.30.6.24:9092"
KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092"
KAFKA_BROKER_ID: 1
KAFKA_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LOG_DIRS: /kafka/kafka-logs-backend
depends_on:
- zookeeper
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- kafka-data:/kafka
command: /bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh"
volumes:
kafka-data: {}
services:
zookeeper:
image: zookeeper
container_name: demo-zookeeper
ports:
- "2181:2181"
restart: always
kafka:
image: wurstmeister/kafka
container_name: demo-kafka
ports:
- "9092:9092"
ulimits:
nofile:
soft: 262144
hard: 262144
environment:
DOCKER_API_VERSION: 1.41
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://10.30.6.24:9092"
KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092"
KAFKA_BROKER_ID: 1
KAFKA_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LOG_DIRS: /kafka/kafka-logs-backend
depends_on:
- zookeeper
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- kafka-data:/kafka
command: /bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh"
volumes:
kafka-data: {}
启动命令
shell
$ docker compose up -d
$ docker compose up -d
配置参数解释
ulimits
- 操作系统提供限制可使用资源量的方式
linux
系统默认是1024个,具体执行命令ulimit -a
查看- 由于消息队列文件读写频繁,需要调大该配置,修改
kafka
的默认最大打开文件数量 - 限制可以是硬限制或软限制,但软限制不能超过硬限制
环境变量解释
DOCKER_API_VERSION
:docker version
命令的API Version
输出信息KAFKA_ADVERTISED_LISTENERS
: 把kafka
的地址端口注册给zookeeper
,这个地方的数据目前是PLAINTEXT://10.30.6.24:9092
,这个IP
地址需要依据具体机器IP
进行修改,指明客户端通过哪个IP
可以访问到当前节点,如果网卡IP
有修改的话也需要修改这个地方的配置KAFKA_LISTENERS
: 配置kafka
的监听端口,指明kafka
当前节点监听本机的哪个网卡,这个地方的IP
地址可以填写为0.0.0.0
表示监听所有网卡的信息KAFKA_BROKER_ID
: 一个kafka
节点 就是一个broker
,一个集群里面的broker id
唯一KAFKA_PORT
: 配置kafka
开放端口KAFKA_ZOOKEEPER_CONNECT
: 配置对应的zookeeper
连接信息,因为是在同一个docker compose
当中,所以可以使用服务名称作为host
连接信息KAFKA_LOG_DIRS
: 保存日志数据的目录,默认是/tmp/kafka-logs
挂载卷解释
- /var/run/docker.sock:/var/run/docker.sock
: 把docker
的sock
挂在进去
- kafka-data:/kafka
: 把kafka
日志信息挂载出来进行持久化,如果不需要进行数据持久化,可以去掉这一步挂载
启动命令
/bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh"
由于挂载数据的时候会把kafka
的配置信息也挂载出来,并且保存在meta.properties
文件当中
文件内容如下,会保存一个cluster.id
,当容器销毁重建时候,kafka
会重新创建一个cluster.id
,同时也会去检查meta.properties
的信息
#
#Mon Jun 27 06:38:03 GMT 2022
cluster.id=XMHTDGRvQ5yJnEfXKhuabg
version=0
broker.id=1
#
#Mon Jun 27 06:38:03 GMT 2022
cluster.id=XMHTDGRvQ5yJnEfXKhuabg
version=0
broker.id=1
当容器启动中会产生报错如下,主要是kafka
检查cluster.id
不一致导致
kafka.common.InconsistentClusterIdException: The Cluster ID 2Z7pfznDRmWeLJNp3nZm8A doesn't match stored clusterId Some(XMHTDGRvQ5yJnEfXKhuabg) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
kafka.common.InconsistentClusterIdException: The Cluster ID 2Z7pfznDRmWeLJNp3nZm8A doesn't match stored clusterId Some(XMHTDGRvQ5yJnEfXKhuabg) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
所以需要配置在kafka
启动之前删除持久化保存的meta.properties
配置信息,这一步不影响持久化数据,主要是避免冲突报错
python
客户端操作
安装依赖库
$ pip install kafka-python
$ pip install kafka-python
生产者
producer.py
python
import json
from kafka import KafkaProducer
# 配置value序列化方法,选择kafka节点信息
# 如果是远程broker需要把127.0.0.1修改为对应IP地址
producer = KafkaProducer(
value_serializer=lambda m: json.dumps(m).encode('ascii'),
bootstrap_servers=['10.30.6.24:9092'])
# 发送操作默认是异步的
for _ in range(100):
producer.send('my-topic', {'key': 'value'})
# 阻塞直到操作实际send
producer.flush()
import json
from kafka import KafkaProducer
# 配置value序列化方法,选择kafka节点信息
# 如果是远程broker需要把127.0.0.1修改为对应IP地址
producer = KafkaProducer(
value_serializer=lambda m: json.dumps(m).encode('ascii'),
bootstrap_servers=['10.30.6.24:9092'])
# 发送操作默认是异步的
for _ in range(100):
producer.send('my-topic', {'key': 'value'})
# 阻塞直到操作实际send
producer.flush()
消费者
consumer.py
python
import json
from kafka import KafkaConsumer
# consumer配置,topic信息和生产者相同
consumer = KafkaConsumer('my-topic',
group_id='my-group',
auto_offset_reset='earliest',
bootstrap_servers=['10.30.6.24:9092'],
value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
import json
from kafka import KafkaConsumer
# consumer配置,topic信息和生产者相同
consumer = KafkaConsumer('my-topic',
group_id='my-group',
auto_offset_reset='earliest',
bootstrap_servers=['10.30.6.24:9092'],
value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
auto_offset_reset
可选参数如下
earliest
: 当各分区下有已提交的offset
时,从提交的offset
开始消费,无提交的offset
时,从头开始消费latest
: 当各分区下有已提交的offset
时,从提交的offset
开始消费,无提交的offset
时,消费新产生的该分区下的数据(默认选项是这个)none
:topic
各分区都存在已提交的offset
时,从offset
后开始消费,只要有一个分区不存在已提交的offset
,则抛出异常(不要使用这个)
开启两个终端分别执行
$ python producer.py
$ python producer.py
$ python consumer.py
$ python consumer.py
拓展
如果python
客户端也是在容器里面,可以修改compose.yaml
的kafka
容器的环境变量配置
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"
python
消费者和生产者可以使用kafka:9092
访问broker
如果程序是在容器外面,也可以配置修改/etc/hosts
新增一行数据
127.0.0.1 kafka
127.0.0.1 kafka
最终实现依据kafka
这个host
参数进行访问