本机IP是10.30.6.24,后面配置过程当中需要依据自己IP信息配置修改
kafka默认使用127.0.0.1访问
配置compose.yaml文件如下
yaml
services:
zookeeper:
image: zookeeper
container_name: demo-zookeeper
ports:
- "2181:2181"
restart: always
kafka:
image: wurstmeister/kafka
container_name: demo-kafka
ports:
- "9092:9092"
ulimits:
nofile:
soft: 262144
hard: 262144
environment:
DOCKER_API_VERSION: 1.41
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://10.30.6.24:9092"
KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092"
KAFKA_BROKER_ID: 1
KAFKA_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LOG_DIRS: /kafka/kafka-logs-backend
depends_on:
- zookeeper
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- kafka-data:/kafka
command: /bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh"
volumes:
kafka-data: {}services:
zookeeper:
image: zookeeper
container_name: demo-zookeeper
ports:
- "2181:2181"
restart: always
kafka:
image: wurstmeister/kafka
container_name: demo-kafka
ports:
- "9092:9092"
ulimits:
nofile:
soft: 262144
hard: 262144
environment:
DOCKER_API_VERSION: 1.41
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://10.30.6.24:9092"
KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092"
KAFKA_BROKER_ID: 1
KAFKA_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LOG_DIRS: /kafka/kafka-logs-backend
depends_on:
- zookeeper
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- kafka-data:/kafka
command: /bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh"
volumes:
kafka-data: {}启动命令
shell
$ docker compose up -d$ docker compose up -d配置参数解释
ulimits
- 操作系统提供限制可使用资源量的方式
linux系统默认是1024个,具体执行命令ulimit -a查看- 由于消息队列文件读写频繁,需要调大该配置,修改
kafka的默认最大打开文件数量 - 限制可以是硬限制或软限制,但软限制不能超过硬限制
环境变量解释
DOCKER_API_VERSION:docker version命令的API Version输出信息KAFKA_ADVERTISED_LISTENERS: 把kafka的地址端口注册给zookeeper,这个地方的数据目前是PLAINTEXT://10.30.6.24:9092,这个IP地址需要依据具体机器IP进行修改,指明客户端通过哪个IP可以访问到当前节点,如果网卡IP有修改的话也需要修改这个地方的配置KAFKA_LISTENERS: 配置kafka的监听端口,指明kafka当前节点监听本机的哪个网卡,这个地方的IP地址可以填写为0.0.0.0表示监听所有网卡的信息KAFKA_BROKER_ID: 一个kafka节点 就是一个broker,一个集群里面的broker id唯一KAFKA_PORT: 配置kafka开放端口KAFKA_ZOOKEEPER_CONNECT: 配置对应的zookeeper连接信息,因为是在同一个docker compose当中,所以可以使用服务名称作为host连接信息KAFKA_LOG_DIRS: 保存日志数据的目录,默认是/tmp/kafka-logs
挂载卷解释
- /var/run/docker.sock:/var/run/docker.sock: 把docker的sock挂在进去
- kafka-data:/kafka: 把kafka日志信息挂载出来进行持久化,如果不需要进行数据持久化,可以去掉这一步挂载
启动命令
/bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh"
由于挂载数据的时候会把kafka的配置信息也挂载出来,并且保存在meta.properties文件当中
文件内容如下,会保存一个cluster.id,当容器销毁重建时候,kafka会重新创建一个cluster.id,同时也会去检查meta.properties的信息
#
#Mon Jun 27 06:38:03 GMT 2022
cluster.id=XMHTDGRvQ5yJnEfXKhuabg
version=0
broker.id=1#
#Mon Jun 27 06:38:03 GMT 2022
cluster.id=XMHTDGRvQ5yJnEfXKhuabg
version=0
broker.id=1当容器启动中会产生报错如下,主要是kafka检查cluster.id不一致导致
kafka.common.InconsistentClusterIdException: The Cluster ID 2Z7pfznDRmWeLJNp3nZm8A doesn't match stored clusterId Some(XMHTDGRvQ5yJnEfXKhuabg) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.kafka.common.InconsistentClusterIdException: The Cluster ID 2Z7pfznDRmWeLJNp3nZm8A doesn't match stored clusterId Some(XMHTDGRvQ5yJnEfXKhuabg) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.所以需要配置在kafka启动之前删除持久化保存的meta.properties配置信息,这一步不影响持久化数据,主要是避免冲突报错
python客户端操作
安装依赖库
$ pip install kafka-python$ pip install kafka-python生产者
producer.py
python
import json
from kafka import KafkaProducer
# 配置value序列化方法,选择kafka节点信息
# 如果是远程broker需要把127.0.0.1修改为对应IP地址
producer = KafkaProducer(
value_serializer=lambda m: json.dumps(m).encode('ascii'),
bootstrap_servers=['10.30.6.24:9092'])
# 发送操作默认是异步的
for _ in range(100):
producer.send('my-topic', {'key': 'value'})
# 阻塞直到操作实际send
producer.flush()import json
from kafka import KafkaProducer
# 配置value序列化方法,选择kafka节点信息
# 如果是远程broker需要把127.0.0.1修改为对应IP地址
producer = KafkaProducer(
value_serializer=lambda m: json.dumps(m).encode('ascii'),
bootstrap_servers=['10.30.6.24:9092'])
# 发送操作默认是异步的
for _ in range(100):
producer.send('my-topic', {'key': 'value'})
# 阻塞直到操作实际send
producer.flush()消费者
consumer.py
python
import json
from kafka import KafkaConsumer
# consumer配置,topic信息和生产者相同
consumer = KafkaConsumer('my-topic',
group_id='my-group',
auto_offset_reset='earliest',
bootstrap_servers=['10.30.6.24:9092'],
value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))import json
from kafka import KafkaConsumer
# consumer配置,topic信息和生产者相同
consumer = KafkaConsumer('my-topic',
group_id='my-group',
auto_offset_reset='earliest',
bootstrap_servers=['10.30.6.24:9092'],
value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))auto_offset_reset可选参数如下
earliest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,从头开始消费latest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,消费新产生的该分区下的数据(默认选项是这个)none:topic各分区都存在已提交的offset时,从offset后开始消费,只要有一个分区不存在已提交的offset,则抛出异常(不要使用这个)
开启两个终端分别执行
$ python producer.py$ python producer.py$ python consumer.py$ python consumer.py拓展
如果python客户端也是在容器里面,可以修改compose.yaml的kafka容器的环境变量配置
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"python消费者和生产者可以使用kafka:9092访问broker
如果程序是在容器外面,也可以配置修改/etc/hosts新增一行数据
127.0.0.1 kafka127.0.0.1 kafka最终实现依据kafka这个host参数进行访问