博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java 非阻塞式的高伸缩性IO通信 学习demo
阅读量:2394 次
发布时间:2019-05-10

本文共 17435 字,大约阅读时间需要 58 分钟。

本示例是通过Java实现的非阻塞式高性能IO通信的例子。通信的交互协议:

  1. 服务端给每个accept到的SocketChannel发送一条欢迎信息(Welcome to Log on!)
  2. 客户端连接在建立完成之后生成随机数,若生成的随机数是5,则向服务端发送关闭消息(good-bye.),之后关闭连接释放资源;若生成的不是5,则生成消息发送给服务端
  3. 服务端接收客户端发送的消息,判断是否为关闭消息(good-bye.),若是则关闭连接释放资源;若不是则在日志记录接收到的客户端信息,再将消息发送给客户端
  4. 客户端在接收到服务端返回的消息之后打印消息,且再次生成随机数,重复2,3步骤

消息的封装格式(这里涉及到了拆包和解包):

消息长度(int)+消息内容(为了简便,内容只使用String)

每条消息的总长度:4+消息长度

服务端代码

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.net.InetSocketAddress;import java.net.StandardSocketOptions;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.channels.spi.SelectorProvider;import java.nio.charset.CharacterCodingException;import java.nio.charset.Charset;import java.nio.charset.CharsetDecoder;import java.nio.charset.CharsetEncoder;import java.util.*;/** * Nio非阻塞服务端 + 消息的拆分包 * 通信协议: * 1.客户端连接时候服务端发送 Welcome to Log on! 消息 * 2.消息格式:消息长度(int)+消息内容(String) * 3.客户端主动终止连接(发送 good-bye.消息) */public class NonBlockingEchoServer {    private static final Logger LOG = LoggerFactory.getLogger(NonBlockingEchoServer.class);    private static final SelectorProvider selectorProvider = SelectorProvider.provider();    private final Charset defaultCharset = Charset.forName("UTF-8");    private final CharsetDecoder decoder = defaultCharset.newDecoder();    private final CharsetEncoder encoder = defaultCharset.newEncoder();    private final Map
> keepDataTrack = new HashMap<>(); private final String bindIp = "0.0.0.0"; private final int bindPort = 5555; private final String strWelcomeMsg = "Welcome to Log on!"; public static void main(String[] args) { new NonBlockingEchoServer().startEchoServer(); } private void startEchoServer() { // open Selector and ServerSocketChannel by calling the open() method try (Selector selector = selectorProvider.openSelector(); ServerSocketChannel serverSocketChannel = selectorProvider.openServerSocketChannel()) { // check that both of them were successfully opened if (serverSocketChannel.isOpen() && selector.isOpen()) { // configure non-blocking mode serverSocketChannel.configureBlocking(false); // set some options serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 256 * 1024); serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); // bind the server socket channel to port serverSocketChannel.bind(new InetSocketAddress(bindIp, bindPort)); // register the current channel with the given selector serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel); // display a waiting message while ... waiting! LOG.info("Waiting for connections ..."); this.handler(selector); } else { LOG.info("The server socket channel or selector cannot be opened!"); } } catch (IOException ex) { LOG.error("Creation Selector or ServerSocketChannel Error!", ex); } } private void handler(Selector selector) throws IOException { while (true) { // wait for incoming events selector.select(); // there is something to process on selected keys Iterator
keys = selector.selectedKeys().iterator(); while (keys.hasNext()) { SelectionKey key = keys.next(); // prevent the same key from coming up again keys.remove(); if (!key.isValid()) { Object attachment = key.attachment(); if (attachment instanceof ServerSocketChannel) { LOG.error("服务端异常,退出服务.或者升级服务重新启动服务!"); System.exit(1); } if (attachment instanceof SocketChannel) { LOG.warn("Client connect exception. remote address is {}", ((SocketChannel) attachment).getRemoteAddress()); continue; } } if (key.isAcceptable()) { this.handleAccept(key, selector); } else if (key.isReadable()) { this.handleRead(key); } else if (key.isWritable()) { this.handleWrite(key); } } } } // isAcceptable returned true 服务端先写后读 private void handleAccept(SelectionKey key, Selector selector) throws IOException { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = serverChannel.accept(); socketChannel.configureBlocking(false); LOG.info("Incoming connection from: {}", socketChannel.getRemoteAddress()); // write a welcome message ByteBuffer welcomeMsg = this.wrapWelcomeMsg(); socketChannel.write(welcomeMsg); // register channel with selector for further I/O List
list = new ArrayList<>(2); list.add(ByteBuffer.allocate(128));//readBuffer list.add(ByteBuffer.allocate(128));//writeBuffer keepDataTrack.put(socketChannel, list); socketChannel.register(selector, SelectionKey.OP_READ, socketChannel); selector.wakeup(); } private ByteBuffer wrapWelcomeMsg() throws CharacterCodingException { ByteBuffer buffer = encoder.encode(CharBuffer.wrap(strWelcomeMsg)); ByteBuffer welcomeMsg = ByteBuffer.allocate(buffer.remaining() + 4); welcomeMsg.putInt(buffer.remaining()); welcomeMsg.put(buffer); welcomeMsg.flip(); return welcomeMsg; } // isReadable returned true private void handleRead(SelectionKey key) { SocketChannel socketChannel = (SocketChannel) key.channel(); try { ByteBuffer buffer = keepDataTrack.get(socketChannel).get(0); try { socketChannel.read(buffer); } catch (IOException e) { LOG.warn("Cannot read error!", e); socketChannel.close(); key.cancel(); return; } buffer.mark();//set mark = position buffer.flip(); int msgLen = buffer.getInt(); if (buffer.remaining() >= msgLen) {//能够解析到一个完整的包 byte[] buf = new byte[msgLen]; buffer.get(buf); String receive = decoder.decode(ByteBuffer.wrap(buf)).toString().trim(); if ("good-bye.".equals(receive)) { this.keepDataTrack.remove(socketChannel); LOG.warn("Connection closed by: {}", socketChannel.getRemoteAddress()); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } socketChannel.close(); key.cancel(); return; } else { LOG.debug("[Server] ({}) from {}", receive, socketChannel.getRemoteAddress()); // write back to client if (buffer.hasRemaining()) { buffer.compact(); } else { buffer.clear(); } doEchoJob(key, encoder.encode(CharBuffer.wrap(receive))); } } else {//解析不够一个完整的数据包 buffer.reset();//set position = mark LOG.debug("The received data is less than one pack"); } } catch (IOException e) { LOG.warn("Read data from {} exception.", socketChannel, e); if (socketChannel.isOpen()) { try { socketChannel.close(); } catch (IOException ex) { //NOOP } } } } // isWritable 
returned true private void handleWrite(SelectionKey key) { SocketChannel socketChannel = (SocketChannel) key.channel(); try { ByteBuffer buffer = keepDataTrack.get(socketChannel).get(1); try { socketChannel.write(buffer); } catch (IOException e) { socketChannel.close(); key.cancel(); } key.interestOps(SelectionKey.OP_READ); if (buffer.hasRemaining()) { buffer.compact(); } else { buffer.clear(); } } catch (IOException e) { LOG.warn("Write data to {} exception.", socketChannel, e); if (socketChannel.isOpen()) { try { socketChannel.close(); } catch (IOException ex) { //NOOP } } } } /** * 将从客户端读入的数据重新包装之后发送给客户端 * * @param key 用于关联输入和输出 * @param data 被输入处理后想要发送给输出的数据 */ private void doEchoJob(SelectionKey key, ByteBuffer data) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = keepDataTrack.get(socketChannel).get(1); buffer.putInt(data.remaining()); buffer.put(data); buffer.flip(); key.interestOps(SelectionKey.OP_WRITE); }}

客户端代码

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.net.InetSocketAddress;import java.net.StandardSocketOptions;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.nio.channels.UnresolvedAddressException;import java.nio.channels.spi.SelectorProvider;import java.nio.charset.Charset;import java.nio.charset.CharsetDecoder;import java.nio.charset.CharsetEncoder;import java.util.Iterator;import java.util.Set;import java.util.concurrent.ThreadLocalRandom;/** * Nio非阻塞客户端+ 消息的拆分包 * 通信协议: * 1.客户端连接时候服务端发送 Welcome to Log on! 消息 * 2.消息格式:消息长度(int)+消息内容(String) * 3.客户端主动终止连接(发送 good-bye. 消息) */public class NonBlockingEchoClient implements Runnable {    private static final Logger LOG = LoggerFactory.getLogger(NonBlockingEchoClient.class);    private static final SelectorProvider selectorProvider = SelectorProvider.provider();    private final Charset charset = Charset.forName("UTF-8");    private final CharsetDecoder decoder = charset.newDecoder();    private final CharsetEncoder encoder = charset.newEncoder();    private final ByteBuffer readBuffer = ByteBuffer.allocateDirect(128);    private final ByteBuffer writeBuffer = ByteBuffer.allocateDirect(128);    private final SocketChannel socketChannel;    private final Selector selector;    private final int DEFAULT_PORT = 5555;    private final String IP = "127.0.0.1";    private final ByteBuffer byeMsg = encoder.encode(CharBuffer.wrap("good-bye."));    private boolean close = false;    public NonBlockingEchoClient() throws IOException {        // open Selector and ServerSocketChannel by calling the open() method        this.selector = selectorProvider.openSelector();        this.socketChannel = selectorProvider.openSocketChannel();        // configure non-blocking mode        socketChannel.configureBlocking(false);        // set some options        
socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 128);        socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, 128);        socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);        // register the current channel with the given selector        InetSocketAddress address = new InetSocketAddress(IP, DEFAULT_PORT);        // connect to remote host        Boolean connected = Boolean.FALSE;        try {            connected = socketChannel.connect(address);        } catch (UnresolvedAddressException e) {            this.releaseResource();            throw new IOException("Can't resolve address: " + address, e);        } catch (IOException ex) {            this.releaseResource();            throw ex;        }        if (connected) {            this.socketChannel.register(this.selector, SelectionKey.OP_READ);        } else {            this.socketChannel.register(this.selector, SelectionKey.OP_CONNECT);        }    }    public static void main(String[] args) {        // org.apache.log4j.PropertyConfigurator.configure("./config/log4j.properties");        for (int i = 0; i < 10; i++) {            try {                new Thread(new NonBlockingEchoClient()).start();            } catch (IOException e) {                LOG.error("Instantiation Selector or SocketChannel Error!", e);            }        }    }    @Override    public void run() {        while (!close) {            try {                // waiting for the connection                if (this.selector.select() > 0) {// get keys                    Set
keys = selector.selectedKeys(); Iterator
its = keys.iterator(); // process each key while (its.hasNext()) { SelectionKey key = its.next(); // remove the current key its.remove(); if (key.isConnectable()) { this.socketChannel.finishConnect(); //key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); key.interestOps(SelectionKey.OP_READ); } if (key.isWritable()) { this.writeDateToChannel(key); } if (key.isReadable()) { this.readDataFromChannel(key); } if (!key.isValid()) { key.channel().close(); key.cancel(); } } } } catch (IOException e) { try { this.releaseResource(); } catch (IOException e1) { e1.printStackTrace(); } } } try { this.releaseResource(); } catch (IOException e) { LOG.warn("Close connect exception.", e); } } private void writeDateToChannel(SelectionKey key) { SocketChannel keySocketChannel = (SocketChannel) key.channel(); try { int r = ThreadLocalRandom.current().nextInt(10); if (r == 5) { LOG.debug("5 was generated! Close the socket channel!"); writeBuffer.putInt(byeMsg.remaining()); writeBuffer.put(byeMsg); writeBuffer.flip(); keySocketChannel.write(writeBuffer); this.close = true; try { Thread.sleep(200); } catch (InterruptedException ex) { //NOOP } writeBuffer.clear(); } else { ByteBuffer tmp = encoder.encode(CharBuffer.wrap((Thread.currentThread().getName() + " [Client]Random number: ").concat(String.valueOf(r)))); writeBuffer.putInt(tmp.remaining()); writeBuffer.put(tmp); writeBuffer.flip(); ; keySocketChannel.write(writeBuffer); if (writeBuffer.hasRemaining()) { writeBuffer.compact(); } else { writeBuffer.clear(); } key.interestOps(SelectionKey.OP_READ); } } catch (IOException e) { LOG.warn("Write data to channel exception.", e); } } private void readDataFromChannel(SelectionKey key) { SocketChannel keySocketChannel = (SocketChannel) key.channel(); try { keySocketChannel.read(readBuffer); readBuffer.mark(); readBuffer.flip(); int msgLen = readBuffer.getInt(); if (readBuffer.remaining() >= msgLen) { byte[] buf = new byte[msgLen]; readBuffer.get(buf); String 
receive = decoder.decode(ByteBuffer.wrap(buf)).toString().trim(); LOG.debug("{}[[client]] {}", Thread.currentThread().getName(), receive); if (readBuffer.hasRemaining()) { readBuffer.compact(); } else { readBuffer.clear(); } key.interestOps(SelectionKey.OP_WRITE); } else { readBuffer.reset(); LOG.debug("The received data is less than one pack"); } } catch (IOException e) { LOG.warn("Read data from channel exception.", e); } } /** * 连接异常释放资源 * * @throws IOException */ private void releaseResource() throws IOException { this.socketChannel.shutdownOutput(); try { Thread.sleep(10); } catch (InterruptedException e) { // noop } this.socketChannel.close(); this.selector.close(); this.close = false; }}

转载于:https://my.oschina.net/u/1250501/blog/1574898

你可能感兴趣的文章
oracle数据库
查看>>
oracle中间的数据类型
查看>>
oracle中创建表空间和创建用户
查看>>
同态加密
查看>>
换手机号之后
查看>>
安全论文中的符号含义
查看>>
Ubuntu16.04 删除不用软件
查看>>
论文划分
查看>>
vscode利用cmake调试
查看>>
zcash挖矿
查看>>
zcash挖矿指南
查看>>
区块链术语解释
查看>>
./configure,make,make install的作用
查看>>
学术论文录用结果通知(Notification)
查看>>
Theorem等数学化的论述
查看>>
PKI和X509证书
查看>>
使用HttpClient爬取国内疫情数据
查看>>
引用传递和值传递有什么区别
查看>>
C++从入门到放肆!
查看>>
C++是什么?怎么学?学完了能得到什么?
查看>>