|
1
2
3
|
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/kafka_2.9.2-0.8.1.1.jar /home/hadoop/flume-1.5.0-bin/libroot@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/scala-library-2.9.2.jar /home/hadoop/flume-1.5.0-bin/libroot@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/metrics-core-2.2.0.jar /home/hadoop/flume-1.5.0-bin/lib |
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
package idoall.cloud.flume.sink;import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.flume.Channel;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.EventDeliveryException;import org.apache.flume.Transaction;import org.apache.flume.conf.Configurable;import org.apache.flume.sink.AbstractSink;public class KafkaSink extends AbstractSink implements Configurable { private static final Log logger = LogFactory.getLog(KafkaSink.class); private String topic; private Producer<String, String> producer; public void configure(Context context) { topic = "idoall_testTopic"; Properties props = new Properties(); props.setProperty("metadata.broker.list", "m1:9092,m2:9092,s1:9092,s2:9092"); props.setProperty("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "idoall.cloud.kafka.Partitionertest"); props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181/kafka"); props.setProperty("num.partitions", "4"); // props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); producer = new Producer<String, String>(config); logger.info("KafkaSink初始化完成."); } public Status process() throws EventDeliveryException { Channel channel = getChannel(); Transaction tx = channel.getTransaction(); try { tx.begin(); Event e = channel.take(); if (e == null) { tx.rollback(); return Status.BACKOFF; } KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, new String(e.getBody())); producer.send(data); logger.info("flume向kafka发送消息:" + new String(e.getBody())); tx.commit(); return Status.READY; } catch (Exception e) { logger.error("Flume KafkaSinkException:", e); tx.rollback(); return Status.BACKOFF; } finally { tx.close(); } }} |
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
root@m1:/home/hadoop/flume-1.5.0-bin# vi /home/hadoop/flume-1.5.0-bin/conf/kafka.confa1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = syslogtcpa1.sources.r1.port = 5140a1.sources.r1.host = localhosta1.sources.r1.channels = c1# Describe the sinka1.sinks.k1.type = idoall.cloud.flume.sink.KafkaSink# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1 |
|
1
|
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /home/hadoop/kafka_2.9.2-0.8.1.1/config/server.properties & |
|
1
2
3
4
5
6
7
8
9
10
11
|
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console#下面只截取部分日志信息14/08/19 11:36:34 INFO sink.KafkaSink: KafkaSink初始化完成.14/08/19 11:36:34 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]14/08/19 11:36:34 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2a9e3ba7 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }14/08/19 11:36:34 INFO node.Application: Starting Channel c114/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started14/08/19 11:36:34 INFO node.Application: Starting Sink k114/08/19 11:36:34 INFO node.Application: Starting Source r114/08/19 11:36:34 INFO source.SyslogTcpSource: Syslog TCP Source starting... |
|
1
|
root@m1:/home/hadoop# echo "hello idoall.org syslog" | nc localhost 5140 |
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
14/08/19 11:36:34 INFO sink.KafkaSink: KafkaSink初始化完成.14/08/19 11:36:34 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]14/08/19 11:36:34 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2a9e3ba7 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }14/08/19 11:36:34 INFO node.Application: Starting Channel c114/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started14/08/19 11:36:34 INFO node.Application: Starting Sink k114/08/19 11:36:34 INFO node.Application: Starting Source r114/08/19 11:36:34 INFO source.SyslogTcpSource: Syslog TCP Source starting...14/08/19 11:38:05 WARN source.SyslogUtils: Event created from Invalid Syslog data.14/08/19 11:38:05 INFO client.ClientUtils$: Fetching metadata from broker id:3,host:s2,port:9092 with correlation id 0 for 1 topic(s) Set(idoall_testTopic)14/08/19 11:38:05 INFO producer.SyncProducer: Connected to s2:9092 for producing14/08/19 11:38:05 INFO producer.SyncProducer: Disconnecting from s2:909214/08/19 11:38:05 INFO producer.SyncProducer: Connected to m1:9092 for producing14/08/19 11:38:05 INFO sink.KafkaSink: flume向kafka发送消息:hello idoall.org syslog |
|
1
2
3
4
5
6
7
8
9
10
11
|
root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic flume-kafka-storm-001 --from-beginningSLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.[2014-08-11 14:22:12,165] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [flume-kafka-storm-001,1] (kafka.server.ReplicaFetcherManager)[2014-08-11 14:22:12,218] WARN [KafkaApi-3] Produce request with correlation id 2 from client on partition [flume-kafka-storm-001,1] failed due to Topic flume-kafka-storm-001 either doesn‘t exist or is in the process of being deleted (kafka.server.KafkaApis)[2014-08-11 14:22:12,223] INFO Completed load of log flume-kafka-storm-001-1 with log end offset 0 (kafka.log.Log)[2014-08-11 14:22:12,250] INFO Created log for partition [flume-kafka-storm-001,1] in /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)[2014-08-11 14:22:12,267] WARN Partition [flume-kafka-storm-001,1] on broker 3: No checkpointed highwatermark is found for partition [flume-kafka-storm-001,1] (kafka.cluster.Partition)[2014-08-11 14:22:12,375] INFO Closing socket connection to /192.168.1.50. (kafka.network.Processor)hello idoall.org syslog |
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
<?xml version="1.0" encoding="utf-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>idoall.cloud</groupId> <artifactId>idoall.cloud</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>idoall.cloud</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <repositories> <repository> <id>github-releases</id> <url>http://oss.sonatype.org/content/repositories/github-releases/</url> </repository> <repository> <id>clojars.org</id> <url>http://clojars.org/repo</url> </repository> </repositories> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>com.sksamuel.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.0-beta1</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.14</version> </dependency> <dependency> <groupId>storm</groupId> <artifactId>storm</artifactId> <version>0.9.0.1</version> <!-- keep storm out of the jar-with-dependencies --> <scope>provided</scope> </dependency> <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2.1</version> </dependency> </dependencies> </project> |
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
package idoall.cloud.storm;import java.text.SimpleDateFormat;import java.util.Date;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import kafka.consumer.ConsumerConfig;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;public class KafkaSpouttest implements IRichSpout { private SpoutOutputCollector collector; private ConsumerConnector consumer; private String topic; public KafkaSpouttest() { } public KafkaSpouttest(String topic) { this.topic = topic; } public void nextTuple() { } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } public void ack(Object msgId) { } public void activate() { <span style="font-size: 9pt; line-height: 25.2000007629395px;"> </span>consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig()); <span style="font-size: 9pt; line-height: 25.2000007629395px;"> </span>Map<String,Integer> topickMap = new HashMap<String, Integer>(); topickMap.put(topic, 1); System.out.println("*********Results********topic:"+topic); Map<String, List<KafkaStream<byte[],byte[]>>> streamMap=consumer.createMessageStreams(topickMap); KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0); ConsumerIterator<byte[],byte[]> it =stream.iterator(); while(it.hasNext()){ String value =new String(it.next().message()); SimpleDateFormat formatter = new SimpleDateFormat ("yyyy年MM月dd日 HH:mm:ss SSS"); Date curDate = new Date(System.currentTimeMillis());//获取当前时间 String str = formatter.format(curDate); System.out.println("storm接收到来自kafka的消息------->" + value); collector.emit(new Values(value,1,str), value); } } private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); // 设置zookeeper的链接地址 props.put("zookeeper.connect","m1:2181,m2:2181,s1:2181,s2:2181"); // 设置group id props.put("group.id", "1"); // kafka的group 消费记录是保存在zookeeper上的, 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新 props.put("auto.commit.interval.ms", "1000"); props.put("zookeeper.session.timeout.ms","10000"); return new ConsumerConfig(props); } public void close() { } public void deactivate() { } public void fail(Object msgId) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","id","time")); } public Map<String, Object> getComponentConfiguration() { System.out.println("getComponentConfiguration被调用"); topic="idoall_testTopic"; return null; }} |
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
package idoall.cloud.storm;import java.util.HashMap;import java.util.Map;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.TopologyBuilder;import backtype.storm.topology.base.BaseBasicBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;public class KafkaTopologytest { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new KafkaSpouttest(""), 1); builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("spout"); builder.setBolt("bolt2", new Bolt2(), 2).fieldsGrouping("bolt1",new Fields("word")); Map conf = new HashMap(); conf.put(Config.TOPOLOGY_WORKERS, 1); conf.put(Config.TOPOLOGY_DEBUG, true); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("my-flume-kafka-storm-topology-integration", conf, builder.createTopology()); Utils.sleep(1000*60*5); // local cluster test ... cluster.shutdown(); } public static class Bolt1 extends BaseBasicBolt { public void execute(Tuple input, BasicOutputCollector collector) { try { String msg = input.getString(0); int id = input.getInteger(1); String time = input.getString(2); msg = msg+"bolt1"; System.out.println("对消息加工第1次-------[arg0]:"+ msg +"---[arg1]:"+id+"---[arg2]:"+time+"------->"+msg); if (msg != null) { collector.emit(new Values(msg)); } } catch (Exception e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } public static class Bolt2 extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple tuple, BasicOutputCollector collector) { String msg = tuple.getString(0); msg = msg + "bolt2"; System.out.println("对消息加工第2次---------->"+msg); collector.emit(new Values(msg,1)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }} |
|
1
2
3
4
5
|
root@m2:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-producer.sh --broker-st m1:9092 --sync --topic idoall_testTopicSLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.hello welcome idoall.org |
|
1
2
3
4
5
|
root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic idoall_testTopic --from-beginningSLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.hello welcome idoall.org |
|
1
2
3
4
5
6
7
8
9
10
11
|
#信息太多,我只截取重要部分:*********Results********topic:idoall_testTopicstorm接收到来自kafka的消息------->hello welcome idoall.org5268 [Thread-24-spout] INFO backtype.storm.daemon.task - Emitting: spout default [hello welcome idoall.org, 1, 2014年08月19日 11:21:15 051]对消息加工第1次-------[arg0]:hello welcome idoall.orgbolt1---[arg1]:1---[arg2]:2014年08月19日 11:21:15 051------->hello welcome idoall.orgbolt15269 [Thread-18-bolt1] INFO backtype.storm.daemon.executor - Processing received message source: spout:6, stream: default, id: {-2000523200413433507=6673316475127546409}, [hello welcome idoall.org, 1, 2014年08月19日 11:21:15 051]5269 [Thread-18-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 default [hello welcome idoall.orgbolt1]5269 [Thread-18-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 __ack_ack [-2000523200413433507 4983764025617316501]5269 [Thread-20-bolt2] INFO backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: default, id: {-2000523200413433507=1852530874180384956}, [hello welcome idoall.orgbolt1]对消息加工第2次---------->hello welcome idoall.orgbolt1bolt25270 [Thread-20-bolt2] INFO backtype.storm.daemon.task - Emitting: bolt2 default [hello welcome idoall.orgbolt1bolt2, 1] |
|
1
2
3
4
5
|
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/kafka_2.9.2-0.8.1.1.jar /home/hadoop/storm-0.9.2-incubating/libroot@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/scala-library-2.9.2.jar /home/hadoop/storm-0.9.2-incubating/libroot@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/metrics-core-2.2.0.jar /home/hadoop/storm-0.9.2-incubating/libroot@m1:/home/hadoop# cp /home/hadoop/zookeeper-3.4.5/dist-maven/zookeeper-3.4.5.jar /home/hadoop/storm-0.9.2-incubating/libroot@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/zkclient-0.3.jar /home/hadoop/storm-0.9.2-incubating/lib |
|
1
|
root@m1:/home/hadoop# /home/hadoop/storm-0.9.2-incubating/bin/storm nimbus & |
|
1
|
root@s1:/home/hadoop# /home/hadoop/storm-0.9.2-incubating/bin/storm supervisor & |
|
1
|
root@m1:/home/hadoop# /home/hadoop/storm-0.9.2-incubating/bin/storm ui & |
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
root@m1:/home/hadoop/storm-0.9.2-incubating# ll总用量 25768drwxr-xr-x 11 root root 4096 Aug 19 11:53 ./drwxr-xr-x 46 hadoop hadoop 4096 Aug 17 15:06 ../drwxr-xr-x 2 root root 4096 Aug 1 14:38 bin/-rw-r--r-- 1 502 staff 34239 Jun 13 08:46 CHANGELOG.mddrwxr-xr-x 2 root root 4096 Aug 2 12:31 conf/-rw-r--r-- 1 502 staff 538 Mar 13 11:17 DISCLAIMERdrwxr-xr-x 3 502 staff 4096 May 6 03:13 examples/drwxr-xr-x 3 root root 4096 Aug 1 14:38 external/-rw-r--r-- 1 root root 26252342 Aug 19 11:36 idoall.cloud.jardrwxr-xr-x 3 root root 4096 Aug 2 12:51 ldir/drwxr-xr-x 2 root root 4096 Aug 19 11:53 lib/-rw-r--r-- 1 502 staff 22822 Jun 12 04:07 LICENSEdrwxr-xr-x 2 root root 4096 Aug 1 14:38 logback/drwxr-xr-x 2 root root 4096 Aug 1 15:07 logs/-rw-r--r-- 1 502 staff 981 Jun 11 01:10 NOTICEdrwxr-xr-x 5 root root 4096 Aug 1 14:38 public/-rw-r--r-- 1 502 staff 7445 Jun 10 02:24 README.markdown-rw-r--r-- 1 502 staff 17 Jun 17 00:22 RELEASE-rw-r--r-- 1 502 staff 3581 May 30 00:20 SECURITY.mdroot@m1:/home/hadoop/storm-0.9.2-incubating# /home/hadoop/storm-0.9.2-incubating/bin/storm jar idoall.cloud.jar idoall.cloud.storm.KafkaTopologytest |
|
1
2
|
root@m1:/home/hadoop# echo "flume->kafka->storm message" | nc localhost 5140 root@m1:/home/hadoop# |
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
#内容太多,只截取重要部分storm接收到来自kafka的消息------->flume->kafka->storm message174218 [Thread-16-spout] INFO backtype.storm.daemon.task - Emitting: spout default [flume->kafka->storm message, 1, 2014年08月19日 12:06:39 360]174220 [Thread-10-bolt1] INFO backtype.storm.daemon.executor - Processing received message source: spout:6, stream: default, id: {-2345821945306343027=-7738131487327750388}, [flume->kafka->storm message, 1, 2014年08月19日 12:06:39 360]对消息加工第1次-------[arg0]:flume->kafka->storm messagebolt1---[arg1]:1---[arg2]:2014年08月19日 12:06:39 360------->flume->kafka->storm messagebolt1174221 [Thread-10-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 default [flume->kafka->storm messagebolt1]174221 [Thread-10-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 __ack_ack [-2345821945306343027 -2191137958679040397]174222 [Thread-20-__acker] INFO backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: __ack_ack, id: {}, [-2345821945306343027 -2191137958679040397]174222 [Thread-12-bolt2] INFO backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: default, id: {-2345821945306343027=8433871885621516671}, [flume->kafka->storm messagebolt1]对消息加工第2次---------->flume->kafka->storm messagebolt1bolt2174223 [Thread-12-bolt2] INFO backtype.storm.daemon.task - Emitting: bolt2 default [flume->kafka->storm messagebolt1bolt2, 1]174223 [Thread-12-bolt2] INFO backtype.storm.daemon.task - Emitting: bolt2 __ack_ack [-2345821945306343027 8433871885621516671]174224 [Thread-20-__acker] INFO backtype.storm.daemon.executor - Processing received message source: bolt2:4, stream: __ack_ack, id: {}, [-2345821945306343027 8433871885621516671]174228 [Thread-16-spout] INFO backtype.storm.daemon.task - Emitting: spout __ack_init [-2345821945306343027 -7738131487327750388 6]174228 [Thread-20-__acker] INFO backtype.storm.daemon.executor - Processing received message source: spout:6, stream: __ack_init, id: {}, [-2345821945306343027 -7738131487327750388 6]174228 [Thread-20-__acker] INFO backtype.storm.daemon.task - Emitting direct: 6; __acker __ack_ack [-2345821945306343027] |
Flume+Kafka+Strom基于分布式环境的结合使用,布布扣,bubuko.com
原文:http://www.cnblogs.com/lion.net/p/3922960.html