文章
kafka topic元数据查询
1、指定单个topic
import bigdata.backend.monitor.utils.KafkaProgressUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
* Kafka 消费进度监控服务 (针对 binlog 项目 - Backend 模块)
* 负责查询不同 Topic 对应的不同消费者组的进度
*/
@Service
public class KafkaMonitorService {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
// 从配置文件获取不同 Topic 对应的消费者组 ID
@Value("${monitor.binlog.consumer-group-id:binlog-monitor-group}")
private String binlogConsumerGroupId;
@Value("${monitor.violation.consumer-group-id:monitor-violation-events-group}")
private String violationConsumerGroupId;
@Value("${monitor.initial-topics:binlog-monitor}")
private String initialTopic;
@Value("${monitor.violation-topic:monitor-violation-events}")
private String violationTopic;
/**
* 查询 binlog 监控 Topic (initial-topics) 的消费进度
* 该 Topic 由 binlog-monitor-group 消费
* @return TopicProgressInfo 对象
*/
public KafkaProgressUtils.TopicProgressInfo getBinlogTopicProgress() {
System.out.println("Querying progress for initial topic: " + initialTopic + " with group: " + binlogConsumerGroupId);
return KafkaProgressUtils.getTopicProgress(bootstrapServers, binlogConsumerGroupId, initialTopic);
}
/**
* 查询违规事件 Topic (violation-topic) 的消费进度
* 该 Topic 由 monitor-violation-events-group 消费
* @return TopicProgressInfo 对象
*/
public KafkaProgressUtils.TopicProgressInfo getViolationTopicProgress() {
System.out.println("Querying progress for violation topic: " + violationTopic + " with group: " + violationConsumerGroupId);
return KafkaProgressUtils.getTopicProgress(bootstrapServers, violationConsumerGroupId, violationTopic);
}
/**
* 查询指定 Topic 的消费进度 (通用方法)
* @param topicName 要查询的 Topic 名称
* @param consumerGroupId 该 Topic 对应的消费者组 ID
* @return TopicProgressInfo 对象
*/
public KafkaProgressUtils.TopicProgressInfo getTopicProgress(String topicName, String consumerGroupId) {
System.out.println("Querying progress for topic: " + topicName + " with group: " + consumerGroupId);
return KafkaProgressUtils.getTopicProgress(bootstrapServers, consumerGroupId, topicName);
}
// Getters for configuration values (optional, if needed elsewhere)
public String getBootstrapServers() {
return bootstrapServers;
}
public String getBinlogConsumerGroupId() {
return binlogConsumerGroupId;
}
public String getViolationConsumerGroupId() {
return violationConsumerGroupId;
}
public String getInitialTopic() {
return initialTopic;
}
public String getViolationTopic() {
return violationTopic;
}
}整体流程:
- 初始化客户端配置:
- 准备
AdminClient的配置(主要是bootstrap.servers)。 - 准备临时
KafkaConsumer的配置,包括bootstrap.servers、key.deserializer、value.deserializer。关键:设置一个临时的、唯一的group.id(如temp-offset-query-group-UUID)以避免干扰实际消费者,并设置enable.auto.commit = false。
- 准备
- 创建客户端实例:
- 根据步骤 1 的配置,创建
AdminClient实例。这个客户端用于查询消费者组的消费进度(committed offset)。 - 根据步骤 1 的配置,创建临时
KafkaConsumer实例。这个客户端用于查询 Topic 的分区信息和最新的 offset(log end offset)。
- 根据步骤 1 的配置,创建
- 获取 Topic 分区信息:
- 使用临时
KafkaConsumer调用listTopics()方法,获取 Kafka 集群中所有 Topic 的信息。 - 从返回的 Topic 列表中,找到指定
topicName对应的分区列表 (PartitionInfo)。
- 使用临时
- 验证 Topic 存在性:
- 检查步骤 3 中获取的分区列表是否为空或
null。 - 如果 Topic 不存在或没有分区,则直接返回一个空的进度信息对象,并结束流程。
- 如果 Topic 存在,则继续下一步。
- 检查步骤 3 中获取的分区列表是否为空或
- 构建 TopicPartition 集合:
- 将步骤 3 获取到的分区列表转换为
Set<TopicPartition>,用于后续查询endOffset。
- 将步骤 3 获取到的分区列表转换为
- 查询消费者组的
committed offset:- 使用
AdminClient调用listConsumerGroupOffsets(consumerGroupId)方法,查询指定消费者组consumerGroupId的committed offset信息。这会返回一个Map<TopicPartition, OffsetAndMetadata>,包含了该组在所有已订阅分区上的提交进度。
- 使用
- 查询 Topic 的
log end offset:- 使用临时
KafkaConsumer调用endOffsets(topicPartitionsSet)方法,查询指定TopicPartition集合的log end offset(即每个分区最新的消息偏移量)。
- 使用临时
- 计算并汇总进度:
- 遍历步骤 3 获取到的 Topic 分区列表。
- 对于每个分区:
- 从步骤 7 的
endOffsets结果中获取该分区的endOffset。 - 从步骤 6 的
committedOffsets结果中获取该分区的committedOffset。 - 如果
committedOffset不存在(例如该分区从未被该组消费过),则将其视为0。 - 计算该分区的延迟
lag = max(0, endOffset - committedOffset)。 - 将该分区的
committedOffset(已消费消息数)、endOffset(总消息数)、lag(延迟数)记录下来。 - 将该分区的进度信息添加到结果对象中。
- 将该分区的
committedOffset累加到totalConsumed,endOffset累加到totalMessages,lag累加到totalLag。
- 从步骤 7 的
- 遍历完成后,将计算出的
totalConsumed,totalMessages,totalLag设置到结果对象的 Topic 级别汇总信息中。
- 处理异常:
- 如果在步骤 2 到 8 的任何环节发生
InterruptedException或ExecutionException,则捕获异常,记录错误日志,并将错误信息设置到结果对象中。
- 如果在步骤 2 到 8 的任何环节发生
- 资源清理:
- 无论流程是否成功,都在
finally块中确保AdminClient和临时KafkaConsumer实例被关闭,释放网络和内存资源。
- 无论流程是否成功,都在
- 返回结果:
- 返回封装了 Topic 进度信息(包括分区详情、汇总数据、可能的错误信息)的
TopicProgressInfo对象。
- 返回封装了 Topic 进度信息(包括分区详情、汇总数据、可能的错误信息)的
2、指定多个kafka topic
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/**
* 通用 Kafka 消费进度查询工具类 (按需查询版 - 支持多 Topic)
* 每次查询都创建和销毁客户端连接,适用于按需查询场景
*/
public class KafkaProgressUtils {
/**
* 查询指定消费者组对指定多个 Topic 的消费进度信息
*
* @param bootstrapServers Kafka 集群地址,例如 "localhost:9092,localhost:9093"
* @param consumerGroupId 消费者组 ID
* @param topicNames 要查询的 Topic 名称列表
* @return Map,Key 为 Topic 名称,Value 为该 Topic 的进度信息对象 (TopicProgressInfo)
*/
public static Map<String, TopicProgressInfo> getTopicProgress(String bootstrapServers, String consumerGroupId, List<String> topicNames) {
// 1. 创建 AdminClient 用于查询消费者组的 offset
Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", bootstrapServers);
AdminClient adminClient = null;
// 2. 创建临时 KafkaConsumer 用于查询 Topic 的最新 offset
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", bootstrapServers);
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 使用临时消费者组ID查询 offset,避免干扰实际消费者
consumerProps.put("group.id", "temp-offset-query-group-" + UUID.randomUUID().toString());
consumerProps.put("enable.auto.commit", "false");
Consumer<String, String> kafkaConsumer = null;
// 初始化结果 Map
Map<String, TopicProgressInfo> progressMap = new HashMap<>();
if (topicNames == null || topicNames.isEmpty()) {
System.out.println("Topic list is empty, returning empty result map.");
return progressMap;
}
try {
// 初始化 AdminClient
adminClient = AdminClient.create(adminProps);
// 初始化 KafkaConsumer
kafkaConsumer = new KafkaConsumer<>(consumerProps);
// 3. 获取所有 Topic 的分区信息
System.out.println("Fetching topic partitions for topics: " + topicNames);
Map<String, List<PartitionInfo>> topicPartitions = kafkaConsumer.listTopics(Duration.ofSeconds(10));
// 过滤出我们关心的 topics
Map<String, List<PartitionInfo>> targetTopicPartitions = topicPartitions.entrySet().stream()
.filter(entry -> topicNames.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (targetTopicPartitions.isEmpty()) {
System.out.println("None of the specified topics were found or have no partitions.");
// 为每个请求的 topic 创建空的进度信息
for (String topic : topicNames) {
progressMap.put(topic, new TopicProgressInfo(topic));
}
return progressMap;
}
// 构建所有需要查询 offset 的 TopicPartition 集合
Set<TopicPartition> allTopicPartitions = new HashSet<>();
for (Map.Entry<String, List<PartitionInfo>> entry : targetTopicPartitions.entrySet()) {
String topic = entry.getKey();
for (PartitionInfo partitionInfo : entry.getValue()) {
allTopicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
}
}
// 4. 获取消费者组的当前消费 offset (committed offset) for ALL relevant partitions
System.out.println("Fetching committed offsets for group: " + consumerGroupId);
ListConsumerGroupOffsetsResult groupOffsetsResult = adminClient.listConsumerGroupOffsets(consumerGroupId);
Map<TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> committedOffsets = groupOffsetsResult.partitionsToOffsetAndMetadata().get();
// 5. 获取所有相关分区的最新 offset (log end offset)
System.out.println("Fetching log end offsets for all relevant partitions.");
Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(allTopicPartitions, Duration.ofSeconds(10));
// 6. 计算每个 Topic 的进度并填充结果对象
for (String topicName : topicNames) { // 遍历请求的 topic 列表,确保即使 topic 不存在也能返回空信息
TopicProgressInfo progressInfo = new TopicProgressInfo(topicName);
List<PartitionInfo> partitions = targetTopicPartitions.get(topicName);
if (partitions == null || partitions.isEmpty()) {
System.out.println("Topic '" + topicName + "' not found or has no partitions in the cluster.");
progressMap.put(topicName, progressInfo); // 添加空的进度信息
continue; // 跳过这个 topic 的计算
}
long totalConsumed = 0;
long totalMessages = 0;
long totalLag = 0;
System.out.println("Calculating progress for topic: " + topicName);
for (PartitionInfo partitionInfo : partitions) {
TopicPartition tp = new TopicPartition(topicName, partitionInfo.partition());
long endOffset = endOffsets.getOrDefault(tp, 0L);
// 从查询到的组 offsets 中查找当前分区的 committed offset
org.apache.kafka.clients.consumer.OffsetAndMetadata offsetAndMetadata = committedOffsets.get(tp);
long committedOffset = 0;
if (offsetAndMetadata != null) {
committedOffset = offsetAndMetadata.offset();
} else {
// 如果该分区在消费者组中没有提交过 offset,根据 auto.offset.reset 策略,可能是从头开始 (0) 或从最新 (endOffset)
// 这里我们假设没有提交过,则认为已消费为 0
System.out.println("No committed offset found for partition " + tp + " in group " + consumerGroupId + ". Assuming 0.");
}
long lag = Math.max(0, endOffset - committedOffset); // 计算 lag,确保不为负数
long consumedMessages = committedOffset;
totalConsumed += consumedMessages;
totalMessages += endOffset;
totalLag += lag;
// 添加分区级别的进度信息
progressInfo.addPartitionProgress(tp.partition(), consumedMessages, endOffset, lag);
System.out.println(" Partition " + tp.partition() +
": consumed=" + consumedMessages +
", total=" + endOffset +
", lag=" + lag);
}
// 设置 Topic 级别的汇总信息
progressInfo.setTotalConsumed(totalConsumed);
progressInfo.setTotalMessages(totalMessages);
progressInfo.setTotalLag(totalLag);
System.out.println("Topic '" + topicName + "' - Total: consumed=" + totalConsumed +
", total=" + totalMessages +
", lag=" + totalLag);
// 将单个 Topic 的进度信息放入 Map
progressMap.put(topicName, progressInfo);
}
} catch (InterruptedException e) {
System.err.println("Thread interrupted while getting Kafka consumer progress: " + e.getMessage());
e.printStackTrace();
Thread.currentThread().interrupt(); // 重要:恢复中断状态
// 为每个请求的 topic 创建带有错误信息的进度信息
for (String topic : topicNames) {
TopicProgressInfo errorInfo = new TopicProgressInfo(topic);
errorInfo.setError("Thread interrupted: " + e.getMessage());
progressMap.put(topic, errorInfo);
}
} catch (ExecutionException e) {
System.err.println("Error getting Kafka consumer progress: " + e.getMessage());
e.printStackTrace();
// 为每个请求的 topic 创建带有错误信息的进度信息
for (String topic : topicNames) {
TopicProgressInfo errorInfo = new TopicProgressInfo(topic);
errorInfo.setError("Execution error: " + e.getMessage());
progressMap.put(topic, errorInfo);
}
} finally {
// 7. 确保资源被释放
System.out.println("Closing AdminClient and KafkaConsumer...");
if (adminClient != null) {
try {
adminClient.close();
System.out.println("AdminClient closed.");
} catch (Exception e) {
System.err.println("Error closing AdminClient: " + e.getMessage());
}
}
if (kafkaConsumer != null) {
try {
kafkaConsumer.close();
System.out.println("KafkaConsumer closed.");
} catch (Exception e) {
System.err.println("Error closing KafkaConsumer: " + e.getMessage());
}
}
}
return progressMap;
}
/**
* 查询单个 Topic 的便捷方法 (保持向后兼容)
* @param bootstrapServers Kafka 集群地址
* @param consumerGroupId 消费者组 ID
* @param topicName 要查询的 Topic 名称
* @return TopicProgressInfo 对象
*/
public static TopicProgressInfo getTopicProgress(String bootstrapServers, String consumerGroupId, String topicName) {
Map<String, TopicProgressInfo> map = getTopicProgress(bootstrapServers, consumerGroupId, Arrays.asList(topicName));
return map.get(topicName); // 返回单个 Topic 的结果
}
/**
* 用于封装单个 Topic 的进度信息
*/
public static class TopicProgressInfo {
private final String topicName;
private long totalConsumed = 0;
private long totalMessages = 0;
private long totalLag = 0;
private final List<PartitionProgress> partitionProgressList = new ArrayList<>();
private String error = null; // 存储查询过程中可能发生的错误
public TopicProgressInfo(String topicName) {
this.topicName = topicName;
}
public void addPartitionProgress(int partition, long consumedMessages, long totalMessages, long lag) {
partitionProgressList.add(new PartitionProgress(partition, consumedMessages, totalMessages, lag));
}
// Getters
public String getTopicName() { return topicName; }
public long getTotalConsumed() { return totalConsumed; }
public void setTotalConsumed(long totalConsumed) { this.totalConsumed = totalConsumed; }
public long getTotalMessages() { return totalMessages; }
public void setTotalMessages(long totalMessages) { this.totalMessages = totalMessages; }
public long getTotalLag() { return totalLag; }
public void setTotalLag(long totalLag) { this.totalLag = totalLag; }
public List<PartitionProgress> getPartitionProgressList() { return partitionProgressList; }
public String getError() { return error; }
public void setError(String error) { this.error = error; }
@Override
public String toString() {
if (error != null) {
return "TopicProgressInfo{topicName='" + topicName + "', error='" + error + "'}";
}
return "TopicProgressInfo{" +
"topicName='" + topicName + '\'' +
", totalConsumed=" + totalConsumed +
", totalMessages=" + totalMessages +
", totalLag=" + totalLag +
", partitionProgressList=" + partitionProgressList +
'}';
}
/**
* 用于封装单个 Topic 分区的进度信息
*/
public static class PartitionProgress {
private final int partition;
private final long consumedMessages; // 已消费的消息数
private final long totalMessages; // 总消息数
private final long lag; // 未消费的消息数 (lag)
public PartitionProgress(int partition, long consumedMessages, long totalMessages, long lag) {
this.partition = partition;
this.consumedMessages = consumedMessages;
this.totalMessages = totalMessages;
this.lag = lag;
}
// Getters
public int getPartition() { return partition; }
public long getConsumedMessages() { return consumedMessages; }
public long getTotalMessages() { return totalMessages; }
public long getLag() { return lag; }
@Override
public String toString() {
return "PartitionProgress{" +
"partition=" + partition +
", consumedMessages=" + consumedMessages +
", totalMessages=" + totalMessages +
", lag=" + lag +
'}';
}
}
}
}3、service示例
import bigdata.backend.monitor.utils.KafkaProgressUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
* Kafka 消费进度监控服务 (针对 binlog 项目 - Backend 模块)
* 负责查询不同 Topic 对应的不同消费者组的进度
*/
@Service
public class KafkaMonitorService {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
// 从配置文件获取不同 Topic 对应的消费者组 ID
@Value("${monitor.binlog.consumer-group-id:binlog-monitor-group}")
private String binlogConsumerGroupId;
@Value("${monitor.violation.consumer-group-id:monitor-violation-events-group}")
private String violationConsumerGroupId;
@Value("${monitor.initial-topics:binlog-monitor}")
private String initialTopic;
@Value("${monitor.violation-topic:monitor-violation-events}")
private String violationTopic;
/**
* 查询 binlog 监控 Topic (initial-topics) 的消费进度
* 该 Topic 由 binlog-monitor-group 消费
* @return TopicProgressInfo 对象
*/
public KafkaProgressUtils.TopicProgressInfo getBinlogTopicProgress() {
System.out.println("Querying progress for initial topic: " + initialTopic + " with group: " + binlogConsumerGroupId);
return KafkaProgressUtils.getTopicProgress(bootstrapServers, binlogConsumerGroupId, initialTopic);
}
/**
* 查询违规事件 Topic (violation-topic) 的消费进度
* 该 Topic 由 monitor-violation-events-group 消费
* @return TopicProgressInfo 对象
*/
public KafkaProgressUtils.TopicProgressInfo getViolationTopicProgress() {
System.out.println("Querying progress for violation topic: " + violationTopic + " with group: " + violationConsumerGroupId);
return KafkaProgressUtils.getTopicProgress(bootstrapServers, violationConsumerGroupId, violationTopic);
}
/**
* 查询指定 Topic 的消费进度 (通用方法)
* @param topicName 要查询的 Topic 名称
* @param consumerGroupId 该 Topic 对应的消费者组 ID
* @return TopicProgressInfo 对象
*/
public KafkaProgressUtils.TopicProgressInfo getTopicProgress(String topicName, String consumerGroupId) {
System.out.println("Querying progress for topic: " + topicName + " with group: " + consumerGroupId);
return KafkaProgressUtils.getTopicProgress(bootstrapServers, consumerGroupId, topicName);
}
// Getters for configuration values (optional, if needed elsewhere)
public String getBootstrapServers() {
return bootstrapServers;
}
public String getBinlogConsumerGroupId() {
return binlogConsumerGroupId;
}
public String getViolationConsumerGroupId() {
return violationConsumerGroupId;
}
public String getInitialTopic() {
return initialTopic;
}
public String getViolationTopic() {
return violationTopic;
}
}