storm由Java实现,但通过multilang protocl(多语言协议),能够使用php,python,ruby或者javascript来写spout和bolt。 多语言协议是storm中实现的一种特殊协议,它使用标准输入和标准输出作为与执行spout和bolt任务的进程之间通信的信道。消息以json格式或者普通的文本行通过信道传输。
该协议依赖于作为进程之间通信信道的标准输入和标准输出。一个脚本想要生效需要采取下列步骤
要控制进程(启动或者停止),storm需要知道的正在执行脚本的进程PID。根据多语言协议,当处理过程开始的第一件事情就是storm会发射一个带有配置,拓扑上下文和PID目录的Json对象到标准输入。它看上去跟下面的代码块差不多:
{ "conf": { "topology.message.timeout.secs": 3, }, "context": { "task->component": { "1": "example-spout", "2": "__acker", "3": "example-bolt" }, "taskid": 3 }, "pidDir": "..." }
进程必须在pidDir指示的目录创建一个空文件,文件名为进程ID,然后将PID作为JSON对象写到标准输出。
{"pid" : 1234}
例如,如果接收到/tmp/example\n,脚本的执行PID为123,那么创建空文件/tmp/example/123,并打印行{"pid":123}n和end\n到标准输出。storm采用这种方式来跟踪PID,以及在关闭的时候杀死进程。
$config = json_decode(read_msg(), true); $heartbeatdir = $config[‘pidDir‘]; $pid = getmypid(); fclose(fopen("$heartbeatdir/$pid", "w")); storm_send(["pid"=>$pid]); flush();
这里已经实现了函数read_msg,用于从标准输入读取消息。多语言协议中消息是json格式的单行或多行文本。当storm发送单行内容为end\n消息时说明消息结束。
function read_msg() { $msg = ""; while(true) { $l = fgets(STDIN); $line = substr($l,0,-1); if($line=="end") { break; } $msg = "$msg$line\n"; } return substr($msg, 0, -1); } function storm_send($json) { write_line(json_encode($json)); write_line("end"); } function write_line($line) { echo("$line\n"); }
使用flush()是非常关键的,有可能由于指定字符数量未满足导致缓冲区的内容不会flush。这意味脚本会挂起等待storm输入,由于storm同样在等待脚本输出,所以脚本是不会接收到输入的。所以一定要保证当脚本输出内容后立即flush。 |
这是最关键的一步,因为所有的工作都在这里完成。这一步的实现取决于要实现spout还是bolt。在spout的情形中,应该发送消息。在bolt的情形中,循环,读取消息,处理它们,然后发射,确认或者失败。
发送数字的spout的实现如下。
$from = intval($argv[1]); $to = intval($argv[2]); while(true) { $msg = read_msg(); $cmd = json_decode($msg, true); if ($cmd[‘command‘]==‘next‘) { if ($from<$to) { storm_emit(array("$from")); $task_ids = read_msg(); $from++; } else { sleep(1); } } storm_sync(); }
从命令行参数中获得from和to,开始循环。每当从storm取得消息next,就说明可以发射新tuple。一旦所有的tuple发送完成,没有更多的tuple发送,挂起。
要保证脚本准备好下一个tuple,storm在发射下一个之前会等待文本行sync\n。要读取命令,直接调用read_msg()来解码。在bolts中有些小区别。
while(true) { $msg = read_msg(); $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING); if (!empty($tuple["id"])) { if (isPrime($tuple["tuple"][0])) { storm_emit(array($tuple["tuple"][0])); } storm_ack($tuple["id"]); } }
循环,从标准输入读取消息。一旦接收到消息,json解析。如果是一个tuple,对它做处理,即判断是否是素数。在任何情形中,都要消息确认。
在函数json_decode中使用JSON_BIGINT_AS_STRING来规避Java和PHP之间的格式问题。Java中发送的大数,在PHP中获取时会丢失精度,而这可能会导致问题。要解决这个问题,告诉PHP将大数当作字符串处理,在json消息中打印时不使用双引号。PHP 5.4.0 或更高版本必须使用这个参数。
https://github.com/nathanmarz/storm/wiki/Using-non-JVM-languages-with-Storm
https://github.com/lazyshot/storm-php
https://github.com/nathanmarz/storm-starter
《Getting Started with Storm》
在storm中使用非Java语言,布布扣,bubuko.com
原文:http://blog.csdn.net/loloxiaoz3/article/details/20299661