前面的可能还是不方便,再具体一点:
package zzzhc;
/**
 * Callback contract for a socket-based handler. It combines
 * {@code ConnectHandler} and {@code ReadWriteHandler} (both declared
 * elsewhere; presumably they supply the {@code handleConnect},
 * {@code handleRead} and {@code handleWrite} entry points that
 * {@code AbstractSocketHandler} implements -- TODO confirm) and adds the
 * higher-level notifications below.
 *
 * @author <a href="mailto:zzzhc0508@hotmail.com">zzzhc</a>
 */
public interface SocketHandler extends ConnectHandler, ReadWriteHandler {
/**
 * Called when the pending connection has been established.
 */
void onConnected();
/**
 * Called when the connection attempt did not succeed.
 *
 * @param msg description of the failure; may be empty
 */
void onConnectFailed(String msg);
/**
 * Called after data has been read from the channel.
 */
void onRead();
/**
 * Called after the data to be written has been written to the channel.
 */
void onWrite();
/**
 * Called when the channel has been closed.
 *
 * @param msg the reason the channel was closed
 */
void onClosed(String msg);
}
//抽象实现
package zzzhc;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
/**
 * Base {@link SocketHandler} built on a non-blocking {@link SocketChannel}
 * that is driven by a {@code SelectorProcessor}.
 *
 * <p>The class owns one read buffer and one write buffer. The default
 * {@link #onRead()} implementation echoes everything it receives back to the
 * peer; subclasses normally override the {@code onXxx} callbacks.
 *
 * <p>Buffer conventions used throughout:
 * <ul>
 * <li>{@code readBuf} is in "fill" mode when {@link #onRead()} is entered;
 * the callback must leave it in fill mode again, using {@code clear()} when
 * everything was consumed or {@code compact()} when bytes remain.</li>
 * <li>{@code writeBuf} must be flipped before {@link #enableWrite()} or
 * {@link #write()} is called; {@link #write()} clears it once it has been
 * sent completely.</li>
 * </ul>
 *
 * <p>NOTE(review): every constructor registers {@code this} with the
 * processor before a subclass constructor has finished, so callbacks may be
 * able to run against a partially initialised subclass. Whether that can
 * actually happen depends on the processor's threading, which is not visible
 * here.
 *
 * @author <a href="mailto:zzzhc0508@hotmail.com">zzzhc</a>
 */
public abstract class AbstractSocketHandler implements SocketHandler {

    /** Buffer size, in bytes, used when the socket's own buffer size cannot be queried. */
    public final static int DEFAULT_BUFFER_SIZE = 2 * 1024;

    protected final SelectorProcessor processor;
    protected final SocketChannel sc;
    protected final ByteBuffer readBuf;
    protected final ByteBuffer writeBuf;
    protected SocketAddress localAddress;
    protected SocketAddress remoteAddress;
    protected boolean connected = false;
    protected boolean closed = false;

    /**
     * True while {@code writeBuf} is flipped and waiting to be drained, i.e.
     * between {@link #enableWrite()} and the completion of {@link #write()}.
     * Lets the default echo append to unsent data instead of overwriting it.
     */
    private boolean writeBufDraining = false;

    /**
     * Opens a new non-blocking channel, starts connecting it to
     * {@code remoteAddress} and registers for connect completion. Heap
     * buffers sized after the socket's receive/send buffers are allocated.
     *
     * @param processor     selector processor that will drive this handler
     * @param remoteAddress address to connect to
     * @throws IOException if the channel cannot be opened, configured or
     *                     connected; the channel is closed in that case
     */
    public AbstractSocketHandler(SelectorProcessor processor,
            SocketAddress remoteAddress) throws IOException {
        this.processor = processor;
        this.remoteAddress = remoteAddress;
        this.sc = openConnecting(remoteAddress);
        readBuf = ByteBuffer.allocate(receiveBufferSize(sc));
        writeBuf = ByteBuffer.allocate(sendBufferSize(sc));
        processor.register(sc, this, SelectionKey.OP_CONNECT);
    }

    /**
     * Same as the two-argument connecting constructor, but uses the
     * caller-supplied buffers (both are cleared).
     *
     * @param processor     selector processor that will drive this handler
     * @param remoteAddress address to connect to
     * @param readBuf       buffer that received data is read into
     * @param writeBuf      buffer that outgoing data is staged in
     * @throws IOException if the channel cannot be opened, configured or
     *                     connected; the channel is closed in that case
     */
    public AbstractSocketHandler(SelectorProcessor processor,
            SocketAddress remoteAddress, ByteBuffer readBuf, ByteBuffer writeBuf)
            throws IOException {
        // Touch the buffers first so a null argument fails before a channel
        // has been opened (nothing to leak).
        readBuf.clear();
        writeBuf.clear();
        this.processor = processor;
        this.remoteAddress = remoteAddress;
        this.readBuf = readBuf;
        this.writeBuf = writeBuf;
        this.sc = openConnecting(remoteAddress);
        processor.register(sc, this, SelectionKey.OP_CONNECT);
    }

    /**
     * Wraps an already connected channel and registers it for reading.
     * Direct buffers sized after the socket's receive/send buffers are
     * allocated.
     *
     * @param processor selector processor that will drive this handler
     * @param sc        connected channel; switched to non-blocking mode if needed
     */
    public AbstractSocketHandler(SelectorProcessor processor, SocketChannel sc) {
        this.processor = processor;
        this.sc = sc;
        this.connected = true;
        ensureNonBlocking(sc);
        readBuf = ByteBuffer.allocateDirect(receiveBufferSize(sc));
        writeBuf = ByteBuffer.allocateDirect(sendBufferSize(sc));
        processor.register(sc, this, SelectionKey.OP_READ);
    }

    /**
     * Wraps an already connected channel using caller-supplied buffers (both
     * are cleared) and registers it for reading.
     *
     * @param processor selector processor that will drive this handler
     * @param sc        connected channel; switched to non-blocking mode if needed
     * @param readBuf   buffer that received data is read into
     * @param writeBuf  buffer that outgoing data is staged in
     */
    public AbstractSocketHandler(SelectorProcessor processor, SocketChannel sc,
            ByteBuffer readBuf, ByteBuffer writeBuf) {
        this.processor = processor;
        this.sc = sc;
        this.connected = true;
        ensureNonBlocking(sc);
        readBuf.clear();
        writeBuf.clear();
        this.readBuf = readBuf;
        this.writeBuf = writeBuf;
        processor.register(sc, this, SelectionKey.OP_READ);
    }

    /**
     * Opens a non-blocking channel and initiates the connection, closing the
     * channel again if any step fails so that it is not leaked. The result of
     * {@code connect} is not inspected; completion is left to
     * {@link #handleConnect()}.
     */
    private static SocketChannel openConnecting(SocketAddress remote)
            throws IOException {
        SocketChannel channel = SocketChannel.open();
        boolean started = false;
        try {
            channel.configureBlocking(false);
            channel.connect(remote);
            started = true;
            return channel;
        } finally {
            if (!started) {
                try {
                    channel.close();
                } catch (IOException ignored) {
                    // The original failure is the one worth reporting.
                }
            }
        }
    }

    /** Switches a blocking channel to non-blocking mode; a failure is only logged. */
    private static void ensureNonBlocking(SocketChannel channel) {
        if (channel.isBlocking()) {
            try {
                channel.configureBlocking(false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Returns the socket's receive buffer size, or the default if it cannot be queried. */
    private static int receiveBufferSize(SocketChannel channel) {
        try {
            return channel.socket().getReceiveBufferSize();
        } catch (SocketException ignored) {
            // Best effort only: fall back to the default size.
            return DEFAULT_BUFFER_SIZE;
        }
    }

    /** Returns the socket's send buffer size, or the default if it cannot be queried. */
    private static int sendBufferSize(SocketChannel channel) {
        try {
            return channel.socket().getSendBufferSize();
        } catch (SocketException ignored) {
            // Best effort only: fall back to the default size.
            return DEFAULT_BUFFER_SIZE;
        }
    }

    /** Logs the successful connection to standard output. */
    public void onConnected() {
        System.out.println("connect to "+this.getRemoteAddress()+" ok");
    }

    /**
     * Logs the failed connection attempt to standard output.
     *
     * @param msg description of the failure
     */
    public void onConnectFailed(String msg) {
        System.out.println("connect to "+this.getRemoteAddress()+" failed:"+msg);
    }

    /**
     * Default behaviour: print the received bytes and echo them to the peer.
     *
     * <p>Implementations must leave {@code readBuf} ready for the next read:
     * call {@code readBuf.compact()} if not everything was consumed, or
     * {@code readBuf.clear()} if it was.
     *
     * <p>Only as many bytes as currently fit into {@code writeBuf} are
     * consumed; the rest stays in {@code readBuf} and is picked up by a later
     * invocation. Bytes still waiting in {@code writeBuf} are preserved.
     */
    public void onRead() {
        readBuf.flip();
        // Bring writeBuf into fill mode without dropping bytes that have not
        // been sent yet.
        if (writeBufDraining) {
            writeBuf.compact();
        } else {
            writeBuf.clear();
        }
        int len = Math.min(readBuf.remaining(), writeBuf.remaining());
        byte[] buf = new byte[len];
        readBuf.get(buf);
        // Keeps any bytes that did not fit; equivalent to clear() when
        // everything was consumed.
        readBuf.compact();
        System.out.print(new String(buf));
        writeBuf.put(buf);
        writeBuf.flip();
        enableWrite();
    }

    /** Does nothing by default. */
    public void onWrite() {
    }

    /**
     * Logs the closing of the channel to standard output.
     *
     * @param msg the reason the channel was closed
     */
    public void onClosed(String msg) {
        System.out.println("channel closed:"+msg);
    }

    /**
     * Completes a pending non-blocking connect.
     *
     * <p>On success the handler is marked connected, {@link #onConnected()}
     * is invoked and read interest is registered. If the connection is not
     * complete yet, connect interest is registered again. If the attempt
     * failed, the channel is closed and {@link #onConnectFailed(String)} is
     * invoked.
     */
    public void handleConnect() {
        try {
            if (sc.finishConnect()) {
                // Set the flag first so the callback observes isConnected().
                connected = true;
                onConnected();
                if (!closed) {
                    processor.addInterestOps(sc, SelectionKey.OP_READ);
                }
            } else {
                // false means "still in progress", not "failed": keep waiting.
                processor.addInterestOps(sc, SelectionKey.OP_CONNECT);
            }
        } catch (IOException e) {
            e.printStackTrace();
            // The channel is unusable now; release it before notifying.
            closed = true;
            processor.closeChannel(sc);
            onConnectFailed(e.getMessage());
        }
    }

    /**
     * Reads from the channel into {@code readBuf}. End of stream or an I/O
     * error disposes the handler; otherwise {@link #onRead()} is invoked and
     * read interest is registered again.
     */
    public void handleRead() {
        try {
            int len = sc.read(readBuf);
            if (len == -1) {//closed
                dispose("channel been closed correct");
            } else {
                onRead();
                // onRead() may have disposed the handler.
                if (!closed) {
                    processor.addInterestOps(sc, SelectionKey.OP_READ);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
            dispose(e.getMessage());
        }
    }

    /** Writes pending data and invokes {@link #onWrite()} once all of it has been sent. */
    public void handleWrite() {
        if (write()) {
            onWrite();
        }
    }

    /**
     * Writes the content of {@code writeBuf} to the channel.
     * {@code writeBuf} must have been flipped before this method is called.
     *
     * @return {@code true} if everything was written (the buffer is then
     *         cleared); {@code false} if data remains, in which case write
     *         interest is registered again, or if an I/O error occurred, in
     *         which case the handler is disposed
     */
    protected boolean write() {
        if (writeBuf.hasRemaining()) {
            try {
                sc.write(writeBuf);
            } catch (IOException e) {
                e.printStackTrace();
                dispose(e.getMessage());
                return false;
            }
        }
        if (writeBuf.hasRemaining()) {
            enableWrite();
            return false;
        }
        writeBuf.clear();
        writeBufDraining = false;
        return true;
    }

    /** Returns the local socket address, looked up from the socket on first use. */
    public SocketAddress getLocalAddress() {
        if (localAddress == null) {
            localAddress = this.sc.socket().getLocalSocketAddress();
        }
        return localAddress;
    }

    /** Returns the remote socket address, looked up from the socket if not set yet. */
    public SocketAddress getRemoteAddress() {
        if (remoteAddress == null) {
            remoteAddress = this.sc.socket().getRemoteSocketAddress();
        }
        return remoteAddress;
    }

    /**
     * Closes the channel through the processor and invokes
     * {@link #onClosed(String)}; only the first call has an effect.
     *
     * @param msg the reason passed on to {@link #onClosed(String)}
     */
    public void dispose(String msg) {
        if (!closed) {
            closed = true;
            processor.closeChannel(sc);
            onClosed(msg);
        }
    }

    /** Returns whether the channel has been connected. */
    public boolean isConnected() {
        return connected;
    }

    /** Returns whether the handler has been disposed. */
    public boolean isClosed() {
        return closed;
    }

    /**
     * Asks the processor to close the channel. Unlike
     * {@link #dispose(String)} this neither updates {@link #isClosed()} nor
     * invokes {@link #onClosed(String)}.
     */
    public void close() {
        processor.closeChannel(sc);
    }

    /** Registers read interest for the channel. */
    public void enableRead() {
        processor.addInterestOps(sc, SelectionKey.OP_READ);
    }

    /**
     * Registers write interest for the channel. {@code writeBuf} is expected
     * to be flipped at this point.
     */
    public void enableWrite() {
        writeBufDraining = true;
        processor.addInterestOps(sc, SelectionKey.OP_WRITE);
    }
}
//使用方法,先在console运行nc -l -p 1234再运行EchoClient
package zzzhc;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
/**
 * Minimal client that relies entirely on the default behaviour inherited from
 * {@link AbstractSocketHandler}: it connects to a remote address, prints what
 * it receives and echoes it back.
 *
 * @author <a href="mailto:zzzhc0508@hotmail.com">zzzhc</a>
 */
public class EchoClient extends AbstractSocketHandler {

    /**
     * Starts connecting to {@code remote}.
     *
     * @param processor selector processor that will drive this handler
     * @param remote    address to connect to
     * @throws IOException if the connection cannot be initiated
     */
    public EchoClient(SelectorProcessor processor, SocketAddress remote) throws IOException {
        super(processor, remote);
    }

    /**
     * Entry point.
     *
     * @param args optional {@code host} (default {@code localhost}) followed
     *             by an optional {@code port} (default {@code 1234})
     * @throws IOException if the connection cannot be initiated
     */
    public static void main(String[] args) throws IOException {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 1234;
        // The instance registers itself with the processor in its
        // constructor, so no reference needs to be kept here.
        new EchoClient(SelectorProcessor.getDefaultInstance(), new InetSocketAddress(host, port));
    }
}
//复杂一点的
package zzzhc;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URL;
import java.nio.channels.FileChannel;
/**
 * Very small HTTP client: sends a single {@code GET} request, skips the
 * response header and stores whatever follows it in a file in the working
 * directory. The file is named after the last path segment of the URL, or
 * {@code index.html} if there is none.
 *
 * <p>Limitations: the response header is not interpreted, so status codes,
 * redirects and chunked transfer encoding are not handled; the bytes after
 * the header are written to the file unchanged.
 *
 * @author <a href="mailto:zzzhc0508@hotmail.com">zzzhc</a>
 */
public class SimpleHttpClient extends AbstractSocketHandler {

    /** The connection attempt has been started. */
    public final static int START_CONNECT = 0;
    /** The connection has been established. */
    public final static int CONNECT_OK = 1;
    /** The connection attempt failed. */
    public final static int CONNECT_FAILED = 2;
    /** The request has been staged and is waiting to be written. */
    public final static int TRY_SEND_REQUEST = 10;
    /** The request has been written completely. */
    public final static int REQUEST_SENT = 11;
    /** The blank line terminating the response header has been seen. */
    public final static int RESPONSE_HEADER_END = 21;
    /** Response data is being written to the file. */
    public final static int START_DATA_TRANSFER = 30;
    /** Declared for completeness; not used by this class. */
    public final static int END_DATA_TRANSFER = 31;

    protected int status = START_CONNECT;
    protected URL url;
    protected FileChannel fc;
    protected FileOutputStream out;

    /**
     * Starts connecting to {@code remote} in order to fetch {@code url}.
     *
     * @param processor selector processor that will drive this handler
     * @param remote    address of the HTTP server
     * @param url       resource to request
     * @throws IOException if the connection cannot be initiated
     */
    public SimpleHttpClient(SelectorProcessor processor, SocketAddress remote,
            URL url) throws IOException {
        super(processor, remote);
        this.url = url;
    }

    /** Stages the request and asks for it to be written. */
    public void onConnected() {
        super.onConnected();
        try {
            // NOTE(review): presumably gives the constructor time to assign
            // url, because the handler is registered by the superclass
            // constructor before that assignment -- TODO confirm.
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        status = CONNECT_OK;
        fillHeader(url);
        // Update the status before asking for the write, so a completed write
        // (which sets REQUEST_SENT) can never be overwritten afterwards.
        status = TRY_SEND_REQUEST;
        enableWrite();
    }

    /**
     * Fills {@code writeBuf} with the request for {@code url} and flips it so
     * that it is ready to be written.
     *
     * @param url resource to request
     */
    protected void fillHeader(URL url) {
        // getFile() is the path plus query string; an empty target is not a
        // valid request line, so fall back to the root.
        String target = url.getFile();
        if (target == null || target.length() == 0) {
            target = "/";
        }
        String host = url.getHost();
        if (url.getPort() != -1) {
            host = host + ":" + url.getPort();
        }
        writeBuf.clear();
        putLine("GET " + target + " HTTP/1.1");
        putLine("Host: " + host);
        // End of the response is detected by the server closing the
        // connection, so ask for exactly that.
        putLine("Connection: close");
        putLine("");
        writeBuf.flip();
    }

    /** Appends {@code line} followed by CRLF to {@code writeBuf}. */
    private void putLine(String line) {
        writeBuf.put(line.getBytes()).put((byte) '\r').put((byte) '\n');
    }

    /**
     * Records the failure and terminates the JVM.
     *
     * @param msg description of the failure
     */
    public void onConnectFailed(String msg) {
        super.onConnectFailed(msg);
        status = CONNECT_FAILED;
        System.exit(0);
    }

    /**
     * Consumes received data according to the current status: while the
     * header is incomplete the data is kept in {@code readBuf}; once the end
     * of the header has been found, everything after it is written to the
     * output file.
     */
    public void onRead() {
        readBuf.flip();
        switch (status) {
        case REQUEST_SENT:
            int bodyStart = findHeaderEnd();
            if (bodyStart < 0) {
                keepBufferedData();
                if (!readBuf.hasRemaining()) {
                    // No room left to ever receive the end of the header.
                    dispose("response header larger than read buffer");
                }
                break;
            }
            readBuf.position(bodyStart);
            status = RESPONSE_HEADER_END;
            // Fall through: bytes that arrived together with the header end
            // belong to the body and must be written now, because no further
            // read event is guaranteed.
        case RESPONSE_HEADER_END:
            status = START_DATA_TRANSFER;
            // fall through
        case START_DATA_TRANSFER:
            transferBody();
            break;
        default:
            System.out.println("status:"+status);
            // Not expected here; keep the bytes instead of letting the next
            // read overwrite them.
            keepBufferedData();
        }
    }

    /**
     * Searches the readable part of {@code readBuf} for the CRLF CRLF
     * sequence that terminates the header.
     *
     * @return index of the first byte after the sequence, or -1 if absent
     */
    private int findHeaderEnd() {
        int limit = readBuf.limit();
        for (int i = readBuf.position(); i + 3 < limit; i++) {
            if (readBuf.get(i) == '\r' && readBuf.get(i + 1) == '\n'
                    && readBuf.get(i + 2) == '\r' && readBuf.get(i + 3) == '\n') {
                return i + 4;
            }
        }
        return -1;
    }

    /** Puts the flipped {@code readBuf} back into fill mode behind its current content. */
    private void keepBufferedData() {
        readBuf.position(readBuf.limit());
        readBuf.limit(readBuf.capacity());
    }

    /**
     * Writes the readable part of {@code readBuf} to the output file, opening
     * the file first if necessary, and clears the buffer.
     */
    private void transferBody() {
        System.out.println("start data transfer.");
        if (out == null && !openOutputFile()) {
            readBuf.clear();
            return;
        }
        try {
            while (readBuf.hasRemaining()) {
                fc.write(readBuf);
            }
        } catch (IOException e) {
            e.printStackTrace();
            close();
        }
        readBuf.clear();
    }

    /**
     * Opens the output file named after the last path segment of the URL
     * ({@code index.html} if that is empty).
     *
     * @return {@code true} on success; {@code false} if the file could not be
     *         opened, in which case the connection has been closed
     */
    private boolean openOutputFile() {
        String file = url.getFile().trim();
        int idx = file.lastIndexOf('/');
        if (idx != -1) {
            file = file.substring(idx + 1);
        }
        if ("".equals(file)) {
            file = "index.html";
        }
        try {
            System.out.println("open "+file+" to write.");
            out = new FileOutputStream(file);
            fc = out.getChannel();
            return true;
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            close();
            return false;
        }
    }

    /**
     * Closes the output file, if one was opened, and terminates the JVM.
     *
     * @param msg the reason the channel was closed
     */
    public void onClosed(String msg) {
        super.onClosed(msg);
        try {
            if (out != null) {
                out.close();
                fc.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

    /** Records that the request has been written completely. */
    public void onWrite() {
        status = REQUEST_SENT;
        System.out.println("request sent.");
    }

    /**
     * Entry point.
     *
     * @param args optional URL to fetch (default {@code http://www.sohu.com/})
     * @throws IOException if the URL is malformed or the connection cannot be
     *                     initiated
     */
    public static void main(String[] args) throws IOException {
        URL url = new URL(args.length > 0 ? args[0] : "http://www.sohu.com/");
        int port = url.getPort() != -1 ? url.getPort() : url.getDefaultPort();
        new SimpleHttpClient(SelectorProcessor.getDefaultInstance(),
                new InetSocketAddress(url.getHost(), port), url);
    }
}