
[Big Data] Kafka's high-level Consumer

Posted: 2016-06-06 02:15:02

I. Features

You do not need to track offsets yourself: the consumer automatically reads the consumer group's last offset from ZooKeeper.

II. Notes

1. If there are more consumers than partitions, the extra consumers are wasted: Kafka does not allow concurrent consumption of a single partition, so the number of consumers should not exceed the number of partitions.

2. If there are fewer consumers than partitions, one consumer will read from several partitions. Balance the two counts carefully, or the data will be consumed unevenly across partitions. Ideally the partition count is an integer multiple of the consumer count, which makes the partition count an important choice: with 24 partitions, for example, it is easy to pick a matching consumer count.

3. If a consumer reads from several partitions, there is no ordering guarantee across them; Kafka only guarantees order within a single partition, so the interleaving across partitions depends on the order in which you read.

4. Adding or removing consumers, brokers, or partitions triggers a rebalance, after which a consumer's assigned partitions may change.

5. When the high-level API has no data to return, the call blocks.
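Notes 1 and 2 above follow from how partitions are split within a group. The old high-level consumer uses a range-style strategy: partitions are divided as evenly as possible, and when the counts do not divide evenly the first few consumers each get one extra partition. The sketch below illustrates that arithmetic only; the class and method names are mine, not Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of range-style partition assignment: split
// numPartitions across numConsumers as evenly as possible, giving the
// first (numPartitions % numConsumers) consumers one extra partition.
public class RangeAssignmentSketch {

    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        int perConsumer = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;
        int next = 0; // next partition id to hand out
        for (int c = 0; c < numConsumers; c++) {
            int count = perConsumer + (c < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                parts.add(next++);
            }
            result.add(parts);
        }
        return result;
    }

    public static void main(String[] args) {
        // 24 partitions over 4 consumers: an even 6 partitions each.
        System.out.println(assign(24, 4));
        // 24 partitions over 5 consumers: uneven (5,5,5,5,4) - the
        // imbalance the article warns about in note 2.
        System.out.println(assign(24, 5));
    }
}
```

This is why picking a partition count like 24 (divisible by 1, 2, 3, 4, 6, 8, 12) makes it easy to choose a consumer count that keeps the load even.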

III. Code:

package kafkatest.kakfademo;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerDemo1 {

    public static void main(String[] args) {
        ConsumerDemo1 demo = new ConsumerDemo1();
        demo.test();
    }

    public void test() {
        String topicName = "test";
        int numThreads = 1;

        Properties properties = new Properties();
        properties.put("zookeeper.connect", "hadoop0:2181"); // ZooKeeper address
        properties.put("group.id", "group--demo"); // consumer group id; consumers
        // sharing a group id divide the topic's partitions among themselves

        ConsumerConnector consumer = Consumer
                .createJavaConsumerConnector(new ConsumerConfig(properties));

        // number of streams (threads) to create for this topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topicName, numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer
                .createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topicName);

        // now launch one task per stream
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.execute(new ConsumerMsgTask(stream, threadNumber));
            threadNumber++;
        }
    }

    class ConsumerMsgTask implements Runnable {

        private KafkaStream<byte[], byte[]> m_stream;
        private int m_threadNumber;

        public ConsumerMsgTask(KafkaStream<byte[], byte[]> stream, int threadNumber) {
            m_threadNumber = threadNumber;
            m_stream = stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            try {
                // hasNext() blocks until a message arrives (see note 5 above);
                // call next() only once per message so none are skipped
                while (it.hasNext()) {
                    MessageAndMetadata<byte[], byte[]> record = it.next();
                    String msg = new String(record.message(), "UTF-8");
                    System.out.println("offset: " + record.offset() + ", msg: " + msg);
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }
}
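The demo above never stops: its tasks block in the iterator forever. With the high-level API you would call consumer.shutdown() (which makes the blocked iterator throw and lets each task's loop exit) and then wind down the executor. The Kafka calls are omitted below so the snippet stays self-contained; only the executor side is shown, as a minimal sketch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {

    // Stop an executor that runs consumer tasks: stop accepting new work,
    // wait briefly for in-flight tasks, then force-interrupt stragglers.
    // Returns true if everything finished within the grace period.
    static boolean stop(ExecutorService executor) throws InterruptedException {
        executor.shutdown();
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            executor.shutdownNow();
            return false;
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.execute(() -> System.out.println("consuming..."));

        // With the real consumer, call consumer.shutdown() here first so
        // the blocked ConsumerIterator unblocks and the tasks can finish.
        System.out.println("clean stop: " + stop(executor));
    }
}
```

Calling consumer.shutdown() before stopping the executor matters: without it, awaitTermination would time out because the iterator never returns.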

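One small aside on the decoding step in the demo: since Java 7, decoding with StandardCharsets avoids the checked UnsupportedEncodingException entirely, so the try/catch in run() becomes unnecessary. A self-contained sketch (the class and method names here are illustrative):

```java
import java.nio.charset.StandardCharsets;

public class DecodeSketch {

    // Decode a message payload without the checked exception that
    // new String(bytes, "UTF-8") forces on the caller.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = "hello kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(payload)); // prints "hello kafka"
    }
}
```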

IV. Verification

[Screenshots from the original post, hosted on bubuko.com, are omitted here.]


Source: http://gaojingsong.iteye.com/blog/2302784
