分享
 
 
 

JTC线程库阅读分析

王朝other·作者佚名  2006-01-09
窄屏简体版  字體: |||超大  

在Java中,线程运行着一个可运行对象,该对象的概念在JTC C++ 中的对象即是JTCRunnable. 即一个线程运行的对象实体.线程本身是一个对象,线程中要运行的实体也是一个对象. JTCRunnable

=====================================================

主对象:

class Clock : public JTCRunnable

{

void run(); //当线程启动是调用

void start(); //启动线程

void stop(); //停止线程

};

=====================================================

其实JTCRunable很简单,只有一个虚方法,只有run才是我们要完全实现的方法.

class JTCRunnable : public virtual JTCRefCount

{

public:

virtual void run() = 0;

};

=====================================================

我们可以看到JTCRunable的继承类是个引用计数类.

class JTCRefCount

{

public:

long m_refcount;

在Win32下通过interlocked增加和减少引用.

在Unix下就需要一个Mutex来保护引用.

#ifndef WIN32

JTCMutex m_ref_mutex;

#endif

JTCRefCount()

: m_refcount(0)

{

}

virtual

~JTCRefCount()

{

}

};

=====================================================

线程将要运行的实体

run()

{

while(timer_)

try{

timer_ -> sleep(100);

}catch(const JTCInterruptedException&)

{}

}

=====================================================

函数实现线程运行的实体代码.需要一个线程的载体来运行他

start()

{

timer_ = new JTCThread(JTCRunnableHandle(this));

timer_ -> start(); //线程启动,

//具体JTCThread的函数start调用了:

posix-> pthread_create

windows -> _beginthreadex

来创建一个挂起的线程.run()函数与线程就扯上关系了.

}

=====================================================

停止线程后,将主对象中的线程句柄置为空

void stop()

{

timer_ = JTCThreadHandle();

}

=====================================================

主对象Clock调用:

Main()

{

try

{

JTCInitialize bootJTC(argc, argv);

JTCHandleT<Clock> c = new Clock();

c -> start();

JTCThread::sleep(1000*5);

c -> stop();

}

catch(const JTCException& e){

cerr << "JTCException: " << e.getMessage() << endl;

}

}

=====================================================

线程的构造函数,都是通过init成员函数初始化.

JTCThread::JTCThread(const char* name)

{ init(JTCThreadGroupHandle(0), JTCRunnableHandle(0), name); }

JTCThread::JTCThread(JTCRunnableHandle target, const char* name)

{ init(JTCThreadGroupHandle(0), target, name); }

JTCThread::JTCThread(JTCThreadGroupHandle& group, const char* name)

{ init(group, JTCRunnableHandle(0), name); }

JTCThread::JTCThread(JTCThreadGroupHandle& group,

JTCRunnableHandle target, const char* name)

{ init(group, target, name); }

=====================================================

void

JTCThread::init(

const JTCThreadGroupHandle& group,

JTCRunnableHandle target,

const char* name

)

{

m_name = 0; //初始线程名

m_target = target;//初始运行的目标主体

m_state = new_thread;//初始线程状态

m_thread_id = JTCThreadId();//初始线程id

m_detached = false; //non win32

m_thread_number = get_next_thread_number(); //自己的线程号

++m_refcount;//引用计数

m_is_adopted = false;

m_handle = 0;//win32线程的句柄

m_group = group; //线程组

}

=====================================================

重要的是JTCRunnableHandle target;

定义如下:

class JTCThread;

typedef JTCHandleT<JTCThread> JTCThreadHandle;

class JTCThreadGroup;

typedef JTCHandleT<JTCThreadGroup> JTCThreadGroupHandle;

class JTCRunnable;

typedef JTCHandleT<JTCRunnable> JTCRunnableHandle;

=====================================================

这里使用到了模板类JTCHandleT,提供赋值和比较操作

template <class T>

class JTCHandleT{

JTCHandleT(T* tg = 0);

JTCHandleT(const JTCHandleT<T>& rhs );

operator=

operator==

operator!=

operator!

operator bool

operator->()

operator*

T* m_object;

};

类通过m_object指针指向对象.

模板类封装了JTCThread,JTCThreadGroup,JTCRunable类指针操作,

如:在Clock类的start函数中,创建线程语句:

timer_ = new JTCThread(JTCRunnableHandle(this));

即传入了一个JTCRunable的类的实例指针.

JTCThread

=====================================================

class JTCThread : public virtual JTCRefCount

{

}

=====================================================

JTCThread中的run其实调用了构造传入初始化m_target对象的run方法

Void JTCThread::run()

{

if (m_target)

{

m_target -> run();

}

}

=====================================================

启动线程,即创建线程执行.

void JTCThread::start()

{

//声明

JTCSyncT<JTCMutex> guard(m_mutex);

//判断当前线程是否未新线程.

if (m_state != JTCThread::new_thread)

{

throw JTCIllegalThreadStateException("state is not new_thread");

}

分POSIX和WIN32实现

#if defined(HAVE_POSIX_THREADS)

…..

#ifdef HAVE_WIN32_THREADS

……

}

=====================================================

UNIX下的线程创建实现

#if defined(HAVE_POSIX_THREADS)

pthread_attr_t attr;

try{

JTC_SYSCALL_1(PTHREAD_ATTR_INIT, &attr, != 0)

}catch(const JTCSystemCallException& e) {

if (e.getError() == ENOMEM) {

throw JTCOutOfMemoryError(e.getMessage());

}

throw;

}

# if defined(HAVE_POSIX_THREADS) && !defined(__hpux)

// All threads set their priority explicitely. Under HPUX 11.x this

// requires root privilege so it's disabled.

pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

# endif

# ifdef HAVE_PTHREAD_ATTR_SETSTACKSIZE

if (lsd_initial_stack_size != 0) {

pthread_attr_setstacksize(&attr, lsd_initial_stack_size);

}

# endif

// Call the custom pthread attribute hook, if defined.

if (sm_attr_hook != 0) {

(*sm_attr_hook)(&attr);

}

pthread_t id;

try{

PTHREAD_CREATE(id, attr, lsf_thread_adapter, this)

}catch(const JTCSystemCallException& e) {

PTHREAD_ATTR_DESTROY(&attr);

if (e.getError() == EAGAIN || e.getError() == ENOMEM) {

throw JTCOutOfMemoryError(e.getMessage());

}

throw;

}

m_thread_id = JTCThreadId(id);

PTHREAD_ATTR_DESTROY(&attr);

#endif

=====================================================

WIN32下的线程创建实现

#ifdef HAVE_WIN32_THREADS

DWORD id;

try

{

JTC_SYSCALL_6(

m_handle = (HANDLE)::_beginthreadex,

NULL, lsd_initial_stack_size,

(unsigned (__stdcall*)(void*))lsf_thread_adapter, (LPVOID)this,

0, (unsigned int*)&id, == NULL)

}

catch(const JTCSystemCallException& e)

{

if (e.getError() == ERROR_NOT_ENOUGH_MEMORY ||

e.getError() == ERROR_OUTOFMEMORY)

{

throw JTCOutOfMemoryError(e.getMessage());

}

throw;

}

m_thread_id = JTCThreadId(id);

#endif

==========================================================

sleep()函数是很独立的函数,与本身线程没有多大关系

void JTCThread::sleep(long millis, int nano)

{

if (millis < 0)

{

throw JTCIllegalArgumentException

("timeout value is negative");

}

if (nano > 999999)

{

throw JTCIllegalArgumentException

("nanosecond timeout out of range");

}

//m_state = JTCThread::not_runnable;

#if defined(HAVE_PTHREAD_DELAY_NP)

struct timespec tv;

tv.tv_sec = millis/1000;

tv.tv_nsec = (millis% 1000)*1000000 + nano;

JTC_SYSCALL_1(pthread_delay_np, &tv, != 0);

#elif defined(HAVE_POSIX_THREADS)

struct timespec tv;

tv.tv_sec = millis/1000;

tv.tv_nsec = (millis% 1000)*1000000 + nano;

if (nanosleep(&tv, 0) < 0 && errno == EINTR)

{

throw JTCInterruptedException();

}

#endif

#ifdef HAVE_WIN32_THREADS

Sleep(millis); //调用Windows API

#endif

//m_state = JTCThread::runnable;

}

==========================================================

JTCMonitor从JTCMonitorBase继承,成员比较简单

class JTCMonitor : public JTCMonitorBase

{

//对外裸露了三个公有函数

wait();

notify()

notifyAll();

//内部有三个重要的私有函数

lock()

unloack();

notify_internal()

//另外我们注意到连两个友元

friend class JTCSynchronized;

friend class JTCSyncT< JTCMonitor >;

//最后一个是互斥成员

JTCRecursiveMutex m_mon_mutex;

}

==========================================================

JTCMonitorBase的成员也比较简单

class JTCMonitorBase

{

//检查Mornitor是否被锁

validate_owner(const JTCThreadId, const char* caller)

//Mornitor的条件变量

JTCCond m_mon_cond;

//记录有多少挂起的通知

int m_nnotify

}

==========================================================

JTCMonitor 和JTCSynchronized究竟如何关连起来?

JTCSynchronized的定义也简单,主要是定义了几个成员变量.竟然定义了三个互斥变量.

class JTCSynchronized

{

JTCMonitor* m_monitor

JTCMutex* m_mutex

JTCRecursiveMutex* m_rmutex

JTCRWMutex* m_rwmutex

}

m_monitor指向要监视器实体;

m_mutex指向关连的互斥实体

m_rmutex指向关连recursive的互斥实体

m_rwmutex指向读写互斥

类构造函数

JTCSynchronized(const JTCMonitor& mon);

m_monitor(&mon),

m_lock_type(monitor)

{

//事实上同步类利用了构造进来的监视器来加锁

m_monitor -> lock();

}

-------------------------------------------

其他构造也类似,利用构造进来的实体来加锁

JTCSynchronized::JTCSynchronized(const JTCMutex& m)

: m_mutex(&m),

m_lock_type(mutex)

{

m_mutex -> lock();

}

--------------------------------------------

JTCSynchronized::JTCSynchronized(

const JTCRecursiveMutex& m

)

: m_rmutex(&m),

m_lock_type(recursive_mutex)

{

m_rmutex -> lock();

}

-----------------------------------------------

JTCSynchronized::JTCSynchronized(

const JTCRWMutex& m,

ReadWriteLockType type

)

: m_rwmutex(&m),

m_lock_type(read_write_mutex)

{

if (type == read_lock)

m_rwmutex -> read_lock();

else

m_rwmutex -> write_lock();

}

==========================================================

举个应用例子:

例子TestThread继承了JTCMornitor,在线程实体run()函数,中使用同步sync,将TestThread的实例

this作构造参数传入.这样线程同步就起作用了.

class TestThread : public JTCThread, public JTCMonitor

{

run()

{

JTCSynchronized sync(*this);

//这里其实构造中调用了继承过来的JTCMonitor lock()函数,锁是保存在

//JTCMonitor中的定义的JTCRecursiveMutex 成员变量m_mon_mutex..

//下一步是到看看互斥定义的时候了.

//do your code here

……

}

}

==========================================================

先回过头来看lock()函数的实现

void JTCMonitor::lock() const

{

//JTCMonitor本身的互斥变量来枷锁

if (m_mon_mutex.lock()){

// 第一次置m_nnotify为0

JTCMonitor* This = (JTCMonitor*)this;

This -> m_nnotify = 0;

}

}

==========================================================

class JTCRecursiveMutex

{

//对外的锁函数

lock();

unlock();

trylock();

get_owner();

count();

//内部实现的锁函数

lock_internal();

trylock_internal();

unlock_internal();

reset_for_condvar();

//到了与平台相关的定义了

#if defined(HAVE_POSIX_THREADS)

pthread_mutex_t m_lock; // Pthreads mutex.

#endif

#if defined(HAVE_WIN32_THREADS)

CRITICAL_SECTION m_lock; // WIN32 critical section.

#endif

//成员定义

JTCMutex m_internal; //内部锁

unsigned int m_count; //加锁的次数

JTCThreadId m_owner; //加锁线程ID

friend class JTCCondHelper;

friend class JTCCond;

}

==========================================================

在lock()调用地时候,JTCRecursiveMutex类通过JTCMutex联系起来.

看看调用lock()函数,发生了什么

bool

JTCRecursiveMutex::lock() const

{

//

// Work around lack of mutable.

//

return ((JTCRecursiveMutex*)this) -> lock_internal(1);

}

==========================================================

lock_internal()函数地实现

bool JTCRecursiveMutex::lock_internal(int count)

{

bool rc = false;

bool obtained = false;

while (!obtained)

{

---------------------------------------------------------------------------

m_internal.lock();

if (m_count == 0)

{

m_count = count;

m_owner = JTCThreadId::self();

obtained = true;

rc = true;

try

{

#if defined(HAVE_POSIX_THREADS)

JTC_SYSCALL_1(pthread_mutex_lock, &m_lock, != 0)

#endif

#if defined(HAVE_WIN32_THREADS)

EnterCriticalSection(&m_lock);

#endif

}

catch(...)

{

try{

m_internal.unlock();

}catch(...)

{

}

throw;

}

}

else if (m_owner == JTCThreadId::self())

{

m_count += count;

obtained = true;

}

m_internal.unlock();

---------------------------------------------------------------------------

if (!obtained)

{

#if defined(HAVE_POSIX_THREADS)

JTC_SYSCALL_1(pthread_mutex_lock, &m_lock, != 0)

pthread_mutex_unlock(&m_lock);

#endif

#if defined(HAVE_WIN32_THREADS)

EnterCriticalSection(&m_lock);

LeaveCriticalSection(&m_lock);

#endif

}

}

return rc;

}

==========================================================

函数内部使用了JTCMutex作为内部的锁,是看JTCMutex类的时候了.

class JTCMutex

{

--------------------------------------------------------------------

lock();

unlock();

trylock();

get_owner();

count();

--------------------------------------------------------------------

//平台相关成员定义

#if defined(HAVE_POSIX_THREADS)

pthread_mutex_t m_lock; // Pthread mutex.

#endif

#if defined(HAVE_WIN32_THREADS)

CRITICAL_SECTION m_lock; // WIN32 critical section.

#endif

--------------------------------------------------------------------

友元定义

friend class JTCCond;

}

==========================================================

lock()函数的UNIX实现

{

#if defined(HAVE_POSIX_THREADS)

#if defined(__GNUC__) && defined(__OPTIMIZE__)

//

// The optimizer for GCC 2.95.1 is broken. The following

// three lines of code "fix" the problem.

//

volatile int i = 1;

if (i == 0)

++i;

#endif

pthread_mutex_t* lock = &((JTCMutex*)this) -> m_lock;

JTC_SYSCALL_1(pthread_mutex_lock, lock, != 0)

#endif

}

lock()函数的WIN32实现

{

#if defined(HAVE_WIN32_THREADS)

CRITICAL_SECTION* crit = &((JTCMutex*)this) -> m_lock;

EnterCriticalSection(crit);

#endif

}

==========================================================

回到以前的JTCMornitorBase的类定义中,有一个是条件变量成员

JTCCond m_mon_cond;,在JTCMornitor的类的wait()函数使用到他

Void JTCMonitor::wait()

{

validate_owner(m_mon_mutex.get_owner(), "wait()");

notify_internal(m_nnotify); // int m_nnotify挂起的通知数

try

{

// JTCRecursiveMutex m_mon_mutex;等待监视器的互斥变量

m_mon_cond.wait(m_mon_mutex);

}

catch(...)

{

m_nnotify = 0;

throw;

}

m_nnotify = 0;

}

==========================================================

条件变量类,他通过构造函数传入的等待的条件对象,实现对对象的改变的通知.

例如:多个线程等待某个线程的互斥条件有没有释放,某个时刻释放后通知各个线程

class JTCCond

{

wait(JTCRecursiveMutex& mutex);

wait(JTCMutex& mutex);

signal();

broadcast();

wait_internal( JTCRecursiveMutex&,long timeout);

calc_timeout(long timeout);//posix

//平台相关成员变量定义

#if defined(HAVE_POSIX_THREADS)

pthread_cond_t m_cond; // Pthread condition variable.

#endif

#if defined(HAVE_WIN32_THREADS)

CondImpl* m_impl;

#endif

}

wait()函数简单调用了内部wait_internal()函数,等待入参的互斥条件

JTCCond::wait(JTCRecursiveMutex& mutex)

{

wait_internal(mutex, -1);

}

先看win32的实现

#if defined(HAVE_WIN32_THREADS)

wait_internal(JTCRecursiveMutex& mutex,)

{

m_impl -> pre_wait();

unsigned int count = mutex.reset_for_condvar();

try

{

//调用内部条件类的的wait函数,内部利用信号量来实现

bool rc = m_impl -> wait(timeout);

mutex.lock(count);

return rc;

}

catch(...)

{

mutex.lock(count);

throw;

}

#endif

}

主要焦点在成员变量的函数m_impl->wait()上

CondImpl* m_impl

bool CondImpl::wait(long timeout)

{

// Wait for the queue semaphore to be signaled.

try{

bool rc = m_queue.wait(timeout);

postwait(!rc);

return rc;

}catch(...)

{

postwait(false);

throw;

}

}

发现有一个队列成员变量和发送等待的postwait()函数.先看一下CondImpl的定义

==========================================================

class CondImpl

{

signal();

pre_wait();

wait();

postwait();

//关键的成员定义

JTCSemaphore m_gate;

JTCSemaphore m_queue;//可以说是信号队列

JTCMutex m_internal; //本身内部用来加锁

long m_blocked;

long m_unblocked;

long m_to_unblock;

}

==========================================================

成员m_queue虽然叫队列,但他是通过信号量来实现的.

class JTCSemaphore

{

JTCSemaphore(long initial_count = 0);

~JTCSemaphore();

//wait和post实现了信号量等待和投递

bool wait(long timeout = -1);

void post(int count = 1);

#if defined(HAVE_WIN32_THREADS)

HANDLE m_sem; // The semaphore handle

#endif

};

构造函数调用了CreateSemaphore();函数,创建的时候信号量最大为0x7fffffff,

开始的信号量为构造参数初始化.

JTCSemaphore::JTCSemaphore(long initial_count)

{

#if defined(HAVE_WIN32_THREADS)

JTC_SYSCALL_4(m_sem = CreateSemaphore, 0, initial_count, 0x7fffffff, 0,

== INVALID_HANDLE_VALUE)

#endif

}

当信号量为大于0时,友信号,等于0时无信号.

win32的信号量实现调用了WaitForSingleObject和ReleaseSemaphore函数.

当信号量为0,Wait()一直在等待,如果信号量大于0时函数返回,信号量减一,

而post()是将信号量加一.

bool JTCSemaphore::wait(long timeout)

{

#if defined(HAVE_WIN32_THREADS)

if (timeout < 0) {

timeout = INFINITE;

}

int rc;

JTC_SYSCALL_2(rc = WaitForSingleObject, m_sem, timeout, == WAIT_ABANDONED);

return rc != WAIT_TIMEOUT;

#endif

}

void JTCSemaphore::post(int count)

{

#if defined(HAVE_WIN32_THREADS)

JTC_SYSCALL_3(ReleaseSemaphore, m_sem, count, 0, == 0)

#endif

}

==========================================================

psotwait()的实现

void CondImpl::postwait(bool timeout)

{

m_internal.lock();

m_unblocked++;

if (m_to_unblock != 0)

{

bool last = (--m_to_unblock == 0);

m_internal.unlock();

if (timeout) {

m_queue.wait();

}

if (last) {

m_gate.post();

}else{

m_queue.post();

}

}else{

m_internal.unlock();

}

}

关于SYSCALL

提供对重要函数调用出现失败时异常错误信息和定位发生异常的代码行和文件名输出和继续抛出异常的宏包装。FN是函数名,其他是参数,最后是函数返回值。这种做法值得学习。

#define JTC_SYSCALL_6(FN,A1,A2,A3,A4,A5,A6,COND)

do {

DECLARE_ERRNO

if ((ASSIGN_ERRNO (FN (A1,A2,A3,A4,A5,A6))) COND)

JTC_THROW_EXCEPTION( ERRNO, #FN << JTC_FMT_ARG_6(A1,A2,A3,A4,A5,A6) )

} while (0);

# define JTC_THROW_EXCEPTION(CODE,MSG)

{

long error = CODE;

char msg[ 512 ];

msg[sizeof(msg)-1] = '\0';

JTC_STD(ostrstream) stream(msg, sizeof(msg)-1);

stream << MSG << " == " << error << " [" << __FILE__

<< ':' << __LINE__ << ']' << JTC_STD(ends);

throw JTCSystemCallException(msg, error);

}

JTC_STD是关于标准流的宏定义

#ifndef HAVE_JTC_NO_IOSTREAM

# ifdef HAVE_STD_IOSTREAM

# define JTC_STD(x) std::x

# else

# define JTC_STD(x) x

# endif

#endif // !HAVE_JTC_NO_IOSTREAM

函数返回值的宏定义

#if defined(WIN32)

# define ERRNO GetLastError()

# define DECLARE_ERRNO

# define ASSIGN_ERRNO

#else

# define ERRNO _jtc_syscallError

# define DECLARE_ERRNO int _jtc_syscallError;

# define ASSIGN_ERRNO _jtc_syscallError =

#endif

 
 
 
免责声明:本文为网络用户发布,其观点仅代表作者个人观点,与本站无关,本站仅提供信息存储服务。文中陈述内容未经本站证实,其真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
2023年上半年GDP全球前十五强
 百态   2023-10-24
美众议院议长启动对拜登的弹劾调查
 百态   2023-09-13
上海、济南、武汉等多地出现不明坠落物
 探索   2023-09-06
印度或要将国名改为“巴拉特”
 百态   2023-09-06
男子为女友送行,买票不登机被捕
 百态   2023-08-20
手机地震预警功能怎么开?
 干货   2023-08-06
女子4年卖2套房花700多万做美容:不但没变美脸,面部还出现变形
 百态   2023-08-04
住户一楼被水淹 还冲来8头猪
 百态   2023-07-31
女子体内爬出大量瓜子状活虫
 百态   2023-07-25
地球连续35年收到神秘规律性信号,网友:不要回答!
 探索   2023-07-21
全球镓价格本周大涨27%
 探索   2023-07-09
钱都流向了那些不缺钱的人,苦都留给了能吃苦的人
 探索   2023-07-02
倩女手游刀客魅者强控制(强混乱强眩晕强睡眠)和对应控制抗性的关系
 百态   2020-08-20
美国5月9日最新疫情:美国确诊人数突破131万
 百态   2020-05-09
荷兰政府宣布将集体辞职
 干货   2020-04-30
倩女幽魂手游师徒任务情义春秋猜成语答案逍遥观:鹏程万里
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案神机营:射石饮羽
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案昆仑山:拔刀相助
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案天工阁:鬼斧神工
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案丝路古道:单枪匹马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:与虎谋皮
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:李代桃僵
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:指鹿为马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:小鸟依人
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:千金买邻
 干货   2019-11-12
 
推荐阅读
 
 
 
>>返回首頁<<
 
靜靜地坐在廢墟上,四周的荒凉一望無際,忽然覺得,淒涼也很美
© 2005- 王朝網路 版權所有