nio

Java Nio Asynchronous Channels Tutorial

This article is a tutorial on the Asynchronous Channels API which was released as part of Java 7. The API can be viewed here. The example code will demonstrate use of the core abstractions of this API and will capture the essence of using the API.

1. Introduction

Core abstractions of Asynchronous Channels API

The Asynchronous Channels API’s supplemented the core Java NIO API’s with additional functionality in the Java 7 release. Coined NIO.2 the supplement provided many utilities for NIO usage but the crown jewel was the AsynchronousChannel API’s.

A common phrase thrown around when discussing Java NIO is “non-blocking” but now one gets to add the word “asynchronous” as well. This can lead to a wonderful ice breaker in the form of “non-blocking asynchronous IO”.

What a mouthful and even I had difficulty thoroughly digesting and understanding that, but I feel it important to understand what that phrase means and how it relates to the AsynchronousChannel API’s.

  • Asynchronous IO is where an interface or API allows us to provide call back code, to be executed when a particular IO operation completes. This is where the AsynchronousChannel class and much of it’s hierarchy come into play.
  • Non blocking IO is where an IO operation will return immediately either with data, an error or no data. ie: When reading from a non-blocking channel, either the number of bytes read is returned or -1 meaning nothing more to read or an exception is thrown if some invalid state is encountered. Java NIO in JDK 1.4 introduced us to the Selector which was an abstraction that allowed us to leverage non-blocking IO.

AsynchronousChannel instances proxy IO operations and provide a means for notifying the program when said operations complete.

2. Technologies used

The example code in this article was built and run using:

  • Java 1.8.101 (1.8.x will do fine)
  • Maven 3.3.9 (3.3.x will do fine)
  • Spring source tool suite 4.6.3 (Any Java IDE would work)
  • Ubuntu 16.04 (Windows, Mac or Linux will do fine)

3. API Interaction

When interacting (reading, writing or connecting) with the AsynchronousChannel API the results of these interactions result in “Future” results or “Complete” results.

  • Future results are encapsulated in the Future API. This facilitates a “pending” result which can later be retrieved or acted on by leveraging the Future API.
  • Complete results are “hooked” into by supplying a CompletionHandler implementation to the method call (read, write or connect).

4. AsynchronousChannel

The AsynchronousChannel is a specialization of the Channel interface that enhances IO operations (read, write, connect or close) with asynchronous abilities. Calling read() or write() or connect() on the AsynchronousChannel produces a different result and provides a different method signature to that of the conventional NIO Channel implementations. This varies by way of:

  • Returning a Future from a read, write or connect invocation
  • Allowing a CompletionHandler implementation to be injected at method invocation to facilitate call back style processing when the IO event completes normally or via error.
  • All methods being asynchronous return immediately and delegate processing of the IO operation to the kernel, with the instruction of being notified when the IO operation completes, either by way of the CompletionHandler implementation being invoked or the Future getting it’s result.

Calling close() simply closes the Channel asynchronously and ensures any outstanding IO operations terminate via an AsynchronousCloseException. Typically AsynchronousChannel implementations are associated with an explicit Thread pool by way of the AsynchronousChannelGroup implementation which effectively manages all Channel instances associated with it and provides Thread resources for all Channel instances it manages to handle their IO operations. An AsynchronousChannel implementation is associated with the AsynchronousChannelGroup at construction time via the following:

  • AsynchronousSocketChannel: AsynchronousSocketChannel.open(group)
  • AsynchronousServerSocketChannel: AsynchronousServerSocketChannel.open(group)

What follows now are simple snippets of CompletionHandler and Future usage of the AsynchronousChannel API.

CompletionHandler example

channel.connect(remoteAddress, context, new CompletionHandler<Void, String>() {
    @Override
    public void completed(final Void result, final Object attachment) {...
    }

    @Override
    public void failed(final Throwable exc, final Object attachment) {...
    }
});


  • line 1: connect(...) is called on the AsynchronousChannel (AsynchronousSocketChannel) implementation. A remote address to connect to is supplied, a context specific object context is supplied and a callback CompletionHandler implementation is supplied. The context specific object represents a method to propagate context to the CompletionHandler implementation, particularly if the CompletionHandler implementation is used in a stateless fashion, ie: shared. This “context” manifests itself as the attachment object in the CompletionHandler implementation. An example of propagating context could be when trying to assemble a complete client request that was spread across multiple Channel read(...) invocations.
  • line 3: this method is called upon normal completion of the IO operation (read, write, connect). In the case of connect(...) the first argument to the method signature is Void whereas with read(...) and write(...) the first argument is the number of bytes read or written from the completed IO operation. The attachment argument is the manifestation of the context argument from line 1 and can be used to establish “context” in a stateless CompletionHandler implementation.
  • line 7: this method is called upon abnormal (erroneous) completion of an IO operation (read, write, connect). In all IO operations (read, write, connect) the method signature is the same providing us with the reason for failure  in the form of a Throwable instance and of course the context argument.

Future write example using AsynchronousFileChannel

final Future result = channel.write(buffer, filePosition);



  • line 1: this method is called with a Buffer implementation and a position in the file to write from. The implementation will start writing from the given position and continue writing bytes until the buffer is written out to file. The Future return value encapsulates the pending result of how many bytes were written to the file.

5. AsynchronousByteChannel

The AsynchronousByteChannel is a specialization of the AsynchronousChannel that reads and write bytes. It is implemented concretely by AsynchronousSocketChannel.

6. AsynchronousFileChannel

The AsynchronousFileChannel class is an asynchronous channel for reading, writing, and manipulating a file via ByteBuffers. Creating an AsynchronousFileChannel instance can be done via the two static open(...) methods:

AsynchronousFileChannel open method#1

public static AsynchronousFileChannel open(Path file, OpenOption... options);

AsynchronousFileChannel open method#2

public static AsynchronousFileChannel open(Path file, Set<? extends OpenOption> options, ExecutorService executor, FileAttribute<?>... attrs);

OpenOption, more specifically StandardOpenOption enumerates the various modes / options the File is manipulated with, eg: OPEN, READ, WRITE etc and will naturally have an effect on what can be done with the file. Interestingly enough the Channel does not allow for an AsynchronousChannelGroup at construction but rather an ExecutorService to allow for explicit thread resource usage as opposed to a default thread group.

The AsynchronousFileChannel provides methods for locking files, truncating files, and retrieving file sizes. Read and write actions expect a ByteBuffer and a position, position being the location in the file to start reading or writing from, illustrating one of the main differences between the FileChannel class. The position being required for multithreaded use. This type of Channel is safe for multithreaded use and multiple IO (read and write) operations can be outstanding at the same time but their order of execution is undetermined, be aware of this!

FileLocks, another feature of AsynchronousFileChannels, are as the name implies but can vary by type of lock and operating system support.

  • shared lock – meaning the lock can be shared provided the lock granularity is “shared”. Also the Channel has to be opened in READ mode otherwise a NonReadableChannelException will be thrown.
  • exclusive lock – only one lock is held. Also the Channel must be opened in write mode otherwise a NonWritableChannelException will be thrown.

FileLocks can also lock the entire file or regions of the file based on position. eg: Locking a file from position 10 would imply locking the file from the 10th byte through to the end of the file.

6.1 AsynchronousFileChannel Exceptions

The following code snippets demonstrate use of the AsynchronousFileChannel via the Future API for reading, writing and locking. The samples are driven from unit tests all of which can be sourced from the download for this article.

AsynchronousFileChannel read sample

public String read(final String path) {
...
	try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(pathToFile, StandardOpenOption.READ)) {
		result = read(channel, ByteBuffer.allocate(Constants.BUFFER_SIZE), new StringBuilder(), START_POS);
	} catch (IOException e) {
		throw new RuntimeException(UNABLE_TO_READ_CONTENTS, e);
	}
...
}

private String read(final AsynchronousFileChannel channel, final ByteBuffer buffer, final StringBuilder contents, final long filePosition) {
	assert !Objects.isNull(channel) && !Objects.isNull(buffer) && !Objects.isNull(contents);

	final Future<Integer> result = channel.read(buffer, filePosition);
	try {
		final int bytesRead = result.get();
		if (bytesRead != -1) {
			contents.append(new String(buffer.array()).trim());

			buffer.clear();
			return read(channel, buffer, contents, filePosition + bytesRead);
		} else {
			return contents.toString();
		}
	} catch (InterruptedException | ExecutionException e) {
		throw new RuntimeException(UNABLE_TO_READ_CONTENTS, e);
	}
}

  • line 3-4: creates the AsynchronousFileChannel and calls the recursive read method with a newly constructed ByteBuffer.
  • line 11: the method signature takes the position to continue reading from in each recursive routine.
  • line 14: gets the result of the read, the number of bytes, blocks until the result is available.
  • line 18: appends the contents of what was read from the ByteBuffer to the StringBuilder.
  • line 20-21: clears the ByteBuffer prior to the next invocation and calls the method recursively again.

AsynchronousFileChannel write sample

public void write(final String path, final String contents) {
	final Path pathToFile = Paths.get(path);

	try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(pathToFile, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
		final ByteBuffer buffer = ByteBuffer.wrap(contents.getBytes());

		write(channel, buffer, START_POS);
	} catch (IOException e) {
		throw new RuntimeException(UNABLE_TO_WRITE_CONTENTS, e);
	}
}

private void write(final AsynchronousFileChannel channel, final ByteBuffer buffer, final long filePosition) {
	assert !Objects.isNull(channel) && !Objects.isNull(buffer);

	final Future<Integer> result = channel.write(buffer, filePosition);
	try {
		final int bytesWritten = result.get();
		while (buffer.hasRemaining()) {
			buffer.compact();
			write(channel, buffer, bytesWritten + filePosition);
		}
	} catch (InterruptedException | ExecutionException e) {
		throw new RuntimeException(UNABLE_TO_WRITE_CONTENTS, e);
	}
}
  • line 2: gets the Path object to the file.
  • line 4-5: creates the AsynchronousFileChannel (ensures the file is created if not already via options) and also creates the ByteBuffer for the contents to write.
  • line 7: calls write with the position of the file to start writing from.
  • line 16: gets the result of the write, the number of bytes written.
  • line 18-21: loops while their are still bytes in the ByteBuffer and writes it out to file.

AsynchronousFileChannel lock sample

@Test
public void testExclusiveLock() throws IOException, InterruptedException, ExecutionException {
	try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(this.filePath), StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
	     final FileLock lock = channel.lock().get();

	     assertTrue("Lock is not exclusive", !lock.isShared());
	}
}

@Test
public void testSharedLock() throws IOException, InterruptedException, ExecutionException {
	try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(this.filePath), StandardOpenOption.READ, StandardOpenOption.CREATE)) {
	     final FileLock lock = channel.lock(0, 0L, true).get();

	     assertTrue("Lock is exclusive", lock.isShared());
	}
}

@Test(expected = OverlappingFileLockException.class)
public void testOverlappingLock() {
	final CountDownLatch innerThreadLatch = new CountDownLatch(1);
	final CountDownLatch testThreadLatch = new CountDownLatch(1);

	try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(this.filePath), StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {

		new Thread() {
			public void run() {
				try {
					channel.lock().get();
					innerThreadLatch.countDown();
					testThreadLatch.await();
				} catch (OverlappingFileLockException | ExecutionException | InterruptedException e) {
					throw new RuntimeException("Unable to get lock on file for overlapping lock test", e);
				}
			}
		}.start();

		innerThreadLatch.await();
		channel.lock().get();
	} catch (InterruptedException | ExecutionException | IOException e) {
		throw new RuntimeException(e);
	} finally {
		testThreadLatch.countDown();
	}
}
  • line 3: create the AsynchronousFileChannel ensuring we create the file if it does not already exist.
  • line 4,6,13,15: obtains a FileLock in either shared or exclusive modes and validates that state.
  • The final test, although not highlighted, is a test to prove an overlapping lock exception where two threads compete for the same lock. Latches are used to ensure co-ordination between their competitive spirits. The takeaway from this last test is that inside the same JVM process all Threads share the same locks, therefore trying to aquire an already held lock (exclusive) will result in an OverlappingFileLockException. Using file locks to synchronize thread access to file regions will not work, however in concert with normal thread synchronization and file locks one can achive co-ordinated access to files between threads and processes.

7. AsynchronousServerSocketChannel

The AsynchronousServerSocketChannel is a Channel for accepting new socket connections. An AsynchronousServerSocketChannel can be created via the two static open(...) methods:

AsynchronousServerSocketChannel open method #1

public static AsynchronousServerSocketChannel open(AsynchronousChannelGroup group) throws IOException

AsynchronousServerSocketChannel open method #2

public static AsynchronousServerSocketChannel open() throws IOException

The AsynchronousChannelGroup is an abstraction that provides the AsynchronousServerSocketChannel with it’s thread pool to handle it’s IO operations asynchronously. The AsynchronousServerSocketChannel also implements the NetworkChannel interface which provides the ability to set channel SocketOption (more specifically StandardSocketOptions) values and to bind to SocketAddress values.

7.1 AsynchronousServerSocketChannel Exceptions

AsynchronousServerSocketChannel creation

...
private final AsynchronousServerSocketChannel server;
private final AsynchronousChannelGroup group;
...
public Server(final int port, final int poolSize, final String echo) {
	try {
		this.group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(poolSize));
		this.server = AsynchronousServerSocketChannel.open(this.group).bind(new InetSocketAddress(port));
...

AsynchronousServerSocketChannel accepting connection with CompletionHandler

...
this.server.accept(requestKey, new CompletionHandler<AsynchronousSocketChannel, String>() {
	public void completed(final AsynchronousSocketChannel channel, final String attachment) {

		// Delegate off to another thread for the next connection.
		accept(IdGenerator.generate());

		// Delegate off to another thread to handle this connection.
		Server.this.read(channel, attachment);
	}

	public void failed(final Throwable exc, final String attachment) {
		System.out.println(String.format("Server: Failed to accept connection in thread %s", Thread.currentThread().getName()));
		exc.printStackTrace();
	}
});
  • line 2-3: accept() is called and a requestKey and a CompletionHandler is supplied to handle the incoming connection. The requestKey is a unique String generated for the purposes of establishing context in the multithreaded / asynchronous Channel. The attachment in the completed(...) method call represents context and is actually the requestKey being ushered into the CompletionHandler from the earlier accept()call.
  • line 6: We are non-blocking and it is important to delegate off as soon as possible to handle the next incoming connection, a unique key is generated (requestKey) which will later become the attachment (context) for the CompletionHandler.
  • line 9: We handle the current connection by calling read(...) which will take the attachment for context and ultimately create a new CompletionHandler for the purposes of reading the client request.
  • line 12: If the IO operation fails, this method is called with the context and the reason for failure.

8. AsynchronousSocketChannel

The AsynchronousSocketChannel is an asynchronous Channel for connected sockets. Such a Channel has the ability to connect to a remote address, read and write asynchronously, with the Future and CompletionHandler abstractions being provided as a means for manipulating the outcomes of said IO operations. As per the AsynchronousServerSocketChannel, the AsynchronousSocketChannel also implements the NetworkChannel interface which provides the ability to set channel SocketOption (more specifically StandardSocketOptions) values and to bind to SocketAddress values.

An AsynchronousSocketChannel can be opened via the two static open(...) methods:
AsynchronousSocketChannel open method #1

public static AsynchronousSocketChannel open(AsynchronousChannelGroup group) throws IOException

AsynchronousSocketChannel open method #2

public static AsynchronousSocketChannel open() throws IOException

8.1 AsynchronousSocketChannel Exceptions

AsynchronousSocketChannel creation and connection

...
for (int i = 0; i < this.numConnections; i++) {
	AsynchronousSocketChannel client;
	try {
		client = AsynchronousSocketChannel.open(this.group);
		connect(client, IdGenerator.generate());
	} catch (IOException e) {
		throw new RuntimeException("Client: Unable to start clients", e);
	}
}
...
private void connect(final AsynchronousSocketChannel channel, final String requestId) {
	channel.connect(this.remoteAddress, requestId, new CompletionHandler<Void, String>() {

	    @Override
	    public void completed(final Void result, final String attachment) {
		System.out.println(String.format("Client: Connect Completed in thread %s", Thread.currentThread().getName()));
		updateMessageCache(attachment, StringUtils.EMPTY, Client.this.messageCache);

		write(channel, attachment);
	    }

	    @Override
	    public void failed(final Throwable exc, final String attachment) {
		System.out.println(String.format("Client: Connect Failed in thread %s", Thread.currentThread().getName()));
		exc.printStackTrace();

		Client.this.latch.countDown();
		closeChannel(channel);
	    }
	});
}
...
private void write(final AsynchronousSocketChannel channel, final String requestId) {
	assert !Objects.isNull(channel);

	final ByteBuffer contents = create(Constants.BUFFER_SIZE);
	contents.put(requestId.getBytes());
	contents.put(Constants.END_MESSAGE_MARKER.getBytes());
	contents.flip();

	channel.write(contents, requestId, new CompletionHandler<Integer, String>() {

	    @Override
	    public void completed(final Integer result, final String attachment) {
		System.out.println(String.format("Client: Write Completed in thread %s", Thread.currentThread().getName()));
		read(channel, attachment);
	    }
  • line 5: the AsynchronousSocketChannel is created supplying an AsynchronousChannelGroup upon creation for threading purposes.
  • line 6: a connection is attempted for the  Channel supplying a unique String value as context for the connection.
  • line 12-13: connect(...)is called and in particular the Channel’sconnect(...) is invoked passing a remoteAddress requestId and a CompletionHandler to handle the outcome of the IO operation. The requestId is the context variable and manifests itself as the attachment in the CompletionHandler.
  • line 20:  write(...) is called passing the Channel upon which the connection was established and the context (attachment). So effectively upon connection competion we commence an IO operation and as this is a client in a client server program the first call of action is to write a request to the Server.
  • line 29: we close the Channel upon failure to connect.
  • line 42: write(...) is called on the Channel supplying a ByteBuffer as source, a context variable (requestId) and a CompletionHandler.

9. Summary

In this tutorial we have covered the main abstractions in the asynchronous channels API, specifically focusing on the types of AsnchronousChannel implementations, what they are and how to use them.

We have seen under what circumstances behavior could become exceptional (Exceptions) and how to manipulate the outcome of IO operations on said Channels via “pending” and complete results.

10. Download the source code

This was a Java NIO Asynchronous Channels tutorial

Download
You can download the full source code of this example here: Java NIO Asynchronous Channels

JJ

Jean-Jay Vester graduated from the Cape Peninsula University of Technology, Cape Town, in 2001 and has spent most of his career developing Java backend systems for small to large sized companies both sides of the equator. He has an abundance of experience and knowledge in many varied Java frameworks and has also acquired some systems knowledge along the way. Recently he has started developing his JavaScript skill set specifically targeting Angularjs and also bridged that skill to the backend with Nodejs.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button