分享
 
 
 

Java数据报之失序和丢包

王朝java/jsp·作者佚名  2006-01-10
窄屏简体版  字體: |||超大  

习惯了TCP编程,认为UDP可以包办这些问题是错误的。一个UDP应用程序要承担可靠性方面的全部工作,包括报文的丢失、重复、时延、乱序以及连接失效等问题。

通常我们在可靠性好,传输时延小的局域网上开发测试,一些问题不容易暴露,但在大型互联网上却会出现错误。

UDP协议把递送的可靠性责任推到了上层即应用层,下面简单编写了几个类来专门处理两个问题:乱序和丢包。

四个类:DataPacket 类,PacketHeader类,PacketBody类 ,DataEntry类,位于同一个文件DataPacket .java中。

DataPacket 类相当于一个门面模式,提供给外部使用,通信数据也在这个类中处理。

package com.skysoft.pcks;

import java.io.*;

import java.net.*;

import java.util.*;

public class DataPacket {

InputStream is;

OutputStream os;

PacketHeader header;

PacketBody body;

ArrayList al;

public static final int DataSwapSize = 64532;

/**

* 在接收数据报使用

*/

public DataPacket() {

header = new PacketHeader();

body = new PacketBody();

al = new ArrayList();

}

/**

* 在发送数据报时使用,它调用报文分割操作.

* @param file String 硬盘文件

*/

public DataPacket(String file) {

this();

try {

is = new FileInputStream(file);

header.CalcHeaderInfo(is.available());

this.madeBody();

is.close();

//this.Gereratedata();

}

catch (FileNotFoundException ex) {

ex.printStackTrace();

}

catch (IOException ex1) {

ex1.printStackTrace();

}

}

/**

* 在发送数据报时使用,它调用报文分割操作.

* @param url URL url地址

*/

public DataPacket(URL url) {

this();

try {

//is = url.openStream();

URLConnection conn=url.openConnection();

is=conn.getInputStream();

int total=conn.getContentLength();

header.CalcHeaderInfo(total);

this.madeBody();

//System.out.println(total+":"+total);

is.close();

}

catch (IOException ex) {

ex.printStackTrace();

}

}

/**

* 为发送构造分组,使用PackageHeader处理了报头格式,并为分组编序号.

*/

private void madeBody() {

al.clear();

byte[] buffer;

DataEntry de;

for (int i = 0; i < header.fragmentcounter; i++) {

try {

ByteArrayOutputStream bos = new ByteArrayOutputStream();

//is.skip(i * body.BODY_BUFFER_SIZE);

header.ArrageSort(i);

de = new DataEntry(PacketBody.BODY_BUFFER_SIZE);

de.setSn(i);

de.setStreamsize(header.getStreamsize());

de.setFragmentcounter(header.getFragmentcounter());

if (header.isWTailFragment(i)) {

buffer = new byte[header.getMinfragment()];

is.read(buffer, 0, buffer.length);

header.setActByteSize(header.getMinfragment());

de.setActByteSize(header.getMinfragment());

}

else {

buffer = new byte[body.BODY_BUFFER_SIZE];

is.read(buffer, 0, buffer.length);

}

//System.out.println("length-------"+i+" "+body.getBody().length+" "+header.getMinfragment());

body.setBody(buffer);

//System.out.println("length:" + i + " " + header.toString());

bos.write(header.getByte(), 0, header.HEADER_BUFFER_SIZE);

bos.write(body.getBody(), 0, body.getBody().length);

de.setBytes(bos.toByteArray());

al.add(de);

}

catch (IOException ex) {

ex.printStackTrace();

}

}

}

/**

* 为发送构造分组,没有考虑报头格式,也没有为分组编序号.

*/

private void madeBody1() {

al.clear();

for (int i = 0; i < header.fragmentcounter; i++) {

try {

if (header.isWTailFragment(i))

is.read(body.getBody(), i * body.BODY_BUFFER_SIZE,

header.getMinfragment());

else

is.read(body.getBody(), i * body.BODY_BUFFER_SIZE,

body.BODY_BUFFER_SIZE);

ByteArrayOutputStream bos = new ByteArrayOutputStream();

bos.write(header.getByte(), 0, header.HEADER_BUFFER_SIZE);

bos.write(body.getBody(), header.HEADER_BUFFER_SIZE,

body.getBody().length);

al.add(bos);

}

catch (IOException ex) {

ex.printStackTrace();

}

}

}

/**

* 在接收到报文后,对此报文执行组装,并处理报文丢失和乱序情况.

* @param b1 byte[]

*/

public void Add(byte[] b1) {

byte[] buffer = (byte[]) b1.clone();

handlerText(buffer);

DataEntry de = new DataEntry(buffer, header.getActByteSize());

de.setSn(header.getSn());

de.setStreamsize(header.getStreamsize());

de.setFragmentcounter(header.getFragmentcounter());

al.add(de);

}

private void handlerText(byte[] buffer) {

ByteArrayOutputStream baos = new ByteArrayOutputStream();

baos.write(buffer, 0, header.HEADER_BUFFER_SIZE);

byte[] b=new byte[header.HEADER_BUFFER_SIZE];

System.arraycopy(buffer,0,b,0,b.length);

ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());

InputStreamReader isr = new InputStreamReader(bais);

BufferedReader br = new BufferedReader(isr);

try {

header = new PacketHeader(br.readLine());

}

catch (Exception ex) {

ex.printStackTrace();

}

}

private String calFileSize(int size) {

return size / 1024 + "K";

}

public ArrayList getDataPackets() {

return al;

}

/**

* 是否接收完毕,通过序号是否等于最大段数来判断,这也许有问题,比如,正好是最后一个段丢失了,这样

* 这个包整个就丢失了.

* @return

*/

public boolean isFull() {

return this.header.getSn() == this.header.getFragmentcounter() - 1 ? true : false;

}

/**

* 判断是否只有一个段.

* @return

*/

public boolean isZero() {

return this.header.getSn() == 0 ? true : false;

}

/**

* 该函数执行报文组装,不考虑丢失的报文.

* @return

*/

private ByteArrayOutputStream fetchDataPackets() {

ByteArrayOutputStream bos = new ByteArrayOutputStream();

byte[] buffer = null;

DataEntry de;

for (int i = 0; i < al.size(); i++) {

try {

de = this.getSnData(i);

buffer = de.getByte();

if (header.getStreamsize() == de.getStreamsize()) {

bos.write(de.getByte(), header.HEADER_BUFFER_SIZE, de.getActByteSize());

System.out.println(de.toString() + " -- fetchDataPackets");

}

}

catch (Exception ex) {

ex.printStackTrace();

}

}

return bos;

}

/**

* 该函数执行报文组装,对于丢失的报文,写入空报文.

* @return ByteArrayOutputStream

*/

private ByteArrayOutputStream fetchDataPackets_sn() {

ByteArrayOutputStream bos = new ByteArrayOutputStream();

byte[] buffer;

DataEntry de;

for (int i = 0; i < header.getFragmentcounter(); i++) {

try {

de = this.getSnData(i);

if (de == null) {

de = seachDeData(i);

}

buffer = de.getByte();

//System.out.println(de.getSn() + ":" + i);

//handlerText(buffer);

//bos.write(buffer, header.HEADER_BUFFER_SIZE,

// buffer.length - header.HEADER_BUFFER_SIZE);

if (header.getStreamsize() == de.getStreamsize()) {

bos.write(de.getByte(), header.HEADER_BUFFER_SIZE,

de.getActByteSize());

//System.out.println(de.toString());

}

}

catch (Exception ex) {

ex.printStackTrace();

}

}

return bos;

}

/**

* 对缓冲的数据包进行排序处理,即按顺序提取同一帧的数据,如果没有找到该序号的帧,则返回空值.

* @param sn int 要找的帧序号.

* @return DataEntry

*/

private DataEntry getSnData(int sn) {

DataEntry de = null;

for (int i = 0; i < al.size(); i++) {

de = (DataEntry) al.get(i);

if (header.getStreamsize() == de.getStreamsize()) {

if (sn == de.getSn())

break;

else

de = null;

}

}

return de;

}

/**

* 按序号开始向前或者是向后寻找最近的帧片段,日后可以增加请求重发功能,通过开一个通信连接.

* @param sn int

* @return DataEntry

*/

private DataEntry seachDeData(int sn) {

DataEntry de = null;

int initvalue, minvalue = 10000;

DataEntry back, fore = null;

for (int i = 0; i < al.size(); i++) {

de = (DataEntry) al.get(i);

if (header.getStreamsize() == de.getStreamsize()) {

initvalue = Math.abs(de.getSn() - sn);

if (de.getFragmentcounter() != de.getSn() && initvalue < minvalue) {

minvalue = initvalue;

fore = de;

}

}

}

return fore;

}

/**

* 除去最后一帧外,随机抽取一帧.

* @return DataEntry

*/

private DataEntry seachDeData() {

DataEntry de = null;

for (int i = 0; i < al.size(); i++) {

de = (DataEntry) al.get(i);

System.out.println("sky ::::" + de.getFragmentcounter() + ":" + de.getSn() +

":" + i);

if (header.getStreamsize() == de.getStreamsize()) {

if (de.getFragmentcounter() != de.getSn()) {

break;

}

}

}

return de;

}

/**

* 生成组装完的结果数据.因为用图像来做测试,所以令其返回图像.

* @return Image

*/

public java.awt.Image Gereratedata() {

ByteArrayInputStream bis;

java.awt.image.BufferedImage bimage = null;

try {

byte[] b = fetchDataPackets_sn().toByteArray();

//fetchDataPackets_old1()

bis = new ByteArrayInputStream(b);

bimage = javax.imageio.ImageIO.read(bis);

}

catch (Exception ex1) {

ex1.printStackTrace();

}

return bimage;

}

public static void main(String args[]) {

DataPacket dp = new DataPacket("e:\\nature\\14.jpg");

}

}

/**

* 数据实体,充当临时处理场所.

* @author Administrator

*

*/

class DataEntry {

byte[] bytes;

int fragmentcounter, sn, actbytesize;

long streamsize;

int minfragment;

public DataEntry() {

}

public DataEntry(int size) {

this.actbytesize = size;

}

public DataEntry(byte[] b, int i) {

this.bytes = b;

this.actbytesize = i;

}

public byte[] getByte() {

return this.bytes;

}

public void setBytes(byte[] b) {

this.bytes = b;

}

public void setStreamsize(long size) {

this.streamsize = size;

}

public long getStreamsize() {

return this.streamsize;

}

public int getMinfragment() {

return minfragment;

}

public synchronized void setSn(int i) {

this.sn = i;

}

public synchronized int getSn() {

return sn;

}

public synchronized int getFragmentcounter() {

return fragmentcounter;

}

public synchronized void setFragmentcounter(int c) {

this.fragmentcounter = c;

}

public void setActByteSize(int size) {

actbytesize = size;

}

public int getActByteSize() {

return actbytesize;

}

public String toString() {

return this.streamsize + "::" + this.fragmentcounter + "::" + this.sn +

"::" + this.actbytesize + " recv DataEntry";

}

}

/**

* 报头,处理报头格式

* @author Administrator

*

*/

class PacketHeader implements Serializable{

public static final int HEADER_BUFFER_SIZE = 1024;

int fragmentcounter, sn;

int actbytesize = PacketBody.BODY_BUFFER_SIZE;

byte[] header; //= new byte[HEADER_BUFFER_SIZE];

long streamsize;

int minfragment;

public PacketHeader() {

}

public PacketHeader(long l) {

this.setStreamsize(l);

}

public PacketHeader(String s) {

String[] tm = s.split("::");

this.setActByteSize(Integer.parseInt(tm[3]));

this.setSn(Integer.parseInt(tm[2]));

this.setFragmentcounter(Integer.parseInt(tm[1]));

this.setStreamsize(Long.parseLong(tm[0]));

}

/**

* 根据文件的段的顺序生成数据头.

* @param sn 文件序列

*/

public void ArrageSort(int sn) {

this.setSn(sn);

this.setByte();

}

public void CalcHeaderInfo(long l) {

this.setStreamsize(l);

CalcHeaderInfo();

}

/**

* 计算流要被分成的片段数量,并得出最小片段余量.

*/

public void CalcHeaderInfo() {

fragmentcounter = Math.round( (float) streamsize /

PacketBody.BODY_BUFFER_SIZE);

float critical = (float) streamsize / PacketBody.BODY_BUFFER_SIZE;

if (critical - fragmentcounter < 0.5 && critical - fragmentcounter > 0)

fragmentcounter++;

minfragment = (int) (streamsize % PacketBody.BODY_BUFFER_SIZE);

}

public byte[] getHeader() {

Long it = new Long(this.streamsize);

return new byte[] {it.byteValue()};

}

public byte[] getByte() {

return header; //this.toString().getBytes();

}

/**

* 生成报头字节,首先取得数据包头 流尺寸::段片数::段顺序::段实际尺寸 的字节形式,

* 然后加入回车换行符号,对于1024字节中剩余的部分一律写入元素为0的字节数组.

*/

public void setByte() {

ByteArrayOutputStream bos = new ByteArrayOutputStream();

byte[] buffer = this.toByte();

try {

bos.write(buffer);

bos.write("\r\n".getBytes());

bos.write(new byte[PacketHeader.HEADER_BUFFER_SIZE - buffer.length], 0,

PacketHeader.HEADER_BUFFER_SIZE - buffer.length);

header = bos.toByteArray();

}

catch (IOException ex) {

ex.printStackTrace();

}

}

public void setStreamsize(long size) {

this.streamsize = size;

}

public long getStreamsize() {

return this.streamsize;

}

public int getMinfragment() {

return minfragment;

}

public synchronized void setSn(int i) {

this.sn = i;

}

public int getSn() {

return sn;

}

public int getFragmentcounter() {

return fragmentcounter;

}

public synchronized void setFragmentcounter(int c) {

this.fragmentcounter = c;

}

public void setActByteSize(int size) {

actbytesize = size;

setByte();

}

public int getActByteSize() {

return actbytesize;

}

/**

* 数据包头的格式为:流尺寸::段片数::段顺序::段实际尺寸

* 报头字节长度是可变化的,比如,可以加入流的具体信息如:流所属文件的名称,文件类型以及一些其他信息.

* @return String

*/

public String toString() {

return streamsize + "::" + this.fragmentcounter + "::" + this.getSn() +

"::" + this.getActByteSize();

}

public byte[] toByte() {

return this.toString().getBytes();

}

/**

* 是否为尾段

* @param i int

* @return boolean

*/

public boolean isWTailFragment(int i) {

return (i == fragmentcounter - 1) ? true : false;

}

}

/**

* 用户数据区

* @author Administrator

*

*/

class PacketBody implements Serializable{

public static final int BODY_BUFFER_SIZE = 63508; //65508

byte[] body;

public PacketBody() {

}

public void setBody(byte[] b) {

this.body = b;

}

public byte[] getBody() {

return body;

}

}

这个数据处理类,将在接下来使用。

 
 
 
免责声明:本文为网络用户发布,其观点仅代表作者个人观点,与本站无关,本站仅提供信息存储服务。文中陈述内容未经本站证实,其真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
2023年上半年GDP全球前十五强
 百态   2023-10-24
美众议院议长启动对拜登的弹劾调查
 百态   2023-09-13
上海、济南、武汉等多地出现不明坠落物
 探索   2023-09-06
印度或要将国名改为“巴拉特”
 百态   2023-09-06
男子为女友送行,买票不登机被捕
 百态   2023-08-20
手机地震预警功能怎么开?
 干货   2023-08-06
女子4年卖2套房花700多万做美容:不但没变美脸,面部还出现变形
 百态   2023-08-04
住户一楼被水淹 还冲来8头猪
 百态   2023-07-31
女子体内爬出大量瓜子状活虫
 百态   2023-07-25
地球连续35年收到神秘规律性信号,网友:不要回答!
 探索   2023-07-21
全球镓价格本周大涨27%
 探索   2023-07-09
钱都流向了那些不缺钱的人,苦都留给了能吃苦的人
 探索   2023-07-02
倩女手游刀客魅者强控制(强混乱强眩晕强睡眠)和对应控制抗性的关系
 百态   2020-08-20
美国5月9日最新疫情:美国确诊人数突破131万
 百态   2020-05-09
荷兰政府宣布将集体辞职
 干货   2020-04-30
倩女幽魂手游师徒任务情义春秋猜成语答案逍遥观:鹏程万里
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案神机营:射石饮羽
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案昆仑山:拔刀相助
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案天工阁:鬼斧神工
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案丝路古道:单枪匹马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:与虎谋皮
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:李代桃僵
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:指鹿为马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:小鸟依人
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:千金买邻
 干货   2019-11-12
 
推荐阅读
 
 
 
>>返回首頁<<
 
靜靜地坐在廢墟上,四周的荒凉一望無際,忽然覺得,淒涼也很美
© 2005- 王朝網路 版權所有