在本文中,我们将简单观察一下多线程应用程序开发中公共线程用法,同时,解释一下生产者-消费者编程方案,并研究一个实际的示例来向您演示 Consumer 类是如何工作的。
多线程基础知识
多线程是一种使应用程序能同时处理多个操作的编程技术。通常有两种不同类型的多线程操作使用多个线程:
适时事件,当作业必须在特定的时间或在特定的间隔内调度执行时
后台处理,当后台事件必须与当前执行流并行处理或执行时
适时事件的示例包括程序提醒、超时事件以及诸如轮询和刷新之类的重复性操作。后台处理的示例包括等待发送的包或等待处理的已接收的消息。
生产者-消费者关系
生产者-消费者方案很适合于后台处理类别的情况。这些情况通常围绕一个作业“生产者”方和一个作业“消费者”方。当然,关于作业并行执行还有其它考虑事项。在大多数情况下,对于使用同一资源的作业,应以“先来先服务”的方式按顺序处理,这可以通过使用单线程的消费者轻松实现。通过使用这种方法,我们使用单个线程来访问单个资源,而不是用多个线程来访问单个资源。
要启用标准消费者,当作业到来时创建一个作业队列来存储所有作业。生产者线程通过将新对象添加到消费者队列来交付这个要处理的新对象。然后消费者线程从队列取出每个对象,并依次处理。当队列为空时,消费者进入休眠。当新的对象添加到空队列时,消费者会醒来并处理该对象。因为大多数应用程序喜欢顺序处理方式,所以消费者通常是单线程的。
问题:代码重复
因为生产者-消费者方案很常用,所以在构建应用程序时它可能会出现几次,这导致了代码重复。我们熟悉到,这显示了在应用程序开发过程期间多次使用了生产者-消费者方案的问题。
当第一次需要生产者-消费者行为时,通过编写一个采用一个线程和一个队列的类来实现该行为。当第二次需要这种行为时,我们着手从头开始实现它,但是接着熟悉到以前已经做过这件事了。我们复制了代码并修改了处理对象的方式。当第三次在该应用程序中实现生产者-消费者行为时,很明显我们复制了太多代码。我们决定,需要一个适用的 Consumer 类,它将处理我们所有的生产者-消费者方案。
我们的解决方案:Consumer 类
我们创建 Consumer 类的目的是:在我们的应用程序中,消除这种代码重复 ? 为每个生产者-消费者实例编写一个新作业队列和消费者线程来解决这个问题。有了适当的 Consumer 类,我们所必须做的只是编写专门用于作业处理(业务逻辑)的代码。这使得我们的代码更清楚、更易于维护以及更改起来更灵活。
我们对 Consumer 类有如下需求:
重用:我们希望这个类包括所有东西。一个线程、一个队列以及使这两者结合在一起的所有逻辑。这将使我们只须编写队列中“消费”特定作业的代码。(因而,例如,程序员使用 Consumer 类时,将重载 onConsume(ObjectjoBToBeConsumed) 方法。)
队列选项:我们希望能够设置将由 Consumer 对象使用的队列实现。但是,这意味着我们必须确保队列是线程安全的或使用一个不会与消费操作冲突的单线程生产者。无论使用哪种方法,都必须将队列设计成答应不同的进程能访问其方法。
Consumer 线程优先级:我们希望能够设置 Consumer 线程运行的优先级。
Consumer 线程命名:线程拥有一个有意义的名称会比较方便,当然这的确有助于调试。例如,假如您向 Java 虚拟机发送了一个信号,它将生成一个完整的线程转储 ? 所有线程及其相应堆栈跟踪的快照。要在 Windows 平台上生成这个线程转储,您必须在 Java 程序运行的窗口中按下键序列 <ctrl><break>,或者单击窗口上的“关闭”按钮。有关如何使用完整的线程转储来诊断 Java 软件问题的更多信息,请参阅参考资料。
类代码
在 getThread() 方法中,我们使用“惰性创建”来创建 Consumer 的线程,如清单 1 所示:
清单 1. 创建 Consumer 的线程
/**
* Lazy creation of the Consumer´s thread.
*
* @return the Consumer´s thread
*/
private Thread getThread()
{
if (_thread==null)
{
_thread = new Thread()
{
public void run()
{
Consumer.this.run();
}
};
}
return _thread;
该线程的 run() 方法运行 Consumer 的 run() 方法,它是主消费者循环,如清单 2 所示:
清单 2. run() 方法是主 Consumer 循环
/**
* Main Consumer´s thread method.
*/
private void run()
{
while (!_isTerminated)
{
// job handling loop
while (true){Object o;
synchronized (_queue)
{
if (_queue.isEmpty())
break;
o = _queue.remove();
}
if (o == null)
break;
onConsume(o);
}
// if we are not terminated and the queue is still empty
// then wait until new jobs arrive.
synchronized(_waitForJobsMonitor)
{if (_isTerminated)
break;
if(_queue.isEmpty())
{try
{_waitForJobsMonitor.wait();
}
catch (InterruptedException ex)
{
}
}
}
}
}// run()