Note: all materials for this article can be downloaded from https://github.com/leichenjie/kafka
Kafka is a distributed message queue and a distributed publish-subscribe system, and it can also serve as a distributed storage layer. Concretely, it is a distributed, partitioned (partition), replicated (replica) messaging system coordinated through ZooKeeper, and its standout feature is real-time processing of large volumes of data across many scenarios.
Its main application scenarios are log collection systems and messaging systems.
Kafka is today used mainly as a distributed publish-subscribe messaging system; the sections below briefly walk through Kafka's basic mechanics.
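As a toy, stdlib-only illustration of the publish-subscribe model (this is not Kafka's implementation; the class and method names are made up for illustration), a topic fans each published message out to every one of its subscribers:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class PubSubSketch {
    // topic name -> the callbacks subscribed to that topic
    private final Map<String, List<Consumer<String>>> topics = new HashMap<>();

    public void subscribe(String topic, Consumer<String> subscriber) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(subscriber);
    }

    // publish-subscribe: every subscriber of the topic receives every message
    public void publish(String topic, String message) {
        for (Consumer<String> s : topics.getOrDefault(topic, Collections.emptyList())) {
            s.accept(message);
        }
    }

    public static void main(String[] args) {
        PubSubSketch bus = new PubSubSketch();
        List<String> received = new ArrayList<>();
        bus.subscribe("test", received::add);
        bus.subscribe("test", received::add);
        bus.publish("test", "hello");
        // both subscribers received the single published message
        System.out.println(received.size());
    }
}
```

Kafka adds persistence, partitioning, and replication on top of this basic fan-out idea, which is what the rest of the article explores.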
You can download the latest Kafka release from http://kafka.apache.org/downloads; choose the binary .tgz package.
Kafka is written in Scala and runs on the JVM. It can run on Windows, but it is almost always deployed on Linux servers, so this walkthrough uses Linux.
First make sure the JDK is installed, since Kafka needs a Java runtime. Older setups required a separately installed ZooKeeper, but recent Kafka distributions bundle a ZooKeeper, which we can use directly. For a simple trial, "installation" is just unpacking the archive to any directory; here we extract the Kafka tarball into /usr/local.
The config folder under the unpacked Kafka directory holds the configuration files:
consumer.properties — consumer configuration
producer.properties — producer configuration
server.properties — broker configuration; this file configures the Kafka server itself, and only the most basic settings are covered here
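For a single-broker trial, the entries you are most likely to touch in server.properties are the broker id, the listen address, the log directory, and the ZooKeeper connection string. A minimal sketch (the values below are illustrative, not taken from the original post):

```properties
# Unique id of this broker in the cluster
broker.id=0
# Address and port the broker listens on
listeners=PLAINTEXT://:9092
# Where partition data is stored on disk
log.dirs=/tmp/kafka-logs
# Default partition count for auto-created topics
num.partitions=1
# ZooKeeper connection string
zookeeper.connect=localhost:2181
```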
Start ZooKeeper first. Since we are using the ZooKeeper bundled with Kafka, run from the Kafka directory:
./bin/zookeeper-server-start.sh config/zookeeper.properties
Once ZooKeeper is up, verify it with the four-letter ruok command:
echo ruok|nc localhost 2181
A healthy server replies imok.
Next, start the Kafka broker in a new terminal:
./bin/kafka-server-start.sh config/server.properties
If startup succeeds, create a topic:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Once created, the topic appears in the topic list:
./bin/kafka-topics.sh --list --zookeeper localhost:2181
Next, start a console consumer that reads the topic from the beginning:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Then create a producer. Leave the consumer terminal open, open a new terminal, and run:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This drops you into an interactive prompt; each line you type is sent as one message.
After sending some messages, switch back to the consumer terminal: it will have printed the messages you just sent.
Producer example code
package com.leicj.demo1;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Date;
import java.util.Properties;
public class ProducerDemo {
    public static void main(String[] args) {
        int events = 100;
        // set producer configuration properties
        Properties props = new Properties();
        props.put("metadata.broker.list","192.168.133.129:9092");
        props.put("serializer.class","kafka.serializer.StringEncoder");
        props.put("key.serializer.class","kafka.serializer.StringEncoder");
        // optional; if not set, the default partitioner is used
        props.put("partitioner.class","com.leicj.demo1.PartitionerDemo");
        // request broker acknowledgement; otherwise sends are fire-and-forget and data can be lost
        props.put("request.required.acks","1");
        ProducerConfig config = new ProducerConfig(props);
        // create the producer
        Producer<String,String> producer = new Producer<String, String>(config);
        long start = System.currentTimeMillis();
        for (long i = 0; i < events; i++) {
            long runtime = new Date().getTime();
            String ip = "192.168.133.129";
            String msg = runtime + ",www.test.com," + ip;
            // if the topic does not exist it is auto-created, with replication factor 1
            // and the partition count taken from the broker's num.partitions setting
            KeyedMessage<String,String> data = new KeyedMessage<String, String>("test",ip,msg);
            producer.send(data);
        }
        System.out.println("elapsed ms: " + (System.currentTimeMillis() - start));
        producer.close();
    }
}
Partitioner example code
package com.leicj.demo1;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class PartitionerDemo implements Partitioner {
    public PartitionerDemo(VerifiableProperties props) {}
    @Override
    public int partition(Object obj, int numPartitions) {
        int partition = 0;
        if (obj instanceof String) {
            String key = (String)obj;
            int offset = key.lastIndexOf('.');
            if (offset > 0) {
                partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
            }
        } else {
            partition = obj.toString().length() % numPartitions;
        }
        return partition;
    }
}
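The partitioner above picks a partition from the last octet of an IP-style key. Pulled out as a plain static method (a stdlib-only restatement of the same logic, under a hypothetical class name), its behavior is easy to check:

```java
public class PartitionSketch {
    // Same rule as PartitionerDemo: for an IP-style String key, use the
    // digits after the last '.' modulo the partition count; otherwise
    // fall back to the key's string length modulo the partition count.
    public static int computePartition(Object key, int numPartitions) {
        if (key instanceof String) {
            String s = (String) key;
            int offset = s.lastIndexOf('.');
            if (offset > 0) {
                return Integer.parseInt(s.substring(offset + 1)) % numPartitions;
            }
            return 0;
        }
        return key.toString().length() % numPartitions;
    }

    public static void main(String[] args) {
        // 129 % 4 == 1, so this key lands on partition 1 of a 4-partition topic
        System.out.println(computePartition("192.168.133.129", 4));
    }
}
```

Note that this scheme keeps all messages from one client IP on the same partition, which preserves per-client ordering.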
Consumer example code
package com.leicj.demo1;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConsumerDemo {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;
    public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
        this.topic = a_topic;
    }
    public void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
        }
        if (executor != null) {
            executor.shutdown();
        }
    }
    public void run(int numThreads) {
        Map<String,Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(numThreads));
        Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[],byte[]>> streams = consumerMap.get(topic);
        //now launch all the threads
        executor = Executors.newFixedThreadPool(numThreads);
        //now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerMsgTask(stream,threadNumber));
            threadNumber++;
        }
    }
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect",a_zookeeper);
        props.put("group.id",a_groupId);
        props.put("zookeeper.session.timeout.ms","400");
        props.put("zookeeper.sync.time.ms","200");
        props.put("auto.commit.interval.ms","1000");
        return new ConsumerConfig(props);
    }
    public static void main(String[] arg) {
        String[] args = { "192.168.133.129:2181", "0", "test", "10" };
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);
        ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
        demo.run(threads);
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        demo.shutdown();
    }
}
package com.leicj.demo1;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class ConsumerMsgTask implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;
    public ConsumerMsgTask(KafkaStream m_stream, int m_threadNumber) {
        this.m_stream = m_stream;
        this.m_threadNumber = m_threadNumber;
    }
    @Override
    public void run() {
        ConsumerIterator<byte[],byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + m_threadNumber + ":" + new String(it.next().message()));
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}
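ConsumerDemo.run() submits one ConsumerMsgTask per KafkaStream to a fixed-size pool, so numThreads streams are consumed concurrently. A stdlib-only sketch of that dispatch pattern (class and method names here are illustrative, not from the Kafka API):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DispatchSketch {
    // Submit one task per "stream", mirroring ConsumerDemo.run():
    // each task records the thread number it was created with.
    public static List<Integer> dispatch(int numStreams) {
        List<Integer> handled = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(numStreams);
        for (int threadNumber = 0; threadNumber < numStreams; threadNumber++) {
            final int n = threadNumber;
            executor.submit(() -> handled.add(n));
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        // three streams -> three submitted tasks, one per worker slot
        System.out.println(dispatch(3).size());
    }
}
```

Sizing the pool to the stream count means no stream ever waits behind another, at the cost of one thread per stream.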
When running ProducerDemo.main(), you may hit the error "kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries."
Troubleshooting steps:
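The original post does not list the steps, but one frequent cause of this error (offered here as a likely suspect, not as the author's diagnosis) is that the broker advertises a hostname the producer machine cannot resolve or reach. In 0.8.x-era broker configs this is controlled in server.properties:

```properties
# server.properties: make sure the broker advertises an address
# that the producer machine can actually reach (values are examples)
host.name=192.168.133.129
advertised.host.name=192.168.133.129
```

Other things worth checking: that the broker is actually listening on port 9092, that no firewall blocks it, and that metadata.broker.list in the producer matches the broker's address.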
Original article: https://www.cnblogs.com/leichenjie/p/9183254.html