Storm的所有的状态信息都保存在Zookeeper里面,nimbus通过在zookeeper上面写状态信息来分配任务:
使得nimbus可以监控整个storm集群的状态,从而可以重启一些挂掉的task。 ZooKeeper使得整个storm集群十分的健壮-—任何一台工作机器挂掉都没有关系,只要重启然后从zookeeper上面重新获取状态信息就可以了。那Storm在zookeeper里面存储了哪些状态呢?在James Xu的文章中有所涉及,但是该文章讲述的已经过时了。本文主要介绍Storm在ZooKeeper中保存的数据目录结构,源代码主要是:backtype.storm.cluster。
关于storm操作zookeeper的详细分析请参见:源码阅读之storm操作zookeeper-cluster.clj
Zookeeper的操作
1
2
3
4
5
6
7
8
9
10
11
12
|
(defprotocol ClusterState (set-ephemeral-node [ this path data]) (delete-node [ this path]) (create-sequential [ this path data]) (set-data [ this path data]) ;; if node does not exist, create persistent with this data (get-data [ this path watch?]) (get-children [ this path watch?]) (mkdirs [ this path]) (close [ this ]) (register [ this callback]) (unregister [ this id]) ) |
Storm使用Zookeeper的操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
(defprotocol StormClusterState (assignments [ this callback]) (assignment-info [ this storm-id callback]) (active-storms [ this ]) (storm-base [ this storm-id callback]) (get-worker-heartbeat [ this storm-id node port]) (executor-beats [ this storm-id executor->node+port]) (supervisors [ this callback]) (supervisor-info [ this supervisor-id]) ;; returns nil if doesn't exist (setup-heartbeats! [ this storm-id]) (teardown-heartbeats! [ this storm-id]) (teardown-topology-errors! [ this storm-id]) (heartbeat-storms [ this ]) (error-topologies [ this ]) (worker-heartbeat! [ this storm-id node port info]) (remove-worker-heartbeat! [ this storm-id node port]) (supervisor-heartbeat! [ this supervisor-id info]) (activate-storm! [ this storm-id storm-base]) (update-storm! [ this storm-id new -elems]) (remove-storm-base! [ this storm-id]) (set-assignment! [ this storm-id info]) (remove-storm! [ this storm-id]) (report-error [ this storm-id task-id error]) (errors [ this storm-id task-id]) (disconnect [ this ]) ) |
Storm中在Zookeeper中存储的目录
1
2
3
4
5
6
7
8
9
10
11
12
|
(def ASSIGNMENTS-ROOT "assignments" ) (def CODE-ROOT "code" ) (def STORMS-ROOT "storms" ) (def SUPERVISORS-ROOT "supervisors" ) (def WORKERBEATS-ROOT "workerbeats" ) (def ERRORS-ROOT "errors" ) (def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT)) (def STORMS-SUBTREE (str "/" STORMS-ROOT)) (def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT)) (def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT)) (def ERRORS-SUBTREE (str "/" ERRORS-ROOT)) |
1./assignments -> 任务分配信息
2./storms -> 正在运行的topology的ID
3./supervisors -> 所有的Supervisors的心跳信息
4./workerbeats -> 所有的Worker的心跳
5./errors -> 产生的出错信息
结构图
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
/-{storm-zk-root} -- storm在zookeeper上的根目录(默认为/storm) | |-/assignments -- topology的任务分配信息 | | | |-/{topology-id} -- 这个目录保存的是每个topology的assignments信息包括:对应的nimbus上 | -- 的代码目录,所有task的启动时间,每个task与机器、端口的映射。操作为 | -- (assignments)来获取所有assignments的值;以及(assignment-info storm-id) | -- 来得到给定的storm-id对应的AssignmentInfo信息 | -- 在AssignmentInfo中存储的内容有: | -- :executor->node+port :executor->start-time-secs :node->host | -- 具体定义在common.clj中的 | -- (defrecord Assignment[master-code-dir node->host executor->node+port executor->start-time-secs]) | |-/storms -- 这个目录保存所有正在运行的topology的id | | | | | |-/{topology-id} -- 这个文件保存这个topology的一些信息,包括topology的名字,topology开始运行 | -- 的时间以及这个topology的状态。操作(active-storms),获得当前路径活跃的下 | -- topology数据。保存的内容参考类StormBase;(storm-base storm-id)得到给定的 | -- storm-id下的StormBase数据,具体定义在common.clj中的 | -- (defrecord StormBase [storm-name launch-time-secs status num-workers component->executors]) | |-/supervisors -- 这个目录保存所有的supervisor的心跳信息 | | | | | |-/{supervisor-id} -- 这个文件保存supervisor的心跳信息包括:心跳时间,主机名,这个supervisor上 | -- worker的端口号,运行时间(具体看SupervisorInfo类)。操作(supervisors)得到 | -- 所有的supervisors节点;(supervisor-info supervisor-id)得到给定的 | -- supervisor-id对应的SupervisorInfo信息;具体定义在common.clj中的 | | -- (defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs]) | |-/workerbeats -- 所有worker的心跳 | | | |-/{topology-id} -- 这个目录保存这个topology的所有的worker的心跳信息 | | | |-/{supervisorId-port} -- worker的心跳信息,包括心跳的时间,worker运行时间以及一些统计信息 | | -- 操作(heartbeat-storms)得到所有有心跳数据的topology, | -- (get-worker-heartbeat storm-id node port)得到具体一个topology下 | -- 的某个worker(node:port)的心跳状况, | -- (executor-beats storm-id executor->node+port)得到一个executor的心跳状况 | |-/errors -- 所有产生的error信息 | |-/{topology-id} -- 这个目录保存这个topology下面的错误信息。操作(error-topologies)得到出错 | -- 的topology;(errors storm-id component-id)得到 | -- 给定的storm-id component-id下的出错信息 |-/{component-id} |
总结
以上就是本文关于浅谈Storm在zookeeper上的目录结构的全部内容,如有不足之处,欢迎留言指出,希望对大家有所帮助。感谢朋友们对本站的支持!
原文链接:https://segmentfault.com/a/1190000000653595