Sunday, December 13, 2015

Java networking : NIO

Related posts : Java networking : UDP vs TCP

1. Intro

From wiki : https://en.wikipedia.org/wiki/Non-blocking_I/O_(Java)
Non-blocking I/O (usually called NIO, and sometimes called "New I/O") is a collection of Java programming language APIs that offer features for intensive I/O operations. 
Main elements : 

  • NIO data transfer is based on buffers
  • Channels are designed to provide for bulk data transfers to and from NIO buffers.
  • A selector  provides a mechanism for waiting on channels and recognizing when one or more become available for data transfer.


2. NIO server

Main concept of NIO : we can have one server thread which can work with multiple client threads in non-blocking mode. This is implemented by selectors, which have to be used of both sides : client and server. Server is "listening" the channel using selector :
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
After that we can check what exactly clients wants from server :
SelectionKey key = keys.next();
...
if (key.isConnectable()){  // client want to connect to server 
if (key.isWritable()){   // client want server to write something to channel
if (key.isReadable()){ // client want to  read from server 

 
After understanding what exactly client wants from server (connect, read or write), we can get client channel and  proceed with that operation :
SocketChannel channel = (SocketChannel) key.channel();
...
channel.accept();
..
ByteBuffer readBuffer = ByteBuffer.allocate(1000);
channel.read(readBuffer);
....
channel.write(ByteBuffer.wrap(data));

Also,  as it was mentioned before - data transfer is based on buffers.

Full code of server :

package com.demien.networking;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;

public class NioServer implements Runnable {

    public final static String ADDRESS = "127.0.0.1";
    public final static int PORT = 6666;
    public final static long TIMEOUT = 10000;

    private ServerSocketChannel serverChannel;
    private Selector selector;
    /**     * This hashmap is important. It keeps track of the data that will be written to the clients.     * This is needed because we read/write asynchronously and we might be reading while the server     * wants to write. In other words, we tell the Selector we are ready to write (SelectionKey.OP_WRITE)     * and when we get a key for writting, we then write from the Hashmap. The write() method explains this further.     */    private Map<SocketChannel, byte[]> clientMessages = new HashMap<SocketChannel, byte[]>();

    public static void main(String args[]) {
        Thread serverThread = new Thread(new NioServer());
        serverThread.run();
    }

    public NioServer() {
        init();
    }

    private void init() {
        System.out.println("initializing server");
        // We do not want to call init() twice and recreate the selector or the serverChannel.        if (selector != null) return;
        if (serverChannel != null) return;

        try {
            // This is how you open a Selector            selector = Selector.open();
            // This is how you open a ServerSocketChannel            serverChannel = ServerSocketChannel.open();
            // You MUST configure as non-blocking or else you cannot register the serverChannel to the Selector.            serverChannel.configureBlocking(false);
            // bind to the address that you will use to Serve.            serverChannel.socket().bind(new InetSocketAddress(ADDRESS, PORT));

            /**             * Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT.             * This means that you just told your selector that this channel will be used to accept connections.             * We can change this operation later to read/write, more on this later.             */            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override    public void run() {
        System.out.println("Now accepting connections...");
        try {
            // A run the server as long as the thread is not interrupted.            while (!Thread.currentThread().isInterrupted()) {
                /**                 * selector.select(TIMEOUT) is waiting for an OPERATION to be ready and is a blocking call.                 * For example, if a client connects right this second, then it will break from the select()                 * call and run the code below it. The TIMEOUT is not needed, but its just so it doesn't                 * block undefinitely.                 */                selector.select(TIMEOUT);

                /**                 * If we are here, it is because an operation happened (or the TIMEOUT expired).                 * We need to get the SelectionKeys from the selector to see what operations are available.                 * We use an iterator for this.                 */                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    // remove the key so that we don't process this OPERATION again.                    keys.remove();

                    // key could be invalid if for example, the client closed the connection.                    if (!key.isValid()) {
                        continue;
                    }
                    /**                     * In the server, we start by listening to the OP_ACCEPT when we register with the Selector.                     * If the key from the keyset is Acceptable, then we must get ready to accept the client                     * connection and do something with it. Go read the comments in the accept method.                     */                    if (key.isAcceptable()) {
                        System.out.println("Accepting connection");
                        accept(key);
                    }
                    /**                     * If you already read the comments in the accept() method, then you know we changed                     * the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return                     * a channel that is writable (key.isWritable()). The write() method will explain further.                     */                    if (key.isWritable()) {
                        System.out.println("Writing...");
                        write(key);
                    }
                    /**                     * If you already read the comments in the write method then you understand that we registered                     * the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key                     * that is ready to read (key.isReadable()). The read() method will explain further.                     */                    if (key.isReadable()) {
                        System.out.println("Reading connection");
                        read(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeConnection();
        }

    }

    /**     * We registered this channel in the Selector. This means that the SocketChannel we are receiving     * back from the key.channel() is the same channel that was used to register the selector in the accept()     * method. Again, I am just explaning as if things are synchronous to make things easy to understand.     * This means that later, we might register to write from the read() method (for example).     */    private void write(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        writeToChannel(channel);
        key.interestOps(SelectionKey.OP_READ);
    }

    private void writeToChannel(SocketChannel channel) throws IOException {
        /**         * The hashmap contains the object SockenChannel along with the information in it to be written.         * In this example, we send the "Hello from server" String and also an echo back to the client.         * This is what the hashmap is for, to keep track of the messages to be written and their socketChannels.         */        byte[] data = clientMessages.get(channel);
        if (data != null) {
            clientMessages.remove(channel);

            // Something to notice here is that reads and writes in NIO go directly to the channel and in form of            // a buffer.            channel.write(ByteBuffer.wrap(data));
        }

        // Since we wrote, then we should register to read next, since that is the most logical thing        // to happen next. YOU DO NOT HAVE TO DO THIS. But I am doing it for the purpose of the example        // Usually if you register once for a read/write/connect/accept, you never have to register again for that unless you        // register for none (0). Like it said, I am doing it here for the purpose of the example. The same goes for all others.
    }

    // Nothing special, just closing our selector and socket.    private void closeConnection() {
        System.out.println("Closing server down");
        if (selector != null) {
            try {
                selector.close();
                serverChannel.socket().close();
                serverChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**     * Since we are accepting, we must instantiate a serverSocketChannel by calling key.channel().     * We use this in order to get a socketChannel (which is like a socket in I/O) by calling     * serverSocketChannel.accept() and we register that channel to the selector to listen     * to a WRITE OPERATION. I do this because my server sends a hello message to each     * client that connects to it. This doesn't mean that I will write right NOW. It means that I     * told the selector that I am ready to write and that next time Selector.select() gets called     * it should give me a key with isWritable(). More on this in the write() method.     */    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);

        socketChannel.register(selector, SelectionKey.OP_WRITE);
        byte[] hello = new String("Hello from server").getBytes();
        clientMessages.put(socketChannel, hello);
    }

    /**     * We read data from the channel. In this case, my server works as an echo, so it calls the echo() method.     * The echo() method, sets the server in the WRITE OPERATION. When the while loop in run() happens again,     * one of those keys from Selector.select() will be key.isWritable() and this is where the actual     * write will happen by calling the write() method.     */    private void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        readBuffer.clear();
        int read;
        try {
            read = channel.read(readBuffer);
        } catch (IOException e) {
            System.out.println("Reading problem, closing connection");
            key.cancel();
            channel.close();
            return;
        }
        if (read == -1) {
            System.out.println("Nothing was there to be read, closing connection");
            channel.close();
            key.cancel();
            return;
        }
        // IMPORTANT - don't forget the flip() the buffer. It is like a reset without clearing it.        readBuffer.flip();
        byte[] data = new byte[1000];
        readBuffer.get(data, 0, read);
        System.out.println("Received: " + extractString(data));

        echo(key, data);
    }

    private void echo(SelectionKey key, byte[] data) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        clientMessages.put(socketChannel, data);
        writeToChannel(socketChannel);
        key.interestOps(SelectionKey.OP_WRITE);
    }

    public static String extractString(byte[] array) {
        List<Byte> wrapper = new ArrayList<>();
        for (int i = 0; i < array.length; i++) {
            if (array[i] != 0) {
                wrapper.add(array[i]);
            } else {
                break;
            }
        }
        byte[] resultArray = new byte[wrapper.size()];
        for (int i = 0; i < wrapper.size(); i++) {
            resultArray[i] = wrapper.get(i);
        }


        return new String(resultArray);
    }
}


3. NIO client

Client implementation is based on the same principals : register selector and wait for "events" to understand what to do : connect, read or write.
Full code of client :

package com.demien.networking;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;


public class NioClient implements Runnable {

    private Selector selector;
    private BufferedReader stdIn = new BufferedReader( new InputStreamReader(System.in));

    public static void main(String[] args) {
        NioClient clientThread = new NioClient();
        Thread thread = new Thread(clientThread);
        thread.start();
    }

        @Override        public void run() {
            SocketChannel channel;
            try {
                selector = Selector.open();
                channel = SocketChannel.open();
                channel.configureBlocking(false);

                channel.register(selector, SelectionKey.OP_CONNECT);
                channel.connect(new InetSocketAddress(NioServer.ADDRESS, NioServer.PORT));

                System.out.println("Starting....");

                while (!Thread.interrupted()){

                    selector.select(1000);

                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

                    while (keys.hasNext()){
                        SelectionKey key = keys.next();
                        keys.remove();

                        if (!key.isValid()) continue;

                        if (key.isConnectable()){
                            System.out.println("I am connected to the server");
                            connect(key);
                        }
                        if (key.isWritable()){
                            write(key);
                        }
                        if (key.isReadable()){
                            read(key);
                        }
                    }
                }
            } catch (Exception e1) {
                   e1.printStackTrace();
            } finally {
                close();
            }
        }

        private void close(){
            try {
                selector.close();
            } catch (IOException e) {
                  e.printStackTrace();
            }
        }

        private void read (SelectionKey key) throws IOException {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer readBuffer = ByteBuffer.allocate(1000);
            readBuffer.clear();
            int length;
            try{
                length = channel.read(readBuffer);
            } catch (IOException e){
                System.out.println("Reading problem, closing connection");
                key.cancel();
                channel.close();
                return;
            }
            if (length == -1){
                System.out.println("Nothing was read from server");
                channel.close();
                key.cancel();
                return;
            }
            readBuffer.flip();
            byte[] buff = new byte[1024];
            readBuffer.get(buff, 0, length);
            System.out.println("Received: " + NioServer.extractString(buff));
            key.interestOps(SelectionKey.OP_WRITE);
        }

        private void write(SelectionKey key) throws IOException {
            SocketChannel channel = (SocketChannel) key.channel();
            String userInput = stdIn.readLine();
                channel.write(ByteBuffer.wrap(userInput.getBytes()));
            // lets get ready to read.            key.interestOps(SelectionKey.OP_READ);
        }

        private void connect(SelectionKey key) throws IOException {
            SocketChannel channel = (SocketChannel) key.channel();
            if (channel.isConnectionPending()){
                channel.finishConnect();
            }
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
        }

} 

4. Running
After starting we can send messages to server by input (console) and watch "echo" output :
Starting....
I am connected to the server
Received: Hello from server
hello. i'm a client
Received: hello. i'm a client
how are you ?
Received: how are you ?



Java networking : UDP vs TCP

Related topic : Java networking : NIO

1. Intro 

Both TCP and UDP are protocols used for sending bits of data — known as packets — over the Internet. They both build on top of the Internet protocol. So, what's the difference ? 
In shorts, UDP - is more simple because it doesn't guarantee package delivery. 

Wiki : 
https://en.wikipedia.org/wiki/User_Datagram_Protocol
The User Datagram Protocol (UDP) is one of the core members of the Internet protocol suite. The protocol was designed by David P. Reed in 1980 and formally defined in RFC 768.
UDP uses a simple connectionless transmission model with a minimum of protocol mechanism. It has no handshaking dialogues, and thus exposes the user's program to any unreliability of the underlying network protocol. There is no guarantee of delivery, ordering, or duplicate protection. UDP provides checksums for data integrity, and port numbers for addressing different functions at the source and destination of the datagram.

https://en.wikipedia.org/wiki/Transmission_Control_Protocol
The Transmission Control Protocol (TCP) is a core protocol of the Internet protocol suite. It originated in the initial network implementation in which it complemented the Internet Protocol (IP). Therefore, the entire suite is commonly referred to as TCP/IP. TCP provides reliable, ordered, and error-checked delivery of a stream of octets between applications running on hosts communicating over an IP network. TCP is the protocol that major Internet applications such as the World Wide Webemailremote administration and file transferrely on. Applications that do not require reliable data stream service may use the User Datagram Protocol (UDP), which provides a connectionless datagram service that emphasizes reduced latency over reliability.




2. UDP Server.

Let's now create a server for UDP protocol. In Java we can use classes DatagramSocket and DatagramPacket. Also I had to create procedure to extract Strings for readed bytes. Server is reading data from socket and sending it back to client.

package com.demien.networking;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

public class UdpServer {
    public final static int PORT_NUMBER = 6666;

    public static String extractString(byte[] array) {
        List<Byte> wrapper=new ArrayList<>();
        for (int i=0; i<array.length;i++) {
            if (array[i]!=0) {
                wrapper.add(array[i]);
            } else {
                break;
            }
        }
        byte[] resultArray=new byte[wrapper.size()];
        for (int i=0; i<wrapper.size();i++) {
            resultArray[i]=wrapper.get(i);
        }


        return  new String(resultArray);
    }

    public static void main(String args[]) throws Exception {
        System.out.println("Starting.... waiting for client connections......");
        try (
                DatagramSocket serverSocket = new DatagramSocket(PORT_NUMBER);
        ) {
            byte[] receiveData = new byte[1024];
            byte[] sendData;
            while (true) {
                DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
                serverSocket.receive(receivePacket);
                String inputLine = extractString(receivePacket.getData());
                System.out.println("received: " + inputLine);
                InetAddress IPAddress = receivePacket.getAddress();
                int port = receivePacket.getPort();
                sendData = inputLine.getBytes();
                DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, IPAddress, port);
                serverSocket.send(sendPacket);
            }
        }
    }
}


3. UDP Client.

Client application is reading data from system.it(console), sending it to server and showing server responce.

package com.demien.networking;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;

public class UdpClient {
    public static void main(String args[]) throws Exception {
        System.out.println("Starting.... ");
        try (
                BufferedReader inFromUser = new BufferedReader(new InputStreamReader(System.in));
                DatagramSocket clientSocket = new DatagramSocket();
        ) {
            InetAddress IPAddress = InetAddress.getByName("localhost");
            byte[] sendData;
            byte[] receiveData = new byte[1024];
            System.out.println("Enter message :  ");
            while (true) {
                String userInput = inFromUser.readLine();
                sendData = userInput.getBytes();
                DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, IPAddress, UdpServer.PORT_NUMBER);
                clientSocket.send(sendPacket);
                DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
                clientSocket.receive(receivePacket);
                String receivedSentence = UdpServer.extractString(receivePacket.getData());
                System.out.println("response from server:" + receivedSentence);
            }
        }

    }
}



4. TCP Server. 

The same logic as for UDP server, but using another classes for TCP protocol.

package com.demien.networking;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class TcpServer {

    public final static int PORT_NUMBER=6666;

    public static void main(String[] args) throws IOException {
        System.out.println("Starting.... waiting for client connections......");
        try (
                ServerSocket serverSocket = new ServerSocket(PORT_NUMBER);
                Socket clientSocket = serverSocket.accept();
                PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
                BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
        ) {
            System.out.println("Client is connected");
            String inputLine;
            while ((inputLine = in.readLine()) != null) {
                System.out.println("received:"+inputLine);
                out.println(inputLine);
            }
        } catch (IOException e) {
            System.out.println("Exception caught when trying to listen on port "                    + PORT_NUMBER + " or listening for a connection");
            System.out.println(e.getMessage());
        }
    }
}

5. TCP Client.

Also, the same client logic : read from console, send to server, display response.

package com.demien.networking;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;

public class TcpClient {

    public final static String HOST_NAME="localhost";

    public static void main(String[] args) throws IOException {

        System.out.println("Starting.... ");
        try (
                Socket echoSocket = new Socket(HOST_NAME, TcpServer.PORT_NUMBER);
                PrintWriter out = new PrintWriter(echoSocket.getOutputStream(), true);
                BufferedReader in = new BufferedReader(new InputStreamReader(echoSocket.getInputStream()));
                BufferedReader stdIn = new BufferedReader( new InputStreamReader(System.in))
        ) {
            System.out.println("Connected to server. Enter message :  ");
            String userInput;
            while ((userInput = stdIn.readLine()) != null) {
                out.println(userInput);
                System.out.println("response from server: " + in.readLine());
            }
        } catch (UnknownHostException e) {
            System.err.println("Unknown host:" + HOST_NAME);
            System.exit(1);
        } catch (IOException e) {
            System.err.println("Couldn't get I/O for the connection to " +
                    HOST_NAME);
            System.exit(1);
        }
    }
}