分享
 
 
 

客户端源码分析之三: StorageWrapper 类

王朝other·作者佚名  2006-01-30
窄屏简体版  字體: |||超大  

客户端源码分析之三: StorageWrapper 类

作者:小马哥

日期:2004-6-30

StorageWrapper 的作用:把文件片断进一步切割为子片断,并且为这些子片断发送 request消息。在获得子片断后,将数据写入磁盘。

请结合 Storage 类的分析来看。

几点说明:

1、 为了获取传输性能,BT把文件片断切割为多个子片断。

2、 BT为获取一个子片断,需要向拥有该子片断的peer发送request消息(关于 request消息,参见《BT协议规范》)。

3、 例如一个256k大小的片断,索引号是10,被划分为16个16k大小的子片断。那么需要为这16个子片断分别产生一个 request 消息。这些request消息在发出之前,以list的形式保存在 inactive_requests 这个list中。例如对这个片断,就保存在inactive_requests下标为 10(片断的索引号)的地方,值是如下的 list:[(0,16k),(16k, 16k), (32k, 16k), (48k, 16k), (64k, 16k), (80k, 16k), (96k, 16k), (112k, 16k), (128k, 16k), (144k, 16k), (160k, 16k), (176k, 16k), (192k, 16k), (208k, 16k), (224k, 16k), (240k, 16k)]。这个处理过程在 _make_inactive() 函数中。因为这些request还没有发送出去,所以叫做 inactive request(未激活的请求)。如果一个 request 发送出去了,那么叫做 active request。为每个片断已经发送出去的request个数记录在 numactive 中。如果收到一个子片断,那么 active request 个数就要减1。amount_inactive 记录了尚没有发出request的子片断总的大小。

4、 每当获得一个子片段,都要写入磁盘。如果子片断所属的片断在磁盘上还没有分配空间,那么首先需要为整个片断分配空间。如何为片断分配空间?这正是 StorageWrapper 类中最难理解的一部分代码。这个“空间分配算法”说起来很简单,但是在没有任何注释的情况下去看代码,耗费了我好几天的时间。具体的算法分析,请看 _piece_came_in() 的注释。

class StorageWrapper:

def __init__(self, storage, request_size, hashes,

piece_size, finished, failed,

statusfunc = dummy_status, flag = Event(), check_hashes = True,

data_flunked = dummy_data_flunked):

self.storage = storage # Storage 对象

self.request_size = request_size #子片断大小

self.hashes = hashes # 文件片断摘要信息

self.piece_size = piece_size # 片断大小

self.data_flunked = data_flunked # 一个函数,用来检查片断的完整性

self.total_length = storage.get_total_length() # 文件总大小

self.amount_left = self.total_length # 未下载完的文件大小

# 文件总大小的有效性检查

# 因为最后一个片断长度可能小于 piece_size

if self.total_length <= piece_size * (len(hashes) - 1):

raise ValueError, 'bad data from tracker - total too small'

if self.total_length > piece_size * len(hashes):

raise ValueError, 'bad data from tracker - total too big'

# 两个事件,分布在下载完成和下载失败的时候设置

self.finished = finished

self.failed = failed

这几个变量的作用在前面已经介绍过了。

self.numactive = [0] * len(hashes)

inactive_request

inactive_requests 的值全部被初始化为1,这表示每个片断都需要发送 request。后面在对磁盘文件检查之后,那些已经获得的片断,在 inactive_requests中对应的是 None,表示不需要再为这些片断发送 request了。

self.inactive_requests = [1] * len(hashes)

self.amount_inactive = self.total_length

# 是否进入 EndGame 模式?关于 endgame 模式,在《Incentives Build Robustness in BitTorrent 》的“片断选择算法”中有介绍。后面可以看到,在为最后一个“子片断”产生请求后,进入 endgame 模式。

self.endgame = False

self.have = Bitfield(len(hashes))

# 该片是否检查了完整性

self.waschecked = [check_hashes] * len(hashes)

这两个变量用于“空间分配算法”

self.places = { }

self.holes = [ ]

if len(hashes) == 0:

finished()

return

targets = {}

total = len(hashes)

# 检查每一个片断,,,

for i in xrange(len(hashes)):

# 如果磁盘上,还没有完全为这个片断分配空间,那么这个片断需要被下载,在 targets 字典中添加一项(如果已经存在,就不用添加了),它的关键字(key)是该片断的摘要值,它的值(value)是一个列表, 这个片断的索引号被添加到这个列表中。

这里一度让我非常迷惑,因为一直以为不同的文件片断肯定具有不同的摘要值。后来才想明白了,那就是:两个不同的文件片断,可能拥有相同的摘要值。不是么?只要这两个片断的内容是一样的。

这一点,对后面的分析非常重要。

if not self._waspre(i):

targets.setdefault(hashes[i], []).append(i)

total -= 1

numchecked = 0.0

if total and check_hashes:

statusfunc({"activity" : 'checking existing file', "fractionDone" : 0})

# 这是一个内嵌在函数中的函数。在 c++ 中,可以有内部类,不过好像没有内部函数的说法。这个函数只能在 __init__() 内部使用。

这个函数在一个片段被确认获得后调用

# piece: 片断的索引号

# pos: 这个片断在磁盘上存储的位置

例如,片断5可能存储在片断2的位置上。请参看后面的“空间分配算法”

def markgot(piece, pos, self = self, check_hashes = check_hashes):

self.places[piece] = pos

self.have[piece] = True

self.amount_left -= self._piecelen(piece)

self.amount_inactive -= self._piecelen(piece)

不用再为这个片断发送 request消息了

self.inactive_requests[piece] = None

self.waschecked[piece] = check_hashes

lastlen = self._piecelen(len(hashes) - 1) # 最后一个片断的长度

# 对每一个片断

for i in xrange(len(hashes)):

#如果磁盘上,还没有完全为这个片断分配空间,那么在 holes 中添加该片断的索引号。

if not self._waspre(i):

self.holes.append(i)

# 否则,也就是空间已经分配。但是还是不能保证这个片断已经完全获得了,正如分析 Storage 时提到的那样,可能存在“空洞”

# 如果不需要进行有效性检查,那么简单调用 markgot() 表示已经获得了该片断。这显然是一种不负责任的做法。

elif not check_hashes:

markgot(i, i)

# 如果需要进行有效性检查

else:

sha是python内置的模块,它封装了 SHA-1摘要算法。SHA-1摘要算法对一段任意长的数据进行计算,得出一个160bit (也就是20个字节)长的消息摘要。在 torrent 文件中,保存了每个片断的消息摘要。接收方在收到一个文件片断之后,再计算一次消息摘要,然后跟 torrent 文件中对应的值进行比较,如果结果不一致,那么说明数据在传输过程中发生了变化,这样的数据应该被丢弃。

这里,首先,根据片断i的起始位置开始,lastlen长的一段数据构造一个 sha 对象。

sh = sha(self.storage.read(piece_size * i, lastlen))

计算这段数据的消息摘要

sp = sh.digest()

然后,更新 sh 这个 sha 对象,注意,是根据片断 i 剩下的数据来更新的。关于 sha::update() 的功能,请看 python的帮助。如果有两段数据 a 和 b,那么

sh = sha(a)

sh.update(b),等效于

sh = sha(a+b)

所以,下面这个表达式等于

sh.update(self.storage.read(piece_size*i, self._piecelen(i)))

sh.update(self.storage.read(piece_size * i + lastlen, self._piecelen(i) - lastlen))

所以,这次计算出来的就是片断i 的摘要

(原来的困惑:为什么不直接计算 i 的摘要,要这么绕一下了?后来分析清楚“空间分配算法”之后,这后面一段代码也就没有什么问题了。)

s = sh.digest()

如果计算出来的摘要和 hashes[i] 一致(后者是从 torrent 文件中获得的),那么,这个片断有效且已经存在于磁盘上。

if s == hashes[i]:

markgot(i, i)

elif targets.get(s)

and self._piecelen(i) == self._piecelen(targets[s][-1]):

markgot(targets[s].pop(), i)

elif not self.have[len(hashes) - 1]

and sp == hashes[-1]

and (i == len(hashes) - 1 or not self._waspre(len(hashes) - 1)):

markgot(len(hashes) - 1, i)

else:

self.places[i] = i

if flag.isSet():

return

numchecked += 1

statusfunc({'fractionDone': 1 - float(self.amount_left) / self.total_length})

# 如果所有片断都下载完了,那么结束。

if self.amount_left == 0:

finished()

# 检查某个片断,是否已经在磁盘上分配了空间,调用的是 Storage:: was_preallocated()

def _waspre(self, piece):

return self.storage.was_preallocated(piece * self.piece_size,

self._piecelen(piece))

# 获取指定片断的长度,只有最后一个片断大小可能小于 piece_size

def _piecelen(self, piece):

if piece < len(self.hashes) - 1:

return self.piece_size

else:

return self.total_length - piece * self.piece_size

# 返回剩余文件的大小

def get_amount_left(self):

return self.amount_left

# 判断是否已经获得了一些文件片断

def do_I_have_anything(self):

return self.amount_left < self.total_length

# 将指定片断切割为“子片断”

def _make_inactive(self, index):

# 先获取该片断的长度

length = min(self.piece_size, self.total_length - self.piece_size * index)

l = []

x = 0

# 为了获得更好的传输性能,BT把每个文件片断又分为更小的“子片断”,我们可以在 download.py 文件中 default 变量中,找到“子片断”大小的定义:

'download_slice_size', 2 ** 14, "How many bytes to query for per request."

这里定义的“子片断”大小是16k。

下面这个循环,就是将一个片断进一步切割为“子片断”的过程。

while x + self.request_size < length:

l.append((x, self.request_size))

x += self.request_size

l.append((x, length - x))

# 将 l 保存到 inactive_requests 这个列表中

self.inactive_requests[index] = l

# 是否处于 endgame 模式,关于endgame模式,参加《Incentives Build Robustness in BitTorrent》

def is_endgame(self):

return self.endgame

def get_have_list(self):

return self.have.tostring()

def do_I_have(self, index):

return self.have[index]

# 判断指定的片断,是否还有 request没有发出?如果有,那么返回 true,否则返回 false。

def do_I_have_requests(self, index):

return not not self.inactive_requests[index]

为指定片断创建一个 request 消息,返回的是一个二元组,例如(32k, 16k),表示“子片断”的起始位置是 32k ,大小是 16k。

def new_request(self, index):

# returns (begin, length)

# 如果还没有为该片断创建 request。,那么调用 _make_inactive() 创建 request列表。(inactive_requests[index] 初始化的值是1)

if self.inactive_requests[index] == 1:

self._make_inactive(index)

# numactive[index] 记录了已经为该片断发出了多少个 request。

self.numactive[index] += 1

rs = self.inactive_requests[index]

# 从 inactive_request 中移出最小的那个request(也就是起始位置最小)。

r = min(rs)

rs.remove(r)

# amount_inactive 记录了尚没有发出request的子片断总的大小。

self.amount_inactive -= r[1]

# 如果这是最后一个“子片断”,那么进入 endgame 模式

if self.amount_inactive == 0:

self.endgame = T.rue

# 返回这个 request

return r

def piece_came_in(self, index, begin, piece):

try:

return self._piece_came_in(index, begin, piece)

except IOError, e:

self.failed('IO Error ' + str(e))

return True

如果获得了某个“子片断”,那么调用这个函数。

index:“子片断”所在片断的索引号,

begin:“子片断”在片断中的起始位置,

piece:实际数据

def _piece_came_in(self, index, begin, piece):

# 如果之前没有获得过该片断中任何“子片断”,那么首先需要在磁盘上为整个片断分配空间。

空间分配的算法如下:

假设一共是6个片断,现在已经为 0、1、4三个片断分配了空间,那么

holes:[2, 3, 5]

places:{0:0, 1:1, 4:4}

现在要为片断5分配空间,思路是把片断5的空间暂时先分配在片断2应该在的空间上。这样分配以后,

holes:[3, 5]

places: {0:0, 1:1, 4:4, 5:2}

假设下一步为片断2分配空间,因为2的空间已经被5占用,所以把5的数据转移到3上,2才可以使用自己的空间。这样分配之后,

holes:[5]

places:{0:0, 1:1, 2:2, 4:4, 5:3}

最后,为3分配空间,因为3的空间被5占用,所以把5的数据转移到5自己的空间上,3就可以使用自己的空间了。这样分配之后,

holes:[]

places:{0:0, 1:1, 2:2, 3:3, 4:4, 5:5}

下面这段比较晦涩的代码,实现的就是这种空间分配算法。

if not self.places.has_key(index):

n = self.holes.pop(0)

if self.places.has_key(n):

oldpos = self.places[n]

old = self.storage.read(self.piece_size * oldpos, self._piecelen(n))

if self.have[n] and sha(old).digest() != self.hashes[n]:

self.failed('data corrupted on disk - maybe you have two copies running?')

return True

self.storage.write(self.piece_size * n, old)

self.places[n] = n

if index == oldpos or index in self.holes:

self.places[index] = oldpos

else:

for p, v in self.places.items():

if v == index:

break

self.places[index] = index

self.places[p] = oldpos

old = self.storage.read(self.piece_size * index, self.piece_size)

self.storage.write(self.piece_size * oldpos, old)

elif index in self.holes or index == n:

if not self._waspre(n):

self.storage.write(self.piece_size * n,

self._piecelen(n) * chr(0xFF))

self.places[index] = n

else:

for p, v in self.places.items():

if v == index:

break

self.places[index] = index

self.places[p] = n

old = self.storage.read(self.piece_size * index, self._piecelen(n))

self.storage.write(self.piece_size * n, old)

# 调用 Stoarge::write() 将这个子片断写入磁盘,注意是写到 places[index] 所在的空间上。

self.storage.write(self.places[index] * self.piece_size + begin, piece)

# 既然获得了一个子片断,那么发出的request个数显然要减少一个。

self.numactive[index] -= 1

# 如果既没有尚未发出的 request,而且也没有已发出的request(每当获得一个子片断,numactive[index]减少1,numactive[index]为0,说明所有发出的 request都已经接收到了响应的数据),那么显然整个片断已经全部获得了。

if not self.inactive_requests[index] and not self.numactive[index]:

检查整个片断的有效性,如果通过检查

if sha(self.storage.read(self.piece_size * self.places[index],

self._piecelen(index))).digest() == self.hashes[index]:

#“我”已经拥有了这个片断

self.have[index] = True

self.inactive_requests[index] = None

# 也检查过了有效性

self.waschecked[index] = True

self.amount_left -= self._piecelen(index)

if self.amount_left == 0:

self.finished()

如果没有通过有效性检查

else:

self.data_flunked(self._piecelen(index))

得丢弃这个片断

self.inactive_requests[index] = 1

self.amount_inactive += self._piecelen(index)

return False

return True

# 如果向某个 peer 发送的获取“子片断”的请求丢失了,那么调用此函数

def request_lost(self, index, begin, length):

self.inactive_requests[index].append((begin, length))

self.amount_inactive += length

self.numactive[index] -= 1

def get_piece(self, index, begin, length):

try:

return self._get_piece(index, begin, length)

except IOError, e:

self.failed('IO Error ' + str(e))

return None

def _get_piece(self, index, begin, length):

if not self.have[index]:

return None

if not self.waschecked[index]:

# 检查片断的 hash值,如果错误,返回 None

if sha(self.storage.read(self.piece_size * self.places[index],

self._piecelen(index))).digest() != self.hashes[index]:

self.failed('told file complete on start-up, but piece failed hash check')

return None

# 通过 hash 检查

self.waschecked[index] = True

# 检查一下“子片断”长度是否越界

if begin + length > self._piecelen(index):

return None

# 调用 Storage::read() ,将该“子片断”数据从磁盘上读出来,返回值就是这段数据。

return self.storage.read(self.piece_size * self.places[index] + begin, length)

 
 
 
免责声明:本文为网络用户发布,其观点仅代表作者个人观点,与本站无关,本站仅提供信息存储服务。文中陈述内容未经本站证实,其真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
2023年上半年GDP全球前十五强
 百态   2023-10-24
美众议院议长启动对拜登的弹劾调查
 百态   2023-09-13
上海、济南、武汉等多地出现不明坠落物
 探索   2023-09-06
印度或要将国名改为“巴拉特”
 百态   2023-09-06
男子为女友送行,买票不登机被捕
 百态   2023-08-20
手机地震预警功能怎么开?
 干货   2023-08-06
女子4年卖2套房花700多万做美容:不但没变美脸,面部还出现变形
 百态   2023-08-04
住户一楼被水淹 还冲来8头猪
 百态   2023-07-31
女子体内爬出大量瓜子状活虫
 百态   2023-07-25
地球连续35年收到神秘规律性信号,网友:不要回答!
 探索   2023-07-21
全球镓价格本周大涨27%
 探索   2023-07-09
钱都流向了那些不缺钱的人,苦都留给了能吃苦的人
 探索   2023-07-02
倩女手游刀客魅者强控制(强混乱强眩晕强睡眠)和对应控制抗性的关系
 百态   2020-08-20
美国5月9日最新疫情:美国确诊人数突破131万
 百态   2020-05-09
荷兰政府宣布将集体辞职
 干货   2020-04-30
倩女幽魂手游师徒任务情义春秋猜成语答案逍遥观:鹏程万里
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案神机营:射石饮羽
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案昆仑山:拔刀相助
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案天工阁:鬼斧神工
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案丝路古道:单枪匹马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:与虎谋皮
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:李代桃僵
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:指鹿为马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:小鸟依人
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:千金买邻
 干货   2019-11-12
 
推荐阅读
 
 
 
>>返回首頁<<
 
靜靜地坐在廢墟上,四周的荒凉一望無際,忽然覺得,淒涼也很美
© 2005- 王朝網路 版權所有