Sunday, December 13, 2015

Java networking : NIO

Related posts : Java networking : UDP vs TCP

1. Intro

From wiki : https://en.wikipedia.org/wiki/Non-blocking_I/O_(Java)
Non-blocking I/O (usually called NIO, and sometimes called "New I/O") is a collection of Java programming language APIs that offer features for intensive I/O operations. 
Main elements : 

  • NIO data transfer is based on buffers
  • Channels are designed to provide for bulk data transfers to and from NIO buffers.
  • A selector  provides a mechanism for waiting on channels and recognizing when one or more become available for data transfer.


2. NIO server

Main concept of NIO : we can have one server thread which can work with multiple client threads in non-blocking mode. This is implemented by selectors, which have to be used of both sides : client and server. Server is "listening" the channel using selector :
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
After that we can check what exactly clients wants from server :
SelectionKey key = keys.next();
...
if (key.isConnectable()){  // client want to connect to server 
if (key.isWritable()){   // client want server to write something to channel
if (key.isReadable()){ // client want to  read from server 

 
After understanding what exactly client wants from server (connect, read or write), we can get client channel and  proceed with that operation :
SocketChannel channel = (SocketChannel) key.channel();
...
channel.accept();
..
ByteBuffer readBuffer = ByteBuffer.allocate(1000);
channel.read(readBuffer);
....
channel.write(ByteBuffer.wrap(data));

Also,  as it was mentioned before - data transfer is based on buffers.

Full code of server :

package com.demien.networking;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;

public class NioServer implements Runnable {

    public final static String ADDRESS = "127.0.0.1";
    public final static int PORT = 6666;
    public final static long TIMEOUT = 10000;

    private ServerSocketChannel serverChannel;
    private Selector selector;
    /**     * This hashmap is important. It keeps track of the data that will be written to the clients.     * This is needed because we read/write asynchronously and we might be reading while the server     * wants to write. In other words, we tell the Selector we are ready to write (SelectionKey.OP_WRITE)     * and when we get a key for writting, we then write from the Hashmap. The write() method explains this further.     */    private Map<SocketChannel, byte[]> clientMessages = new HashMap<SocketChannel, byte[]>();

    public static void main(String args[]) {
        Thread serverThread = new Thread(new NioServer());
        serverThread.run();
    }

    public NioServer() {
        init();
    }

    private void init() {
        System.out.println("initializing server");
        // We do not want to call init() twice and recreate the selector or the serverChannel.        if (selector != null) return;
        if (serverChannel != null) return;

        try {
            // This is how you open a Selector            selector = Selector.open();
            // This is how you open a ServerSocketChannel            serverChannel = ServerSocketChannel.open();
            // You MUST configure as non-blocking or else you cannot register the serverChannel to the Selector.            serverChannel.configureBlocking(false);
            // bind to the address that you will use to Serve.            serverChannel.socket().bind(new InetSocketAddress(ADDRESS, PORT));

            /**             * Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT.             * This means that you just told your selector that this channel will be used to accept connections.             * We can change this operation later to read/write, more on this later.             */            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override    public void run() {
        System.out.println("Now accepting connections...");
        try {
            // A run the server as long as the thread is not interrupted.            while (!Thread.currentThread().isInterrupted()) {
                /**                 * selector.select(TIMEOUT) is waiting for an OPERATION to be ready and is a blocking call.                 * For example, if a client connects right this second, then it will break from the select()                 * call and run the code below it. The TIMEOUT is not needed, but its just so it doesn't                 * block undefinitely.                 */                selector.select(TIMEOUT);

                /**                 * If we are here, it is because an operation happened (or the TIMEOUT expired).                 * We need to get the SelectionKeys from the selector to see what operations are available.                 * We use an iterator for this.                 */                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    // remove the key so that we don't process this OPERATION again.                    keys.remove();

                    // key could be invalid if for example, the client closed the connection.                    if (!key.isValid()) {
                        continue;
                    }
                    /**                     * In the server, we start by listening to the OP_ACCEPT when we register with the Selector.                     * If the key from the keyset is Acceptable, then we must get ready to accept the client                     * connection and do something with it. Go read the comments in the accept method.                     */                    if (key.isAcceptable()) {
                        System.out.println("Accepting connection");
                        accept(key);
                    }
                    /**                     * If you already read the comments in the accept() method, then you know we changed                     * the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return                     * a channel that is writable (key.isWritable()). The write() method will explain further.                     */                    if (key.isWritable()) {
                        System.out.println("Writing...");
                        write(key);
                    }
                    /**                     * If you already read the comments in the write method then you understand that we registered                     * the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key                     * that is ready to read (key.isReadable()). The read() method will explain further.                     */                    if (key.isReadable()) {
                        System.out.println("Reading connection");
                        read(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeConnection();
        }

    }

    /**     * We registered this channel in the Selector. This means that the SocketChannel we are receiving     * back from the key.channel() is the same channel that was used to register the selector in the accept()     * method. Again, I am just explaning as if things are synchronous to make things easy to understand.     * This means that later, we might register to write from the read() method (for example).     */    private void write(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        writeToChannel(channel);
        key.interestOps(SelectionKey.OP_READ);
    }

    private void writeToChannel(SocketChannel channel) throws IOException {
        /**         * The hashmap contains the object SockenChannel along with the information in it to be written.         * In this example, we send the "Hello from server" String and also an echo back to the client.         * This is what the hashmap is for, to keep track of the messages to be written and their socketChannels.         */        byte[] data = clientMessages.get(channel);
        if (data != null) {
            clientMessages.remove(channel);

            // Something to notice here is that reads and writes in NIO go directly to the channel and in form of            // a buffer.            channel.write(ByteBuffer.wrap(data));
        }

        // Since we wrote, then we should register to read next, since that is the most logical thing        // to happen next. YOU DO NOT HAVE TO DO THIS. But I am doing it for the purpose of the example        // Usually if you register once for a read/write/connect/accept, you never have to register again for that unless you        // register for none (0). Like it said, I am doing it here for the purpose of the example. The same goes for all others.
    }

    // Nothing special, just closing our selector and socket.    private void closeConnection() {
        System.out.println("Closing server down");
        if (selector != null) {
            try {
                selector.close();
                serverChannel.socket().close();
                serverChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**     * Since we are accepting, we must instantiate a serverSocketChannel by calling key.channel().     * We use this in order to get a socketChannel (which is like a socket in I/O) by calling     * serverSocketChannel.accept() and we register that channel to the selector to listen     * to a WRITE OPERATION. I do this because my server sends a hello message to each     * client that connects to it. This doesn't mean that I will write right NOW. It means that I     * told the selector that I am ready to write and that next time Selector.select() gets called     * it should give me a key with isWritable(). More on this in the write() method.     */    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);

        socketChannel.register(selector, SelectionKey.OP_WRITE);
        byte[] hello = new String("Hello from server").getBytes();
        clientMessages.put(socketChannel, hello);
    }

    /**     * We read data from the channel. In this case, my server works as an echo, so it calls the echo() method.     * The echo() method, sets the server in the WRITE OPERATION. When the while loop in run() happens again,     * one of those keys from Selector.select() will be key.isWritable() and this is where the actual     * write will happen by calling the write() method.     */    private void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        readBuffer.clear();
        int read;
        try {
            read = channel.read(readBuffer);
        } catch (IOException e) {
            System.out.println("Reading problem, closing connection");
            key.cancel();
            channel.close();
            return;
        }
        if (read == -1) {
            System.out.println("Nothing was there to be read, closing connection");
            channel.close();
            key.cancel();
            return;
        }
        // IMPORTANT - don't forget the flip() the buffer. It is like a reset without clearing it.        readBuffer.flip();
        byte[] data = new byte[1000];
        readBuffer.get(data, 0, read);
        System.out.println("Received: " + extractString(data));

        echo(key, data);
    }

    private void echo(SelectionKey key, byte[] data) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        clientMessages.put(socketChannel, data);
        writeToChannel(socketChannel);
        key.interestOps(SelectionKey.OP_WRITE);
    }

    public static String extractString(byte[] array) {
        List<Byte> wrapper = new ArrayList<>();
        for (int i = 0; i < array.length; i++) {
            if (array[i] != 0) {
                wrapper.add(array[i]);
            } else {
                break;
            }
        }
        byte[] resultArray = new byte[wrapper.size()];
        for (int i = 0; i < wrapper.size(); i++) {
            resultArray[i] = wrapper.get(i);
        }


        return new String(resultArray);
    }
}


3. NIO client

Client implementation is based on the same principals : register selector and wait for "events" to understand what to do : connect, read or write.
Full code of client :

package com.demien.networking;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;


public class NioClient implements Runnable {

    private Selector selector;
    private BufferedReader stdIn = new BufferedReader( new InputStreamReader(System.in));

    public static void main(String[] args) {
        NioClient clientThread = new NioClient();
        Thread thread = new Thread(clientThread);
        thread.start();
    }

        @Override        public void run() {
            SocketChannel channel;
            try {
                selector = Selector.open();
                channel = SocketChannel.open();
                channel.configureBlocking(false);

                channel.register(selector, SelectionKey.OP_CONNECT);
                channel.connect(new InetSocketAddress(NioServer.ADDRESS, NioServer.PORT));

                System.out.println("Starting....");

                while (!Thread.interrupted()){

                    selector.select(1000);

                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

                    while (keys.hasNext()){
                        SelectionKey key = keys.next();
                        keys.remove();

                        if (!key.isValid()) continue;

                        if (key.isConnectable()){
                            System.out.println("I am connected to the server");
                            connect(key);
                        }
                        if (key.isWritable()){
                            write(key);
                        }
                        if (key.isReadable()){
                            read(key);
                        }
                    }
                }
            } catch (Exception e1) {
                   e1.printStackTrace();
            } finally {
                close();
            }
        }

        private void close(){
            try {
                selector.close();
            } catch (IOException e) {
                  e.printStackTrace();
            }
        }

        private void read (SelectionKey key) throws IOException {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer readBuffer = ByteBuffer.allocate(1000);
            readBuffer.clear();
            int length;
            try{
                length = channel.read(readBuffer);
            } catch (IOException e){
                System.out.println("Reading problem, closing connection");
                key.cancel();
                channel.close();
                return;
            }
            if (length == -1){
                System.out.println("Nothing was read from server");
                channel.close();
                key.cancel();
                return;
            }
            readBuffer.flip();
            byte[] buff = new byte[1024];
            readBuffer.get(buff, 0, length);
            System.out.println("Received: " + NioServer.extractString(buff));
            key.interestOps(SelectionKey.OP_WRITE);
        }

        private void write(SelectionKey key) throws IOException {
            SocketChannel channel = (SocketChannel) key.channel();
            String userInput = stdIn.readLine();
                channel.write(ByteBuffer.wrap(userInput.getBytes()));
            // lets get ready to read.            key.interestOps(SelectionKey.OP_READ);
        }

        private void connect(SelectionKey key) throws IOException {
            SocketChannel channel = (SocketChannel) key.channel();
            if (channel.isConnectionPending()){
                channel.finishConnect();
            }
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
        }

} 

4. Running
After starting we can send messages to server by input (console) and watch "echo" output :
Starting....
I am connected to the server
Received: Hello from server
hello. i'm a client
Received: hello. i'm a client
how are you ?
Received: how are you ?



No comments:

Post a Comment