import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 配置Kafka消费者属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 创建Kafka消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("test-topic"));
// 持续消费消息
try {
while (true) {
// 拉取消息,超时时间为100毫秒
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 处理拉取到的消息
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} finally {
// 关闭消费者
consumer.close();
}
}
}
Properties对象设置Kafka消费者的配置参数,如bootstrap.servers(Kafka集群地址)、group.id(消费者组ID)、key.deserializer和value.deserializer(键和值的反序列化器)。KafkaConsumer实例。subscribe方法指定要订阅的主题列表。poll方法拉取消息,poll方法会阻塞直到有数据可读或超时。每次拉取到的消息会被遍历并处理。close方法来释放资源。上一篇:java 取交集
下一篇:java匿名类
Laravel PHP 深圳智简公司。版权所有©2023-2043 LaravelPHP 粤ICP备2021048745号-3
Laravel 中文站