* 每个应用实例部署一个日志agent 
* agent实时将日志发送到kafka 
* storm实时计算日志 
* storm计算结果保存到hbase
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.0</version>
</dependency>
public class JsonBolt extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory
            .getLogger(JsonBolt.class);
    private Fields fields;
    private OutputCollector collector;
    public JsonBolt() {
        this.fields = new Fields("hostIp", "instanceName", "className",
                "methodName", "createTime", "callTime", "errorCode");
    }
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }
    @Override
    public void execute(Tuple tuple) {
        String spanDataJson = tuple.getString(0);
        LOG.info("source data:{}", spanDataJson);
        Map<String, Object> map = (Map<String, Object>) JSONValue
                .parse(spanDataJson);
        Values values = new Values();
        for (int i = 0, size = this.fields.size(); i < size; i++) {
            values.add(map.get(this.fields.get(i)));
        }
        this.collector.emit(tuple, values);
        this.collector.ack(tuple);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(this.fields);
    }
}
public class MyTopology {
    private static final String TOPOLOGY_NAME = "SPAN-DATA-TOPOLOGY";
    private static final String KAFKA_SPOUT_ID = "kafka-stream";
    private static final String JsonProject_BOLT_ID = "jsonProject-bolt";
    public static void main(String[] args) throws Exception {
        String zks = "132.122.252.51:2181";
        String topic = "span-data-topic";
        String zkRoot = "/kafka-storm"; 
        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot,
                KAFKA_SPOUT_ID);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.zkServers = Arrays.asList(new String[] { "132.122.252.51" });
        spoutConf.zkPort = 2181;
        JsonBolt jsonBolt = new JsonBolt();
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(KAFKA_SPOUT_ID, new KafkaSpout(spoutConf));
        builder.setBolt(JsonProject_BOLT_ID, jsonBolt).shuffleGrouping(
                KAFKA_SPOUT_ID);
        Config config = new Config();
        config.setNumWorkers(1);
        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config,
                    builder.createTopology());
            Utils.waitForSeconds(100);
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        } else {
            StormSubmitter.submitTopology(args[0], config,
                    builder.createTopology());
        }
    }
}
原文:http://blog.csdn.net/wangyangzhizhou/article/details/52440862