Java Nio Echo Server Tutorial

This article is a tutorial on implementing a simple Java NIO “echo server”. The example is a simple client-server application in which one or more clients connect to a running server and send it messages, which the server then “echoes” back to the respective clients.

1. Introduction

This article builds on two earlier articles on Java NIO, namely “Java Nio Tutorial for Beginners” and “Java Nio Asynchronous Channels Tutorial”. Here we implement a simple “echo server” using some of the abstractions and techniques discussed in those articles.

2. Technologies used

The example code in this article was built and run using:

  • Java 1.8.101 (1.8.x will do fine)
  • Maven 3.3.9 (3.3.x will do fine)
  • Spring source tool suite 4.6.3 (Any Java IDE would work)
  • Ubuntu 16.04 (Windows, Mac or Linux will do fine)

3. Overview

A server process is started with a port specified at runtime and listens for incoming connections from client processes. When an inbound connection from a client is detected, the server process is notified and accepts the connection. The client can then send a message to the server. When data arrives, the server is notified again and begins reading the incoming request. Once the request is complete (signalled by an end-of-message marker appended by the client), it is sent back to the client on the same connection, hence the “echo”.

4. The EchoServer

What follows are the code snippets of all the abstractions used in this EchoServer implementation.

4.1 ChannelWriter

ChannelWriter

public interface ChannelWriter {

    default void doWrite(final ByteBuffer buffer, final SocketChannel channel) throws IOException {
        if (Objects.isNull(buffer) || Objects.isNull(channel)) {
            throw new IllegalArgumentException("Required buffer and channel.");
        }

        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }
}
  • line 8: we ensure there are still bytes remaining between the current position and the limit
  • line 9: we attempt to write the remaining bytes in the ByteBuffer to the Channel; a single write() may only write some of them, hence the loop
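
To illustrate why doWrite loops on hasRemaining(), here is a minimal sketch that runs the same loop against a channel that accepts only a few bytes per call. ThrottledChannel, writeFully and the 4-byte limit are hypothetical names and values invented for this illustration; they are not part of the article's source code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

public final class PartialWriteDemo {

    /** Hypothetical channel that accepts at most maxPerCall bytes per write, mimicking a busy socket. */
    static final class ThrottledChannel implements WritableByteChannel {
        private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final int maxPerCall;
        int calls;

        ThrottledChannel(final int maxPerCall) {
            this.maxPerCall = maxPerCall;
        }

        @Override
        public int write(final ByteBuffer src) {
            calls++;
            final int n = Math.min(maxPerCall, src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get()); // get() advances the buffer's position
            }
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
            // nothing to release in this in-memory sketch
        }

        String received() {
            return new String(sink.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    /** The same loop as ChannelWriter.doWrite, written against the WritableByteChannel interface. */
    static void writeFully(final ByteBuffer buffer, final WritableByteChannel channel) {
        try {
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(final String[] args) {
        final ThrottledChannel channel = new ThrottledChannel(4);
        writeFully(ByteBuffer.wrap("Hello world!".getBytes(StandardCharsets.UTF_8)), channel);
        System.out.println(channel.received() + " in " + channel.calls + " write calls");
    }
}
```

With a 12-byte message and 4 bytes accepted per call, the loop needs three write() calls before hasRemaining() returns false. A single write() call would have sent only the first four bytes.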

4.2 Client

Client

public final class Client implements ChannelWriter {

    private final InetSocketAddress hostAddress;

    public static void main(final String[] args) {
        if (args.length < 2) {
            throw new IllegalArgumentException("Expecting two arguments in order (1) port (2) message eg: 9999 \"Hello world\".");
        }

        new Client(Integer.valueOf(args[0])).start(args[1]);
    }

    private Client(final int port) {
        this.hostAddress = new InetSocketAddress(port);
    }

    private void start(final String message) {
        assert StringUtils.isNotEmpty(message);

        try (SocketChannel client = SocketChannel.open(this.hostAddress)) {

            final ByteBuffer buffer = ByteBuffer.wrap((message + Constants.END_MESSAGE_MARKER).trim().getBytes());

            doWrite(buffer, client);

            // The write is complete, so reset position and limit to reuse the buffer for the echo
            buffer.clear();

            final StringBuilder echo = new StringBuilder();
            doRead(echo, buffer, client);

            System.out.println(String.format("Message :\t %s \nEcho    :\t %s", message, echo.toString().replace(Constants.END_MESSAGE_MARKER, StringUtils.EMPTY)));
        } catch (IOException e) {
            throw new RuntimeException("Unable to communicate with server.", e);
        }
    }

    private void doRead(final StringBuilder data, final ByteBuffer buffer, final SocketChannel channel) throws IOException {
        assert !Objects.isNull(data) && !Objects.isNull(buffer) && !Objects.isNull(channel);

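        int bytesRead;
        while ((bytesRead = channel.read(buffer)) != -1) {
            // Decode only the bytes received by this read, not the whole backing array
            data.append(new String(buffer.array(), 0, bytesRead));
            buffer.clear();
        }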
    }
}
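
The Client and Server both refer to Constants.END_MESSAGE_MARKER, which is not listed in this article. The following is a minimal sketch of how such a marker could frame a message, assuming the marker is a short string that does not occur in the message itself. The value ":END" and the helper names frame, isComplete and unframe are placeholders chosen for illustration, not the project's actual definitions.

```java
import java.nio.charset.StandardCharsets;

public final class MessageFraming {

    // Assumption: placeholder value; the real Constants.END_MESSAGE_MARKER is not shown in the article.
    static final String END_MESSAGE_MARKER = ":END";

    /** Appends the marker so the receiver can tell where the message ends. */
    static byte[] frame(final String message) {
        return (message + END_MESSAGE_MARKER).getBytes(StandardCharsets.UTF_8);
    }

    /** True once the accumulated data contains the marker. */
    static boolean isComplete(final CharSequence received) {
        return received.toString().contains(END_MESSAGE_MARKER);
    }

    /** Strips the marker again before showing the message to the user. */
    static String unframe(final String received) {
        return received.replace(END_MESSAGE_MARKER, "");
    }

    public static void main(final String[] args) {
        final String wire = new String(frame("Hello world!"), StandardCharsets.UTF_8);
        final StringBuilder session = new StringBuilder();

        // Feed the data in two chunks, as a TCP connection might deliver it
        session.append(wire, 0, 5);
        System.out.println(isComplete(session)); // false
        session.append(wire, 5, wire.length());
        System.out.println(isComplete(session)); // true
        System.out.println(unframe(session.toString())); // Hello world!
    }
}
```

Because TCP is a byte stream with no message boundaries, the receiver needs some convention like this to know when a message has fully arrived. A length prefix would be an alternative design.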

4.3 Server

Server

public final class Server implements ChannelWriter {

    private static final int BUFFER_SIZE = 1024;

    private final int port;
    private final Map<SocketChannel, StringBuilder> session;

    public static void main(final String[] args) {
        if (args.length < 1) {
            throw new IllegalArgumentException("Expecting one argument (1) port.");
        }

        new Server(Integer.valueOf(args[0])).start();
    }

    private Server(final int port) {
        this.port = port;
        this.session = new HashMap<>();
    }

    private void start() {
        try (Selector selector = Selector.open(); ServerSocketChannel channel = ServerSocketChannel.open()) {
            initChannel(channel, selector);

            while (!Thread.currentThread().isInterrupted()) {
                if (selector.isOpen()) {
                    final int numKeys = selector.select();
                    if (numKeys > 0) {
                        handleKeys(channel, selector.selectedKeys());
                    }
                } else {
                    Thread.currentThread().interrupt();
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("Unable to start server.", e);
        } finally {
            this.session.clear();
        }
    }

    private void initChannel(final ServerSocketChannel channel, final Selector selector) throws IOException {
        assert !Objects.isNull(channel) && !Objects.isNull(selector);

        channel.socket().setReuseAddress(true);
        channel.configureBlocking(false);
        channel.socket().bind(new InetSocketAddress(this.port));
        channel.register(selector, SelectionKey.OP_ACCEPT);
    }

    private void handleKeys(final ServerSocketChannel channel, final Set<SelectionKey> keys) throws IOException {
        assert !Objects.isNull(keys) && !Objects.isNull(channel);

        final Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {

            final SelectionKey key = iterator.next();
            try {
                if (key.isValid()) {
                    if (key.isAcceptable()) {
                        doAccept(channel, key);
                    } else if (key.isReadable()) {
                        doRead(key);
                    } else {
                        throw new UnsupportedOperationException("Key not supported by server.");
                    }
                } else {
                    throw new UnsupportedOperationException("Key not valid.");
                }
            } finally {
                if (mustEcho(key)) {
                    doEcho(key);
                    cleanUp(key);
                }

                iterator.remove();
            }
        }
    }

    private void doAccept(final ServerSocketChannel channel, final SelectionKey key) throws IOException {
        assert !Objects.isNull(key) && !Objects.isNull(channel);

        final SocketChannel client = channel.accept();
        client.configureBlocking(false);
        client.register(key.selector(), SelectionKey.OP_READ);

        // Create a session for the incoming connection
        this.session.put(client, new StringBuilder());
    }

    private void doRead(final SelectionKey key) throws IOException {
        assert !Objects.isNull(key);

        final SocketChannel client = (SocketChannel) key.channel();
        final ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);

        final int bytesRead = client.read(buffer);
        if (bytesRead > 0) {
            this.session.get(client).append(new String(buffer.array(), 0, bytesRead)); // decode only the bytes just read
        } else if (bytesRead < 0) {
            if (mustEcho(key)) {
                doEcho(key);
            }

            cleanUp(key);
        }
    }

    private void doEcho(final SelectionKey key) throws IOException {
        assert !Objects.isNull(key);

        final ByteBuffer buffer = ByteBuffer.wrap(this.session.get(key.channel()).toString().trim().getBytes());

        doWrite(buffer, (SocketChannel) key.channel());
    }

    private boolean mustEcho(final SelectionKey key) {
        assert !Objects.isNull(key);

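        if (!(key.channel() instanceof SocketChannel)) {
            return false;
        }

        // cleanUp() removes the session, so it may already be gone when handleKeys() checks again
        final StringBuilder data = this.session.get(key.channel());
        return data != null && data.indexOf(Constants.END_MESSAGE_MARKER) >= 0;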
    }

    private void cleanUp(final SelectionKey key) throws IOException {
        assert !Objects.isNull(key);

        this.session.remove((SocketChannel) key.channel());

        key.channel().close();
        key.cancel();
    }
}
  • line 22: using try-with-resources we open a ServerSocketChannel and a Selector. The Selector allows the Server to multiplex over ‘n’ SelectableChannel instances (i.e. connections)
  • line 23: we initialize the ServerSocketChannel and register it with the Selector. We also express interest in the SelectionKey.OP_ACCEPT IO operation, meaning that the ServerSocketChannel is only interested in accepting connections
  • line 26: check that the Selector is still open
  • line 27: call select() on the Selector; this is a blocking call and only returns when there are SelectionKey instances ready (expressing IO events)
  • line 29: handle the Set of SelectionKey instances from the select() call for the given ServerSocketChannel
  • line 45: allows binding to the port even if a previous connection on that same port is still in a TIME_WAIT state
  • line 46: ensure our Channel is in non-blocking mode for use by our Selector
  • line 47: bind to the address
  • line 48: register the Channel with the Selector
  • line 59: whilst processing the keys, ensure the SelectionKey is valid
  • line 61: accept a new connection
  • line 63: read from the connection
  • line 71-76: ensure that after every IO event is handled we check whether we must echo back to the client and, if necessary, clean up (close) resources etc.
    Ensure we remove the SelectionKey from the Set of SelectionKey instances, otherwise we will continue to process stale events
  • line 84-89: for every incoming SocketChannel connection, ensure we set blocking to false, express interest in SelectionKey.OP_READ IO events and create a new session
  • line 99-100: if something was read, add it to the session buffer
  • line 101-106: if the end of the stream has been reached, echo if required and clean up resources
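
The select / iterate / remove cycle described above can be tried without any sockets. The sketch below assumes an in-process java.nio.channels.Pipe standing in for a client connection and a message small enough to fit in the pipe's internal buffer. SelectorPipeDemo and roundTrip are hypothetical names, and this is an illustration of the pattern rather than a replacement for the Server shown earlier.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public final class SelectorPipeDemo {

    /** Writes a (small) message into a Pipe and reads it back when the Selector reports OP_READ. */
    static String roundTrip(final String message) {
        try {
            final Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {

                // A channel must be non-blocking before it can be registered with a Selector
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                final byte[] payload = message.getBytes(StandardCharsets.UTF_8);
                final ByteBuffer out = ByteBuffer.wrap(payload);
                while (out.hasRemaining()) {
                    sink.write(out);
                }

                final ByteBuffer in = ByteBuffer.allocate(payload.length);
                while (in.hasRemaining()) {
                    selector.select(); // blocks until the source channel has data to read
                    final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while (keys.hasNext()) {
                        final SelectionKey key = keys.next();
                        keys.remove(); // otherwise the key stays in the selected set
                        if (key.isValid() && key.isReadable()) {
                            ((ReadableByteChannel) key.channel()).read(in);
                        }
                    }
                }

                in.flip(); // switch the buffer from filling to draining
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(final String[] args) {
        System.out.println(roundTrip("Hello world!"));
    }
}
```

The structure mirrors handleKeys: block in select(), iterate over the selected keys, remove each key from the set and act on its ready operation.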

5. Example code

The attached sample code is a Maven project and can be compiled by executing mvn clean install in the project folder, assuming all packages / programs are installed. Then navigate to the target/classes folder within the project folder and execute the following:

Start Server

java com.javacodegeeks.nio.echoserver.Server 9999

Start Client

java com.javacodegeeks.nio.echoserver.Client 9999 "Hello world!"

substituting 9999 with any port number of your choosing and Hello world! with any message of your choosing. If successful you should see the following output:

Message :        Hello world! 
Echo    :        Hello world!

substituting “Hello world!” with whatever message you specified at runtime.

6. Summary

This example demonstrated using the Selector class to multiplex over ‘n’ SelectableChannels and echo back any messages received from those Channels. The Selector allowed our Server to handle the incoming IO events from the SelectableChannels, provided they were SelectionKey.OP_ACCEPT or SelectionKey.OP_READ ready. The Server managed a session per connected Channel and disposed of the Channel once the echo was complete.

7. Download the source code

This was a Java NIO EchoServer tutorial.

Download
You can download the full source code of this example here: Java NIO EchoServer tutorial

Jean-Jay Vester graduated from the Cape Peninsula University of Technology, Cape Town, in 2001 and has spent most of his career developing Java backend systems for small to large sized companies both sides of the equator. He has an abundance of experience and knowledge in many varied Java frameworks and has also acquired some systems knowledge along the way. Recently he has started developing his JavaScript skill set specifically targeting Angularjs and also bridged that skill to the backend with Nodejs.