在kafka的开发和维护中,我们经常需要了解kafka topic以及连接在其上的consumer的实时信息,比如logsize,offset,owner等。为此kafka提供了ConsumerOffsetChecker,它的用法很简单
?
?
输出结果类似于
?
?
我们也可以通过kafka web console一类的工具直观地获取kafka信息,但如果我们要构建自己的监控系统,需要抓取这些信息的话,有两种办法:一种是运行ConsumerOffsetChecker然后解析输出的字符串;另一种就是通过SimpleConsumer和Zookeeper实时抓取信息(换句话说就是把ConsumerOffsetChecker翻译一下:)),以下介绍第二种方法的思路。
?
首先我们看kafka信息在zookeeper的存储结构
?
?
对于指定的topic和groupid,通过(1)可以拿到所有的partition信息(Pid),然后通过(5)可以拿到offset,通过(4)可以拿到owner。就差logsize还没法拿到,事实上logsize在zookeeper中并没有记录,它必须通过kafka consumer的low level的api取得。
?
?
?
private Long getLogSize(String topicName, String partitionId, long offsetRequestTime) { SimpleConsumer consumer = new SimpleConsumer(server, port, 10000, 1024000, "ConsumerOffsetChecker"); Long logSize = null; TopicAndPartition topicAndPartition = new TopicAndPartition(topicName, Integer.parseInt(partitionId)); Map<TopicAndPartition, PartitionOffsetRequestInfo> map = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); map.put(topicAndPartition, new PartitionOffsetRequestInfo(offsetRequestTime, 1)); OffsetRequest request = new OffsetRequest(map, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId()); OffsetResponse response = consumer.getOffsetsBefore(request); long[] aa = response.offsets(topicName, Integer.parseInt(partitionId)); if (aa.length != 0) { logSize = aa[0]; } return logSize; }
?
第二行,创建simple consumer
第四行,创建topic和partition对象
第六行,创建offset request,第一个参数是记录写入的时间,如果是kafka.api.OffsetRequest.EarliestTime(),则代表当前最早的一条记录,也就是当前最小offset;如果是kafka.api.OffsetRequest.LatestTime(),则代表最新的一条记录,也就是当前最大offset。第二个参数是获取offset的个数。
?
由max offset和current offset,我们可以获得当前还有多少消息没有被消费(lag),由(lag/(maxoffset-minoffset)),我们可以算出当前还没有被消费的消息占的百分比,如果这个百分比接近100%,那么接下来很可能会导致offset out of range exception而丢失数据。
原文:http://kane-xie.iteye.com/blog/2231004