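Locks are the simplest way to handle concurrency, and also the most expensive. A kernel-level lock requires the operating system to perform a context switch; acquiring and releasing locks causes frequent context switches and scheduling delays, and a thread waiting for a lock is suspended until the lock is released. After a context switch, much of the instructions and data the CPU had cached is no longer useful, which costs a lot of performance. User-level locks avoid these problems, but they are only effective when there is no real contention.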
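Before JDK 1.5, Java relied on the synchronized keyword for synchronization. By following a consistent locking protocol to coordinate access to shared state, it ensures that whichever thread holds the lock guarding a set of variables has exclusive access to them. If several threads try to acquire the lock at the same time, all but one of them are suspended, and when a suspended thread is resumed it may have to wait for other threads to use up their time slices before it is scheduled again. Suspending and resuming threads carries significant overhead. Locks have other drawbacks as well. A thread that is waiting for a lock cannot do anything else. If a thread is delayed while holding a lock, no thread that needs that lock can make progress. And if the blocked thread has high priority while the lock holder has low priority, the result is priority inversion.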
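An exclusive lock is a pessimistic lock, and synchronized is one such lock. It assumes the worst case and proceeds only once it is certain that no other thread can interfere, which causes every other thread that needs the lock to be suspended until the holder releases it. An often more efficient alternative is optimistic locking: perform the operation without taking a lock, on the assumption that there is no conflict, and if the operation fails because of a conflict, retry until it succeeds.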
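Compared with locks, volatile variables are a lighter-weight synchronization mechanism, because using them involves no context switches or thread scheduling. But volatile variables have limitations: they cannot be used to build atomic compound operations, so a volatile variable alone is not enough when the new value of a variable depends on its old value. (See also the post "谈谈volatile".)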
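The limitation of volatile described above can be illustrated with a small sketch. The class and method names below are hypothetical and not from the original post; the sketch assumes Java 8 or later for the lambda syntax. Several threads increment a volatile int and an AtomicInteger side by side: the volatile counter may lose updates because ++ is a read-modify-write sequence, while the atomic counter never does.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names are illustrative only.
final class VolatileVsAtomic {
    static volatile int volatileCounter = 0;   // visible to all threads, but ++ is not atomic
    static final AtomicInteger atomicCounter = new AtomicInteger();

    /** Runs `threads` threads, each incrementing both counters `perThread` times. */
    static int[] run(int threads, final int perThread) {
        volatileCounter = 0;
        atomicCounter.set(0);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    volatileCounter++;               // read, add, write: updates can be lost
                    atomicCounter.incrementAndGet(); // CAS-based: no update is lost
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { volatileCounter, atomicCounter.get() };
    }

    public static void main(String[] args) {
        int[] r = run(4, 100_000);
        System.out.println("volatile counter: " + r[0] + " (may be less than 400000)");
        System.out.println("atomic counter:   " + r[1]);
    }
}
```

How many updates the volatile counter loses, if any, depends on the machine and the run; only the atomic total is guaranteed.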
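An atomic operation is one that completes in a single step and cannot be interrupted. Atomic operations are thread-safe in a multithreaded environment and need no additional synchronization. In Java, the following operations are atomic: reads and writes of all primitive types except long and double; reads and writes of references; reads and writes of volatile long and double variables; and the individual operations of the java.util.concurrent.atomic classes.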
So why is assignment to a long not atomic? For example:
    long foo = 65465498L;
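In fact, the Java Language Specification allows a JVM to write a non-volatile long in two steps, first one 32-bit half and then the other, so another thread may observe a half-written value. That is not thread-safe. Declaring the field as follows makes reads and writes of it atomic: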
    private volatile long foo;
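This works because the Java Language Specification guarantees that reads and writes of volatile long and double values are always atomic. Note that volatile does not take a lock internally, and it does not make compound operations such as foo++ atomic.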
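There are several ways to implement lock-free, nonblocking algorithms, and CAS (compare and swap) is the best known. CAS is a CPU instruction available on most processor architectures, including IA-32 and SPARC. Its semantics are: "I think V should hold the value A; if it does, update V to B; otherwise leave it unchanged and tell me what V actually holds." CAS is an optimistic technique: when several threads use CAS to update the same variable at the same time, only one of them succeeds and the others fail. A failing thread is not suspended; it is told that it lost the race and may try again. CAS takes three operands: the memory location V, the expected old value A, and the new value B. If and only if the value at V equals A, V is set to B; otherwise nothing happens. The semantics of CAS can be written in C as follows: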
    /* ATOMIC() and END_ATOMIC() are placeholders for the region that the
     * hardware executes as a single indivisible step. */
    int compare_and_swap(int *reg, int oldval, int newval)
    {
        ATOMIC();
        int old_reg_val = *reg;
        if (old_reg_val == oldval)
            *reg = newval;
        END_ATOMIC();
        return old_reg_val;
    }
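For comparison, here is a minimal Java sketch of the same semantics using AtomicInteger.compareAndSet from java.util.concurrent.atomic. The class name CasSemantics is hypothetical. Unlike the C version above, which returns the old value, compareAndSet returns a boolean indicating whether the swap happened.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; only AtomicInteger is a real library type here.
final class CasSemantics {
    /** Returns "firstResult,secondResult,finalValue" for easy inspection. */
    static String demo() {
        AtomicInteger v = new AtomicInteger(5);    // memory value V = 5
        boolean first = v.compareAndSet(5, 6);     // expected A = 5 matches, so V becomes 6
        boolean second = v.compareAndSet(5, 7);    // expected A = 5 is stale, so V stays 6
        return first + "," + second + "," + v.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true,false,6"
    }
}
```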
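The typical use of CAS can be expressed in pseudocode as:

    do {
        back up the old data;
        construct the new data from the old data;
    } while (!CAS(memory address, backed-up old data, new data));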
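That is, when the two values are compared and found equal, the shared data has not been modified, so it is replaced with the new value and execution continues. If they differ, the shared data has been modified, so the work done so far is discarded and the operation is performed again. CAS is clearly based on the assumption that the shared data will not be modified in the meantime, and it follows a commit-retry pattern similar to that of databases. When synchronization conflicts are rare, this assumption yields a substantial performance gain.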
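The commit-retry pattern can be sketched in Java with AtomicLong. The example below is an illustration under assumptions, not code from the original post: the class CasRetryLoop and its methods are hypothetical, and the operation chosen (atomically raising a shared maximum) is just one case where the new value depends on the old one. Lambdas assume Java 8 or later.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class showing a CAS retry loop.
final class CasRetryLoop {
    /** Atomically raises `target` to at least `candidate`; returns the resulting value. */
    static long updateMax(AtomicLong target, long candidate) {
        for (;;) {
            long current = target.get();               // back up the old data
            if (candidate <= current) return current;  // nothing to update
            // the new data is `candidate`; try to commit it
            if (target.compareAndSet(current, candidate)) return candidate;
            // CAS failed: another thread changed the value, so re-read and retry
        }
    }

    /** Thread t submits the values t*perThread .. t*perThread + perThread - 1. */
    static long maxOf(int threads, final int perThread) {
        final AtomicLong max = new AtomicLong(Long.MIN_VALUE);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final long base = (long) t * perThread;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) updateMax(max, base + i);
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(maxOf(4, 10_000)); // prints 39999
    }
}
```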
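Before JDK 1.5, CAS could not be performed without writing native code. JDK 1.5 introduced low-level support, exposing CAS operations on int, long, and object references, which the JVM compiles to the most efficient means the underlying hardware provides. On platforms that support CAS, the runtime compiles these operations to the corresponding machine instructions; if the processor does not support a CAS instruction, the JVM falls back to a spin lock. Note therefore that a CAS solution is tightly coupled to the platform and compiler (for example, on x86 the corresponding instruction is lock cmpxchg, and a 64-bit exchange on 32-bit x86 uses lock cmpxchg8b; in .NET the equivalent is the Interlocked.CompareExchange function).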
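The atomic variable classes, that is, the AtomicXXX classes in java.util.concurrent.atomic, use this low-level JVM support to provide efficient CAS operations on numeric and reference types, and most classes in java.util.concurrent are implemented, directly or indirectly, on top of these atomic variable classes.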
The Java 1.6 source of AtomicLong, including incrementAndGet(), is:
    /*
     * Written by Doug Lea with assistance from members of JCP JSR-166
     * Expert Group and released to the public domain, as explained at
     */

    package java.util.concurrent.atomic;
    import sun.misc.Unsafe;

    /**
     * A <tt>long</tt> value that may be updated atomically. See the
     * {@link java.util.concurrent.atomic} package specification for
     * description of the properties of atomic variables. An
     * <tt>AtomicLong</tt> is used in applications such as atomically
     * incremented sequence numbers, and cannot be used as a replacement
     * for a {@link java.lang.Long}. However, this class does extend
     * <tt>Number</tt> to allow uniform access by tools and utilities that
     * deal with numerically-based classes.
     *
     * @since 1.5
     * @author Doug Lea
     */
    public class AtomicLong extends Number implements java.io.Serializable {
        private static final long serialVersionUID = 1927816293512124184L;

        // setup to use Unsafe.compareAndSwapLong for updates
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private static final long valueOffset;

        /**
         * Records whether the underlying JVM supports lockless
         * CompareAndSet for longs. While the unsafe.CompareAndSetLong
         * method works in either case, some constructions should be
         * handled at Java level to avoid locking user-visible locks.
         */
        static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();

        /**
         * Returns whether underlying JVM supports lockless CompareAndSet
         * for longs. Called only once and cached in VM_SUPPORTS_LONG_CAS.
         */
        private static native boolean VMSupportsCS8();

        static {
            try {
                valueOffset = unsafe.objectFieldOffset
                    (AtomicLong.class.getDeclaredField("value"));
            } catch (Exception ex) { throw new Error(ex); }
        }

        private volatile long value;

        /**
         * Creates a new AtomicLong with the given initial value.
         *
         * @param initialValue the initial value
         */
        public AtomicLong(long initialValue) {
            value = initialValue;
        }

        /**
         * Creates a new AtomicLong with initial value <tt>0</tt>.
         */
        public AtomicLong() {
        }

        /**
         * Gets the current value.
         *
         * @return the current value
         */
        public final long get() {
            return value;
        }

        /**
         * Sets to the given value.
         *
         * @param newValue the new value
         */
        public final void set(long newValue) {
            value = newValue;
        }

        /**
         * Eventually sets to the given value.
         *
         * @param newValue the new value
         * @since 1.6
         */
        public final void lazySet(long newValue) {
            unsafe.putOrderedLong(this, valueOffset, newValue);
        }

        /**
         * Atomically sets to the given value and returns the old value.
         *
         * @param newValue the new value
         * @return the previous value
         */
        public final long getAndSet(long newValue) {
            while (true) {
                long current = get();
                if (compareAndSet(current, newValue))
                    return current;
            }
        }

        /**
         * Atomically sets the value to the given updated value
         * if the current value <tt>==</tt> the expected value.
         *
         * @param expect the expected value
         * @param update the new value
         * @return true if successful. False return indicates that
         * the actual value was not equal to the expected value.
         */
        public final boolean compareAndSet(long expect, long update) {
            return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
        }

        /**
         * Atomically sets the value to the given updated value
         * if the current value <tt>==</tt> the expected value.
         * May fail spuriously and does not provide ordering guarantees,
         * so is only rarely an appropriate alternative to <tt>compareAndSet</tt>.
         *
         * @param expect the expected value
         * @param update the new value
         * @return true if successful.
         */
        public final boolean weakCompareAndSet(long expect, long update) {
            return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
        }

        /**
         * Atomically increments by one the current value.
         *
         * @return the previous value
         */
        public final long getAndIncrement() {
            while (true) {
                long current = get();
                long next = current + 1;
                if (compareAndSet(current, next))
                    return current;
            }
        }

        /**
         * Atomically decrements by one the current value.
         *
         * @return the previous value
         */
        public final long getAndDecrement() {
            while (true) {
                long current = get();
                long next = current - 1;
                if (compareAndSet(current, next))
                    return current;
            }
        }

        /**
         * Atomically adds the given value to the current value.
         *
         * @param delta the value to add
         * @return the previous value
         */
        public final long getAndAdd(long delta) {
            while (true) {
                long current = get();
                long next = current + delta;
                if (compareAndSet(current, next))
                    return current;
            }
        }

        /**
         * Atomically increments by one the current value.
         *
         * @return the updated value
         */
        public final long incrementAndGet() {
            for (;;) {
                long current = get();
                long next = current + 1;
                if (compareAndSet(current, next))
                    return next;
            }
        }

        /**
         * Atomically decrements by one the current value.
         *
         * @return the updated value
         */
        public final long decrementAndGet() {
            for (;;) {
                long current = get();
                long next = current - 1;
                if (compareAndSet(current, next))
                    return next;
            }
        }

        /**
         * Atomically adds the given value to the current value.
         *
         * @param delta the value to add
         * @return the updated value
         */
        public final long addAndGet(long delta) {
            for (;;) {
                long current = get();
                long next = current + delta;
                if (compareAndSet(current, next))
                    return next;
            }
        }

        /**
         * Returns the String representation of the current value.
         * @return the String representation of the current value.
         */
        public String toString() {
            return Long.toString(get());
        }


        public int intValue() {
            return (int)get();
        }

        public long longValue() {
            return (long)get();
        }

        public float floatValue() {
            return (float)get();
        }

        public double doubleValue() {
            return (double)get();
        }

    }
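As shown, AtomicLong.incrementAndGet is implemented with an optimistic technique: it calls the CAS primitive in sun.misc.Unsafe, which uses a CPU instruction to perform a lock-free increment. As a result, incrementing with AtomicLong.incrementAndGet is typically several times faster than incrementing under a synchronized lock. AtomicInteger follows the same pattern: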
    public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
The test code follows. In this test, AtomicLong.incrementAndGet performs several times better than synchronized.
    // File: main.java
    package console;

    import java.util.concurrent.atomic.AtomicLong;

    public class main {

        /**
         * @param args
         */
        public static void main(String[] args) {
            System.out.println("START -- ");
            calc();
            calcSynchro();
            calcAtomic();

            testThreadsSync();
            testThreadsAtomic();

            testThreadsSync2();
            testThreadsAtomic2();

            System.out.println("-- FINISHED ");
        }

        // Baseline: plain local increment, no synchronization.
        private static void calc() {
            stopwatch sw = new stopwatch();
            sw.start();

            long val = 0;
            while (val < 10000000L) {
                val++;
            }

            sw.stop();
            long milSecds = sw.getElapsedTime();

            System.out.println(" calc() elapsed (ms): " + milSecds);
        }

        // Single thread, increment inside an uncontended synchronized block.
        private static void calcSynchro() {
            stopwatch sw = new stopwatch();
            sw.start();

            long val = 0;
            while (val < 10000000L) {
                synchronized (main.class) {
                    val++;
                }
            }

            sw.stop();
            long milSecds = sw.getElapsedTime();

            System.out.println(" calcSynchro() elapsed (ms): " + milSecds);
        }

        // Single thread, increment through AtomicLong (uncontended CAS).
        private static void calcAtomic() {
            stopwatch sw = new stopwatch();
            sw.start();

            AtomicLong val = new AtomicLong(0);
            while (val.incrementAndGet() < 10000000L) {
            }

            sw.stop();
            long milSecds = sw.getElapsedTime();

            System.out.println(" calcAtomic() elapsed (ms): " + milSecds);
        }

        // NOTE: despite the "1 thread" label, this starts two threads,
        // exactly like testThreadsSync2() below.
        private static void testThreadsSync() {
            stopwatch sw = new stopwatch();
            sw.start();

            Thread t1 = new Thread(new LoopSync());
            t1.start();

            Thread t2 = new Thread(new LoopSync());
            t2.start();

            // Busy-wait until both threads have finished.
            while (t1.isAlive() || t2.isAlive()) {
            }

            sw.stop();
            long milSecds = sw.getElapsedTime();

            System.out.println(" testThreadsSync() 1 thread elapsed (ms): " + milSecds);
        }

        // NOTE: despite the "1 thread" label, this starts two threads,
        // exactly like testThreadsAtomic2() below.
        private static void testThreadsAtomic() {
            stopwatch sw = new stopwatch();
            sw.start();

            Thread t1 = new Thread(new LoopAtomic());
            t1.start();

            Thread t2 = new Thread(new LoopAtomic());
            t2.start();

            while (t1.isAlive() || t2.isAlive()) {
            }

            sw.stop();
            long milSecds = sw.getElapsedTime();

            System.out.println(" testThreadsAtomic() 1 thread elapsed (ms): " + milSecds);
        }

        private static void testThreadsSync2() {
            stopwatch sw = new stopwatch();
            sw.start();

            Thread t1 = new Thread(new LoopSync());
            t1.start();

            Thread t2 = new Thread(new LoopSync());
            t2.start();

            while (t1.isAlive() || t2.isAlive()) {
            }

            sw.stop();
            long milSecds = sw.getElapsedTime();

            System.out.println(" testThreadsSync() 2 threads elapsed (ms): " + milSecds);
        }

        private static void testThreadsAtomic2() {
            stopwatch sw = new stopwatch();
            sw.start();

            Thread t1 = new Thread(new LoopAtomic());
            t1.start();

            Thread t2 = new Thread(new LoopAtomic());
            t2.start();

            while (t1.isAlive() || t2.isAlive()) {
            }

            sw.stop();
            long milSecds = sw.getElapsedTime();

            System.out.println(" testThreadsAtomic() 2 threads elapsed (ms): " + milSecds);
        }

        // Each LoopAtomic owns its own AtomicLong, so the two threads
        // never contend on the same counter.
        private static class LoopAtomic implements Runnable {
            public void run() {
                AtomicLong val = new AtomicLong(0);
                while (val.incrementAndGet() < 10000000L) {
                }
            }
        }

        // The counter is local to each thread, but both threads
        // contend for the same lock (main.class).
        private static class LoopSync implements Runnable {
            public void run() {
                long val = 0;
                while (val < 10000000L) {
                    synchronized (main.class) {
                        val++;
                    }
                }
            }
        }
    }

    // File: stopwatch.java (must be in the same package, console, as main)
    public class stopwatch {

        private long startTime = 0;
        private long stopTime = 0;
        private boolean running = false;

        public void start() {
            this.startTime = System.currentTimeMillis();
            this.running = true;
        }

        public void stop() {
            this.stopTime = System.currentTimeMillis();
            this.running = false;
        }

        // Elapsed time in milliseconds.
        public long getElapsedTime() {
            long elapsed;
            if (running) {
                elapsed = (System.currentTimeMillis() - startTime);
            } else {
                elapsed = (stopTime - startTime);
            }
            return elapsed;
        }

        // Elapsed time in seconds.
        public long getElapsedTimeSecs() {
            long elapsed;
            if (running) {
                elapsed = ((System.currentTimeMillis() - startTime) / 1000);
            } else {
                elapsed = ((stopTime - startTime) / 1000);
            }
            return elapsed;
        }

        // sample usage
        // public static void main(String[] args) {
        //     StopWatch s = new StopWatch();
        //     s.start();
        //     // code you want to time goes here
        //     s.stop();
        //     System.out.println("elapsed time in milliseconds: " +
        //         s.getElapsedTime());
        // }
    }
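In the test above, each thread increments its own counter, so the atomic variant never experiences contention on the counter itself. A variant in which all threads update one shared counter may be closer to the situation being discussed. The following is only a sketch under assumptions: the class SharedCounterBench and its methods are hypothetical, threads are awaited with join() instead of a busy loop, and it assumes Java 8 or later. No timing figures are claimed here, since they depend entirely on the hardware and JVM.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical benchmark sketch: all threads share one counter.
final class SharedCounterBench {
    private static final Object LOCK = new Object();
    private static long lockedCount;                       // guarded by LOCK
    private static final AtomicLong atomicCount = new AtomicLong();

    private static void runAll(int threads, Runnable body) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(body);
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();                                  // wait without burning CPU
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    /** Increments the shared counter under a lock; returns the final count. */
    static long runLocked(int threads, final int perThread) {
        synchronized (LOCK) { lockedCount = 0; }
        runAll(threads, () -> {
            for (int i = 0; i < perThread; i++) {
                synchronized (LOCK) { lockedCount++; }
            }
        });
        synchronized (LOCK) { return lockedCount; }
    }

    /** Increments the shared counter with CAS; returns the final count. */
    static long runAtomic(int threads, final int perThread) {
        atomicCount.set(0);
        runAll(threads, () -> {
            for (int i = 0; i < perThread; i++) atomicCount.incrementAndGet();
        });
        return atomicCount.get();
    }

    public static void main(String[] args) {
        for (int round = 0; round < 3; round++) {          // early rounds mostly warm up the JIT
            long t0 = System.nanoTime();
            runLocked(2, 5_000_000);
            long t1 = System.nanoTime();
            runAtomic(2, 5_000_000);
            long t2 = System.nanoTime();
            System.out.println("synchronized: " + (t1 - t0) / 1_000_000 + " ms, AtomicLong: "
                    + (t2 - t1) / 1_000_000 + " ms");
        }
    }
}
```

A micro-benchmark of this kind is still sensitive to JIT warm-up and scheduling, so the numbers it prints are best read as rough indications.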
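Below is a CAS example slightly more complicated than the nonblocking increment: a nonblocking stack, ConcurrentStack. The push() and pop() operations in ConcurrentStack are structurally similar to those of NonblockingCounter: they do some work speculatively, hoping that the underlying assumption has not been invalidated by the time the work is "committed". The push() method observes the current top node, builds a new node to be placed on the stack, and then, if the topmost node has not changed since the initial observation, installs the new node. If the CAS fails, another thread has modified the stack, and the process starts over.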
    import java.util.concurrent.atomic.AtomicReference;

    public class ConcurrentStack<E> {
        // Top of the stack; null when the stack is empty.
        AtomicReference<Node<E>> head = new AtomicReference<Node<E>>();

        public void push(E item) {
            Node<E> newHead = new Node<E>(item);
            Node<E> oldHead;
            do {
                oldHead = head.get();
                newHead.next = oldHead;
            } while (!head.compareAndSet(oldHead, newHead)); // retry if the top changed
        }

        public E pop() {
            Node<E> oldHead;
            Node<E> newHead;
            do {
                oldHead = head.get();
                if (oldHead == null)
                    return null;          // empty stack
                newHead = oldHead.next;
            } while (!head.compareAndSet(oldHead, newHead));
            return oldHead.item;
        }

        static class Node<E> {
            final E item;
            Node<E> next;

            public Node(E item) { this.item = item; }
        }
    }
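To see the stack above under concurrent use, here is a small hedged sketch. StackDemo and pushThenDrain are hypothetical names, and the stack is repeated in compact form as a nested class only so that the example compiles on its own. Several threads push the integers 0..perThread-1, then a single thread pops everything and sums the values; if no push was lost, the sum is threads * perThread * (perThread - 1) / 2.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo; the nested class is a compact copy of the ConcurrentStack listing.
final class StackDemo {
    static final class ConcurrentStack<E> {
        private static final class Node<E> {
            final E item;
            Node<E> next;
            Node(E item) { this.item = item; }
        }

        private final AtomicReference<Node<E>> head = new AtomicReference<Node<E>>();

        void push(E item) {
            Node<E> newHead = new Node<E>(item);
            Node<E> oldHead;
            do {
                oldHead = head.get();
                newHead.next = oldHead;
            } while (!head.compareAndSet(oldHead, newHead));
        }

        E pop() {
            Node<E> oldHead;
            Node<E> newHead;
            do {
                oldHead = head.get();
                if (oldHead == null) return null;
                newHead = oldHead.next;
            } while (!head.compareAndSet(oldHead, newHead));
            return oldHead.item;
        }
    }

    /** Each thread pushes 0..perThread-1; afterwards everything is popped and summed. */
    static long pushThenDrain(int threads, final int perThread) {
        final ConcurrentStack<Integer> stack = new ConcurrentStack<Integer>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) stack.push(i);
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        long sum = 0;
        Integer x;
        while ((x = stack.pop()) != null) sum += x;   // drain from a single thread
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(pushThenDrain(4, 1000)); // prints 1998000
    }
}
```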
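Under light to moderate contention, nonblocking algorithms outperform blocking ones, because most of the time the CAS succeeds on the first attempt, and the cost of contention when it does occur is a few extra loop iterations rather than thread suspension and context switches. An uncontended CAS is much cheaper than an uncontended lock (this must be true, because an uncontended lock acquisition involves a CAS plus additional processing), and a contended CAS involves a shorter delay than a contended lock acquisition.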
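Under heavy contention (that is, when many threads continuously pound on a single memory location), lock-based algorithms start to offer better throughput than nonblocking ones, because a blocked thread stops contending and waits patiently for its turn, which avoids further contention. Contention this heavy is uncommon, however, because most of the time threads interleave thread-local computation with operations on shared data, giving other threads a chance at the shared data.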
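The examples so far (the increment counter and the stack) are very simple nonblocking algorithms, and once you have grasped the pattern of using CAS in a loop they are easy to imitate. For more complicated data structures, nonblocking algorithms are much more involved than these simple examples, because modifying a linked list, tree, or hash table may involve updating more than one pointer. CAS supports atomic conditional update of a single pointer, but not of two or more. So to build a nonblocking linked list, tree, or hash table, we need a way to update multiple pointers with CAS without ever leaving the data structure in an inconsistent state.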
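Inserting an element at the tail of a linked list usually involves updating two pointers: the "tail" pointer, which always refers to the last element in the list, and the "next" pointer from the previous last element to the newly inserted one. Because two pointers must be updated, two CASes are needed. Updating two pointers in separate CASes raises two potential problems that must be considered: what happens if the first CAS succeeds but the second fails? And what happens if another thread tries to access the list between the first and second CAS?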
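For nontrivial data structures, the "trick" to building a nonblocking algorithm is to make sure that the data structure is always in a consistent state (even between the time a thread starts modifying it and the time it finishes), and to make sure that other threads can tell not only whether the first thread has finished its update or is still in the middle of it, but also what operations would be needed to complete the update if the first thread goes AWOL. If a thread finds the data structure in the middle of an update, it can "help" the thread performing the update by finishing it, and then proceed with its own operation. When the first thread comes back and tries to finish its own update, it finds that this is no longer necessary and simply returns, because the CAS detects the interference (in this case, constructive interference) of the helping thread.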
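This "help thy neighbor" requirement is necessary to make the data structure immune to the failure of individual threads. If a thread found the data structure in mid-update by another thread and simply waited for that thread to finish, it could wait forever if the other thread failed in the middle of its operation. Even without failures, this approach would perform poorly, because the newly arrived thread would have to yield the processor, incurring a context switch, or wait for its own time slice to expire (which is even worse).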
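    import java.util.concurrent.atomic.AtomicReference;

    // Insertion in the Michael-Scott nonblocking queue algorithm.
    public class LinkedQueue<E> {
        private static class Node<E> {
            final E item;
            final AtomicReference<Node<E>> next;

            Node(E item, Node<E> next) {
                this.item = item;
                this.next = new AtomicReference<Node<E>>(next);
            }
        }

        // head and tail start at the same dummy node, but each needs its own
        // AtomicReference; writing "tail = head" would make them one reference,
        // so advancing the tail would also move the head.
        private final Node<E> dummy = new Node<E>(null, null);
        private final AtomicReference<Node<E>> head = new AtomicReference<Node<E>>(dummy);
        private final AtomicReference<Node<E>> tail = new AtomicReference<Node<E>>(dummy);

        public boolean put(E item) {
            Node<E> newNode = new Node<E>(item, null);
            while (true) {
                Node<E> curTail = tail.get();
                Node<E> residue = curTail.next.get();
                if (curTail == tail.get()) {
                    if (residue == null) /* A */ {
                        // Quiescent state: try to link the new node after the tail.
                        if (curTail.next.compareAndSet(null, newNode)) /* C */ {
                            // Linked; try to advance the tail (another thread may do it for us).
                            tail.compareAndSet(curTail, newNode) /* D */ ;
                            return true;
                        }
                    } else {
                        // Intermediate state: help the other thread by advancing the tail.
                        tail.compareAndSet(curTail, residue) /* B */ ;
                    }
                }
            }
        }
    }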
For the details of the algorithm, see IBM developerWorks.
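The put() operation discussed above can be exercised with a small sketch. QueueDemo, concurrentPuts, and the size() helper are hypothetical additions for illustration; the queue is repeated in compact form as a nested class so the example compiles on its own, with head and tail held in two separate AtomicReference objects that both start at the dummy node. size() walks the list from the dummy node and is only meaningful once all writers have finished.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo; the nested class is a compact copy of the queue listing.
final class QueueDemo {
    static final class LinkedQueue<E> {
        private static final class Node<E> {
            final E item;
            final AtomicReference<Node<E>> next;
            Node(E item, Node<E> next) {
                this.item = item;
                this.next = new AtomicReference<Node<E>>(next);
            }
        }

        private final Node<E> dummy = new Node<E>(null, null);
        private final AtomicReference<Node<E>> head = new AtomicReference<Node<E>>(dummy);
        private final AtomicReference<Node<E>> tail = new AtomicReference<Node<E>>(dummy);

        boolean put(E item) {
            Node<E> newNode = new Node<E>(item, null);
            while (true) {
                Node<E> curTail = tail.get();
                Node<E> residue = curTail.next.get();
                if (curTail == tail.get()) {
                    if (residue == null) {
                        if (curTail.next.compareAndSet(null, newNode)) {
                            tail.compareAndSet(curTail, newNode);   // may fail if a helper did it
                            return true;
                        }
                    } else {
                        tail.compareAndSet(curTail, residue);       // help finish the other put
                    }
                }
            }
        }

        // Hypothetical helper, not part of the listing: counts the nodes after the dummy.
        int size() {
            int n = 0;
            for (Node<E> p = head.get().next.get(); p != null; p = p.next.get()) n++;
            return n;
        }
    }

    /** Each of `threads` threads puts `perThread` items; returns the final element count. */
    static int concurrentPuts(int threads, final int perThread) {
        final LinkedQueue<Integer> queue = new LinkedQueue<Integer>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) queue.put(i);
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(concurrentPuts(4, 10_000)); // prints 40000
    }
}
```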
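ConcurrentHashMap, introduced in Java 5, is thread-safe and cleverly designed. It uses locks at the granularity of segments (groups of buckets) rather than a single lock for the whole map, so put and get never lock the entire map. In particular, get normally takes no lock at all; in the JDK 5/6 implementation it falls back to briefly locking a single segment only in the rare case that it reads an entry whose value appears to be null. The performance gain is obvious.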
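From the caller's side, ConcurrentHashMap also offers conditional update methods that fit the same commit-retry style as CAS. The sketch below is an illustration under assumptions rather than anything from the original post: MapCounter and its methods are hypothetical names, while putIfAbsent and the three-argument replace are methods of the ConcurrentMap interface. It assumes Java 8 or later for the lambda syntax.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo: a per-key counter updated with CAS-style retries.
final class MapCounter {
    /** Atomically increments the count stored under `key`. */
    static void increment(ConcurrentMap<String, Integer> counts, String key) {
        for (;;) {
            Integer old = counts.get(key);
            if (old == null) {
                // No mapping yet: install 1 only if nobody else got there first.
                if (counts.putIfAbsent(key, 1) == null) return;
            } else if (counts.replace(key, old, old + 1)) {
                return;   // the value was still `old`, so the update was committed
            }
            // Another thread changed the mapping in between; re-read and retry.
        }
    }

    /** Each of `threads` threads increments the same key `perThread` times (both must be positive). */
    static int countConcurrently(int threads, final int perThread) {
        final ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<String, Integer>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) increment(counts, "hits");
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counts.get("hits");
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 10_000)); // prints 40000
    }
}
```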
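The implementation uses lock striping (separate locks for separate parts of the table), which is discussed in great detail in a separate post; there is also an analysis of ConcurrentHashMap in light of the Java memory model. Below is the source of the segment-based ConcurrentHashMap (introduced here as the JDK 6 source, although the lazily constructed segments and Unsafe-based volatile access in the listing correspond to the JDK 7 revision):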
/* * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ /* * This file is available under and governed by the GNU General Public * License version 2 only, as published by the Free Software Foundation. * However, the following notice accompanied the original version of this * file: * * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at */ package
java.util.concurrent; import
java.util.concurrent.locks.*; import
java.util.*; import
java.io.Serializable; import
java.io.IOException; import
java.io.ObjectInputStream; import
java.io.ObjectOutputStream; import
java.io.ObjectStreamField; /** * A hash table supporting full concurrency of retrievals and * adjustable expected concurrency for updates. This class obeys the * same functional specification as {@link java.util.Hashtable}, and * includes versions of methods corresponding to each method of * <tt>Hashtable</tt>. However, even though all operations are * thread-safe, retrieval operations do <em>not</em> entail locking, * and there is <em>not</em> any support for locking the entire table * in a way that prevents all access. This class is fully * interoperable with <tt>Hashtable</tt> in programs that rely on its * thread safety but not on its synchronization details. * * <p> Retrieval operations (including <tt>get</tt>) generally do not * block, so may overlap with update operations (including * <tt>put</tt> and <tt>remove</tt>). Retrievals reflect the results * of the most recently <em>completed</em> update operations holding * upon their onset. For aggregate operations such as <tt>putAll</tt> * and <tt>clear</tt>, concurrent retrievals may reflect insertion or * removal of only some entries. Similarly, Iterators and * Enumerations return elements reflecting the state of the hash table * at some point at or since the creation of the iterator/enumeration. * They do <em>not</em> throw {@link ConcurrentModificationException}. * However, iterators are designed to be used by only one thread at a time. * * <p> The allowed concurrency among update operations is guided by * the optional <tt>concurrencyLevel</tt> constructor argument * (default <tt>16</tt>), which is used as a hint for internal sizing. The * table is internally partitioned to try to permit the indicated * number of concurrent updates without contention. Because placement * in hash tables is essentially random, the actual concurrency will * vary. Ideally, you should choose a value to accommodate as many * threads as will ever concurrently modify the table. 
Using a * significantly higher value than you need can waste space and time, * and a significantly lower value can lead to thread contention. But * overestimates and underestimates within an order of magnitude do * not usually have much noticeable impact. A value of one is * appropriate when it is known that only one thread will modify and * all others will only read. Also, resizing this or any other kind of * hash table is a relatively slow operation, so, when possible, it is * a good idea to provide estimates of expected table sizes in * constructors. * * <p>This class and its views and iterators implement all of the * <em>optional</em> methods of the {@link Map} and {@link Iterator} * interfaces. * * <p> Like {@link Hashtable} but unlike {@link HashMap}, this class * does <em>not</em> allow <tt>null</tt> to be used as a key or value. * * <p>This class is a member of the * <a href="{@docRoot}/../technotes/guides/collections/index.html"> * Java Collections Framework</a>. * * @since 1.5 * @author Doug Lea * @param <K> the type of keys maintained by this map * @param <V> the type of mapped values */ public
class ConcurrentHashMap<K, V> extends
AbstractMap<K, V> implements
ConcurrentMap<K, V>, Serializable { private
    static final long serialVersionUID = 7249069246763182397L;

    /*
     * The basic strategy is to subdivide the table among Segments,
     * each of which itself is a concurrently readable hash table. To
     * reduce footprint, all but one segments are constructed only
     * when first needed (see ensureSegment). To maintain visibility
     * in the presence of lazy construction, accesses to segments as
     * well as elements of segment's table must use volatile access,
     * which is done via Unsafe within methods segmentAt etc
     * below. These provide the functionality of AtomicReferenceArrays
     * but reduce the levels of indirection. Additionally,
     * volatile-writes of table elements and entry "next" fields
     * within locked operations use the cheaper "lazySet" forms of
     * writes (via putOrderedObject) because these writes are always
     * followed by lock releases that maintain sequential consistency
     * of table updates.
     *
     * Historical note: The previous version of this class relied
     * heavily on "final" fields, which avoided some volatile reads at
     * the expense of a large initial footprint. Some remnants of
     * that design (including forced construction of segment 0) exist
     * to ensure serialization compatibility.
     */

    /* ---------------- Constants -------------- */

    /**
     * The default initial capacity for this table,
     * used when not otherwise specified in a constructor.
     */
    static final int DEFAULT_INITIAL_CAPACITY = 16;

    /**
     * The default load factor for this table, used when not
     * otherwise specified in a constructor.
     */
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    /**
     * The default concurrency level for this table, used when not
     * otherwise specified in a constructor.
     */
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    /**
     * The maximum capacity, used if a higher value is implicitly
     * specified by either of the constructors with arguments. MUST
     * be a power of two <= 1<<30 to ensure that entries are indexable
     * using ints.
     */
    static final int MAXIMUM_CAPACITY = 1 << 30;

    /**
     * The minimum capacity for per-segment tables. Must be a power
     * of two, at least two to avoid immediate resizing on next use
     * after lazy construction.
     */
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

    /**
     * The maximum number of segments to allow; used to bound
     * constructor arguments. Must be power of two less than 1 << 24.
     */
    static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

    /**
     * Number of unsynchronized retries in size and containsValue
     * methods before resorting to locking. This is used to avoid
     * unbounded retries if tables undergo continuous modification
     * which would make it impossible to obtain an accurate result.
     */
    static final int RETRIES_BEFORE_LOCK = 2;

    /* ---------------- Fields -------------- */

    /**
     * Mask value for indexing into segments. The upper bits of a
     * key's hash code are used to choose the segment.
     */
    final int segmentMask;

    /**
     * Shift value for indexing within segments.
     */
    final int segmentShift;

    /**
     * The segments, each of which is a specialized hash table.
     */
    final Segment<K,V>[] segments;

    transient Set<K> keySet;
    transient Set<Map.Entry<K,V>> entrySet;
    transient Collection<V> values;

    /**
     * ConcurrentHashMap list entry. Note that this is never exported
     * out as a user-visible Map.Entry.
     */
    static
    final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        /**
         * Sets next field with volatile write semantics. (See above
         * about use of putOrderedObject.)
         */
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

        // Unsafe mechanics
        static final sun.misc.Unsafe UNSAFE;
        static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = HashEntry.class;
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    /**
     * Gets the ith element of given table (if nonnull) with volatile
     * read semantics.
     */
    @SuppressWarnings("unchecked")
    static
    final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
        return (tab == null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)i << TSHIFT) + TBASE);
    }

    /**
     * Sets the ith element of given table, with volatile write
     * semantics. (See above about use of putOrderedObject.)
     */
    static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                       HashEntry<K,V> e) {
        UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
    }

    /**
     * Applies a supplemental hash function to a given hashCode, which
     * defends against poor quality hash functions. This is critical
     * because ConcurrentHashMap uses power-of-two length hash tables,
     * that otherwise encounter collisions for hashCodes that do not
     * differ in lower or upper bits.
     */
    private static int hash(int h) {
        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h << 15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h << 3);
        h ^= (h >>> 6);
        h += (h << 2) + (h << 14);
        return h ^ (h >>> 16);
    }

    /**
     * Segments are specialized versions of hash tables. This
     * subclasses from ReentrantLock opportunistically, just to
     * simplify some locking and avoid separate construction.
     */
    static
    final class Segment<K,V> extends ReentrantLock implements Serializable {
        /*
         * Segments maintain a table of entry lists that are always
         * kept in a consistent state, so can be read (via volatile
         * reads of segments and tables) without locking. This
         * requires replicating nodes when necessary during table
         * resizing, so the old lists can be traversed by readers
         * still using old version of table.
         *
         * This class defines only mutative methods requiring locking.
         * Except as noted, the methods of this class perform the
         * per-segment versions of ConcurrentHashMap methods. (Other
         * methods are integrated directly into ConcurrentHashMap
         * methods.) These mutative methods use a form of controlled
         * spinning on contention via methods scanAndLock and
         * scanAndLockForPut. These intersperse tryLocks with
         * traversals to locate nodes. The main benefit is to absorb
         * cache misses (which are very common for hash tables) while
         * obtaining locks so that traversal is faster once
         * acquired. We do not actually use the found nodes since they
         * must be re-acquired under lock anyway to ensure sequential
         * consistency of updates (and in any case may be undetectably
         * stale), but they will normally be much faster to re-locate.
         * Also, scanAndLockForPut speculatively creates a fresh node
         * to use in put if no node is found.
         */

        private static final long serialVersionUID = 2249069246763182397L;

        /**
         * The maximum number of times to tryLock in a prescan before
         * possibly blocking on acquire in preparation for a locked
         * segment operation. On multiprocessors, using a bounded
         * number of retries maintains cache acquired while locating
         * nodes.
         */
        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

        /**
         * The per-segment table. Elements are accessed via
         * entryAt/setEntryAt providing volatile semantics.
         */
        transient volatile HashEntry<K,V>[] table;

        /**
         * The number of elements. Accessed only either within locks
         * or among other volatile reads that maintain visibility.
         */
        transient int count;

        /**
         * The total number of mutative operations in this segment.
         * Even though this may overflow 32 bits, it provides
         * sufficient accuracy for stability checks in CHM isEmpty()
         * and size() methods. Accessed only either within locks or
         * among other volatile reads that maintain visibility.
         */
        transient int modCount;

        /**
         * The table is rehashed when its size exceeds this threshold.
         * (The value of this field is always <tt>(int)(capacity *
         * loadFactor)</tt>.)
         */
        transient int threshold;

        /**
         * The load factor for the hash table. Even though this value
         * is same for all segments, it is replicated to avoid needing
         * links to outer object.
         * @serial
         */
        final float loadFactor;

        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }

        final
        V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && first != null &&
                            tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

        /**
         * Doubles size of table and repacks entries, also adding the
         * given node to new table
         */
        @SuppressWarnings("unchecked")
        private
        void rehash(HashEntry<K,V> node) {
            /*
             * Reclassify nodes in each list to new table. Because we
             * are using power-of-two expansion, the elements from
             * each bin must either stay at same index, or move with a
             * power of two offset. We eliminate unnecessary node
             * creation by catching cases where old nodes can be
             * reused because their next fields won't change.
             * Statistically, at the default threshold, only about
             * one-sixth of them need cloning when a table
             * doubles. The nodes they replace will be garbage
             * collectable as soon as they are no longer referenced by
             * any reader thread that may be in the midst of
             * concurrently traversing table. Entry accesses use plain
             * array indexing because they are followed by volatile
             * table write.
             */
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null) // Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }

        /**
         * Scans for a node containing given key while trying to
         * acquire lock, creating and returning one if not found. Upon
         * return, guarantees that lock is held. Unlike in most
         * methods, calls to method equals are not screened: Since
         * traversal speed doesn't matter, we might as well help warm
         * up the associated code and accesses as well.
         *
         * @return a new node if key not found, else null
         */
        private
        HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

        /**
         * Scans for a node containing the given key while trying to
         * acquire lock for a remove or replace operation. Upon
         * return, guarantees that lock is held. Note that we must
         * lock even if the key is not found, to ensure sequential
         * consistency of updates.
         */
        private
        void scanAndLock(Object key, int hash) {
            // similar to but simpler than scanAndLockForPut
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            int retries = -1;
            while (!tryLock()) {
                HashEntry<K,V> f;
                if (retries < 0) {
                    if (e == null || key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f;
                    retries = -1;
                }
            }
        }

        /**
         * Remove; match on key only if value null, else match both.
         */
        final
        V remove(Object key, int hash, Object value) {
            if (!tryLock())
                scanAndLock(key, hash);
            V oldValue = null;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> e = entryAt(tab, index);
                HashEntry<K,V> pred = null;
                while (e != null) {
                    K k;
                    HashEntry<K,V> next = e.next;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        V v = e.value;
                        if (value == null || value == v || value.equals(v)) {
                            if (pred == null)
                                setEntryAt(tab, index, next);
                            else
                                pred.setNext(next);
                            ++modCount;
                            --count;
                            oldValue = v;
                        }
                        break;
                    }
                    pred = e;
                    e = next;
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

        final boolean replace(K key, int hash, V oldValue, V newValue) {
            if (!tryLock())
                scanAndLock(key, hash);
            boolean replaced = false;
            try {
                HashEntry<K,V> e;
                for (e = entryForHash(this, hash); e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        if (oldValue.equals(e.value)) {
                            e.value = newValue;
                            ++modCount;
                            replaced = true;
                        }
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return replaced;
        }

        final V replace(K key, int hash, V value) {
            if (!tryLock())
                scanAndLock(key, hash);
            V oldValue = null;
            try {
                HashEntry<K,V> e;
                for (e = entryForHash(this, hash); e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        e.value = value;
                        ++modCount;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

        final void clear() {
            lock();
            try {
                HashEntry<K,V>[] tab = table;
                for (int i = 0; i < tab.length; i++)
                    setEntryAt(tab, i, null);
                ++modCount;
                count = 0;
            } finally {
                unlock();
            }
        }
    }

    // Accessing segments

    /**
     * Gets the jth element of given segment array (if nonnull) with
     * volatile element access semantics via Unsafe.
     */
    @SuppressWarnings("unchecked")
    static
    final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) {
        long u = (j << SSHIFT) + SBASE;
        return ss == null ? null :
            (Segment<K,V>) UNSAFE.getObjectVolatile(ss, u);
    }

    /**
     * Returns the segment for the given index, creating it and
     * recording in segment table (via CAS) if not already present.
     *
     * @param k the index
     * @return the segment
     */
    @SuppressWarnings("unchecked")
    private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[]) new HashEntry[cap];
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

    // Hash-based segment and entry accesses

    /**
     * Get the segment for the given hash
     */
    @SuppressWarnings("unchecked")
    private Segment<K,V> segmentForHash(int h) {
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
    }

    /**
     * Gets the table entry for the given segment and hash
     */
    @SuppressWarnings("unchecked")
    static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) {
        HashEntry<K,V>[] tab;
        return (seg == null || (tab = seg.table) == null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
    }

    /* ---------------- Public operations -------------- */

    /**
     * Creates a new, empty map with the specified initial
     * capacity, load factor and concurrency level.
     *
     * @param initialCapacity the initial capacity. The implementation
     * performs internal sizing to accommodate this many elements.
     * @param loadFactor the load factor threshold, used to control resizing.
     * Resizing may be performed when the average number of elements per
     * bin exceeds this threshold.
     * @param concurrencyLevel the estimated number of concurrently
     * updating threads. The implementation performs internal sizing
     * to try to accommodate this many threads.
     * @throws IllegalArgumentException if the initial capacity is
     * negative or the load factor or concurrencyLevel are
     * nonpositive.
     */
    @SuppressWarnings("unchecked")
    public
    ConcurrentHashMap(int initialCapacity,
                      float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[]) new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[]) new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

    /**
     * Creates a new, empty map with the specified initial capacity
     * and load factor and with the default concurrencyLevel (16).
     *
     * @param initialCapacity The implementation performs internal
     * sizing to accommodate this many elements.
     * @param loadFactor the load factor threshold, used to control resizing.
     * Resizing may be performed when the average number of elements per
     * bin exceeds this threshold.
     * @throws IllegalArgumentException if the initial capacity of
     * elements is negative or the load factor is nonpositive
     *
     * @since 1.6
     */
    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
    }

    /**
     * Creates a new, empty map with the specified initial capacity,
     * and with default load factor (0.75) and concurrencyLevel (16).
     *
     * @param initialCapacity the initial capacity. The implementation
     * performs internal sizing to accommodate this many elements.
     * @throws IllegalArgumentException if the initial capacity of
     * elements is negative.
     */
    public ConcurrentHashMap(int initialCapacity) {
        this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    /**
     * Creates a new, empty map with a default initial capacity (16),
     * load factor (0.75) and concurrencyLevel (16).
     */
    public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    /**
     * Creates a new map with the same mappings as the given map.
     * The map is created with a capacity of 1.5 times the number
     * of mappings in the given map or 16 (whichever is greater),
     * and a default load factor (0.75) and concurrencyLevel (16).
     *
     * @param m the map
     */
    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
                      DEFAULT_INITIAL_CAPACITY),
             DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
        putAll(m);
    }

    /**
     * Returns <tt>true</tt> if this map contains no key-value mappings.
     *
     * @return <tt>true</tt> if this map contains no key-value mappings
     */
    public
    boolean isEmpty() {
        /*
         * Sum per-segment modCounts to avoid mis-reporting when
         * elements are concurrently added and removed in one segment
         * while checking another, in which case the table was never
         * actually empty at any point. (The sum ensures accuracy up
         * through at least 1<<31 per-segment modifications before
         * recheck.) Methods size() and containsValue() use similar
         * constructions for stability checks.
         */
        long sum = 0L;
        final Segment<K,V>[] segments = this.segments;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                if (seg.count != 0)
                    return false;
                sum += seg.modCount;
            }
        }
        if (sum != 0L) { // recheck unless no modifications
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    if (seg.count != 0)
                        return false;
                    sum -= seg.modCount;
                }
            }
            if (sum != 0L)
                return false;
        }
        return true;
    }

    /**
     * Returns the number of key-value mappings in this map. If the
     * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
     * <tt>Integer.MAX_VALUE</tt>.
     *
     * @return the number of key-value mappings in this map
     */
    public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

    /**
     * Returns the value to which the specified key is mapped,
     * or {@code null} if this map contains no mapping for the key.
     *
     * <p>More formally, if this map contains a mapping from a key
     * {@code k} to a value {@code v} such that {@code key.equals(k)},
     * then this method returns {@code v}; otherwise it returns
     * {@code null}. (There can be at most one such mapping.)
     *
     * @throws NullPointerException if the specified key is null
     */
    public
    V get(Object key) {
        int hash = hash(key.hashCode());
        for (HashEntry<K,V> e = entryForHash(segmentForHash(hash), hash);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == hash && key.equals(k)))
                return e.value;
        }
        return null;
    }

    /**
     * Tests if the specified object is a key in this table.
     *
     * @param key possible key
     * @return <tt>true</tt> if and only if the specified object
     * is a key in this table, as determined by the
     * <tt>equals</tt> method; <tt>false</tt> otherwise.
     * @throws NullPointerException if the specified key is null
     */
    public boolean containsKey(Object key) {
        int hash = hash(key.hashCode());
        for (HashEntry<K,V> e = entryForHash(segmentForHash(hash), hash);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == hash && key.equals(k)))
                return true;
        }
        return false;
    }

    /**
     * Returns <tt>true</tt> if this map maps one or more keys to the
     * specified value. Note: This method requires a full internal
     * traversal of the hash table, and so is much slower than
     * method <tt>containsKey</tt>.
     *
     * @param value value whose presence in this map is to be tested
     * @return <tt>true</tt> if this map maps one or more keys to the
     * specified value
     * @throws NullPointerException if the specified value is null
     */
    public boolean containsValue(Object value) {
        // Same idea as size()
        if (value == null)
            throw new NullPointerException();
        final Segment<K,V>[] segments = this.segments;
        boolean found = false;
        long last = 0;
        int retries = -1;
        try {
            outer: for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                long hashSum = 0L;
                int sum = 0;
                for (int j = 0; j < segments.length; ++j) {
                    HashEntry<K,V>[] tab;
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null && (tab = seg.table) != null) {
                        for (int i = 0; i < tab.length; i++) {
                            HashEntry<K,V> e;
                            for (e = entryAt(tab, i); e != null; e = e.next) {
                                V v = e.value;
                                if (v != null && value.equals(v)) {
                                    found = true;
                                    break outer;
                                }
                            }
                        }
                        sum += seg.modCount;
                    }
                }
                if (retries > 0 && sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return found;
    }

    /**
     * Legacy method testing if some key maps into the specified value
     * in this table. This method is identical in functionality to
     * {@link #containsValue}, and exists solely to ensure
     * full compatibility with class {@link java.util.Hashtable},
     * which supported this method prior to introduction of the
     * Java Collections framework.
     *
     * @param value a value to search for
     * @return <tt>true</tt> if and only if some key maps to the
     * <tt>value</tt> argument in this table as
     * determined by the <tt>equals</tt> method;
     * <tt>false</tt> otherwise
     * @throws NullPointerException if the specified value is null
     */
    public boolean contains(Object value) {
        return containsValue(value);
    }

    /**
     * Maps the specified key to the specified value in this table.
     * Neither the key nor the value can be null.
     *
     * <p> The value can be retrieved by calling the <tt>get</tt> method
     * with a key that is equal to the original key.
     *
     * @param key key with which the specified value is to be associated
     * @param value value to be associated with the specified key
     * @return the previous value associated with <tt>key</tt>, or
     * <tt>null</tt> if there was no mapping for <tt>key</tt>
     * @throws NullPointerException if the specified key or value is null
     */
    public
    V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        int j = (hash >>> segmentShift) & segmentMask;
        Segment<K,V> s = segmentAt(segments, j);
        if (s == null)
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

    /**
     * {@inheritDoc}
     *
     * @return the previous value associated with the specified key,
     * or <tt>null</tt> if there was no mapping for the key
     * @throws NullPointerException if the specified key or value is null
     */
    public V putIfAbsent(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        int j = (hash >>> segmentShift) & segmentMask;
        Segment<K,V> s = segmentAt(segments, j);
        if (s == null)
            s = ensureSegment(j);
        return s.put(key, hash, value, true);
    }

    /**
     * Copies all of the mappings from the specified map to this one.
     * These mappings replace any mappings that this map had for any of the
     * keys currently in the specified map.
     *
     * @param m mappings to be stored in this map
     */
    public void putAll(Map<? extends K, ? extends V> m) {
        for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
            put(e.getKey(), e.getValue());
    }

    /**
     * Removes the key (and its corresponding value) from this map.
     * This method does nothing if the key is not in the map.
     *
     * @param key the key that needs to be removed
     * @return the previous value associated with <tt>key</tt>, or
     * <tt>null</tt> if there was no mapping for <tt>key</tt>
     * @throws NullPointerException if the specified key is null
     */
    public V remove(Object key) {
        int hash = hash(key.hashCode());
        Segment<K,V> s = segmentForHash(hash);
        return s == null ? null : s.remove(key, hash, null);
    }

    /**
     * {@inheritDoc}
     *
     * @throws NullPointerException if the specified key is null
     */
    public boolean remove(Object key, Object value) {
        int hash = hash(key.hashCode());
        Segment<K,V> s;
        return value != null && (s = segmentForHash(hash)) != null &&
            s.remove(key, hash, value) != null;
    }

    /**
     * {@inheritDoc}
     *
     * @throws NullPointerException if any of the arguments are null
     */
    public boolean replace(K key, V oldValue, V newValue) {
        int hash = hash(key.hashCode());
        if (oldValue == null || newValue == null)
            throw new NullPointerException();
        Segment<K,V> s = segmentForHash(hash);
        return s != null && s.replace(key, hash, oldValue, newValue);
    }

    /**
     * {@inheritDoc}
     *
     * @return the previous value associated with the specified key,
     * or <tt>null</tt> if there was no mapping for the key
     * @throws NullPointerException if the specified key or value is null
     */
    public V replace(K key, V value) {
        int hash = hash(key.hashCode());
        if (value == null)
            throw new NullPointerException();
        Segment<K,V> s = segmentForHash(hash);
        return s == null ? null : s.replace(key, hash, value);
    }

    /**
     * Removes all of the mappings from this map.
     */
    public void clear() {
        final Segment<K,V>[] segments = this.segments;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> s = segmentAt(segments, j);
            if (s != null)
                s.clear();
        }
    }

    /**
     * Returns a {@link Set} view of the keys contained in this map.
     * The set is backed by the map, so changes to the map are
     * reflected in the set, and vice-versa. The set supports element
     * removal, which removes the corresponding mapping from this map,
     * via the <tt>Iterator.remove</tt>, <tt>Set.remove</tt>,
     * <tt>removeAll</tt>, <tt>retainAll</tt>, and <tt>clear</tt>
     * operations. It does not support the <tt>add</tt> or
     * <tt>addAll</tt> operations.
     *
     * <p>The view's <tt>iterator</tt> is a "weakly consistent" iterator
     * that will never throw {@link ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     */
    public
    Set<K> keySet() {
        Set<K> ks = keySet;
        return (ks != null) ? ks : (keySet = new KeySet());
    }

    /**
     * Returns a {@link Collection} view of the values contained in this map.
     * The collection is backed by the map, so changes to the map are
     * reflected in the collection, and vice-versa. The collection
     * supports element removal, which removes the corresponding
     * mapping from this map, via the <tt>Iterator.remove</tt>,
     * <tt>Collection.remove</tt>, <tt>removeAll</tt>,
     * <tt>retainAll</tt>, and <tt>clear</tt> operations. It does not
     * support the <tt>add</tt> or <tt>addAll</tt> operations.
     *
     * <p>The view's <tt>iterator</tt> is a "weakly consistent" iterator
     * that will never throw {@link ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     */
    public Collection<V> values() {
        Collection<V> vs = values;
        return (vs != null) ? vs : (values = new Values());
    }

    /**
     * Returns a {@link Set} view of the mappings contained in this map.
     * The set is backed by the map, so changes to the map are
     * reflected in the set, and vice-versa. The set supports element
     * removal, which removes the corresponding mapping from the map,
     * via the <tt>Iterator.remove</tt>, <tt>Set.remove</tt>,
     * <tt>removeAll</tt>, <tt>retainAll</tt>, and <tt>clear</tt>
     * operations. It does not support the <tt>add</tt> or
     * <tt>addAll</tt> operations.
     *
     * <p>The view's <tt>iterator</tt> is a "weakly consistent" iterator
     * that will never throw {@link ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     */
    public Set<Map.Entry<K,V>> entrySet() {
        Set<Map.Entry<K,V>> es = entrySet;
        return (es != null) ? es : (entrySet = new EntrySet());
    }

    /**
     * Returns an enumeration of the keys in this table.
     *
     * @return an enumeration of the keys in this table
     * @see #keySet()
     */
    public Enumeration<K> keys() {
        return new KeyIterator();
    }

    /**
     * Returns an enumeration of the values in this table.
     *
     * @return an enumeration of the values in this table
     * @see #values()
     */
    public Enumeration<V> elements() {
        return new ValueIterator();
    }

    /* ---------------- Iterator Support -------------- */

    abstract
    class HashIterator {
        int nextSegmentIndex;
        int nextTableIndex;
        HashEntry<K,V>[] currentTable;
        HashEntry<K, V> nextEntry;
        HashEntry<K, V> lastReturned;

        HashIterator() {
            nextSegmentIndex = segments.length - 1;
            nextTableIndex = -1;
            advance();
        }

        /**
         * Set nextEntry to first node of next non-empty table
         * (in backwards order, to simplify checks).
         */
        final void advance() {
            for (;;) {
                if (nextTableIndex >= 0) {
                    if ((nextEntry = entryAt(currentTable,
                                             nextTableIndex--)) != null)
                        break;
                }
                else if (nextSegmentIndex >= 0) {
                    Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--);
                    if (seg != null && (currentTable = seg.table) != null)
                        nextTableIndex = currentTable.length - 1;
                }
                else
                    break;
            }
        }

        final HashEntry<K,V> nextEntry() {
            HashEntry<K,V> e = nextEntry;
            if (e == null)
                throw new NoSuchElementException();
            lastReturned = e; // cannot assign until after null check
            if ((nextEntry = e.next) == null)
                advance();
            return e;
        }

        public final boolean hasNext() { return nextEntry != null; }
        public final boolean hasMoreElements() { return nextEntry != null; }

        public final void remove() {
            if (lastReturned == null)
                throw new IllegalStateException();
            ConcurrentHashMap.this.remove(lastReturned.key);
            lastReturned = null;
        }
    }

    final
    class KeyIterator
        extends HashIterator
        implements Iterator<K>, Enumeration<K>
    {
        public final K next()        { return super.nextEntry().key; }
        public final K nextElement() { return super.nextEntry().key; }
    }

    final class ValueIterator
        extends HashIterator
        implements Iterator<V>, Enumeration<V>
    {
        public final V next()        { return super.nextEntry().value; }
        public final V nextElement() { return super.nextEntry().value; }
    }

    /**
     * Custom Entry class used by EntryIterator.next(), that relays
     * setValue changes to the underlying map.
     */
    final class WriteThroughEntry
        extends AbstractMap.SimpleEntry<K,V>
    {
        WriteThroughEntry(K k, V v) {
            super(k,v);
        }

        /**
         * Set our entry's value and write through to the map. The
         * value to return is somewhat arbitrary here. Since a
         * WriteThroughEntry does not necessarily track asynchronous
         * changes, the most recent "previous" value could be
         * different from what we return (or could even have been
         * removed in which case the put will re-establish). We do not
         * and cannot guarantee more.
         */
        public V setValue(V value) {
            if (value == null)
                throw new NullPointerException();
            V v = super.setValue(value);
            ConcurrentHashMap.this.put(getKey(), value);
            return v;
        }
    }

    final class EntryIterator
        extends HashIterator
        implements Iterator<Entry<K,V>>
    {
        public Map.Entry<K,V> next() {
            HashEntry<K,V> e = super.nextEntry();
            return new WriteThroughEntry(e.key, e.value);
        }
    }

    final class KeySet extends AbstractSet<K> {
        public Iterator<K> iterator() {
            return new KeyIterator();
        }
        public int size() {
            return ConcurrentHashMap.this.size();
        }
        public boolean isEmpty() {
            return ConcurrentHashMap.this.isEmpty();
        }
        public boolean contains(Object o) {
            return ConcurrentHashMap.this.containsKey(o);
        }
        public boolean remove(Object o) {
            return ConcurrentHashMap.this.remove(o) != null;
        }
        public void clear() {
            ConcurrentHashMap.this.clear();
        }
    }

    final class Values extends AbstractCollection<V> {
        public Iterator<V> iterator() {
            return new ValueIterator();
        }
        public int size() {
            return ConcurrentHashMap.this.size();
        }
        public boolean isEmpty() {
            return ConcurrentHashMap.this.isEmpty();
        }
        public boolean contains(Object o) {
            return ConcurrentHashMap.this.containsValue(o);
        }
        public void clear() {
            ConcurrentHashMap.this.clear();
        }
    }

    final class EntrySet extends AbstractSet<Map.Entry<K,V>> {
        public Iterator<Map.Entry<K,V>> iterator() {
            return new EntryIterator();
        }
        public boolean contains(Object o) {
            if (!(o instanceof Map.Entry))
                return false;
            Map.Entry<?,?> e = (Map.Entry<?,?>)o;
            V v = ConcurrentHashMap.this.get(e.getKey());
            return v != null && v.equals(e.getValue());
        }
        public boolean remove(Object o) {
            if (!(o instanceof Map.Entry))
                return false;
            Map.Entry<?,?> e = (Map.Entry<?,?>)o;
            return ConcurrentHashMap.this.remove(e.getKey(), e.getValue());
        }
        public int size() {
            return ConcurrentHashMap.this.size();
        }
        public boolean isEmpty() {
            return ConcurrentHashMap.this.isEmpty();
        }
        public void clear() {
            ConcurrentHashMap.this.clear();
        }
    }

    /* ---------------- Serialization Support -------------- */

    /**
     * Save the state of the <tt>ConcurrentHashMap</tt> instance to a
     * stream (i.e., serialize it).
     * @param s the stream
     * @serialData
     * the key (Object) and value (Object)
     * for each key-value mapping, followed by a null pair.
     * The key-value mappings are emitted in no particular order.
     */
    private
void writeObject(java.io.ObjectOutputStream s) throws
IOException { // force all segments for serialization compatibility for
( int k = 0 ; k < segments.length; ++k) ensureSegment(k); s.defaultWriteObject(); final
Segment<K,V>[] segments = this .segments; for
( int k = 0 ; k < segments.length; ++k) { Segment<K,V> seg = segmentAt(segments, k); seg.lock(); try
{ HashEntry<K,V>[] tab = seg.table; for
( int i = 0 ; i < tab.length; ++i) { HashEntry<K,V> e; for
(e = entryAt(tab, i); e != null ; e = e.next) { s.writeObject(e.key); s.writeObject(e.value); } } } finally
{ seg.unlock(); } } s.writeObject( null ); s.writeObject( null ); } /** * Reconstitute the <tt>ConcurrentHashMap</tt> instance from a * stream (i.e., deserialize it). * @param s the stream */ @SuppressWarnings ( "unchecked" ) private
void readObject(java.io.ObjectInputStream s) throws
IOException, ClassNotFoundException { // Don‘t call defaultReadObject() ObjectInputStream.GetField oisFields = s.readFields(); final
Segment<K,V>[] oisSegments = (Segment<K,V>[])oisFields.get( "segments" , null ); final
int ssize = oisSegments.length; if
(ssize < 1
|| ssize > MAX_SEGMENTS || (ssize & (ssize- 1 )) != 0
) // ssize not power of two throw
new java.io.InvalidObjectException( "Bad number of segments:" + ssize); int
sshift = 0 , ssizeTmp = ssize; while
(ssizeTmp > 1 ) { ++sshift; ssizeTmp >>>= 1 ; } UNSAFE.putIntVolatile( this , SEGSHIFT_OFFSET, 32
- sshift); UNSAFE.putIntVolatile( this , SEGMASK_OFFSET, ssize - 1 ); UNSAFE.putObjectVolatile( this , SEGMENTS_OFFSET, oisSegments); // Re-initialize segments to be minimally sized, and let grow. int
cap = MIN_SEGMENT_TABLE_CAPACITY; final
Segment<K,V>[] segments = this .segments; for
( int k = 0 ; k < segments.length; ++k) { Segment<K,V> seg = segments[k]; if
(seg != null ) { seg.threshold = ( int )(cap * seg.loadFactor); seg.table = (HashEntry<K,V>[]) new
HashEntry[cap]; } } // Read the keys and values, and put the mappings in the table for
(;;) { K key = (K) s.readObject(); V value = (V) s.readObject(); if
(key == null ) break ; put(key, value); } } // Unsafe mechanics private
static final sun.misc.Unsafe UNSAFE; private
static final long SBASE; private
static final int SSHIFT; private
static final long TBASE; private
static final int TSHIFT; private
static final long SEGSHIFT_OFFSET; private
static final long SEGMASK_OFFSET; private
static final long SEGMENTS_OFFSET; static
{ int
ss, ts; try
{ UNSAFE = sun.misc.Unsafe.getUnsafe(); Class tc = HashEntry[]. class ; Class sc = Segment[]. class ; TBASE = UNSAFE.arrayBaseOffset(tc); SBASE = UNSAFE.arrayBaseOffset(sc); ts = UNSAFE.arrayIndexScale(tc); ss = UNSAFE.arrayIndexScale(sc); SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap. class .getDeclaredField( "segmentShift" )); SEGMASK_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap. class .getDeclaredField( "segmentMask" )); SEGMENTS_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap. class .getDeclaredField( "segments" )); } catch
(Exception e) { throw
new Error(e); } if
((ss & (ss- 1 )) != 0
|| (ts & (ts- 1 )) != 0 ) throw
new Error( "data type scale not a power of two" ); SSHIFT = 31
- Integer.numberOfLeadingZeros(ss); TSHIFT = 31
- Integer.numberOfLeadingZeros(ts); } } |
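The iterator and `WriteThroughEntry` classes in the listing above have two visible effects for callers: iterating a `ConcurrentHashMap` never throws `ConcurrentModificationException`, and `Map.Entry.setValue` on an entry obtained from `entrySet()` is written back to the map. The sketch below tries to show both using only the public API. The class name `WeaklyConsistentIteration` and its two methods are hypothetical names introduced for illustration; they are not part of the JDK.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustration only: observable behaviour of ConcurrentHashMap views. */
final class WeaklyConsistentIteration {

    /** Doubles every value through the entry set and returns the same map. */
    static Map<String, Integer> doubleAll(ConcurrentHashMap<String, Integer> map) {
        for (Map.Entry<String, Integer> e : map.entrySet()) {
            e.setValue(e.getValue() * 2); // setValue is written through to the map
        }
        return map;
    }

    /** Removes each key while iterating over the key set; returns keys visited. */
    static int removeWhileIterating(ConcurrentHashMap<String, Integer> map) {
        int visited = 0;
        for (String key : map.keySet()) {
            map.remove(key); // modifying the map mid-iteration does not throw
            visited++;
        }
        return visited;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("a", 1);
        map.put("b", 2);
        System.out.println(doubleAll(map));
        System.out.println(removeWhileIterating(map) + " keys removed, empty=" + map.isEmpty());
    }
}
```

The same loop over a plain `HashMap` would normally fail with `ConcurrentModificationException` on the second iteration, which is the practical difference between fail-fast and weakly consistent iterators.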
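There are two ways to get both high throughput and thread safety under high concurrency: use optimized locks, or use lock-free data structures. Non-blocking algorithms, however, are far more complex than lock-based ones. Developing them is a specialized discipline, and proving them correct is extremely difficult: correctness depends on the target hardware platform and the compiler, and it takes sophisticated technique and rigorous testing. Although lock-free programming is very hard, it can usually deliver higher throughput than lock-based programming, so it is a promising technique. It also behaves well with respect to thread termination, priority inversion, and signal safety.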
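As a concrete instance of a lock-free structure, here is a minimal sketch of a Treiber stack built on `AtomicReference.compareAndSet`, following the "back up old data, build new data, retry until CAS succeeds" loop. The class name `LockFreeStack` and its node type are hypothetical names for illustration, and the sketch relies on garbage collection so that nodes are never reused (which is what keeps the ABA problem out of this particular version).

```java
import java.util.concurrent.atomic.AtomicReference;

/** Lock-free stack sketch (Treiber stack) built on compareAndSet. */
final class LockFreeStack<T> {

    private static final class Node<T> {
        final T value;
        final Node<T> next;
        Node(T value, Node<T> next) { this.value = value; this.next = next; }
    }

    private final AtomicReference<Node<T>> head = new AtomicReference<>();

    void push(T value) {
        Node<T> oldHead;
        Node<T> newHead;
        do {
            oldHead = head.get();                          // back up the old data
            newHead = new Node<>(value, oldHead);          // build the new data from it
        } while (!head.compareAndSet(oldHead, newHead));   // retry if another thread won
    }

    /** Returns the top element, or null if the stack is empty. */
    T pop() {
        Node<T> oldHead;
        do {
            oldHead = head.get();
            if (oldHead == null) return null;
        } while (!head.compareAndSet(oldHead, oldHead.next));
        return oldHead.value;
    }

    public static void main(String[] args) throws InterruptedException {
        LockFreeStack<Integer> stack = new LockFreeStack<>();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) stack.push(j);
            });
            workers[i].start();
        }
        for (Thread t : workers) t.join();
        int count = 0;
        while (stack.pop() != null) count++;
        System.out.println(count); // 4 threads x 1000 pushes each
    }
}
```

No thread ever blocks here: a thread that loses a race simply re-reads the head and tries again, which is the trade the paragraph describes (more subtle code in exchange for no suspension or context switch).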
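On the design side, besides minimizing resource contention, you can also borrow the single-threaded event-loop model of nginx and node.js: use one thread, or as many threads as there are CPUs, in front of a large queue. Concurrent requests push tasks onto the queue, and the threads poll it and execute the tasks one by one, in order. Because every task uses asynchronous I/O, no thread is blocked. The queue can be RabbitMQ, one of the JDK's synchronized queues (somewhat slower), or the lock-free Disruptor queue (Java). Task processing, that is, the create/read/update/delete work, can be done entirely in memory (multi-level caches, read/write splitting, ConcurrentHashMap, or even a distributed cache such as Redis). Finally, a scheduled Quartz job periodically syncs the cached data to the database. Of course, this is only an approach for small and medium-sized systems. A large distributed system is far more complex and has to be divided and conquered along SOA lines; see the diagram in this article.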
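A minimal sketch of the single-threaded loop idea follows, assuming a plain `LinkedBlockingQueue` as a stand-in for the queue (a real system might use Disruptor or a message broker instead). The class name `SingleThreadLoop` and its methods are hypothetical. Any thread may enqueue a task, but only the loop thread ever touches the in-memory state, so that state needs no lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/** Single-threaded event loop sketch: many producers, one consumer. */
final class SingleThreadLoop {
    private static final Runnable STOP = () -> { };   // sentinel that ends the loop

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    // Only the loop thread reads or writes this map, so a plain HashMap is enough.
    private final Map<String, Integer> counters = new HashMap<>();
    private final Thread loop = new Thread(this::run, "event-loop");

    SingleThreadLoop() { loop.start(); }

    private void run() {
        try {
            for (;;) {
                Runnable task = queue.take();   // waits while the queue is empty
                if (task == STOP) return;
                task.run();                     // tasks run one at a time, in FIFO order
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Enqueues an increment; callable from any thread. */
    void increment(String key) {
        queue.add(() -> counters.merge(key, 1, Integer::sum));
    }

    /** Enqueues a read; the loop thread completes the returned future. */
    CompletableFuture<Integer> get(String key) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        queue.add(() -> result.complete(counters.getOrDefault(key, 0)));
        return result;
    }

    void shutdown() { queue.add(STOP); }

    public static void main(String[] args) throws InterruptedException {
        SingleThreadLoop eventLoop = new SingleThreadLoop();
        Thread[] producers = new Thread[4];
        for (int i = 0; i < producers.length; i++) {
            producers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) eventLoop.increment("hits");
            });
            producers[i].start();
        }
        for (Thread t : producers) t.join();
        System.out.println(eventLoop.get("hits").join()); // 4 producers x 1000 increments
        eventLoop.shutdown();
    }
}
```

The design choice is to move contention out of the data and into a single queue: the counters themselves are never shared, so the only synchronization cost is the enqueue and dequeue.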
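If you dig into the JVM and the operating system, you will find non-blocking algorithms everywhere. The garbage collector uses them to speed up concurrent and parallel garbage collection; the scheduler uses them to schedule threads and processes efficiently and to implement intrinsic locks. In Mustang (Java 6.0), the lock-based SynchronousQueue algorithm was replaced with a new non-blocking version. Few developers use SynchronousQueue directly, but thread pools built with the Executors.newCachedThreadPool() factory use it as their work queue. Benchmarks comparing cached thread pool performance show that the new non-blocking synchronous queue implementation is almost 3 times as fast as the current one. Further improvements are already planned for the release after Mustang, code-named Dolphin.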
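For readers who have not met `SynchronousQueue`, the sketch below shows its hand-off behaviour (a `put` waits until another thread performs a `take`) and checks at runtime which work queue a cached thread pool holds. The class name `SynchronousQueueDemo` and its methods are hypothetical; the runtime check relies on `Executors.newCachedThreadPool()` returning a `ThreadPoolExecutor`, which is how current JDKs implement it but is treated here as an assumption rather than a guarantee.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;

/** Illustration only: SynchronousQueue hand-off and its use in a cached pool. */
final class SynchronousQueueDemo {

    /** Hands one value from a producer thread to the calling thread. */
    static String handOff(String message) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put(message);          // waits until a consumer takes the value
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String received = queue.take();  // rendezvous with the producer
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during hand-off", e);
        }
    }

    /** Reports whether a cached thread pool uses a SynchronousQueue as its work queue. */
    static boolean cachedPoolUsesSynchronousQueue() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            return pool instanceof ThreadPoolExecutor
                && ((ThreadPoolExecutor) pool).getQueue() instanceof SynchronousQueue;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(handOff("hello"));
        System.out.println(cachedPoolUsesSynchronousQueue());
    }
}
```

Because the queue holds no elements of its own, every submitted task is handed straight to an idle worker or triggers creation of a new one, which is why the speed of the hand-off matters so much for cached thread pools.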
Non-blocking Synchronization Algorithms and the CAS (Compare and Swap) Lock-Free Algorithm
Original: http://www.cnblogs.com/Mainz/p/3546347.html