0安装JDK
wget --no-check-certificate --no-cookie --header "Cookie: oraclelicense=accept-securebackup-cookie;" http://download.oracle.com/otn-pub/java/jdk/8u45-b14/jdk-8u45-linux-x64.rpm
使用rpm -ivh jdk-8u45-linux-x64.rpm进行安装
检查安装Javac
1:centOS安装ZeroMQ所需组件及工具:
yum install gcc
yum install gcc-c++
yum install make
yum install uuid-devel
yum install libuuid-devel
yum install libtool
wget http://mirror.bjtu.edu.cn/apache/zookeeper/stable/zookeeper-3.4.6.tar.gz
tar -zxvf zookeeper-3.4.6.tar.gz
cp -R zookeeper-3.4.6 /usr/local/
ln -s /usr/local/zookeeper-3.4.6/ /usr/local/zookeeper
vim /etc/profile
export ZOOKEEPER_HOME="/path/to/zookeeper" #路径指定,存放日志等文件
cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
mkdir /tmp/zookeeper
mkdir /var/log/zookeeper
安装zeromq以及jzmq:
wget http://download.zeromq.org/zeromq-2.2.0.tar.gz
tar zxf zeromq-2.2.0.tar.gz
cd zeromq-2.2.0
./configure
make
make install
sudo ldconfig  (刷新动态链接库缓存,使新安装的库可被找到)
至此zeromq安装完成。
安装jzmq: (提前安装好java)
yum install git
git clone git://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
make install
然后,jzmq就装好了。注意:在./autogen.sh这步如果报错:autogen.sh: error: could not find libtool. libtool is required to run autogen.sh,这是因为缺少了libtool,可以用 yum install libtool* 来解决。
wget http://cloud.github.com/downloads/nathanmarz/storm/storm-0.8.1.zip
unzip storm-0.8.1.zip
mv storm-0.8.1 /usr/local/
ln -s /usr/local/storm-0.8.1/ /usr/local/storm
vim /etc/profile
export STORM_HOME=/usr/local/storm-0.8.1
export PATH=$PATH:$STORM_HOME/bin
到此为止单机版的Storm就安装完毕了。
zkServer.sh start
storm nimbus &
storm supervisor &
storm ui &
storm jar /opt/hadoop/loganalyst/storm-dependend/data/teststorm-1.0.jar teststorm.TopologyMain /opt/hadoop/loganalyst/storm-dependend/data/words.txt
storm kill {toponame}
storm activate {toponame}
storm deactivate {toponame}
storm list
再用jps查看进程。查看UI:在浏览器中输入http://localhost:8080
6:storm进程远程kill
如要监控Storm集群和运行在其上的Topology,该如何做呢?
Storm已经为你考虑到了,Storm支持Thrift的C/S架构,在部署Nimbus组件的机器上启动一个Thrift Server进程来提供服务,我们可以通过编写一个Thrift Client来请求Thrift Server,来获取你想得到的集群和Topology的相关数据,来接入监控平台,如Zabbix等,我目前使用的就是Zabbix。
整体的流程已经清楚了,下面就来实践吧。
1 安装Thrift
由于我们要使用Thrift来编译Storm的源代码来获得Thrift Client相关的Java源代码,所以需要先安装Thrift,这里选取的版本为0.9.2。
到官网下载好安装包:http://thrift.apache.org/
编译安装:configure && make && make install
验证:thrift --version
如果打印出Thrift version 0.9.2,代表安装成功。
2 编译Thrift Client代码
首先下载Storm源代码,这里使用最新的0.9.3版本:http://mirrors.hust.edu.cn/apache/storm/apache-storm-0.9.3/apache-storm-0.9.3-src.tar.gz
解压后进行编译:thrift -gen java apache-storm-0.9.3/storm-core/src/storm.thrift
在当前目录下出现gen-java文件夹,此文件夹下就是Thrift Client的Java源代码了。
3 使用Thrift Client API
然后创建一个Maven项目来进行执行监控数据的获取。
项目生成一个Jar文件,输入一些命令和自定义参数,然后输出结果。
以命令行的形式进行调用,这样可以方便的接入监控系统,当然使用形式可以根据自身情况施行。
创建好后,把gen-java生成的代码拷贝进来。
在pom.xml里引入Thrift对应版本的库:
|
1
2
3
4
5
|
<dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> <version>0.9.2</version></dependency> |
首先写一些Thrift相关的辅助类。
ClientInfo.java
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
package com.damacheng009.storm.monitor.thrift;import org.apache.thrift.protocol.TBinaryProtocol;import org.apache.thrift.transport.TFramedTransport;import org.apache.thrift.transport.TSocket;import backtype.storm.generated.Nimbus;/** * 代表一个Thrift Client的信息 * @author jb-xingchencheng * */public class ClientInfo { private TSocket tsocket; private TFramedTransport tTransport; private TBinaryProtocol tBinaryProtocol; private Nimbus.Client client; public TSocket getTsocket() { return tsocket; } public void setTsocket(TSocket tsocket) { this.tsocket = tsocket; } public TFramedTransport gettTransport() { return tTransport; } public void settTransport(TFramedTransport tTransport) { this.tTransport = tTransport; } public TBinaryProtocol gettBinaryProtocol() { return tBinaryProtocol; } public void settBinaryProtocol(TBinaryProtocol tBinaryProtocol) { this.tBinaryProtocol = tBinaryProtocol; } public Nimbus.Client getClient() { return client; } public void setClient(Nimbus.Client client) { this.client = client; }} |
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package com.damacheng009.storm.monitor.thrift;import org.apache.thrift.protocol.TBinaryProtocol;import org.apache.thrift.transport.TFramedTransport;import org.apache.thrift.transport.TSocket;import org.apache.thrift.transport.TTransportException;import backtype.storm.generated.Nimbus;/** * Thrift Client管理类 * @author jb-xingchencheng * */public class ClientManager { public static ClientInfo getClient(String nimbusHost, int nimbusPort) throws TTransportException { ClientInfo client = new ClientInfo(); TSocket tsocket = new TSocket(nimbusHost, nimbusPort); TFramedTransport tTransport = new TFramedTransport(tsocket); TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tTransport); Nimbus.Client c = new Nimbus.Client(tBinaryProtocol); tTransport.open(); client.setTsocket(tsocket); client.settTransport(tTransport); client.settBinaryProtocol(tBinaryProtocol); client.setClient(c); return client; } public static void closeClient(ClientInfo client) { if (null == client) { return; } if (null != client.gettTransport()) { client.gettTransport().close(); } if (null != client.getTsocket()) { client.getTsocket().close(); } }} |
下面是入口类:
Main.java
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package com.damacheng009.storm.monitor;import com.damacheng009.storm.monitor.logic.Logic;/** * 入口类 * @author jb-xingchencheng * */public class Main { // NIMBUS的信息 public static String NIMBUS_HOST = "192.168.180.36"; public static int NIMBUS_PORT = 6627; /** * 命令格式 CMD(命令) [ARG0] [ARG1] ...(更多参数) * @param args */ public static void main(String[] args) { if (args.length < 3) { return; } NIMBUS_HOST = args[0]; NIMBUS_PORT = Integer.parseInt(args[1]); String cmd = args[2]; String result = "-1"; if (cmd.equals("get_topo_exp_size")) { String topoName = args[3]; result = Logic.getTopoExpSize(topoName); } else if (cmd.equals("get_topo_exp_stack_trace")) { String topoName = args[3]; result = Logic.getTopoExpStackTrace(topoName); } System.out.println(result); }} |
测试的时候把具体的HOST和PORT改一下即可。
然后是具体的逻辑类。Logic.java
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
package com.damacheng009.storm.monitor.logic;

import java.util.Date;
import java.util.List;

import com.damacheng009.storm.monitor.Main;
import com.damacheng009.storm.monitor.thrift.ClientInfo;
import com.damacheng009.storm.monitor.thrift.ClientManager;

import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.ErrorInfo;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.generated.TopologySummary;

/**
 * Monitoring queries issued against the Nimbus Thrift API.
 *
 * <p>Both public methods return a printable string; {@code "-1"} signals
 * any failure so the output can be consumed directly by Zabbix items.
 */
public class Logic {

    // Only report errors raised within this window (ms); the monitor polls
    // every 5 minutes, so older entries were already reported last cycle.
    private static final long ERROR_WINDOW_MS = 1000L * 60 * 5;

    /**
     * 取得某个拓扑的异常个数
     *
     * @param topoName topology name as shown by {@code storm list}
     * @return total error count as a string; {@code "0"} if the topology is
     *         not deployed; {@code "-1"} on any failure
     */
    public static String getTopoExpSize(String topoName) {
        ClientInfo client = null;
        int errorTotal = 0;
        try {
            client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT);
            TopologyInfo topologyInfo = findTopologyInfo(client, topoName);
            if (topologyInfo != null) {
                // Errors are keyed by component id; sum all lists.
                for (List<ErrorInfo> listErrorInfo : topologyInfo.getErrors().values()) {
                    errorTotal += listErrorInfo.size();
                }
            }
            return String.valueOf(errorTotal);
        } catch (Exception e) {
            return "-1";
        } finally {
            ClientManager.closeClient(client);
        }
    }

    /**
     * 返回某个拓扑的异常堆栈
     *
     * @param topoName topology name as shown by {@code storm list}
     * @return recent (last 5 min) error stack traces joined by newlines;
     *         {@code "none"} if there are no recent errors; {@code "-1"} on failure
     */
    public static String getTopoExpStackTrace(String topoName) {
        ClientInfo client = null;
        StringBuilder error = new StringBuilder();
        try {
            client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT);
            TopologyInfo topologyInfo = findTopologyInfo(client, topoName);
            if (topologyInfo != null) {
                for (List<ErrorInfo> listErrorInfo : topologyInfo.getErrors().values()) {
                    for (ErrorInfo ei : listErrorInfo) {
                        // Nimbus reports seconds; widen before converting to ms.
                        long expTime = (long) ei.getError_time_secs() * 1000;
                        long now = System.currentTimeMillis();
                        // Skip anything older than the polling window; it was
                        // (or will have been) reported by an earlier check.
                        if (now - expTime > ERROR_WINDOW_MS) {
                            continue;
                        }
                        error.append(new Date(expTime) + "\n");
                        error.append(ei.getError() + "\n");
                    }
                }
            }
            return error.toString().isEmpty() ? "none" : error.toString();
        } catch (Exception e) {
            return "-1";
        } finally {
            ClientManager.closeClient(client);
        }
    }

    /**
     * Looks up a deployed topology by name.
     * (Extracted: this lookup loop was duplicated verbatim in both public methods.)
     *
     * @param client   an open Nimbus connection
     * @param topoName topology name to match exactly
     * @return the topology's info, or {@code null} if no topology with that
     *         name is currently deployed
     * @throws Exception on any Thrift transport/protocol failure
     */
    private static TopologyInfo findTopologyInfo(ClientInfo client, String topoName) throws Exception {
        ClusterSummary clusterSummary = client.getClient().getClusterInfo();
        for (TopologySummary ts : clusterSummary.getTopologies()) {
            if (ts.getName().equals(topoName)) {
                return client.getClient().getTopologyInfo(ts.getId());
            }
        }
        return null;
    }
}
最后打成一个Jar包,就可以跑起来接入监控系统了,如在Zabbix中,可以把各个监控项设置为自定义的item,在Zabbix Client中配置命令行来运行Jar取得数据。
接下来的测试过程先略过。
对于Storm监控的实践,目前就是这样了。
原文:http://www.cnblogs.com/leo3689/p/5158138.html