// Inside open(): collect the values used to assign tracks to this spout task.
String componentId = context.getThisComponentId();
// Number of tasks registered for this component.
int spoutsSize = context.getComponentTasks(componentId).size();
// Index of the current task, as reported by the topology context.
int myIdx = context.getThisTaskIndex();
// The "track" configuration entry is split on commas into individual tracks.
String trackSetting = (String) conf.get("track");
String[] tracks = trackSetting.split(",");
一个完整的Spout代码:输入参数track是一个用逗号分隔的列表,代表多个流。open()方法用取模(%)把这些track分配给各个spout实例,nextTuple()方法读取本实例所分到的track的数据并发送。由于spout的多个实例的myIdx不同,每个实例会得到互不重叠的一部分track,这样同一个spout的多个实例合起来就可以读取多个流。
1 //ApiStreamingSpout.java 2 package twitter.streaming; 3 4 import java.io.BufferedReader; 5 import java.io.IOException; 6 import java.io.InputStream; 7 import java.io.InputStreamReader; 8 import java.util.Map; 9 import java.util.concurrent.LinkedBlockingQueue; 10 11 import org.apache.http.HttpResponse; 12 import org.apache.http.StatusLine; 13 import org.apache.http.auth.AuthScope; 14 import org.apache.http.auth.UsernamePasswordCredentials; 15 import org.apache.http.client.methods.HttpGet; 16 import org.apache.http.impl.client.BasicCredentialsProvider; 17 import org.apache.http.impl.client.DefaultHttpClient; 18 import org.apache.log4j.Logger; 19 import org.json.simple.parser.JSONParser; 20 import org.json.simple.parser.ParseException; 21 22 import backtype.storm.spout.SpoutOutputCollector; 23 import backtype.storm.task.TopologyContext; 24 import backtype.storm.topology.OutputFieldsDeclarer; 25 import backtype.storm.topology.base.BaseRichSpout; 26 import backtype.storm.tuple.Fields; 27 import backtype.storm.tuple.Values; 28 29 public class ApiStreamingSpout extends BaseRichSpout { 30 31 static String STREAMING_API_URL = "https://stream.twitter.com/1/statuses/filter.json?track="; 32 private String track; 33 private String user; 34 private String password; 35 private DefaultHttpClient client; 36 private SpoutOutputCollector collector; 37 private UsernamePasswordCredentials credentials; 38 private BasicCredentialsProvider credentialProvider; 39 40 LinkedBlockingQueue<String> tweets = new LinkedBlockingQueue<String>(); 41 42 static Logger LOG = Logger.getLogger(ApiStreamingSpout.class); 43 static JSONParser jsonParser = new JSONParser(); 44 45 @Override 46 public void nextTuple() { 47 /* 48 * Create the client call 49 */ 50 client = new DefaultHttpClient(); 51 client.setCredentialsProvider(credentialProvider); 52 HttpGet get = new HttpGet(STREAMING_API_URL + track); // 每个spout实例track是唯一的。 53 HttpResponse response; 54 try { 55 // Execute 56 response = 
client.execute(get); 57 StatusLine status = response.getStatusLine(); 58 if (status.getStatusCode() == 200) { 59 InputStream inputStream = response.getEntity().getContent(); 60 BufferedReader reader = new BufferedReader( 61 new InputStreamReader(inputStream)); 62 String in; 63 // Read line by line 64 while ((in = reader.readLine()) != null) { 65 try { 66 // Parse and emit 67 Object json = jsonParser.parse(in); 68 collector.emit(new Values(track, json)); 69 } catch (ParseException e) { 70 LOG.error("Error parsing message from twitter", e); 71 } 72 } 73 } 74 } catch (IOException e) { 75 LOG.error("Error in communication with twitter api [" 76 + get.getURI().toString() + "]"); 77 try { 78 Thread.sleep(10000); 79 } catch (InterruptedException e1) { 80 } 81 } 82 } 83 84 /** 85 * spoutsSize、myIdx实现了一个spout读取多个流tracks。 86 */ 87 @Override 88 public void open(Map conf, TopologyContext context, 89 SpoutOutputCollector collector) { 90 int spoutsSize = context 91 .getComponentTasks(context.getThisComponentId()).size(); 92 int myIdx = context.getThisTaskIndex(); 93 String[] tracks = ((String) conf.get("track")).split(","); 94 StringBuffer tracksBuffer = new StringBuffer(); 95 for (int i = 0; i < tracks.length; i++) { 96 if (i % spoutsSize == myIdx) { 97 tracksBuffer.append(","); 98 tracksBuffer.append(tracks[i]); 99 } 100 } 101 102 if (tracksBuffer.length() == 0) 103 throw new RuntimeException("No track found for spout" 104 + " [spoutsSize:" + spoutsSize + ", tracks:" 105 + tracks.length + "] the amount" 106 + " of tracks must be more then the spout paralellism"); 107 108 this.track = tracksBuffer.substring(1).toString(); 109 110 user = (String) conf.get("user"); 111 password = (String) conf.get("password"); 112 113 credentials = new UsernamePasswordCredentials(user, password); 114 credentialProvider = new BasicCredentialsProvider(); 115 credentialProvider.setCredentials(AuthScope.ANY, credentials); 116 this.collector = collector; 117 } 118 119 @Override 120 public void 
declareOutputFields(OutputFieldsDeclarer declarer) { 121 declarer.declare(new Fields("criteria", "tweet")); 122 } 123 }
通过这种技术,可以在数据源间分布收集器。相同的技术可以被应用在其他的场景-例如,从web服务器收集日志文件。PS:没有试过。
2.Bolt可以使用emit(streamId, tuple)发射元组到多条流,每条流由字符串streamId来识别。然后,在TopologyBuilder 中,你可以决定订阅哪条流。
没有试过。2个疑问:如何declare呢?spout有这个功能么?
解答:1.declareOutputFields()方法中声明多条流,不就可以了。
1 public void declareOutputFields(OutputFieldsDeclarer declarer) { 2 declarer.declare(new Fields("line")); 3 declarer.declareStream("second", new Fields("line2")); 4 }
2.从Bolt和spout的实现来看,应该都是可以的。
3.BaseRichSpout是否是自动调用ack方法的?
文档原话:"BaseBasicBolt, is used to do the acking automatically." 意思就是说,自动调用ack的是BaseBasicBolt(这句话说的是bolt,并不是上面问到的BaseRichSpout)。有空试一试。PS:目前的项目中是手动ack,写法如下。
// Schematic of the manual-ack pattern: emit first, then acknowledge the input tuple.
collector.emit();
input.ack();
原文:http://www.cnblogs.com/byrhuangqiang/p/3605091.html