Laravel  
laravel
文档
数据库
架构
入门
php技术
    
Laravelphp
laravel / php / java / vue / mysql / linux / python / javascript / html / css / c++ / c#

kafka python

作者:冫LOVE灬丶棒棒   发布日期:2025-02-17   浏览:100

# 导入Kafka相关的库
from kafka import KafkaProducer, KafkaConsumer
import json

# 创建Kafka生产者
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],  # Kafka服务器地址
    value_serializer=lambda x: json.dumps(x).encode('utf-8')  # 序列化消息内容为JSON格式
)

# 发送消息到指定主题
producer.send('my-topic', value={'key': 'value'})

# 确保所有消息都已发送
producer.flush()

# 关闭生产者连接
producer.close()

# 创建Kafka消费者
consumer = KafkaConsumer(
    'my-topic',  # 订阅的主题
    bootstrap_servers=['localhost:9092'],  # Kafka服务器地址
    auto_offset_reset='earliest',  # 从最早的消息开始消费
    enable_auto_commit=True,  # 自动提交偏移量
    group_id='my-group',  # 消费者组ID
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # 反序列化消息内容为JSON格式
)

# 消费消息
for message in consumer:
    print(f"Received message: {message.value}")

# 关闭消费者连接
consumer.close()

解释说明:

  1. KafkaProducer:用于向Kafka集群发送消息。我们指定了Kafka服务器的地址,并使用json.dumps对消息进行序列化。
  2. send():将消息发送到指定的主题(topic)。这里发送的是一个字典,键值对形式的消息。
  3. flush():确保所有消息都已发送完毕。
  4. close():关闭生产者的连接,释放资源。
  5. KafkaConsumer:用于从Kafka集群中消费消息。我们订阅了特定的主题,并指定了如何处理偏移量和反序列化消息。
  6. for message in consumer:循环读取消息并打印出来。
  7. close():关闭消费者的连接,释放资源。

如果你需要更详细的配置或功能,请参考Kafka官方文档

上一篇:python中or和and

下一篇:pythonlist

大家都在看

python时间格式

python ord和chr

python中的yield

python list.pop

python的for i in range

npm config set python

python代码简单

python读取文件夹

python中turtle

python 输出时间

Laravel PHP 深圳智简公司。版权所有©2023-2043 LaravelPHP 粤ICP备2021048745号-3

Laravel 中文站