Java Nio Heartbeat Example
This article is a tutorial on implementing a simple Java NIO Heartbeat. The example takes the form of “n” “Multicast” mode processes which multicast data via UDP to “n” “Subscribe” mode processes that have expressed interest in receiving said traffic.
1. Introduction
This article builds on three earlier articles on the subject of Java NIO, namely “Java Nio Tutorial for Beginners”, “Java Nio Asynchronous Channels Tutorial” and “Java Nio EchoServer”. Before getting stuck into the “meat” of our example, it is best to get some background on the topic itself. According to Wikipedia, a “heartbeat” in computer systems is a periodic signal generated by hardware or software to indicate normal operation or to synchronize parts of a system. True to its name, it is a measure of life of the individual components in a distributed computer system, and from its absence, presence and frequency one can deduce the state of the system in which it occurs.
In the same breath, when one talks about “heartbeats” in computer systems the term “UDP” often comes up, and with good reason. It is often the protocol of choice when implementing “heartbeat” type solutions, whether for cluster membership negotiation or life signing (heartbeats). The low latency of this “connection-less” protocol also suits the nature of “heartbeating” in distributed systems.
It is important to note that, unlike TCP, UDP makes no guarantee on the delivery of packets. Its low latency stems from exactly this: there is no connection to establish (TCP's three-way SYN, SYN-ACK, ACK handshake) and there are no acknowledgements or retransmissions to wait for.
We go one step further in this example and we multicast the traffic out to interested parties. Now why would we do this and what other choices are there? Typically the following choices would present themselves:
- Unicast: From one machine to another. One-to-One
- Broadcast: From one machine to all possible machines. One-to-All (within the broadcast domain – ie: behind a router or in a private network)
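- Multicast: From one machine to multiple machines that have stated interest in receiving said traffic. One-to-Many. This can traverse the broadcast domain and extend past a router, provided the routers in between are configured to forward multicast traffic and the TTL of the packets allows it.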
2. Technologies used
The example code in this article was built and run using:
- Java 1.8.101 (1.8.x will do fine)
- Maven 3.3.9 (3.3.x will do fine)
- Spring source tool suite 4.6.3 (Any Java IDE would work)
- Ubuntu 16.04 (Windows, Mac or Linux will do fine)
3. Overview
The abstraction we need for UDP in Java NIO is the DatagramChannel. It is a SelectableChannel, so it can be registered with a Selector and serviced in a very thread-efficient manner. It also implements MulticastChannel, which supports Internet Protocol (IP) multicasting.
3.1 DatagramChannel
A DatagramChannel is opened by one of the static open(...) methods of the class itself. One of the open(...) methods is of particular interest to us:
DatagramChannel open for multicast
public static DatagramChannel open(ProtocolFamily family) throws IOException
The ProtocolFamily is required when attempting to multicast with this Channel and should correspond to the IP type of the multicast group that this Channel will join, e.g. StandardProtocolFamily.INET for an IPv4 group.
A DatagramChannel need not be connected to use the send(...) and receive(...) methods of this class. Conversely, the read(...) and write(...) methods do require a connected Channel.
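As a minimal sketch of that difference (not code from the example project), the following exchanges one datagram over the loopback interface, either with an unconnected Channel and send(...), or with connect(...) followed by write(...). The class name, the address 127.0.0.1 and the buffer size are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardProtocolFamily;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only.
final class DatagramSendReceiveSketch {

    // Sends one datagram over loopback and returns the text the receiver saw.
    static String roundTrip(final String message, final boolean connectFirst) throws IOException {
        try (DatagramChannel receiver = DatagramChannel.open(StandardProtocolFamily.INET);
             DatagramChannel sender = DatagramChannel.open(StandardProtocolFamily.INET)) {

            // Port 0 asks the operating system for any free port.
            receiver.bind(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0));
            final SocketAddress target = receiver.getLocalAddress();
            final ByteBuffer payload = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));

            if (connectFirst) {
                // write(...) is only legal once the channel is connected;
                // on an unconnected channel it throws NotYetConnectedException.
                sender.connect(target);
                sender.write(payload);
            } else {
                // send(...) names the destination per datagram, no connect() needed.
                sender.send(payload, target);
            }

            // receive(...) works on an unconnected channel and blocks until a datagram arrives.
            final ByteBuffer buffer = ByteBuffer.allocate(512);
            receiver.receive(buffer);
            buffer.flip();
            return StandardCharsets.UTF_8.decode(buffer).toString();
        }
    }

    public static void main(final String[] args) throws IOException {
        System.out.println(roundTrip("ping", false));
        System.out.println(roundTrip("ping", true));
    }
}
```

The heartbeat example only needs the unconnected form, since a multicaster addresses a group rather than a single peer.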
3.2 MulticastChannel
This Channel supports IP multicasting. Of particular interest to us is this part of its API:
DatagramChannel configuration
...
channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, NetworkInterface);
channel.join(InetAddress, this.multicastNetworkInterface);
...
line 2: the NetworkInterface is the interface through which we will send / receive UDP multicast traffic
line 3: we join the multicast group (express interest in receiving traffic sent to this group) by passing an InetAddress (the multicast IP) and a NetworkInterface (the interface through which we will receive said multicast traffic). IPv4 multicast addresses range from 224.0.0.0 to 239.255.255.255.
A MulticastChannel can join “n” number of multicast groups and can join a group on different network interfaces.
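The points above can be sketched as follows. This is an illustrative helper rather than part of the example project: it checks that an address really is a multicast address, derives the matching ProtocolFamily, and joins the group on a named interface. The class and method names are hypothetical, and the null check reflects the fact that NetworkInterface.getByName(...) returns null for an unknown name.

```java
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.ProtocolFamily;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.net.UnknownHostException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.MembershipKey;

// Hypothetical helper, for illustration only.
final class MulticastGroupSketch {

    // Returns the protocol family a channel needs in order to join the given group.
    static ProtocolFamily familyFor(final String groupIp) throws UnknownHostException {
        final InetAddress group = InetAddress.getByName(groupIp);
        if (!group.isMulticastAddress()) {
            throw new IllegalArgumentException(groupIp + " is not a multicast address");
        }
        return (group instanceof Inet6Address)
                ? StandardProtocolFamily.INET6
                : StandardProtocolFamily.INET;
    }

    // Joins the group on the named interface; the channel should have been
    // opened with the family returned by familyFor(groupIp).
    static MembershipKey join(final DatagramChannel channel, final String groupIp,
                              final String interfaceName) throws IOException {
        final NetworkInterface nif = NetworkInterface.getByName(interfaceName);
        if (nif == null) {
            throw new IllegalArgumentException("no such interface: " + interfaceName);
        }
        channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, nif);
        // The returned key represents the membership and can later be drop()ped.
        return channel.join(InetAddress.getByName(groupIp), nif);
    }

    public static void main(final String[] args) throws IOException {
        System.out.println(familyFor("239.1.1.1"));
        System.out.println(familyFor("ff02::1"));
    }
}
```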
4. Multicaster
Multicaster
final class Multicaster implements ScheduledChannelOperation {

    private final String id;
    private final ScheduledExecutorService scheduler;
    private final NetworkInterface networkInterface;
    private final InetSocketAddress multicastGroup;

    Multicaster(final String id, final String ip, final String interfaceName, final int port, final int poolSize) {
        if (StringUtils.isEmpty(id) || StringUtils.isEmpty(ip) || StringUtils.isEmpty(interfaceName)) {
            throw new IllegalArgumentException("required id, ip and interfaceName");
        }

        this.id = id;
        this.scheduler = Executors.newScheduledThreadPool(poolSize);
        this.multicastGroup = new InetSocketAddress(ip, port);

        try {
            this.networkInterface = NetworkInterface.getByName(interfaceName);
        } catch (SocketException e) {
            throw new RuntimeException("unable to start broadcaster", e);
        }
    }

    @Override
    public ScheduledExecutorService getService() {
        return this.scheduler;
    }

    void run(final CountDownLatch endLatch) {
        assert !Objects.isNull(endLatch);

        try (DatagramChannel channel = DatagramChannel.open()) {

            initChannel(channel);
            doSchedule(channel);

            endLatch.await(); // keep the channel open until told to stop
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException("unable to run broadcaster", e);
        } finally {
            this.scheduler.shutdownNow();
        }
    }

    private void doSchedule(final DatagramChannel channel) {
        assert !Objects.isNull(channel);

        doSchedule(channel, new Runnable() {
            public void run() {
                System.out.println(String.format("Multicasting for %s", Multicaster.this.id));

                try {
                    Multicaster.this.doBroadcast(channel);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }, 0L, Constants.Schedule.PULSE_DELAY_IN_MILLISECONDS, TimeUnit.MILLISECONDS);
    }

    private void initChannel(final DatagramChannel channel) throws IOException {
        assert !Objects.isNull(channel);

        channel.bind(null); // any automatically assigned local address
        channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, this.networkInterface);
    }

    private void doBroadcast(final DatagramChannel channel) throws IOException {
        assert !Objects.isNull(channel);

        Pulse.broadcast(this.id, this.multicastGroup, channel);
    }
}
- line 14: we create a ScheduledExecutorService for scheduling the multicast heartbeat pulse to the multicast group
- line 15: we create an InetSocketAddress which will be the multicast group to which we will send our heartbeats
- line 18: we look up the NetworkInterface through which our multicast heartbeats will travel
- line 34: we initialize our DatagramChannel (initChannel(...))
- line 35: we schedule our heartbeat task (doSchedule(...))
- line 48-58: represents the scheduled task that is run; this is quite simply a send(...) operation on the DatagramChannel to the InetSocketAddress which represents our multicast group
- line 64: bind(null) binds the socket to an automatically assigned local address; which one does not matter for a sender
- line 65: ensure we set the NetworkInterface to be used for the multicast heartbeats that are sent. We don’t set the TTL for the multicast, although you could if you want.
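The Pulse class is not listed in this article, so the following is only a sketch of what the sending side could look like. The wire format ("id|timestamp" as UTF-8), the class name and the method names are all assumptions. It also shows where a TTL could be set via StandardSocketOptions.IP_MULTICAST_TTL; the initial value is typically 1, which keeps packets on the local network.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the sending half of the project's Pulse class.
final class PulseSenderSketch {

    // Assumed wire format: "<id>|<epoch millis>" encoded as UTF-8.
    static ByteBuffer encode(final String id, final long timestampMillis) {
        return ByteBuffer.wrap((id + "|" + timestampMillis).getBytes(StandardCharsets.UTF_8));
    }

    // Prepares a channel for sending multicast traffic through the given interface.
    static void configure(final DatagramChannel channel, final NetworkInterface nif,
                          final int ttl) throws IOException {
        channel.bind(null); // any automatically assigned local address
        channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, nif);
        // Valid range is 0 to 255; larger values let packets cross more routers.
        channel.setOption(StandardSocketOptions.IP_MULTICAST_TTL, ttl);
    }

    // Sends one heartbeat to the group and returns the number of bytes sent.
    static int sendPulse(final DatagramChannel channel, final InetSocketAddress group,
                         final String id) throws IOException {
        return channel.send(encode(id, System.currentTimeMillis()), group);
    }

    public static void main(final String[] args) {
        final ByteBuffer payload = encode("A", System.currentTimeMillis());
        System.out.println(StandardCharsets.UTF_8.decode(payload));
    }
}
```

A scheduled task such as the one in doSchedule(...) would then call sendPulse(...) once per pulse interval.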
5. Subscriber
Subscriber
final class Subscriber implements ScheduledChannelOperation {

    private final String id;
    private final ScheduledExecutorService scheduler;
    private final NetworkInterface networkInterface;
    private final InetSocketAddress hostAddress;
    private final InetAddress group;
    private final ConcurrentMap<String, Pulse> pulses;

    Subscriber(final String id, final String ip, final String interfaceName, final int port, final int poolSize) {
        if (StringUtils.isEmpty(id) || StringUtils.isEmpty(ip) || StringUtils.isEmpty(interfaceName)) {
            throw new IllegalArgumentException("required id, ip and interfaceName");
        }

        this.id = id;
        this.scheduler = Executors.newScheduledThreadPool(poolSize);
        this.hostAddress = new InetSocketAddress(port);
        this.pulses = new ConcurrentHashMap<>();

        try {
            this.networkInterface = NetworkInterface.getByName(interfaceName);
            this.group = InetAddress.getByName(ip);
        } catch (SocketException | UnknownHostException e) {
            throw new RuntimeException("unable to start subscriber", e);
        }
    }

    @Override
    public ScheduledExecutorService getService() {
        return this.scheduler;
    }

    void run() {
        try (final DatagramChannel channel = DatagramChannel.open(StandardProtocolFamily.INET); final Selector selector = Selector.open()) {
            System.out.printf("Starting subscriber %s", id);

            initChannel(channel, selector);
            doSchedule(channel);

            while (!Thread.currentThread().isInterrupted()) {
                if (selector.isOpen()) {
                    final int numKeys = selector.select();
                    if (numKeys > 0) {
                        handleKeys(channel, selector.selectedKeys());
                    }
                } else {
                    Thread.currentThread().interrupt();
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("unable to run subscriber", e);
        } finally {
            this.scheduler.shutdownNow();
        }
    }

    private void initChannel(final DatagramChannel channel, final Selector selector) throws IOException {
        assert !Objects.isNull(channel) && !Objects.isNull(selector);

        channel.configureBlocking(false);
        channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        channel.bind(this.hostAddress);
        channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, this.networkInterface);
        channel.join(this.group, this.networkInterface);
        channel.register(selector, SelectionKey.OP_READ);
    }

    private void handleKeys(final DatagramChannel channel, final Set<SelectionKey> keys) throws IOException {
        assert !Objects.isNull(keys) && !Objects.isNull(channel);

        final Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {

            final SelectionKey key = iterator.next();
            try {
                if (key.isValid() && key.isReadable()) {
                    Pulse.read(channel).ifPresent((pulse) -> {
                        this.pulses.put(pulse.getId(), pulse);
                    });
                } else {
                    throw new UnsupportedOperationException("key not valid.");
                }
            } finally {
                iterator.remove();
            }
        }
    }

    private void doSchedule(final DatagramChannel channel) {
        assert !Objects.isNull(channel);

        doSchedule(channel, new Runnable() {
            public void run() {
                Subscriber.this.pulses.forEach((id, pulse) -> {
                    if (pulse.isDead(Constants.Schedule.DOWNTIME_TOLERANCE_DEAD_SERVICE_IN_MILLISECONDS)) {
                        System.out.println(String.format("FATAL : %s removed", id));
                        Subscriber.this.pulses.remove(id);
                    } else if (!pulse.isValid(Constants.Schedule.DOWNTIME_TOLERANCE_IN_MILLISECONDS)) {
                        System.out.println(String.format("WARNING : %s is down", id));
                    } else {
                        System.out.println(String.format("OK : %s is up", id));
                    }
                });
            }
        }, 0L, Constants.Schedule.PULSE_DELAY_IN_MILLISECONDS, TimeUnit.MILLISECONDS);
    }
}
- line 16: we create a ScheduledExecutorService to schedule the polling of the heartbeat pulses we have received thus far via UDP multicast
- line 17: we create an InetSocketAddress for the specified port; with no host given it uses the wildcard address, so the socket accepts traffic on any local address
- line 21: we look up the NetworkInterface for the specified interface name; this will be the interface through which the Subscriber will receive UDP multicast heartbeat pulses
- line 22: we create an InetAddress representing the multicast group from which we will receive multicast messages
- line 34: we open the DatagramChannel but also specify the ProtocolFamily, which should correspond to the address type of the multicast group this Channel will be joining
- line 37-38: we initialize the Channel and schedule the polling of heartbeat pulses
- line 40-49: while the current thread has not been interrupted we use the Selector to wait for incoming UDP multicast heartbeats; the Channel itself is in non-blocking mode
- line 61-62: we enable SO_REUSEADDR before binding so that several Subscriber processes on the same machine can bind to the same port
- line 63-64: we set the multicast interface and join the multicast group using the multicast interface
- line 77-79: we read a Pulse from the UDP multicast packet and store it under its id
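To try the Selector pattern without a multicast-capable network, here is a small loopback sketch. It is an illustration under assumptions rather than the project's code: the class and method names are hypothetical, and a plain unicast datagram stands in for a multicast heartbeat. Unlike the Subscriber above, it uses select(timeout) so that it gives up when nothing arrives.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardProtocolFamily;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical helper, for illustration only.
final class SelectorReceiveSketch {

    // Waits up to timeoutMillis (must be > 0) for one datagram; returns null on timeout.
    static String receiveOne(final DatagramChannel channel, final long timeoutMillis) throws IOException {
        try (Selector selector = Selector.open()) {
            // A channel must be non-blocking before it can be registered.
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);

            if (selector.select(timeoutMillis) == 0) {
                return null; // nothing became readable in time
            }

            final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                final SelectionKey key = keys.next();
                keys.remove(); // selected keys are never removed automatically
                if (key.isValid() && key.isReadable()) {
                    final ByteBuffer buffer = ByteBuffer.allocate(512);
                    // In non-blocking mode receive(...) returns null if no datagram is available.
                    if (channel.receive(buffer) != null) {
                        buffer.flip();
                        return StandardCharsets.UTF_8.decode(buffer).toString();
                    }
                }
            }
            return null;
        }
    }

    // Sends "A" to a loopback receiver and reads it back through the selector.
    static String loopbackDemo() throws IOException {
        try (DatagramChannel receiver = DatagramChannel.open(StandardProtocolFamily.INET);
             DatagramChannel sender = DatagramChannel.open(StandardProtocolFamily.INET)) {
            receiver.bind(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0));
            sender.send(ByteBuffer.wrap("A".getBytes(StandardCharsets.UTF_8)), receiver.getLocalAddress());
            return receiveOne(receiver, 2000L);
        }
    }

    public static void main(final String[] args) throws IOException {
        System.out.println(loopbackDemo());
    }
}
```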
6. Running the program
The example project is a Maven project and must be built into a “fat” or “uber” jar by issuing the following command: `mvn clean install package`. The resulting artifact can be found in the “target” folder located in the project root folder. The project can be run in two modes, one being “MULTICAST” and the other being “SUBSCRIBE”. The “MULTICAST” mode publishes packets (heartbeats) to the multicast group and the “SUBSCRIBE” mode receives said heartbeats.
The beauty of the example is that you can spin up as many “MULTICAST” processes as you wish (ensure you give them all unique ids) and as many “SUBSCRIBE” processes as you wish (ensure you give them unique ids too). The processes can be started in any order. Simply put, as soon as heartbeats arrive, the subscribers will know about it and begin reporting as shown below:
On top are the two “MULTICAST” processes and on the bottom are the two “SUBSCRIBE” processes. Notice how the subscribers report the other processes as being “up” until I terminate one of them (“A”, top left). Once a configurable tolerance is exceeded, the last packet / pulse is considered too old and the subscribers notify us that “A is down”. After a short while A is removed and considered dead. We then bring A back up and you see that a heartbeat is received immediately and the subscribers begin reporting “A is up” along with “B”, which never went down.
To run this example you need a network interface, i.e. you need to be plugged into a network point, as the network interface name is required. Although I have been successful in sending packets to the multicast group via the wireless interface in “MULTICAST” mode and receiving them in “SUBSCRIBE” mode via a LAN interface, I would highly recommend using a LAN (cable) interface for both MULTICAST and SUBSCRIBE processes. The interface will serve as the NetworkInterface on which the multicast group is joined.
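The up / down / removed transitions described above boil down to comparing the age of the last pulse against two tolerances. The following is a minimal sketch of that decision, assuming hypothetical threshold values; the project's actual Pulse.isValid(...) and Pulse.isDead(...) implementations and its Constants values are not shown in this article.

```java
// Hypothetical helper mirroring the OK / WARNING / FATAL reporting of the Subscriber.
final class PulseStatusSketch {

    enum Status { OK, WARNING, FATAL }

    // Classifies a process by how long ago its last pulse was seen.
    static Status classify(final long lastSeenMillis, final long nowMillis,
                           final long downToleranceMillis, final long deadToleranceMillis) {
        final long age = nowMillis - lastSeenMillis;
        if (age > deadToleranceMillis) {
            return Status.FATAL;   // silent for too long: remove the process
        }
        if (age > downToleranceMillis) {
            return Status.WARNING; // last pulse is too old: report it as down
        }
        return Status.OK;          // a recent pulse exists: report it as up
    }

    public static void main(final String[] args) {
        final long now = System.currentTimeMillis();
        // Assumed tolerances: down after 1 second of silence, dead after 5 seconds.
        System.out.println(classify(now - 500L, now, 1_000L, 5_000L));
        System.out.println(classify(now - 2_000L, now, 1_000L, 5_000L));
        System.out.println(classify(now - 6_000L, now, 1_000L, 5_000L));
    }
}
```

Keeping two separate tolerances is what lets a restarted process go from “down” back to “up” without first being forgotten.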
The following arguments are required to run the program:
- -i ‘Unique id of the process’ : eg: A or B or S1
- -m ‘Mode’ : can be one of MULTICAST or SUBSCRIBE
- -n ‘NetworkInterface name’ : be sure to run ifconfig (Linux) or ipconfig (Windows) to find the interface name, e.g. eth0
- Running a MULTICAST process: `java -jar heartbeat-0.0.1-SNAPSHOT.jar -m MULTICAST -i A -n eth0`
- Running a SUBSCRIBE process: `java -jar heartbeat-0.0.1-SNAPSHOT.jar -m SUBSCRIBE -i s1 -n eth0`
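If you would rather ask the JVM which interface names it can see, something along these lines may help. It is a sketch and not part of the example project: the class name is hypothetical, and filtering on isUp(), supportsMulticast() and isLoopback() is one reasonable choice, not a requirement of the example.

```java
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

// Hypothetical helper, for illustration only.
final class InterfaceListingSketch {

    // Returns the names of interfaces that are up, support multicast and are not loopback.
    static List<String> multicastCapableInterfaces() throws SocketException {
        final List<String> names = new ArrayList<>();
        final Enumeration<NetworkInterface> all = NetworkInterface.getNetworkInterfaces();
        if (all == null) {
            return names; // no interfaces reported at all
        }
        while (all.hasMoreElements()) {
            final NetworkInterface nif = all.nextElement();
            if (nif.isUp() && nif.supportsMulticast() && !nif.isLoopback()) {
                names.add(nif.getName()); // candidate value for the -n argument
            }
        }
        return names;
    }

    public static void main(final String[] args) throws SocketException {
        multicastCapableInterfaces().forEach(System.out::println);
    }
}
```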
7. Summary
In this example we demonstrated how to build a simple heartbeat application using UDP and Java NIO. We took it a bit further and leveraged multicasting as a method to publish our heartbeat signals across the network to interested parties. These interested parties can change state, notify persons or even try to resurrect down / faulty services when they become aware of problems via the heartbeat signals.
8. Download the source code
This was a Java Nio Heartbeat tutorial
You can download the full source code of this example here: Java Nio Heartbeat tutorial