java.nio.channels.AsynchronousChannelGroup Example
This article introduces the AsynchronousChannelGroup class and its basic usage. The class has been available since Java SE 7 as part of the NIO.2 API. The example in this article uses the class with asynchronous socket channels.
The example was compiled and run on Windows. Java SE 7 or later is required to run the code.
1. Introduction
The abstract class AsynchronousChannelGroup is defined in the java.nio.channels package.
A group handles the I/O operations initiated by the asynchronous channels bound to it, together with the completion of those operations.
1.1. Async channels opened with a group
The AsynchronousSocketChannel and AsynchronousServerSocketChannel classes can be associated with a group. Each class has an open() method that takes a group as a parameter and creates a channel bound to that group. AsynchronousFileChannel does not use an asynchronous channel group; its open() method accepts an ExecutorService instead.
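The distinction can be illustrated with a minimal sketch. The class name OpenWithGroupSketch, the pool sizes and the temporary file are assumptions made only for this illustration; they are not part of the article's example.

```java
import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OpenWithGroupSketch {

    // Opens two socket channels in a group and a file channel with an executor.
    // Returns true when all three channels were opened.
    static boolean openAll() {
        try {
            AsynchronousChannelGroup group =
                AsynchronousChannelGroup.withFixedThreadPool(2, Executors.defaultThreadFactory());
            // The file channel takes a plain ExecutorService, not a group.
            ExecutorService filePool = Executors.newFixedThreadPool(1);
            Path tmp = Files.createTempFile("group-sketch", ".tmp");
            try (AsynchronousServerSocketChannel server =
                     AsynchronousServerSocketChannel.open(group);
                 AsynchronousSocketChannel socket =
                     AsynchronousSocketChannel.open(group);
                 AsynchronousFileChannel file = AsynchronousFileChannel.open(
                     tmp, Collections.singleton(StandardOpenOption.READ), filePool)) {
                return server.isOpen() && socket.isOpen() && file.isOpen();
            } finally {
                filePool.shutdown();
                group.shutdownNow();
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("All channels opened: " + openAll());
    }
}
```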
1.2. Association with thread pools
A group has an associated thread pool.
- The tasks that handle I/O events are submitted to that thread pool.
- The results of asynchronous operations performed on channels in the group are dispatched to completion handlers (CompletionHandler), which consume them on threads of that pool.
- The pooled threads may also run other tasks that support the execution of asynchronous I/O operations.
1.3. Creating a group
An asynchronous channel group is created by invoking one of the static methods withFixedThreadPool(), withCachedThreadPool() or withThreadPool(). withFixedThreadPool() takes a thread count and a ThreadFactory and creates the pool itself; the other two take an ExecutorService supplied by the caller. In every case the thread pool is owned by the group, and it is shut down when the group terminates.
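As a rough sketch, the three factory methods can be called as follows. The class name GroupFactorySketch and the pool sizes are arbitrary choices for illustration. The method also checks that the caller-supplied executors are shut down once their groups have terminated.

```java
import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class GroupFactorySketch {

    // Creates one group per factory method, shuts the groups down and reports
    // whether the caller-supplied executors were shut down along with them.
    static boolean poolsShutDownWithGroups() {
        try {
            // 1. Fixed pool: the group builds the pool from a count and a factory.
            AsynchronousChannelGroup fixed =
                AsynchronousChannelGroup.withFixedThreadPool(4, Executors.defaultThreadFactory());

            // 2. Cached pool: the caller supplies the executor and an initial-size hint.
            ExecutorService cachedPool = Executors.newCachedThreadPool();
            AsynchronousChannelGroup cached =
                AsynchronousChannelGroup.withCachedThreadPool(cachedPool, 1);

            // 3. Any other executor supplied by the caller.
            ExecutorService customPool = Executors.newFixedThreadPool(2);
            AsynchronousChannelGroup custom =
                AsynchronousChannelGroup.withThreadPool(customPool);

            // No channels are bound, so an orderly shutdown can complete.
            fixed.shutdown();
            cached.shutdown();
            custom.shutdown();
            boolean terminated = fixed.awaitTermination(5, TimeUnit.SECONDS)
                && cached.awaitTermination(5, TimeUnit.SECONDS)
                && custom.awaitTermination(5, TimeUnit.SECONDS);

            return terminated && cachedPool.isShutdown() && customPool.isShutdown();
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Executors shut down with groups: " + poolsShutDownWithGroups());
    }
}
```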
There is also a system-wide default group that the JVM maintains automatically. Asynchronous channels use it when no group is specified at the time the channel is opened.
The completion handler for an I/O operation initiated on a channel bound to a group is guaranteed to be invoked by one of the pooled threads in the group.
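One way to observe this guarantee is to give the pooled threads recognizable names and record the name of the thread that runs the handler. The sketch below does that over a loopback connection. The class name HandlerThreadSketch, the "group-worker-" prefix and the use of an ephemeral port (port 0) are assumptions made for this illustration only.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class HandlerThreadSketch {

    // Hypothetical prefix, used only so pooled threads can be recognized by name.
    static final String PREFIX = "group-worker-";

    // Returns the name of the thread that ran the accept completion handler.
    static String handlerThreadName() {
        try {
            ThreadFactory factory = new ThreadFactory() {
                private final AtomicInteger counter = new AtomicInteger();
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, PREFIX + counter.incrementAndGet());
                }
            };
            AsynchronousChannelGroup group =
                AsynchronousChannelGroup.withFixedThreadPool(2, factory);
            try (AsynchronousServerSocketChannel listener =
                     AsynchronousServerSocketChannel.open(group)) {
                // Port 0 asks the OS for any free port.
                listener.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

                final CountDownLatch done = new CountDownLatch(1);
                final AtomicReference<String> name = new AtomicReference<String>();
                listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                    @Override
                    public void completed(AsynchronousSocketChannel ch, Void att) {
                        name.set(Thread.currentThread().getName());
                        try { ch.close(); } catch (IOException ignored) { }
                        done.countDown();
                    }
                    @Override
                    public void failed(Throwable e, Void att) {
                        name.set("failed: " + e);
                        done.countDown();
                    }
                });

                // The client uses the default group; only the listener is in our group.
                try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
                    client.connect(listener.getLocalAddress()).get(5, TimeUnit.SECONDS);
                    done.await(5, TimeUnit.SECONDS);
                }
                return name.get();
            } finally {
                group.shutdownNow();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Handler ran on: " + handlerThreadName());
    }
}
```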
1.4. Shutdown
The shutdown() method initiates an orderly shutdown of a group. The group terminates only when:
- all asynchronous channels that are bound to the group are closed
- all active completion handlers have run to completion; the associated threads are not stopped or interrupted
- any resources used by the group are released
Once shutdown has been initiated, any attempt to construct a channel that binds to the group throws ShutdownChannelGroupException.
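The orderly shutdown rules can be sketched as follows. The class name ShutdownSketch is an assumption for illustration. The sketch keeps one channel open, shuts the group down, and checks that the existing channel stays open while a new one is rejected.

```java
import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.ShutdownChannelGroupException;
import java.util.concurrent.Executors;

class ShutdownSketch {

    // Returns true if, after shutdown(), the existing channel is still open
    // and opening a new channel in the group is rejected.
    static boolean openAfterShutdownIsRejected() {
        try {
            AsynchronousChannelGroup group =
                AsynchronousChannelGroup.withFixedThreadPool(1, Executors.defaultThreadFactory());
            AsynchronousServerSocketChannel existing =
                AsynchronousServerSocketChannel.open(group);
            try {
                group.shutdown(); // orderly: existing channels are left open
                boolean stillOpen = existing.isOpen();

                boolean rejected = false;
                try {
                    AsynchronousServerSocketChannel.open(group).close();
                } catch (ShutdownChannelGroupException expected) {
                    rejected = true;
                }
                return group.isShutdown() && stillOpen && rejected;
            } finally {
                // Closing the last bound channel allows the group to terminate.
                existing.close();
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("New channel rejected: " + openAfterShutdownIsRejected());
    }
}
```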
The shutdownNow() method initiates a forceful shutdown of the group. It performs the same actions as shutdown() and, in addition, closes all open channels in the group.
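A small sketch of the difference, assuming a one-thread group and two unconnected channels (the class name ShutdownNowSketch is made up for this illustration): after shutdownNow() the channels report closed without any explicit close() call.

```java
import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownNowSketch {

    // Returns true if shutdownNow() closed the group's channels and the group terminated.
    static boolean shutdownNowClosesChannels() {
        try {
            AsynchronousChannelGroup group =
                AsynchronousChannelGroup.withFixedThreadPool(1, Executors.defaultThreadFactory());
            AsynchronousServerSocketChannel server =
                AsynchronousServerSocketChannel.open(group);
            AsynchronousSocketChannel socket =
                AsynchronousSocketChannel.open(group);

            boolean openBefore = server.isOpen() && socket.isOpen();

            group.shutdownNow(); // forceful: closes every channel bound to the group

            boolean closedAfter = !server.isOpen() && !socket.isOpen();
            boolean terminated = group.awaitTermination(5, TimeUnit.SECONDS);
            return openBefore && closedAfter && terminated;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Channels closed by shutdownNow(): " + shutdownNowClosesChannels());
    }
}
```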
2. An Example
In this example an async server socket channel is created using an async channel group. The server receives messages from an async socket channel client.
The example shows the group creation, association with a channel and the shutdown functions.
2.1. Create a channel group
final AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(5, Executors.defaultThreadFactory());
The static withFixedThreadPool() method of the async channel group creates a new group with a fixed thread pool. The resulting group reuses a fixed number of threads. The method takes two parameters: the number of threads in the pool and the factory used to create new threads.
2.2. Associate a group with a channel
final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(group);
The code snippet shows that an async server socket channel is created and is bound to the group.
2.3. Bind the channel to a port
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 3883);
listener.bind(hostAddress);
2.4. Process client connections
The following code snippet shows the async server socket channel initiating an asynchronous operation to accept a client connection. The method has the form: abstract <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler). The handler parameter is a completion handler that is invoked when a connection is accepted. The result passed to the completion handler is the AsynchronousSocketChannel for the new connection.
Note that the handler is not invoked until a client starts and connects to this server's port; until then the server program simply waits.
listener.accept(att1, new CompletionHandler<AsynchronousSocketChannel, Object>() {

    @Override
    public void completed(AsynchronousSocketChannel ch, Object att) {
        System.out.println("Completed: " + att);
        String msg = handleConnection(ch);

        if (msg.equals("Bye")) {
            if (! group.isTerminated()) {
                System.out.println("Terminating the group...");
                try {
                    group.shutdownNow();
                    group.awaitTermination(10, TimeUnit.SECONDS);
                } catch (IOException | InterruptedException e) {
                    ...
                }
                currentThread.interrupt();
            }
        }

        att = "Next connection";
        System.out.println("Waiting for - " + att);
        listener.accept(att, this);
    }

    @Override
    public void failed(Throwable e, Object att) {
        System.out.println(att + " - handler failed");
        e.printStackTrace();
        currentThread.interrupt();
    }
});

currentThread.join();
In this example, the handler's completed() method does the following:
- Executes the method handleConnection(ch), which receives the message from a client.
- When the message is "Bye", which indicates that there are no more messages (and connections) from clients, starts the server shutdown.
- Calls the group's shutdownNow() method to terminate the group. This closes the channels bound to the group; the group terminates once the active handlers have run to completion.
The statement listener.accept(att, this); accepts the next connection, following the first one, from a new client. Here 'this' is the instance of the completion handler in which the statement is executing. The call only registers the handler; the main thread meanwhile waits at currentThread.join().
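The re-arming pattern, where the handler passes itself to the next accept() call, can be sketched in isolation. The class name ReArmAcceptSketch, the ephemeral loopback port and the connection counter are assumptions made for this illustration. Unlike the article's server, this sketch issues the next accept() before doing any other work in the handler.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ReArmAcceptSketch {

    // Connects 'expected' clients and returns how many connections the handler accepted.
    static int acceptConnections(int expected) {
        List<AsynchronousSocketChannel> clients = new ArrayList<AsynchronousSocketChannel>();
        try {
            AsynchronousChannelGroup group =
                AsynchronousChannelGroup.withFixedThreadPool(2, Executors.defaultThreadFactory());
            try {
                final AsynchronousServerSocketChannel listener =
                    AsynchronousServerSocketChannel.open(group);
                listener.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

                final AtomicInteger accepted = new AtomicInteger();
                final CountDownLatch remaining = new CountDownLatch(expected);

                listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                    @Override
                    public void completed(AsynchronousSocketChannel ch, Void att) {
                        accepted.incrementAndGet();
                        // Re-arm: the same handler instance serves the next connection.
                        listener.accept(null, this);
                        try { ch.close(); } catch (IOException ignored) { }
                        remaining.countDown();
                    }
                    @Override
                    public void failed(Throwable e, Void att) {
                        // Reached when the pending accept is cancelled by shutdownNow().
                    }
                });

                for (int i = 0; i < expected; i++) {
                    AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
                    clients.add(client);
                    client.connect(listener.getLocalAddress()).get(5, TimeUnit.SECONDS);
                }
                remaining.await(5, TimeUnit.SECONDS);
                return accepted.get();
            } finally {
                group.shutdownNow(); // also closes the listener
                for (AsynchronousSocketChannel c : clients) {
                    c.close();
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Accepted connections: " + acceptConnections(3));
    }
}
```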
The handler's failed() method is executed when the accept operation fails. In this example it fails because the server tries to accept the next connection after the group's shutdown has been initiated. Note that the shutdown closes the channel.
The group's awaitTermination(10, TimeUnit.SECONDS) method blocks until the group has terminated or the specified timeout (10 seconds here) elapses, whichever happens first.
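The return value of awaitTermination() tells the two cases apart: true if the group terminated, false if the timeout elapsed first. A minimal sketch, assuming a one-thread group with a single open channel and short illustrative timeouts (the class name AwaitTerminationSketch is made up):

```java
import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AwaitTerminationSketch {

    // Returns { result while a channel is still open, result after it is closed }.
    static boolean[] awaitTwice() {
        try {
            AsynchronousChannelGroup group =
                AsynchronousChannelGroup.withFixedThreadPool(1, Executors.defaultThreadFactory());
            AsynchronousServerSocketChannel server =
                AsynchronousServerSocketChannel.open(group);

            group.shutdown();
            // The channel is still open, so the group cannot terminate yet:
            // the call gives up when the timeout elapses.
            boolean whileOpen = group.awaitTermination(200, TimeUnit.MILLISECONDS);

            server.close();
            // With the last channel closed, termination can complete.
            boolean afterClose = group.awaitTermination(5, TimeUnit.SECONDS);

            return new boolean[] { whileOpen, afterClose };
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = awaitTwice();
        System.out.println("While open: " + r[0] + ", after close: " + r[1]);
    }
}
```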
3. The Code and the Output
The application has two programs: the client and the server. The complete code, the run instructions and the output details follow.
3.1. Code
3.1.1. Server
Server.java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.AsynchronousChannelGroup;
import java.net.InetSocketAddress;
import java.util.concurrent.Future;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Server {

    public static void main (String [] args)
            throws IOException {

        new Server().go();
    }

    private Thread currentThread;

    private void go()
            throws IOException {

        final AsynchronousChannelGroup group =
            AsynchronousChannelGroup.withFixedThreadPool(5, Executors.defaultThreadFactory());

        final AsynchronousServerSocketChannel listener =
            AsynchronousServerSocketChannel.open(group);

        InetSocketAddress hostAddress = new InetSocketAddress("localhost", 3883);
        listener.bind(hostAddress);

        System.out.println("Server channel bound to port: " + hostAddress.getPort());
        System.out.println("Waiting for client to connect... ");

        currentThread = Thread.currentThread();

        final String att1 = "First connection";

        listener.accept(att1, new CompletionHandler<AsynchronousSocketChannel, Object>() {

            @Override
            public void completed(AsynchronousSocketChannel ch, Object att) {

                System.out.println("Completed: " + att);
                String msg = handleConnection(ch);

                if (msg.equals("Bye")) {

                    if (! group.isTerminated()) {

                        System.out.println("Terminating the group...");

                        try {
                            group.shutdownNow();
                            group.awaitTermination(10, TimeUnit.SECONDS);
                        }
                        catch (IOException | InterruptedException e) {
                            System.out.println("Exception during group termination");
                            e.printStackTrace();
                        }

                        // wake the main thread, which is blocked in join()
                        currentThread.interrupt();
                    }
                }

                att = "Next connection";
                System.out.println("Waiting for - " + att);
                listener.accept(att, this);
            }

            @Override
            public void failed(Throwable e, Object att) {

                System.out.println(att + " - handler failed");
                e.printStackTrace();
                currentThread.interrupt();
            }
        });

        try {
            // the main thread waits here until a handler interrupts it
            currentThread.join();
        }
        catch (InterruptedException e) {
        }

        System.out.println ("Exiting the server");
    } // go()

    private String handleConnection(AsynchronousSocketChannel ch) {

        ByteBuffer buffer = ByteBuffer.allocate(32);
        Future<Integer> result = ch.read(buffer);

        while (! result.isDone()) {
            // do nothing
        }

        buffer.flip();
        String msg = new String(buffer.array()).trim();
        System.out.println("Message from client: " + msg);
        buffer.clear();

        return msg;
    }
}
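The handleConnection() method above polls isDone() in a busy loop and decodes the whole 32-byte backing array, relying on trim() to discard the unused zero bytes. One possible alternative, shown only as a sketch, blocks on Future.get() with a timeout and decodes just the bytes that were read. The class name ReadMessageSketch, the 5-second timeout and the UTF-8 charset are assumptions and are not part of the article's server.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ReadMessageSketch {

    // Flips the buffer and decodes only the bytes written into it so far.
    static String decode(ByteBuffer buffer) {
        buffer.flip();
        return StandardCharsets.UTF_8.decode(buffer).toString().trim();
    }

    // Reads one message of up to 32 bytes, waiting at most 5 seconds for it.
    static String readMessage(AsynchronousSocketChannel ch)
            throws InterruptedException, ExecutionException, TimeoutException {
        ByteBuffer buffer = ByteBuffer.allocate(32);
        // get() parks the thread until the read completes instead of spinning.
        int bytesRead = ch.read(buffer).get(5, TimeUnit.SECONDS);
        if (bytesRead < 0) {
            return ""; // the peer closed the connection without sending data
        }
        return decode(buffer);
    }

    public static void main(String[] args) {
        ByteBuffer sample = ByteBuffer.allocate(32);
        sample.put("Bye".getBytes(StandardCharsets.UTF_8));
        System.out.println("Decoded: " + decode(sample));
    }
}
```

Blocking inside a completion handler still occupies a pooled thread, so this sketch only removes the spinning; it does not make the read itself asynchronous.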
3.1.2. Client
Client.java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Future;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.net.InetSocketAddress;

public class Client {

    public static void main (String [] args)
            throws Exception {

        new Client().go();
    }

    private void go()
            throws IOException, InterruptedException, ExecutionException {

        AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
        InetSocketAddress hostAddress = new InetSocketAddress("localhost", 3883);
        Future<Void> future = client.connect(hostAddress);
        future.get(); // returns null

        System.out.println("Client is started");
        System.out.println("Sending message to server: ");

        byte [] bytes = "Bye".getBytes();
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        Future<Integer> result = client.write(buffer);

        while (! result.isDone()) {
            System.out.println("... ");
        }

        System.out.println(new String(buffer.array()).trim());
        buffer.clear();
        client.close();
    }
}
NOTE: In this application multiple clients may be used with different message text.
3.2. The output
The client and server programs are started independently. The server program must be started first.
3.2.1. Start the server
Start the server program in a new DOS window. The following is the output:
> java Server

Server channel bound to port: 3883
Waiting for client to connect...
From the output note the server port 3883. The program waits to accept, until the client connects.
3.2.2. Start the client
Start the client program in another DOS window. The following is the output:
> java Client

Client is started
Sending message to server:
...
Bye
From the output, note the client is started. This connects to the server’s port 3883. After connecting, a message is sent to the server.
3.2.3. Check messages on server
The output:
Server channel bound to port: 3883
Waiting for client to connect...
Completed: First connection
Message from client: Bye
Terminating the group...
Waiting for - Next connection
Next connection - handler failed
Exiting the server
java.nio.channels.ClosedChannelException
    at sun.nio.ch.WindowsAsynchronousServerSocketChannelImpl.implAccept(WindowsAsynchronousServerSocketChannelImpl.java:295)
    at sun.nio.ch.AsynchronousServerSocketChannelImpl.accept(AsynchronousServerSocketChannelImpl.java:134)
    at Server$1.completed(Server.java:68)
    at Server$1.completed(Server.java:39)
    at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
    at sun.nio.ch.Invoker$2.run(Invoker.java:206)
    at sun.nio.ch.Iocp$EventHandlerTask.run(Iocp.java:353)
    at sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
From the output:
- Completed: First connection
  Message from client: Bye
  Terminating the group...
  The output shows the server receiving the message sent by the client. The message "Bye" initiates the group termination and server shutdown.
- Waiting for - Next connection
  Next connection - handler failed
  The output shows the server waiting for the next connection. The group's termination has been initiated, so the server's accept() method fails and the handler's failed() method is executed. The output shows the ClosedChannelException's stack trace, caused by the channel being closed as the group terminates.
4. Download Java Source Code
This was an example of java.nio.channels.AsynchronousChannelGroup.
You can download the full source code of this example here: AsynchronousChannelGroupExample.zip