1.导论:thread 是什么?为什么要用 thread?
Thread 通常被称做轻量级的行程(Lightweight process;LWP),这个说法似乎过于简单了一些,但却不失为一个好的起点; thread 是 UNIX process 的近亲,但却不完全相同。为了说明何谓 thread ,我们必须先了解 UNIX process 与 Mach task、 thread 间的关系。在 UNIX 中,一个 process 包括了一个执行中的程序,和一些他所需的系统资源,诸如档案描述表和地址空间等。但是在 Mach 中,一个 task 却只包括了一些系统资源; 而由thread 掌握了所有的执行活动。一个 Mach task 可能有任意多个 thread , 而Mach 系统中所有的 thread 均属于一些 task。属于同一个task 的所有 thread 共享该 task 所拥有的系统资源。因此, thread 实质上就是一个程序计数器、一个堆栈再加上一组缓存器。 UNIX 的一个 process 可以看成是一个只有一个 thread 的 Mach task。 跟UNIX process 比起来, thread 是非常娇小玲珑的,因此对 CPU 而言,产生一个 thread 是一件相对便宜的工作。另一方面,共享系统资源的 thread 跟独占系统资源的 process 比起来,thread 是也是相当节省内存的。 Mach thread 让程序设计师们能很方便的做出执行于单一或多重处理器环境下同时执行的程序。不需要考虑处理器多寡的问题,而直接得到多重处理的效能(如果有多的处理器的话)。此外即使在单一 CPU 的环境下, 如果程序是属于常常『休息』的那种,如 file 及 socket I/O,thread 还是能提供效能上的增进。 以下将介绍一些简单的 POSIX thread ,和他在 DEC OSF/1 OS, V3.0.上的版本(译注:我是在 solaris 2.5.1 /和 SunOS 4.1.4上测试的啦!差不多。),POSIX thread 简称为pthread,他和 non-POSIX 的 cthread非常相近。 2.HelloWorld
废话少说,现在就开始吧! pthread_create 函数建立一个 thread 。他需要四个
参数: thread 变量、 thread 特性、一个描述 thread 行为的函数和这个函数所需的
参数。举例如下: pthread_t a_thread;
pthread_attr_t a_thread_attribute;
void thread_function(void *argument);
char *some_argument;
pthread_create( &a_thread, a_thread_attribute, (void *)&thread_function,
(void *) &some_argument); thread attribute 描述 thread 的一些特性,目前我们只需要用他来指定 thread至少需要多大的堆栈。在未来会有许多有趣的 thread attribute ,但就目前而言,大部分的程序只需简单的指定 pthread_attr_default 就可以了。不像 process ,需要使用 fork() system call 让 child process 和他的 parents 同时开始执行, thread从 pthread_create 中指定的 thread_function 开始执行。理由非常简单:如果thread 不从一个另外的地方开始执行,将会造成一堆thread 使用相同的系统资源执行相同的指令。记得吗? thread 是『共享』系统资源的。(译注:在这里停下来,回忆一下 process 是怎么产生的... ^_^) 在知道如何产生一个 thread 后,就可以开始我们的第一个 thread 程序了!来设计一个 multi-thread 版的 printf("Hello world\n"); 吧!一开始,我们需要两个thread 变量,和一个 thread function ,另外,还要能告诉不同的 thread 印出不同的讯息。我想要让不同的 thread 印出 "hello world" ,不同的两个部分 "hello" 和"world"。程序看起来像这样: void print_message_function( void *ptr );
main()
{
pthread_t thread1, thread2;
char *message1 = "Hello";
char *message2 = "World";
pthread_create( &thread1, pthread_attr_default,
(void*)&print_message_function, (void*) message1);
pthread_create(&thread2, pthread_attr_default,
(void*)&print_message_function, (void*) message2);
exit(0);
}
void print_message_function( void *ptr )
{
char *message;
message = (char *) ptr;
printf("%s ", message);
}
注意 pthread_create 的参数 print_message_function 和他的参数 message1、message2,这支程序用 pthread_create 产生第一个 thread ,并以 "Hello" 作为其起始参数;接下来产生第二个 thread ,指定其起始参数为 "World" 。第一个 thread开始激活的时候,从 print_message_function 开始执行,其传入参数为 "Helllo"。他将 "Hello" 印出来,然后结束。第二个 thread 则做差不多的事情:印出 "World"。看起来很合理,但是这个程序有两个主要的缺陷。 第一个缺点,由于两个 thread 是同时进行的,所以我们无法保证第一个thread会先执行到 printf 那一行,所以在屏幕上可能会看到 "Hello World" ,也有可能会看到"World Hello"。另外,在 main(parent thread)里的 exit 呼叫将结束整个process ,这将导致所有的 thread 一起结束。所以说,如果 exit 在 printf 前被执行的话,将不会有任何的输出产生。事实上,在任何一个 thread (不论 parent or child)里呼叫 exit 都将导致 process 结束,而所有的 thread 也跟着一起结束了。所以如果要结束一个 thread ,我们必须使用 pthread_exit 这个函数。 在我们小程序里有两个竞争条件(race condition),一、看看是 parent process先执行到 exit 呢?还是 child process 先执行到 printf ?二、还有两个child thread 到底是谁会先印出讯息呢?为了让程序按照我们希望的顺序运作,我们尝试强迫每个 thread 间相互的等待,下面这个程序加入了两个 sleep 达成这个目的。 void print_message_function( void *ptr );
main()
{
pthread_t thread1, thread2;
char *message1 = "Hello";
char *message2 = "World";
pthread_create( &thread1, pthread_attr_default,
(void *) &print_message_function, (void *) message1);
sleep(10); //休息一下,等"Hello"印出来再产生下一个 thread pthread_create(&thread2, pthread_attr_default,
(void *) &print_message_function, (void *) message2);
sleep(10); //休息一下,等"World"印出来再结束。 exit(0);
}
void print_message_function( void *ptr )
{
char *message;
message = (char *) ptr;
printf("%s", message);
pthread_exit(0);
} 这个程序达成我们的目的了吗?不完全是,原因在于使用 timming delay 来达成thread 间的同步是错误的,因为 thread 间的紧密耦合(tightly coupled)特性很容易让我们使用一些不精确的方法来达成其间的同步处理;然而我们却不该这么做。在这个程序中我们遇到的竞争条件和分布式应用程序中,资源共享的情况完全相同。共享的资源为标准输出,而分散计算成原则为程序中的三个 thread。第一个thread必须在第二个thread 前使用 printf/stdout,而两者皆必须在 parent thread 呼叫exit 前完成他们的工作。 除了使用 delay 来达成同步的效果外,另一个错误发生在 sleep 系统呼叫;如同exit 对 process 的影响一样,当 thread 呼叫 sleep 时,讲导致整个 process 停下来。这表示所有属于这个 process 的 thread 也将跟着停顿下来。因此在上面这个程式中,呼叫 sleep 除了平白让程序慢了20秒,并不会有什么额外影响。另外一个适用的函数是 pthread_delay_np (np 表示 not process)。举例来说,要让thread 停顿两秒钟,可以用下列程序: struct timespec delay;
delay.tv_sec = 2;
delay.tv_nsec = 0;
pthread_delay_np( &delay ); 本节提到的函数有:pthread_create(),
pthread_exit(),
pthread_delay_np(). 3.Thread同步问题 POSIX 提供了两组用来使 thread 同步的基本指令: mutex 和 condition variable。mutex 指的是一组用来控制共享资源存取的一组函数。注意,在使用thread的情况下,因为整个地址空间都是共享的,所以所有的东西都可以视为共享资源。在一般情况下, thread 使用一些在pthreadcreate 之前定义或在其所呼叫的函数中定义的变量来完成其工作,并将他的成果经由整体变量合并。对这些大家都可以存取的变量,我们必须加以控制。 以下是一个 reader/writer 程序,程序中有一个reader,一个writer,他们共享
一个 buffer,且使用 mutex 来控制这个 buffer 的存取。 void reader_function(void);
void writer_function(void);
char buffer;
int buffer_has_item = 0;
pthread_mutex_t mutex;
struct timespec delay;
main()
{
pthread_t reader;
delay.tv_sec = 2;
delay.tv_nsec = 0;
pthread_mutex_init(&mutex, pthread_mutexattr_default);
pthread_create( &reader, pthread_attr_default, (void*)&reader_function,
NULL);
writer_function();
}
void writer_function(void)
{
while(1)
{
pthread_mutex_lock( &mutex );
if ( buffer_has_item == 0 )
{
buffer = make_new_item();
buffer_has_item = 1;
}
pthread_mutex_unlock( &mutex );
pthread_delay_np( &delay );
}
}
void reader_function(void)
{
while(1)
{
pthread_mutex_lock( &mutex );
if ( buffer_has_item == 1)
{
consume_item( buffer );
buffer_has_item = 0;
}
pthread_mutex_unlock( &mutex );
pthread_delay_np( &delay );
}
} 在这个简单的程序中,我们假设 buffer 的容量只有 1,因此这个 buffer 有两个可能的状态:『有一笔资料』或『没有资料』。 writer 首先将 mutex 锁定,如果 mutex 已经被锁定,则暂停,直到 mutex 被解锁。然后看看 buffer 是否是空的,若buffer 处于『没有资料』的状态,writer 产生一笔新的资料,将其放入 buffer中。然后将旗标buffer_has_item 设为 1,让 reader 可藉此旗标得知 buffer 内有一笔资料。最后writer 将 mutex 解锁,并休息 2 秒钟,让 reader 可藉此一空档取出 buffer 内的资料。这里使用的 delay跟之前的 delay 有截然不同的意义,如果不加上这个 delay 的话,writer 在 unlock mutex 后的下一个指令就是为了产生另一笔新的资料,再度 lock mutex。这将造成 reader 没有机会读取 buffer 中的资料。因此在此处加上一个 delay 看起来是个不错的主意。 reader 看起来和 writer 差不多,它首先 lock mutex,然后看看buffer 中是否有资料,若有资料则将其取出,然后将 mutex 解锁,接着 delay 2 秒,让 writer 有机会放入新的资料。在这个例子中,writer 和 reader 就这样一直的 run 下去,不断的产生/移除 buffer 中的资料。在其它的情况下,我们可能不再需要使用 mutex 了,此时可以使用 pthread_mutex_destroy(&mutex); 来释放 mutex。 在初始 mutex 的时候,我们使用了 pthread_mutexattr_default 来当作 mutex 特性。在 OSF/1 中,mutex 特性没啥用处,所以这样设就够了。 mutex 一般用在解决 race condition 问题,但是 mutex 并不是一个很强的机制,因为他只有两个状态:locked 和 unlocked。POSIX 定义的条件变量(condition variable)将 mutex 的功能加以延伸,能够做到让某一个 thread 能暂停,并等待另一个 thread 的信号(signal)。当信号来了,thread 就醒过来,然后将相关的mutex lock 起来。这样的作法可以解决 reader/writer 程序中的 spin-lock 问题。附录 A 中有一个使用 mutex 和 condition variable 做成的一个简单的 integer semaphores。有关 condition variable 的详细用法可以参考 man page。 本节提到的函数有:pthread_mutex_init(),
pthread_mutex_lock(),
pthread_mutex_unlock(),
pthread_mutex_destroy().
4. 使用 Semaphores 达成协调工作 (本节中用的Semapore 函数怪怪的,一般我不是这样用。看起来如果要这样写,必须用附录中的 library。) 接下来我们想要用 semaphore 来重写上节之 reader/writer 程序。用更强悍的整数 semaphore 来取代 mutex ,并解决 spin-lock 问题。与 Semaphore 相关的运算有 semaphore_up,semaphore_down,semaphore_init, semaphore_destroy, 和semaphore_decrement. 其中 semaphore_up 和 semaphore_down 和传统的 semaphore语法相同 -- down 运算将在 semaphore 之值小于或等于零时暂停。而 up 运算则递增 semaphore。 在使用 semaphore 前必须呼叫 init 函数,而所有 semaphore 的初始值均为 1。当 semaphore 不再被使用时, destroy 函数可以释放它。上述所有函数都只需要一个参数:一个指向 semaphore 对象的指针。 Semaphore_decrement 是一个 non-blocking function 他可以将 semaphore 递减到一个负值,这个作法有什么用处呢?一般用于在初始一个 semaphore 时设定它的初始值。稍后我们会举出一个例子。接下来首先看 semaphore 版本的 reader/writer程序。 void reader_function(void);
void writer_function(void);
char buffer;
Semaphore writers_turn;
Semaphore readers_turn;
main()
{
pthread_t reader;
semaphore_init( &readers_turn );
semaphore_init( &writers_turn );
/* writer must go first */
semaphore_down( &readers_turn );
pthread_create( &reader, pthread_attr_default,
(void *)&reader_function, NULL);
writer_function();
}
void writer_function(void)
{
while(1)
{
semaphore_down( &writers_turn );
buffer = make_new_item();
semaphore_up( &readers_turn );
}
}
void reader_function(void)
{
while(1)
{
semaphore_down( &readers_turn );
consume_item( buffer );
semaphore_up( &writers_turn );
}
} 上面这个例子尚未完前展现 integer semaphore 的威力。接下来我们将修改第二节中的 Hello World 程序,并使用 semaphore 来修正其 race conditions 问题。 void print_message_function( void *ptr ); Semaphore child_counter;
Semaphore worlds_turn; main()
{
pthread_t thread1, thread2;
char *message1 = "Hello";
char *message2 = "World";
semaphore_init( &child_counter );
semaphore_init( &worlds_turn ); semaphore_down( &worlds_turn ); /* world goes second */
semaphore_decrement( &child_counter ); /* value now 0 */
semaphore_decrement( &child_counter ); /* value now -1 */
/*
* child_counter now must be up-ed 2 times for a thread blocked on it
* to be released
*
*/
pthread_create( &thread1, pthread_attr_default,
(void *) &print_message_function, (void *) message1);
semaphore_down( &worlds_turn );
pthread_create(&thread2, pthread_attr_default,
(void *) &print_message_function, (void *) message2);
semaphore_down( &child_counter );
/* not really necessary to destroy since we are exiting anyway */
semaphore_destroy ( &child_counter );
semaphore_destroy ( &worlds_turn );
exit(0);
} void print_message_function( void *ptr )
{
char *message;
message = (char *) ptr;
printf("%s ", message);
fflush(stdout);
semaphore_up( &worlds_turn );
semaphore_up( &child_counter );
pthread_exit(0);
}
很容易可以看出,上面这个程序并没有race condition 问题,而且也会依照正确的顺序印出结果。其中 semaphore child_counter 的目的在于让 parent thread 暂停,直到所有的 children 执行 printf 和紧随其后的 semaphore_up(&child_counter)。
本节提到的函数有:semaphore_init(), semaphore_up(),
semaphore_down(), semaphore_destroy(),
semaphore_decrement(). 5.使用实务 Compile 使用 pthread 的程序,必须 include 相关的header file(译注:一般
是 pthread.h)并且连结 pthread library: cc hello_world.c -o hello_world -lpthreads
(在 Alpha 上你还要加上 -lc_r)
(译注:在 solaris 上用 -lthread 或 -lpthread 都可以。) 如果要使用 semaphore 则还必须使用相关的 header file 和 library。 DEC 的 pthread 是根据 POSIX IV 的 thread 标准而非 POSIX VIII 发展出来的。函数 pthread_join 允许一个 thread 等待另一指定的 thread 到该 thread 结束。因此在 Hello World 程序中,可以用来判断 children thread 是否结束。但是在 DEC上,这个函数不太可靠,在下列程序段中,如果指定的 some_thread 不存在,他将会造成错误,而不是直接 return。 pthread_t some_thread;
void *exit_status;
pthread_join( some_thread, &exit_status ); 另外一些奇怪的错误可能发生在 thread 函数之外的地方,但是却肇因于此。在我们的例子中,并不太去检查 thread 函数是否正确执行,然而这却是必要的。几乎所有的 pthread 函数都在发生错误时 return -1。举例如下: pthread_t some_thread;
if ( pthread_create( &some_thread, ... ) == -1 )
{
perror("Thread creation error");
exit(1);
} semaphore library 在发生错误的时候会印出一些讯息然后离开。
文中没有举出来,但是蛮有用的一些函数如下。 pthread_yield(); 通知 scheduler thread 想要出让他的执行权力,
不需要参数。 pthread_t me;
me = pthread_self(); 让 thread 取得他自己的 identifier。 pthread_t thread;
pthread_detach(thread); 通知 library 在后面的pthread_join 呼叫里,不需
exit status,可增进 thread 的效率。 Appendix A - Semaphore Library Code
==============================================================================
Semaphore.h follows
============================================================================== /*****************************************************************************
* Written by
* Tom Wagner (wagner@cs.umass.edu)
* at the Distributed Problem Solving Lab
* Department of Computer Science, University of Massachusetts,
* Amherst, MA 01003
*
* Copyright (c) 1995 UMASS CS Dept. All rights are reserved.
*
* Development of this code was partially supported by:
* ONR grant N00014-92-J-1450
* NSF contract CDA-8922572
*
* ---------------------------------------------------------------------------
*
* This code is free software; you can redistribute it and/or modify it.
* However, this header must remain intact and unchanged. Additional
* information may be appended after this header. Publications based on
* this code must also include an appropriate reference.
*
* This code is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE.
*
\****************************************************************************/ #ifndef SEMAPHORES
#define SEMAPHORES #include
#include
typedef struct Semaphore
{
int v;
pthread_mutex_t mutex;
pthread_cond_t cond;
}
Semaphore;
int semaphore_down (Semaphore * s);
int semaphore_decrement (Semaphore * s);
int semaphore_up (Semaphore * s);
void semaphore_destroy (Semaphore * s);
void semaphore_init (Semaphore * s);
int semaphore_value (Semaphore * s);
int tw_pthread_cond_signal (pthread_cond_t * c);
int tw_pthread_cond_wait (pthread_cond_t * c, pthread_mutex_t * m);
int tw_pthread_mutex_unlock (pthread_mutex_t * m);
int tw_pthread_mutex_lock (pthread_mutex_t * m);
void do_error (char *msg); #endif
==============================================================================
Semaphore.c follows
==============================================================================
/*****************************************************************************
* Written by
* Tom Wagner (wagner@cs.umass.edu)
* at the Distributed Problem Solving Lab
* Department of Computer Science, University of Massachusetts,
* Amherst, MA 01003
*
* Copyright (c) 1995 UMASS CS Dept. All rights are reserved.
*
* Development of this code was partially supported by:
* ONR grant N00014-92-J-1450
* NSF contract CDA-8922572
*
* ---------------------------------------------------------------------------
*
* This code is free software; you can redistribute it and/or modify it.
* However, this header must remain intact and unchanged. Additional
* information may be appended after this header. Publications based on
* this code must also include an appropriate reference.
*
* This code is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE.
*
\****************************************************************************/ #include "semaphore.h"
/*
* function must be called prior to semaphore use.
*
*/
void
semaphore_init (Semaphore * s)
{
s->v = 1;
if (pthread_mutex_init (&(s->mutex), pthread_mutexattr_default) == -1)
do_error ("Error setting up semaphore mutex"); if (pthread_cond_init (&(s->cond), pthread_condattr_default) == -1)
do_error ("Error setting up semaphore condition signal");
} /*
* function should be called when there is no longer a need for
* the semaphore.
*
*/
void
semaphore_destroy (Semaphore * s)
{
if (pthread_mutex_destroy (&(s->mutex)) == -1)
do_error ("Error destroying semaphore mutex"); if (pthread_cond_destroy (&(s->cond)) == -1)
do_error ("Error destroying semaphore condition signal");
} /*
* function increments the semaphore and signals any threads that
* are blocked waiting a change in the semaphore.
*
*/
int
semaphore_up (Semaphore * s)
{
int value_after_op; tw_pthread_mutex_lock (&(s->mutex)); (s->v)++;
value_after_op = s->v; tw_pthread_mutex_unlock (&(s->mutex));
tw_pthread_cond_signal (&(s->cond)); return (value_after_op);
} /*
* function decrements the semaphore and blocks if the semaphore is
* <= 0 until another thread signals a change.
*
*/
int
semaphore_down (Semaphore * s)
{
int value_after_op; tw_pthread_mutex_lock (&(s->mutex));
while (s->v <= 0)
{
tw_pthread_cond_wait (&(s->cond), &(s->mutex));
} (s->v)--;
value_after_op = s->v; tw_pthread_mutex_unlock (&(s->mutex)); return (value_after_op);
} /*
* function does NOT block but simply decrements the semaphore.
* should not be used instead of down -- only for programs where
* multiple threads must up on a semaphore before another thread
* can go down, i.e., allows programmer to set the semaphore to
* a negative value prior to using it for synchronization.
*
*/
int
semaphore_decrement (Semaphore * s)
{
int value_after_op; tw_pthread_mutex_lock (&(s->mutex));
s->v--;
value_after_op = s->v;
tw_pthread_mutex_unlock (&(s->mutex)); return (value_after_op);
} /*
* function returns the value of the semaphore at the time the
* critical section is accessed. obviously the value is not guarenteed
* after the function unlocks the critical section. provided only
* for casual debugging, a better approach is for the programmar to
* protect one semaphore with another and then check its value.
* an alternative is to simply record the value returned by semaphore_up
* or semaphore_down.
*
*/
int
semaphore_value (Semaphore * s)
{
/* not for sync */
int value_after_op; tw_pthread_mutex_lock (&(s->mutex));
value_after_op = s->v;
tw_pthread_mutex_unlock (&(s->mutex)); return (value_after_op);
} /* -------------------------------------------------------------------- */
/* The following functions replace standard library functions in that */
/* they exit on any error returned from the system calls. Saves us */
/* from having to check each and every call above. */
/* -------------------------------------------------------------------- */
int
tw_pthread_mutex_unlock (pthread_mutex_t * m)
{
int return_value; if ((return_value = pthread_mutex_unlock (m)) == -1)
do_error ("pthread_mutex_unlock"); return (return_value);
} int
tw_pthread_mutex_lock (pthread_mutex_t * m)
{
int return_value; if ((return_value = pthread_mutex_lock (m)) == -1)
do_error ("pthread_mutex_lock"); return (return_value);
} int
tw_pthread_cond_wait (pthread_cond_t * c, pthread_mutex_t * m)
{
int return_value; if ((return_value = pthread_cond_wait (c, m)) == -1)
do_error ("pthread_cond_wait"); return (return_value);
} int
tw_pthread_cond_signal (pthread_cond_t * c)
{
int return_value; if ((return_value = pthread_cond_signal (c)) == -1)
do_error ("pthread_cond_signal"); return (return_value);
}
/*
* function just prints an error message and exits
*
*/
void
do_error (char *msg)
{
perror (msg);
exit (1);
}