Batch

Spring Batch ETL Job Example

Through this article, we are going to demonstrate an ETL use case leveraging the advantages of Spring Batch, A typical batch program generally reads a large number of records from a database, file, or queue, processes the data in some fashion, and then writes back data in a modified form.

The main advantage of batch applications is that they do not require any manual intervention. As a result, they can be scheduled to run at times when resources aren’t being utilized.

As an example, We will look at an ETL tool which runs in batch mode to calculate the financial stock market prices (Open, Low, High, Close). Huge financial stock market trades logs need to be parsed on a daily basis to fetch the required useful information. The input files are extracted and processed to obtain the required information, and the output data gets loaded to a CSV files. This whole process is carried out in batch mode.
 

1. Project Environment

  1. Spring Boot 1.3.3.RELEASE
  2. Apache Maven 3.0.5
  3. JDK 1.8
  4. Eclipse 4.4 (Luna)

2. Project Structure

project-structure
Figure 1: Project Structure

3. Dependencies

We have the following dependencies inside our below POM file.

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>org.springframework</groupId>
	<artifactId>springbatch-example-code</artifactId>
	<version>0.1.0</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.3.3.RELEASE</version>
	</parent>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-batch</artifactId>
		</dependency>
	</dependencies>


	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

4. Reader

FxMarketEventReader is an ItemReader that reads lines from input CSV file trades.csv which defined by setResource(Resource), then skip the file header at the start of a file using setLinesToSkip(int linesToSkip), after that map each line to an item FxMarketEvent using setLineMapper(LineMapper<T> lineMapper).

FxMarketEventReader.java:

package com.fx.batch.reader;

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;

import com.fx.batch.model.FxMarketEvent;

/**
 * The Class FxMarketEventReader.
 *
 * @author ashraf
 */
public class FxMarketEventReader extends FlatFileItemReader<FxMarketEvent> {

	public FxMarketEventReader() {
		//Set input file
		this.setResource(new ClassPathResource("trades.csv"));
		//Skip the file header line
		this.setLinesToSkip(1);
		//Line is mapped to item (FxMarketEvent) using setLineMapper(LineMapper)
		this.setLineMapper(new DefaultLineMapper<FxMarketEvent>() {
			{
				setLineTokenizer(new DelimitedLineTokenizer() {
					{
						setNames(new String[] { "stock", "time", "price", "shares" });
					}
				});
				setFieldSetMapper(new BeanWrapperFieldSetMapper<FxMarketEvent>() {
					{
						setTargetType(FxMarketEvent.class);
					}
				});
			}
		});
	}

}

trades.csv:

 
stock,time,price,shares
JHX,09:30:00.00,57,95
JNJ,09:30:00.00,91.14,548
OPK,09:30:00.00,8.3,300
OPK,09:30:00.00,8.3,63
OMC,09:30:00.00,74.53,100
OMC,09:30:00.00,74.53,24
TWTR,09:30:00.00,64.89,100
TWTR,09:30:00.00,64.89,25
TWTR,09:30:00.00,64.89,245
TWTR,09:30:00.00,64.89,55
USB,09:30:00.00,39.71,400
USB,09:30:00.00,39.71,359
USB,09:30:00.00,39.71,41
USB,09:30:00.00,39.71,259
USB,09:30:00.00,39.71,100
VALE,09:30:00.00,14.88,900
VALE,09:30:00.00,14.88,1000
VALE,09:30:00.00,14.88,100
VALE,09:30:00.00,14.88,1000
VALE,09:30:00.00,14.88,260
VALE,09:30:00.00,14.88,100
BSBR,09:30:00.00,5.87,1100
BSBR,09:30:00.00,5.87,800
BRK.B,09:30:00.00,118.35,422

5. Processor

FxMarketEventProcessor is an ItemProcessor, takes FxMarketEvent as an input and converts it to Trade as an output. Although, It’s possible to return the same or different type than the one provided, returning null indicates that the item should not be continued to be processed.

FxMarketEventProcessor.java:

 
package com.fx.batch.processor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ItemProcessor;

import com.fx.batch.model.FxMarketEvent;
import com.fx.batch.model.Trade;

/**
 * The Class FxMarketEventProcessor.
 * 
 * @author ashraf
 */
public class FxMarketEventProcessor implements ItemProcessor<FxMarketEvent, Trade> {

	private static final Logger log = LoggerFactory.getLogger(FxMarketEventProcessor.class);

	@Override
	public Trade process(final FxMarketEvent fxMarketEvent) throws Exception {

		final String stock = fxMarketEvent.getStock();
		final String time = fxMarketEvent.getTime();
		final double price = Double.valueOf(fxMarketEvent.getPrice());
		final long shares = Long.valueOf(fxMarketEvent.getShares());
		final Trade trade = new Trade(stock, time, price, shares);

		log.trace("Converting (" + fxMarketEvent + ") into (" + trade + ")");

		return trade;
	}

}

6. Writer

StockPriceAggregator is an ItemWriter that aggregates the trading day stocks prices to calculates the Open, Low, High and Close for each stock, then update the FxMarketPricesStore.

StockPriceAggregator.java:

 
package com.fx.batch.writer;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;

import com.fx.batch.model.FxMarketPricesStore;
import com.fx.batch.model.StockPriceDetails;
import com.fx.batch.model.Trade;

/**
 * The Class StockPriceAggregator.
 * 
 * @author ashraf
 */
public class StockPriceAggregator implements ItemWriter<Trade> {

	@Autowired
	private FxMarketPricesStore fxMarketPricesStore;

	private static final Logger log = LoggerFactory.getLogger(StockPriceAggregator.class);

	@Override
	public void write(List<? extends Trade> trades) throws Exception {
		trades.forEach(t -> {
			if (fxMarketPricesStore.containsKey(t.getStock())) {
				double tradePrice = t.getPrice();
				StockPriceDetails priceDetails = fxMarketPricesStore.get(t.getStock());
				// Set highest price
				if (tradePrice > priceDetails.getHigh()) {
					priceDetails.setHigh(tradePrice);
				}
				// Set lowest price
				if (tradePrice < priceDetails.getLow()) {
					priceDetails.setLow(tradePrice);
				}
				// Set close price
				priceDetails.setClose(tradePrice);
			} else {
				log.trace("Adding new stock {}", t.getStock());
				fxMarketPricesStore.put(t.getStock(),
						new StockPriceDetails(t.getStock(), t.getPrice(), t.getPrice(), t.getPrice(), t.getPrice()));
			}
		});
	}

}

7. Listener

JobCompletionNotificationListener is a JobExecutionListener that provides a callback function afterJob(JobExecution jobExecution) to load the stocks prices into CSV file prices.csv after the ETL Job completion.

JobCompletionNotificationListener.java:

 
package com.fx.batch.listener;

import java.io.BufferedWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;

import com.fx.batch.model.FxMarketPricesStore;
import com.fx.batch.model.StockPriceDetails;

/**
 * The Class JobCompletionNotificationListener
 *
 * @author ashraf
 */
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

	private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

	private static final String HEADER = "stock,open,close,low,high";

	private static final String LINE_DILM = ",";

	@Autowired
	private FxMarketPricesStore fxMarketPricesStore;

	@Override
	public void afterJob(JobExecution jobExecution) {
		if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
			log.trace("Loading the results into file");
			Path path = Paths.get("prices.csv");
			try (BufferedWriter fileWriter = Files.newBufferedWriter(path)) {
				fileWriter.write(HEADER);
				fileWriter.newLine();
				for (StockPriceDetails pd : fxMarketPricesStore.values()) {
					fileWriter.write(new StringBuilder().append(pd.getStock())
							.append(LINE_DILM).append(pd.getOpen())
							.append(LINE_DILM).append(pd.getClose())
							.append(LINE_DILM).append(pd.getLow())
							.append(LINE_DILM).append(pd.getHigh()).toString());
					fileWriter.newLine();
				}
			} catch (Exception e) {
				log.error("Fetal error: error occurred while writing {} file", path.getFileName());
			}
		}
	}
}

8. Configuring and Running a Job

8.1. Job Java Configuration

Batch application java based configuration has two main components the @EnableBatchConfiguration annotation and two builders (JobBuilderFactoryStepBuilderFactory).

The @EnableBatchConfiguration provides a base configuration for building batch jobs. Within this base configuration, an instance of StepScope and JobScope so your beans inside steps can have @Scope("step") and @Scope("job") respectively. Also, there is a number of beans made available to be autowired:

BatchConfiguration.java:

 
package com.fx.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.fx.batch.listener.JobCompletionNotificationListener;
import com.fx.batch.model.FxMarketEvent;
import com.fx.batch.model.FxMarketPricesStore;
import com.fx.batch.model.Trade;
import com.fx.batch.processor.FxMarketEventProcessor;
import com.fx.batch.reader.FxMarketEventReader;
import com.fx.batch.writer.StockPriceAggregator;

/**
 * The Class BatchConfiguration.
 * 
 * @author ashraf
 */
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

	@Autowired
	public JobBuilderFactory jobBuilderFactory;

	@Autowired
	public StepBuilderFactory stepBuilderFactory;

	@Bean
	public FxMarketPricesStore fxMarketPricesStore() {
		return new FxMarketPricesStore();
	}

	// FxMarketEventReader (Reader)
	@Bean
	public FxMarketEventReader fxMarketEventReader() {
		return new FxMarketEventReader();
	}

	// FxMarketEventProcessor (Processor)
	@Bean
	public FxMarketEventProcessor fxMarketEventProcessor() {
		return new FxMarketEventProcessor();
	}

	// StockPriceAggregator (Writer)
	@Bean
	public StockPriceAggregator stockPriceAggregator() {
		return new StockPriceAggregator();
	}

	// JobCompletionNotificationListener (File loader)
	@Bean
	public JobExecutionListener listener() {
		return new JobCompletionNotificationListener();
	}

	// Configure job step
	@Bean
	public Job fxMarketPricesETLJob() {
		return jobBuilderFactory.get("FxMarket Prices ETL Job").incrementer(new RunIdIncrementer()).listener(listener())
				.flow(etlStep()).end().build();
	}

	@Bean
	public Step etlStep() {
		return stepBuilderFactory.get("Extract -> Transform -> Aggregate -> Load").<FxMarketEvent, Trade> chunk(10000)
				.reader(fxMarketEventReader()).processor(fxMarketEventProcessor())
				.writer(stockPriceAggregator())
				.build();
	}

}

8.2. Running a Job

Launching a batch job requires two things: the Job to be launched and a JobLauncher. For example, if launching a job from the command line, a new JVM will be instantiated for each Job, and thus every job will have its own JobLauncher.

prices.csv:

stock,open,close,low,high
CVCO,68.4,68.7,68.4,68.7
SCS,15.77,15.58,15.58,15.836
SCU,26.11,26.11,26.11,26.11
BBD,12.21,12.18,12.1599,12.26
BBG,26.72,26.17,26.07,26.98
BBF,12.46,12.39,12.39,12.46
BBH,87.97,88.19,87.81,88.76
SCON,2.15,2.15,2.15,2.15
SCX,14.57,14.57,14.57,14.57
BBK,13.78,13.76,13.76,13.78
SCOK,1.16,1.16,1.16,1.16
SCZ,50.6,50.54,50.5,50.84
STPZ,52.88,52.9,52.84,52.9
JIVE,11.16,11.2,11.16,11.24
BBL,61.35,61.27,61.25,61.37
BBN,19.06,19.0503,19.05,19.06
SDD,12.14,12.14,12.14,12.14
TWTC,30.58,30.32,30.29,30.58
BBT,37.11,36.96,36.91,37.18
SCOR,28.47,28.445,28.21,28.79
CEAI,0.298,0.298,0.298,0.298
BBW,7.59,7.59,7.59,7.59
BBY,39.75,40.24,39.61,40.3
BBX,15.62,15.6,15.6,15.62
FNLC,17.12,17.49,17.12,17.49

9. Download the Source Code

This was an example to show how to create an ETL Spring Batch Job.

Download
You can download the full source code of this example here: SpringBatchExampleCode.zip

Ashraf Sarhan

Ashraf Sarhan is a passionate software engineer, an open source enthusiast, has a Bsc. degree in Computer and Information Systems from Alexandria University. He is experienced in building large, scalable and distributed enterprise applications/service in multiple domains. He also has a keen interest in JavaEE, SOA, Agile and Big Data technologies.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Back to top button