Laravel  
laravel
文档
数据库
架构
入门
php技术
    
Laravelphp
laravel / php / java / vue / mysql / linux / python / javascript / html / css / c++ / c#

java kafka消费者代码

作者:屰兲洏垳&   发布日期:2025-12-29   浏览:103

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {

    public static void main(String[] args) {
        // 配置Kafka消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建Kafka消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("test-topic"));

        // 持续消费消息
        try {
            while (true) {
                // 拉取消息,超时时间为100毫秒
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                // 处理拉取到的消息
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            // 关闭消费者
            consumer.close();
        }
    }
}

解释说明:

  1. 配置Kafka消费者属性:通过Properties对象设置Kafka消费者的配置参数,如bootstrap.servers(Kafka集群地址)、group.id(消费者组ID)、key.deserializervalue.deserializer(键和值的反序列化器)。
  2. 创建Kafka消费者实例:使用配置好的属性创建一个KafkaConsumer实例。
  3. 订阅主题:通过subscribe方法指定要订阅的主题列表。
  4. 持续消费消息:在一个无限循环中调用poll方法拉取消息,poll方法会阻塞直到有数据可读或超时。每次拉取到的消息会被遍历并处理。
  5. 关闭消费者:在程序结束时确保调用close方法来释放资源。

上一篇:java 取交集

下一篇:java匿名类

大家都在看

java url decode

java判断是windows还是linux

java连接数据库的代码

java date类型比较大小

java djl

ubuntu 卸载java

es java api

java list 查找

java 解压rar

java读取excel中的图片

Laravel PHP 深圳智简公司。版权所有©2023-2043 LaravelPHP 粤ICP备2021048745号-3

Laravel 中文站