AsynchronousChannelGroup

java.nio.channels.AsynchronousChannelGroup Example

This article introduces the AsynchronousChannelGroup and its basic usage. This class is available since Java SE 7 as part of Java NIO 2 file API. This article’s example shows using this class with asynchronous channels.

The example in this article is compiled and run in Windows OS environment. Note that Java SE 7 is required to run the code.
 
 
 
 
 
 
 

1. Introduction

AsynchronousChannelGroup abstract class is defined in the java.nio.channels package.

This class has functions to handle the I/O operations and their completion initiated by asynchronous channels that are bound to the group.

1.1. Async channels opened with a group

AsynchronousSocketChannel and AsynchronousServerSocketChannel classes can be associated with a group. These classes have open() methods, that create the channels, take a group as a parameter. AsynchronousFileChannel does not use async channel group.

1.2. Association with thread pools

A group has an associated thread pool.

  • The tasks that are submitted to handle I/O events are associated with that thread pool.
  • The results of the tasks of asynchronous operations performed on channels in the group are consumed by completion handlers (CompletionHandler) – are within the group.
  • The pooled threads may also run other support-tasks of asynchronous I/O operations.

1.3. Creating a group

An asynchronous channel group is created by invoking one of the methods: withFixedThreadPool(), withCachedThreadPool() or withThreadPool(). These methods take as parameter a thread pool which is owned by the group. The associated thread pool is shutdown when the group is terminated.

There is also a default group which the JVM maintains in the system. This is used by async channels when no group is explicitly specified when a channel is opened.

The completion handler for an I/O operation initiated on a channel bound to a group is guaranteed to be invoked by one of the pooled threads in the group.

1.4. Shutdown

The shutdown() method is used to initiate an orderly shutdown of a group. The group terminates only when:

  • all asynchronous channels that are bound to the group are closed
  • all active completion handlers have run to completion; there will not be any stopping or interrupting the associated threads
  • any resources used by the group are released

Any attempts to construct a channel that binds to the group will throw an exception.

The shutdownNow() method can be used to initiate a forceful shutdown of the group. This method functions similar to the shutdown() method, except that it closes all open channels.

2. An Example

In this example an async server socket channel is created using an async channel group. The server receives messages from an async socket channel client.

The example shows the group creation, association with a channel and the shutdown functions.

2.1. Create a channel group

final AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(5, Executors.defaultThreadFactory());

The async channel group’s withFixedThreadPool() static method creates a new group with a fixed thread pool. The resulting group reuses a fixed number of threads. The method takes two parameters, the number of threads in the pool and the factory used for creating new threads.

2.2. Associate a group with a channel

final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(group);

The code snippet shows that an async server socket channel is created and is bound to the group.

2.3. Bind the channel to a port

InetSocketAddress hostAddress = new InetSocketAddress("localhost", 3883);
listener.bind(hostAddress);

2.4. Process client connections

The following code snippet shows the async server socket channel initiates an async operation to accept async socket channel client connection. The method is of the form: abstract void accept(A attachment, CompletionHandler handler). The handler parameter is a completion handler that is invoked when a connection is accepted. The result passed to the completion handler is the AsynchronousSocketChannel to the new connection.

Note the program waits to accept until the client starts and connects on the port of this server.

listener.accept(att1, new CompletionHandler() {
    @Override
    public void completed(AsynchronousSocketChannel ch, Object att) {

        System.out.println("Completed: " + att);
        String msg = handleConnection(ch);
					
        if (msg.equals("Bye")) {

            if (! group.isTerminated()) {

                System.out.println("Terminating the group...");

                try{
                    group.shutdownNow();
                    group.awaitTermination(10, TimeUnit.SECONDS);
                }
                catch (IOException | InterruptedException e) {				
                    ...
            }
        }
				
        att = "Next connection";
        System.out.println("Waiting for - " + att);
        listener.accept(att, this);
    }	
    @Override
    public void failed(Throwable e, Object att) {
			
        System.out.println(att + " - handler failed");
        e.printStackTrace();
        currentThread.interrupt();
     }
});

currentThread.join();

In this example, from the handler’s completed() method:

  • Executes the method handleConnection(ch); which receives the message from a client.
  • When the value of the message is “Bye”, which indicates that there are no more messages (and connections) from clients, the server is to be shutdown.
  • The group’s shutdownNow() method terminates the group. This closes the channel and waits for orderly completion of the handler.

The code listener.accept(att, this); statement is for accepting the next connection, following the first one, from a new client. ‘this’ is the instance of the completion handler in which the statement is executing. The program waits here.

The handler’s failed() method is executed when the handler fails. In this example, the handler fails as the server waits for a next connection and the group is terminated. Note that the termination process closes the channel.

The group’s awaitTermination(10, TimeUnit.SECONDS) method waits for the specified seconds, before the termination.

NOTE: Also, see these examples.

3. The Code and the Output

The application has two programs the client and the server. The following is the complete code, the run instructions and the output details.

3.1. Code

3.1.1. Server

Server.java

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.AsynchronousChannelGroup;
import java.net.InetSocketAddress;
import java.util.concurrent.Future;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Server {

    public static void main (String [] args)
            throws IOException {
	
        new Server().go();
    }

    private Thread currentThread;

    private void go()
            throws IOException {
			
        final AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(5, Executors.defaultThreadFactory());

        final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(group);
			
        InetSocketAddress hostAddress = new InetSocketAddress("localhost", 3883);
        listener.bind(hostAddress);
		
        System.out.println("Server channel bound to port: " + hostAddress.getPort());
        System.out.println("Waiting for client to connect... ");
		
        currentThread = Thread.currentThread();
		
        final String att1 = "First connection";

        listener.accept(att1, new CompletionHandler() {
            @Override
            public void completed(AsynchronousSocketChannel ch, Object att) {

                System.out.println("Completed: " + att);
                String msg = handleConnection(ch);
					
                if (msg.equals("Bye")) {

                    if (! group.isTerminated()) {

                        System.out.println("Terminating the group...");

                        try{
                            group.shutdownNow();
                            group.awaitTermination(10, TimeUnit.SECONDS);
                        }
                        catch (IOException | InterruptedException e) {
						
                            System.out.println("Exception during group termination");
                            e.printStackTrace();
                        }
						
                        currentThread.interrupt();
                    }
                }
				
                att = "Next connection";
                System.out.println("Waiting for - " + att);
                listener.accept(att, this);
            }
			
            @Override
            public void failed(Throwable e, Object att) {
			
                System.out.println(att + " - handler failed");
                e.printStackTrace();
                currentThread.interrupt();
            }
        });
			
        try {
            currentThread.join();
        }
        catch (InterruptedException e) {
        }
		
        System.out.println ("Exiting the server");	
    } // go()
			
    private String handleConnection(AsynchronousSocketChannel ch) {
			
        ByteBuffer buffer = ByteBuffer.allocate(32);
        Future result = ch.read(buffer);
        while (! result.isDone()) {
            // do nothing
        }

        buffer.flip();
        String msg = new String(buffer.array()).trim();
        System.out.println("Message from client: " + msg);
        buffer.clear();
		
        return msg; 
    }	
}

3.1.2. Client

Client.java

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Future;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.net.InetSocketAddress;

public class Client {

    public static void main (String [] args)
            throws Exception {
	
        new Client().go();
    }

    private void go()
            throws IOException, InterruptedException, ExecutionException {
	
        AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
        InetSocketAddress hostAddress = new InetSocketAddress("localhost", 3883);
        Future future = client.connect(hostAddress);
        future.get(); // returns null

        System.out.println("Client is started");
        System.out.println("Sending message to server: ");
		
        byte [] bytes = new String("Bye").getBytes();
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        Future result = client.write(buffer);
		
        while (! result.isDone()) {
            System.out.println("... ");
        }
		
        System.out.println(new String(buffer.array()).trim());
        buffer.clear();	
        client.close();
    }
}

NOTE: In this application multiple clients may be used with different message text.

3.2. The output

The client and server programs are to be started independently. Note the server program is to be started first.

3.2.1. Start the server

Start the server program in a new DOS window. The following is the output:

> java Server

Server channel bound to port: 3883
Waiting for client to connect...

From the output note the server port 3883. The program waits to accept, until the client connects.

3.2.2. Start the client

Start the client program in another DOS window. The following is the output:

> java Client

Client is started
Sending message to server:
...
Bye

From the output, note the client is started. This connects to the server’s port 3883. After connecting, a message is sent to the server.

3.2.3. Check messages on server

The output:

Server channel bound to port: 3883
Waiting for client to connect...

Completed: First connection
Message from client: Bye
Terminating the group...

Waiting for - Next connection
Next connection - handler failed
Exiting the server
java.nio.channels.ClosedChannelException
        at sun.nio.ch.WindowsAsynchronousServerSocketChannelImpl.implAccept(WindowsAsynchronousServerSocketChannelImpl.
ava:295)
        at sun.nio.ch.AsynchronousServerSocketChannelImpl.accept(AsynchronousServerSocketChannelImpl.java:134)
        at Server$1.completed(Server.java:68)
        at Server$1.completed(Server.java:39)
        at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
        at sun.nio.ch.Invoker$2.run(Invoker.java:206)
        at sun.nio.ch.Iocp$EventHandlerTask.run(Iocp.java:353)
        at sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)

From the output:

  • Completed: First connection
    Message from client: Bye
    Terminating the group…
    The output shows the server receiving the message sent by the client. The message “Bye” initiates the group termination and server shutdown.
  • Waiting for – Next connection
    Next connection – handler failed
    The output shows the server is waiting for the following connection. The group’s termination is initiated, so the server’s accept() method fails. The handler’s failed() method is executed. The output shows the ClosedChannelException‘s stack trace, due to closing the channel as the group terminates.

4. Download Java Source Code

This was an example of java.nio.channels.AsynchronousChannelGroup

Download
You can download the full source code of this example here : AsynchronousChannelGroupExample.zip

Prasad Saya

Prasad Saya is a software engineer with over ten years’ experience in application development, maintenance, testing and consulting on various platforms. He is a certified Java and Java EE developer. At present his interest is in developing Java applications. He also has experience working with databases and ERP applications.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button