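This is the report for one of my lab experiments, which uses a mutex and condition variables to do the job of a semaphore (is "信号量" the right Chinese term?).
The lab requirements and the report are both written in English. The course is Realtime System and Programming, and this part is about synchronization.
Lab requirements: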
Lab 4. Multi-Threading
Introduction
In the previous lab experiments, you have learnt and practiced multitasking programming based on the single-thread multiple-process model. In this lab experiment, you will explore the multiple-thread single-process model. POSIX threads are used in the lab experiment.
Threads differ from the process because threads in a process share the address space. You should know how to create threads, how data are passed to thread functions, how return values are received, how to synchronize the threads to access shared resources. Read lecture notes and Chapter 4 in "Advanced Linux Programming" for discussion on threads.
An implementation using POSIX semaphores is discussed in the lectures and the code is available.
Objectives
The following are the primary objectives of this lab experiment:
to appreciate the multithread program model and understand the differences between threads and processes
to be able to create threads, to pass values to thread functions, to get return values of thread functions
to understand mutex and be able to use mutex for mutual exclusion control
to understand condition variables and be able to use condition variables for synchronization
to understand bounded buffer producer-consumer problem
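As a note on the second objective, here is a minimal sketch of passing a value into a thread and getting a result back through pthread_join. The names square_thread and square_in_thread are hypothetical and are not part of the lab files. The result is carried inside the returned pointer via intptr_t, which assumes the value fits in a pointer.

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>

/* Thread function: receives a pointer to an int and returns its square. */
static void *square_thread(void *arg)
{
    int n = *(int *)arg;              /* unpack the argument */
    assert(n >= 0 && n <= 46340);     /* keeps n * n within a 32-bit int */
    return (void *)(intptr_t)(n * n); /* carry the result in the pointer value */
}

/* Hypothetical helper: runs square_thread on n and returns its result,
   or -1 if the thread could not be created or joined. */
static int square_in_thread(int n)
{
    pthread_t tid;
    void *ret = NULL;

    /* &n stays valid because this function does not return before the join. */
    if (pthread_create(&tid, NULL, square_thread, &n) != 0)
        return -1;
    /* pthread_join stores a void *, so receive it in a void * variable. */
    if (pthread_join(tid, &ret) != 0)
        return -1;
    return (int)(intptr_t)ret;
}
```

Receiving the result in a real void * and converting afterwards avoids handing pthread_join the address of an int, which is narrower than a pointer on typical 64-bit systems.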
Task
Write a program that computes the following sum:
sum = 1^2 + 2^2 + 3^2 + ... + n^2
The program creates two threads: producer and consumer. Producer computes i^2, i.e. i * i, and puts it into the bounded buffer, for i = 1, 2, 3, ..., n. Consumer takes one item at a time from the bounded buffer and adds it to the sum.
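For checking results, the sum also has the well-known closed form n(n+1)(2n+1)/6. The sketch below compares it with direct summation; the function names are hypothetical. It uses long long because, assuming a 32-bit int, an int sum as used in the lab code stays in range only up to n = 1860.

```c
#include <assert.h>

/* Direct summation, the same check the lab's main() performs. */
static long long sum_squares_loop(int n)
{
    long long sum = 0;
    for (int i = 1; i <= n; i++)
        sum += (long long)i * i;
    return sum;
}

/* Closed form: 1^2 + 2^2 + ... + n^2 = n(n+1)(2n+1)/6. */
static long long sum_squares_formula(int n)
{
    long long m = n;
    assert(n >= 0);
    return m * (m + 1) * (2 * m + 1) / 6;
}
```

For n = 10 both functions give 385.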
Design
To focus on the objectives of the lab, some design considerations and parts of the code are given below.
1. Input and output
Input data: queue size, value n
Output data: the sum produced by the cooperating threads and the direct sum for checking the result. Possibly, include some extra information (try my sample programs, see the links below)
Global variables: bounded buffer (implemented by a queue), mutex and condition variables, producer_done (1 if it is done, 0 otherwise)
2. Bounded buffer
A bounded buffer is implemented to buffer the data generated by the producer. An array based implementation is provided. Read queue.h and queue.c to see how it is implemented.
3. Producer and consumer
Producer receives the value n through pthread_create, runs the following loop, and sets the flag producer_done before exiting:
for i = 1 to n do
try to add one item ( i * i ) to the buffer
set producer_done = 1 to signal the completion of its job
Consumer takes items from the buffer and sums them. It returns sum:
for ( ; ; )
if there is more data item
get it and add it to sum
else if producer_done
break
No busy waiting is allowed in the implementation. Use mutex and condition variables for synchronization!
4. main function
File lab4.c is available for you. It contains a main ( ) function, global variable declaration, and thread functions. This main( ) is for single-producer single-consumer version.
Modify lab4.c to implement the single-producer multiple-consumer version. The thread functions do not need to change.
5. Sample executable programs
I have two versions of the code:
lab4-sol : my sample solution to the lab experiment
lab4-sol2 : extended solution to the lab experiment, which allows multiple consumer threads to work together to solve the problem.
You can copy them to your directory and try them:
cp ~eric/public_html/courses/rea/labs/lab4-so* .
Assignment summary
1. Read lecture notes and chapter 4 in "Advanced Linux Programming" to understand threads
2. Study the implementation using POSIX semaphores given in the file prod-cons.c
3. copy my sample programs lab4-sol and lab4-sol2, and test them with different input.
4. Study bounded buffer implementation : queue.h and queue.c
5. copy queue.c, queue.h and lab4.c to your work directory and complete the functions producer and consumer.
6. run your program with different input parameters and record the results
7. Discuss the results and problems you encountered, and how you solve the problems
8. Complete a laboratory report as described in the report format pages.
9. modify your program so that it allows multiple consumer threads to work together to solve the problem (try my program lab4-sol2 for example).
Implementation using semaphores
/* file: prod-cons.c
It needs queue.c queue.h
function: producer-consumer problem using semaphores
to calculate 1 ^2 + 2 ^2 + ... + n ^2
compile: gcc -o prod-cons prod-cons.c queue.c -lpthread
run: ./prod-cons
*/
#include <stdio.h>
#include <stdlib.h>		/* exit */
#include <pthread.h>
#include <semaphore.h>
#include "queue.h"
/* global variables*/
Queue Q;
int producer_done = 0;
sem_t sem_full;/* count no. of occupied slots in buffer */
sem_t sem_empty;/* count no. of empty slots in buffer */
sem_t sem_mutex;/* for mutual exclusion access to the buffer */
void *
producer (void *arg)
{
  int n;
  int i;

  n = *(int *) arg;
  for (i = 1; i <= n; i++)
    {
      sem_wait (&sem_empty);
      sem_wait (&sem_mutex);
      if (IsFull (Q))
        {
          fprintf (stderr, "buff is full\n");
        }
      Enqueue (i * i, Q);
      fprintf (stderr, " %d^2 = %d is put to the queue\n", i, i * i);
      /* Set the flag together with the last item, under the lock.  If it
         were set later, the consumer could take the last item, find the
         queue empty with producer_done still 0, and block forever on
         sem_full. */
      if (i == n)
        producer_done = 1;
      sem_post (&sem_mutex);
      sem_post (&sem_full);
    }
  return NULL;
}
void *
consumer (void *arg)
{
  int item;
  int sum;
  int sem_value;

  sum = 0;
  for (;;)
    {
      /* exit once the queue is empty and no more items will arrive */
      sem_wait (&sem_mutex);
      if (IsEmpty (Q) && producer_done)
        {
          sem_post (&sem_mutex);
          break;
        }
      sem_post (&sem_mutex);
      sem_getvalue (&sem_full, &sem_value);
      if (sem_value == 0)
        fprintf (stderr, "buffer is empty\n");
      sem_wait (&sem_full);	/* wait for an occupied slot */
      sem_wait (&sem_mutex);
      item = Dequeue (Q);
      sem_post (&sem_mutex);
      sem_post (&sem_empty);	/* one more free slot */
      sum += item;
      fprintf (stderr, "%d is removed from the queue. Current sum = %d\n",
               item, sum);
    }
  /* go through long so the integer has pointer width on LP64 systems */
  return (void *) (long) sum;
}
int
main ()
{
  pthread_t prod_tid, cons_tid;
  int i;
  int n;
  int sum, sum1;
  int size;
  void *ret;			/* receives the consumer's return value */

  printf ("Enter the queue size: ");
  scanf ("%d", &size);
  Q = CreateQueue (size);
  printf ("This program computes 1 ^2 + 2 ^2 + ... + n ^2\n");
  printf ("Enter n = ");
  scanf ("%d", &n);
  if (n <= 0)
    {
      fprintf (stderr, "please enter a positive integer n\n");
      exit (1);
    }
  sum = 0;
  for (i = 1; i <= n; i++)
    sum += i * i;
  printf ("the sum is %d\n", sum);
  /* initialize semaphores */
  sem_init (&sem_mutex, 0, 1);
  sem_init (&sem_full, 0, 0);
  sem_init (&sem_empty, 0, size);
  sum1 = 0;
  pthread_create (&prod_tid, NULL, producer, (void *) &n);
  pthread_create (&cons_tid, NULL, consumer, NULL);
  pthread_join (prod_tid, NULL);
  /* pthread_join stores a void *; storing it directly into an int would
     overrun the int where pointers are wider than int */
  pthread_join (cons_tid, &ret);
  sum1 = (int) (long) ret;
  printf ("the sum obtained by threads is %d\n", sum1);
  printf ("the sum should be %d\n", sum);
  return 0;
}
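The sem_wait and sem_post calls used above can themselves be built from a mutex and a condition variable, which is the idea behind the next version. The following is a minimal sketch under that assumption, not the POSIX implementation; csem_t, csem_init, csem_wait, csem_post and the small hand-off demo are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical counting semaphore built from a mutex and a condition variable. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  nonzero;   /* signalled whenever count is incremented */
    int             count;
} csem_t;

static void csem_init(csem_t *s, int initial)
{
    assert(initial >= 0);
    pthread_mutex_init(&s->lock, NULL);
    pthread_cond_init(&s->nonzero, NULL);
    s->count = initial;
}

/* Counterpart of sem_wait: block until count > 0, then take one unit. */
static void csem_wait(csem_t *s)
{
    pthread_mutex_lock(&s->lock);
    while (s->count == 0)      /* re-check after every wakeup */
        pthread_cond_wait(&s->nonzero, &s->lock);
    s->count--;
    pthread_mutex_unlock(&s->lock);
}

/* Counterpart of sem_post: add one unit and wake one waiter, if any. */
static void csem_post(csem_t *s)
{
    pthread_mutex_lock(&s->lock);
    s->count++;
    pthread_cond_signal(&s->nonzero);
    pthread_mutex_unlock(&s->lock);
}

/* Demo state: a worker publishes a value, then posts the semaphore. */
static csem_t ready;
static int    published;

static void *worker(void *arg)
{
    published = *(int *)arg;
    csem_post(&ready);
    return NULL;
}

/* Starts the worker, waits on the semaphore, returns what was published. */
static int csem_handoff_demo(int value)
{
    pthread_t tid;
    int rc;

    csem_init(&ready, 0);
    rc = pthread_create(&tid, NULL, worker, &value);
    assert(rc == 0);
    (void)rc;
    csem_wait(&ready);         /* returns only after the worker has posted */
    pthread_join(tid, NULL);
    pthread_cond_destroy(&ready.nonzero);
    pthread_mutex_destroy(&ready.lock);
    return published;
}
```

The while loop around pthread_cond_wait matters: a woken thread must re-check the count, because another thread may have taken the unit first or the wakeup may be spurious.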
Implementation using a mutex and condition variables
/* file: lab4.c
It needs queue.c queue.h to work with the lab4
function: producer-consumer problem: calculate 1 ^2 + 2 ^2 + ... + n ^2
compile: gcc -o lab4 lab4.c queue.c -lpthread
run: ./lab4
*/
#include <stdio.h>
#include <stdlib.h>	/* exit */
#include <pthread.h>
#include "queue.h"
/* global variables*/
Queue Q;
int producer_done = 0;
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_full = PTHREAD_COND_INITIALIZER;
pthread_cond_t cond_empty = PTHREAD_COND_INITIALIZER;
void *producer(void *arg)
{
    /* put your implementation for producer thread here */
    int n, i, temp;

    n = *(int *) arg;
    for (i = 1; i <= n; i++) {
        pthread_mutex_lock(&mutex);
        while (IsFull(Q))	/* wait until a consumer frees a slot */
            pthread_cond_wait(&cond_empty, &mutex);
        temp = i * i;
        Enqueue(temp, Q);
        if (i == n)		/* last item: no more will come */
            producer_done = 1;
        pthread_cond_signal(&cond_full);
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}
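void *consumer(void *arg)
{
    /* put your implementation for consumer thread here */
    int sum = 0;
    int temp;

    for (;;) {
        pthread_mutex_lock(&mutex);
        /* wait first: sleep while the queue is empty and more items may come */
        while (IsEmpty(Q) && !producer_done)
            pthread_cond_wait(&cond_full, &mutex);
        /* then check for exit: nothing left and nothing more will come */
        if (IsEmpty(Q) && producer_done) {
            pthread_mutex_unlock(&mutex);
            /* pass the wake-up on so another waiting consumer can exit too */
            pthread_cond_signal(&cond_full);
            break;
        }
        temp = Dequeue(Q);
        /* pthread_t is not an int; the cast assumes an integer pthread_t, as on Linux */
        printf("child %lu has got %d\n", (unsigned long) pthread_self(), temp);
        pthread_cond_signal(&cond_empty);
        pthread_mutex_unlock(&mutex);
        sum += temp;
    }
    /* go through long so the integer has pointer width on LP64 systems */
    return (void *) (long) sum;
}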
int main(void)
{
    int consumer_number;

    printf("please input the consumer_number:");
    scanf("%d", &consumer_number);
    if (consumer_number <= 0) {	/* the array below needs a positive size */
        fprintf(stderr, "please enter a positive consumer_number\n");
        exit(1);
    }

    pthread_t prod_tid, cons_tid[consumer_number];
    int i;
    int n;
    int sum = 0, consumer_sum = 0, sum1 = 0;
    int size;
    void *ret;	/* receives each consumer's return value */

    printf("Enter the queue size: ");
    scanf("%d", &size);
    Q = CreateQueue(size);
    printf("This program computes 1 ^2 + 2 ^2 + ... + n ^2\n");
    printf("Enter n = ");
    scanf("%d", &n);
    if (n <= 0) {
        fprintf(stderr, "please enter a positive integer n\n");
        exit(1);
    }
    sum = 0;
    for (i = 1; i <= n; i++)
        sum += i * i;
    pthread_create(&prod_tid, NULL, producer, (void *) &n);
    /* valid indices of cons_tid are 0 .. consumer_number - 1 */
    for (i = 0; i < consumer_number; i++)
        pthread_create(&cons_tid[i], NULL, consumer, NULL);
    pthread_join(prod_tid, NULL);
    for (i = 0; i < consumer_number; i++) {
        /* pthread_join stores a void *, so do not pass the address of an int */
        pthread_join(cons_tid[i], &ret);
        sum1 = (int) (long) ret;
        printf("the %d thread obtained %d\n", i + 1, sum1);
        consumer_sum += sum1;
    }
    printf("the sum obtained by threads is %d\n", consumer_sum);
    printf("the sum should be %d\n", sum);
    if (sum == consumer_sum)
        printf("Right! The producer-consumer threads do a great job\n");
    else
        printf("producer-consumer made wrong result. Correct it\n");
    return 0;
}
queue.h
typedef int ElementType;
#ifndef _Queue_h
#define _Queue_h
struct QueueRecord;
typedef struct QueueRecord *Queue;
int IsEmpty( Queue Q );
int IsFull( Queue Q );
Queue CreateQueue( int MaxElements );
void DestroyQueue( Queue Q );
void Enqueue( ElementType X, Queue Q );
ElementType Dequeue( Queue Q );
#endif /* _Queue_h */
queue.c
#include "queue.h"
#include <stdlib.h>
#include <stdio.h>

/* Array-based circular queue. */
struct QueueRecord {
    int Capacity;	/* number of slots in Array */
    int Front;		/* index of the next item to dequeue */
    int Rear;		/* index of the next free slot */
    int Size;		/* number of items currently stored */
    ElementType *Array;
};

int
IsEmpty( Queue Q )
{
    return Q->Size == 0;
}

int
IsFull( Queue Q )
{
    return Q->Size == Q->Capacity;
}

Queue
CreateQueue( int QueueSize )
{
    Queue Q;

    Q = malloc( sizeof( struct QueueRecord ) );
    if( Q == NULL ){
        fprintf(stderr, "Out of space!!!\n" );
        exit ( 1 );
    }
    Q->Array = malloc( sizeof( ElementType ) * QueueSize );
    if( Q->Array == NULL ){
        fprintf(stderr, "Out of space!!!\n" );
        exit ( 1 );
    }
    Q->Capacity = QueueSize;
    Q->Size = 0;
    Q->Front = 0;
    Q->Rear = 0;
    return Q;
}

void
DestroyQueue( Queue Q )
{
    if( Q != NULL )
    {
        free( Q->Array );
        free( Q );
    }
}

/* The caller must make sure the queue is not full. */
void
Enqueue( ElementType X, Queue Q )
{
    Q->Size++;
    Q->Array[ Q->Rear ] = X;
    Q->Rear++;
    if ( Q->Rear == Q->Capacity )	/* wrap around */
        Q->Rear = 0;
}

/* The caller must make sure the queue is not empty. */
ElementType
Dequeue( Queue Q )
{
    ElementType x;

    x = Q->Array[ Q->Front ];
    Q->Size--;
    Q->Front++;
    if ( Q->Front == Q->Capacity )	/* wrap around */
        Q->Front = 0;
    return x;
}
Lab report
1 Introduction
In laboratory experiment four, we deal with multithreading. With multithreading we can run different parts of the program concurrently within the same process. Synchronization in the sample program is done with semaphores. In the modified program it is done with a mutex and condition variables.
2 Results of Running Sample Program
2.1 Program function and related concepts
The sample program uses semaphores to synchronize the threads in order to protect the global data. The producer thread computes i^2, i.e. i * i, and puts it into the bounded buffer, for i = 1, 2, 3, ..., n. The consumer takes one item at a time from the bounded buffer and adds it to the sum.
3 Program Development and Running Result
3.1 Program development
The first task was to use a mutex and condition variables instead of semaphores. At first the program was translated from semaphores to a mutex and condition variables without any other changes. The program worked when there was only one producer and one consumer. We found that we cannot access Q->Size in the main function, because struct QueueRecord is defined only in queue.c and queue.h exposes just the pointer type. In the second task, implementing multiple consumers, some problems arose. They are described in 4 Analysis of Results.
4 Analysis of Results
Problem 1: If the consumer function executes the if-statement (which exits the for loop) before the while-statement (which waits for the producer to add an item to the queue), one thread gets stuck in the for loop with no way out:
Consumer1: Waits for cond_full.
Consumer2: Waits for cond_full.
Producer: Finishes its Enqueue() and sends a signal on cond_full.
Consumer1: Wakes up and goes through the if-statement to Dequeue().
Consumer2: Is suspended on the mutex.
Consumer1: Sets Q->Size to 0, unlocks the mutex and terminates.
Consumer2: Wakes up and goes to Dequeue(), setting Q->Size to -1.
Consumer2: Loops forever, because Q->Size is negative, so the queue never counts as empty, and every further Dequeue() decrements Q->Size again.
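Problem 1 comes down to the order of the two checks inside the critical section. The sketch below isolates one consumer step with the order that works: wait first, then re-check the queue after waking. The struct shared, consumer_step and size_after_draining names are hypothetical, and a plain counter stands in for Q->Size so that the example is self-contained.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical shared state; size stands in for Q->Size. */
struct shared {
    pthread_mutex_t mutex;
    pthread_cond_t  cond_full;  /* signalled on enqueue or when the producer is done */
    int             size;
    int             producer_done;
};

/* One consumer step. Returns 1 if an item was taken, 0 if the consumer should exit. */
static int consumer_step(struct shared *s)
{
    int got = 0;

    pthread_mutex_lock(&s->mutex);
    /* 1. Wait first: sleep while there is nothing to take and more may come. */
    while (s->size == 0 && !s->producer_done)
        pthread_cond_wait(&s->cond_full, &s->mutex);
    /* 2. Re-check after waking: another consumer may have taken the last item. */
    if (s->size > 0) {
        s->size--;              /* stands in for Dequeue(Q) */
        got = 1;
    }
    pthread_mutex_unlock(&s->mutex);
    return got;
}

/* Single-threaded check of the end-of-stream case: the producer is done and
   `items` entries remain. Returns the final size, which must not go negative. */
static int size_after_draining(int items)
{
    struct shared s;
    int taken = 0;

    assert(items >= 0);
    pthread_mutex_init(&s.mutex, NULL);
    pthread_cond_init(&s.cond_full, NULL);
    s.size = items;
    s.producer_done = 1;
    while (consumer_step(&s))
        taken++;
    /* One extra step, like a second consumer arriving late: it must take nothing. */
    taken += consumer_step(&s);
    assert(taken == items);
    pthread_cond_destroy(&s.cond_full);
    pthread_mutex_destroy(&s.mutex);
    return s.size;
}
```

Because the emptiness test is repeated after the wait and under the same lock, a late consumer sees size == 0 and leaves instead of driving the counter to -1.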
Problem 2: If a consumer exits the loop without sending a cond_full signal, the other consumer threads can starve: they keep waiting on cond_full and are never woken. Whether this happens depends on whether any consumer is waiting on the condition variable when the last item is dequeued. It becomes more likely as n and the number of consumers grow and as the queue size shrinks. It also depends on the current workload of the CPU.
Consumer1: Waits for cond_full.
Consumer2: Waits for cond_full.
Producer: Finishes its Enqueue() and sends a signal on cond_full.
Consumer1: Wakes up, dequeues some items, unlocks the mutex, exits the for loop and returns from the function.
Consumer2: Is still waiting for cond_full.
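An alternative to having each exiting consumer pass the signal on is for the producer side to wake every waiter at once with pthread_cond_broadcast when it sets producer_done. This is a swapped-in technique, not what lab4.c does. The sketch below assumes an always-empty queue so that only the wake-up behaviour is visible; waiting_consumer, wake_all_demo and MAX_WAITERS are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>

#define MAX_WAITERS 16

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond_full = PTHREAD_COND_INITIALIZER;
static int producer_done = 0;
static int exited = 0;          /* number of consumers that left their loop */

/* Consumer that finds the queue empty and waits for the end-of-production flag. */
static void *waiting_consumer(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&mutex);
    while (!producer_done)      /* the queue is empty throughout this sketch */
        pthread_cond_wait(&cond_full, &mutex);
    exited++;
    pthread_mutex_unlock(&mutex);
    return NULL;
}

/* Starts `waiters` consumers, then finishes production with a broadcast.
   Returns how many consumers exited. */
static int wake_all_demo(int waiters)
{
    pthread_t tid[MAX_WAITERS];
    int i, rc;

    assert(waiters >= 1 && waiters <= MAX_WAITERS);
    producer_done = 0;
    exited = 0;
    for (i = 0; i < waiters; i++) {
        rc = pthread_create(&tid[i], NULL, waiting_consumer, NULL);
        assert(rc == 0);
        (void)rc;
    }
    /* Producer side: set the flag and wake all waiters, not just one. */
    pthread_mutex_lock(&mutex);
    producer_done = 1;
    pthread_cond_broadcast(&cond_full);
    pthread_mutex_unlock(&mutex);
    for (i = 0; i < waiters; i++)
        pthread_join(tid[i], NULL);
    return exited;
}
```

With pthread_cond_signal in place of the broadcast, only one waiting consumer is guaranteed to wake, so the joins could block forever unless each consumer forwards the signal as lab4.c does.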
5 Conclusion
In a multithreaded environment you have to protect shared resources carefully. It is easy to make a mistake when using a mutex and condition variables, and you have to test the program many times with different parameters, because a problem may show up only with specific parameters in a specific situation (e.g. when the workload is high).