分享
 
 
 

一个支持多线程同步循环队列的实现

王朝other·作者佚名  2006-01-29
窄屏简体版  字體: |||超大  

队列元素为一个无符字符数组(即字节数组)。循环队列中只存放该数组的地址。这个地址指向一个存储区域,该存储区的结构为:

_______________________________________________

|数组长度(4B)|数组内容(由前面的长度决定长度)|

---------------------------------------------------------------

这个循环队列支持多线程同步操作,对队列改动时,有互斥锁mutex防止不同步。

工作环境:

linux 9.0

编译:

g++ BytesQueue.cpp main.cpp -o main -lpthread

下面是代码部分:

俩个main.cpp函数,前一个为一般单线程应用。后一个为多线程应用。

/*

BytesQueue.h

zhangggdlt

2004/11/15

to realize a queue storing bytes array.

*/

#ifndef _BYTES_QUEUE_H

#define _BYTES_QUEUE_H

#include <pthread.h>

#include <unistd.h>

#define OPERATION_OK 0

#define QUEUE_FULL -1

#define QUEUE_EMPTY -2

#define INCREASE_FAILED -3

#define NO_AREA -4

#define POINT_NULL -5

#define FAILED_LOCK -6

typedef int ERR_NUMBER;

typedef unsigned char uint_8;

/*

The class BytesQueue is used to realize store an unsigned char array into the queue which

sustain mutiple thread and sycronization.

This queue is a cycle queue. The size of the queue can be set when it is constructed and you

can also increas the size of the queue during the application.

*/

class BytesQueue

{

private:

int _size;

int _head;

int _rear;

uint_8 **_buffer;

pthread_mutex_t QueMutex;

public:

BytesQueue(int size=512);

ERR_NUMBER increaseSize(int size=512);

ERR_NUMBER inQueue(const uint_8 *data, int len);

ERR_NUMBER outQueue(uint_8 *data, int &len);

void destroy();

void errMessage(ERR_NUMBER err);

void showBytesQueue(BytesQueue& bq);

};

#endif //_BYTES_QUEUE_H

——————————————————————————————————————

/*

BytesQueue.cpp

zhangggdlt

2004/12/9

to realize a stack storing bytes array which sustain the mutitread and sycronization.

*/

#include <stdio.h>

#include <string.h>

#include "BytesQueue.h"

/*

Constructor.

This BytesQueue can sustain sycronization among the mutiThread.

It means you can use this data structure under mutithread.

*/

BytesQueue::BytesQueue(int size) //size = 512

{

this->_size = size;

this->_buffer = new (uint_8*)[this->_size];

this->_head = 0;

this->_rear = 0;

pthread_mutex_init(&QueMutex, NULL);

}

/*

You can use this number fuction to increase the size of the queue.

The data will not be lost during the increasement.

*/

ERR_NUMBER BytesQueue::increaseSize(int size) //size = 512

{

uint_8 **temp;

int eleCount = (this->_rear - this->_head + 1 + this->_size) % this->_size;

int tempSize = this->_size;

int i,j;

this->_size += size;

if(!(temp = new (uint_8*)[this->_size]))

return INCREASE_FAILED;

if (this->_rear == this->_head) //empty queue

{

eleCount = 0;

}

if ((this->_rear+1)%this->_size == this->_head) //full queue

{

eleCount = this->_size - size;

}

for (i=this->_head ,j=0; j<eleCount; j++,i=(i+1)%this->_size)

{

temp[i] = this->_buffer[i%tempSize];

this->_rear = i;

}

delete []this->_buffer;

this->_buffer = temp;

return OPERATION_OK;

}

/*

This function is use to accept one element into the queue.

You must remember the element is a unsigned char array.

Len is the length of the data.

*/

ERR_NUMBER BytesQueue::inQueue(const uint_8 *data, int len)

{

uint_8 *temp;

if ((this->_rear+1)%this->_size == this->_head)

{

printf("The queue is full!\n");

return QUEUE_FULL;

}

if (!(temp = new uint_8[len + 4]))

return NO_AREA;

if (pthread_mutex_trylock(&QueMutex))

{

printf("Try lock failed!\n");

return FAILED_LOCK;

}

this->_buffer[this->_rear] = temp;

memcpy(this->_buffer[this->_rear], &len, 4);

memcpy(this->_buffer[this->_rear]+4, data, len);

this->_rear = (this->_rear + 1) % this->_size;

pthread_mutex_unlock(&QueMutex);

return OPERATION_OK;

}

/*

This function is use to set free one element from the queue.

You must get a buffer big enough to store the data before you call the function.

At the same time you need a more int &len to get the data length.

*/

ERR_NUMBER BytesQueue::outQueue(uint_8 *data, int &len)

{

if(!data)

return POINT_NULL;

if(this->_head == this->_rear)

{

printf("The queue is empty!\n");

return QUEUE_EMPTY;

}

if (pthread_mutex_trylock(&QueMutex))

{

printf("Try lock failed!\n");

return FAILED_LOCK;

}

memcpy((void*)&len, this->_buffer[this->_head], 4);

memcpy((void*)data, this->_buffer[this->_head]+4, len);

this->_head = (this->_head + 1) % this->_size;

pthread_mutex_unlock(&QueMutex);

return OPERATION_OK;

}

/*

This function is use to set free the data structure.

*/

void BytesQueue::destroy()

{

while (this->_head != this->_rear)

{

delete [](this->_buffer[this->_head]);

this->_head = (this->_head + 1) % this->_size;

}

delete [](this->_buffer);

this->_size = 0;

this->_buffer = NULL;

this->_head = 0;

this->_rear = 0;

}

/*

This fuction is use to test.

Show the result of the call fuction.

*/

void BytesQueue::errMessage(ERR_NUMBER err)

{

switch(err)

{

case OPERATION_OK:

printf(" push is ok!\n");

break;

case QUEUE_FULL:

printf(" push failed! The queue is full!!\n");

break;

case QUEUE_EMPTY:

printf(" pop failed! The queue is empty!!\n");

break;

case INCREASE_FAILED:

printf(" increase queue size failed! \n");

break;

default:

printf(" other things are wrong! \n");

break;

}

}

/*

This fuction is used to show the infomation of the current queue.

*/

void BytesQueue::showBytesQueue(BytesQueue& bq)

{

printf(" %s\n", "The info of the BytesQueue is :");

printf(" size : %d\n", bq._size);

printf(" head : %d\n", bq._head);

printf(" rear : %d\n", bq._rear);

printf(" buf addr : 0x%x\n", bq._buffer);

}

/*

using namespace NetworkProtocols;

//this is a good example to show how to use the data structure BytesQueue.

int main()

{

int len,i;

char ch;

ERR_NUMBER err;

uint_8 bufi[]={1,2,3,4,5,6,7,8,9,0};

uint_8 bufo[10];

BytesQueue bs;

bs.showBytesQueue(bs);

ch = getchar();

while(ch != 'q')

{

switch(ch)

{

case 'i':

err = bs.inQueue(bufi, 10);

bs.errMessage(err);

bs.showBytesQueue(bs);

ch = getchar();

break;

case 'o':

err = bs.outQueue(bufo, len);

bs.errMessage(err);

bs.showBytesQueue(bs);

ch = getchar();

break;

case 'e':

err = bs.increaseSize();

bs.errMessage(err);

bs.showBytesQueue(bs);

ch = getchar();

break;

case 'h':

printf("....................Help................\n");

printf(" i: go into an array into Queue.\n");

printf(" o: go out of an array out of the queue.\n");

printf(" e: enlarge the size of the queue.\n");

printf(" h: help\n");

printf(" q: quit the system.\n");

ch = getchar();

break;

default:

if (ch != '\n')

printf("...........Your input is wrong! Again!..............\n");

ch = getchar();

break;

}

}

bs.destroy();

bs.showBytesQueue(bs);

return 0;

}

*/

————————————————————————————————————————

//main.cpp

#include <stdio.h>

#include "BytesQueue.h"

typedef struct

{

int id;

BytesQueue *bq;

uint_8 *buf;

int len;

int delay;

}MyParameter;

pthread_t threads[5];

pthread_mutex_t QueMutex;

pthread_attr_t attr;

void *inQueue(void* pvar)

{

int i = 1;

MyParameter *para = (MyParameter*)pvar;

while( i )

{

printf("Thread inQue: %d is working! \n", para->id);

para->bq->inQueue(para->buf, para->len);

para->bq->showBytesQueue(*(para->bq));

//para->bs->push(para->buf,para->len);

//para->bs->showBytesStack(*(para->bs));

usleep(para->delay);

i ++;

}

pthread_exit(NULL);

}

void *outQueue(void* pvar)

{

int i = 1;

MyParameter *para = (MyParameter*)pvar;

while( i )

{

printf("-------------Thread outQue: %d is working! \n", para->id);

para->bq->outQueue(para->buf, para->len);

para->bq->showBytesQueue(*(para->bq));

//para->bs->pop(para->buf,para->len);

//para->bs->showBytesStack(*(para->bs));

usleep(para->delay);

i ++;

}

pthread_exit(NULL);

}

int main()

{

//IpStack::IpStack(int size) //size=10

uint_8 mybuf1[] = {

0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x50,

0x58, 0x0D, 0x0D, 0x0D, 0x08, 0x00, 0x45, 0x00,

0x00, 0x34, 0x00, 0xF2, 0x00, 0x00, 0x40, 0x11,

0xB6, 0x65, 0xC0, 0xA8, 0x21, 0x0F, 0xC0, 0xA8,

0x21, 0x02, 0x04, 0x01, 0x00, 0x05, 0x00, 0x20,

0x60, 0x4c, 0x73, 0x66, 0x61, 0x73, 0x64, 0x66,

0x73, 0x61, 0x64, 0x66, 0x61, 0x73, 0x64, 0x66,

0x73, 0x64, 0x61, 0x66, 0x61, 0x73, 0x66, 0x73,

0x64, 0x66

};

uint_8 mybuf2[100];

int len;

BytesQueue bq(100);

MyParameter paras[4]={

{0,&bq,mybuf1,66,1000000},

{1,&bq,mybuf1,66,2000000},

{2,&bq,mybuf1,66,3000000},

{3,&bq,mybuf2,len,1000000}

};

//bq.showBytesQueue(bq);

pthread_attr_init(&attr);

pthread_create(&threads[0], &attr, inQueue, (void *)&paras[0]);

pthread_create(&threads[1], &attr, inQueue, (void *)&paras[1]);

pthread_create(&threads[2], &attr, inQueue, (void *)&paras[2]);

pthread_create(&threads[3], &attr, outQueue, (void *)&paras[3]);

//pthread_create(&threads[4], &attr, outQueue, (void *)&paras[3]);

for (int i=0; i<4; i++)

{

pthread_join(threads[i], NULL);

}

pthread_attr_destroy(&attr);

bq.destroy();

//bq.showBytesStack(bq);

printf("ok!!\n");

pthread_exit (NULL);

return 0;

} //end of main

____________________________________________________________________________________

zhangggdlt

2004.12.10

(完)

 
 
 
免责声明:本文为网络用户发布,其观点仅代表作者个人观点,与本站无关,本站仅提供信息存储服务。文中陈述内容未经本站证实,其真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
2023年上半年GDP全球前十五强
 百态   2023-10-24
美众议院议长启动对拜登的弹劾调查
 百态   2023-09-13
上海、济南、武汉等多地出现不明坠落物
 探索   2023-09-06
印度或要将国名改为“巴拉特”
 百态   2023-09-06
男子为女友送行,买票不登机被捕
 百态   2023-08-20
手机地震预警功能怎么开?
 干货   2023-08-06
女子4年卖2套房花700多万做美容:不但没变美脸,面部还出现变形
 百态   2023-08-04
住户一楼被水淹 还冲来8头猪
 百态   2023-07-31
女子体内爬出大量瓜子状活虫
 百态   2023-07-25
地球连续35年收到神秘规律性信号,网友:不要回答!
 探索   2023-07-21
全球镓价格本周大涨27%
 探索   2023-07-09
钱都流向了那些不缺钱的人,苦都留给了能吃苦的人
 探索   2023-07-02
倩女手游刀客魅者强控制(强混乱强眩晕强睡眠)和对应控制抗性的关系
 百态   2020-08-20
美国5月9日最新疫情:美国确诊人数突破131万
 百态   2020-05-09
荷兰政府宣布将集体辞职
 干货   2020-04-30
倩女幽魂手游师徒任务情义春秋猜成语答案逍遥观:鹏程万里
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案神机营:射石饮羽
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案昆仑山:拔刀相助
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案天工阁:鬼斧神工
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案丝路古道:单枪匹马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:与虎谋皮
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:李代桃僵
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:指鹿为马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:小鸟依人
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:千金买邻
 干货   2019-11-12
 
推荐阅读
 
 
 
>>返回首頁<<
 
靜靜地坐在廢墟上,四周的荒凉一望無際,忽然覺得,淒涼也很美
© 2005- 王朝網路 版權所有