# 导入Kafka相关的库
from kafka import KafkaProducer, KafkaConsumer
import json
# 创建Kafka生产者
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'], # Kafka服务器地址
value_serializer=lambda x: json.dumps(x).encode('utf-8') # 序列化消息内容为JSON格式
)
# 发送消息到指定主题
producer.send('my-topic', value={'key': 'value'})
# 确保所有消息都已发送
producer.flush()
# 关闭生产者连接
producer.close()
# 创建Kafka消费者
consumer = KafkaConsumer(
'my-topic', # 订阅的主题
bootstrap_servers=['localhost:9092'], # Kafka服务器地址
auto_offset_reset='earliest', # 从最早的消息开始消费
enable_auto_commit=True, # 自动提交偏移量
group_id='my-group', # 消费者组ID
value_deserializer=lambda x: json.loads(x.decode('utf-8')) # 反序列化消息内容为JSON格式
)
# 消费消息
for message in consumer:
print(f"Received message: {message.value}")
# 关闭消费者连接
consumer.close()
json.dumps
对消息进行序列化。如果你需要更详细的配置或功能,请参考Kafka官方文档。
上一篇:python中or和and
下一篇:pythonlist
Laravel PHP 深圳智简公司。版权所有©2023-2043 LaravelPHP 粤ICP备2021048745号-3
Laravel 中文站