1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package org.apache.hadoop.hbase.zookeeper;
20
21
import java.io.Closeable;
22
import java.io.IOException;
23
import java.util.ArrayList;
24
import java.util.List;
25
import java.util.concurrent.CopyOnWriteArrayList;
26
import java.util.concurrent.CountDownLatch;
27
28
import org.apache.commons.logging.Log;
29
import org.apache.commons.logging.LogFactory;
30
import org.apache.hadoop.classification.InterfaceAudience;
31
import org.apache.hadoop.conf.Configuration;
32
import org.apache.hadoop.hbase.Abortable;
33
import org.apache.hadoop.hbase.HConstants;
34
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
35
import org.apache.hadoop.hbase.util.Threads;
36
import org.apache.zookeeper.KeeperException;
37
import org.apache.zookeeper.WatchedEvent;
38
import org.apache.zookeeper.Watcher;
39
import org.apache.zookeeper.ZooDefs;
40
import org.apache.zookeeper.data.ACL;
41
42
43
44
45
46
47
48
49
50
51
52
53
@InterfaceAudience.Private
54
public class
ZooKeeperWatcher implements Watcher, Abortable, Closeable {
55
private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.
class);
56
57
58
59
private String identifier;
60
61
62
private String quorum;
63
64
65
private
RecoverableZooKeeper recoverableZooKeeper;
66
67
68
protected
Abortable abortable;
69
70
private boolean aborted = false;
71
72
73
private final List<ZooKeeperListener> listeners =
74
new CopyOnWriteArrayList<ZooKeeperListener>();
75
76
77
78
public CountDownLatch saslLatch =
new CountDownLatch(1);
79
80
81
82
83
public String baseZNode;
84
85
public String metaServerZNode;
86
87
public String rsZNode;
88
89
public String drainingZNode;
90
91
private String masterAddressZNode;
92
93
public String backupMasterAddressesZNode;
94
95
public String clusterStateZNode;
96
97
public String assignmentZNode;
98
99
public String tableZNode;
100
101
public String clusterIdZNode;
102
103
public String splitLogZNode;
104
105
public String balancerZNode;
106
107
public String tableLockZNode;
108
109
public String recoveringRegionsZNode;
110
111
public static String namespaceZNode =
"namespace";
112
113
114
115
public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
116
new ArrayList<ACL>() { {
117
add(
new ACL(ZooDefs.Perms.READ,ZooDefs.Ids.ANYONE_ID_UNSAFE));
118
add(
new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.AUTH_IDS));
119
}};
120
121
private final Configuration conf;
122
123
private final Exception constructorCaller;
124
125
126
127
128
129
130
131
132
public
ZooKeeperWatcher(Configuration conf, String identifier,
133
Abortable abortable)
throws ZooKeeperConnectionException, IOException {
134
this(conf, identifier, abortable, false);
135
}
136
137
138
139
140
141
142
143
144
145
146
147
148
public
ZooKeeperWatcher(Configuration conf, String identifier,
149
Abortable abortable,
boolean canCreateBaseZNode)
150
throws IOException,
ZooKeeperConnectionException {
151
this.conf = conf;
152
153
154
try {
155
throw new Exception(
"ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
156
}
catch (Exception e) {
157
this.constructorCaller = e;
158
}
159
this.quorum = ZKConfig.getZKQuorumServersString(conf);
160
161
162
this.identifier = identifier;
163
this.abortable = abortable;
164
setNodeNames(conf);
165
this.recoverableZooKeeper = ZKUtil.connect(conf, quorum,
this, identifier);
166
if (canCreateBaseZNode) {
167
createBaseZNodes();
168
}
169
}
170
171
private void createBaseZNodes()
throws ZooKeeperConnectionException {
172
try {
173
174
ZKUtil.createWithParents(
this, baseZNode);
175
ZKUtil.createAndFailSilent(
this, assignmentZNode);
176
ZKUtil.createAndFailSilent(
this, rsZNode);
177
ZKUtil.createAndFailSilent(
this, drainingZNode);
178
ZKUtil.createAndFailSilent(
this, tableZNode);
179
ZKUtil.createAndFailSilent(
this, splitLogZNode);
180
ZKUtil.createAndFailSilent(
this, backupMasterAddressesZNode);
181
ZKUtil.createAndFailSilent(
this, tableLockZNode);
182
ZKUtil.createAndFailSilent(
this, recoveringRegionsZNode);
183
}
catch (KeeperException e) {
184
throw new
ZooKeeperConnectionException(
185
prefix(
"Unexpected KeeperException creating base node"), e);
186
}
187
}
188
189
@Override
190
public String toString() {
191
return this.identifier +
", quorum=" + quorum +
", baseZNode=" + baseZNode;
192
}
193
194
195
196
197
198
199
200
public String prefix(
final String str) {
201
return this.toString() +
" " + str;
202
}
203
204
205
206
207
private void setNodeNames(Configuration conf) {
208
baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
209
HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
210
metaServerZNode = ZKUtil.joinZNode(baseZNode,
211
conf.get(
"zookeeper.znode.metaserver",
"meta-region-server"));
212
rsZNode = ZKUtil.joinZNode(baseZNode,
213
conf.get(
"zookeeper.znode.rs",
"rs"));
214
drainingZNode = ZKUtil.joinZNode(baseZNode,
215
conf.get(
"zookeeper.znode.draining.rs",
"draining"));
216
masterAddressZNode = ZKUtil.joinZNode(baseZNode,
217
conf.get(
"zookeeper.znode.master",
"master"));
218
backupMasterAddressesZNode = ZKUtil.joinZNode(baseZNode,
219
conf.get(
"zookeeper.znode.backup.masters",
"backup-masters"));
220
clusterStateZNode = ZKUtil.joinZNode(baseZNode,
221
conf.get(
"zookeeper.znode.state",
"running"));
222
assignmentZNode = ZKUtil.joinZNode(baseZNode,
223
conf.get(
"zookeeper.znode.unassigned",
"region-in-transition"));
224
tableZNode = ZKUtil.joinZNode(baseZNode,
225
conf.get(
"zookeeper.znode.tableEnableDisable",
"table"));
226
clusterIdZNode = ZKUtil.joinZNode(baseZNode,
227
conf.get(
"zookeeper.znode.clusterId",
"hbaseid"));
228
splitLogZNode = ZKUtil.joinZNode(baseZNode,
229
conf.get(
"zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
230
balancerZNode = ZKUtil.joinZNode(baseZNode,
231
conf.get(
"zookeeper.znode.balancer",
"balancer"));
232
tableLockZNode = ZKUtil.joinZNode(baseZNode,
233
conf.get(
"zookeeper.znode.tableLock",
"table-lock"));
234
recoveringRegionsZNode = ZKUtil.joinZNode(baseZNode,
235
conf.get(
"zookeeper.znode.recovering.regions",
"recovering-regions"));
236
namespaceZNode = ZKUtil.joinZNode(baseZNode,
237
conf.get(
"zookeeper.znode.namespace",
"namespace"));
238
}
239
240
241
242
243
244
public void registerListener(
ZooKeeperListener listener) {
245
listeners.add(listener);
246
}
247
248
249
250
251
252
253
public void registerListenerFirst(
ZooKeeperListener listener) {
254
listeners.add(0, listener);
255
}
256
257
public void unregisterListener(
ZooKeeperListener listener) {
258
listeners.remove(listener);
259
}
260
261
262
263
264
public void unregisterAllListeners() {
265
listeners.clear();
266
}
267
268
269
270
271
public List<ZooKeeperListener> getListeners() {
272
return new ArrayList<ZooKeeperListener>(listeners);
273
}
274
275
276
277
278
public int getNumberOfListeners() {
279
return listeners.size();
280
}
281
282
283
284
285
286
public
RecoverableZooKeeper getRecoverableZooKeeper() {
287
return recoverableZooKeeper;
288
}
289
290
public void reconnectAfterExpiration()
throws IOException, InterruptedException {
291
recoverableZooKeeper.reconnectAfterExpiration();
292
}
293
294
295
296
297
298
public String getQuorum() {
299
return quorum;
300
}
301
302
303
304
305
306
307
308
@Override
309
public void process(WatchedEvent event) {
310
LOG.debug(prefix(
"Received ZooKeeper Event, " +
311
"type=" + event.getType() +
", " +
312
"state=" + event.getState() +
", " +
313
"path=" + event.getPath()));
314
315
switch(event.getType()) {
316
317
318
case None: {
319
connectionEvent(event);
320
break;
321
}
322
323
324
325
case NodeCreated: {
326
for(
ZooKeeperListener listener : listeners) {
327
listener.nodeCreated(event.getPath());
328
}
329
break;
330
}
331
332
case NodeDeleted: {
333
for(
ZooKeeperListener listener : listeners) {
334
listener.nodeDeleted(event.getPath());
335
}
336
break;
337
}
338
339
case NodeDataChanged: {
340
for(
ZooKeeperListener listener : listeners) {
341
listener.nodeDataChanged(event.getPath());
342
}
343
break;
344
}
345
346
case NodeChildrenChanged: {
347
for(
ZooKeeperListener listener : listeners) {
348
listener.nodeChildrenChanged(event.getPath());
349
}
350
break;
351
}
352
}
353
}
354
355
356
357
358
359
360
361
362
363
364
365
366
367
private void connectionEvent(WatchedEvent event) {
368
switch(event.getState()) {
369
case SyncConnected:
370
371
372
long finished = System.currentTimeMillis() +
373
this.conf.getLong(
"hbase.zookeeper.watcher.sync.connected.wait", 2000);
374
while (System.currentTimeMillis() < finished) {
375
Threads.sleep(1);
376
if (
this.recoverableZooKeeper !=
null)
break;
377
}
378
if (
this.recoverableZooKeeper ==
null) {
379
LOG.error(
"ZK is null on connection event -- see stack trace " +
380
"for the stack trace when constructor was called on this zkw",
381
this.constructorCaller);
382
throw new NullPointerException(
"ZK is null");
383
}
384
this.identifier =
this.identifier +
"-0x" +
385
Long.toHexString(
this.recoverableZooKeeper.getSessionId());
386
387
LOG.debug(
this.identifier +
" connected");
388
break;
389
390
391
case Disconnected:
392
LOG.debug(prefix(
"Received Disconnected from ZooKeeper, ignoring"));
393
break;
394
395
case Expired:
396
String msg = prefix(
this.identifier +
" received expired from " +
397
"ZooKeeper, aborting");
398
399
400
if (
this.abortable !=
null) {
401
this.abortable.abort(msg,
new KeeperException.SessionExpiredException());
402
}
403
break;
404
405
case ConnectedReadOnly:
406
case SaslAuthenticated:
407
break;
408
409
default:
410
throw new IllegalStateException(
"Received event is not valid: " + event.getState());
411
}
412
}
413
414
415
416
417
418
419
420
421
422
423
424
425
426
public void sync(String path) {
427
this.recoverableZooKeeper.sync(path,
null,
null);
428
}
429
430
431
432
433
434
435
436
437
438
439
440
public void keeperException(KeeperException ke)
441
throws KeeperException {
442
LOG.error(prefix(
"Received unexpected KeeperException, re-throwing exception"), ke);
443
throw ke;
444
}
445
446
447
448
449
450
451
452
453
454
455
456
457
public void interruptedException(InterruptedException ie) {
458
LOG.debug(prefix(
"Received InterruptedException, doing nothing here"), ie);
459
460
Thread.currentThread().interrupt();
461
462
}
463
464
465
466
467
468
469
public void close() {
470
try {
471
if (recoverableZooKeeper !=
null) {
472
recoverableZooKeeper.close();
473
}
474
}
catch (InterruptedException e) {
475
Thread.currentThread().interrupt();
476
}
477
}
478
479
public Configuration getConfiguration() {
480
return conf;
481
}
482
483
@Override
484
public void abort(String why, Throwable e) {
485
if (
this.abortable !=
null)
this.abortable.abort(why, e);
486
else this.aborted =
true;
487
}
488
489
@Override
490
public boolean isAborted() {
491
return this.abortable ==
null?
this.aborted:
this.abortable.isAborted();
492
}
493
494
495
496
497
public String getMasterAddressZNode() {
498
return this.masterAddressZNode;
499
}
500
501
}