一. 从 Demo 解析源码
package com.wenniuwuren.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * Demo: ten tasks compete for a {@link Semaphore} created with five permits. Each task
 * acquires one permit, prints its loop index and the number of permits still available,
 * sleeps for ten seconds and then gives the permit back.
 *
 * Created by zhuyb on 16/5/1.
 */
public class SemaphoreTest {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(5);
        for (int i = 0; i < 10; i++) {
            int count = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire(); // acquire a permit
                    } catch (InterruptedException e) {
                        // No permit was obtained, so there is nothing to release.
                        e.printStackTrace();
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        System.out.println("当前循环:" + count);
                        System.out.println("当前还剩多少许可数量:" + semaphore.availablePermits());
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Restore the interrupt flag so the pool thread can observe it.
                        Thread.currentThread().interrupt();
                    } finally {
                        // Release in finally: the permit is returned even when the
                        // guarded section fails, otherwise it would be lost for good.
                        semaphore.release(); // release the permit held by this task
                    }
                }
            };
            executorService.execute(runnable);
        }
        // Stop accepting new tasks; the ones already submitted still run to completion.
        executorService.shutdown();
    }
}

/**
 * Excerpt from Semaphore: the one-argument constructor called by the demo. It installs a
 * {@code NonfairSync} initialised with the requested number of permits.
 *
 * @param permits the initial number of permits
 */
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

/**
 * Excerpt from Semaphore: the non-fair synchronizer. Its constructor hands the permit
 * count to {@code Sync}, and shared acquisition is delegated to
 * {@code nonfairTryAcquireShared}.
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
Sync(继承自 AQS)的构造函数:把许可数量设置为 AQS 的同步状态 state
/**
 * Excerpt of the synchronizer base class: it extends AbstractQueuedSynchronizer and
 * keeps the permit count in the synchronizer state.
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    /**
     * Stores the initial permit count by passing it to {@code setState}.
     *
     * @param permits the initial number of permits
     */
    Sync(int permits) {
        setState(permits);
    }
    // irrelevant code omitted
}
/**
 * Assigns {@code newState} to the {@code state} field.
 *
 * @param newState the value to store as the new state
 */
protected final void setState(int newState) {
    state = newState;
}(2)semaphore.acquire(); // acquire a permit
/**
 * Acquires one permit by delegating to {@code sync.acquireSharedInterruptibly(1)}.
 *
 * @throws InterruptedException declared by this method; it comes from the delegate call
 */
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
/**
 * Shared-mode acquire that aborts on interruption. If the calling thread is already
 * interrupted an {@link InterruptedException} is thrown right away; otherwise a single
 * {@code tryAcquireShared} attempt is made and, when that attempt reports a negative
 * value, the call continues in {@code doAcquireSharedInterruptibly}.
 *
 * @param arg the acquire argument handed to {@code tryAcquireShared}
 * @throws InterruptedException if the calling thread's interrupt flag was set
 */
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    int outcome = tryAcquireShared(arg);
    if (outcome < 0) {
        doAcquireSharedInterruptibly(arg);
    }
}

/**
 * Default {@code tryAcquireShared}: always throws, so a synchronizer that supports
 * shared acquisition has to override it.
 *
 * @param arg the acquire argument (unused here)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

/**
 * Overriding {@code tryAcquireShared} that forwards to {@code nonfairTryAcquireShared}.
 *
 * @param acquires the number of permits requested
 * @return whatever {@code nonfairTryAcquireShared} returns
 */
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
/**
 * Tries to take {@code acquires} permits without queueing.
 *
 * Each round reads the current state and computes what would be left. A negative
 * remainder is returned immediately without touching the state; otherwise the new value
 * is installed with a compare-and-set, and the round is repeated if that CAS fails.
 *
 * @param acquires the number of permits requested
 * @return the remaining permit count; negative when there were not enough permits
 */
final int nonfairTryAcquireShared(int acquires) {
    while (true) {
        int current = getState();
        int left = current - acquires;
        if (left < 0) {
            // Not enough permits: report the shortfall, state stays untouched.
            return left;
        }
        if (compareAndSetState(current, left)) {
            return left;
        }
        // CAS failed: the state changed in the meantime, re-read and try again.
    }
}
/**
 * Releases one permit by delegating to {@code sync.releaseShared(1)}.
 */
public void release() {
    sync.releaseShared(1);
}
/**
 * Shared-mode release: when {@code tryReleaseShared} succeeds, {@code doReleaseShared}
 * is run afterwards.
 *
 * @param arg the release argument handed to {@code tryReleaseShared}
 * @return {@code true} if {@code tryReleaseShared} returned {@code true}, else {@code false}
 */
public final boolean releaseShared(int arg) {
    if (!tryReleaseShared(arg)) {
        return false;
    }
    doReleaseShared();
    return true;
}
/**
 * Adds {@code releases} permits back to the state.
 *
 * Each round reads the current state, computes the sum and installs it with a
 * compare-and-set; a failed CAS simply repeats the round.
 *
 * @param releases the number of permits to give back
 * @return always {@code true} once the state has been updated
 * @throws Error if adding {@code releases} makes the int sum wrap below the current value
 */
protected final boolean tryReleaseShared(int releases) {
    while (true) {
        int before = getState();
        int after = before + releases;
        if (after < before) {
            // The int addition wrapped around.
            throw new Error("Maximum permit count exceeded");
        }
        if (compareAndSetState(before, after)) {
            return true;
        }
    }
}
接下来看 tryReleaseShared 返回 true 后进入的 doReleaseShared():与互斥锁不同的是,共享锁在释放锁的时候会将共享锁释放信息向 AQS 的队列中传播这个共享锁已经释放的 SIGNAL,这样等待这个共享锁的线程就能较快地脱离 AQS 的等待队列。从 ws == Node.SIGNAL 分支执行的就是共享锁的向后传递 SIGNAL 方法 unparkSuccessor(h)。最外面的 for(;;)就是为了保证释放锁的正常进行,异常就是循环重试
/**
 * Release step for shared mode, run after a successful {@code tryReleaseShared}.
 *
 * Loops until a pass completes with {@code head} unchanged. On each pass, when the queue
 * has a head that differs from the tail, the head's wait status decides the action:
 * SIGNAL is reset to 0 and {@code unparkSuccessor} is called; 0 is switched to PROPAGATE.
 */
private void doReleaseShared() {
    for (;;) {
        // Snapshot of head; compared against the field again at the end of the pass.
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // Only the caller whose CAS SIGNAL -> 0 succeeds goes on to unpark.
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);
            }
            // Status 0: mark the head PROPAGATE; start over if that CAS fails.
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        // Finish only if head is still the node read at the top of this pass.
        if (h == head) // loop if head changed
            break;
    }
}原文:http://blog.csdn.net/wenniuwuren/article/details/51302745