
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Appenders>
<Console name="myConsole" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
<RollingFile name="myFile" fileName="/Users/guludada/Desktop/logs/app.log"
filePattern="/Users/guludada/Desktop/logs/app-%d{yyyy-MM-dd-HH}.log">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
<Policies>
<TimeBasedTriggeringPolicy />
</Policies>
</RollingFile>
</Appenders>
<Loggers>
<Root level="Info">
<AppenderRef ref="myConsole"/>
<AppenderRef ref="myFile"/>
</Root>
</Loggers>
</Configuration>

生成器代码:

package com.guludada.ordersInfo;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
// Import log4j classes.
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * Generates random order log lines and writes them through log4j, so a
 * downstream Flume agent can tail the log file and feed Kafka/Storm.
 *
 * <p>Each line has the pipe-delimited layout:
 * {@code orderNumber: ... | orderDate: ... | paymentNumber: ... | paymentDate: ... |
 * merchantName: ... | sku: [...] | price: [...]}
 */
public class ordersInfoGenerator {

    // Payment channels; names are written verbatim into the log line.
    public enum paymentWays {
        Wechat,Alipay,Paypal
    }

    // Merchant-name pool; names are written verbatim into the log line.
    public enum merchantNames {
        优衣库,天猫,淘宝,咕噜大大,快乐宝贝,守望先峰,哈毒妇,Storm,Oracle,Java,CSDN,跑男,路易斯威登,
        暴雪公司,Apple,Sumsam,Nissan,Benz,BMW,Maserati
    }

    // Product-name pool for the random SKU lines.
    public enum productNames {
        黑色连衣裙, 灰色连衣裙, 棕色衬衫, 性感牛仔裤, 圆脚牛仔裤,塑身牛仔裤, 朋克卫衣,高腰阔腿休闲裤,人字拖鞋,
        沙滩拖鞋
    }

    private static final Logger logger = LogManager.getLogger(ordersInfoGenerator.class);

    // One shared RNG instead of a fresh java.util.Random per helper call.
    private static final Random RANDOM = new Random();

    float[] skuPriceGroup = {299,399,699,899,1000,2000};
    float[] discountGroup = {10,20,50,100};
    // Accumulated by randomSkus(), read afterwards by calculateOrderPrice().
    float totalPrice = 0;
    float discount = 0;
    float paymentPrice = 0;

    private int logsNumber = 1000;

    /** Emits exactly {@code logsNumber} random order lines at INFO level. */
    public void generate() {
        // Was "i <= logsNumber", which emitted logsNumber + 1 lines (off-by-one).
        for (int i = 0; i < logsNumber; i++) {
            logger.info(randomOrderInfo());
        }
    }

    /** Builds one complete pipe-delimited order line. */
    public String randomOrderInfo() {
        // SimpleDateFormat is not thread-safe, so a per-call instance is kept on purpose.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = new Date();
        String orderNumber = randomNumbers(5) + date.getTime();
        String orderDate = sdf.format(date);
        String paymentNumber = randomPaymentWays() + "-" + randomNumbers(8);
        String paymentDate = sdf.format(date);
        String merchantName = randomMerchantNames();
        String skuInfo = randomSkus();
        String priceInfo = calculateOrderPrice();
        return "orderNumber: " + orderNumber + " | orderDate: " + orderDate + " | paymentNumber: " +
            paymentNumber + " | paymentDate: " + paymentDate + " | merchantName: " + merchantName +
            " | sku: " + skuInfo + " | price: " + priceInfo;
    }

    /** Picks a uniformly random constant name from the given enum values. */
    private static <T extends Enum<T>> String randomNameOf(T[] values) {
        return values[RANDOM.nextInt(values.length)].name();
    }

    private String randomPaymentWays() {
        return randomNameOf(paymentWays.values());
    }

    private String randomMerchantNames() {
        return randomNameOf(merchantNames.values());
    }

    private String randomProductNames() {
        return randomNameOf(productNames.values());
    }

    /**
     * Builds the SKU section ("[ skuName: ... ; ... ]") with three SKU lines and
     * accumulates their prices into {@code totalPrice} as a side effect.
     */
    private String randomSkus() {
        // NOTE: the original computed an unused "skuCategoryNum = nextInt(3)";
        // the loop has always produced a fixed three SKU lines, kept as-is.
        StringBuilder skuInfo = new StringBuilder("[");
        totalPrice = 0;
        for (int i = 1; i <= 3; i++) {
            int skuNum = RANDOM.nextInt(3) + 1;
            float skuPrice = skuPriceGroup[RANDOM.nextInt(skuPriceGroup.length)];
            float totalSkuPrice = skuPrice * skuNum;
            String skuName = randomProductNames();
            String skuCode = randomCharactersAndNumbers(10);
            skuInfo.append(" skuName: ").append(skuName)
                   .append(" skuNum: ").append(skuNum)
                   .append(" skuCode: ").append(skuCode)
                   .append(" skuPrice: ").append(skuPrice)
                   .append(" totalSkuPrice: ").append(totalSkuPrice)
                   .append(";");
            totalPrice += totalSkuPrice;
        }
        skuInfo.append(" ]");
        return skuInfo.toString();
    }

    /** Picks a random discount and derives the payment price from {@code totalPrice}. */
    private String calculateOrderPrice() {
        discount = discountGroup[RANDOM.nextInt(discountGroup.length)];
        paymentPrice = totalPrice - discount;
        String priceInfo = "[ totalPrice: " + totalPrice + " discount: " + discount + " paymentPrice: " + paymentPrice +" ]";
        return priceInfo;
    }

    /** Random lowercase-alphanumeric string of the given length (SKU codes). */
    private String randomCharactersAndNumbers(int length) {
        return randomStringFrom("abcdefghijklmnopqrstuvwxyz0123456789", length);
    }

    /** Random digit string of the given length (order/payment numbers). */
    private String randomNumbers(int length) {
        return randomStringFrom("0123456789", length);
    }

    /** Draws {@code length} random characters from {@code alphabet}. */
    private static String randomStringFrom(String alphabet, int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(alphabet.charAt(RANDOM.nextInt(alphabet.length())));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ordersInfoGenerator generator = new ordersInfoGenerator();
        generator.generate();
    }
}
# Flume agent on the log-producing host: tail the app log and forward via Avro.
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = exec
agent.sources.origin.command = tail -F /export/data/trivial/app.log
agent.sources.origin.channels = memorychannel
agent.sources.origin.interceptors = i1
agent.sources.origin.interceptors.i1.type = static
agent.sources.origin.interceptors.i1.key = topic
agent.sources.origin.interceptors.i1.value = ordersInfo

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 10000

agent.sinks.target.type = avro
agent.sinks.target.channel = memorychannel
agent.sinks.target.hostname = 172.16.124.130
agent.sinks.target.port = 4545
# Flume agent on the collector host: receive Avro events and publish to Kafka.
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = avro
agent.sources.origin.channels = memorychannel
agent.sources.origin.bind = 0.0.0.0
agent.sources.origin.port = 4545

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 5000000
agent.channels.memorychannel.transactionCapacity = 1000000

agent.sinks.target.type = org.apache.flume.sink.kafka.KafkaSink
#agent.sinks.target.topic = bigdata
agent.sinks.target.brokerList=localhost:9092
agent.sinks.target.requiredAcks=1
agent.sinks.target.batchSize=100
agent.sinks.target.channel = memorychannel
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/export/data/kafka
zookeeper.connect=localhost:2181
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/export/data/kafka
zookeeper.connect=localhost:2181

broker.id是kafka集群上每一个节点的单独标识,不能重复;listeners可以理解为每一个节点上Kafka进程要监听的端口,使用默认的就行;
log.dir是Kafka的log文件(记录消息的log file)存放目录;
zookeeper.connect就是Zookeeper的URI地址和端口。

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
conf/storm.yaml
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
    - "ymhHadoop"
    - "ymhHadoop2"
    - "ymhHadoop3"

storm.local.dir: "/export/data/storm/workdir"

nimbus.host: "ymhHadoop"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
package com.guludada.ordersanalysis;
import java.util.UUID;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.kafka.Broker;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StaticHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.GlobalPartitionInformation;
/**
 * Wires the orders-analysis topology: a Kafka spout reading the "ordersInfo"
 * topic feeding two instances of {@code merchantsSalesAnalysisBolt}.
 *
 * <p>With a command-line argument the topology is submitted to the cluster
 * under that name; without arguments it runs in a {@code LocalCluster}.
 */
public class ordersAnalysisTopology {

    /** Kafka topic carrying the raw order log lines. */
    private static String topicName = "ordersInfo";
    /** ZooKeeper root under which the Kafka spout stores its consumer offsets. */
    private static String zkRoot = "/stormKafka/" + topicName;

    public static void main(String[] args) {
        BrokerHosts hosts = new ZkHosts("ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181");
        // Random consumer id so repeated local runs do not collide on stored offsets.
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, UUID.randomUUID().toString());
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", kafkaSpout);
        builder.setBolt("merchantsSalesBolt", new merchantsSalesAnalysisBolt(), 2).shuffleGrouping("kafkaSpout");

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // Cluster mode: submit under the name given on the command line.
            conf.setNumWorkers(1);
            try {
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException e) {
                // Was two duplicated catch blocks that only printed the stack trace;
                // fail loudly instead and preserve the cause.
                throw new RuntimeException("Failed to submit topology " + args[0], e);
            }
        } else {
            // Local development mode.
            conf.setMaxSpoutPending(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("ordersAnalysis", conf, builder.createTopology());
        }
    }
}
package com.guludada.domain;
import java.util.ArrayList;
import java.util.Date;
/**
 * Plain data holder for one parsed order log line.
 *
 * <p>Populated by {@code logInfoHandler}; the current parser only fills in
 * {@code number}, {@code createTime}, {@code merchantName} and
 * {@code totalPrice} — the remaining fields keep their defaults.
 */
public class ordersBean {

    /** Order creation timestamp. */
    Date createTime = null;
    /** Order number. */
    String number = "";
    /** Payment transaction number. */
    String paymentNumber = "";
    /** Payment timestamp. */
    Date paymentDate = null;
    /** Merchant display name. */
    String merchantName = "";
    /** SKU lines belonging to this order. */
    ArrayList<skusBean> skuGroup = null;
    /** Order total before discount. */
    float totalPrice = 0;
    /** Discount amount. */
    float discount = 0;
    /** Amount actually paid. */
    float paymentPrice = 0;

    public Date getCreateTime() { return createTime; }

    public void setCreateTime(Date createTime) { this.createTime = createTime; }

    public String getNumber() { return number; }

    public void setNumber(String number) { this.number = number; }

    public String getPaymentNumber() { return paymentNumber; }

    public void setPaymentNumber(String paymentNumber) { this.paymentNumber = paymentNumber; }

    public Date getPaymentDate() { return paymentDate; }

    public void setPaymentDate(Date paymentDate) { this.paymentDate = paymentDate; }

    public String getMerchantName() { return merchantName; }

    public void setMerchantName(String merchantName) { this.merchantName = merchantName; }

    public ArrayList<skusBean> getSkuGroup() { return skuGroup; }

    public void setSkuGroup(ArrayList<skusBean> skuGroup) { this.skuGroup = skuGroup; }

    public float getTotalPrice() { return totalPrice; }

    public void setTotalPrice(float totalPrice) { this.totalPrice = totalPrice; }

    public float getDiscount() { return discount; }

    public void setDiscount(float discount) { this.discount = discount; }

    public float getPaymentPrice() { return paymentPrice; }

    public void setPaymentPrice(float paymentPrice) { this.paymentPrice = paymentPrice; }
}
本文例子中用不到skusBean,所以这里作者就没有写,偷懒一下下。

package com.guludada.domain;
public class skusBean {
………………
}

package com.guludada.common;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.guludada.domain.ordersBean;
/**
 * Parses one raw log line produced by ordersInfoGenerator back into an
 * ordersBean. Fields are located positionally after splitting on '|', so the
 * line layout must stay exactly:
 *
 *   orderNumber: ... | orderDate: yyyy-MM-dd HH:mm:ss | paymentNumber: ... |
 *   paymentDate: ... | merchantName: ... | sku: [...] |
 *   price: [ totalPrice: X discount: Y paymentPrice: Z ]
 */
public class logInfoHandler {

    // Matches the "yyyy-MM-dd HH:mm:ss" pattern the generator writes.
    // NOTE(review): SimpleDateFormat is not thread-safe; this is safe only while
    // each bolt instance owns its own handler — confirm before sharing.
    SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Extracts order number, creation time, merchant name and total price from
     * one log line. Returns a default-initialized ordersBean when the line does
     * not contain "orderNumber:". A ParseException on the date is printed and
     * swallowed, leaving createTime null.
     */
    public ordersBean getOrdersBean(String orderInfo) {
        ordersBean order = new ordersBean();
        // Strip the log4j timestamp prefix: keep everything from "orderNumber:" on.
        Pattern orderPattern = Pattern.compile("orderNumber:.+");
        Matcher orderMatcher = orderPattern.matcher(orderInfo);
        if(orderMatcher.find()) {
            String orderInfoStr = orderMatcher.group(0);
            String[] orderInfoGroup = orderInfoStr.trim().split("\\|");
            // Order number: value after the colon in field 0 ("orderNumber: ...").
            String orderNum = (orderInfoGroup[0].split(":"))[1].trim();
            order.setNumber(orderNum);
            // Creation time: field 1 is " orderDate: yyyy-MM-dd HH:mm:ss";
            // after trimming, space-token [1] is the date and [2] the time.
            String orderCreateTime = orderInfoGroup[1].trim().split(" ")[1] + " " + orderInfoGroup[1].trim().split(" ")[2];
            try {
                order.setCreateTime(sdf_final.parse(orderCreateTime));
            } catch (ParseException e) {
                // Best-effort: leave createTime null on a malformed timestamp.
                e.printStackTrace();
            }
            // Merchant name: value after the colon in field 4.
            String merchantName = (orderInfoGroup[4].split(":"))[1].trim();
            order.setMerchantName(merchantName);
            // Total price: field 6 is "price: [ totalPrice: X discount: Y paymentPrice: Z ]".
            // split("price:") is case-sensitive, so "totalPrice:"/"paymentPrice:"
            // (capital P) do not match; [1] is the bracketed remainder.
            String orderPriceInfo = (orderInfoGroup[6].split("price:"))[1].trim();
            // Drop the leading "[ "; the end index (length-3) also clips the last
            // digit of paymentPrice, but only space-token [1] (the totalPrice
            // value) is read, so the clipped tail is never used.
            String totalPrice = (orderPriceInfo.substring(2, orderPriceInfo.length()-3).trim().split(" "))[1];
            order.setTotalPrice(Float.parseFloat(totalPrice));
            return order;
        } else {
            // Line did not contain an order record: return the empty bean.
            return order;
        }
    }
}
package com.guludada.ordersanalysis;
import java.util.Map;
import com.guludada.common.logInfoHandler;
import com.guludada.domain.ordersBean;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * Parses each raw order log line and accumulates the order total into the
 * per-merchant sales ranking kept in the Redis sorted set
 * "orderAna:topSalesByMerchant".
 */
public class merchantsSalesAnalysisBolt extends BaseRichBolt {

    // Bolts are serialized when the topology is submitted.
    private static final long serialVersionUID = 1L;

    private OutputCollector _collector;
    logInfoHandler loginfohandler;
    JedisPool pool;

    public void execute(Tuple tuple) {
        String orderInfo = tuple.getString(0);
        ordersBean order = loginfohandler.getOrdersBean(orderInfo);
        // Store the salesByMerchant information into Redis. Jedis implements
        // Closeable; try-with-resources returns the connection to the pool
        // (the original leaked one connection per tuple).
        try (Jedis jedis = pool.getResource()) {
            jedis.zincrby("orderAna:topSalesByMerchant", order.getTotalPrice(), order.getMerchantName());
        }
        // Ack so the Kafka spout does not replay the tuple after a timeout.
        _collector.ack(tuple);
    }

    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this._collector = collector;
        this.loginfohandler = new logInfoHandler();
        // Arguments: pool config, host, port, connection timeout (ms), password.
        this.pool = new JedisPool(new JedisPoolConfig(), "ymhHadoop", 6379, 2 * 60000, "12345");
    }

    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // Terminal bolt: emits nothing downstream.
    }
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.guludada</groupId>
<artifactId>Storm_OrdersAnalysis</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>Storm_OrdersAnalysis Maven Webapp</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.6</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
<build>
<finalName>Storm_OrdersAnalysis</finalName>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.guludada.ordersanalysis.ordersAnalysisTopology</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

Flume+Kafka+Storm+Redis实时分析系统基本架构
原文:http://blog.csdn.net/ymh198816/article/details/51998085