
Storm Study Notes

Posted: 2014-03-18 04:58:12
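1. How can a spout read multiple streams in parallel?
Approach: every spout and bolt component has access to its TopologyContext. You can use it to divide the streams among the instances (tasks) of a spout.
Example: get the spout's parallelism (the number of tasks of this spout component in the Storm cluster) and the task index of the current spout instance. Then assign each of the streams in the input array tracks[] to exactly one spout instance. This lets a single spout component read multiple streams in parallel.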
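// inside open()
int spoutsSize = context.getComponentTasks(context.getThisComponentId()).size(); // number of tasks of this spout
int myIdx = context.getThisTaskIndex();                                         // this task's index, 0 .. spoutsSize-1
String[] tracks = ((String) conf.get("track")).split(",");                      // all streams to be read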
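You can check the round-robin rule outside Storm. The following is a minimal sketch and is not part of the original code. TrackPartitioner and assign() are hypothetical names. Plain parameters stand in for the spoutsSize and myIdx values that open() reads from TopologyContext.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: the same modulo split as in open(), without Storm. */
public class TrackPartitioner {

    /**
     * Returns the tracks owned by task myIdx when spoutsSize tasks share them:
     * track i belongs to the task whose index equals i % spoutsSize.
     */
    public static List<String> assign(String[] tracks, int spoutsSize, int myIdx) {
        List<String> mine = new ArrayList<String>();
        for (int i = 0; i < tracks.length; i++) {
            if (i % spoutsSize == myIdx) {
                mine.add(tracks[i]);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        String[] tracks = "storm,hadoop,kafka,spark,redis".split(",");
        int spoutsSize = 2; // e.g. the spout was declared with parallelism 2
        for (int myIdx = 0; myIdx < spoutsSize; myIdx++) {
            // Joined with commas, this is the value a task would put in the track parameter
            System.out.println("task " + myIdx + " -> "
                    + String.join(",", assign(tracks, spoutsSize, myIdx)));
        }
    }
}
```

Running main() prints `task 0 -> storm,kafka,redis` and `task 1 -> hadoop,spark`. Every track goes to exactly one task. A task gets nothing when myIdx >= tracks.length, so you need at least as many tracks as spout tasks.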

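A complete spout follows. The configuration parameter track lists several streams, separated by commas. In open(), each instance uses the modulo operator (%) to pick its share of the tracks. The spout then reads the data for those tracks and emits it from nextTuple(). Every instance of the spout has a different myIdx, so each one gets its own subset of the tracks. That is how a single spout component reads multiple streams.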

//ApiStreamingSpout.java
package twitter.streaming;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class ApiStreamingSpout extends BaseRichSpout {

    static String STREAMING_API_URL = "https://stream.twitter.com/1/statuses/filter.json?track=";
    static Logger LOG = Logger.getLogger(ApiStreamingSpout.class);

    private String track; // comma-separated tracks owned by this spout instance
    private SpoutOutputCollector collector;
    // Created in open() on the worker; transient because they are not serializable
    private transient DefaultHttpClient client;
    private transient JSONParser jsonParser; // JSONParser is not thread-safe: one per instance
    private transient Thread readerThread;

    // Raw lines pushed by the reader thread and drained by nextTuple()
    private final LinkedBlockingQueue<String> tweets = new LinkedBlockingQueue<String>();

    @Override
    public void nextTuple() {
        // Must not block: Storm calls nextTuple(), ack() and fail() on the same thread
        String in = tweets.poll();
        if (in == null) {
            Utils.sleep(1); // nothing buffered yet; back off briefly instead of spinning
            return;
        }
        try {
            // Parse and emit
            Object json = jsonParser.parse(in);
            collector.emit(new Values(track, json));
        } catch (ParseException e) {
            LOG.error("Error parsing message from twitter", e);
        }
    }

    /** Runs on the reader thread: connect, queue every line, reconnect after 10 s. */
    private void readStream() {
        while (!Thread.currentThread().isInterrupted()) {
            HttpGet get = new HttpGet(STREAMING_API_URL + track); // track is unique per spout instance
            try {
                HttpResponse response = client.execute(get);
                try {
                    if (response.getStatusLine().getStatusCode() == 200) {
                        BufferedReader reader = new BufferedReader(new InputStreamReader(
                                response.getEntity().getContent(), "UTF-8"));
                        String line;
                        // Read line by line, skipping blank lines
                        while ((line = reader.readLine()) != null) {
                            if (line.trim().length() > 0) {
                                tweets.offer(line);
                            }
                        }
                    } else {
                        LOG.error("Twitter API returned " + response.getStatusLine());
                    }
                } finally {
                    EntityUtils.consume(response.getEntity()); // release the connection for reuse
                }
            } catch (IOException e) {
                LOG.error("Error in communication with twitter api ["
                        + get.getURI().toString() + "]", e);
            }
            try {
                Thread.sleep(10000); // wait before reconnecting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag; this ends the loop
            }
        }
    }

    /**
     * spoutsSize and myIdx let one spout component read multiple streams (tracks).
     */
    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        int spoutsSize = context
                .getComponentTasks(context.getThisComponentId()).size();
        int myIdx = context.getThisTaskIndex();
        String[] tracks = ((String) conf.get("track")).split(",");
        StringBuilder tracksBuffer = new StringBuilder();
        for (int i = 0; i < tracks.length; i++) {
            if (i % spoutsSize == myIdx) { // track i belongs to task (i mod spoutsSize)
                tracksBuffer.append(",");
                tracksBuffer.append(tracks[i]);
            }
        }

        if (tracksBuffer.length() == 0)
            throw new RuntimeException("No track found for spout"
                    + " [spoutsSize:" + spoutsSize + ", tracks:"
                    + tracks.length + "] the number"
                    + " of tracks must be at least the spout parallelism");

        this.track = tracksBuffer.substring(1); // drop the leading comma
        this.collector = collector;
        this.jsonParser = new JSONParser();

        UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(
                (String) conf.get("user"), (String) conf.get("password"));
        BasicCredentialsProvider credentialProvider = new BasicCredentialsProvider();
        credentialProvider.setCredentials(AuthScope.ANY, credentials);
        client = new DefaultHttpClient();
        client.setCredentialsProvider(credentialProvider);

        // Read the HTTP stream on a background thread so that nextTuple() never blocks
        readerThread = new Thread(new Runnable() {
            public void run() {
                readStream();
            }
        }, "twitter-stream-" + myIdx);
        readerThread.setDaemon(true);
        readerThread.start();
    }

    @Override
    public void close() {
        if (readerThread != null) readerThread.interrupt();
        if (client != null) client.getConnectionManager().shutdown(); // unblocks a pending read
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("criteria", "tweet"));
    }
}

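This technique lets you distribute the collectors across the data sources. The same technique works in other scenarios, for example collecting log files from web servers. PS: I have not tried this.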
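The sketch below shows how the idea might apply to web-server logs. It is a hedged illustration and does not come from the original post. LogShardReader, ownedFiles() and readOwnedLines() are hypothetical names. In practice the file list would come from the topology config, and taskCount/taskIndex would come from TopologyContext as in open(). Each task receives the list independently, so the sketch sorts it first to make every task compute the same assignment.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: split log files among spout tasks with the modulo rule. */
public class LogShardReader {

    /** Files owned by this task; the list is sorted so every task sees the same order. */
    public static List<String> ownedFiles(String[] files, int taskCount, int taskIndex) {
        String[] sorted = files.clone();
        Arrays.sort(sorted);
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < sorted.length; i++) {
            if (i % taskCount == taskIndex) {
                mine.add(sorted[i]);
            }
        }
        if (mine.isEmpty()) {
            throw new IllegalStateException("task " + taskIndex + " owns no file: "
                    + files.length + " files for " + taskCount + " tasks");
        }
        return mine;
    }

    /** Reads every line of the owned files; a spout would emit each line as a tuple. */
    public static List<String> readOwnedLines(String[] files, int taskCount, int taskIndex)
            throws IOException {
        List<String> lines = new ArrayList<>();
        for (String file : ownedFiles(files, taskCount, taskIndex)) {
            try (BufferedReader reader =
                         Files.newBufferedReader(Paths.get(file), StandardCharsets.UTF_8)) {
                String line;
                while ((line = reader.readLine()) != null) {
                    lines.add(line);
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Create three small temporary "access logs" to demonstrate the split
        Path dir = Files.createTempDirectory("logs");
        String[] files = new String[3];
        for (int i = 0; i < 3; i++) {
            Path p = dir.resolve("access-" + i + ".log");
            Files.write(p, Arrays.asList("GET /page" + i), StandardCharsets.UTF_8);
            files[i] = p.toString();
        }
        for (int task = 0; task < 2; task++) {
            System.out.println("task " + task + " -> " + readOwnedLines(files, 2, task));
        }
    }
}
```

Log files keep growing, so a real spout would read them incrementally from nextTuple() or from a background thread. This sketch only reads what already exists.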

 

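2. A bolt can emit tuples to multiple streams with emit(streamId, tuple), and each stream is identified by its string streamId. In the TopologyBuilder, you then decide which stream each downstream component subscribes to.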

  I have not tried this yet. Two questions: how do you declare the streams? Can a spout do this too?

Answers: 1. Declare several streams in declareOutputFields(). declare() defines the default stream, and declareStream() adds a named one:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("line"));                  // default stream
    declarer.declareStream("second", new Fields("line2")); // additional stream named "second"
}

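        2. Judging from the Bolt and Spout implementations, both can do it: SpoutOutputCollector also has emit(streamId, tuple). A downstream bolt subscribes to one specific stream by passing its id to the grouping, for example shuffleGrouping(componentId, streamId).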
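The declare/emit/subscribe flow can be illustrated without a cluster. This is a toy model and NOT Storm's API. ToyStreams and its methods are hypothetical and only mimic the behaviour described above. Each stream is declared with its own fields, consumers subscribe to a stream by id, and emit(streamId, ...) reaches only that stream's subscribers. In Storm itself, the stream id passed to a grouping plays the subscriber role. A plain declare() uses the stream id "default".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model of named streams (hypothetical, not Storm's API). */
public class ToyStreams {
    public static final String DEFAULT_STREAM = "default"; // Storm's default stream id is also "default"

    private final Map<String, List<String>> fields = new HashMap<>();
    private final Map<String, List<Consumer<List<Object>>>> subscribers = new HashMap<>();

    /** Plays the role of declarer.declareStream(id, new Fields(...)). */
    public void declareStream(String streamId, String... fieldNames) {
        fields.put(streamId, Arrays.asList(fieldNames));
        subscribers.put(streamId, new ArrayList<>());
    }

    /** Plays the role of a grouping on (componentId, streamId). */
    public void subscribe(String streamId, Consumer<List<Object>> consumer) {
        requireDeclared(streamId);
        subscribers.get(streamId).add(consumer);
    }

    /** Plays the role of collector.emit(streamId, tuple): only this stream's subscribers see it. */
    public void emit(String streamId, Object... values) {
        requireDeclared(streamId);
        if (values.length != fields.get(streamId).size()) {
            throw new IllegalArgumentException("stream " + streamId + " expects fields "
                    + fields.get(streamId));
        }
        List<Object> tuple = Arrays.asList(values);
        for (Consumer<List<Object>> c : subscribers.get(streamId)) {
            c.accept(tuple);
        }
    }

    private void requireDeclared(String streamId) {
        if (!fields.containsKey(streamId)) {
            throw new IllegalArgumentException("undeclared stream: " + streamId);
        }
    }

    public static void main(String[] args) {
        ToyStreams s = new ToyStreams();
        s.declareStream(DEFAULT_STREAM, "line"); // like declarer.declare(new Fields("line"))
        s.declareStream("second", "line2");      // like declarer.declareStream("second", new Fields("line2"))

        List<Object> seenByA = new ArrayList<>();
        List<Object> seenByB = new ArrayList<>();
        s.subscribe(DEFAULT_STREAM, t -> seenByA.add(t.get(0)));
        s.subscribe("second", t -> seenByB.add(t.get(0)));

        s.emit(DEFAULT_STREAM, "hello");
        s.emit("second", "world");
        System.out.println("A saw " + seenByA + ", B saw " + seenByB);
    }
}
```

Running main() prints `A saw [hello], B saw [world]`. The consumer of the default stream never sees the tuple emitted on "second".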

 3. Does BaseRichSpout call ack automatically?

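    BaseBasicBolt is used to do the acking automatically. A bolt that extends BaseBasicBolt gets its emitted tuples anchored to the input, and the input is acked for it after execute() returns. A spout never acks anything itself: its ack(msgId) is a callback that Storm invokes once a tuple emitted with a message ID has been fully processed, and BaseRichSpout's default ack() does nothing. I'll try BaseBasicBolt when I have time. PS: our current project acks manually, as follows: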

 

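collector.emit(...);   // emit the output tuple(s)
collector.ack(input);  // Tuple has no ack() method; acking goes through the OutputCollector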
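You can model the difference between automatic and manual acking in a few lines. This is a toy sketch with hypothetical names (ToyAutoAck, ToyBasicBolt, ToyAcker, ToyFailedException), not Storm code. As far as I know, Storm wraps a BaseBasicBolt in an internal executor (BasicBoltExecutor). That executor calls execute(), acks the input if execute() returns normally, and fails it if execute() throws a FailedException. The sketch mimics that behaviour. A BaseRichBolt has no such wrapper, so its execute() must call collector.ack(input) itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of BaseBasicBolt-style auto-acking (hypothetical names, not Storm's API). */
public class ToyAutoAck {

    /** Stands in for Storm's acking machinery: records what happened to each input. */
    public static class ToyAcker {
        public final List<String> acked = new ArrayList<>();
        public final List<String> failed = new ArrayList<>();
    }

    /** Plays the role of Storm's FailedException. */
    public static class ToyFailedException extends RuntimeException {
        public ToyFailedException(String message) {
            super(message);
        }
    }

    /** User code only implements execute(); run() is the framework side. */
    public abstract static class ToyBasicBolt {
        public abstract void execute(String input);

        /** Ack after a normal return, fail on ToyFailedException; user code never acks. */
        public final void run(String input, ToyAcker acker) {
            try {
                execute(input);
                acker.acked.add(input);
            } catch (ToyFailedException e) {
                acker.failed.add(input);
            }
        }
    }

    public static void main(String[] args) {
        ToyBasicBolt bolt = new ToyBasicBolt() {
            @Override
            public void execute(String input) {
                if (input.contains("error")) {
                    throw new ToyFailedException("cannot process: " + input);
                }
                // process and emit here; note there is no ack call
            }
        };
        ToyAcker acker = new ToyAcker();
        bolt.run("hello", acker);
        bolt.run("error", acker);
        System.out.println("acked=" + acker.acked + " failed=" + acker.failed);
    }
}
```

Running main() prints `acked=[hello] failed=[error]`.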

 

  

 


Original post: http://www.cnblogs.com/byrhuangqiang/p/3605091.html
