这是一个TCP服务器端的实现代码,监听客户端的请求,在子线程中处理各个客户端发来的数据包,再将处理后的结果送回客户端。这里提供的代码很完整,包括一个mainclass,一个监听class和它的interface,一个包处理class,一个发送class,基本上可以直接使用。其中用到的一些工具类,例如Debug、GenProfile、Terminator等,它们代码也会出现在随后的系列文章中。
package org.kyle.net.svr.sample;
import java.io.*;
import java.util.*;
import java.net.*;
public class Sample
{
// GenProfile是一个配置文件工具类,从配置文件中取得运行参数
protected GenProfile m_env = null;
protected IListener m_Listener = null;
public Sample()
{
String cfgFile = System.getProperty("MainConfigFile","server.cfg");
m_env = new GenProfile( cfgFile );
}
public boolean startService()
{
try
{
// IntegrationFactory是一个工厂类,创建一个Listener实例
m_Listener=IntegrationFactory.createListener(m_env);
m_Listener.setProfile(m_env);
m_Listener.startListener();
Debug.info("Server started.");
return true;
}
catch( Exception e)
{
Debug.warning(e);
}
return false;
}
public boolean stopService()
{
try
{
m_ Listener.stopListener();
Debug.info("Server service stopped.");
return true;
}
catch( Exception e )
{
Debug.warning(e);
}
return false;
}
public static void main( String [] argv )
{
try
{
Sample main = new Sample();
main.startService();
// Terminator用来接收键盘操作,按下特定键后使程序退出。
Terminator terminator = null;
terminator = new Terminator(System.in, main);
terminator.start();
synchronized (main) {
main.wait(); //将主进程悬挂,直到在Terminator里激活。
main.stopService();
}
System.exit(0);
}
catch( Exception e )
{
Debug.warning(e);
}
}
}
package org.kyle.net.svr.sample;
import java.net.*;
public interface IListener
{
public void setProfile( GenProfile env );
public void listenOn(int port);
public void setTimeout( int timeout );
public void startListener();
public void stopListener();
public RawPkt accept();
public void close();
}
package org.kyle.net.svr.sample;
import java.net.*;
import java.io.*;
import java.util.*;
import java.math.*;
public class SampleListenerImpl extends Thread implements IListener
{
private boolean m_isRunning = false;
private boolean m_innerCall = false;
private int m_listenAt = -1;
private int m_timeout = -1;
private Socket m_skt = null;
private ServerSocket m_svrSkt = null;
public SampleListenerImpl ()
{
setName("SampleListener.");
}
public SampleListenerImpl ( GenProfile env )
throws SocketException, SecurityException, IOException
{
setName("SampleListener.");
if ( env == null )
throw new SecurityException("No Environment provided!");
m_env = env;
invokeSocket();
}
public void setProfile( GenProfile env )
{
m_env = env;
}
public void run()
{
try
{
invokeSocket();
Debug.info("Listening at " + m_svrSkt.getLocalPort() + "...");
while( m_isRunning )
{
try
{
m_innerCall = true;
accept();
m_innerCall = false;
}
catch( Exception e)
{
Debug.info(e);
}
}
}
catch(Exception e)
{
Debug.info(e);
}
}
public void startListener()
{
if ( !m_isRunning )
{
m_isRunning = true;
start();
}
}
public void stopListener()
{
if ( m_isRunning )
{
m_isRunning = false;
interrupt();
close();
}
}
public RawPkt accept()
{
if ( m_isRunning )
{
if ( m_innerCall )
{ }
else
{
Debug.finest("StandAlone Listener was started, external call of accept failed.");
return null;
}
}
try
{
m_skt = m_svrSkt.accept();
m_skt.setSoTimeout( m_env.getTimeout() * 1000 );
Debug.fine("ServerSocket accepted. ");
new FreeListener( m_skt );
return null;
}
catch( InterruptedIOException iioe)
{
Debug.info("Listener Timed Out: " + iioe.getMessage() + "\n");
}
catch(IOException ioe)
{
Debug.info(ioe);
}
catch(Exception e)
{
Debug.info(e);
}
return null;
}
public void listenOn(int port)
{
if ( port < 0 || port > 65535 ) port = 0;
m_listenAt = port;
}
public void close()
{
if ( m_skt != null )
{
if ( !m_isRunning )
{
try{
m_skt.close();
m_skt = null;
}
catch( IOException ioe)
{
Debug.warning(ioe);
}
}
}
}
public void setTimeout( int timeout )
{
if ( timeout < 0 ) timeout = 300;
m_timeout = timeout;
}
/////////////Private methods section.///////////////////////////////
private void invokeSocket()
throws SocketException, SecurityException, IOException
{
if ( m_skt == null )
{
m_svrSkt = new ServerSocket( m_listenAt != -1 ? m_listenAt : m_env.getListenAt() );
m_svrSkt.setSoTimeout( m_timeout != -1 ? m_timeout*1000: m_env.getTimeout()*1000 );
}
}
class FreeListener extends Thread
{
Socket m_skt;
public FreeListener( Socket skt )
{
m_skt = skt;
start();
}
public void run()
{
try
{
DataInputStream is = new DataInputStream(m_skt.getInputStream());
while( true )
{//这里可能收到两种包,一种是http包,一种是自定义的包,在自定义的包中,约定了第3第4个字节是这个包的长度。
byte[] pktHd = new byte[4];
byte[] rawData = null;
is.readFully(pktHd);
boolean bUseHttp = false;
String method = new String(pktHd);
if (method.equalsIgnoreCase("POST") || method.equalsIgnoreCase("GET ") || method.equalsIgnoreCase("PUT ")
|| method.equalsIgnoreCase("DELE") || method.equalsIgnoreCase("HEAD") || method.equalsIgnoreCase("LINK")
|| method.equalsIgnoreCase("UNLI"))
{
String url = is.readLine();
Debug.info(("\nHttp Packet received from [" +
m_skt.getInetAddress().getHostAddress() + ":" +
m_skt.getPort() + " --" + method + url) );
int len = 0;
if (method.equalsIgnoreCase("POST"))
{
do {
String s = is.readLine();
if (s == null)
throw new IOException("Not a HTTP request");
if (s.equals(""))
break;
int index = s.indexOf(':');
if (index == -1)
continue;
String name = s.substring(0, index);
String value = s.substring(index + 1);
if ("Content-Length".equalsIgnoreCase(name.trim()))
len = Integer.parseInt(value.trim());
} while(true);
Debug.finer("Content-Length=" + len);
}
if (len == 0) {m_skt.close();break;} //Must a hacker! close socket.
rawData = new byte[len];
is.readFully(rawData);
while(is.available() != 0) is.skip(is.available());
bUseHttp = true;
}
else
{
BigInteger bgInt = new BigInteger( new byte[] { pktHd[2], pktHd[3] } );
int rstLen = bgInt.intValue() - 4;
if ( rstLen <= 0 )
if ( m_env.statelessService() ) break;
else continue;
Debug.info("\nPacket received from [" +
m_skt.getInetAddress().getHostAddress() + ":" +
m_skt.getPort() + "]; total len:" + bgInt.intValue() +
",rest len:" + rstLen);
rawData = new byte[rstLen + 4];
System.arraycopy(pktHd, 0, rawData, 0, 4);
is.readFully(rawData, 4, rstLen);
bUseHttp = false;
}
//RawPkt封装了收到的包
RawPkt nPkt = new RawPkt(rawData, m_skt.getInetAddress(), m_skt.getPort() );
// SamplePacketHandler类用来处理收到的包,SampleSender类用来向客户端发送数据。
SamplePacketHandler handler = new SamplePacketHandler ( nPkt, new SampleSender( m_skt, m_env, bUseHttp) );
handler.start();
handler = null;
if ( m_env.statelessService() ) break;
}
}
catch( Exception e )
{
Debug.info(e);
}
}
}
}
package org.kyle.net.svr.sample;
import java.util.*;
import java.math.*;
public class SamplePacketHandler extends Thread
{
protected SampleSender m_sender = null;
protected GenProfile m_env = null;
private RawPkt m_rawPkt = null;
public SamplePacketHandler(RawPkt rawPkt, GenProfile env, SampleSender sender)
{
m_rawPkt = rawPkt;
m_env = env;
m_sender = sender;
}
public void run()
{
if ( !m_sender.hasDestInfo() )
{
m_sender.setDestinationAddress( m_rawPkt.getSrcAddress() );
m_sender.setDestinationPort( m_rawPkt.getSrcPort() );
}
//在这里对收到的数据包进行处理,结果封装在resPkt中。
m_sender.send( resPkt );
}
package org.kyle.net.svr.sample;
import java.net.*;
import java.util.*;
import java.io.*;
public class SampleSender
{
private GenProfile m_profile = null;
private InetAddress m_srcAddress = null;
private int m_srcPort = -1;
private InetAddress m_dstAddress = null;
private int m_dstPort = -1;
private boolean m_bUseHttp;
private Socket m_skt = null;
public SampleSender( GenProfile profile )
throws SocketException
{
this( null, profile );
}
public SampleSender( Socket skt, GenProfile profile )
throws SocketException
{
this( skt, profile, false );
}
public SampleSender( Socket skt, GenProfile profile, boolean bUseHttp)
throws SocketException
{
if ( profile == null)
throw new SocketException("null profile.");
m_skt = skt;
m_profile = profile;
m_bUseHttp = bUseHttp;
}
public void setSourceAddress(InetAddress srcAddr)
{
m_srcAddress = srcAddr;
}
public void setDestinationAddress(InetAddress dstAddr)
{
m_dstAddress = dstAddr;
}
public void setSourcePort(int srcPort)
{
m_srcPort = srcPort;
}
public boolean hasDestInfo()
{
return !( m_dstAddress == null || m_dstPort == -1 || m_dstPort <= 0 || m_dstPort >= 65535);
}
public void setDestinationPort(int dstPort)
{
m_dstPort = dstPort;
}
public boolean send(InfoPacket msg)
{
//将InfoPacket编码为一个字节数组
Encoder encoder = new Encoder();
encoder.setProfile( m_profile );
byte[] baPkt = encoder.encode( msg );
if ( baPkt == null || !hasDestInfo())
return false;
try
{
OutputStream os = getSocket().getOutputStream();
if (m_bUseHttp)
{
os.write(("HTTP/1.0 200 OK\r\nContent-Type: text/html\r\nContent-Length: " + baPkt.length + "\r\n\r\n").getBytes());
}
os.write( baPkt );
os.flush();
if ( m_profile.statelessService() )
{
os.close();
m_skt.close();
}
return true;
}
catch( IOException ioe )
{
Debug.info(ioe);
}
catch( Exception e )
{
Debug.info(e);
}
return false;
}
public InetAddress getSrcAddress()
{
return m_srcAddress;
}
public void setSrcAddress(InetAddress aSrcAddress)
{
m_srcAddress = aSrcAddress;
}
public int getSrcPort()
{
return m_srcPort;
}
public void setSrcPort(int aSrcPort)
{
m_srcPort = aSrcPort;
}
private Socket getSocket()
{
if ( m_skt != null ) return m_skt;
if ( !hasDestInfo() ) return null;
try
{
if ( m_srcAddress != null && m_srcPort != -1 )
return new Socket( m_dstAddress, m_dstPort, m_srcAddress, m_srcPort );
return new Socket( m_dstAddress, m_dstPort );
}
catch(SocketException se)
{
Debug.info(se);
}
catch( Exception e )
{
Debug.info(e);
}
return null;
}
}