(1)根据xml文件来管理线程池的最大最小线程数
(2)对线程池通过Timer定期扫描以防止线程未激活;
(3)通过某一个变量(本程序中是freeThreadCount)来得到空闲线程的数目;
一、配置xml(listen.xml)是:
<?xml version="1.0" encoding="UTF-8"?>
<config>
<ConsumeThreadPool>
<minPools>10</minPools> <!--线程池最小线程-->
<maxPools>100</maxPools> <!--线程池最大线程-->
<checkThreadPeriod>5</checkThreadPeriod> <!--检查线程池中线程的周期5分钟-->
</ConsumeThreadPool>
</config>
二、对于ConsumeThreadPoolPara的javabean:
import java.io.*;
public class ConsumeThreadPoolPara implements Serializable{
private int minPools;
private int maxPools;
private int checkThreadPeriod;
public int getMinPools(){
return minPools;
}
public int getMaxPools(){
return maxPools;
}
public int getCheckThreadPeriod(){
return checkThreadPeriod;
}
public void setMinPools(int minPools){
this.minPools = minPools;
}
public void setMaxPools(int maxPools){
this.maxPools = maxPools;
}
public void setCheckThreadPeriod(int checkThreadPeriod){
this.checkThreadPeriod = checkThreadPeriod;
}
public String toString(){
return minPools+" " + maxPools+" "+checkThreadPeriod;
}
public ConsumeThreadPoolPara() {
}
public static void main(String[] args) {
ConsumeThreadPoolPara consumeThreadPool1 = new ConsumeThreadPoolPara();
}
}
三、解析xml程序代码(生成ConsumeThreadPoolPara):
使用jdom解析:
import org.jdom.*;
import org.jdom.input.SAXBuilder;
import java.io.*;
import java.util.*;
public class ParseConfig {
static Hashtable Listens = null;
static ConnPara connpara = null;
static ConsumeThreadPoolPara consumeThreadPoolPara = null;
private static String configxml = "listen.xml";
static{
getConsumeThreadPoolPara(); //得到消费的线程池的参数
}
/**
* 装载文档
* @return 返回根结点
* @throws JDOMException
*/
public static Element loadDocument() throws JDOMException{
SAXBuilder parser = new SAXBuilder(); // 新建立构造器
try {
Document document = parser.build(configxml);
Element root = document.getRootElement();
return root;
}catch(JDOMException e){
logger.error("listen.xml文件格式非法!");
throw new JDOMException();
}
}
public static ConsumeThreadPoolPara getConsumeThreadPoolPara(){
if(consumeThreadPoolPara ==null){
try {
Element root = loadDocument();
Element consumeThreadPool = root.getChild("ConsumeThreadPool");
if (consumeThreadPool != null) { //代表有数据库配置
consumeThreadPoolPara = new ConsumeThreadPoolPara();
Element minPools = consumeThreadPool.getChild("minPools");
consumeThreadPoolPara.setMinPools(Integer.parseInt(minPools.getTextTrim()));
Element maxPools = consumeThreadPool.getChild("maxPools");
consumeThreadPoolPara.setMaxPools(Integer.parseInt(maxPools.getTextTrim()));
Element checkThreadPeriod = consumeThreadPool.getChild("checkThreadPeriod");
consumeThreadPoolPara.setCheckThreadPeriod(Integer.parseInt(checkThreadPeriod.getTextTrim()));
}
}
catch (JDOMException e) {
}
}
return consumeThreadPoolPara;
}
}
四、线程池源代码:
import java.util.*;
/**
* <p>Title: 线程池</p>
* <p>Description: 采集消费模块</p>
* <p>Copyright: Copyright (c) 2004</p>
* <p>Company: </p>
* @author 张荣斌
* @version 1.0
*/
public class ThreadPool {
private static int minPools = 10; //最小连接池数目
private static int maxPools = 100; //最大连接池数目
private static int checkThreadPeriod = 5; //检查连接池的周期
ArrayList m_ThreadList; //工作线程列表
LinkedList m_RunList = null; //工作任务列表
int totalThread = 0; //总线程数
static int freeThreadCount = 0; //未被使用的线程数目
private java.util.Timer timer = null; //定时器
static Object o = new Object();
static{ //先初始化线程池的参数
ConsumeThreadPoolPara consumeThreadPoolPara = ParseConfig.getConsumeThreadPoolPara();
if(consumeThreadPoolPara!=null){
minPools = consumeThreadPoolPara.getMinPools();
maxPools = consumeThreadPoolPara.getMaxPools();
checkThreadPeriod = consumeThreadPoolPara.getCheckThreadPeriod()*60*1000;
}
}
public void setMinPools(int minPools){
this.minPools = minPools;
}
public void setMaxPools(int maxPools){
this.maxPools = maxPools;
}
public void setCheckThreadPeriod(int checkThreadPeriod){
this.checkThreadPeriod = checkThreadPeriod;
}
public ThreadPool() {
m_ThreadList=new ArrayList();
m_RunList=new LinkedList();
for(int i=0;i<minPools;i++){
WorkerThread temp=new WorkerThread();
totalThread = totalThread + 1;
m_ThreadList.add(temp);
temp.start();
try{
Thread.sleep(100);
}catch(Exception e){
}
}
timer = new Timer(true); //启动定时器
timer.schedule(new CheckThreadTask(this),0,checkThreadPeriod);
}
/**
* 当有一个工作来的时候启动线程池的线程
* 1.当空闲线程数为0的时候,看总线程是否小于最大线程池的数目,就new一个新的线程,否则sleep,直到有空闲线程为止;
* 2.当空闲线程不为0,则将任务丢给空闲线程去完成
* @param work
*/
public synchronized void run(String work)
{
if (freeThreadCount == 0) {
if(totalThread<maxPools){
WorkerThread temp = new WorkerThread();
totalThread = totalThread + 1;
m_ThreadList.add(temp);
temp.start();
synchronized(m_RunList){
m_RunList.add(work);
m_RunList.notify();
}
}else{
while (freeThreadCount == 0) {
try {
Thread.sleep(200);
}
catch (InterruptedException e) {
}
}
synchronized(m_RunList){
m_RunList.add(work);
m_RunList.notify();
}
}
} else {
synchronized(m_RunList){
m_RunList.add(work);
m_RunList.notify();
}
}
}
/**
* 检查所有的线程的有效性
*/
public synchronized void checkAllThreads() {
Iterator lThreadIterator = m_ThreadList.iterator();
while (lThreadIterator.hasNext()) { //逐个遍厉
WorkerThread lTestThread = (WorkerThread) lThreadIterator.next();
if (! (lTestThread.isAlive())) { //如果处在非活动状态时
lTestThread = new WorkerThread(); //重新生成个线程
lTestThread.start(); //启动
}
}
}
/**
* 打印调试信息
*/
public void printDebugInfo(){
System.out.println("totalThread="+totalThread);
System.out.println("m_ThreadList.size()="+m_ThreadList.size());
}
/**
*
* <p>Title: 工作线程类</p>
* @author 张荣斌
* @version 1.0
*/
class WorkerThread extends Thread{
boolean running = true;
String work;
public void run(){
while(running){
synchronized(o){
freeThreadCount++;
}
synchronized(m_RunList){
while(m_RunList.size() == 0){
try{
m_RunList.wait();
if(!running) return;
}catch(InterruptedException e){
}
}
synchronized(o){
freeThreadCount--;
}
work = (String)m_RunList.removeLast();
if(work==null) return;
}
// 得到了work 进行工作,这里work可以换成自己的工作类
}
}
}
}
/**
*
* <p>Title: 定时器调动的任务</p>
* @author 张荣斌
* @version 1.0
*/
class CheckThreadTask extends TimerTask{
private static boolean isRunning = false;
private ThreadPool pool;
public CheckThreadTask(ThreadPool pool){
this.pool = pool;
}
public void run() {
if (!isRunning) {
isRunning = true;
pool.checkAllThreads();
isRunning = false;
}
}
}