# RocketMQ Python 示例代码
from rocketmq.client import PushConsumer, ConsumeStatus
# 初始化消费者
consumer = PushConsumer('consumer_group_name')
consumer.set_namesrv_addr('localhost:9876') # 设置 NameServer 地址
# 订阅主题
consumer.subscribe('TopicTest', '*')
# 定义消息处理函数
def callback(msg):
print(f"Receive message: {msg.body.decode('utf-8')}")
return ConsumeStatus.CONSUME_SUCCESS
# 注册消息处理函数
consumer.register_message_callback(callback)
# 启动消费者
consumer.start()
try:
while True:
pass
except KeyboardInterrupt:
consumer.shutdown()
PushConsumer 实例,并设置其所属的消费者组名称。set_namesrv_addr 方法指定 RocketMQ 的 NameServer 地址。subscribe 方法订阅一个或多个主题,这里订阅了名为 TopicTest 的主题。callback,该函数会在接收到消息时被调用。在这个例子中,它会打印出消息内容并返回消费成功状态。register_message_callback 方法将回调函数注册到消费者。start 方法启动消费者,开始监听消息。try-except 块捕获 KeyboardInterrupt 异常,以便在程序退出时调用 shutdown 方法优雅地关闭消费者。这段代码展示了如何使用 Python 客户端库与 RocketMQ 进行交互,实现消息的接收和处理。
上一篇:python分词
下一篇:python文件运行
Laravel PHP 深圳智简公司。版权所有©2023-2043 LaravelPHP 粤ICP备2021048745号-3
Laravel 中文站