J2SE 5.0新特性 之 线程
作者:汪泓(hongwang_001@163.com)
1.1. 进程、线程与线程池
所谓进程是一种在自身定址空间中执行的相对独立的程序,是现代操作系统的基石。现在的多任务操作系统,会周期性地将CPU的时间划分给每一个进程,使操作系统得以同时执行一个以上的程序。
线程则是进程中的一个“单一连续控制的流程”,一个进程中可以拥有多个并行的线程。但线程不能单独存在,它依附于进程,只能从进程中派生而来。如果一个进程派生出了两个线程,那这两个线程共享此进程的全局变量和代码段,但每个线程各拥有各自的堆栈,因此它们拥有各自的局部变量。
在了解了线程的概念后,下面我们就可以进入正题,现在先来看一下线程池究竟是怎么一回事?其实线程池的原理很简单,类似于操作系统中的缓冲区的概念,它的处理流程如下:先启动若干数量的线程,并让这些线程都处于睡眠状态,当客户端有一个新请求时,就会唤醒线程池中的某一个睡眠的线程,让它来处理客户端的这个请求,当处理完这个请求后,线程又恢复到睡眠状态。这种方法的引入,会减少频繁创建与销毁线程所带来的系统负担,从而留出更多的CPU时间和内存来处理实际的应用逻辑。
1.2. Java的线程概述
在 Java 编程的早期阶段,位于 Oswego 市的纽约州立大学(SUNY) 的一位教授Doug Lea决定创建一个简单的库,以帮助开发人员构建可以更好地处理多线程情况的应用程序。这并不是说用现有的库就不能实现,但是就像有了标准网络库一样,用经过调试的、可信任的库更容易自己处理多线程。在 Addision-Wesley 的一本相关书籍《Concurrent Programming in Java: Design Principles and Patterns》的帮助下,这个库变得越来越流行了。最终,作者 Doug Lea 决定设法让它成为 Java 平台的标准部分 —— JSR-166。这个库最后变成了 Tiger 版本的 java.util.concurrent 包。以下我们将针对J2SE(TM)5.0中引入的关于线程方面的新内容进行详细的介绍。
1.3. Collection部分的扩容
1.3.1. Queue 接口
java.util 包为Collection提供了一个新的基本接口:java.util.Queue。虽然肯定可以在相对应的两端进行添加和删除而将 java.util.List 作为队列对待,但是这个新的 Queue 接口提供了支持添加、删除和检查集合的更多方法,如下所示:
public boolean offer(Object element)
public Object remove()
public Object poll()
public Object element()
public Object peek()
对于队列中大小限制,比如想在一个满的队列中加入一个新项,这时新的 offer 方法就可以起到相应的作用了。它不是对调用 add() 方法抛出一个 unchecked 异常,而只是得到由 offer() 返回的 false。remove() 和 poll() 方法都是从队列中删除第一个元素(head)。remove() 的行为与原有的 Collection 接口相似,但是新的 poll() 在用空集合调用时不是抛出异常,只是返回 null。因此新的方法更适合更容易出现在有其他异常条件的情况之中。后两个方法 element() 和 peek() 用于在队列的头部查询元素。与 remove() 方法类似,在队列为空时,element() 抛出一个异常,而 peek() 返回 null。
J2SE(TM)5.0 中,Queue有两种实现方式:通过实现新增的 BlockingQueue 接口以及直接实现Queue接口。下面是用LinkedList作为Queue 使用的一种方法
1.3.1.1. Queue 的实现
Queue queue = new LinkedList();
queue.offer("1");
queue.offer("2");
queue.offer("3");
queue.offer("4");
System.out.println("Head of queue is: " + queue.poll());
再复杂一点的是新的java.util.AbstractQueue 类。这个类的工作方式类似于 java.util.AbstractList 和 java.util.AbstractSet 类。在创建自定义集合时,不用自己实现整个接口,只是继承抽象实现并填入细节。使用 AbstractQueue 时,必须为方法 offer()、 poll() 和 peek() 提供实现。像 add() 和 addAll() 这样的方法修改为使用 offer(),而 clear() 和 remove() 使用 poll()。 最后,element() 使用 peek()。当然可以在子类中提供这些方法的优化实现,但是不是必须这么做。而且,不必创建自己的子类,可以使用几个内置的(什么)实现, 其中两个是不阻塞队列: PriorityQueue 和 ConcurrentLinkedQueue。
新的 java.util.concurrent 包在 Collection Framework 中可用的具体集合类中加入了 BlockingQueue 接口和五个阻塞队列类。BlockingQueue 接口的 Javadoc 给出了阻塞队列的基本用法,如下图所示。生产者中的 put() 操作会在没有空间可用时阻塞,而消费者的 take() 操作会在队列中没有任何东西时阻塞。
1.3.1.2. BlockingQueue的使用
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
五个队列所提供的各有不同:
1、 ArrayBlockingQueue:一个由数组支持的有界队列。
2、 LinkedBlockingQueue:一个由链接节点支持的可选有界队列。
3、 PriorityBlockingQueue:一个由优先级堆支持的无界优先级队列。
4、 DelayQueue:一个由优先级堆支持的、基于时间的调度队列。
5、 SynchronousQueue:一个利用 BlockingQueue 接口的简单聚集(rendezvous)机制。
前两个类 ArrayBlockingQueue 和 LinkedBlockingQueue 几乎相同,只是在后备存储器方面有所不同,LinkedBlockingQueue 并不总是有容量界限。无大小界限的 LinkedBlockingQueue 类在添加元素时永远不会有阻塞队列的等待。新的 DelayQueue 实现可能是其中最有意思的一个了。加入到队列中的元素必须实现新的 Delayed 接口,而且只有一个方法 —— long getDelay(java.util.concurrent.TimeUnit unit)。因为队列的大小没有界限,使得添加可以立即返回,但是在延迟时间过去之前,不能从队列中取出元素。如果多个元素完成了延迟,那么最早失效/失效时间最长的元素将第一个取出,实际上这个实现并不那么复杂。以下程序就是DelayQueue 的一个具体实现:
1.3.1.3. DelayQueue 实现
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
} return ((NanoDelay)other).trigger == trigger;
}
public boolean equals(NanoDelay other) {
return ((NanoDelay)other).trigger == trigger;
}
public long getDelay(TimeUnit unit) {
long n = trigger - System.nanoTime();
return unit.convert(n, TimeUnit.NANOSECONDS);
}
public long getTriggerTime() {
return trigger;
}
public String toString() {
return String.valueOf(trigger);
}
}
public static void main(String args[]) throws InterruptedException {
Random random = new Random();
DelayQueue queue = new DelayQueue();
for (int i=0; i < 5; i++) {
queue.add(new NanoDelay(random.nextInt(1000)));
}
long last = 0;
for (int i=0; i < 5; i++) {
NanoDelay delay = (NanoDelay)(queue.take());
long tt = delay.getTriggerTime();
System.out.println("Trigger time: " + tt);
if (i != 0) {
System.out.println("Delta: " + (tt - last));
}
last = tt;
}
}
}
这个例子首先是一个内部类 NanoDelay,它实质上将暂停给定的任意纳秒(nanosecond)数,这里利用了 System 的新 nanoTime() 方法。然后 main() 方法只是将 NanoDelay 对象放到队列中并再次将它们取出来。如果希望队列项做一些其他事情,就需要在 Delayed 对象的实现中加入方法,并在从队列中取出后调用这个新方法。(请随意扩展 NanoDelay 以试验加入其他方法做一些有趣的事情。)显示从队列中取出元素的两次调用之间的时间差。如果时间差是负数,可以视为一个错误,因为永远不会在延迟时间结束后,在一个更早的触发时间从队列中取得项。SynchronousQueue 类是最简单的。它没有内部容量。它就像线程之间的手递手机制。在队列中加入一个元素的生产者会等待另一个线程的消费者。当这个消费者出现时,这个元素就直接在消费者和生产者之间传递,永远不会加入到阻塞队列中。
1.3.2. List、Set、Map接口
新的 java.util.concurrent.ConcurrentMap 接口和 ConcurrentHashMap具体类扩展了先前的Map接口,而ConcurrentHashMap是对ConcurrentMap的直接的具体实现。新的接口增加了一组线程安全相关的基本操作:putIfAbsent,remove,replace。 putIfAbsent() 方法用于在 map 中进行添加。这个方法以要添加到 ConcurrentMap 实现中的键的值为参数,就像普通的 put() 方法,但是只有在 map 不包含这个键时,才能将键加入到 map 中。如果 map 已经包含这个键,那么这个键的现有值就会保留。像 putIfAbsent() 方法一样,重载后的 remove() 方法有两个参数 —— 键和值。在调用时,只有当键映射到指定的值时才从 map 中删除这个键。如果不匹配,那么就不删除这个键,并返回 false。如果值匹配键的当前映射内容,那么就删除这个键。
对于新的 CopyOnWriteArrayList 和 CopyOnWriteArraySet 类,所有可变的(mutable)操作都首先取得后台数组的副本,对副本进行更改,然后替换副本。这种做法保证了在遍历自身更改的集合时,永远不会抛出 ConcurrentModificationException。遍历集合会用原来的集合完成,而在以后的操作中使用更新后的集合。这些新的集合,CopyOnWriteArrayList 和 CopyOnWriteArraySet,最适合于读操作通常大大超过写操作的情况。
1.4. 线程池
就线程池的实际实现方式而言,术语“线程池”有些使人误解,因为线程池“明显的”实现在大多数情形下并不一定产生我们希望的结果。术语“线程池”先于 Java 平台出现,因此它可能是较少面向对象方法的产物。然而,该术语仍继续广泛应用着。
我们通常想要的是同一组固定的工作线程相结合的工作队列,它使用 wait() 和 notify() 来通知等待线程新的工作已经到达了。该工作队列通常被实现成具有相关监视器对象的某种链表。以下代码实现了具有线程池的工作队列。
public class WorkQueue
{
private final int nThreads;
private final PoolWorker[] threads;
private final LinkedList queue;
public WorkQueue(int nThreads)
{
this.nThreads = nThreads;
queue = new LinkedList();
threads = new PoolWorker[nThreads];
for (int i=0; i<nThreads; i++) {
threads[i] = new PoolWorker();
threads[i].start();
}
}
public void execute(Runnable r) {
synchronized(queue) {
queue.addLast(r);
queue.notify();
}
}
private class PoolWorker extends Thread {
public void run() {
Runnable r;
while (true) {
synchronized(queue) {
while (queue.isEmpty()) {
try
{
queue.wait();
}
catch (InterruptedException ignored)
{
}
}
r = (Runnable) queue.removeFirst();
}
// If we don't catch RuntimeException,
// the pool could leak threads
try {
r.run();
}
catch (RuntimeException e) {
// You might want to log something here
}
}
}
}
}
虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。用线程池构建的应用程序容易遭受任何其它多线程应用程序容易遭受的所有并发风险,诸如同步错误和死锁,它还容易遭受特定于线程池的少数其它风险,诸如与池有关的死锁、资源不足和线程泄漏。
在J2SE(TM)5.0 中,Doug Lea 编写了一个优秀的并发实用程序开放源码库 util.concurrent,它包括互斥、信号量、诸如在并发访问下执行得很好的队列和散列表之类集合类以及几个工作队列实现。该包中的 PooledExecutor 类是一种有效的、广泛使用的以工作队列为基础的线程池的正确实现。Util.concurrent 定义一个 Executor 接口,以异步地执行 Runnable,另外还定义了 Executor 的几个实现,它们具有不同的调度特征。将一个任务排入 executor 的队列非常简单:
Executor executor = new QueuedExecutor();
...
Runnable runnable = ... ;
executor.execute(runnable);
PooledExecutor 是一个复杂的线程池实现,它不但提供工作线程(worker thread)池中任务的调度,而且还可灵活地调整池的大小,同时还提供了线程生命周期管理,这个实现可以限制工作队列中任务的数目,以防止队列中的任务耗尽所有可用内存,另外还提供了多种可用的关闭和饱和度策略(阻塞、废弃、抛出、废弃最老的、在调用者中运行等)。所有的 Executor 实现为您管理线程的创建和销毁,包括当关闭 executor 时,关闭所有线程,
有时您希望异步地启动一个进程,同时希望在以后需要这个进程时,可以使用该进程的结果。FutureResult 实用程序类使这变得很容易。FutureResult 表示可能要花一段时间执行的任务,并且可以在另一个线程中执行此任务,FutureResult 对象可用作执行进程的句柄。通过它,您可以查明该任务是否已经完成,可以等待任务完成,并检索其结果。可以将 FutureResult 与 Executor 组合起来;可以创建一个 FutureResult 并将其排入 executor 的队列,同时保留对 FutureResult 的引用。下面实例演示了一个一同使用 FutureResult 和 Executor 的简单示例,它异步地启动图像着色,并继续进行其它处理:
1.4.1. FutureResult 实例
Executor executor = ...
ImageRenderer renderer = ...
FutureResult futureImage = new FutureResult();
Runnable command = futureImage.setter(new Callable() {
public Object call() { return renderer.render(rawImage); }
});
// start the rendering process
executor.execute(command);
// do other things while executing
drawBorders();
drawCaption();
// retrieve the future result, blocking if necessary
drawImage((Image)(futureImage.get())); // use future
还可以使用 FutureResult 来提高按需装入高速缓存的并发性。通过将 FutureResult 放置在高速缓存内,而不是放置计算本身的结果,可以减少持有高速缓存上写锁的时间。虽然这种做法不能加快第一个线程把某一项放入高速缓存,但它将减少第一个线程阻塞其它线程访问高速缓存的时间。它还使其它线程更早地使用结果,因为它们可以从高速缓存中检索 FutureTask。以下是使用用于高速缓存的 FutureResult 示例:
1.4.2. 使用 FutureResult 来改善高速缓存
public class FileCache {
private Map cache = new HashMap();
private Executor executor = new PooledExecutor();
public void get(final String name) {
FutureResult result;
synchronized(cache) {
result = cache.get(name);
if (result == null) {
result = new FutureResult();
executor.execute(result.setter(new Callable() {
public Object call() { return loadFile(name); }
}));
cache.put(result);
}
}
return result.get();
}
}
这种方法使第一个线程快速地进入和退出同步块,使其它线程与第一个线程一样快地得到第一个线程计算的结果,不可能出现两个线程都试图计算同一个对象。
1.5. 结束语
线程在各个应用领域里占据着举足轻重的地位,而且它在实际应用中的复杂程度非只言片语能够讲清楚的,这里仅仅介绍了在J2SE(TM)5.0中新加入的成分,如读者需要更深入的了解或更全面地学习线程的话,请参阅相关专业书籍。