BlockingQueue example

This example demonstrates how to use a BlockingQueue to pass work between threads. In short, it works as follows:

  • We have created two classes that implement the Runnable interface and override its run() API method.
  • The first runnable, FileEnumerationTask, has a BlockingQueue of Files, a static dummy File (dumFile) that marks the end of the work, and a File for the root directory. Its enumerate(File dir) method gets the array of abstract pathnames denoting the files in the given directory, using the listFiles() API method of File, and calls itself recursively for every entry that is a directory, as reported by the isDirectory() API method of File. Every entry that is not a directory is inserted into the BlockingQueue, using the put(File e) API method of BlockingQueue, which blocks while the queue is full. The run() method calls enumerate(File dir) for the root directory, so that all files under it end up in the BlockingQueue, and then puts the dummy file into the queue.
  • The other runnable, SearchTask, also has a BlockingQueue of Files and a String keyword. Its search(File file) method reads the file using a Scanner over a FileInputStream connected to the file. It iterates over the lines of the file, using the hasNextLine() and nextLine() API methods of Scanner, and prints every line that contains the specified keyword, prefixed with the file path and the line number. In its run() method the runnable repeatedly retrieves and removes the head of the BlockingQueue, using the take() API method, which blocks while the queue is empty. If the retrieved file is the dummy file of the FileEnumerationTask, it puts it back into the BlockingQueue, so that the other search threads see it too, and stops. Otherwise it calls search(File file) to search that file for the keyword.
  • We create a BlockingQueue of Files (an ArrayBlockingQueue with a capacity of 10) and a new FileEnumerationTask for this blocking queue and a given path to a directory.
  • We create a new thread to run this runnable, and we also start 100 threads, each running a new SearchTask for the given keyword and the same blocking queue.
  • The enumeration thread keeps adding files to the blocking queue while the search threads keep retrieving them, and in this way the specified keyword is searched for in all files under the specified directory.
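
The hand-off described in the steps above can be reduced to a small sketch. It is not the article's code: it passes Strings instead of Files, and the class name SentinelQueueSketch, the method process and the marker DONE are hypothetical names chosen for illustration. It assumes, as in the example, that the marker is compared by identity and put back by every consumer that sees it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

class SentinelQueueSketch {

    // Hypothetical end-of-work marker; a distinct object so it can be compared by identity
    private static final String DONE = new String("DONE");

    /** Sends the items through a small bounded queue to several consumers; returns them sorted. */
    static List<String> process(List<String> items, int consumers) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        ConcurrentLinkedQueue<String> seen = new ConcurrentLinkedQueue<>();
        List<Thread> threads = new ArrayList<>();

        // Producer: plays the role of the enumeration task
        threads.add(new Thread(() -> {
            try {
                for (String item : items) {
                    queue.put(item);          // blocks while the queue is full
                }
                queue.put(DONE);              // no more work will follow
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        // Consumers: play the role of the search tasks
        for (int i = 0; i < consumers; i++) {
            threads.add(new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take();   // blocks while the queue is empty
                        if (item == DONE) {
                            queue.put(item);          // leave the marker for the other consumers
                            return;
                        }
                        seen.add(item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        for (Thread t : threads) {
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // Consumers run concurrently, so the arrival order is not fixed; sort for a stable result
        List<String> result = new ArrayList<>(seen);
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("c", "a", "b"), 3)); // prints [a, b, c]
    }
}
```

Because only one marker circulates and each consumer puts it back before stopping, every consumer terminates regardless of how many there are.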

Let’s take a look at the code snippet that follows: 

package com.javacodegeeks.snippets.core;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class BlockingQueueExample {

    public static void main(String[] args) {
        Scanner input = new Scanner(System.in);

        System.out.print("Enter base directory (e.g. C:/Users/nikos7/Desktop): ");
        String dir = input.nextLine();

        System.out.print("Enter keyword (e.g. output): ");
        String kword = input.nextLine();

        final int FILE_QUEUE_SIZE = 10;
        final int THREADS = 100;

        // Bounded queue shared by the enumerator (producer) and the search tasks (consumers)
        BlockingQueue<File> blockingQueue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);

        FileEnumerationTask enumerator = new FileEnumerationTask(blockingQueue, new File(dir));
        new Thread(enumerator).start();

        for (int i = 1; i <= THREADS; i++) {
            new Thread(new SearchTask(blockingQueue, kword)).start();
        }
    }
}

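class FileEnumerationTask implements Runnable {

    // Dummy file used as an end-of-work marker; the search tasks compare against it by identity
    public static final File dumFile = new File("");
    private BlockingQueue<File> blockingQueue;
    private File rootDir;

    public FileEnumerationTask(BlockingQueue<File> blockingQueue, File rootDir) {
        this.blockingQueue = blockingQueue;
        this.rootDir = rootDir;
    }

    @Override
    public void run() {
        try {
            enumerate(rootDir);
            // Signal the search tasks that no more files will follow
            blockingQueue.put(dumFile);
        } catch (InterruptedException ex) {
            // Restore the interrupt status instead of silently swallowing it
            Thread.currentThread().interrupt();
        }
    }

    public void enumerate(File dir) throws InterruptedException {
        File[] files = dir.listFiles();
        if (files == null) {
            // listFiles() returns null if dir is not a directory or cannot be read
            return;
        }
        for (File file : files) {
            if (file.isDirectory()) {
                enumerate(file);
            } else {
                // Blocks while the queue is full
                blockingQueue.put(file);
            }
        }
    }
}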

class SearchTask implements Runnable {

    private BlockingQueue<File> queue;
    private String keyword;

    public SearchTask(BlockingQueue<File> queue, String keyword) {
        this.queue = queue;
        this.keyword = keyword;
    }

    @Override
    public void run() {
        try {
            boolean done = false;
            while (!done) {
                // Blocks until a file is available
                File file = queue.take();
                if (file == FileEnumerationTask.dumFile) {
                    // Put the marker back so that the other search threads also terminate
                    queue.put(file);
                    done = true;
                } else {
                    try {
                        search(file);
                    } catch (IOException e) {
                        // An unreadable file should not stop this thread from processing the rest
                        e.printStackTrace();
                    }
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of silently swallowing it
            Thread.currentThread().interrupt();
        }
    }

    public void search(File file) throws IOException {
        // try-with-resources closes the Scanner and its stream even if reading fails
        try (Scanner in = new Scanner(new FileInputStream(file))) {
            int lineNumber = 0;
            while (in.hasNextLine()) {
                lineNumber++;
                String line = in.nextLine();
                if (line.contains(keyword)) {
                    System.out.printf("%s:%d:%s%n", file.getPath(), lineNumber, line);
                }
            }
        }
    }
}

Output:

Enter base directory (e.g. C:/Users/nikos7/Desktop): C:/Users/nikos7/Desktop
Enter keyword (e.g. output): output
C:\Users\nikos7\Desktop\apache-ant-1.8.3\bin\runant.pl:25:# and returns the XML formatted output)
.
.
.
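
The example depends on the queue being bounded (FILE_QUEUE_SIZE is 10): put() makes the enumerator wait when the search threads fall behind, and take() makes the search threads wait when no file is ready. The following is a minimal sketch of those two limits. It uses the non-blocking offer() and the timed poll() of BlockingQueue instead of put() and take(), so that the demonstration cannot hang. The class BoundedQueueSketch and its methods are hypothetical names, not part of the example above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueSketch {

    /** Offers 'attempts' elements to a queue of the given capacity; returns how many were accepted. */
    static int fill(int capacity, int attempts) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false at once when the queue is full; put() would block here instead
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    /** Polls an empty queue for at most 'millis' milliseconds; returns null if nothing arrived. */
    static Integer pollEmpty(long millis) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        try {
            // poll(timeout) gives up after the timeout; take() would wait indefinitely
            return queue.poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(fill(10, 15));   // prints 10
        System.out.println(pollEmpty(50));  // prints null
    }
}
```

With a capacity of 10 the enumerator can be at most ten files ahead of the search threads, which keeps memory use flat even for very large directory trees.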

  
This was an example of how to use a BlockingQueue in Java.

Byron Kiourtzoglou

Byron is a master software engineer working in the IT and Telecom domains. He is an applications developer in a wide variety of applications/services. He is currently acting as the team leader and technical architect for a proprietary service creation and integration platform for both the IT and Telecom industries, in addition to an in-house big data real-time analytics solution. He is always fascinated by SOA, middleware services and mobile development. Byron is co-founder and Executive Editor at Java Code Geeks.