首页 > 其他 > 详细

KAFKA的安装使用

时间:2017-01-30 12:27:55      阅读:346      评论:0      收藏:0      [点我收藏+]

1.官网

  kafka.apache.org

 

2.上传

  技术分享

 

3.解压到modules

 

4.修改配置文件server.properties

  )修改kafa收集到的日志数据存储文件夹

  技术分享

  )修改zookeeper

  技术分享

 

二:启动

1.启动zookeeper

  技术分享

 

2.启动broker

  技术分享

  如果没有出现日志错误,就使用下面的命令:

  nohup bin/kafka-server-start.sh config/server.properties > logs/server-start.log 2>&1 &

 

3.检验

  技术分享

 

4.创建topic(使用help帮助)

  技术分享

 

 5.创建一个nginxlog的topic

  bin/kafka-topics.sh --create --topic nginxlog --partitions 1 --replication-factor 1 --zookeeper linux-hadoop3.ibeifeng.com:2181

  技术分享

6.查看详情

   bin/kafka-topics.sh --describe --topic nginxlog --zookeeper linux-hadoop3.ibeifeng.com:2181 

  技术分享

 

三:检测topic

1.启动消息生产者,将消息发送到kafka的topic上

  bin/kafka-console-producer.sh --broker-list linux-hadoop3.ibeifeng.com:9092 --topic nginxlog

  技术分享

 

2.启动消息消费者

  bin/kafka-console-consumer.sh --zookeeper linux-hadoop3.ibeifeng.com:2181 --topic nginxlog --from-beginning 

  技术分享

 

四:模拟产生nginx日志

1.在服务器上创建一个根目录

  技术分享

 

2.上传jar包

  技术分享

 

3.执行命令

  java -jar data-generate-1.0-SNAPSHOT-jar-with-dependencies.jar 1000 >>nginx.log

   技术分享

 

4.查看

  tail -f nginx.log

  技术分享

 

 五:使用flume将模拟产生的nginx日志上传到hdfs和kafka上

1.当前状态

  技术分享

2.project_agent.conf

  技术分享

 

3.具体的代码 

  技术分享
 1 #exec source - memory channel - kafka sink/hdfs sink
 2 a1.sources = r1
 3 #a1.sinks = kafka_sink hdfs_sink
 4 a1.sinks = kafka_sink
 5 #a1.channels = c1 c2
 6 a1.channels = c1
 7 
 8 a1.sources.r1.type = exec
 9 a1.sources.r1.command = tail -F  /home/beifeng/workspace/nginx.log
10 
11 
12 # kafka_sink
13 a1.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
14 a1.sinks.kafka_sink.topic = nginxlog
15 a1.sinks.kafka_sink.brokerList =linux-hadoop3.ibeifeng.com:9092
16 # 确认级别: 0  表示不确认
17 #           1 topic有个多个备份 replication  主 leader备份
18 #              只要求leader备份写入成功就任务消息发送到Topic成功
19 #           -1  topic有个多个备份  要求所有备份写入成功,才算消息生产成功
20 a1.sinks.kafka_sink.requiredAcks = 1
21 a1.sinks.kafka_sink.batchSize = 20
22 a1.sinks.kafka_sink.channel = c1
23 
24 # hdfs_sink
25 #a1.sinks.hdfs_sink.type = hdfs
26 #a1.sinks.hdfs_sink.hdfs.path = /flume/events/%Y%m%d
27 #a1.sinks.hdfs_sink.hdfs.filePrefix = nginx_log-
28 #a1.sinks.hdfs_sink.hdfs.fileType = DataStream
29 #a1.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true
30 #a1.sinks.hdfs_sink.hdfs.rollInterval = 0
31 #rollSize值 比hdfs block大小 小一点 (10M)
32 #a1.sinks.hdfs_sink.hdfs.rollSize = 102400
33 #a1.sinks.hdfs_sink.hdfs.rollCount = 0
34 
35 
36 # Use a channel which buffers events in memory
37 a1.channels.c1.type = memory
38 a1.channels.c1.capacity = 1000
39 a1.channels.c1.transactionCapacity = 100
40 
41 #a1.channels.c2.type = memory
42 #a1.channels.c2.capacity = 1000
43 #a1.channels.c2.transactionCapacity = 100
44 
45 # Bind the source and sink to the channel
46 #a1.sources.r1.channels = c1 c2
47 #a1.sinks.kafka_sink.channel = c1
48 #a1.sinks.hdfs_sink.channel = c2
49 #指定source 与 channel之间的关系 复制(默认)  --- 多路复用模式 event header body
50 #a1.sources.r1.selector.type = replicating
51 
52 #########################################################
53 # Bind the source and sink to the channel
54 a1.sources.r1.channels = c1
55 a1.sinks.kafka_sink.channel = c1
View Code

 

 

六:具体情况

1.执行flume

  bin/flume-ng agent -n a1 -c conf/ --conf-file conf/project_agent.conf -Dflume-root-logger=INFO,console

  技术分享

 

2.启动生产者,就是模拟问价jar

  让其不断的产生日志到nginx.log中

  技术分享

3.观看启动了消费者的窗口

  就会发现不断的产生日志被消费

  技术分享

 

 

 七:注意

1.关闭kafka

  技术分享

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

  

  

KAFKA的安装使用

原文:http://www.cnblogs.com/juncaoit/p/6358007.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!