
当然，Producers如今有各种语言的实现，比如Java、C、Python等。
.png)
package bonree.producer;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/*******************************************************************************
 * SimpleProducer.java Created on 2014-7-8
 * Author: <a href=mailto:wanghouda@126.com>houda</a>
 * @Title: SimpleProducer.java
 * @Package bonree.producer
 * Description:
 * Version: 1.0
 ******************************************************************************/
/**
 * Minimal example that publishes one string message to a Kafka topic through
 * the legacy {@code kafka.javaapi.producer.Producer} API.
 *
 * <p>The broker address and topic name are hard-coded for demonstration.
 */
public class SimpleProducer {
	// Instance-owned (not static) so constructing a second SimpleProducer
	// cannot silently replace a producer that is still in use.
	// The key type is String because only "serializer.class" is configured;
	// per the Kafka producer configuration the key serializer falls back to
	// the message serializer, so keys must be encodable by StringEncoder.
	private final Producer<String, String> producer;
	private final Properties props = new Properties();

	/**
	 * Creates a producer configured with a fixed broker list and the string
	 * message serializer.
	 */
	public SimpleProducer() {
		// Broker list the producer connects to.
		props.put("metadata.broker.list", "192.168.4.31:9092");
		// Serializer class (objects must be serialized before transmission).
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		producer = new Producer<String, String>(new ProducerConfig(props));
	}

	/**
	 * Sends a single message to the topic {@code mytopic} and closes the producer.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		SimpleProducer sp = new SimpleProducer();
		try {
			// Target topic.
			String topic = "mytopic";
			// Payload to send to the topic.
			String messageStr = "send a message to broker ";
			// Build the message object (no key is supplied).
			KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, messageStr);
			// Push the message to the broker.
			sp.producer.send(data);
		} finally {
			// Close the producer even when send() throws.
			sp.producer.close();
		}
	}
}
package bonree.consumer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/*******************************************************************************
 * Created on 2014-7-8 Author: <a
 * href=mailto:wanghouda@126.com>houda</a>
 * @Title: SimpleHLConsumer.java
 * @Package bonree.consumer Description: Version: 1.0
 ******************************************************************************/
/**
 * Minimal example of the legacy Kafka high-level consumer: connects through
 * ZooKeeper, subscribes to one topic and prints every message it receives.
 */
public class SimpleHLConsumer {
	// Explicit charset so decoding does not depend on the platform default.
	// Assumes messages were produced as UTF-8 text (e.g. by
	// kafka.serializer.StringEncoder with its default encoding) - confirm.
	private static final java.nio.charset.Charset MESSAGE_CHARSET =
			java.nio.charset.Charset.forName("UTF-8");

	private final ConsumerConnector consumer;
	private final String topic;

	/**
	 * Creates a consumer connector for the given ZooKeeper ensemble and group.
	 *
	 * @param zookeeper value for the {@code zookeeper.connect} property
	 * @param groupId   value for the {@code group.id} property
	 * @param topic     topic whose messages {@link #testConsumer()} will read
	 */
	public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
		Properties props = new Properties();
		// ZooKeeper connection information.
		props.put("zookeeper.connect", zookeeper);
		// Consumer group id shared by all consumers in the same group.
		props.put("group.id", groupId);
		props.put("zookeeper.session.timeout.ms", "500");
		props.put("zookeeper.sync.time.ms", "250");
		props.put("auto.commit.interval.ms", "1000");
		consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
		this.topic = topic;
	}

	/**
	 * Opens one message stream for the configured topic, prints each message
	 * to standard output, and shuts the connector down when iteration ends or
	 * an exception is thrown.
	 */
	public void testConsumer() {
		try {
			Map<String, Integer> topicCount = new HashMap<String, Integer>();
			// Map value is the number of streams requested for the topic
			// (per the createMessageStreams documentation), here one.
			topicCount.put(topic, Integer.valueOf(1));
			// The returned map is keyed by topic.
			Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
			// Pick the streams belonging to our topic.
			List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
			for (final KafkaStream<byte[], byte[]> stream : streams) {
				ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
				// NOTE(review): no consumer.timeout.ms is configured, so
				// hasNext() presumably blocks until a message arrives - confirm.
				while (consumerIte.hasNext()) {
					System.out.println("Message from Single Topic :: "
							+ new String(consumerIte.next().message(), MESSAGE_CHARSET));
				}
			}
		} finally {
			// Release the connector even if stream creation or iteration fails.
			consumer.shutdown();
		}
	}

	/**
	 * Runs the consumer against a hard-coded ZooKeeper address, group and topic.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		String topic = "mytopic";
		SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.4.32:2181", "testgroup", topic);
		simpleHLConsumer.testConsumer();
	}
}
版权声明：本文为博主原创文章，未经博主同意，不得转载。
原文:http://www.cnblogs.com/hrhguanli/p/4621663.html