Kafka Connect是Kafka0.9新增的模块。可以从名称看出,它可以和外部系统、数据集建立一个数据流的连接,实现数据的输入、输出。有以下特性:
?
?
在大部分Kafka应用场景中,我们常常需要从某一数据源导入数据到Kafka中,或者将Kafka中的数据导出(准确说是被取出)到其他系统中。为了实现这一功能,我们一般需要在上游系统中创建一个Kafka Producer,或在下游系统中创建Kafka Consumer。
?
在Kafka0.9中新增的Kafka Connect模块实现了以上功能,这样我们就可以省掉了一些通用交互代码,而只需要做简单的配置就行了。这和Logstash的思路很相似,Kafka提供了connector的接口,通过实现该接口来,我们可以创建各种各样的Input和Output插件。当然Kafka Connect才刚刚推出,插件远没有Logstash丰富,不过相信随着Kafka0.9的普及,这一功能将变得更加实用。
?
以下介绍Kafka Connect的一个简单的实现,FileInput和FileOutput,启动方式如下:
?
执行connect-standalone.sh会在本地单机启动connector(s)。我们必须声明至少两个参数,第一个是Kafka Connect的配置文件,包含一些基本配置例如kafka server地址,序列化格式等等;其他的配置文件用于配置connector(s),每个配置文件创建一个connector。我们来看一下这两个配置文件:
?
?
该配置会启动一个名叫local-file-source的connector,这个connector会从/tmp/test-src.txt中读取数据,然后写入名叫test的topic中。
?
?
该配置会启动一个名叫local-file-sink的connector,这个connector会从Kafka的test topic中读取数据,然后写入/tmp/test-dest.tx文件中。
?
所以我们看到,运行以上命令实际上是启动了一个程序不断地读取/tmp/test-src.txt,并通过Kafka Producer发送到Kafka 的test这个topic中,同时启动一个Kafka Consumer从这个topic中读取数据并写入/tmp/test-dest.txt,启动之后我们可以测试一下。
?
我们往/tmp/test-src.txt中写入几个字符,然后我们查看/tmp/test-dest.txt,发现“hello world”已经被写入/tmp/test-dest.txt中。
?
多说一句,数据并不是以文本的形式存入Kafka,而是Json。它的格式类似于:
?
所以本文中两个connector实际上还做了数据的封装和Json解析的工作。
?
?
?
原文:http://kane-xie.iteye.com/blog/2269822