分享
 
 
 

一种读写可并发进行的队列的实现方法

王朝other·作者佚名  2006-01-09
窄屏简体版  字體: |||超大  

1背景

目前采用多线程的处理机制中,如下处理方式是比较常见的:

一个线程负责将上游数据放到一个公共队列中,另外一个线程从公共队列中取出数据进行处理。读取操作都需要共用一个互斥量来保证线程安全,这样写数据和取数据的操作实际上是串行的,有些时候,这个操作将对软件处理性能造成一定影响。如果我们能够实现一个队列,读取操作不需要任何互斥量保护就可以保证线程安全,那么读写线程的处理能力将得到明显提高。

实际上就是保证队列的读取接口和写入接口之间不存在并发冲突,即一个线程只调用读取接口,一个线程只调用写入接口,这两个线程是不需要进行任何同步动作的;如果多个线程同时调用读取接口或者同时调用写入接口,那么读取接口和写入接口可以用不同的互斥量进行同步;最终达到我们的目的:多个线程中,读取速率不会影响写入的速率,反之亦然。

除了读取操作外,很多地方可能还要知道队列的大小,比如内部调试信息,或者实现中需要限制队列的最大容量等,这在读写两个线程都可能用到的,也希望在任意处理线程中不用加锁就可以取到这个信息,这个接口和读写接口都不存在并发冲突的问题,从而提高执行效率。

2实现方案

需要进行线程同步的操作都是因为大家需要修改(有的线程修改,也有的线程访问)公共资源造成的,只要我们能够保证忘列表中增加一个节点和删除一个节点都不需要都修改同一个资源,而且保证待访问的资源始终有效,那么就可以做到读取操作本身就是线程安全的。

下面是一个列表的简单实现。

struct ListNode

{

int data;

ListNode* next;

}

struct List

{

ListNode* beg;

ListNode* end;

List():beg(0),end(0){}

void push_back(int data)

{

if(!end)

beg = end = new ListNode(node);

else

{

end.next = new ListNode(node);

end = end-next;

}

}

void pop_front()

{

ListNode* top = beg;

beg = beg-next;

delete top;

if(!beg)

end = beg;

}

int front()

{

return beg-data;

}

}

上面的实现,因为增删操作都可能修改内部的beg,end变量,无法做到线程安全的。

如果push_back只修改end变量,pop_front只修改beg变量,那么这两个操作就可以做到线程安全的。如果列表为空的时候beg,end都以空指针来表示,就不可能做到这一点。如果列表为空的时候,beg,end也指向一个固定节点,那么就可能实现这个操作。如下所示:

struct List

{

ListNode* beg;

ListNode* end;

List():beg(new ListNode(0)),end(beg){}

}

当插入数据的时候都是用已有的end节点来保存数据,然后在生成一个新的表示结束的节点,如下

void push_back(int data)

{

end.data = data;

end.next = new ListNode(0);

end = end-next;

}

这样在插入数据的时候不需要修改beg变量了。在提取数据的时候也是做类似处理:

void pop_front()

{

if(!empty())

{

ListNode* oldBeg = beg;

beg = beg-next;

delete oldBeg;

}

}

考虑实际得取值操作

int front()

{

return beg-data;

}

如果调用front()操作得时候,beg刚好被删除,就可能造成很严重得问题。如果front()和pop_front()都只在一个读线程中使用,问题不大。如果将这两个接口得功能合并,如下:

int pop_front()

{

int ret = beg-data

ListNode* oldBeg = beg;

beg = beg-next;

delete oldBeg;

return ret;

}

好像使用起来更方便。但考虑到我们最终实现得列表应该可以保存任何类型得数据,如果自定义得类型在函数退出时候执行拷贝构造函数时出现异常,那么列表状态无法恢复了。也就是说这个接口不是异常安全的接口。考虑这一点,还是分为两个独立的接口更好一些。

当然在使用pop_front和front之前必须确保列表不为空:

boolean empty()

{

return beg == end;

}

如果将非空判断放到front内,那么为空得时候应该返回什么值?不知道。最好是用户自己明白这种情况下的风险。

如果存在多个读线程,那么front()和pop_front(),empty()都需要利用同一个互斥量保证线程同步。Push_back不应该和上面上个接口在同一个线程中调用。如果只是这么一个书面规范,实际应用中也很容易因为疏忽没有遵守这个规则。因此我们可以考虑分别提供两个不同的界面给读线程和写线程,这就使用Adaptor模式了,如下:

class WriteList

{

private:

List& list;

public:

WriteList(list& list): this.list(list){}

void push_back(int data) { list.push_back(data); }

};

class ReadList

{

private:

List& list;

public:

ReadList(list& list): this.list(list){}

void pop_front() { list.pop_front(); }

int front() { return list.front(); }

boolean empty() { return list.empty(); }

}

这样读取数据的线程只看到ReadList, 写数据的线程只能看到WriteList,就可以防止使用人员误用这4个接口。

下面考虑查询列表大小的接口:

int size()

{

int size = 0;

ListNode* ptmp = beg;

for(ListNode* ptmp = beg; ptmp != end; ptmp = ptmp.next, ++size);

return size;

}

这个操作过程需要访问beg,end指向的内存空间中保存的数据,但是pop_front的实现可能导致被访问的beg已经执行无效区域,为了确保代码安全,所有线程调试信息中需要输出列表大小的地方都要和pop_front一样使用同一个互斥量,这既造成性能问题,又有使用上的不便。很多时候写线程是需要调用这个接口的,比如要限制线程的最大元素数量的时候或者调试信息需要。为了解决不用加锁,可以访问size接口,需要保证已经删除的节点的内存区域的next数据可用,也就是说,删除节点只清除节点保存的数据区域,其它数据都是可用的。如果不释放内存区域,必然导致内存泄漏,我们可以考虑将这些已经释放的节点用于保存push_back写入的数据。

一个环行的链表可以解决这个问题。

Struct ListNode

{

int data ;

ListNode* next;

ListNode(int v,ListNode* pnext = 0): data(v),next(pnext){}

}

struct List

{

ListNode* beg;

ListNode* end;

List():beg(new ListNode(0)),end(beg){end-next = end;}

}

首先构造一个环行链表,push_back的时候如果先前没有被释放的元素,则将新元素加到end后面,仍然维持环行链表的结构:

void push_back(int data)

{

end-data = data;

if(end-next == beg){

end-next = new ListNode(0,end-next);

}

end = end-next;

}

在这个处理过程中,beg指针可能被读线程移动到下一位,但对这里实现的逻辑没有任何影响,而且最终逻辑也是正确的。

void pop_front()

{

if(beg != end) beg = beg-next;

}

同样在这个处理过程中,end指针可能被写线程移动到下一位,但对这里实现的逻辑没有任何影响,而且最终逻辑也是正确的。

其他接口的实现都是没有什么变化:

int front()

{

return beg-data;

}

boolean empty()

{

return beg == end;

}

int size()

{

int size = 0;

ListNode* ptmp = beg;

ListNode* pend = end;

for(ListNode* ptmp = beg; ptmp != pend; ptmp = ptmp-next, ++size);

return size;

}

注意函数实现过程中用了临时变量来保存当时的end指针,为了防止在size执行过程中,另一个线程不停的插入数据,导致这个循环无法终止,这样得到的值可以看作是列表某个时间的一个快照数据,和列表在size函数执行完毕后的真实大小可能有差异,这是多线程的特性所决定的,可以认为在这种处理方式下,出现这种误差是可以接受的。

可以为WriteList,ReadList加上size接口。多个线程都使用WriteList(或ReadList)那么是需要加锁进行线程同步的,但WriteList和ReadList的接口可以分别在多个线程中使用是不需要加锁进行线程同步的。

下面是实作代码,供参考。

/*******************************************

author: htj

********************************************/

#ifndef CIRCLE_LIST_H

#define CIRCLE_LIST_H

#include

template

class CircleList

{

struct ListNode;

typedef ListNode* node_pointer;

typedef ListNode node_type;

public:

typedef Alloc allocator_type;

typedef T value_type;

typedef typename allocator_type::size_type size_type;

typedef typename allocator_type::difference_type difference_type;

typedef typename allocator_type::reference reference;

typedef typename allocator_type::const_reference const_reference;

typedef typename allocator_type::pointer pointer;

typedef typename allocator_type::const_pointer const_pointer;

CircleList():m_end(allocate()),m_beg(m_end),m_size(0)

{

}

~CircleList()

{

for(node_pointer pbeg = m_beg; pbeg != m_end; pbeg-freeValue(), pbeg = pbeg-next);

node_pointer end = m_end;

node_pointer beg;

do{

beg = m_end-next;

deallocate(m_end);

m_end = beg;

}while(m_end != end);

}

void push_back(const_reference v)

{

m_end-assignValue(v);

if(m_beg == m_end-next){

m_end-next = allocate(m_end-next);

++m_size;

}

m_end = m_end-next;

}

bool empty() const

{

return m_beg == m_end;

}

const_reference front() const

{

return m_beg-value();

}

reference front()

{

return m_beg-value();

}

void pop_front()

{

if(m_beg != m_end)

{

m_beg-freeValue();

m_beg = m_beg-next;

}

}

size_type size() const

{

int ret = 0;

node_pointer pend = m_end;

for(node_pointer beg = m_beg; beg != m_end; ++ret, beg = beg-next);

return ret;

}

size_type buffer_size() const

{

return m_size;

}

private:

node_pointer m_end;

node_pointer m_beg;

size_type m_size;

//不明白allocator的使用方式,下面的表达式编译不通过

//typedef typename allocator_type::template rebind::other _node_alloc_type

//暂时使用new,delete来分配释放内存

node_pointer allocate()

{

return new node_type();

}

node_pointer allocate(node_pointer pnext)

{

return new node_type(pnext);

}

void deallocate(node_pointer pnode)

{

delete pnode;

}

struct ListNode

{

char buffer[sizeof(T)];

ListNode* next;

ListNode():next(this)

{

}

explicit ListNode(ListNode* pnext):next(pnext)

{

}

bool operator==(const ListNode& rhs) const

{

return next == rhs-next;

}

bool operator != (const ListNode& rhs) const

{

return !(*this == rhs);

}

void freeValue()

{

value().T::~T();

}

void assignValue(const T& data)

{

new(buffer) T(data);

}

T& value()

{

return *(T*)buffer;

}

const T& value() const

{

return *(T*)buffer;

}

};

};

template

class WriteList

{

LIST& m_list;

public:

typedef typename LIST::value_type value_type;

typedef typename LIST::size_type size_type;

typedef typename LIST::difference_type difference_type;

typedef typename LIST::reference reference;

typedef typename LIST::const_reference const_reference;

typedef typename LIST::pointer pointer;

typedef typename LIST::const_pointer const_pointer;

WriteList(LIST& l): m_list(l)

{

}

void push_back(const_reference data)

{

m_list.push_back(data);

}

bool empty()

{

return m_list.empty();

}

size_type size()

{

return m_list.size();

}

size_type buffer_size()

{

return m_list.buffer_size();

}

};

template

class ReadList

{

LIST& m_list;

public:

typedef typename LIST::value_type value_type;

typedef typename LIST::size_type size_type;

typedef typename LIST::difference_type difference_type;

typedef typename LIST::reference reference;

typedef typename LIST::const_reference const_reference;

typedef typename LIST::pointer pointer;

typedef typename LIST::const_pointer const_pointer;

ReadList(LIST& l): m_list(l)

{

}

void pop_front()

{

m_list.pop_front();

}

reference front()

{

return m_list.front();

}

const_reference front() const

{

return m_list.front();

}

bool empty()

{

return m_list.empty();

}

size_type size()

{

return m_list.size();

}

size_type buffer_size()

{

return m_list.buffer_size();

}

};

#endif

 
 
 
免责声明:本文为网络用户发布,其观点仅代表作者个人观点,与本站无关,本站仅提供信息存储服务。文中陈述内容未经本站证实,其真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
2023年上半年GDP全球前十五强
 百态   2023-10-24
美众议院议长启动对拜登的弹劾调查
 百态   2023-09-13
上海、济南、武汉等多地出现不明坠落物
 探索   2023-09-06
印度或要将国名改为“巴拉特”
 百态   2023-09-06
男子为女友送行,买票不登机被捕
 百态   2023-08-20
手机地震预警功能怎么开?
 干货   2023-08-06
女子4年卖2套房花700多万做美容:不但没变美脸,面部还出现变形
 百态   2023-08-04
住户一楼被水淹 还冲来8头猪
 百态   2023-07-31
女子体内爬出大量瓜子状活虫
 百态   2023-07-25
地球连续35年收到神秘规律性信号,网友:不要回答!
 探索   2023-07-21
全球镓价格本周大涨27%
 探索   2023-07-09
钱都流向了那些不缺钱的人,苦都留给了能吃苦的人
 探索   2023-07-02
倩女手游刀客魅者强控制(强混乱强眩晕强睡眠)和对应控制抗性的关系
 百态   2020-08-20
美国5月9日最新疫情:美国确诊人数突破131万
 百态   2020-05-09
荷兰政府宣布将集体辞职
 干货   2020-04-30
倩女幽魂手游师徒任务情义春秋猜成语答案逍遥观:鹏程万里
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案神机营:射石饮羽
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案昆仑山:拔刀相助
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案天工阁:鬼斧神工
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案丝路古道:单枪匹马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:与虎谋皮
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:李代桃僵
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:指鹿为马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:小鸟依人
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:千金买邻
 干货   2019-11-12
 
推荐阅读
 
 
 
>>返回首頁<<
 
靜靜地坐在廢墟上,四周的荒凉一望無際,忽然覺得,淒涼也很美
© 2005- 王朝網路 版權所有