分享
 
 
 

浅谈.NET下的多线程和并行计算(六)线程池基础下

王朝学院·作者佚名  2010-01-07
窄屏简体版  字體: |||超大  

这节我们按照线程池的核心思想来自定义一个简单的线程池:

1) 池中使用的线程不少于一定数量,不多于一定数量

2) 池中线程不够的时候创建,富裕的时候收回

3) 任务排队,没有可用线程时,任务等待

我们的目的只是实现这些“需求”,不去考虑性能(比如等待一段时间再去创建新的线程等策略)以及特殊的处理(异常),在实现这个需求的过程中我们也回顾了线程以及线程同步的基本概念。

首先,把任务委托和任务需要的状态数据封装一个对象:

public class WorkItem

{

public WaitCallback Action { get; set; }

public object State { get; set; }

public WorkItem(WaitCallback action, object state)

{

this.Action = action;

this.State = state;

}

}然后来创建一个对象作为线程池中的一个线程:

public class SimpleThreadPoolThread

{

private object locker = new object();

private AutoResetEvent are = new AutoResetEvent(false);

private WorkItem wi;

private Thread t;

private bool b = true;

private bool isWorking;

public bool IsWorking

{

get

{

lock (locker)

{

return isWorking;

}

}

}

public event Action<SimpleThreadPoolThread> WorkComplete;

public SimpleThreadPoolThread()

{

lock (locker)

{

// 当前没有实际任务

isWorking = false;

}

t = new Thread(Work) { IsBackground = true };

t.Start();

}

public void SetWork(WorkItem wi)

{

this.wi = wi;

}

public void StartWork()

{

// 发出信号

are.Set();

}

public void StopWork()

{

// 空任务

wi = null;

// 停止线程循环

b = false;

// 发出信号结束线程

are.Set();

}

private void Work()

{

while (b)

{

// 没任务,等待信号

are.WaitOne();

if (wi != null)

{

lock (locker)

{

// 开始

isWorking = true;

}

// 执行任务

wi.Action(wi.State);

lock (locker)

{

// 结束

isWorking = false;

}

// 结束事件

WorkComplete(this);

}

}

}代码的细节可以看注释,对这段代码的整体结构作一个说明:

1) 由于这个线程是被线程池中任务复用的,所以线程的任务处于循环中,除非线程池打算回收这个线程,否则不会退出循环结束任务

2) 使用自动信号量让线程没任务的时候等待,由线程池在外部设置任务后发出信号来执行实际的任务,执行完毕后继续等待

3) 线程公开一个完成的事件,线程池可以挂接处理方法,在任务完成后更新线程池状态

4) 线程池中的所有线程都是后台线程

下面再来实现线程池:

public class SimpleThreadPool : IDisposable

{

private object locker = new object();

private bool b = true;

private int minThreads;

private int maxThreads;

private int currentActiveThreadCount;

private List<SimpleThreadPoolThread> simpleThreadPoolThreadList = new List<SimpleThreadPoolThread>();

private Queue<WorkItem> workItemQueue = new Queue<WorkItem>();

public int CurrentActiveThreadCount

{

get

{

lock (locker)

{

return currentActiveThreadCount;

}

}

}

public int CurrentThreadCount

{

get

{

lock (locker)

{

return simpleThreadPoolThreadList.Count;

}

}

}

public int CurrentQueuedWorkCount

{

get

{

lock (locker)

{

return workItemQueue.Count;

}

}

}

public SimpleThreadPool()

{

minThreads = 4;

maxThreads = 25;

Init();

}

public SimpleThreadPool(int minThreads, int maxThreads)

{

if (minThreads > maxThreads)

throw new ArgumentException("minThreads > maxThreads", "minThreads,maxThreads");

this.minThreads = minThreads;

this.maxThreads = maxThreads;

Init();

}

public void QueueUserWorkItem(WorkItem wi)

{

lock (locker)

{

// 任务入列

workItemQueue.Enqueue(wi);

}

}

private void Init()

{

lock (locker)

{

// 一开始创建最小线程

for (int i = 0; i < minThreads; i++)

{

CreateThread();

}

currentActiveThreadCount = 0;

}

new Thread(Work) { IsBackground = true }.Start();

}

private SimpleThreadPoolThread CreateThread()

{

SimpleThreadPoolThread t = new SimpleThreadPoolThread();

// 挂接任务结束事件

t.WorkComplete += new Action<SimpleThreadPoolThread>(t_WorkComplete);

// 线程入列

simpleThreadPoolThreadList.Add(t);

return t;

}

private void Work()

{

// 线程池主循环

while (b)

{

Thread.Sleep(100);

lock (locker)

{

// 如果队列中有任务并且当前线程小于最大线程

if (workItemQueue.Count > 0 && CurrentActiveThreadCount < maxThreads)

{

WorkItem wi = workItemQueue.Dequeue();

// 寻找闲置线程

SimpleThreadPoolThread availableThread = simpleThreadPoolThreadList.FirstOrDefault(t => t.IsWorking == false);

// 无则创建

if (availableThread == null)

availableThread = CreateThread();

// 设置任务

availableThread.SetWork(wi);

// 开始任务

availableThread.StartWork();

// 增加个活动线程

currentActiveThreadCount++;

}

}

}

}

private void t_WorkComplete(SimpleThreadPoolThread t)

{

lock (locker)

{

// 减少个活动线程

currentActiveThreadCount--;

// 如果当前线程数有所富裕并且比最小线程多

if ((workItemQueue.Count + currentActiveThreadCount) < minThreads && CurrentThreadCount > minThreads)

{

// 停止已完成的线程

t.StopWork();

// 从线程池删除线程

simpleThreadPoolThreadList.Remove(t);

}

}

}

public void Dispose()

{

// 所有线程停止

foreach (var t in simpleThreadPoolThreadList)

{

t.StopWork();

}

// 线程池主循环停止

b = false;

}

}线程池的结构如下:

1) 在构造方法中可以设置线程池最小和最大线程

2) 维护一个任务队列和一个线程池中线程的列表

3) 初始化线程池的时候就创建最小线程数量定义的线程

4) 线程池主循环每20毫秒就去处理一次,如果有任务并且线程池还可以处理任务的话,先是找闲置线程,找不到则创建一个

5) 通过设置任务委托以及发出信号量来开始任务

6) 线程池提供了三个属性来查看当前活动线程数,当前总线程数和当前队列中的任务数

7) 任务完成的回调事件中我们判断如果当前线程有富裕并且比最小线程多则回收线程

8) 线程池是IDispose对象,在Dispose()方法中停止所有线程后停止线程池主循环

写一段代码来测试线程池:

using (SimpleThreadPool t = new SimpleThreadPool(2, 4))

{

Stopwatch sw2 = Stopwatch.StartNew();

for (int i = 0; i < 10; i++)

{

t.QueueUserWorkItem(new WorkItem((index =>

{

Console.WriteLine(string.Format("#{0} : {1} / {2}", Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("mm:ss"), index));

Console.WriteLine(string.Format("CurrentActiveThread: {0} / CurrentThread: {1} / CurrentQueuedWork: {2}", t.CurrentActiveThreadCount, t.CurrentThreadCount, t.CurrentQueuedWorkCount));

Thread.Sleep(1000);

}), i));

}

while (t.CurrentQueuedWorkCount > 0 || t.CurrentActiveThreadCount > 0)

{

Thread.Sleep(10);

}

Console.WriteLine("All work completed");

Console.WriteLine(string.Format("CurrentActiveThread: {0} / CurrentThread: {1} / CurrentQueuedWork: {2}", t.CurrentActiveThreadCount, t.CurrentThreadCount, t.CurrentQueuedWorkCount));

Console.WriteLine(sw2.ElapsedMilliseconds);

} 代码中我们向线程池推入10个任务,每个任务需要1秒执行,任务执行前输出当前任务的所属线程的Id,当前时间以及状态值。然后再输出线程池的几个状态属性。主线程循环等待所有任务完成后再次输出线程池状态属性以及所有任务完成耗费的时间:

 
 
 
免责声明:本文为网络用户发布,其观点仅代表作者个人观点,与本站无关,本站仅提供信息存储服务。文中陈述内容未经本站证实,其真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
2023年上半年GDP全球前十五强
 百态   2023-10-24
美众议院议长启动对拜登的弹劾调查
 百态   2023-09-13
上海、济南、武汉等多地出现不明坠落物
 探索   2023-09-06
印度或要将国名改为“巴拉特”
 百态   2023-09-06
男子为女友送行,买票不登机被捕
 百态   2023-08-20
手机地震预警功能怎么开?
 干货   2023-08-06
女子4年卖2套房花700多万做美容:不但没变美脸,面部还出现变形
 百态   2023-08-04
住户一楼被水淹 还冲来8头猪
 百态   2023-07-31
女子体内爬出大量瓜子状活虫
 百态   2023-07-25
地球连续35年收到神秘规律性信号,网友:不要回答!
 探索   2023-07-21
全球镓价格本周大涨27%
 探索   2023-07-09
钱都流向了那些不缺钱的人,苦都留给了能吃苦的人
 探索   2023-07-02
倩女手游刀客魅者强控制(强混乱强眩晕强睡眠)和对应控制抗性的关系
 百态   2020-08-20
美国5月9日最新疫情:美国确诊人数突破131万
 百态   2020-05-09
荷兰政府宣布将集体辞职
 干货   2020-04-30
倩女幽魂手游师徒任务情义春秋猜成语答案逍遥观:鹏程万里
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案神机营:射石饮羽
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案昆仑山:拔刀相助
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案天工阁:鬼斧神工
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案丝路古道:单枪匹马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:与虎谋皮
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:李代桃僵
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:指鹿为马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:小鸟依人
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:千金买邻
 干货   2019-11-12
 
推荐阅读
 
 
 
>>返回首頁<<
 
靜靜地坐在廢墟上,四周的荒凉一望無際,忽然覺得,淒涼也很美
© 2005- 王朝網路 版權所有