分享
 
 
 

boost::thread简要分析(2):线程同步

王朝other·作者佚名  2006-01-09
窄屏简体版  字體: |||超大  

除了thread,boost::thread另一个重要组成部分是mutex,以及工作在mutex上的boost::mutex::scoped_lock、condition和barrier,这些都是为实现线程同步提供的。

mutex

boost提供的mutex有6种:

boost::mutex

boost::try_mutex

boost::timed_mutex

boost::recursive_mutex

boost::recursive_try_mutex

boost::recursive_timed_mutex

下面仅对boost::mutex进行分析。

mutex类是一个CriticalSection(临界区)封装类,它在构造函数中新建一个临界区并InitializeCriticalSection,然后用一个成员变量

void* m_mutex;

来保存该临界区结构。

除此之外,mutex还提供了do_lock、do_unlock等方法,这些方法分别调用EnterCriticalSection、LeaveCriticalSection来修改成员变量m_mutex(CRITICAL_SECTION结构指针)的状态,但这些方法都是private的,以防止我们直接对mutex进行锁操作,所有的锁操作都必须通过mutex的友元类detail::thread::lock_ops<mutex>来完成,比较有意思的是,lock_ops的所有方法:lock、unlock、trylock等都是static的,如lock_ops<Mutex>::lock的实现:

template <typename Mutex>

class lock_ops : private noncopyable

{

...

public:

static void lock(Mutex& m)

{

m.do_lock();

}

...

}

boost::thread的设计者为什么会这么设计呢?我想大概是:

1、boost::thread的设计者不希望被我们直接操作mutex,改变其状态,所以mutex的所有方法都是private的(除了构造函数,析构函数)。

2、虽然我们可以通过lock_ops来修改mutex的状态,如:

#include <boost/thread/thread.hpp>

#include <boost/thread/mutex.hpp>

#include <boost/thread/detail/lock.hpp>

int main()

{

boost::mutex mt;

//mt.do_lock(); // Error! Can not access private member!

boost::detail::thread::lock_ops<boost::mutex>::lock(mt);

return 0;

}

但是,这是不推荐的,因为mutex、scoped_lock、condition、barrier是一套完整的类系,它们是相互协同工作的,像上面这么操作没有办法与后面的几个类协同工作。

scoped_lock

上面说过,不应该直接用lock_ops来操作mutex对象,那么,应该用什么呢?答案就是scoped_lock。与存在多种mutex一样,存在多种与mutex对应的scoped_lock:

scoped_lock

scoped_try_lock

scoped_timed_lock

这里我们只讨论scoped_lock。

scoped_lock是定义在namespace boost::detail::thread下的,为了方便我们使用(也为了方便设计者),mutex使用了下面的typedef:

typedef detail::thread::scoped_lock<mutex> scoped_lock;

这样我们就可以通过:

boost::mutex::scoped_lock

来使用scoped_lock类模板了。

由于scoped_lock的作用仅在于对mutex加锁/解锁(即使mutex EnterCriticalSection/LeaveCriticalSection),因此,它的接口也很简单,除了构造函数外,仅有lock/unlock/locked(判断是否已加锁),及类型转换操作符void*,一般我们不需要显式调用这些方法,因为scoped_lock的构造函数是这样定义的:

explicit scoped_lock(Mutex& mx, bool initially_locked=true)

: m_mutex(mx), m_locked(false)

{

if (initially_locked) lock();

}

注:m_mutex是一个mutex的引用。

因此,当我们不指定initially_locked参数构造一个scoped_lock对象时,scoped_lock会自动对所绑定的mutex加锁,而析构函数会检查是否加锁,若已加锁,则解锁;当然,有些情况下,我们可能不需要构造时自动加锁,这样就需要自己调用lock方法。后面的condition、barrier也会调用scoped_lock的lock、unlock方法来实现部分方法。

正因为scoped_lock具有可在构造时加锁,析构时解锁的特性,我们经常会使用局部变量来实现对mutex的独占访问。如thread部分独占访问cout的例子:

#include <boost/thread/thread.hpp>

#include <boost/thread/mutex.hpp>

#include <iostream>

boost::mutex io_mutex;

void count() // worker function

{

for (int i = 0; i < 10; ++i)

{

boost::mutex::scoped_lock lock(io_mutex);

std::cout << i << std::endl;

}

}

int main(int argc, char* argv[])

{

boost::thread thrd1(&count);

boost::thread thrd2(&count);

thrd1.join();

thrd2.join();

return 0;

}

在每次输出信息时,为了防止整个输出过程被其它线程打乱,通过对io_mutex加锁(进入临界区),从而保证了输出的正确性。

在使用scoped_lock时,我们有时候需要使用全局锁(定义一个全局mutex,当需要独占访问全局资源时,以该全局mutex为参数构造一个scoped_lock对象即可。全局mutex可以是全局变量,也可以是类的静态方法等),有时候则需要使用对象锁(将mutex定义成类的成员变量),应该根据需要进行合理选择。

Java的synchronized可用于对方法加锁,对代码段加锁,对对象加锁,对类加锁(仍然是对象级的),这几种加锁方式都可以通过上面讲的对象锁来模拟;相反,在Java中实现全局锁好像有点麻烦,必须将请求封装到类中,以转换成上面的四种synchronized形式之一。

condition

condition的接口如下:

class condition : private boost::noncopyable // Exposition only

{

public:

// construct/copy/destruct

condition();

~condition();

// notification

void notify_one();

void notify_all();

// waiting

template<typename ScopedLock> void wait(ScopedLock&);

template<typename ScopedLock, typename Pred> void wait(ScopedLock&, Pred);

template<typename ScopedLock>

bool timed_wait(ScopedLock&, const boost::xtime&);

template<typename ScopedLock, typename Pred>

bool timed_wait(ScopedLock&, Pred);

};

其中wait用于等待某个condition的发生,而timed_wait则提供具有超时的wait功能,notify_one用于唤醒一个等待该condition发生的线程,notify_all则用于唤醒所有等待该condition发生的线程。

由于condition的语义相对较为复杂,它的实现也是整个boost::thread库中最复杂的(对Windows版本而言,对支持pthread的版本而言,由于pthread已经提供了pthread_cond_t,使得condition实现起来也十分简单),下面对wait和notify_one进行简要分析。

condition内部包含了一个condition_impl对象,由该对象执行来处理实际的wait、notify_one...等操作。

下面先对condition_impl进行简要分析。

condition_impl在其构造函数中会创建两个Semaphore(信号量):m_gate、m_queue,及一个Mutex(互斥体,跟boost::mutex类似,但boost::mutex是基于CriticalSection<临界区>的):m_mutex,其中:

m_queue

相当于当前所有等待线程的等待队列,构造函数中调用CreateSemaphore来创建Semaphore时,lMaximumCount参数被指定为(std::numeric_limits<long>::max)(),即便如此,condition的实现者为了防止出现大量等待线程的情况(以至于超过了long的最大值),在线程因执行condition::wait进入等待状态时会先:

WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue), INFINITE);

以等待被唤醒,但很难想象什么样的应用需要处理这么多线程。

m_mutex

用于内部同步的控制。

但对于m_gate我很奇怪,我仔细研究了一下condition_imp的实现,还是不明白作者引入m_gate这个变量的用意何在,既然已经有了用于同步控制的m_mutex,再引入一个m_gate实在让我有点不解。

以下是condition::wait调用的do_wait方法简化后的代码:

template <typename M>

void do_wait(M& mutex)

{

m_impl.enter_wait();

lock_ops::unlock(mutex, state); //对传入的scoped_lock对象解锁,以便别的线程可以对其进行加锁,并执行某些处理,否则,本线程等待的condition永远不会发生(因为没有线程可以获得访问资源的权利以使condition发生)

m_impl.do_wait(); //执行等待操作,等待其它线程执行notify_one或notify_all操作以获得

lock_ops::lock(mutex, state); //重新对scoped_lock对象加锁,获得独占访问资源的权利

}

condition::timed_wait的实现方法与此类似,而notify_one、notify_all仅将调用请求转发给m_impl,就不多讲了。

虽然condition的内部实现比较复杂,但使用起来还是比较方便的。下面是一个使用condition的多Producer-多Consumer同步的例子(这是本人为即将推出的“大卫的Design Patterns学习笔记”编写的Mediator模式的示例):

#include <boost/thread/thread.hpp>

#include <boost/thread/mutex.hpp>

#include <boost/thread/condition.hpp>

#include <boost/thread/xtime.hpp>

#include <iostream>

#include <time.h> // for time()

#include <Windows.h> // for Sleep, change it for other platform, we can use

// boost::thread::sleep, but it's too inconvenient.

typedef boost::mutex::scoped_lock scoped_lock;

boost::mutex io_mutex;

class Product

{

int num;

public:

Product(int num) : num(num) {}

friend std::ostream& operator<< (std::ostream& os, Product& product)

{

return os << product.num;

}

};

class Mediator

{

private:

boost::condition cond;

boost::mutex mutex;

Product** pSlot; // product buffer/slot

unsigned int slotCount, // buffer size

productCount; // current product count

bool stopFlag; // should all thread stop or not

public:

Mediator(const int slotCount) : slotCount(slotCount), stopFlag(false), productCount(0)

{

pSlot = new Product*[slotCount];

}

virtual ~Mediator()

{

for (int i = 0; i < static_cast<int>(productCount); i++)

{

delete pSlot[i];

}

delete [] pSlot;

}

bool Stop() const { return stopFlag; }

void Stop(bool) { stopFlag = true; }

void NotifyAll() // notify all blocked thread to exit

{

cond.notify_all();

}

bool Put( Product* pProduct)

{

scoped_lock lock(mutex);

if (productCount == slotCount)

{

{

scoped_lock lock(io_mutex);

std::cout << "Buffer is full. Waiting..." << std::endl;

}

while (!stopFlag && (productCount == slotCount))

cond.wait(lock);

}

if (stopFlag) // it may be notified by main thread to quit.

return false;

pSlot[ productCount++ ] = pProduct;

cond.notify_one(); // this call may cause *pProduct to be changed if it wakes up a consumer

return true;

}

bool Get(Product** ppProduct)

{

scoped_lock lock(mutex);

if (productCount == 0)

{

{

scoped_lock lock(io_mutex);

std::cout << "Buffer is empty. Waiting..." << std::endl;

}

while (!stopFlag && (productCount == 0))

cond.wait(lock);

}

if (stopFlag) // it may be notified by main thread to quit.

{

*ppProduct = NULL;

return false;

}

*ppProduct = pSlot[--productCount];

cond.notify_one();

return true;

}

};

class Producer

{

private:

Mediator* pMediator;

static unsigned int num;

unsigned int id; // Producer id

public:

Producer(Mediator* pMediator) : pMediator(pMediator) { id = num++; }

void operator() ()

{

Product* pProduct;

srand( (unsigned)time( NULL ) + id ); // each thread need to srand differently

while (!pMediator->Stop())

{

pProduct = new Product( rand() % 100 );

// must print product info before call Put, as Put may wake up a consumer

// and cause *pProuct to be changed

{

scoped_lock lock(io_mutex);

std::cout << "Producer[" << id << "] produces Product["

<< *pProduct << "]" << std::endl;

}

if (!pMediator->Put(pProduct)) // this function only fails when it is notified by main thread to exit

delete pProduct;

Sleep(100);

}

}

};

unsigned int Producer::num = 1;

class Consumer

{

private:

Mediator* pMediator;

static unsigned int num;

unsigned int id; // Consumer id

public:

Consumer(Mediator* pMediator) : pMediator(pMediator) { id = num++; }

void operator() ()

{

Product* pProduct = NULL;

while (!pMediator->Stop())

{

if (pMediator->Get(&pProduct))

{

scoped_lock lock(io_mutex);

std::cout << "Consumer[" << id << "] is consuming Product["

<< *pProduct << "]" << std::endl;

delete pProduct;

}

Sleep(100);

}

}

};

unsigned int Consumer::num = 1;

int main()

{

Mediator mediator(2); // we have only 2 slot to put products

// we have 2 producers

Producer producer1(&mediator);

boost::thread thrd1(producer1);

Producer producer2(&mediator);

boost::thread thrd2(producer2);

// and we have 3 consumers

Consumer consumer1(&mediator);

boost::thread thrd3(consumer1);

Consumer consumer2(&mediator);

boost::thread thrd4(consumer2);

Consumer consumer3(&mediator);

boost::thread thrd5(consumer3);

// wait 1 second

Sleep(1000);

// and then try to stop all threads

mediator.Stop(true);

mediator.NotifyAll();

// wait for all threads to exit

thrd1.join();

thrd2.join();

thrd3.join();

thrd4.join();

thrd5.join();

return 0;

}

barrier

barrier类的接口定义如下:

class barrier : private boost::noncopyable // Exposition only

{

public:

// construct/copy/destruct

barrier(size_t n);

~barrier();

// waiting

bool wait();

};

barrier类为我们提供了这样一种控制线程同步的机制:

前n - 1次调用wait函数将被阻塞,直到第n次调用wait函数,而此后第n + 1次到第2n - 1次调用wait也会被阻塞,直到第2n次调用,依次类推。

barrier::wait的实现十分简单:

barrier::barrier(unsigned int count)

: m_threshold(count), m_count(count), m_generation(0)

{

if (count == 0)

throw std::invalid_argument("count cannot be zero.");

}

bool barrier::wait()

{

boost::mutex::scoped_lock lock(m_mutex); // m_mutex is the base of barrier and is initilized by it's default constructor.

unsigned int gen = m_generation; // m_generation will be 0 for call 1~n-1, and 1 for n~2n - 1, and so on...

if (--m_count == 0)

{

m_generation++; // cause m_generation to be changed in call n/2n/...

m_count = m_threshold; // reset count

m_cond.notify_all(); // wake up all thread waiting here

return true;

}

while (gen == m_generation) // if m_generation is not changed, lock current thread.

m_cond.wait(lock);

return false;

}

因此,说白了也不过是mutex的一个简单应用。

以下是一个使用barrier的例子:

#include <boost/thread/thread.hpp>

#include <boost/thread/barrier.hpp>

int i = 0;

boost::barrier barr(3); // call barr.wait 3 * n times will release all threads in waiting

void thread()

{

++i;

barr.wait();

}

int main()

{

boost::thread thrd1(&thread);

boost::thread thrd2(&thread);

boost::thread thrd3(&thread);

thrd1.join();

thrd2.join();

thrd3.join();

return 0;

}

如果去掉其中thrd3相关的代码,将使得线程1、2一直处于wait状态,进而使得主线程无法退出。

xtime

xtime是boost::thread中用来表示时间的一个辅助类,它是一个仅包含两个成员变量的结构体:

struct xtime

{

//...

xtime_sec_t sec;

xtime_nsec_t nsec;

};

condition::timed_wait、thread::sleep等涉及超时的函数需要用到xtime。

需要注意的是,xtime表示的不是一个时间间隔,而是一个时间点,因此使用起来很不方便。为了方便使用xtime,boost提供了一些辅助的xtime操作函数,如xtime_get、xtime_cmp等。

以下是一个使用xtime来执行sleep的例子(跟简单的一句Sleep比起来,实在是太复杂了),其中用到了xtime初始化函数xtime_get:

#include <boost/thread/thread.hpp>

#include <boost/thread/xtime.hpp>

#include <iostream>

int main()

{

boost::xtime xt;

boost::xtime_get(&xt, boost::TIME_UTC); // initialize xt with current time

xt.sec += 1; // change xt to next second

boost::thread::sleep(xt); // do sleep

std::cout << "1 second sleep over." << std::endl;

return 0;

}

 
 
 
免责声明:本文为网络用户发布,其观点仅代表作者个人观点,与本站无关,本站仅提供信息存储服务。文中陈述内容未经本站证实,其真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
2023年上半年GDP全球前十五强
 百态   2023-10-24
美众议院议长启动对拜登的弹劾调查
 百态   2023-09-13
上海、济南、武汉等多地出现不明坠落物
 探索   2023-09-06
印度或要将国名改为“巴拉特”
 百态   2023-09-06
男子为女友送行,买票不登机被捕
 百态   2023-08-20
手机地震预警功能怎么开?
 干货   2023-08-06
女子4年卖2套房花700多万做美容:不但没变美脸,面部还出现变形
 百态   2023-08-04
住户一楼被水淹 还冲来8头猪
 百态   2023-07-31
女子体内爬出大量瓜子状活虫
 百态   2023-07-25
地球连续35年收到神秘规律性信号,网友:不要回答!
 探索   2023-07-21
全球镓价格本周大涨27%
 探索   2023-07-09
钱都流向了那些不缺钱的人,苦都留给了能吃苦的人
 探索   2023-07-02
倩女手游刀客魅者强控制(强混乱强眩晕强睡眠)和对应控制抗性的关系
 百态   2020-08-20
美国5月9日最新疫情:美国确诊人数突破131万
 百态   2020-05-09
荷兰政府宣布将集体辞职
 干货   2020-04-30
倩女幽魂手游师徒任务情义春秋猜成语答案逍遥观:鹏程万里
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案神机营:射石饮羽
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案昆仑山:拔刀相助
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案天工阁:鬼斧神工
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案丝路古道:单枪匹马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:与虎谋皮
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:李代桃僵
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:指鹿为马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:小鸟依人
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:千金买邻
 干货   2019-11-12
 
推荐阅读
 
 
 
>>返回首頁<<
 
靜靜地坐在廢墟上,四周的荒凉一望無際,忽然覺得,淒涼也很美
© 2005- 王朝網路 版權所有