因为项目需要,需要使用zookeeper。
zookeeper就不用介绍了,具体见此文。 http://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/#icomments
zookeeper有点类似epoll的ET模式,事件只会通知一次,当通知完只会,需要再次注册。不然不会通知了。
注意:
1)哪些函数监听哪些事件类型。比如:getChild函数如果节点不存在,不会记录这个watcher,而且不能监听set事件。当父节点被删除时,也会触发
2)每个功能都有多个函数原型,为的就是能够在多个地方设置watcher回调。默认的回调函数都是在初始化连接时候设置的
当然,函数又分同步和非异步的。异步的多了一个回调来通知结果。
疑问:
1)如果通知之后,又通知了一次,这个事件会丢么?
2)zookeeper是如何保证watcher事件不丢失的
3)这些疑问的解答等项目完毕之后,在写博文予以解答
下用例子用java测试zookeeper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 |
import java.io.IOException; import java.util.List; import org.apache.zookeeper.AsyncCallback.DataCallback; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.AsyncCallback.StringCallback; import org.apache.zookeeper.Watcher.Event; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import
org.apache.zookeeper.WatchedEvent; import
org.apache.zookeeper.Watcher; import
org.apache.zookeeper.ZooDefs.Ids; import
org.apache.zookeeper.ZooKeeper; import
org.apache.zookeeper.data.Stat; import
org.slf4j.Logger; import
org.slf4j.LoggerFactory; public
class TestZk implements
StringCallback, DataCallback{ public
static final Logger LOGGER = LoggerFactory.getLogger(TestZk. class ); private
static final int SESSION_TIMEOUT = 30000 ; private
ZooKeeper zooKeeper; public
boolean isConnected = false ; private
Watcher watcher = new
Watcher(){ @Override public
void process(WatchedEvent event) { // TODO Auto-generated method stub System.out.printf( "Zookeeper connection status:%s,type:%s,path:%s\n" , event.getState(),event.getType(),event.getPath()); if
(event.getType() == Watcher.Event.EventType.None) { switch (event.getState()){ case
SyncConnected: isConnected = true ; break ; case
Expired: isConnected = false ; default : break ; } } } }; public
void open() throws
IOException{ zooKeeper = new
ZooKeeper( "10.210.215.64:2181" ,SESSION_TIMEOUT, watcher); } public
void close(){ if ( null
!= zooKeeper){ try
{ zooKeeper.close(); } catch
(InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public
void testCreate() throws
KeeperException, InterruptedException{ String result = null ; result = zooKeeper.create( "/testZk" , "valueZk" .getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.printf( "create /testZk result:%s\n" ,result); } public
void testACreate(){ zooKeeper.create( "/testZk" , "valueAZk" .getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, this , null ); } public
void testDeleteNode() throws
InterruptedException, KeeperException{ zooKeeper.delete( "/testZk" ,- 1 ); } public
void testGetDataWatch() throws
KeeperException, InterruptedException{ byte [] value = zooKeeper.getData( "/testZk" , new
Watcher(){ @Override public
void process(WatchedEvent event) { // TODO Auto-generated method stub System.out.printf( "getdata path:%s,state:%s,path:%s\n" ,event.getPath(), event.getState(),event.getPath()); } }, null ); String strValue = new
String(value); System.out.printf( "testGetDataWatch value:%s\n" ,strValue); } public
void testAGetDataWatch(){ zooKeeper.getData( "/testZk" , true , this , null ); } public
void testAExists() throws
KeeperException, InterruptedException{ Stat stat = zooKeeper.exists( "/testZk" , true ); if (stat != null ){ System.out.printf( "node /testZk exists,stat:%s" ,stat.toString()); } //会触发create和delete和set } public
void testGetChild() throws
KeeperException, InterruptedException{ Stat stat = null ; List<String> list = zooKeeper.getChildren( "/testZk" , new
Watcher(){ @Override public
void process(WatchedEvent event) { // TODO Auto-generated method stub System.out.printf( "getchild path:%s,state:%s,path:%s\n" ,event.getPath(), event.getState(),event.getPath()); } },stat); if (list != null ){ for
(String node : list) { System.out.printf( "node changed,node:%s\n" ,node); } } else { System.out.printf( "node /testZk is empty\n" ); } //getchildren只会触发create和delete,不会触发set zooKeeper.delete( "/testZk/liu1" ,- 1 ); //zooKeeper.setData("/testZk/liu1","tttt".getBytes(), -1); } public
static void main(String[] args) { TestZk zkHelper = new
TestZk(); try
{ zkHelper.open(); //zkHelper.testDeleteNode(); zkHelper.testAExists(); zkHelper.testACreate(); zkHelper.testGetDataWatch(); zkHelper.testAGetDataWatch(); zkHelper.testGetChild(); } catch
(IOException e) { // TODO Auto-generated catch block e.printStackTrace(); return ; } catch
(KeeperException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch
(InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { try
{ System.in.read(); } catch
(IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println( "zookeeper closed" ); zkHelper.close(); } } @Override public
void processResult( int
rc, String path, Object ctx, String name) { // TODO Auto-generated method stub System.out.printf( "process1 result rc:%d,path:%s,name:%s\n" ,rc,path,name); } @Override public
void processResult( int
rc, String path, Object ctx, byte [] data, Stat stat) { // TODO Auto-generated method stub System.out.printf( "process2 result rc:%d,path:%s,date:%s\n" ,rc,path, new
String(data)); } } |
原文:http://www.cnblogs.com/xloogson/p/3542968.html