功能：将多个数据源的数据汇集到一个处理单元，进行集中分类处理。
入口类TestMain
| 1 | public class TestMain { | 
| 2 | |
| 3 | public static void main(String[] args) { | 
| 4 | TopologyBuilder builder = new TopologyBuilder(); | 
| 5 | builder.setSpout("random1", new RandomWordSpout1(), 1); | 
| 6 | builder.setSpout("random2", new RandomWordSpout2(), 1); | 
| 7 | builder.setSpout("random3", new RandomWordSpout3(), 1); | 
| 8 | builder.setBolt("", new TransferBolt(), 1) | 
| 9 | .localOrShuffleGrouping("random1", "stream1") | 
| 10 | .localOrShuffleGrouping("random2", "stream2") | 
| 11 | .localOrShuffleGrouping("random3", "stream3"); | 
| 12 | |
| 13 | Config conf = new Config(); | 
| 14 | conf.setDebug(false); | 
| 15 | conf.setNumWorkers(1); | 
| 16 | LocalCluster cluster = new LocalCluster(); | 
| 17 | cluster.submitTopology("test-1", conf, builder.createTopology()); | 
| 18 | } | 
| 19 | } | 
数据源类RandomWordSpout1：在流stream1上输出一个字段name
| 1 | public class RandomWordSpout1 extends BaseRichSpout { | 
| 2 | |
| 3 | private static final long serialVersionUID = -4287209449750623371L; | 
| 4 | |
| 5 | private SpoutOutputCollector collector; | 
| 6 | |
| 7 | @Override | 
| 8 | public void open(@SuppressWarnings("rawtypes") Map conf, | 
| 9 | TopologyContext context, SpoutOutputCollector collector) { | 
| 10 | this.collector = collector; | 
| 11 | } | 
| 12 | |
| 13 | @Override | 
| 14 | public void declareOutputFields(OutputFieldsDeclarer declarer) { | 
| 15 | declarer.declareStream("stream1", new Fields("name")); | 
| 16 | } | 
| 17 | |
| 18 | @Override | 
| 19 | public void nextTuple() { | 
| 20 | collector.emit("stream1", new Values("RandomWordSpout1")); | 
| 21 | } | 
| 22 | |
| 23 | } | 
数据源类RandomWordSpout2：在流stream2上输出一个字段content
| 1 | public class RandomWordSpout2 extends BaseRichSpout { | 
| 2 | |
| 3 | private SpoutOutputCollector collector; | 
| 4 | |
| 5 | @Override | 
| 6 | public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { | 
| 7 | this.collector = collector; | 
| 8 | } | 
| 9 | |
| 10 | @Override | 
| 11 | public void declareOutputFields(OutputFieldsDeclarer declarer) { | 
| 12 | declarer.declareStream("stream2", new Fields("content")); | 
| 13 | } | 
| 14 | |
| 15 | @Override | 
| 16 | public void nextTuple() { | 
| 17 | collector.emit("stream2",new Values("RandomWordSpout2")); | 
| 18 | } | 
| 19 | |
| 20 | } | 
数据源类RandomWordSpout3：在流stream3上输出key、value两个字段
| 1 | public class RandomWordSpout3 extends BaseRichSpout { | 
| 2 | |
| 3 | private SpoutOutputCollector collector; | 
| 4 | |
| 5 | @Override | 
| 6 | public void open(@SuppressWarnings("rawtypes") Map conf, | 
| 7 | TopologyContext context, SpoutOutputCollector collector) { | 
| 8 | this.collector = collector; | 
| 9 | } | 
| 10 | |
| 11 | @Override | 
| 12 | public void declareOutputFields(OutputFieldsDeclarer declarer) { | 
| 13 | declarer.declareStream("stream3", new Fields("key", "value")); | 
| 14 | } | 
| 15 | |
| 16 | @Override | 
| 17 | public void nextTuple() { | 
| 18 | collector.emit("stream3", new Values("chenx","happyday")); | 
| 19 | |
| 20 | } | 
| 21 | |
| 22 | } | 
聚流处理类TransferBolt,输出从各流获取到的数据
| 1 | public class TransferBolt extends BaseBasicBolt { | 
| 2 | |
| 3 | private static final long serialVersionUID = 4223708336037089125L; | 
| 4 | private Map<String, Fields> _fieldMap = null; | 
| 5 | |
| 6 | @Override | 
| 7 | public void prepare(@SuppressWarnings("rawtypes") Map stormConf, | 
| 8 | TopologyContext context) { | 
| 9 | _fieldMap = new HashMap<String, Fields>(); | 
| 10 | Set<GlobalStreamId> sourceSet = context.getThisSources().keySet(); | 
| 11 | for (GlobalStreamId source : sourceSet) { | 
| 12 | Fields fields = context.getComponentOutputFields( | 
| 13 | source.get_componentId(), source.get_streamId()); | 
| 14 | _fieldMap.put(source.get_componentId() + source.get_streamId(), | 
| 15 | fields); | 
| 16 | } | 
| 17 | |
| 18 | } | 
| 19 | |
| 20 | @Override | 
| 21 | public void declareOutputFields(OutputFieldsDeclarer declarer) { | 
| 22 | } | 
| 23 | |
| 24 | @Override | 
| 25 | public void execute(Tuple input, BasicOutputCollector collector) { | 
| 26 | String key = input.getSourceComponent() + input.getSourceStreamId(); | 
| 27 | Fields fields = _fieldMap.get(key); | 
| 28 | int size = fields.size(); | 
| 29 | String content = ""; | 
| 30 | for (int i = 0; i < size; i++) { | 
| 31 | content += input.getStringByField(fields.get(i)); | 
| 32 | } | 
| 33 | System.out.println("SourceComponent:" + input.getSourceComponent() | 
| 34 | + ",SourceStreamId:" + input.getSourceStreamId() + ",content:" | 
| 35 | + content); | 
| 36 | } | 
| 37 | |
| 38 | } | 
原文:http://www.cnblogs.com/jianyuan/p/4830839.html