在Java中,线程运行着一个可运行对象,该对象的概念在JTC C++ 中的对象即是JTCRunnable. 即一个线程运行的对象实体.线程本身是一个对象,线程中要运行的实体也是一个对象. JTCRunnable
=====================================================
主对象:
class Clock : public JTCRunnable
{
void run(); //当线程启动是调用
void start(); //启动线程
void stop(); //停止线程
};
=====================================================
其实JTCRunable很简单,只有一个虚方法,只有run才是我们要完全实现的方法.
class JTCRunnable : public virtual JTCRefCount
{
public:
virtual void run() = 0;
};
=====================================================
我们可以看到JTCRunable的继承类是个引用计数类.
class JTCRefCount
{
public:
long m_refcount;
在Win32下通过interlocked增加和减少引用.
在Unix下就需要一个Mutex来保护引用.
#ifndef WIN32
JTCMutex m_ref_mutex;
#endif
JTCRefCount()
: m_refcount(0)
{
}
virtual
~JTCRefCount()
{
}
};
=====================================================
线程将要运行的实体
run()
{
while(timer_)
try{
timer_ -> sleep(100);
}catch(const JTCInterruptedException&)
{}
}
=====================================================
函数实现线程运行的实体代码.需要一个线程的载体来运行他
start()
{
timer_ = new JTCThread(JTCRunnableHandle(this));
timer_ -> start(); //线程启动,
//具体JTCThread的函数start调用了:
posix-> pthread_create
windows -> _beginthreadex
来创建一个挂起的线程.run()函数与线程就扯上关系了.
}
=====================================================
停止线程后,将主对象中的线程句柄置为空
void stop()
{
timer_ = JTCThreadHandle();
}
=====================================================
主对象Clock调用:
Main()
{
try
{
JTCInitialize bootJTC(argc, argv);
JTCHandleT<Clock> c = new Clock();
c -> start();
JTCThread::sleep(1000*5);
c -> stop();
}
catch(const JTCException& e){
cerr << "JTCException: " << e.getMessage() << endl;
}
}
=====================================================
线程的构造函数,都是通过init成员函数初始化.
JTCThread::JTCThread(const char* name)
{ init(JTCThreadGroupHandle(0), JTCRunnableHandle(0), name); }
JTCThread::JTCThread(JTCRunnableHandle target, const char* name)
{ init(JTCThreadGroupHandle(0), target, name); }
JTCThread::JTCThread(JTCThreadGroupHandle& group, const char* name)
{ init(group, JTCRunnableHandle(0), name); }
JTCThread::JTCThread(JTCThreadGroupHandle& group,
JTCRunnableHandle target, const char* name)
{ init(group, target, name); }
=====================================================
void
JTCThread::init(
const JTCThreadGroupHandle& group,
JTCRunnableHandle target,
const char* name
)
{
m_name = 0; //初始线程名
m_target = target;//初始运行的目标主体
m_state = new_thread;//初始线程状态
m_thread_id = JTCThreadId();//初始线程id
m_detached = false; //non win32
m_thread_number = get_next_thread_number(); //自己的线程号
++m_refcount;//引用计数
m_is_adopted = false;
m_handle = 0;//win32线程的句柄
m_group = group; //线程组
}
=====================================================
重要的是JTCRunnableHandle target;
定义如下:
class JTCThread;
typedef JTCHandleT<JTCThread> JTCThreadHandle;
class JTCThreadGroup;
typedef JTCHandleT<JTCThreadGroup> JTCThreadGroupHandle;
class JTCRunnable;
typedef JTCHandleT<JTCRunnable> JTCRunnableHandle;
=====================================================
这里使用到了模板类JTCHandleT,提供赋值和比较操作
template <class T>
class JTCHandleT{
JTCHandleT(T* tg = 0);
JTCHandleT(const JTCHandleT<T>& rhs );
operator=
operator==
operator!=
operator!
operator bool
operator->()
operator*
T* m_object;
};
类通过m_object指针指向对象.
模板类封装了JTCThread,JTCThreadGroup,JTCRunable类指针操作,
如:在Clock类的start函数中,创建线程语句:
timer_ = new JTCThread(JTCRunnableHandle(this));
即传入了一个JTCRunable的类的实例指针.
JTCThread
=====================================================
class JTCThread : public virtual JTCRefCount
{
}
=====================================================
JTCThread中的run其实调用了构造传入初始化m_target对象的run方法
Void JTCThread::run()
{
if (m_target)
{
m_target -> run();
}
}
=====================================================
启动线程,即创建线程执行.
void JTCThread::start()
{
//声明
JTCSyncT<JTCMutex> guard(m_mutex);
//判断当前线程是否未新线程.
if (m_state != JTCThread::new_thread)
{
throw JTCIllegalThreadStateException("state is not new_thread");
}
分POSIX和WIN32实现
#if defined(HAVE_POSIX_THREADS)
…..
#ifdef HAVE_WIN32_THREADS
……
}
=====================================================
UNIX下的线程创建实现
#if defined(HAVE_POSIX_THREADS)
pthread_attr_t attr;
try{
JTC_SYSCALL_1(PTHREAD_ATTR_INIT, &attr, != 0)
}catch(const JTCSystemCallException& e) {
if (e.getError() == ENOMEM) {
throw JTCOutOfMemoryError(e.getMessage());
}
throw;
}
# if defined(HAVE_POSIX_THREADS) && !defined(__hpux)
// All threads set their priority explicitely. Under HPUX 11.x this
// requires root privilege so it's disabled.
pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);
# endif
# ifdef HAVE_PTHREAD_ATTR_SETSTACKSIZE
if (lsd_initial_stack_size != 0) {
pthread_attr_setstacksize(&attr, lsd_initial_stack_size);
}
# endif
// Call the custom pthread attribute hook, if defined.
if (sm_attr_hook != 0) {
(*sm_attr_hook)(&attr);
}
pthread_t id;
try{
PTHREAD_CREATE(id, attr, lsf_thread_adapter, this)
}catch(const JTCSystemCallException& e) {
PTHREAD_ATTR_DESTROY(&attr);
if (e.getError() == EAGAIN || e.getError() == ENOMEM) {
throw JTCOutOfMemoryError(e.getMessage());
}
throw;
}
m_thread_id = JTCThreadId(id);
PTHREAD_ATTR_DESTROY(&attr);
#endif
=====================================================
WIN32下的线程创建实现
#ifdef HAVE_WIN32_THREADS
DWORD id;
try
{
JTC_SYSCALL_6(
m_handle = (HANDLE)::_beginthreadex,
NULL, lsd_initial_stack_size,
(unsigned (__stdcall*)(void*))lsf_thread_adapter, (LPVOID)this,
0, (unsigned int*)&id, == NULL)
}
catch(const JTCSystemCallException& e)
{
if (e.getError() == ERROR_NOT_ENOUGH_MEMORY ||
e.getError() == ERROR_OUTOFMEMORY)
{
throw JTCOutOfMemoryError(e.getMessage());
}
throw;
}
m_thread_id = JTCThreadId(id);
#endif
==========================================================
sleep()函数是很独立的函数,与本身线程没有多大关系
void JTCThread::sleep(long millis, int nano)
{
if (millis < 0)
{
throw JTCIllegalArgumentException
("timeout value is negative");
}
if (nano > 999999)
{
throw JTCIllegalArgumentException
("nanosecond timeout out of range");
}
//m_state = JTCThread::not_runnable;
#if defined(HAVE_PTHREAD_DELAY_NP)
struct timespec tv;
tv.tv_sec = millis/1000;
tv.tv_nsec = (millis% 1000)*1000000 + nano;
JTC_SYSCALL_1(pthread_delay_np, &tv, != 0);
#elif defined(HAVE_POSIX_THREADS)
struct timespec tv;
tv.tv_sec = millis/1000;
tv.tv_nsec = (millis% 1000)*1000000 + nano;
if (nanosleep(&tv, 0) < 0 && errno == EINTR)
{
throw JTCInterruptedException();
}
#endif
#ifdef HAVE_WIN32_THREADS
Sleep(millis); //调用Windows API
#endif
//m_state = JTCThread::runnable;
}
==========================================================
JTCMonitor从JTCMonitorBase继承,成员比较简单
class JTCMonitor : public JTCMonitorBase
{
//对外裸露了三个公有函数
wait();
notify()
notifyAll();
//内部有三个重要的私有函数
lock()
unloack();
notify_internal()
//另外我们注意到连两个友元
friend class JTCSynchronized;
friend class JTCSyncT< JTCMonitor >;
//最后一个是互斥成员
JTCRecursiveMutex m_mon_mutex;
}
==========================================================
JTCMonitorBase的成员也比较简单
class JTCMonitorBase
{
//检查Mornitor是否被锁
validate_owner(const JTCThreadId, const char* caller)
//Mornitor的条件变量
JTCCond m_mon_cond;
//记录有多少挂起的通知
int m_nnotify
}
==========================================================
JTCMonitor 和JTCSynchronized究竟如何关连起来?
JTCSynchronized的定义也简单,主要是定义了几个成员变量.竟然定义了三个互斥变量.
class JTCSynchronized
{
JTCMonitor* m_monitor
JTCMutex* m_mutex
JTCRecursiveMutex* m_rmutex
JTCRWMutex* m_rwmutex
}
m_monitor指向要监视器实体;
m_mutex指向关连的互斥实体
m_rmutex指向关连recursive的互斥实体
m_rwmutex指向读写互斥
类构造函数
JTCSynchronized(const JTCMonitor& mon);
m_monitor(&mon),
m_lock_type(monitor)
{
//事实上同步类利用了构造进来的监视器来加锁
m_monitor -> lock();
}
-------------------------------------------
其他构造也类似,利用构造进来的实体来加锁
JTCSynchronized::JTCSynchronized(const JTCMutex& m)
: m_mutex(&m),
m_lock_type(mutex)
{
m_mutex -> lock();
}
--------------------------------------------
JTCSynchronized::JTCSynchronized(
const JTCRecursiveMutex& m
)
: m_rmutex(&m),
m_lock_type(recursive_mutex)
{
m_rmutex -> lock();
}
-----------------------------------------------
JTCSynchronized::JTCSynchronized(
const JTCRWMutex& m,
ReadWriteLockType type
)
: m_rwmutex(&m),
m_lock_type(read_write_mutex)
{
if (type == read_lock)
m_rwmutex -> read_lock();
else
m_rwmutex -> write_lock();
}
==========================================================
举个应用例子:
例子TestThread继承了JTCMornitor,在线程实体run()函数,中使用同步sync,将TestThread的实例
this作构造参数传入.这样线程同步就起作用了.
class TestThread : public JTCThread, public JTCMonitor
{
run()
{
JTCSynchronized sync(*this);
//这里其实构造中调用了继承过来的JTCMonitor lock()函数,锁是保存在
//JTCMonitor中的定义的JTCRecursiveMutex 成员变量m_mon_mutex..
//下一步是到看看互斥定义的时候了.
//do your code here
……
}
}
==========================================================
先回过头来看lock()函数的实现
void JTCMonitor::lock() const
{
//JTCMonitor本身的互斥变量来枷锁
if (m_mon_mutex.lock()){
// 第一次置m_nnotify为0
JTCMonitor* This = (JTCMonitor*)this;
This -> m_nnotify = 0;
}
}
==========================================================
class JTCRecursiveMutex
{
//对外的锁函数
lock();
unlock();
trylock();
get_owner();
count();
//内部实现的锁函数
lock_internal();
trylock_internal();
unlock_internal();
reset_for_condvar();
//到了与平台相关的定义了
#if defined(HAVE_POSIX_THREADS)
pthread_mutex_t m_lock; // Pthreads mutex.
#endif
#if defined(HAVE_WIN32_THREADS)
CRITICAL_SECTION m_lock; // WIN32 critical section.
#endif
//成员定义
JTCMutex m_internal; //内部锁
unsigned int m_count; //加锁的次数
JTCThreadId m_owner; //加锁线程ID
friend class JTCCondHelper;
friend class JTCCond;
}
==========================================================
在lock()调用地时候,JTCRecursiveMutex类通过JTCMutex联系起来.
看看调用lock()函数,发生了什么
bool
JTCRecursiveMutex::lock() const
{
//
// Work around lack of mutable.
//
return ((JTCRecursiveMutex*)this) -> lock_internal(1);
}
==========================================================
lock_internal()函数地实现
bool JTCRecursiveMutex::lock_internal(int count)
{
bool rc = false;
bool obtained = false;
while (!obtained)
{
---------------------------------------------------------------------------
m_internal.lock();
if (m_count == 0)
{
m_count = count;
m_owner = JTCThreadId::self();
obtained = true;
rc = true;
try
{
#if defined(HAVE_POSIX_THREADS)
JTC_SYSCALL_1(pthread_mutex_lock, &m_lock, != 0)
#endif
#if defined(HAVE_WIN32_THREADS)
EnterCriticalSection(&m_lock);
#endif
}
catch(...)
{
try{
m_internal.unlock();
}catch(...)
{
}
throw;
}
}
else if (m_owner == JTCThreadId::self())
{
m_count += count;
obtained = true;
}
m_internal.unlock();
---------------------------------------------------------------------------
if (!obtained)
{
#if defined(HAVE_POSIX_THREADS)
JTC_SYSCALL_1(pthread_mutex_lock, &m_lock, != 0)
pthread_mutex_unlock(&m_lock);
#endif
#if defined(HAVE_WIN32_THREADS)
EnterCriticalSection(&m_lock);
LeaveCriticalSection(&m_lock);
#endif
}
}
return rc;
}
==========================================================
函数内部使用了JTCMutex作为内部的锁,是看JTCMutex类的时候了.
class JTCMutex
{
--------------------------------------------------------------------
lock();
unlock();
trylock();
get_owner();
count();
--------------------------------------------------------------------
//平台相关成员定义
#if defined(HAVE_POSIX_THREADS)
pthread_mutex_t m_lock; // Pthread mutex.
#endif
#if defined(HAVE_WIN32_THREADS)
CRITICAL_SECTION m_lock; // WIN32 critical section.
#endif
--------------------------------------------------------------------
友元定义
friend class JTCCond;
}
==========================================================
lock()函数的UNIX实现
{
#if defined(HAVE_POSIX_THREADS)
#if defined(__GNUC__) && defined(__OPTIMIZE__)
//
// The optimizer for GCC 2.95.1 is broken. The following
// three lines of code "fix" the problem.
//
volatile int i = 1;
if (i == 0)
++i;
#endif
pthread_mutex_t* lock = &((JTCMutex*)this) -> m_lock;
JTC_SYSCALL_1(pthread_mutex_lock, lock, != 0)
#endif
}
lock()函数的WIN32实现
{
#if defined(HAVE_WIN32_THREADS)
CRITICAL_SECTION* crit = &((JTCMutex*)this) -> m_lock;
EnterCriticalSection(crit);
#endif
}
==========================================================
回到以前的JTCMornitorBase的类定义中,有一个是条件变量成员
JTCCond m_mon_cond;,在JTCMornitor的类的wait()函数使用到他
Void JTCMonitor::wait()
{
validate_owner(m_mon_mutex.get_owner(), "wait()");
notify_internal(m_nnotify); // int m_nnotify挂起的通知数
try
{
// JTCRecursiveMutex m_mon_mutex;等待监视器的互斥变量
m_mon_cond.wait(m_mon_mutex);
}
catch(...)
{
m_nnotify = 0;
throw;
}
m_nnotify = 0;
}
==========================================================
条件变量类,他通过构造函数传入的等待的条件对象,实现对对象的改变的通知.
例如:多个线程等待某个线程的互斥条件有没有释放,某个时刻释放后通知各个线程
class JTCCond
{
wait(JTCRecursiveMutex& mutex);
wait(JTCMutex& mutex);
signal();
broadcast();
wait_internal( JTCRecursiveMutex&,long timeout);
calc_timeout(long timeout);//posix
//平台相关成员变量定义
#if defined(HAVE_POSIX_THREADS)
pthread_cond_t m_cond; // Pthread condition variable.
#endif
#if defined(HAVE_WIN32_THREADS)
CondImpl* m_impl;
#endif
}
wait()函数简单调用了内部wait_internal()函数,等待入参的互斥条件
JTCCond::wait(JTCRecursiveMutex& mutex)
{
wait_internal(mutex, -1);
}
先看win32的实现
#if defined(HAVE_WIN32_THREADS)
wait_internal(JTCRecursiveMutex& mutex,)
{
m_impl -> pre_wait();
unsigned int count = mutex.reset_for_condvar();
try
{
//调用内部条件类的的wait函数,内部利用信号量来实现
bool rc = m_impl -> wait(timeout);
mutex.lock(count);
return rc;
}
catch(...)
{
mutex.lock(count);
throw;
}
#endif
}
主要焦点在成员变量的函数m_impl->wait()上
CondImpl* m_impl
bool CondImpl::wait(long timeout)
{
// Wait for the queue semaphore to be signaled.
try{
bool rc = m_queue.wait(timeout);
postwait(!rc);
return rc;
}catch(...)
{
postwait(false);
throw;
}
}
发现有一个队列成员变量和发送等待的postwait()函数.先看一下CondImpl的定义
==========================================================
class CondImpl
{
signal();
pre_wait();
wait();
postwait();
//关键的成员定义
JTCSemaphore m_gate;
JTCSemaphore m_queue;//可以说是信号队列
JTCMutex m_internal; //本身内部用来加锁
long m_blocked;
long m_unblocked;
long m_to_unblock;
}
==========================================================
成员m_queue虽然叫队列,但他是通过信号量来实现的.
class JTCSemaphore
{
JTCSemaphore(long initial_count = 0);
~JTCSemaphore();
//wait和post实现了信号量等待和投递
bool wait(long timeout = -1);
void post(int count = 1);
#if defined(HAVE_WIN32_THREADS)
HANDLE m_sem; // The semaphore handle
#endif
};
构造函数调用了CreateSemaphore();函数,创建的时候信号量最大为0x7fffffff,
开始的信号量为构造参数初始化.
JTCSemaphore::JTCSemaphore(long initial_count)
{
#if defined(HAVE_WIN32_THREADS)
JTC_SYSCALL_4(m_sem = CreateSemaphore, 0, initial_count, 0x7fffffff, 0,
== INVALID_HANDLE_VALUE)
#endif
}
当信号量为大于0时,友信号,等于0时无信号.
win32的信号量实现调用了WaitForSingleObject和ReleaseSemaphore函数.
当信号量为0,Wait()一直在等待,如果信号量大于0时函数返回,信号量减一,
而post()是将信号量加一.
bool JTCSemaphore::wait(long timeout)
{
#if defined(HAVE_WIN32_THREADS)
if (timeout < 0) {
timeout = INFINITE;
}
int rc;
JTC_SYSCALL_2(rc = WaitForSingleObject, m_sem, timeout, == WAIT_ABANDONED);
return rc != WAIT_TIMEOUT;
#endif
}
void JTCSemaphore::post(int count)
{
#if defined(HAVE_WIN32_THREADS)
JTC_SYSCALL_3(ReleaseSemaphore, m_sem, count, 0, == 0)
#endif
}
==========================================================
psotwait()的实现
void CondImpl::postwait(bool timeout)
{
m_internal.lock();
m_unblocked++;
if (m_to_unblock != 0)
{
bool last = (--m_to_unblock == 0);
m_internal.unlock();
if (timeout) {
m_queue.wait();
}
if (last) {
m_gate.post();
}else{
m_queue.post();
}
}else{
m_internal.unlock();
}
}
关于SYSCALL
提供对重要函数调用出现失败时异常错误信息和定位发生异常的代码行和文件名输出和继续抛出异常的宏包装。FN是函数名,其他是参数,最后是函数返回值。这种做法值得学习。
#define JTC_SYSCALL_6(FN,A1,A2,A3,A4,A5,A6,COND)
do {
DECLARE_ERRNO
if ((ASSIGN_ERRNO (FN (A1,A2,A3,A4,A5,A6))) COND)
JTC_THROW_EXCEPTION( ERRNO, #FN << JTC_FMT_ARG_6(A1,A2,A3,A4,A5,A6) )
} while (0);
# define JTC_THROW_EXCEPTION(CODE,MSG)
{
long error = CODE;
char msg[ 512 ];
msg[sizeof(msg)-1] = '\0';
JTC_STD(ostrstream) stream(msg, sizeof(msg)-1);
stream << MSG << " == " << error << " [" << __FILE__
<< ':' << __LINE__ << ']' << JTC_STD(ends);
throw JTCSystemCallException(msg, error);
}
JTC_STD是关于标准流的宏定义
#ifndef HAVE_JTC_NO_IOSTREAM
# ifdef HAVE_STD_IOSTREAM
# define JTC_STD(x) std::x
# else
# define JTC_STD(x) x
# endif
#endif // !HAVE_JTC_NO_IOSTREAM
函数返回值的宏定义
#if defined(WIN32)
# define ERRNO GetLastError()
# define DECLARE_ERRNO
# define ASSIGN_ERRNO
#else
# define ERRNO _jtc_syscallError
# define DECLARE_ERRNO int _jtc_syscallError;
# define ASSIGN_ERRNO _jtc_syscallError =
#endif