是个实时的,分布式以及具备高容错的计算框架
storm进程常驻内存
storm数据不经过磁盘,在内存中处理
是Twitter开源的实时的大数据处理框架,最早开源与GitHub
架构
nimbus
supervisor
worker
编程模型
DAG(topology)
spout
bolt
数据传输
ZMQ
zeroMQ开源的消息传递框架,并不是一个massagequeue
Netty
Netty是基于NIO的网络框架,更加高效。(之所以Storm 0.9版本之后使用Netty,是因为ZMQ的license和Storm的license不兼容。
高可靠性
异常处理
消息的可靠性保障机制
一个storm拓扑有一系列特殊的“acker”任务用来跟踪 每一个spout发送的所有tuple的dag(directed acyclic graph有向无环图)。当一个acker看到一个dag完成以后,它会给创造spout touple的spout task发送一个应答ack消息。你可以设置一个拓扑的acker task的个数在Config.TOPOLOGY_ACKERS,默认情况下每个任务的每个worker有一个acker。
理解storm可靠性的最好方式是看tuple的生命周期和tuple dags。当一个拓扑中的一个tuple产生时,无论他是一个spout或者一个bolt,都会随机给定一个64位的id,这些id用来让ackers跟踪dag中的每一个spout tuple。
每个tuple知道存在于他们的tuple 树的spout tuple的ids(也就是这个tuple的源tuple的id(祖宗id)) ,问你在一个bolt中发送一个新的tuple时,他的源spout tuple id会被复制给这个新的tuple。当一个tuple被ack,他发送一个消息到正确的acker task,信息内容包括这个tuple tree是怎么改变的。特别的,他会告诉acker“我是这个源spout tuple的tuple树内部完成的,并且这些是从我产生的新的tuples”。
可维护性
StormUI 图形化监控接口
storm的流式处理
流式处理
客户端提交数据进行结算,并不会等待数据计算结果
逐条处理
ETL
统计分析
实时请求
实时请求应答服务(同步)
客户端提交数据请求之后,立刻取得计算结果并返回给客户端
实时请求处理
storm与MapReduce对比
Storm:进程、线程常驻内存运行,数据不进入磁盘,数据通过网络传递。
MapReduce:为TB、PB级别数据设计的批处理离线计算框架。
storm的计算模型
spout----数据源
拓扑中数据流的来源,一般会从指定外部的数据源读取元组(tuple)发送到拓扑(topology)中
一个spout可以发送多个数据流
可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去
Spout中最核心的方法是nextTuple,该方法会被Storm线程不断调用、主动从数据源拉取数据,再通过emit方法将数据生成元组(Tuple)发送给之后的Bolt计算
blot--数据流处理组件
拓扑中数据处理均由bolt完成对于简单的任务或者数据流转换,单个Bolt可以简单实现;更加复杂场景往往需要多个Bolt分多个步骤完成
一个blot可以发送多个数据流
可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去
Bolt中最核心的方法是execute方法,该方法负责接收到一个元组(Tuple)数据、真正实现核心的业务逻辑
stream grouping--数据流分组(数据分发策略)
简单实例
数据累加
sumtopology类
package com.shsxt.strom.sum;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import java.util.HashMap;
/**
* @author: Savage
* @data: 2019/10/17 11:29
*/
public abstract class SumTopology {
public static void main(String[] args) {
//创建一个topologybuilder对象
TopologyBuilder topologyBuilder = new TopologyBuilder();
//设置spout对象
topologyBuilder.setSpout("myspout",new SumSpout());
//设置bolt对象
topologyBuilder.setBolt("mybolt",new SumBolt()).shuffleGrouping("myspout");
//根据topologybuilder对象创建一个topology对象
StormTopology topology = topologyBuilder.createTopology();
//设置本地运行模式
LocalCluster localCluster = new LocalCluster();
//添加本地运行的topology对象
localCluster.submitTopology("job-sum",new HashMap(),topology);
}
}
sumbolt类
package com.shsxt.strom.sum;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.Map;
/**
* @author: Savage
* @data: 2019/10/17 11:57
*/
public class SumBolt extends BaseRichBolt {
long sum=0;
sumspout类
package com.shsxt.strom.sum;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
/**
* @author: Savage
* @data: 2019/10/17 11:42
*/
public class SumSpout extends BaseRichSpout {
long number=0;
SpoutOutputCollector collector;
/**
* 初始化调用此方法
* @param conf
* @param context
* @param collector
*/
Wordcount
设置WordCountTP类
package com.shsxt.strom.wordcount;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
* @author: Savage
* @data: 2019/10/17 14:58
*/
public class WordCountTP {
public static void main(String[] args) {
//创建topologybuilder对象
TopologyBuilder topologyBuilder = new TopologyBuilder();
//创建spout类
topologyBuilder.setSpout("wcSpout",new WordCountSpout());
//创建split类
topologyBuilder.setBolt("wcSplit",new WordCountSplit(),2).shuffleGrouping("wcSpout");
//创建count类
topologyBuilder.setBolt("wcCount",new WordCount(),5).fieldsGrouping("wcSplit",new Fields("word"));
//创建任务
StormTopology topology = topologyBuilder.createTopology();
Config config = new Config();
config.setNumWorkers(3);
if (args.length > 0) {
try {
StormSubmitter.submitTopology(args[0],config,topology);
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
}else {
//创建本地运行模式
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("wordcount",config,topology);
}
}
}
设置WordCountSpout类
package com.shsxt.strom.wordcount;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
/**
* @author: Savage
* @data: 2019/10/17 15:01
*/
public class WordCountSpout extends BaseRichSpout {
private Random random=new Random();
//定义数据
String[] lines={
"好 好 学 习",
"天 天 向 上",
"我 爱 北 京 天 安 门"
};
SpoutOutputCollector collector;
设置WordCountSplit类
package com.shsxt.strom.wordcount;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
/**
* @author: Savage
* @data: 2019/10/17 15:11
*/
public class WordCountSplit extends BaseRichBolt {
OutputCollector collector;
设置WordCount类
package com.shsxt.strom.wordcount;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
* @author: Savage
* @data: 2019/10/17 15:19
*/
public class WordCount extends BaseRichBolt {
Map<String,Integer> result = new HashMap<>();
架构模型
nimbus
资源调度
任务分配
接收jar包
supervier
接收nimbus分配的任务
启动,停止自己管理的worker进程(当前supervisor上worker数量由配置文件设定)
worker
运行具体处理运算组件的进程(每个Worker对应执行一个Topology的子集)
worker任务类型,即spout任务、bolt任务两种
启动executor(executor即worker JVM进程中的一个java线程,一般默认每个executor负责执行一个task任务)
zookeeper,监听,管理
storm架构设计与Hadoop架构对比
Hadoop | Storm | |
---|---|---|
主节点 | ResourceManager | Nimbus |
从节点 | NodeManager | Supervisor |
应用程序 | Job | Topology |
工作进程 | Child | Worker |
计算模型 | Map/Reduce | Spout/Bolt |
storm任务提交流程
storm. 本地树
zookeeper目录树
上传jar包
解压
创建logs文件
mkdir logs
启动
进入 storm目录下
启动zookeeper
bin/storm dev-zookeeper >> logs/zk.out 2>&1 &
启动nimbus
bin/storm nimbus >> logs/nimbus.out 2>&1 &
启动 stormUI
bin/storm ui >> logs/ui.out 2>&1 &
启动supervisor
bin/storm supervisor >> logs/supervisor.out 2>&1 &
启动logviewer
bin/storm logviewer &
查看日志
http://192.168.61.220:8000/log?file= 要查看的文件名
上传jar包
解压
创建logs文件
mkdir logs
修改配置文件
vim storm.yaml
storm.zookeeper.servers:
"node01"
"node02"
"node03"
nimbus.host: “node01"
拷贝
scp storm-0.10.0 node02:‘pwd‘
scp storm-0.10.0 node03:‘pwd‘
启动
启动zookeeper集群
启动nimbus
bin/storm nimbus >> logs/nimbus.out 2>&1 &
nimbus.host配置的是哪个节点就在该节点上启动nimbus
启动supervisor(剩余两个节点上启动)
bin/storm supervisor >> logs/supervisor.out 2>&1 &
启动storm UI
只需要在一台机器上启动即可
storm ui >> logs/ui.out 2>&1 &
访问
查看日志
bin/storm logviewer &
需要在哪个节点上查看日志就在该节点上上启动
worker进程
一个Topology拓扑会包含一个或多个Worker(每个Worker进程只能从属于一个特定的Topology)这些Worker进程会并行跑在集群中不同的服务器上,即一个Topology拓扑其实是由并行运行在Storm集群中多台服务器上的进程所组成
Executor--线程
Executor是由Worker进程中生成的一个线程;每个Worker进程中会运行拓扑当中的一个或多个Executor线程;一个Executor线程中可以执行一个或多个Task任务(默认每个Executor只执行一个Task任务),但是这些Task任务都是对应着同一个组件(Spout、Bolt)。
task
实际执行数据处理的最小单元
每个task即为一个Spout或者一个Bolt
Task数量在整个Topology生命周期中保持不变,Executor数量可以变化或手动调整(默认情况下,Task数量和Executor是相同的,即每个Executor线程中默认运行一个Task任务)
设置Worker进程数
Config.setNumWorkers(int workers)
设置executor线程数
TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint) TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint) :其中, parallelism_hint即为executor线程数(如果不设置,则为一个线程)
设置Task数量
ComponentConfigurationDeclarer.setNumTasks(Number val)
Rebalance – 再平衡 即,动态调整Topology拓扑的Worker进程数量、以及Executor线程数量
支持两种调整方式:
通过Storm UI
通过Storm CLI
通过Storm CLI动态调整
Worker进程间的数据通信
ZMQ
ZeroMQ 开源的消息传递框架,并不是一个MessageQueue
Netty
Netty是基于NIO的网络框架,更加高效。(之所以Storm 0.9版本之后使用Netty,是因为ZMQ的license和Storm的license不兼容。)
Worker内部的数据通信
Disruptor
实现了“队列”的功能。 可以理解为一种事件监听或者消息处理机制,即在队列当中一边由生产者放入消息数据,另一边消费者并行取出消息数据处理
Storm 通信机制 -- Worker内部的消息传递机制
集群节点宕机
nimbus服务器
单点故障
非nimbus服务器
发生故障时,该节点上所有的task任务都会超时,nimbus会将这些task任务重新分配到其他服务器上运行
进程挂掉
worker
挂掉时,Supervisor会重新启动这个进程。如果启动过程中仍然一直失败,并且无法向Nimbus发送心跳,Nimbus会将该Worker重新分配到其他服务器上
Supervisor
无状态(所有的状态信息都存放在Zookeeper中来管理) 快速失败(每当遇到任何异常情况,都会自动毁灭)
Nimbus
`无状态(所有的状态信息都存放在Zookeeper中来管理) 快速失败(每当遇到任何异常情况,都会自动毁灭)
消息的完整性
从Spout中发出的Tuple,以及基于他所产生Tuple(例如上个例子当中Spout发出的句子,以及句子当中单词的tuple等)
由这些消息就构成了一棵tuple树
当这棵tuple树发送完成,并且树当中每一条消息都被正确处理,就表明spout发送消息被“完整处理”,即消息的完整性
DRPC (Distributed RPC)分布式远程过程调用 DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。 DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。 (其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。) DRPC设计目的: 为了充分利用Storm的计算能力实现高密度的并行实时计算。 (Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)
客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。
定义DRPC拓扑:
方法1: 通过LinearDRPCTopologyBuilder (该方法也过期,不建议使用) 该方法会自动为我们设定Spout、将结果返回给DRPC Server等,我们只需要将Topology实现
方法2: 直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑 需要手动设定好开始的DRPCSpout以及结束的ReturnResults
运行模式:
本地模式
远程模式
修改配置文件conf/storm.yaml drpc.servers:
"node01“
启动DRPC Server
bin/storm drpc &
通过StormSubmitter.submitTopology提交拓扑
- 
DRPC
原文:https://www.cnblogs.com/ruanjianwei/p/12132499.html