本示例是通过Java实现的非阻塞式高性能IO通信的例子。通信的交互协议:
- 服务端给每个accept到的SocketChannel发送一条欢迎信息(Welcome to Log on!)
- 客户端连接在建立完成之后生成随机数,若生成的随机数是5,则向服务端发送关闭消息(good-bye.),之后关闭连接释放资源;若生成的不是5,则生成消息发送给服务端
- 服务端接收客户端发送的消息,判断是否为关闭消息(good-bye.),若是则关闭连接释放资源;若不是则在日志记录接收到的客户端信息,再将消息发送给客户端
- 客户端在接收到服务端返回的消息之后打印消息,且再次生成随机数,重复2,3步骤
消息的封装格式(这里涉及到了拆包和解包):
消息长度(int)+消息内容(为了简便只是String)
每条消息的总长度:4+消息长度
服务端代码
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.net.InetSocketAddress;import java.net.StandardSocketOptions;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.channels.spi.SelectorProvider;import java.nio.charset.CharacterCodingException;import java.nio.charset.Charset;import java.nio.charset.CharsetDecoder;import java.nio.charset.CharsetEncoder;import java.util.*;/** * Nio非阻塞服务端 + 消息的拆分包 * 通信协议: * 1.客户端连接时候服务端发送 Welcome to Log on! 消息 * 2.消息格式:消息长度(int)+消息内容(String) * 3.客户端主动终止连接(发送 good-bye.消息) */public class NonBlockingEchoServer { private static final Logger LOG = LoggerFactory.getLogger(NonBlockingEchoServer.class); private static final SelectorProvider selectorProvider = SelectorProvider.provider(); private final Charset defaultCharset = Charset.forName("UTF-8"); private final CharsetDecoder decoder = defaultCharset.newDecoder(); private final CharsetEncoder encoder = defaultCharset.newEncoder(); private final Map> keepDataTrack = new HashMap<>(); private final String bindIp = "0.0.0.0"; private final int bindPort = 5555; private final String strWelcomeMsg = "Welcome to Log on!"; public static void main(String[] args) { new NonBlockingEchoServer().startEchoServer(); } private void startEchoServer() { // open Selector and ServerSocketChannel by calling the open() method try (Selector selector = selectorProvider.openSelector(); ServerSocketChannel serverSocketChannel = selectorProvider.openServerSocketChannel()) { // check that both of them were successfully opened if (serverSocketChannel.isOpen() && selector.isOpen()) { // configure non-blocking mode serverSocketChannel.configureBlocking(false); // set some options serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 256 * 1024); 
serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); // bind the server socket channel to port serverSocketChannel.bind(new InetSocketAddress(bindIp, bindPort)); // register the current channel with the given selector serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel); // display a waiting message while ... waiting! LOG.info("Waiting for connections ..."); this.handler(selector); } else { LOG.info("The server socket channel or selector cannot be opened!"); } } catch (IOException ex) { LOG.error("Creation Selector or ServerSocketChannel Error!", ex); } } private void handler(Selector selector) throws IOException { while (true) { // wait for incoming events selector.select(); // there is something to process on selected keys Iterator keys = selector.selectedKeys().iterator(); while (keys.hasNext()) { SelectionKey key = keys.next(); // prevent the same key from coming up again keys.remove(); if (!key.isValid()) { Object attachment = key.attachment(); if (attachment instanceof ServerSocketChannel) { LOG.error("服务端异常,退出服务.或者升级服务重新启动服务!"); System.exit(1); } if (attachment instanceof SocketChannel) { LOG.warn("Client connect exception. 
remote address is {}", ((SocketChannel) attachment).getRemoteAddress()); continue; } } if (key.isAcceptable()) { this.handleAccept(key, selector); } else if (key.isReadable()) { this.handleRead(key); } else if (key.isWritable()) { this.handleWrite(key); } } } } // isAcceptable returned true 服务端先写后读 private void handleAccept(SelectionKey key, Selector selector) throws IOException { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = serverChannel.accept(); socketChannel.configureBlocking(false); LOG.info("Incoming connection from: {}", socketChannel.getRemoteAddress()); // write a welcome message ByteBuffer welcomeMsg = this.wrapWelcomeMsg(); socketChannel.write(welcomeMsg); // register channel with selector for further I/O List list = new ArrayList<>(2); list.add(ByteBuffer.allocate(128));//readBuffer list.add(ByteBuffer.allocate(128));//writeBuffer keepDataTrack.put(socketChannel, list); socketChannel.register(selector, SelectionKey.OP_READ, socketChannel); selector.wakeup(); } private ByteBuffer wrapWelcomeMsg() throws CharacterCodingException { ByteBuffer buffer = encoder.encode(CharBuffer.wrap(strWelcomeMsg)); ByteBuffer welcomeMsg = ByteBuffer.allocate(buffer.remaining() + 4); welcomeMsg.putInt(buffer.remaining()); welcomeMsg.put(buffer); welcomeMsg.flip(); return welcomeMsg; } // isReadable returned true private void handleRead(SelectionKey key) { SocketChannel socketChannel = (SocketChannel) key.channel(); try { ByteBuffer buffer = keepDataTrack.get(socketChannel).get(0); try { socketChannel.read(buffer); } catch (IOException e) { LOG.warn("Cannot read error!", e); socketChannel.close(); key.cancel(); return; } buffer.mark();//set mark = position buffer.flip(); int msgLen = buffer.getInt(); if (buffer.remaining() >= msgLen) {//能够解析到一个完整的包 byte[] buf = new byte[msgLen]; buffer.get(buf); String receive = decoder.decode(ByteBuffer.wrap(buf)).toString().trim(); if ("good-bye.".equals(receive)) { 
this.keepDataTrack.remove(socketChannel); LOG.warn("Connection closed by: {}", socketChannel.getRemoteAddress()); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } socketChannel.close(); key.cancel(); return; } else { LOG.debug("[Server] ({}) from {}", receive, socketChannel.getRemoteAddress()); // write back to client if (buffer.hasRemaining()) { buffer.compact(); } else { buffer.clear(); } doEchoJob(key, encoder.encode(CharBuffer.wrap(receive))); } } else {//解析不够一个完整的数据包 buffer.reset();//set position = mark LOG.debug("The received data is less than one pack"); } } catch (IOException e) { LOG.warn("Read data from {} exception.", socketChannel, e); if (socketChannel.isOpen()) { try { socketChannel.close(); } catch (IOException ex) { //NOOP } } } } // isWritable returned true private void handleWrite(SelectionKey key) { SocketChannel socketChannel = (SocketChannel) key.channel(); try { ByteBuffer buffer = keepDataTrack.get(socketChannel).get(1); try { socketChannel.write(buffer); } catch (IOException e) { socketChannel.close(); key.cancel(); } key.interestOps(SelectionKey.OP_READ); if (buffer.hasRemaining()) { buffer.compact(); } else { buffer.clear(); } } catch (IOException e) { LOG.warn("Write data to {} exception.", socketChannel, e); if (socketChannel.isOpen()) { try { socketChannel.close(); } catch (IOException ex) { //NOOP } } } } /** * 将从客户端读入的数据重新包装之后发送给客户端 * * @param key 用于关联输入和输出 * @param data 被输入处理后想要发送给输出的数据 */ private void doEchoJob(SelectionKey key, ByteBuffer data) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = keepDataTrack.get(socketChannel).get(1); buffer.putInt(data.remaining()); buffer.put(data); buffer.flip(); key.interestOps(SelectionKey.OP_WRITE); }}
客户端代码
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.net.InetSocketAddress;import java.net.StandardSocketOptions;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.nio.channels.UnresolvedAddressException;import java.nio.channels.spi.SelectorProvider;import java.nio.charset.Charset;import java.nio.charset.CharsetDecoder;import java.nio.charset.CharsetEncoder;import java.util.Iterator;import java.util.Set;import java.util.concurrent.ThreadLocalRandom;/** * Nio非阻塞客户端+ 消息的拆分包 * 通信协议: * 1.客户端连接时候服务端发送 Welcome to Log on! 消息 * 2.消息格式:消息长度(int)+消息内容(String) * 3.客户端主动终止连接(发送 good-bye. 消息) */public class NonBlockingEchoClient implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(NonBlockingEchoClient.class); private static final SelectorProvider selectorProvider = SelectorProvider.provider(); private final Charset charset = Charset.forName("UTF-8"); private final CharsetDecoder decoder = charset.newDecoder(); private final CharsetEncoder encoder = charset.newEncoder(); private final ByteBuffer readBuffer = ByteBuffer.allocateDirect(128); private final ByteBuffer writeBuffer = ByteBuffer.allocateDirect(128); private final SocketChannel socketChannel; private final Selector selector; private final int DEFAULT_PORT = 5555; private final String IP = "127.0.0.1"; private final ByteBuffer byeMsg = encoder.encode(CharBuffer.wrap("good-bye.")); private boolean close = false; public NonBlockingEchoClient() throws IOException { // open Selector and ServerSocketChannel by calling the open() method this.selector = selectorProvider.openSelector(); this.socketChannel = selectorProvider.openSocketChannel(); // configure non-blocking mode socketChannel.configureBlocking(false); // set some options socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 128); 
socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, 128); socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); // register the current channel with the given selector InetSocketAddress address = new InetSocketAddress(IP, DEFAULT_PORT); // connect to remote host Boolean connected = Boolean.FALSE; try { connected = socketChannel.connect(address); } catch (UnresolvedAddressException e) { this.releaseResource(); throw new IOException("Can't resolve address: " + address, e); } catch (IOException ex) { this.releaseResource(); throw ex; } if (connected) { this.socketChannel.register(this.selector, SelectionKey.OP_READ); } else { this.socketChannel.register(this.selector, SelectionKey.OP_CONNECT); } } public static void main(String[] args) { // org.apache.log4j.PropertyConfigurator.configure("./config/log4j.properties"); for (int i = 0; i < 10; i++) { try { new Thread(new NonBlockingEchoClient()).start(); } catch (IOException e) { LOG.error("Instantiation Selector or SocketChannel Error!", e); } } } @Override public void run() { while (!close) { try { // waiting for the connection if (this.selector.select() > 0) {// get keys Setkeys = selector.selectedKeys(); Iterator its = keys.iterator(); // process each key while (its.hasNext()) { SelectionKey key = its.next(); // remove the current key its.remove(); if (key.isConnectable()) { this.socketChannel.finishConnect(); //key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); key.interestOps(SelectionKey.OP_READ); } if (key.isWritable()) { this.writeDateToChannel(key); } if (key.isReadable()) { this.readDataFromChannel(key); } if (!key.isValid()) { key.channel().close(); key.cancel(); } } } } catch (IOException e) { try { this.releaseResource(); } catch (IOException e1) { e1.printStackTrace(); } } } try { this.releaseResource(); } catch (IOException e) { LOG.warn("Close connect exception.", e); } } private void writeDateToChannel(SelectionKey key) { SocketChannel 
keySocketChannel = (SocketChannel) key.channel(); try { int r = ThreadLocalRandom.current().nextInt(10); if (r == 5) { LOG.debug("5 was generated! Close the socket channel!"); writeBuffer.putInt(byeMsg.remaining()); writeBuffer.put(byeMsg); writeBuffer.flip(); keySocketChannel.write(writeBuffer); this.close = true; try { Thread.sleep(200); } catch (InterruptedException ex) { //NOOP } writeBuffer.clear(); } else { ByteBuffer tmp = encoder.encode(CharBuffer.wrap((Thread.currentThread().getName() + " [Client]Random number: ").concat(String.valueOf(r)))); writeBuffer.putInt(tmp.remaining()); writeBuffer.put(tmp); writeBuffer.flip(); ; keySocketChannel.write(writeBuffer); if (writeBuffer.hasRemaining()) { writeBuffer.compact(); } else { writeBuffer.clear(); } key.interestOps(SelectionKey.OP_READ); } } catch (IOException e) { LOG.warn("Write data to channel exception.", e); } } private void readDataFromChannel(SelectionKey key) { SocketChannel keySocketChannel = (SocketChannel) key.channel(); try { keySocketChannel.read(readBuffer); readBuffer.mark(); readBuffer.flip(); int msgLen = readBuffer.getInt(); if (readBuffer.remaining() >= msgLen) { byte[] buf = new byte[msgLen]; readBuffer.get(buf); String receive = decoder.decode(ByteBuffer.wrap(buf)).toString().trim(); LOG.debug("{}[[client]] {}", Thread.currentThread().getName(), receive); if (readBuffer.hasRemaining()) { readBuffer.compact(); } else { readBuffer.clear(); } key.interestOps(SelectionKey.OP_WRITE); } else { readBuffer.reset(); LOG.debug("The received data is less than one pack"); } } catch (IOException e) { LOG.warn("Read data from channel exception.", e); } } /** * 连接异常释放资源 * * @throws IOException */ private void releaseResource() throws IOException { this.socketChannel.shutdownOutput(); try { Thread.sleep(10); } catch (InterruptedException e) { // noop } this.socketChannel.close(); this.selector.close(); this.close = false; }}