全栈开发

kafka topic元数据查询

1、指定单个topic

import bigdata.backend.monitor.utils.KafkaProgressUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * Kafka consumption-progress monitoring service (binlog project, backend module).
 * Resolves, per monitored topic, the consumer group that reads it and delegates
 * the actual progress lookup to {@link KafkaProgressUtils}.
 */
@Service
public class KafkaMonitorService {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Consumer-group ids for the monitored topics, injected from configuration.
    @Value("${monitor.binlog.consumer-group-id:binlog-monitor-group}")
    private String binlogConsumerGroupId;

    @Value("${monitor.violation.consumer-group-id:monitor-violation-events-group}")
    private String violationConsumerGroupId;

    @Value("${monitor.initial-topics:binlog-monitor}")
    private String initialTopic;

    @Value("${monitor.violation-topic:monitor-violation-events}")
    private String violationTopic;

    /**
     * Queries the consumption progress of the binlog monitoring topic
     * (initial-topics), which is consumed by the binlog-monitor-group.
     *
     * @return progress snapshot for the binlog topic
     */
    public KafkaProgressUtils.TopicProgressInfo getBinlogTopicProgress() {
        String message = "Querying progress for initial topic: " + initialTopic + " with group: " + binlogConsumerGroupId;
        System.out.println(message);
        return KafkaProgressUtils.getTopicProgress(bootstrapServers, binlogConsumerGroupId, initialTopic);
    }

    /**
     * Queries the consumption progress of the violation-events topic
     * (violation-topic), which is consumed by the monitor-violation-events-group.
     *
     * @return progress snapshot for the violation topic
     */
    public KafkaProgressUtils.TopicProgressInfo getViolationTopicProgress() {
        String message = "Querying progress for violation topic: " + violationTopic + " with group: " + violationConsumerGroupId;
        System.out.println(message);
        return KafkaProgressUtils.getTopicProgress(bootstrapServers, violationConsumerGroupId, violationTopic);
    }

    /**
     * Queries the consumption progress of an arbitrary topic/group pair.
     *
     * @param topicName       topic to inspect
     * @param consumerGroupId consumer group that reads the topic
     * @return progress snapshot for the given topic
     */
    public KafkaProgressUtils.TopicProgressInfo getTopicProgress(String topicName, String consumerGroupId) {
        String message = "Querying progress for topic: " + topicName + " with group: " + consumerGroupId;
        System.out.println(message);
        return KafkaProgressUtils.getTopicProgress(bootstrapServers, consumerGroupId, topicName);
    }

    // Accessors for the injected configuration values (handy for other components).
    public String getBootstrapServers() { return bootstrapServers; }

    public String getBinlogConsumerGroupId() { return binlogConsumerGroupId; }

    public String getViolationConsumerGroupId() { return violationConsumerGroupId; }

    public String getInitialTopic() { return initialTopic; }

    public String getViolationTopic() { return violationTopic; }
}

整体流程:

  1. 初始化客户端配置
    • 准备 AdminClient 的配置(主要是 bootstrap.servers)。
    • 准备临时 KafkaConsumer 的配置,包括 bootstrap.servers、key.deserializer、value.deserializer。关键:设置一个临时的、唯一的 group.id(如 temp-offset-query-group-UUID)以避免干扰实际消费者,并设置 enable.auto.commit = false。
  2. 创建客户端实例
    • 根据步骤 1 的配置,创建 AdminClient 实例。这个客户端用于查询消费者组的消费进度(committed offset)。
    • 根据步骤 1 的配置,创建临时 KafkaConsumer 实例。这个客户端用于查询 Topic 的分区信息和最新的 offset(log end offset)。
  3. 获取 Topic 分区信息
    • 使用临时 KafkaConsumer 调用 listTopics() 方法,获取 Kafka 集群中所有 Topic 的信息。
    • 从返回的 Topic 列表中,找到指定 topicName 对应的分区列表 (PartitionInfo)。
  4. 验证 Topic 存在性
    • 检查步骤 3 中获取的分区列表是否为空或 null
    • 如果 Topic 不存在或没有分区,则直接返回一个空的进度信息对象,并结束流程。
    • 如果 Topic 存在,则继续下一步。
  5. 构建 TopicPartition 集合
    • 将步骤 3 获取到的分区列表转换为 Set<TopicPartition>,用于后续查询 endOffset
  6. 查询消费者组的 committed offset
    • 使用 AdminClient 调用 listConsumerGroupOffsets(consumerGroupId) 方法,查询指定消费者组 consumerGroupId 的 committed offset 信息。这会返回一个 Map&lt;TopicPartition, OffsetAndMetadata&gt;,包含了该组在所有已订阅分区上的提交进度。
  7. 查询 Topic 的 log end offset
    • 使用临时 KafkaConsumer 调用 endOffsets(topicPartitionsSet) 方法,查询指定 TopicPartition 集合的 log end offset(即每个分区最新的消息偏移量)。
  8. 计算并汇总进度
    • 遍历步骤 3 获取到的 Topic 分区列表。
    • 对于每个分区:
      • 从步骤 7 的 endOffsets 结果中获取该分区的 endOffset
      • 从步骤 6 的 committedOffsets 结果中获取该分区的 committedOffset
      • 如果 committedOffset 不存在(例如该分区从未被该组消费过),则将其视为 0
      • 计算该分区的延迟 lag = max(0, endOffset - committedOffset)
      • 将该分区的 committedOffset(已消费消息数)、endOffset(总消息数)、lag(延迟数)记录下来。
      • 将该分区的进度信息添加到结果对象中。
      • 将该分区的 committedOffset 累加到 totalConsumed,endOffset 累加到 totalMessages,lag 累加到 totalLag。
    • 遍历完成后,将计算出的 totalConsumed, totalMessages, totalLag 设置到结果对象的 Topic 级别汇总信息中。
  9. 处理异常
    • 如果在步骤 2 到 8 的任何环节发生 InterruptedException 或 ExecutionException,则捕获异常,记录错误日志,并将错误信息设置到结果对象中。
  10. 资源清理
    • 无论流程是否成功,都在 finally 块中确保 AdminClient 和临时 KafkaConsumer 实例被关闭,释放网络和内存资源。
  11. 返回结果
    • 返回封装了 Topic 进度信息(包括分区详情、汇总数据、可能的错误信息)的 TopicProgressInfo 对象。

2、指定多个kafka topic

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

/**
 * 通用 Kafka 消费进度查询工具类 (按需查询版 - 支持多 Topic)
 * 每次查询都创建和销毁客户端连接,适用于按需查询场景
 */
public class KafkaProgressUtils {

    /** 临时客户端查询元数据 / offset 时使用的统一超时时间。 */
    private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(10);

    /**
     * 查询指定消费者组对指定多个 Topic 的消费进度信息
     *
     * @param bootstrapServers Kafka 集群地址,例如 "localhost:9092,localhost:9093"
     * @param consumerGroupId  消费者组 ID
     * @param topicNames       要查询的 Topic 名称列表
     * @return Map,Key 为 Topic 名称,Value 为该 Topic 的进度信息对象 (TopicProgressInfo);
     *         请求列表中的每个 Topic 都一定有对应的 entry(不存在的 Topic 为空进度,
     *         查询失败时为带 error 信息的进度对象)
     */
    public static Map<String, TopicProgressInfo> getTopicProgress(String bootstrapServers, String consumerGroupId, List<String> topicNames) {
        Map<String, TopicProgressInfo> progressMap = new HashMap<>();
        if (topicNames == null || topicNames.isEmpty()) {
            System.out.println("Topic list is empty, returning empty result map.");
            return progressMap;
        }

        // AdminClient 配置:用于查询消费者组的 committed offset
        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", bootstrapServers);

        // 临时 KafkaConsumer 配置:用于查询 Topic 分区信息和 log end offset。
        // 使用随机的临时 group.id 并关闭自动提交,避免干扰实际消费者组的 offset。
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("group.id", "temp-offset-query-group-" + UUID.randomUUID());
        consumerProps.put("enable.auto.commit", "false");

        // try-with-resources 保证两个客户端在任何路径(正常返回/异常)下都被关闭,
        // 替代原先手写的 finally + null 检查
        try (AdminClient adminClient = AdminClient.create(adminProps);
             Consumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProps)) {

            // 1. 获取集群中所有 Topic 的分区信息,并过滤出我们关心的 topics
            System.out.println("Fetching topic partitions for topics: " + topicNames);
            Map<String, List<PartitionInfo>> targetTopicPartitions = kafkaConsumer.listTopics(REQUEST_TIMEOUT)
                    .entrySet().stream()
                    .filter(entry -> topicNames.contains(entry.getKey()))
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

            if (targetTopicPartitions.isEmpty()) {
                System.out.println("None of the specified topics were found or have no partitions.");
                // 为每个请求的 topic 返回空的进度信息,保证结果 Map 的 key 完整
                for (String topic : topicNames) {
                    progressMap.put(topic, new TopicProgressInfo(topic));
                }
                return progressMap;
            }

            // 2. 构建所有需要查询 end offset 的 TopicPartition 集合
            Set<TopicPartition> allTopicPartitions = new HashSet<>();
            for (Map.Entry<String, List<PartitionInfo>> entry : targetTopicPartitions.entrySet()) {
                for (PartitionInfo partitionInfo : entry.getValue()) {
                    allTopicPartitions.add(new TopicPartition(entry.getKey(), partitionInfo.partition()));
                }
            }

            // 3. 查询消费者组的 committed offset(一次查询返回该组所有分区的提交进度)
            System.out.println("Fetching committed offsets for group: " + consumerGroupId);
            ListConsumerGroupOffsetsResult groupOffsetsResult = adminClient.listConsumerGroupOffsets(consumerGroupId);
            Map<TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> committedOffsets =
                    groupOffsetsResult.partitionsToOffsetAndMetadata().get();

            // 4. 查询所有相关分区的最新 offset (log end offset)
            System.out.println("Fetching log end offsets for all relevant partitions.");
            Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(allTopicPartitions, REQUEST_TIMEOUT);

            // 5. 逐个 Topic 计算进度;遍历请求列表而不是查询结果,
            //    确保集群中不存在的 topic 也能得到一个空的进度对象
            for (String topicName : topicNames) {
                progressMap.put(topicName,
                        buildTopicProgress(topicName, targetTopicPartitions.get(topicName),
                                committedOffsets, endOffsets, consumerGroupId));
            }

        } catch (InterruptedException e) {
            System.err.println("Thread interrupted while getting Kafka consumer progress: " + e.getMessage());
            e.printStackTrace();
            Thread.currentThread().interrupt(); // 重要:恢复中断状态
            fillErrorResults(progressMap, topicNames, "Thread interrupted: " + e.getMessage());
        } catch (ExecutionException e) {
            System.err.println("Error getting Kafka consumer progress: " + e.getMessage());
            e.printStackTrace();
            fillErrorResults(progressMap, topicNames, "Execution error: " + e.getMessage());
        }

        return progressMap;
    }

    /**
     * 计算单个 Topic 在各分区上的消费进度并汇总。
     *
     * @param topicName        Topic 名称
     * @param partitions       该 Topic 的分区列表;为 null 或空表示集群中不存在该 Topic
     * @param committedOffsets 消费者组已提交的 offset(按 TopicPartition 索引)
     * @param endOffsets       各分区的 log end offset(按 TopicPartition 索引)
     * @param consumerGroupId  消费者组 ID(仅用于日志输出)
     * @return 该 Topic 的进度信息;Topic 不存在时返回空的进度对象
     */
    private static TopicProgressInfo buildTopicProgress(String topicName,
                                                        List<PartitionInfo> partitions,
                                                        Map<TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> committedOffsets,
                                                        Map<TopicPartition, Long> endOffsets,
                                                        String consumerGroupId) {
        TopicProgressInfo progressInfo = new TopicProgressInfo(topicName);
        if (partitions == null || partitions.isEmpty()) {
            System.out.println("Topic '" + topicName + "' not found or has no partitions in the cluster.");
            return progressInfo;
        }

        long totalConsumed = 0;
        long totalMessages = 0;
        long totalLag = 0;

        System.out.println("Calculating progress for topic: " + topicName);
        for (PartitionInfo partitionInfo : partitions) {
            TopicPartition tp = new TopicPartition(topicName, partitionInfo.partition());
            long endOffset = endOffsets.getOrDefault(tp, 0L);
            org.apache.kafka.clients.consumer.OffsetAndMetadata offsetAndMetadata = committedOffsets.get(tp);
            long committedOffset = 0;
            if (offsetAndMetadata != null) {
                committedOffset = offsetAndMetadata.offset();
            } else {
                // 该分区从未被该组提交过 offset;实际起点取决于 auto.offset.reset,
                // 这里统一按"已消费 0 条"计算
                System.out.println("No committed offset found for partition " + tp + " in group " + consumerGroupId + ". Assuming 0.");
            }

            long lag = Math.max(0, endOffset - committedOffset); // lag 不允许为负数

            totalConsumed += committedOffset;
            totalMessages += endOffset;
            totalLag += lag;

            // 记录分区级别的进度信息
            progressInfo.addPartitionProgress(tp.partition(), committedOffset, endOffset, lag);
            System.out.println("  Partition " + tp.partition() +
                               ": consumed=" + committedOffset +
                               ", total=" + endOffset +
                               ", lag=" + lag);
        }

        // 设置 Topic 级别的汇总信息
        progressInfo.setTotalConsumed(totalConsumed);
        progressInfo.setTotalMessages(totalMessages);
        progressInfo.setTotalLag(totalLag);
        System.out.println("Topic '" + topicName + "' - Total: consumed=" + totalConsumed +
                           ", total=" + totalMessages +
                           ", lag=" + totalLag);
        return progressInfo;
    }

    /**
     * 查询失败时,为每个请求的 Topic 填充带错误信息的结果对象(覆盖可能存在的部分结果)。
     */
    private static void fillErrorResults(Map<String, TopicProgressInfo> progressMap, List<String> topicNames, String errorMessage) {
        for (String topic : topicNames) {
            TopicProgressInfo errorInfo = new TopicProgressInfo(topic);
            errorInfo.setError(errorMessage);
            progressMap.put(topic, errorInfo);
        }
    }

    /**
     * 查询单个 Topic 的便捷方法 (保持向后兼容)
     * @param bootstrapServers Kafka 集群地址
     * @param consumerGroupId  消费者组 ID
     * @param topicName        要查询的 Topic 名称
     * @return TopicProgressInfo 对象
     */
    public static TopicProgressInfo getTopicProgress(String bootstrapServers, String consumerGroupId, String topicName) {
        Map<String, TopicProgressInfo> map = getTopicProgress(bootstrapServers, consumerGroupId, Collections.singletonList(topicName));
        return map.get(topicName); // 返回单个 Topic 的结果
    }

    /**
     * 用于封装单个 Topic 的进度信息
     */
    public static class TopicProgressInfo {
        private final String topicName;
        private long totalConsumed = 0;
        private long totalMessages = 0;
        private long totalLag = 0;
        private final List<PartitionProgress> partitionProgressList = new ArrayList<>();
        private String error = null; // 存储查询过程中可能发生的错误

        public TopicProgressInfo(String topicName) {
            this.topicName = topicName;
        }

        public void addPartitionProgress(int partition, long consumedMessages, long totalMessages, long lag) {
            partitionProgressList.add(new PartitionProgress(partition, consumedMessages, totalMessages, lag));
        }

        // Getters
        public String getTopicName() { return topicName; }
        public long getTotalConsumed() { return totalConsumed; }
        public void setTotalConsumed(long totalConsumed) { this.totalConsumed = totalConsumed; }
        public long getTotalMessages() { return totalMessages; }
        public void setTotalMessages(long totalMessages) { this.totalMessages = totalMessages; }
        public long getTotalLag() { return totalLag; }
        public void setTotalLag(long totalLag) { this.totalLag = totalLag; }
        // 注意:返回内部可变列表,调用方请勿修改
        public List<PartitionProgress> getPartitionProgressList() { return partitionProgressList; }
        public String getError() { return error; }
        public void setError(String error) { this.error = error; }

        @Override
        public String toString() {
            if (error != null) {
                return "TopicProgressInfo{topicName='" + topicName + "', error='" + error + "'}";
            }
            return "TopicProgressInfo{" +
                    "topicName='" + topicName + '\'' +
                    ", totalConsumed=" + totalConsumed +
                    ", totalMessages=" + totalMessages +
                    ", totalLag=" + totalLag +
                    ", partitionProgressList=" + partitionProgressList +
                    '}';
        }

        /**
         * 用于封装单个 Topic 分区的进度信息
         */
        public static class PartitionProgress {
            private final int partition;
            private final long consumedMessages; // 已消费的消息数
            private final long totalMessages;    // 总消息数
            private final long lag;              // 未消费的消息数 (lag)

            public PartitionProgress(int partition, long consumedMessages, long totalMessages, long lag) {
                this.partition = partition;
                this.consumedMessages = consumedMessages;
                this.totalMessages = totalMessages;
                this.lag = lag;
            }

            // Getters
            public int getPartition() { return partition; }
            public long getConsumedMessages() { return consumedMessages; }
            public long getTotalMessages() { return totalMessages; }
            public long getLag() { return lag; }

            @Override
            public String toString() {
                return "PartitionProgress{" +
                        "partition=" + partition +
                        ", consumedMessages=" + consumedMessages +
                        ", totalMessages=" + totalMessages +
                        ", lag=" + lag +
                        '}';
            }
        }
    }
}

3、service示例

import bigdata.backend.monitor.utils.KafkaProgressUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * Kafka consumption-progress monitoring service (binlog project, backend module).
 * Each monitored topic has its own consumer group; this service pairs them up
 * and delegates the progress query to {@link KafkaProgressUtils}.
 */
@Service
public class KafkaMonitorService {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Per-topic consumer group ids, resolved from application configuration.
    @Value("${monitor.binlog.consumer-group-id:binlog-monitor-group}")
    private String binlogConsumerGroupId;

    @Value("${monitor.violation.consumer-group-id:monitor-violation-events-group}")
    private String violationConsumerGroupId;

    @Value("${monitor.initial-topics:binlog-monitor}")
    private String initialTopic;

    @Value("${monitor.violation-topic:monitor-violation-events}")
    private String violationTopic;

    /** Writes a trace line for an outgoing progress query. */
    private void logQuery(String message) {
        System.out.println(message);
    }

    /**
     * Queries the consumption progress of the binlog monitoring topic
     * (initial-topics), consumed by the binlog-monitor-group.
     *
     * @return progress snapshot for the binlog topic
     */
    public KafkaProgressUtils.TopicProgressInfo getBinlogTopicProgress() {
        logQuery("Querying progress for initial topic: " + initialTopic + " with group: " + binlogConsumerGroupId);
        return KafkaProgressUtils.getTopicProgress(bootstrapServers, binlogConsumerGroupId, initialTopic);
    }

    /**
     * Queries the consumption progress of the violation-events topic
     * (violation-topic), consumed by the monitor-violation-events-group.
     *
     * @return progress snapshot for the violation topic
     */
    public KafkaProgressUtils.TopicProgressInfo getViolationTopicProgress() {
        logQuery("Querying progress for violation topic: " + violationTopic + " with group: " + violationConsumerGroupId);
        return KafkaProgressUtils.getTopicProgress(bootstrapServers, violationConsumerGroupId, violationTopic);
    }

    /**
     * Queries the consumption progress of an arbitrary topic/group pair.
     *
     * @param topicName       topic to inspect
     * @param consumerGroupId consumer group that reads the topic
     * @return progress snapshot for the given topic
     */
    public KafkaProgressUtils.TopicProgressInfo getTopicProgress(String topicName, String consumerGroupId) {
        logQuery("Querying progress for topic: " + topicName + " with group: " + consumerGroupId);
        return KafkaProgressUtils.getTopicProgress(bootstrapServers, consumerGroupId, topicName);
    }

    // Accessors for the injected configuration values (handy for other components).
    public String getBootstrapServers() { return bootstrapServers; }

    public String getBinlogConsumerGroupId() { return binlogConsumerGroupId; }

    public String getViolationConsumerGroupId() { return violationConsumerGroupId; }

    public String getInitialTopic() { return initialTopic; }

    public String getViolationTopic() { return violationTopic; }
}