上一节我们在CentOS中安装了kafka并对生产者消费者进行了测试。在实际生产中,flume经常配合kafka同时使用,本文将阐述如何使用flume收集数据到kafka。
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type=exec
a1.sources.s1.command=tail -F /root/kafka.log
a1.sources.s1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
#设置Kafka接收器
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
a1.sinks.k1.brokerList=192.168.100.200:9092
#设置Kafka的Topic
a1.sinks.k1.topic=test
#设置序列化方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.channel=c1
cd
touch kafka.log
ping wh1993.net >> kafka.log
cd /root/apache-flume-1.8.0-bin/bin
./flume-ng agent -c ../conf -f ../conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console
cd /root/kafka_2.12-0.11.0.2/bin
# 启动kafka的消费者【注意,topic的名称为test】
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
观察消费者的启动窗口的打印内容,就是kafka.log中不断增加的内容。
以上,就是整合flume与kafka的过程。
原文:https://www.cnblogs.com/alichengxuyuan/p/12576869.html