队列元素为一个无符字符数组(即字节数组)。循环队列中只存放该数组的地址。这个地址指向一个存储区域,该存储区的结构为:
_______________________________________________
|数组长度(4B)|数组内容(由前面的长度决定长度)|
---------------------------------------------------------------
这个循环队列支持多线程同步操作,对队列改动时,有互斥锁mutex防止不同步。
工作环境:
linux 9.0
编译:
g++ BytesQueue.cpp main.cpp -o main -lpthread
下面是代码部分:
俩个main.cpp函数,前一个为一般单线程应用。后一个为多线程应用。
/*
BytesQueue.h
zhangggdlt
2004/11/15
to realize a queue storing bytes array.
*/
#ifndef _BYTES_QUEUE_H
#define _BYTES_QUEUE_H
#include <pthread.h>
#include <unistd.h>
#define OPERATION_OK 0
#define QUEUE_FULL -1
#define QUEUE_EMPTY -2
#define INCREASE_FAILED -3
#define NO_AREA -4
#define POINT_NULL -5
#define FAILED_LOCK -6
typedef int ERR_NUMBER;
typedef unsigned char uint_8;
/*
The class BytesQueue is used to realize store an unsigned char array into the queue which
sustain mutiple thread and sycronization.
This queue is a cycle queue. The size of the queue can be set when it is constructed and you
can also increas the size of the queue during the application.
*/
class BytesQueue
{
private:
int _size;
int _head;
int _rear;
uint_8 **_buffer;
pthread_mutex_t QueMutex;
public:
BytesQueue(int size=512);
ERR_NUMBER increaseSize(int size=512);
ERR_NUMBER inQueue(const uint_8 *data, int len);
ERR_NUMBER outQueue(uint_8 *data, int &len);
void destroy();
void errMessage(ERR_NUMBER err);
void showBytesQueue(BytesQueue& bq);
};
#endif //_BYTES_QUEUE_H
——————————————————————————————————————
/*
BytesQueue.cpp
zhangggdlt
2004/12/9
to realize a stack storing bytes array which sustain the mutitread and sycronization.
*/
#include <stdio.h>
#include <string.h>
#include "BytesQueue.h"
/*
Constructor.
This BytesQueue can sustain sycronization among the mutiThread.
It means you can use this data structure under mutithread.
*/
BytesQueue::BytesQueue(int size) //size = 512
{
this->_size = size;
this->_buffer = new (uint_8*)[this->_size];
this->_head = 0;
this->_rear = 0;
pthread_mutex_init(&QueMutex, NULL);
}
/*
You can use this number fuction to increase the size of the queue.
The data will not be lost during the increasement.
*/
ERR_NUMBER BytesQueue::increaseSize(int size) //size = 512
{
uint_8 **temp;
int eleCount = (this->_rear - this->_head + 1 + this->_size) % this->_size;
int tempSize = this->_size;
int i,j;
this->_size += size;
if(!(temp = new (uint_8*)[this->_size]))
return INCREASE_FAILED;
if (this->_rear == this->_head) //empty queue
{
eleCount = 0;
}
if ((this->_rear+1)%this->_size == this->_head) //full queue
{
eleCount = this->_size - size;
}
for (i=this->_head ,j=0; j<eleCount; j++,i=(i+1)%this->_size)
{
temp[i] = this->_buffer[i%tempSize];
this->_rear = i;
}
delete []this->_buffer;
this->_buffer = temp;
return OPERATION_OK;
}
/*
This function is use to accept one element into the queue.
You must remember the element is a unsigned char array.
Len is the length of the data.
*/
ERR_NUMBER BytesQueue::inQueue(const uint_8 *data, int len)
{
uint_8 *temp;
if ((this->_rear+1)%this->_size == this->_head)
{
printf("The queue is full!\n");
return QUEUE_FULL;
}
if (!(temp = new uint_8[len + 4]))
return NO_AREA;
if (pthread_mutex_trylock(&QueMutex))
{
printf("Try lock failed!\n");
return FAILED_LOCK;
}
this->_buffer[this->_rear] = temp;
memcpy(this->_buffer[this->_rear], &len, 4);
memcpy(this->_buffer[this->_rear]+4, data, len);
this->_rear = (this->_rear + 1) % this->_size;
pthread_mutex_unlock(&QueMutex);
return OPERATION_OK;
}
/*
This function is use to set free one element from the queue.
You must get a buffer big enough to store the data before you call the function.
At the same time you need a more int &len to get the data length.
*/
ERR_NUMBER BytesQueue::outQueue(uint_8 *data, int &len)
{
if(!data)
return POINT_NULL;
if(this->_head == this->_rear)
{
printf("The queue is empty!\n");
return QUEUE_EMPTY;
}
if (pthread_mutex_trylock(&QueMutex))
{
printf("Try lock failed!\n");
return FAILED_LOCK;
}
memcpy((void*)&len, this->_buffer[this->_head], 4);
memcpy((void*)data, this->_buffer[this->_head]+4, len);
this->_head = (this->_head + 1) % this->_size;
pthread_mutex_unlock(&QueMutex);
return OPERATION_OK;
}
/*
This function is use to set free the data structure.
*/
void BytesQueue::destroy()
{
while (this->_head != this->_rear)
{
delete [](this->_buffer[this->_head]);
this->_head = (this->_head + 1) % this->_size;
}
delete [](this->_buffer);
this->_size = 0;
this->_buffer = NULL;
this->_head = 0;
this->_rear = 0;
}
/*
This fuction is use to test.
Show the result of the call fuction.
*/
void BytesQueue::errMessage(ERR_NUMBER err)
{
switch(err)
{
case OPERATION_OK:
printf(" push is ok!\n");
break;
case QUEUE_FULL:
printf(" push failed! The queue is full!!\n");
break;
case QUEUE_EMPTY:
printf(" pop failed! The queue is empty!!\n");
break;
case INCREASE_FAILED:
printf(" increase queue size failed! \n");
break;
default:
printf(" other things are wrong! \n");
break;
}
}
/*
This fuction is used to show the infomation of the current queue.
*/
void BytesQueue::showBytesQueue(BytesQueue& bq)
{
printf(" %s\n", "The info of the BytesQueue is :");
printf(" size : %d\n", bq._size);
printf(" head : %d\n", bq._head);
printf(" rear : %d\n", bq._rear);
printf(" buf addr : 0x%x\n", bq._buffer);
}
/*
using namespace NetworkProtocols;
//this is a good example to show how to use the data structure BytesQueue.
int main()
{
int len,i;
char ch;
ERR_NUMBER err;
uint_8 bufi[]={1,2,3,4,5,6,7,8,9,0};
uint_8 bufo[10];
BytesQueue bs;
bs.showBytesQueue(bs);
ch = getchar();
while(ch != 'q')
{
switch(ch)
{
case 'i':
err = bs.inQueue(bufi, 10);
bs.errMessage(err);
bs.showBytesQueue(bs);
ch = getchar();
break;
case 'o':
err = bs.outQueue(bufo, len);
bs.errMessage(err);
bs.showBytesQueue(bs);
ch = getchar();
break;
case 'e':
err = bs.increaseSize();
bs.errMessage(err);
bs.showBytesQueue(bs);
ch = getchar();
break;
case 'h':
printf("....................Help................\n");
printf(" i: go into an array into Queue.\n");
printf(" o: go out of an array out of the queue.\n");
printf(" e: enlarge the size of the queue.\n");
printf(" h: help\n");
printf(" q: quit the system.\n");
ch = getchar();
break;
default:
if (ch != '\n')
printf("...........Your input is wrong! Again!..............\n");
ch = getchar();
break;
}
}
bs.destroy();
bs.showBytesQueue(bs);
return 0;
}
*/
————————————————————————————————————————
//main.cpp
#include <stdio.h>
#include "BytesQueue.h"
typedef struct
{
int id;
BytesQueue *bq;
uint_8 *buf;
int len;
int delay;
}MyParameter;
pthread_t threads[5];
pthread_mutex_t QueMutex;
pthread_attr_t attr;
void *inQueue(void* pvar)
{
int i = 1;
MyParameter *para = (MyParameter*)pvar;
while( i )
{
printf("Thread inQue: %d is working! \n", para->id);
para->bq->inQueue(para->buf, para->len);
para->bq->showBytesQueue(*(para->bq));
//para->bs->push(para->buf,para->len);
//para->bs->showBytesStack(*(para->bs));
usleep(para->delay);
i ++;
}
pthread_exit(NULL);
}
void *outQueue(void* pvar)
{
int i = 1;
MyParameter *para = (MyParameter*)pvar;
while( i )
{
printf("-------------Thread outQue: %d is working! \n", para->id);
para->bq->outQueue(para->buf, para->len);
para->bq->showBytesQueue(*(para->bq));
//para->bs->pop(para->buf,para->len);
//para->bs->showBytesStack(*(para->bs));
usleep(para->delay);
i ++;
}
pthread_exit(NULL);
}
int main()
{
//IpStack::IpStack(int size) //size=10
uint_8 mybuf1[] = {
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x50,
0x58, 0x0D, 0x0D, 0x0D, 0x08, 0x00, 0x45, 0x00,
0x00, 0x34, 0x00, 0xF2, 0x00, 0x00, 0x40, 0x11,
0xB6, 0x65, 0xC0, 0xA8, 0x21, 0x0F, 0xC0, 0xA8,
0x21, 0x02, 0x04, 0x01, 0x00, 0x05, 0x00, 0x20,
0x60, 0x4c, 0x73, 0x66, 0x61, 0x73, 0x64, 0x66,
0x73, 0x61, 0x64, 0x66, 0x61, 0x73, 0x64, 0x66,
0x73, 0x64, 0x61, 0x66, 0x61, 0x73, 0x66, 0x73,
0x64, 0x66
};
uint_8 mybuf2[100];
int len;
BytesQueue bq(100);
MyParameter paras[4]={
{0,&bq,mybuf1,66,1000000},
{1,&bq,mybuf1,66,2000000},
{2,&bq,mybuf1,66,3000000},
{3,&bq,mybuf2,len,1000000}
};
//bq.showBytesQueue(bq);
pthread_attr_init(&attr);
pthread_create(&threads[0], &attr, inQueue, (void *)¶s[0]);
pthread_create(&threads[1], &attr, inQueue, (void *)¶s[1]);
pthread_create(&threads[2], &attr, inQueue, (void *)¶s[2]);
pthread_create(&threads[3], &attr, outQueue, (void *)¶s[3]);
//pthread_create(&threads[4], &attr, outQueue, (void *)¶s[3]);
for (int i=0; i<4; i++)
{
pthread_join(threads[i], NULL);
}
pthread_attr_destroy(&attr);
bq.destroy();
//bq.showBytesStack(bq);
printf("ok!!\n");
pthread_exit (NULL);
return 0;
} //end of main
____________________________________________________________________________________
zhangggdlt
2004.12.10
(完)