Java Nio Echo Server Tutorial
This article is a tutorial on implementing a simple Java NIO “echo server”. The example takes the form of a simple client-server application in which one or more clients connect to a running server and post messages, which the server then “echoes” back to the respective clients.
1. Introduction
This article builds on two earlier articles on the subject of Java NIO, namely “Java Nio Tutorial for Beginners” and “Java Nio Asynchronous Channels Tutorial”. Here we implement a simple “echo server” using some of the abstractions and techniques discussed in those articles.
2. Technologies used
The example code in this article was built and run using:
- Java 1.8.101 (1.8.x will do fine)
- Maven 3.3.9 (3.3.x will do fine)
- Spring Source Tool Suite 4.6.3 (any Java IDE would work)
- Ubuntu 16.04 (Windows, Mac or Linux will do fine)
3. Overview
A server process is started with a port specified at runtime and listens for incoming connections from client processes. When a client connects, the server process is notified and accepts the connection. The client is then able to send a message to the server. When the message arrives, the server is notified again and reads the incoming request; once the request is complete, the server sends it back to the client on the same connection, hence the “echo”.
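To make the round trip concrete before introducing the Selector, here is a minimal sketch of the same exchange using blocking channels in a single process. It is not the article's implementation: the class and method names (BlockingEchoSketch, echoOnce, readAll, writeAll) are hypothetical, the server handles exactly one connection, and the end of the request is signalled by the client half-closing the connection (shutdownOutput) rather than by an end-of-message marker.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

final class BlockingEchoSketch {

    /** Reads from the channel until end-of-stream and returns everything received. */
    static byte[] readAll(final SocketChannel channel) throws IOException {
        final ByteArrayOutputStream received = new ByteArrayOutputStream();
        final ByteBuffer buffer = ByteBuffer.allocate(256);
        while (channel.read(buffer) != -1) {
            buffer.flip();                                   // expose only the bytes just read
            received.write(buffer.array(), 0, buffer.limit());
            buffer.clear();
        }
        return received.toByteArray();
    }

    /** Writes every byte, looping because one write() may be partial. */
    static void writeAll(final SocketChannel channel, final byte[] bytes) throws IOException {
        final ByteBuffer buffer = ByteBuffer.wrap(bytes);
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }

    /** Starts a one-shot echo server on an ephemeral loopback port and sends it one message. */
    static String echoOnce(final String message) throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            final Thread serverThread = new Thread(() -> {
                try (SocketChannel connection = server.accept()) {
                    // Echo the request back on the same connection, then close it.
                    writeAll(connection, readAll(connection));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            serverThread.start();

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                writeAll(client, message.getBytes(StandardCharsets.UTF_8));
                client.shutdownOutput();                     // tells the server the request is complete
                final String echo = new String(readAll(client), StandardCharsets.UTF_8);
                serverThread.join();
                return echo;
            }
        }
    }

    public static void main(final String[] args) throws Exception {
        System.out.println(echoOnce("Hello world!"));
    }
}
```

A blocking server like this needs one thread per connection; the Selector-based Server in this article avoids that by handling all connections on a single thread.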
4. The EchoServer
What follows are the code snippets of all the abstractions used in this EchoServer implementation.
4.1 ChannelWriter
ChannelWriter
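import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Objects;

public interface ChannelWriter {

    default void doWrite(final ByteBuffer buffer, final SocketChannel channel) throws IOException {
        if (Objects.isNull(buffer) || Objects.isNull(channel)) {
            throw new IllegalArgumentException("Required buffer and channel.");
        }

        // A single write() may not drain the buffer, so loop until nothing remains.
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }
}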
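- while (buffer.hasRemaining()): we ensure there are still bytes remaining between the current position and the limit
- channel.write(buffer): we attempt to write the remaining bytes in the ByteBuffer to the Channel; a single call may write only some of them, which is why the call sits inside the loop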
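The reason for looping on hasRemaining() can be illustrated without a network. The sketch below is an illustration only: TrickleChannel is a hypothetical stand-in for a socket that accepts at most three bytes per write(...) call, and writeFully repeats the loop from doWrite against the more general WritableByteChannel type.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

final class PartialWriteDemo {

    /** Hypothetical channel that accepts at most 3 bytes per write() call. */
    static final class TrickleChannel implements WritableByteChannel {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        int calls = 0;

        @Override
        public int write(final ByteBuffer src) {
            final int n = Math.min(3, src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get());   // get() advances the buffer's position
            }
            calls++;
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
            // nothing to release in this in-memory stand-in
        }
    }

    /** Same loop as ChannelWriter.doWrite, written against the generic channel type. */
    static void writeFully(final ByteBuffer buffer, final WritableByteChannel channel) throws IOException {
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }

    public static void main(final String[] args) throws IOException {
        final TrickleChannel channel = new TrickleChannel();
        final ByteBuffer buffer = ByteBuffer.wrap("Hello world".getBytes(StandardCharsets.UTF_8));

        writeFully(buffer, channel);

        // 11 bytes at 3 bytes per call takes 4 calls.
        System.out.println(channel.calls + " calls wrote: "
                + new String(channel.sink.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

Calling write(...) once and ignoring its return value would have delivered only “Hel” here.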
4.2 Client
Client
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Objects;

// StringUtils (presumably from Apache Commons Lang) and the project's Constants
// class are also required; neither is shown in this article.
public final class Client implements ChannelWriter {

    private final InetSocketAddress hostAddress;

    public static void main(final String[] args) {
        if (args.length < 2) {
            throw new IllegalArgumentException("Expecting two arguments in order (1) port (2) message eg: 9999 \"Hello world\".");
        }

        new Client(Integer.parseInt(args[0])).start(args[1]);
    }

    private Client(final int port) {
        this.hostAddress = new InetSocketAddress(port);
    }

    private void start(final String message) {
        assert StringUtils.isNotEmpty(message);

        try (SocketChannel client = SocketChannel.open(this.hostAddress)) {
            final ByteBuffer buffer = ByteBuffer.wrap((message + Constants.END_MESSAGE_MARKER).trim().getBytes());

            doWrite(buffer, client);

            // The write left position == limit, so flip() rewinds to 0 and the buffer can be refilled.
            buffer.flip();

            final StringBuilder echo = new StringBuilder();
            doRead(echo, buffer, client);

            System.out.println(String.format("Message :\t %s \nEcho :\t %s", message, echo.toString().replace(Constants.END_MESSAGE_MARKER, StringUtils.EMPTY)));
        } catch (IOException e) {
            throw new RuntimeException("Unable to communicate with server.", e);
        }
    }

    private void doRead(final StringBuilder data, final ByteBuffer buffer, final SocketChannel channel) throws IOException {
        assert !Objects.isNull(data) && !Objects.isNull(buffer) && !Objects.isNull(channel);

        // read() returns -1 once the server has closed the connection.
        while (channel.read(buffer) != -1) {
            // Decode only the bytes received by this read, not the whole backing array.
            buffer.flip();
            data.append(new String(buffer.array(), buffer.position(), buffer.remaining()));
            buffer.clear();
        }
    }
}
- try (SocketChannel client = ...): using try-with-resources we open a SocketChannel to the configured InetSocketAddress
- ByteBuffer.wrap(...): we create a ByteBuffer wrapping the contents of the specified message followed by the end-of-message marker
- doWrite(...): we write the message, passing the ByteBuffer and the SocketChannel
- buffer.flip(): we flip the ByteBuffer to reset the position and limit so the same buffer can be reused for reading the echo from the Channel
- doRead(...): we read the echo, passing the StringBuilder (for placing the read contents into), the ByteBuffer and the SocketChannel
- the while loop in doRead(...): we ensure we read everything from the Server, that is, until the Server closes the connection and read(...) returns -1
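How position and limit move through the write, flip and read steps above can be traced on a ByteBuffer alone. The following sketch is illustrative: BufferStateDemo and its helpers are hypothetical names, and the channel calls are simulated with position(...) and put(...) so that it runs without a server.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class BufferStateDemo {

    static String state(final ByteBuffer b) {
        return "position=" + b.position() + " limit=" + b.limit();
    }

    /** Decodes the bytes a read just deposited, then resets the buffer for the next read. */
    static String drain(final ByteBuffer buffer) {
        buffer.flip();                                       // limit = bytes received, position = 0
        final String chunk = StandardCharsets.US_ASCII.decode(buffer).toString();
        buffer.clear();                                      // position = 0, limit = capacity
        return chunk;
    }

    public static void main(final String[] args) {
        final ByteBuffer buffer = ByteBuffer.wrap("ping".getBytes(StandardCharsets.US_ASCII));
        System.out.println("wrapped:  " + state(buffer));    // position=0 limit=4

        buffer.position(buffer.limit());                     // stands in for a complete channel.write(buffer)
        System.out.println("written:  " + state(buffer));    // position=4 limit=4

        buffer.flip();                                       // rewinds; limit stays 4 because position was 4
        System.out.println("flipped:  " + state(buffer));    // position=0 limit=4

        buffer.put((byte) 'p').put((byte) 'i');              // stands in for channel.read(buffer) receiving 2 bytes
        System.out.println("received: " + drain(buffer));    // pi
        System.out.println("drained:  " + state(buffer));    // position=0 limit=4
    }
}
```

Note that flip() after a complete write has the same effect as clear(); it only makes the whole buffer available again because the write left the position at the limit.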
4.3 Server
Server
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Constants is a project class that is not shown in this article.
public final class Server implements ChannelWriter {

    private static final int BUFFER_SIZE = 1024;

    private final int port;
    private final Map<SocketChannel, StringBuilder> session;

    public static void main(final String[] args) {
        if (args.length < 1) {
            throw new IllegalArgumentException("Expecting one argument (1) port.");
        }

        new Server(Integer.parseInt(args[0])).start();
    }

    private Server(final int port) {
        this.port = port;
        this.session = new HashMap<>();
    }

    private void start() {
        try (Selector selector = Selector.open(); ServerSocketChannel channel = ServerSocketChannel.open()) {
            initChannel(channel, selector);

            while (!Thread.currentThread().isInterrupted()) {
                if (selector.isOpen()) {
                    final int numKeys = selector.select();
                    if (numKeys > 0) {
                        handleKeys(channel, selector.selectedKeys());
                    }
                } else {
                    Thread.currentThread().interrupt();
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("Unable to start server.", e);
        } finally {
            this.session.clear();
        }
    }

    private void initChannel(final ServerSocketChannel channel, final Selector selector) throws IOException {
        assert !Objects.isNull(channel) && !Objects.isNull(selector);

        channel.socket().setReuseAddress(true);
        channel.configureBlocking(false);
        channel.socket().bind(new InetSocketAddress(this.port));
        channel.register(selector, SelectionKey.OP_ACCEPT);
    }

    private void handleKeys(final ServerSocketChannel channel, final Set<SelectionKey> keys) throws IOException {
        assert !Objects.isNull(keys) && !Objects.isNull(channel);

        final Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
            final SelectionKey key = iterator.next();
            try {
                if (key.isValid()) {
                    if (key.isAcceptable()) {
                        doAccept(channel, key);
                    } else if (key.isReadable()) {
                        doRead(key);
                    } else {
                        throw new UnsupportedOperationException("Key not supported by server.");
                    }
                } else {
                    throw new UnsupportedOperationException("Key not valid.");
                }
            } finally {
                if (mustEcho(key)) {
                    doEcho(key);
                    cleanUp(key);
                }

                iterator.remove();
            }
        }
    }

    private void doAccept(final ServerSocketChannel channel, final SelectionKey key) throws IOException {
        assert !Objects.isNull(key) && !Objects.isNull(channel);

        final SocketChannel client = channel.accept();
        client.configureBlocking(false);
        client.register(key.selector(), SelectionKey.OP_READ);

        // Create a session for the incoming connection
        this.session.put(client, new StringBuilder());
    }

    private void doRead(final SelectionKey key) throws IOException {
        assert !Objects.isNull(key);

        final SocketChannel client = (SocketChannel) key.channel();
        final ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);

        final int bytesRead = client.read(buffer);
        if (bytesRead > 0) {
            // Decode only the bytes actually read rather than the whole backing array.
            this.session.get(client).append(new String(buffer.array(), 0, bytesRead));
        } else if (bytesRead < 0) {
            if (mustEcho(key)) {
                doEcho(key);
            }

            cleanUp(key);
        }
    }

    private void doEcho(final SelectionKey key) throws IOException {
        assert !Objects.isNull(key);

        final ByteBuffer buffer = ByteBuffer.wrap(this.session.get(key.channel()).toString().trim().getBytes());

        doWrite(buffer, (SocketChannel) key.channel());
    }

    private boolean mustEcho(final SelectionKey key) {
        assert !Objects.isNull(key);

        if (!(key.channel() instanceof SocketChannel)) {
            return false;
        }

        // The session is gone once cleanUp(...) has run, so guard against null.
        final StringBuilder data = this.session.get(key.channel());
        return data != null && data.toString().contains(Constants.END_MESSAGE_MARKER);
    }

    private void cleanUp(final SelectionKey key) throws IOException {
        assert !Objects.isNull(key);

        this.session.remove((SocketChannel) key.channel());

        key.channel().close();
        key.cancel();
    }
}
- try (...) in start(): using try-with-resources we open a ServerSocketChannel and a Selector. The Selector allows the Server to multiplex over ‘n’ SelectableChannel instances (i.e. connections)
- initChannel(channel, selector): we initialize the ServerSocketChannel and register it with the Selector. We also express interest in the SelectionKey.OP_ACCEPT IO operation, meaning that the ServerSocketChannel will only be interested in accepting connections
- selector.isOpen(): check that the Selector is still open
- selector.select(): this is a blocking call and normally returns only when there are SelectionKey instances (expressing IO events) ready; it can also return 0 if the Selector is woken up or the thread is interrupted, hence the numKeys > 0 check
- handleKeys(channel, selector.selectedKeys()): handle the Set of SelectionKey instances from the select() call for the given ServerSocketChannel
- setReuseAddress(true): allows binding to the port even if a previous connection on that same port is still in a TIME_WAIT state
- configureBlocking(false): ensure our Channel is in non-blocking mode for use by our Selector
- bind(...): bind at the address
- register(...): register the Channel with the Selector
- key.isValid(): whilst processing the keys ensure the SelectionKey is valid
- key.isAcceptable(): accept a new connection
- key.isReadable(): read from the connection
- the finally block in handleKeys(...): ensure that after every IO event is handled we check if we must echo back to the client and, if necessary, clean up (close) resources. We also remove the SelectionKey from the Set of SelectionKey instances, otherwise we will continue to process stale events
- doAccept(...): for every incoming SocketChannel connection ensure we set blocking to false, express interest in SelectionKey.OP_READ IO events and create a new session
- bytesRead > 0 in doRead(...): if something was read, add it to the session buffer
- bytesRead < 0 in doRead(...): if the end of the stream has been reached, echo if required and clean up resources
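The select / iterate / remove pattern described in the notes above can be tried in isolation. The sketch below is not the article's Server: SelectorLoopDemo and roundTrip are hypothetical names, and a java.nio.channels.Pipe stands in for the network connection so that everything runs in one thread. It assumes an ASCII message, since decoding chunk by chunk could split a multi-byte character.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

final class SelectorLoopDemo {

    /** Writes the message into a pipe and reads it back through a Selector. */
    static String roundTrip(final String message) throws IOException {
        final Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {

            source.configureBlocking(false);                 // required before register(...)
            source.register(selector, SelectionKey.OP_READ);

            final ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.US_ASCII));
            while (out.hasRemaining()) {
                sink.write(out);
            }
            sink.close();                                    // the reader will see end-of-stream

            final StringBuilder received = new StringBuilder();
            final ByteBuffer in = ByteBuffer.allocate(16);
            boolean open = true;
            while (open) {
                selector.select();                           // blocks until a registered channel is ready
                final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    final SelectionKey key = keys.next();
                    keys.remove();                           // otherwise the key is seen again next time
                    if (key.isValid() && key.isReadable()) {
                        final int n = ((ReadableByteChannel) key.channel()).read(in);
                        if (n < 0) {
                            key.cancel();                    // end of stream: stop watching this channel
                            open = false;
                        } else {
                            in.flip();
                            received.append(StandardCharsets.US_ASCII.decode(in));
                            in.clear();
                        }
                    }
                }
            }
            return received.toString();
        }
    }

    public static void main(final String[] args) throws IOException {
        System.out.println(roundTrip("Hello world!"));
    }
}
```

The end of the stream is itself reported as a read-ready event, which is why both this sketch and the Server's doRead(...) check for a negative return value from read(...).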
5. Example code
The attached sample code is a Maven project. Assuming all packages / programs are installed, it can be compiled by executing the following in the project folder:
mvn clean install
Then navigate to the target/classes folder within the project folder and execute the following:
Start Server
java com.javacodegeeks.nio.echoserver.Server 9999
Start Client
java com.javacodegeeks.nio.echoserver.Client 9999 "Hello world!"
Substitute 9999 with any port number of your choosing and Hello world! with any message of your choosing. If successful, you should see the following output, with “Hello world!” replaced by whatever message you specified at runtime:
Message :	 Hello world!
Echo :	 Hello world!
6. Summary
This example demonstrated using the Selector class to multiplex over ‘n’ SelectableChannels and echo back any messages received from those Channels. The Selector allowed our Server to handle the incoming IO events from the SelectableChannels provided they were SelectionKey.OP_ACCEPT or SelectionKey.OP_READ ready. The Server managed a session per connected Channel and disposed of the Channel once the echo was complete.
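The per-connection session logic can be sketched independently of any channel. In the sketch below, SessionFramingDemo, onChunk and the "<END>" marker are hypothetical: the value of Constants.END_MESSAGE_MARKER is not shown in this article, and connections are keyed by a plain String id rather than by SocketChannel. Unlike the Server above, which echoes the marker and leaves it to the Client to strip, this sketch strips the marker itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class SessionFramingDemo {

    // Hypothetical marker; the real Constants.END_MESSAGE_MARKER value is not shown in the article.
    static final String END_MARKER = "<END>";

    private final Map<String, StringBuilder> sessions = new HashMap<>();

    /**
     * Appends a chunk to the connection's session. Returns the complete message
     * (marker stripped) once the marker has arrived, and disposes of the session.
     */
    Optional<String> onChunk(final String connectionId, final String chunk) {
        final StringBuilder data = sessions.computeIfAbsent(connectionId, id -> new StringBuilder());
        data.append(chunk);

        final int end = data.indexOf(END_MARKER);
        if (end < 0) {
            return Optional.empty();                 // message still incomplete, keep the session
        }

        sessions.remove(connectionId);               // echo is due, session no longer needed
        return Optional.of(data.substring(0, end));
    }

    int openSessions() {
        return sessions.size();
    }

    public static void main(final String[] args) {
        final SessionFramingDemo demo = new SessionFramingDemo();
        System.out.println(demo.onChunk("a", "Hel"));        // Optional.empty
        System.out.println(demo.onChunk("b", "x<END>"));     // Optional[x]
        System.out.println(demo.onChunk("a", "lo<E"));       // Optional.empty
        System.out.println(demo.onChunk("a", "ND>"));        // Optional[Hello]
        System.out.println(demo.openSessions());             // 0
    }
}
```

Because chunks are accumulated before the marker is searched for, a marker that is split across two reads is still detected.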
7. Download the source code
This was a Java NIO EchoServer tutorial.
You can download the full source code of this example here: Java NIO EchoServer tutorial