JDK 并发包小结

一、同步控制

1. 重入锁

重入锁可以替代 synchronized 关键字,在 jdk5.0 早期,重入锁的性能好于 synchronized,从 jdk6.0 开始,synchronized 优化,两者性能差不多了。

重入锁例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ReenterLock implements Runnable {
// 重入锁
public static ReentrantLock lock = new ReentrantLock();
public static int i = 0;

@Override
public void run() {
for (int j = 0; j < 10000000; ++j) {
lock.lock();
try {
i++;
} finally {
lock.unlock();
}
}
}

public static void main(String[] args) throws InterruptedException {
ReenterLock tl = new ReenterLock();
Thread t1 = new Thread(tl);
Thread t2 = new Thread(tl);
t1.start(); t2.start();
t1.join(); t2.join();
System.out.println(i);
}
}

非常直观地,我们发现和 synchronized 相比,这里的重入锁更加注重锁的细节,包括了上锁和解锁的过程。更加灵活。

值得注意的是,重入锁可以重入

1
2
3
4
5
6
7
8
lock.lock();
lock.lock();
try {
i++;
} finally {
lock.unlock();
lock.unlock();
}

在这种情况下,一个线程连续两次获得同一把锁。这是允许的!如果同一个线程多次获得锁,那么在释放锁的时候,也必须释放相同次数!如果释放锁的次数多,那么会得到一个 java. lang. IllegalMonitorState Exception 异常,反之,如果释放锁的次数少了,那么相当于线程还持有这个锁,因此,其他线程也无法进入临界区。

1.1 中断响应

对于 synchronized 来说,如果一个线程在等待锁,那么结局是获得锁或者是保持等待;对于重入锁而言,还有可能被中断

如果一个线程正在等待锁,那么它收到一个通知,被告知无须再等待,可以停止了。

下面的代码就是一个死锁,得益于锁中断,我已解决这个死锁!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class IntLock implements Runnable {
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();

int lock;

public IntLock(int lock) {
this.lock = lock;
}

@Override
public void run() {
try {
if (lock == 1) {
lock1.lockInterruptibly();
try {
Thread.sleep(500);
} catch (InterruptedException e) {

}
lock2.lockInterruptibly();
} else {
lock2.lockInterruptibly();
try {
Thread.sleep(500);
} catch (InterruptedException e) {

}
lock1.lockInterruptibly();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock1.isHeldByCurrentThread()) lock1.unlock();
if (lock2.isHeldByCurrentThread()) lock2.unlock();
System.out.println(Thread.currentThread().getId() + ":线程退出");
}
}

public static void main(String[] args) throws InterruptedException {
IntLock r1 = new IntLock(1);
IntLock r2 = new IntLock(2);
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
Thread.sleep(1000);
// 中断其中一个线程
t2.interrupt();
}
}

看到一个方法,叫做 lockInterruptiby() ,这个方法可以对中断进行响应,在等待锁的过程中,可以响应中断

线程 t1 获取了 lock1 ,然后去获取 lock2 ;线程 t2 获取了 lock2 ,然后去获取 lock1 。现在他们相互等待了,现在,我们中断一个线程,t2 会放弃对于 lock1 的申请,同时释放 lock2

所以,最后完成任务的是 t1 ,而 t2 线程放弃任务直接退出,释放资源。

1.2 锁申请等待限时

避免死锁的另外一种方法是限时等待,给定一个时间,让线程自动放弃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TimeLock implements Runnable {

public static ReentrantLock lock = new ReentrantLock();

@Override
public void run() {
try {
if (lock.tryLock(5, TimeUnit.SECONDS)) {
Thread.sleep(6000);
} else {
System.out.println("get lock failed");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock.isHeldByCurrentThread()) lock.unlock();
}
}

public static void main(String[] args) {
TimeLock tl = new TimeLock();
Thread t1 = new Thread(tl);
Thread t2 = new Thread(tl);
t1.start();
t2.start();
}
}

这里的方法,是使用了 tryLock() ,第一个参数是等待时间,第二个参数是计时单位。这个例子中,用于持有锁长达 6s ,另外一个线程会申请失败。当然,这个不会导致死锁了。

tryLock() 方法也可以不需要参数。立即获取或者立即失败。同样不会死锁。

现在我们看一个非常绕的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if (lock1.tryLock()) {
try {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (lock2.tryLock()) {
try {
System.out.println(Thread.currentThread().getId() + ":My Job done");
return;
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
}

外层是一个 while 循环,线程1执行这里,然后线程2和它相反。如果不是 tryLock() 就非常容易死锁了,但是这里不会,因为是 tryLock() 嘛,立即返回结果,有意思的是这里的代码不能这样更改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
try {
if (lock1.tryLock()) {
Thread.sleep(500);
}
try {
if (lock2.tryLock()) {
System.out.println(Thread.currentThread().getId() + ":My Job done");
return;
}
} finally {
lock2.unlock();
}
} catch (InterruptedException e) {
} finally {
lock1.unlock();
}

这样改将报错!这是因为,我们假设有两个线程,线程1现在有 lock1 ,线程2现在有 lock2 ,此刻,线程1尝试获取 lock2 ,就是 try{} 代码里再用 try{} ,去获取 lock2 ,使用的是 tryLock() ,那么,现在我们失败了,于是执行到 finally{}问题在这里,我们没有 lock2 啊!所以这样改是错的。

1.3 公平锁

在大多情况下,锁都是非公平的,系统在一个锁可用的时候,从等待列表中随机挑选一个,因此不能保证其公平性。公平锁的特点是:不会产生解,只要排队,总会得到资源。如果使用 synchronized 关键字,那么锁就是非公平的。重入锁可以设置公平性,当 fair 为 true 时,锁公平。

1
public ReentrantLock(boolean fair)

实现公平锁要维护一个有序序列,因此公平锁的实现成本较高,性能相对较低,默认情况下,锁是非公平的。下面的代码突出了公平锁的特点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class FairLock implements Runnable {
// 公平锁
public static ReentrantLock fairLock = new ReentrantLock(true);

@Override
public void run() {
for(;;) {
try {
fairLock.lock();
System.out.println(Thread.currentThread().getName() + " 获得锁");
} finally {
fairLock.unlock();
}
}
}

public static void main(String[] args) {
FairLock r1 = new FairLock();
Thread t1 = new Thread(r1, "Thread_t1");
Thread t2 = new Thread(r1, "Thread_t2");
t1.start(); t2.start();
}
}

公平锁的情况下,t1 和 t2 将轮流获得锁,而在非公平锁的情况下,一个线程会倾向于再次获取已经持有的锁

重入锁的实现,主要集中在 Java 层面,包含三个要素:

  1. 原子状态。原子状态使用 CAS 操作来存储当前锁的状态,判断锁是否已经被别的线程持有。
  2. 等待队列。所有没有请求到锁的线程,会进入等待队列进行等待,待有线程释放锁后,系统就能从等待队列中唤醒一个线程,继续工作。
  3. 阻塞原语 park() 和 unpark() ,用来挂起和恢复线程。没有得到锁的线程将会被挂起。

2. Condition 条件

我们知道 wait()notify() 是配合 synchronized 关键字使用的,而 Condition 是与重入锁相关联的。

主要有下面几个方法:

1
2
3
4
5
6
void await() throws InterruptedException; // 类似于 Object.wait(); 可以响应中断
void awaitUninterruptibly(); // 与 await(); 的区别在于不响应中断
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal(); // 类似于 Object.notify(); 方法

下面是一个小例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ReenterLockCondition implements Runnable {
// Condition

public static ReentrantLock lock = new ReentrantLock();
public static Condition conditon = lock.newCondition();

@Override
public void run() {
try {
lock.lock();
conditon.await(); // 释放了这把锁,进行等待
System.out.println("Thread is going on"); // 又获得了锁
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public static void main(String[] args) throws InterruptedException {
ReenterLockCondition tl = new ReenterLockCondition();
Thread t1 = new Thread(tl);
t1.start();
Thread.sleep(2000);
lock.lock();
conditon.signal(); // 从队列中唤醒一个线程
lock.unlock(); // 把锁谦让给唤醒的线程
}
}

这里的第27行,如果我们仅仅只是唤醒了一个线程,而不放弃这个锁,那么那个线程也是没有办法继续执行的

在 JDK 内部,也使用了重入锁和 Condition ,比如 ArrayBlockingQueue 。

3. 信号量

信号量是锁的扩展,无论是 synchronized 和 ReentrantLock ,一次只允许一个线程访问一个资源,而信号量可以指定多个线程,同时访问一个资源。

1
2
public Semaphore(int permits)
public Semaphore(int permits, boolean fair) //第二个参数可以指定是否公平

信号量的主要方法:

1
2
3
4
5
public void acquire() // 尝试获取一个准入许可,若无法获得,将等待,直到获取或中断
public void acquireUninterruptibly () // 不响应中断
ublic boolean tryAcquire () // 立即返回,不等待
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release () // 释放一个许可,其他线程可以进行资源访问

小例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SemapDemo implements Runnable {

final Semaphore semp = new Semaphore(5);

@Override
public void run() {
try {
semp.acquire(); // 获取一个锁
// 模拟耗时操作
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId() + ": done!");
semp.release(); // 释放一个锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(20); // 开启20个线程
final SemapDemo demo = new SemapDemo();
for (int i = 0; i < 20; ++i) {
exec.submit(demo);
}
}
}

上面的例子中,因为我们设置信号量为5,所以会分组完成。

4. 读写锁

ReadWriteLock 是 Jdk5 中提供的读写分离锁。假设有线程 A1、A2、A3 用来写,线程 B1、B2、B3 用来读,如果是重入锁或者内部锁那么理论上来讲:读之间、读写之间、写之间都是串行操作。这显然是不合理的。

读写锁允许多线程同时读,使得 B1、B2、B3 之间真正并行。

下面有个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class ReadWriteLockDemo {
// 读写锁
private static Lock lock = new ReentrantLock();
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
private int value;

public Object handleRead(Lock lock) throws InterruptedException {
try {
lock.lock(); // 模拟读操作
Thread.sleep(1000); // 读操作的耗时越多,读写锁的优势更加明显
return value;
} finally {
lock.unlock();
}
}

public void handleWrite(Lock lock, int index) throws InterruptedException {
try {
lock.lock(); // 模拟写操作
Thread.sleep(1000);
value = index;
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
final ReadWriteLockDemo demo = new ReadWriteLockDemo();
Runnable readRunnable = new Runnable() {
@Override
public void run() {
try {
demo.handleRead(readLock); // 并行
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};

Runnable writeRunnable = new Runnable() {
@Override
public void run() {
try {
demo.handleWrite(writeLock, new Random().nextInt()); // 串行
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};

for (int i = 0; i < 18; ++i) {
new Thread(readRunnable).start();
}

for (int i = 18; i < 20; ++i) {
new Thread(writeRunnable).start();
}
}
}

如果使用读写锁,上面的代码只要2秒多,而重入锁需要进行20多秒。

5. 倒计时器

CountDownLatch 中,CountDown 在英文中为倒计时,Latch 为门闩。

把门锁起来,不让里面的线程跑出来,因此,这个工具通常用来控制线程等待。它让一个线程等待直到倒计时结束,再开始执行

比如火箭发射,火箭发射前,要等待所有的检查完毕,才能点火,执行。

下面就是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class CountDownLoatchDemo implements Runnable {
// 倒计时器
static final CountDownLatch end = new CountDownLatch(10); // 倒计时器

static final CountDownLoatchDemo demo = new CountDownLoatchDemo();

@Override
public void run() {
try {
// 模拟检查任务
Thread.sleep(new Random().nextInt(10) * 1000);
System.out.println("check complete");
end.countDown(); // 一个线程完成任务,倒计时器减一
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newFixedThreadPool(10); // 线程池
for (int i = 0; i < 10; ++i) {
exec.submit(demo);
}
// 等待检查
end.await(); // 等到10个线程完成方可继续
// 发射火箭
System.out.println("File!");
exec.shutdown();
}
}

上述逻辑如下:

6. 循环栅栏

CyclicBarrier 是另外一个多线程并发控制工具,和 CountDownLatch 类似,它实现线程间的计数等待,但是它的功能更加复杂且强大。

前面的 Cyclic 意为循环,也就是说这个计数器可以反复使用,假如我们将计数器设置为10,那么凑齐第一批10个线程后,计数器就会归零,然后接着凑齐下一批10个线程,这就是循环栅栏内在的含义

比 CountDownLatch 略微强大的地方,CyclicBarrier 可以接收一个参数作为 barrierAction 。所谓的 barrierAction 就是当计数器完成一次计数后,系统会执行的动作。

下面第一个参数是等待数,第二个参数是执行的任务。

1
public CyclicBarrier(int parties, Runnable barrierAction);

比如下面的司令命令士兵的场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class CyclicBarrierDemo {
// 循环栅栏
public static class Soldier implements Runnable {

private String soldier;
private final CyclicBarrier cyclic;

public Soldier(CyclicBarrier cyclic, String soldier) {
this.soldier = soldier;
this.cyclic = cyclic;
}

@Override
public void run() {
try {
// 等待所有士兵到齐
cyclic.await();
doWork(); // 执行任务
// 等待所有士兵任务完成
cyclic.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}

void doWork() {
try {
Thread.sleep(Math.abs(new Random().nextInt() % 10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier + ":任务完成");
}

public static class BarrierRun implements Runnable {

boolean flag;
int N;

public BarrierRun(boolean flag, int n) {
this.flag = flag;
N = n;
}

@Override
public void run() {
if (flag) {
System.out.println("司令:[士兵" + N + "个,任务完成!");
} else {
System.out.println("司令:[士兵" + N + "个,集合完成!");
flag = true;
}
}
}

public static void main(String[] args) {
final int N = 10;
Thread[] allSoldier = new Thread[N];
boolean flag = false;
CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
// 设置屏障点,主要是为了执行这个方法
System.out.println("集合队伍!");
for (int i = 0; i < N; ++i) {
System.out.println("士兵" + i + "报道!");
allSoldier[i] = new Thread(new Soldier(cyclic, "士兵" + i));
allSoldier[i].start();
}
}
}
}

使用 CyclicBarrier.await() 以后每一个士兵线程集合完毕,之后再次使用该方法,将进行下一次计数,这次是监控士兵是否已经完成了任务。

这里可能有一个异常是 BrokenBarrierException ,一旦遇到这个异常,则表示当前的 CyclicBarrier 已经破损了,可能系统已经出现了问题没有办法达到线程到齐。

如果我们插入:

1
2
3
if (i == 5) {
allSoldier[0].interrupt();
}

将产生1个 InterruptedException 和9个 BrokenBarrierException 。

7. LockSupport

LockSupport 是一个非常方便实用的线程阻塞工具,它可以在线程内任意位置让线程阻塞。Thread.suspend() 相比,它弥补了由于 resume() 在前发生,导致线程无法继续执行。它不会像 Object.wait() 相比,它不需要先获取对象的锁,也不会抛出 InterruptedException 异常。

LockSupport 的静态方法 park() 可以阻塞当前线程,类似的还有 parkNanos()parkUntil()
方法。它们实现了一个限时的等待。

我们重写关于 suspend() 永久挂起的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class LockSupportDemo {
// LockSupport
public static Object u = new Object();
static ChangeObjectThread t1 = new ChangeObjectThread("t1");
static ChangeObjectThread t2 = new ChangeObjectThread("t2");

public static class ChangeObjectThread extends Thread {
public ChangeObjectThread(String name) {
super.setName(name);
}

@Override
public void run() {
synchronized (u) {
System.out.println("in " + getName());
LockSupport.park();
}
}
}

public static void main(String[] args) throws InterruptedException {
t1.start();
Thread.sleep(100);
t2.start();
LockSupport.unpark(t1); LockSupport.unpark(t2);
t1.join(); t2.join();
}
}

我们依然不能保证 unpark()park() 后执行,但是可以正常结束。只有合适因为 LockSupport 内部使用了类似信号量的机制,每为一个线程准备一个许可,如果许可可用,那么 park() 会立即返回,并消费这个许可。不可用时,会阻塞。而 unpark() 会让一个许可可用,与信号量不同的是,许可不可能超过一个,永远只有一个!

即使 unpark()park() 前执行,它也可以在下一次 park() 的时候返回。

虽然 LockSupport 不会抛出 InterruptedException 异常,但是我们可以通过 Thread.interrupted() 等方法获取中断标志。

二、ThreadPool

最简单的线程创建和回收:

1
2
3
4
5
6
new Thread(new Runnable() {
@Override
public void run() {
// do sth
}
}).start();

上面的代码在 run() 之后自动回收该线程。对线程的使用必须掌握一个度,在有限的范围内,增加线程的数量可以明显提高系统的吞吐量

1. 什么是线程池

避免系统频繁创建和销毁线程,让创建的线程进行复用。好比数据库连接池。创建线程变成了从线程池获得空闲线程,关闭线程变成了向池子归还线程

2. JDK 对线程池的支持

为了更好的控制多线程,JDK 有一套 Executor 框架,帮助开发人员有效进行线程控制,其本质就是一个线程池。

这些成员都在 java.util.concurrent 包中,是并发包的核心类。其中 ThreadPoolExecutor 表示一个线程池,Executors 类则扮演线程池工厂的角色,通过它获得一个拥有特定功能的线程池,ThreadPoolExecutor 类实现了 Executor 接口,因此通过它,任何 Runnable 对象都可以被 ThreadPoolExecutor 线程池调度。

Executor 框架提供了各种类型的线程池:

1
2
3
4
5
public static Executor Service newFixedThreadPool(int nThreads) // 返回固定线程数量的线程池
public static ExecutorService newSingleThreadExecutor() // 返回只有一个线程的线程池
public static ExecutorService newCachedThreadpool() // 返回一个根据实际情况调整线程数量的线程池
public static ScheduledExecutorService newSingleThreadscheduledExecutor() // ScheduledExecutorService 接口在 ExecutorService 接口上扩展了给定时间执行某任务
public static ScheduledExecutorService newScheduledThreadPool(int corePoolsize) // 和上面的区别在于线程池数量可以指定

下面是一个简单的例子,创建一个线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ThreadPoolDemo {
// 固定大小的线程池
public static class MyTask implements Runnable {

@Override
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
MyTask task = new MyTask();
ExecutorService es = Executors.newCachedThreadPool();
for (int i = 0; i < 10; ++i) {
es.submit(task); // 提交任务
}
}
}

之前我们一直有提到计划任务,它返回一个 ScheduledExecutorService 对象,它可以根据时间对线程进行调度,具体方法如下:

1
2
3
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); // 在给定的时间,对任务进行一次调度
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay (Runnable command, long initialDelay, long delay, TimeUnit unit);

要知道一点,ScheduledExecutorService 并不一定会立即安排执行任务,它其实是一个计划任务的作用,会在指定时间,进行调度

关于 scheduleAtFixedTime()scheduleWithFixedDelay() 的区别如下:

对于 rate 而言,任务的频率是一定的,上一次任务开始到下一次任务开始,经过了一个周期时间;对于 delay 而言,上一次任务结束,经过 delay 时间后,再开始下一次的任务

下面是一个关于 scheduleAtFixedRate() 方法的调用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ScheduledExecutorServiceDemo {
// scheduleAtFixedRate 方法的例子
public static void main(String[] args) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
// 如果前面的任务没有完成,则调度不会启动
ses.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(System.currentTimeMillis() / 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 0, 2, TimeUnit.SECONDS);
}
}

任务周期是 2S ,任务的实际执行是 1S ,两秒调用一次。

如果任务执行的时间超过周期,那么周期被拉长到任务执行的长度

任务如果中断,那么不会永久执行!要好好做异常处理。

3. 核心线程池的实现

那么对于核心的几个线程池,使用了 newFixedThreadPool() 方法,newSingleThreadExecutor() 方法还有 newCachedThreadPool() 方法,虽然看起来创建的线程有着完全不同的特点,但是内部均使用 ThreadPoolExecutor 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads
OL, TimeUnit.MILLISECONDS
new LinkedBlockingQueue <Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutor Service
(new ThreadPoolExecutor(1, 1,
OL, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAXVALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

我们来看看 ThreadPoolExecutor 类的封装吧:

1
2
3
4
5
6
7
public ThreadPoolExecutor (int corePoolsize, // 线程池中线程数量
int maximumPoolSize, // 线程池中最大的线程数量
long keepAliveTime, // 当超过 corePoolSize 时,多余线程的存活时间
TimeUnit unit, // keepAliveTime 的单位
BlockingQueue<Runnable> workQueue, // 被提交但仍未执行的任务
ThreadFactory threadFactory, // 线程工厂,用于创建线程
RejectedExecutionHandler handler // 拒绝策略)

其中值得注意的 workQueue ,是一个 BlockingQueue 接口的对象,仅仅用于存放 Runnable 对象,在 ThreadPoolExecutor 的构造函数中有以下几种 BlockingQueue :

  • 直接提交队列:该功能由 SynchronousQueue 对象提供,这是一个特殊的 BlockingQueue 。它没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除都要等待一个插入操作。

任务提交到 SynchronousQueue 不会被真实保存,而是将新任务提交给线程执行,如果没有空闲进程,则尝试创建新的进程,如果进行数量已经最大,则执行拒绝策略。因此使用 SynchronousQueue 队列,通常要设置 maximumPoolSize 值。

  • 有界任务队列:通过 ArrayBlockingQueue 实现,有新任务时,如果任务数量小于 corePoolSize ,则优先创建新的线程,若大于 corePoolSize ,则将任务加入等待队列,若队列已满,则在总数不大于 maximumPoolSize 的情况下,创建新的线程执行任务,若大于 maximumPoolSize ,则执行拒绝策略。

有界队列在队列装满时,才可能将线程数提升到 corePoolSize 以上,除非系统繁忙,否则将确保线程数维持在 corePoolSize 。

  • 无界任务队列:无界任务队列通过 LinkedBlockingQueue 类实现,当有新的任务到来,系统线程数小于 corePoolSize 时,会生成新的线程执行任务,但达到 corePoolSize 后,不会继续增加了。若还有任务,而没有空闲线程,则进入队列等待,直到耗尽系统内存。
  • 优先任务队列:通过 PriorityBlockingQueue 实现,它是特殊的无界队列,之前的有界和无界队列都是按照先进先出算法处理任务的,而 PriorityBlockingQueue 则可以通过任务自身的优先级顺序先后执行(确保高优先级任务先执行)。

使用 newFixedThreadPool() 方法将返回一个 corePoolSize 和 maximumPoolSize 大小一样的使用无界任务队列的线程池。使用的是 LinkedBlockingQueue 任务队列。使用这个任务队列存放无法立即执行的任务。

使用 newCachedThreadPool() 方法将返回 corePoolSize 为 0 ,maximumPoolSize 无穷大的线程池,没有任务时,线程池中没有现成,当任务被提交,线程池将使用空闲的线程执行任务,若无空闲线程,则将任务加入 SynchronounsQueue 队列,直接提交队列,它迫使线程池增加新的线程执行任务,当任务完毕,由于 corePoolSize 为 0 ,因此空闲线程会在指定时间内被回收。

下面是 ThreadPoolExecutor 线程池的核心调度逻辑:

4. 拒绝策略

JDK 内置如下拒绝策略:

  • AbortPolicy 策略:该策略会直接抛出异常,阻止系统正常工作。
  • CallerRunsPolicy 策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
  • DiscardOldestPolicy 策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
  • DiscardPolicy 策略:该策略默默地丢弃无法处理的任务,不予仼何处理。如果允许任务丢失,我觉得这可能是最好的一种方案了吧!

这些策略都实现了 RejectedExecutionHandler 接口,我们可以自己扩展这个接口,该接口定义如下:

1
2
3
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor); // r 是请求执行的任务,executor 是当前的线程池
}

下面的代码执行需要 100 毫秒,因此有大量任务被直接丢弃:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RejectThreadPoolDemo {
public static class MyTask implements Runnable {

@Override
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws InterruptedException {
MyTask task = new MyTask();
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(10), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " is discard");
}
});
for (int i = 0; i < Integer.MAX_VALUE; ++i) {
es.submit(task);
Thread.sleep(10);
}
}
}

5. 自定义线程创建

我们开始思考一个问题:线程池里的线程是哪里来的?最开始的时候,线程通过 ThreadFactory 创建。ThreadFactory 是一个接口,它只有一个方法,用来创建线程:

1
Thread newThread(Runnable r);

下面的案例使用自定义的 ThreadFactory ,一方面记录了线程的创建,另一方面将所有线程设置为守护线程,这样主线程退出的时候,会强制销毁线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class FactoryDemo {
// 自定义线程创建

public static class MyTask implements Runnable {

@Override
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws InterruptedException {
MyTask task = new MyTask();
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
System.out.println("create " + t);
return t;
}
});
for (int i = 0; i < 5; ++i) {
es.submit(task);
}
Thread.sleep(2000);
}
}

6. 扩展线程池

虽然线程池已经有了,但是我们需要对它扩展,比如任务开始、结束的时间,或者其他一些功能。ThreadPoolExecutor 是一个可以扩展的线程池,有 beforeExecute()afterExecute()terminated() 三个接口对线程池进行控制。

ThreadPoolExecutor.Worker.runTask() 方法内部有这样的实现:

1
2
3
4
5
6
7
8
9
10
11
boolean ran = false;
beforeExecute(thread, task); // 运行前
try {
task.run(); // 运行任务
ran = true;
afterExecute(task, null); // 任务执行后
++completedTasks;
} catch (RuntimeException ex) {
if (!ran) afterExecute(task, ex); // 运行结束
throw ex;
}

这里的 Worker 是 ThreadPoolExecutor 的内部类,实现了 Runnable 接口。ThreadPoolExecutor 线程池中的线程正是 Worker 实例。Worker.runTask() 方法会被线程池以多线程模式异步调用,就是说这个方法会同时被多个线程访问,因此这里的 beforeExecute()afterExecute() 接口也将同时被多线程访问。

默认下,这两个方法是空的,我们可以对其扩展,对线程池运行状态跟踪,输出有用的调试信息,下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ExtThreadPool {
// 扩展线程池
public static class MyTask implements Runnable {

public String name;

public MyTask(String name) {
this.name = name;
}

@Override
public void run() {
System.out.println("正在执行:Thread ID:" + Thread.currentThread().getId() + ", Task name = " + name);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws InterruptedException {
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行:" + ((MyTask) r).name);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完成:" + ((MyTask) r).name);
}

@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
for (int i = 0; i < 5; ++i) {
MyTask task = new MyTask("Task-" + i);
es.execute(task);
Thread.sleep(10);
}
es.shutdown();
}
}

有意思的是这里使用的是 execute() 而不是 submit()

  • execute(Runnable x) 没有返回值。可以执行任务,但无法判断任务是否成功完成。(异步执行)
  • submit(Runnable task) 返回一个 Future 。通过 get 等待直到获取到检索结果为止。(同步执行)

7. 优化线程池线程的数量

在 《Java Concurrency in Practice》 选中有一个估算线程池带下的公式:

最优的池的大小:

在 Java 中,我么可以通过如下方法获取可用 CPU 数量:

1
Runtime.getRuntime().availableProcessors()

8. 在线程池中寻找堆栈

计算两个数的商:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class DivTask implements Runnable {

int a, b;

public DivTask(int a, int b) {
this.a = a;
this.b = b;
}

@Override
public void run() {
double re = a / b;
System.out.println(re);
}

public static void main(String[] args) {
ThreadPoolExecutor pools = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
for (int i = 0; i < 5; ++i) {
pools.submit(new DivTask(100, i));
}
}
}

上面的代码只有4个输出,当除数 i 为0,结果没有计算,一个简单的方法是放弃 submit() ,使用 execute()

这里我们可以得到一些错误信息:

1
2
3
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at threadPool.DivTask.run(DivTask.java:19)
...

我们还想要一个重要信息,就是任务在哪里提交的?与其以后加班,不如扩展 ThreadPoolExecutor 线程池,让它在调度任务之前,先保存一下提交任务线程的堆栈信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class TraceThreadPoolExecutor extends ThreadPoolExecutor {

public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
public void execute(Runnable command) {
super.execute(wrap(command, clientTrace(), Thread.currentThread().getName()));
}

@Override
public Future<?> submit(Runnable task) {
return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
}

private Exception clientTrace() {
return new Exception("Client stack trace");
}

private Runnable wrap(final Runnable task, final Exception clientStack, String clientThreadName) {
return new Runnable() {
@Override
public void run() {
try {
task.run();
} catch (Exception e) {
clientStack.printStackTrace(); // 这里的参数含义是保存着提交任务的线程的堆栈信息
throw e;
}
}
};
}

public static void main(String[] args) {
ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
// 错误堆栈中可以看到哪里提交的任务
for (int i = 0; i < 5; ++i) {
pools.execute(new DivTask(100, i));
}
}
}

现在代码报错还将显示以下信息:

1
2
3
4
java.lang.Exception: Client stack trace
at threadPool.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:22)
at threadPool.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:13)
at threadPool.TraceThreadPoolExecutor.main(TraceThreadPoolExecutor.java:43)

我们现在找到了任务提交的地方。

9. Fork/Join 框架

分而治之是非常有效处理大量数据的方法,著名的 MapReduce 也是采用了分而治之的思想,在 Linux 平台,函数 fork() 用来创建子进程,使得系统进程可以多一个执行分支,在 Java 中类似,而 join() 的意思是等待,使用了 fork() 以后多了一个执行分支,所以需要等待。

在 JDK 中,有一个 ForkJoinPool 线程池,对于 fork() 方法并不急着开启的,而是交给 ForkJoinPool 线程池进行处理。

大多数情况下一个物理线程需要处理多个逻辑任务,每个线程需要一个任务队列,可能存在:线程 A 已经将任务完成,而 B 还有任务要处理,线程 A 将帮助线程 B ,达到平衡,这是一种互助,如下图所示。

当线程帮助别人的时候,总是从底部开始拿数据,这样避免了数据的竞争。

下面是一个 ForkJoinPool 的重要接口:

1
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

我们可以向 ForkJoinPool 线程池提交一个 ForkJoinTask 任务,这样的任务就是支持 fork()join() 的任务,而它有两个重要的子类,RecursiveAction 和 RecursiveTask ,他们分别表示没有返回值的任务和可以携带返回值的任务。

下面是一个小例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class CountTask extends RecursiveTask<Long> {
// fork/join demo
private static final int THRESHOLD = 10000;
private long start;
private long end;

public CountTask(long start, long end) {
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
long sum = 0;
boolean canCompute = (end - start) < THRESHOLD;
if (canCompute) {
for (long i = start; i <= end; ++i) {
sum += i;
}
} else {
// 分任务咯,100个
long step = (start + end) / 100;
ArrayList<CountTask> subTasks = new ArrayList<>();
long pos = start;
for (int i = 0; i < 100; ++i) {
long lastOne = pos + step;
if (lastOne > end) lastOne = end;
CountTask subTask = new CountTask(pos, lastOne);
pos += step + 1;
subTasks.add(subTask);
subTask.fork();
}
for (CountTask t : subTasks) {
sum += t.join();
}
}
return sum;
}

public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask task = new CountTask(0, 200000L);
ForkJoinTask<Long> result = forkJoinPool.submit(task);
try {
long res = result.get();
System.out.println("sum + " + res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

ForkJoin 线程池使用一个无锁的栈来管理空闲的线程,如果工作线程暂时取不到可用的任务,则可能挂起,挂起线程被压入由线程池维护的栈中,待将来有任务时,再唤醒这些线程。

三、JDK 的并发容器

1. 并发集合

JDK 提供的这些容器大部分在 java.util.concurrent 包中,这些容器主要如下:

  • ConcurrentHashMap:这是一个高效的并发 HashMap 。你可以理解为一个线程安全的 HashMap 。
  • CopyOnWriteArrayList:这是一个 List ,从名字看就是和 ArrayList 是一族的。在读多写少的场合,这个 List 的性能非常好,远远好于 Vector 。
  • ConcurrentLinkedQueue:高效的并发队列,使用链表实现。可以看做一个线程安全的 Linkedlist 。
  • BlockingQueue:这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道。
  • ConcurrentSkipListMap:跳表的实现。这是一个 Map ,使用跳表的数据结构进行快速查找。

此外,Vector 也是线程安全,但是性能和上述专用工具没得比,另外 Collections 工具类可以帮助我们将任意集合包装成线程安全的集合。

2. 线程安全的 HashMap

做到线程安全的 HashMap ,一种可行的是 Colletions.synchronizedMap() 方法包装我们的 HashMap ,下面的 HashMap 就是线程安全的:

1
public static Map m = Collections.synchronizedMap(new HashMap());

Collections.synchronizedMap() 会生成一个名为 SynchronizedMap 的 Map 。它使用了委托,将自己所有 Map 相关的功能交给传入的 HashMap 实现,而自己则负责保证线程安全。

首先,我们用 SynchronizedMap 包装一个 Map :

1
2
3
4
5
private static class SynchronizedMap<K, V> implements Map<K, V> Serializable {
private static final long serialVersionUID = 197...L;
private final Map<K, V> m; // Backing Map
final Object mutex; // Object on which to synchronize
}

这里我们通过 mutex 实现对这个 m 的互斥操作。比如,对于 Map.get() 方法,实现如下:

1
2
3
4
5
public V get(Object key) {
synchronized (mutex) {
return m.get(key)
}
}

其他相关的 Map 操作都会使用这个 mutex 进行同步,从而实现线程安全。

如果并发级别不高,一般够用,但是在高并发下我们需要新的解决方案,一个更加专业的并发 HashMap 是 ConcurrentHashMap ,它专门为并发进行性能优化。

3. 有关 List 的线程安全

在 Java 中 ArrayList 和 Vector 都是使用数组作为实现,前者非线程安全,后者为线程安全,我们也可以使用 Collections 包来完成 ArrayList 的线程安全:

1
public static List<String> list = Collections.synchronizedList(new LinkedList<String>());

4. 高效读写队列 ConcurrentLinkedQueue

ConcurrentLinkedQueue 应该算在高并发环境中性能最好的队列。

首先在 ConcurrentLinkedQueue 中,定义了节点 Node :

1
2
3
4
private static class Node<E> {
volatile E item;
volatile Node<E> next;
}

使用 CAS 操作对 Node 进行一些维护:

1
2
3
boolean casItem(E cmp, E val) { // 第一个参数是期望值,第二个参数是设置目标值
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

它内部有两个字段,head 和 tail ,分别代表链表头和链表尾,我们可以通过 head 以及 succ() 方法来遍历,而 tail 表示队列的末尾。

一般,我们期望 tail 总是末尾,但是实际上更新不是及时的,存在拖延现象,如下所示:

tail 的更新会延后,每次跳跃两个元素。

我们看一下加入节点的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}

初始情况下,由于队列为空,所以 p.next 为 null ,然后将 p 的 next 节点赋值 newNode ,也就是将新的元素加入到队列中,此时 p == t 是成立的,因此不会更新结尾 tail ,因此增加一个元素后,tail 并不会更新。

当我们试图增加第二个元素时,由于 t 在 head 的位置,因此 p.next 指向第一个元素,现在 q != null ,q 不是最后的节点。于是代码开始寻找最后一个节点。此时, p 指向第一个元素,而它的 next 是 null ,p 更新自己的 next ,此时 p != t ,于是更新 t 所在的位置,到链表的最后。

那么什么时候会出现 p == q 呢?这是因为遇到了哨兵节点,就是 next 指向自己的节点。主要表示要删除的节点和空节点。出现了的话,则使用新的 tail 作为结尾,避免重新查找 tail 的开销。

在单线程下,这就话不容易理解:

1
p = (t != (t = tail)) ? t : head;

!= 不是原子操作,首先是 t 的值,然后执行 t = tail ,取得新的 t 的值,然后比较是否相等,单线程不会出现 t != t ,但是在并发下,右边可能修改了,如果修改了,就现在进行修改,如果没有修改,重新查找结尾。

哨兵是怎么产生的呢?

1
2
3
ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
q.add("1");
q.poll();

下面是 poll() 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;

if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

由于现在队列中只有一个元素,所以 tail 没有更新,而是指向和 head 相同的位置,此时 head 本身的 item 是空的,next 是第一个元素,首先我们代码到 p = q 的位置,p 现在指向了第一个元素,第二次循环,p 的 item 被设置成了 null ,也就是弹出元素,被删除,此时 p 和 h 当然不等了,故执行了 updateHead() 方法:

1
2
3
4
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}

将 p 设置为链表的表头,原来的 head 被设置成了哨兵,通过 lazySetNext() 实现。现在一个哨兵节点被创建了,原来的 head 和 tail 实际上就是一个元素,如果再次执行 offer() 将会遇到这个 tail 。

原来头节点没用了,next指向本身,变成哨兵节点。

5. 不变模式下的 CopyOnWriteArrayList

有些场景下,读远远大于写,我们希望读尽可能快,写慢一点点没关系。

我们应该允许多个线程访问,但是写应该受到阻碍。所谓 CopyOnWrite 就是在写的时候,进行一次自我复制,如果需要修改,我们并不修改原有的内容,写完之后再替换,不影响读操作。

1
2
3
4
5
6
7
8
9
10
private volatile transient Object[] array;
public E get(int index) {
return get(getArray(), index);
}
final Object[] getArray() {
return array;
}
private E get(Object[] a, int index) {
return (E) a[index];
}

和读取相比,写入就比较麻烦了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

这里为什么需要使用 final ReentrantLock lock = this.lock; 这句话,一部分重要一部分不重要。

重要:

  • this.lock 是类的成员变量,一般都是存到堆上,访问堆上的变量会涉及内存同步的操作(这个建议通过编译后的 bytecode 进行观察),而将其 copy 到栈上,然后访问就不存在这个问题了。
  • 在访问堆上的 this.lock 时,对于多个 CPU ,可能会存在 cache 命中的问题,这样必然会导致内存重新 load ,而 copy 到栈上,则直接是线程相关的,就不存在这个问题了。

不重要:

  • 这个 final 对于编译而言是可有可无的,即便将其final修饰符去掉,效果也一样。
  • Jdk1.6 及以前是没有这句话的,对结果是没有影响的。

首先使用锁,生成一个新的数组,然后将元素加入,接着新数组替换老数组,不影响读,修改完成,读就察觉了,因为 array 是 volatile 类型的。

6. 数据共享通道

我们说过 ConcurrentLinkedQueue 可以作为高性能的队列,但多线程的开发还有一个问题,如何在多个线程之间进行数据共享呢?

一般来说,我们总是希望系统是松耦合的。

如果我们进行重构或者升级,可以不修改线程 A ,而直接把线程 B 升级成线程 C ,保证系统的平滑过渡,其中升级的意见箱使用 BlockingQueue 来实现。

与之前的 ConcurrentLinkedQueue 和 CopyOnWriteArrayList 不同,BlockingQueue 是一个接口,而不是一个实现,主要实现有下面的内容:

其中 ArrayBlockingQueue 是基于数组实现的,而 LinkedBlockingQueue 基于链表。正因为如此 ArrayBlockingQueue 更适合做有界队列,因为队列中可以容纳的最大元素在创建的时候指定;而 LinkedBlockingQueue 适合做无界队列,内部元素可以动态增加。

BlockingQueue 之所以适合作为数据共享的通道,关键在于 Blocking 上,它有阻塞的意思,当服务线程处理完队列中信息后,如何知道下一条信息何时到来?

它会让服务线程在队列为空的时候,进行等待,当有信息进入队列后,自动将线程唤醒。

ArrayBlockingQueue 内部有一个对象数组:

1
final Object[] items;

向队列中压入元素使用 offer() 方法和 put() 方法,对于 offer ,如果队列满了,返回 false ,如果没有满,则正常入队列。对于 put ,是压入队列的末尾,如果队列满了,会一直等待,直到队列中有空闲的位置。

从队列中弹出元素可以使用 poll() 方法和 take() 方法,他们都是从头部获取一个元素,不同在于,队列如果为空 poll 方法会直接返回 null ,而 take 会等待,直到队列内有元素。

为了做好等待和通知两件事,在 ArrayBlockingQueue 内部定义了以下一些字段:

1
2
3
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

当执行 take() 操作时,如果队列为空,则让当前线程等待在 notEmpty 上,新元素加入,则进行一次 notEmpty 上的通知。

下面是 take 的过程:

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}

代码会循环进行等待,当有新元素进入,线程会得到通知,下面是加入元素的代码。

1
2
3
4
5
6
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}

当新元素进入队列,需要通知在 notEmpty 上的线程,让他们继续工作。

同理,对于 put 操作,当队列满的时候,需要让压入线程等待:

1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
checkNotNull(e);
final Reentrantlock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}

当有元素挪走,出现了空位,自然也需要通知入队的线程:

1
2
3
4
5
6
7
8
9
private E extract() {
final Object[] items = this.items;
E x = items[takeIndex];
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}

在生产者消费者中还会有 BlockingQueue 的身影。

7. 跳表

跳表有点类似平衡树,平衡树的插入和删除往往可能导致平衡树进行一次全局的调整。而跳表的插入和删除只需要对整个数据结构的局部进行操作即可。在高并发下,我们需要一个全局锁来保证整个平衡树的线程安全;对于跳表,我们只需要部分锁即可。

查询而言,跳表的复杂度也是 O(log n) ,JDK 使用跳表来实现一个 Map 。

跳表的另外一个特点是随机算法,它的本质是维护了多个链表,而且链表是分层的:

每上面一层链表都是下面的子集,一个元素插入哪些层是完全随机的。查找时,我们从顶层链表开始找,一旦发现查找元素大于当前链表的取值,跳到下一层继续。

到第二层,我们找到 8 ,但是 8 比 7 大,不行,继续进入下一层,这样查找,比从元素 1 开始逐个查找要快得多。

和哈希算法不同在于:哈希不会保存元素的顺序,而跳表的元素是排序的。

实现这一数据结构的类是 ConcurrentSkipListMap 。下面是一个跳表的简单使用:

1
2
3
4
5
6
7
Map<Integer, Integer> map=new ConcurrentskipListMap<Integer, Integer>();
for(int i=0;i<30;i++){
map. put(, i);
}
for(Map.Entry<Integer, Integer> entry:map.entrySet()) {
System.out.println(entry.getKey());
}

和 HashMap 不同,跳表的输出是有序的。

跳表的内部实现有几个关键的数据结构组成。首先是 Node ,一个 Node 就是一个节点,里面含有两个重要的元素 key 和 value 。每个 Node 还会指向下一个 Node ,因此还有一个元素 next 。

1
2
3
4
5
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;
}

对 Node 的所有操作,使用的是 CAS 方法:

1
2
3
4
5
6
7
boolean casValue(Object cmp, Object val) { // 设置 value
return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
}

boolean casNext(Node<K,V> cmp, Node<K,V> val) { // 设置 next
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

另外一个数据结构是 Index ,表示索引,内部包装了 Node ,同时增加了向下的引用和向右的引用。

1
2
3
4
5
static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down;
volatile Index<K,V> right;
}

整个跳表就是根据 Index 进行全网的组织的。

此外,对于每一层的表头,还需要记录当前在哪一层,为此,还需要一个称为 HeadIndex 的数据结构,表示链表头部的第一个 Index ,它集成自 Index 。

1
2
3
4
5
6
7
static final class HeadIndex<K, v> extends Index<K, v> {
final int level;
HeadIndex(Node<K, V> node, Index<K, V> down, Index<K, v> right, int level) {
super(node, down, right);
this.level = level;
}
}

对于跳表的所有操作,就是组织好这些 Index 之间的连接关系。