Spring Batch Multithreading Example
In this article, we demonstrate Spring Batch's multithreading feature. In an enterprise, as data grows with more customers, transactions, and site hits, your batch jobs need to be able to keep up. Spring Batch was designed from the ground up to be highly scalable, to fit the needs of both small batch jobs and large enterprise-scale batch infrastructures. This article looks at one approach Spring Batch offers for scaling batch jobs beyond the default flow: multithreaded steps.
We will adapt the example from our last article, Spring Batch ETL Job, to calculate the daily stock volume in the financial market. Understanding volume can provide insight into a stock’s behavior to help you determine its overall health. The most important rule is this: volume precedes price. Typically, before a stock price moves, volume comes into play. The beauty of this indicator is its flexibility. Changes in volume can be used intra-day to determine short-term price movement, or over several days to determine a stock’s two- to three-day trend direction.
Our example uses the same structure as our last article, Spring Batch ETL Job, with the same reader and processor classes, but we create new writer and listener classes to aggregate and save the daily volume for each stock in volume.csv. Let’s take a look below.
1. Multithreaded Step
Spring Batch’s multithreaded step concept allows a batch job to use Spring’s TaskExecutor abstraction to execute each chunk in its own thread. A step in a job can be configured to run within a thread pool, processing each chunk independently. As chunks are processed, Spring Batch keeps track of what has been done. If an error occurs in any one of the threads, the job’s processing is rolled back or terminated per the regular Spring Batch functionality.
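To make the idea of "one chunk per thread" concrete, here is a minimal, framework-free sketch. It is not how Spring Batch is implemented internally; it only mimics the pattern with a plain ExecutorService. The class and method names (ChunkPerThreadSketch, toChunks, sumInParallel) are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: splits a list into chunks and processes each chunk on a pool thread.
class ChunkPerThreadSketch {

    // Split the items into consecutive chunks of at most chunkSize elements.
    static <T> List<List<T>> toChunks(List<T> items, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < items.size(); i += chunkSize) {
            chunks.add(new ArrayList<>(items.subList(i, Math.min(i + chunkSize, items.size()))));
        }
        return chunks;
    }

    // Sum every chunk on its own task and add up the per-chunk results.
    static long sumInParallel(List<Integer> items, int chunkSize, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> results = new ArrayList<>();
            for (List<Integer> chunk : toChunks(items, chunkSize)) {
                // Each chunk is an independent unit of work.
                Callable<Long> task = () -> chunk.stream().mapToLong(Integer::longValue).sum();
                results.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> result : results) {
                total += result.get(); // surfaces a failure from any chunk
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A chunk failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> shares = Arrays.asList(95, 548, 300, 63, 100, 24, 100, 25, 245);
        System.out.println(sumInParallel(shares, 2, 5)); // prints 1500
    }
}
```

The sketch uses a chunk size of 2 and 5 threads only to keep the numbers small. In a real step, the framework also handles the transaction boundaries and bookkeeping described above, which this sketch leaves out.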
2. Writer
StockVolumeAggregator is our new writer. It aggregates the trading-day volume for each stock, then updates the FxMarketVolumeStore. It resembles the StockPriceAggregator writer from our Spring Batch ETL Job article, with small changes to calculate the stock volume.
StockVolumeAggregator.java:
package com.quantvalley.batch.writer;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;

import com.quantvalley.batch.model.FxMarketVolumeStore;
import com.quantvalley.batch.model.StockVolume;
import com.quantvalley.batch.model.Trade;

/**
 * The Class StockVolumeAggregator.
 *
 * @author ashraf
 */
public class StockVolumeAggregator implements ItemWriter<Trade> {

    @Autowired
    private FxMarketVolumeStore fxMarketVolumeStore;

    private static final Logger log = LoggerFactory.getLogger(StockVolumeAggregator.class);

    @Override
    public void write(List<? extends Trade> trades) throws Exception {
        trades.forEach(t -> {
            if (fxMarketVolumeStore.containsKey(t.getStock())) {
                StockVolume stockVolume = fxMarketVolumeStore.get(t.getStock());
                long newVolume = stockVolume.getVolume() + t.getShares();
                // Increment stock volume
                stockVolume.setVolume(newVolume);
            } else {
                log.trace("Adding new stock {}", t.getStock());
                fxMarketVolumeStore.put(t.getStock(), new StockVolume(t.getStock(), t.getShares()));
            }
        });
    }
}
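In a multithreaded step, write() can be called from several threads at once, so the check-then-update sequence above is only safe if FxMarketVolumeStore and StockVolume synchronize internally, which the listing does not show. As a hedged illustration of one way to make the "insert or add" step atomic, the sketch below uses ConcurrentHashMap.merge from the JDK. The class ConcurrentVolumeSketch and its methods are hypothetical stand-ins, not part of the article's source code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a volume store that several writer threads can update safely.
class ConcurrentVolumeSketch {

    private final Map<String, Long> volumes = new ConcurrentHashMap<>();

    // merge() inserts the value or adds it to the existing one, atomically per key.
    void add(String stock, long shares) {
        volumes.merge(stock, shares, Long::sum);
    }

    long volumeOf(String stock) {
        return volumes.getOrDefault(stock, 0L);
    }

    // Apply `updates` increments of `shares` to one stock from each of `threads` tasks.
    static long hammer(int threads, int updates, long shares) {
        ConcurrentVolumeSketch store = new ConcurrentVolumeSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < updates; i++) {
                    store.add("TWTR", shares);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for updates");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return store.volumeOf("TWTR");
    }

    public static void main(String[] args) {
        // 5 threads * 10,000 updates * 100 shares
        System.out.println(hammer(5, 10_000, 100)); // prints 5000000
    }
}
```

With an unsynchronized read-modify-write, concurrent updates to the same stock could overwrite each other and the final total could come out lower than expected. Whether that applies to the article's store depends on how FxMarketVolumeStore is implemented.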
3. Listener
JobCompletionNotificationListener is a JobExecutionListener that provides the callback afterJob(JobExecution jobExecution), which writes the stock volumes to the CSV file volume.csv after the ETL job completes.
JobCompletionNotificationListener.java:
package com.quantvalley.batch.listener;

import java.io.BufferedWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;

import com.quantvalley.batch.model.FxMarketVolumeStore;
import com.quantvalley.batch.model.StockVolume;

/**
 * The Class JobCompletionNotificationListener
 *
 * @author ashraf
 */
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

    private static final String HEADER = "stock,volume";

    private static final String LINE_DILM = ",";

    @Autowired
    private FxMarketVolumeStore fxMarketVolumeStore;

    @Override
    public void afterJob(JobExecution jobExecution) {
        // Write the results only when the job finished successfully
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.trace("Loading the results into file");
            Path path = Paths.get("volume.csv");
            try (BufferedWriter fileWriter = Files.newBufferedWriter(path)) {
                fileWriter.write(HEADER);
                fileWriter.newLine();
                for (StockVolume pd : fxMarketVolumeStore.values()) {
                    fileWriter.write(new StringBuilder().append(pd.getStock())
                            .append(LINE_DILM).append(pd.getVolume()).toString());
                    fileWriter.newLine();
                }
            } catch (Exception e) {
                // Pass the exception as the last argument so the stack trace is logged
                log.error("Fatal error: error occurred while writing {} file", path.getFileName(), e);
            }
        }
    }
}
4. Configuring and Running a Job
4.1. Enable job multithreaded step
All that is required to add Spring’s multithreading capabilities to a step in your job is to define a TaskExecutor implementation (we use SimpleAsyncTaskExecutor in this example) and reference it in your step. When you execute the job, SimpleAsyncTaskExecutor starts a new thread for each chunk rather than reusing pooled threads, and its concurrency limit of 5 caps how many run at once, so up to 5 chunks are processed in parallel. As you can imagine, this can be a powerful addition to most jobs. A step configuration may look like this:
BatchConfiguration.java:
// Configure job step
@Bean
public Job fxMarketPricesETLJob() {
    return jobBuilderFactory.get("FxMarket Volume ETL Job")
            .incrementer(new RunIdIncrementer())
            .listener(listener())
            .flow(etlStep()).end().build();
}

@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor("spring_batch");
    asyncTaskExecutor.setConcurrencyLimit(5);
    return asyncTaskExecutor;
}

@Bean
public Step etlStep() {
    return stepBuilderFactory.get("Extract -> Transform -> Aggregate -> Load")
            .<FxMarketEvent, Trade>chunk(10000)
            .reader(fxMarketEventReader())
            .processor(fxMarketEventProcessor())
            .writer(stockVolumeAggregator())
            .taskExecutor(taskExecutor())
            .build();
}
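Once a step runs on several threads, the reader is shared between them, so it must be safe to call concurrently. Stateful readers generally are not, and Spring Batch offers a SynchronizedItemStreamReader wrapper for that situation. Whether the reader returned by fxMarketEventReader() needs it is not shown in this article, so treat the following as an illustration of the idea only. The sketch is framework-free: SimpleReader, ListReader, and SynchronizedReader are hypothetical types, and returning null at the end of input mirrors the ItemReader convention.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SynchronizedReaderSketch {

    // Hypothetical minimal reader contract: next item, or null when the input is exhausted.
    interface SimpleReader<T> {
        T read();
    }

    // A stateful reader over an in-memory list; not safe for concurrent use on its own.
    static final class ListReader<T> implements SimpleReader<T> {
        private final Iterator<T> iterator;
        ListReader(List<T> items) { this.iterator = items.iterator(); }
        @Override public T read() { return iterator.hasNext() ? iterator.next() : null; }
    }

    // Wrapper that serializes read() calls so only one thread touches the delegate at a time.
    static final class SynchronizedReader<T> implements SimpleReader<T> {
        private final SimpleReader<T> delegate;
        SynchronizedReader(SimpleReader<T> delegate) { this.delegate = delegate; }
        @Override public synchronized T read() { return delegate.read(); }
    }

    static List<Integer> range(int n) {
        return IntStream.range(0, n).boxed().collect(Collectors.toList());
    }

    // Drain the reader from several threads. Returns the number of items read,
    // or -1 if any item was delivered more than once.
    static int drainConcurrently(List<Integer> items, int threads) {
        SimpleReader<Integer> reader = new SynchronizedReader<Integer>(new ListReader<Integer>(items));
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger reads = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    reads.incrementAndGet();
                    seen.add(item);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for readers");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return reads.get() == seen.size() ? reads.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(drainConcurrently(range(10_000), 5)); // prints 10000
    }
}
```

Synchronizing the reader serializes the read phase, so the gain from multithreading comes mainly from processing and writing chunks in parallel.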
4.2. Running a Job
Our job reads records from the trades.csv file, then aggregates and saves the daily volume for each stock in volume.csv.
4.2.1. Input
trades.csv:
stock,time,price,shares
JHX,09:30:00.00,57,95
JNJ,09:30:00.00,91.14,548
OPK,09:30:00.00,8.3,300
OPK,09:30:00.00,8.3,63
OMC,09:30:00.00,74.53,100
OMC,09:30:00.00,74.53,24
TWTR,09:30:00.00,64.89,100
TWTR,09:30:00.00,64.89,25
TWTR,09:30:00.00,64.89,245
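As a small worked example of the aggregation the job performs, the following sketch sums the shares column per stock for the sample rows above. It is a standalone illustration rather than the job's actual reader and writer; SampleVolumeSketch and its members are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: sums the `shares` column per stock for the sample trades.
class SampleVolumeSketch {

    static final List<String> SAMPLE = Arrays.asList(
            "stock,time,price,shares",
            "JHX,09:30:00.00,57,95",
            "JNJ,09:30:00.00,91.14,548",
            "OPK,09:30:00.00,8.3,300",
            "OPK,09:30:00.00,8.3,63",
            "OMC,09:30:00.00,74.53,100",
            "OMC,09:30:00.00,74.53,24",
            "TWTR,09:30:00.00,64.89,100",
            "TWTR,09:30:00.00,64.89,25",
            "TWTR,09:30:00.00,64.89,245");

    // Skip the header row, then add the 4th field (shares) to the stock's running total.
    static Map<String, Long> aggregate(List<String> csvLines) {
        Map<String, Long> volumes = new LinkedHashMap<>();
        for (int i = 1; i < csvLines.size(); i++) {
            String[] fields = csvLines.get(i).split(",");
            volumes.merge(fields[0], Long.parseLong(fields[3]), Long::sum);
        }
        return volumes;
    }

    public static void main(String[] args) {
        System.out.println("stock,volume");
        aggregate(SAMPLE).forEach((stock, volume) -> System.out.println(stock + "," + volume));
    }
}
```

For these nine rows the totals are JHX 95, JNJ 548, OPK 363, OMC 124, and TWTR 370.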
4.2.2. Output
The output sample below contains the top 10 high-volume stocks.
volume.csv:
stock,volume
ELAY,8563079
EEM,9220571
FTR,12444516
AEGY,12869499
ERBB,19696299
MJNA,8263325
PVEC,10083433
FITX,14781867
BRGO,11458750
BAC,10860160
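The listener writes rows in the store's iteration order, and the sample above is not sorted by volume. How the top 10 rows were selected is not shown in this article; one possible way to pick the highest-volume stocks is sketched below. TopVolumeSketch and topByVolume are hypothetical names used only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: selects the n stocks with the largest volume.
class TopVolumeSketch {

    // Sort entries by volume, highest first, keep n of them, and format as CSV rows.
    static List<String> topByVolume(Map<String, Long> volumes, int n) {
        return volumes.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(n)
                .map(e -> e.getKey() + "," + e.getValue())
                .collect(Collectors.toList());
    }

    // The ten rows from the sample output above.
    static Map<String, Long> sample() {
        Map<String, Long> volumes = new LinkedHashMap<>();
        volumes.put("ELAY", 8563079L);
        volumes.put("EEM", 9220571L);
        volumes.put("FTR", 12444516L);
        volumes.put("AEGY", 12869499L);
        volumes.put("ERBB", 19696299L);
        volumes.put("MJNA", 8263325L);
        volumes.put("PVEC", 10083433L);
        volumes.put("FITX", 14781867L);
        volumes.put("BRGO", 11458750L);
        volumes.put("BAC", 10860160L);
        return volumes;
    }

    public static void main(String[] args) {
        topByVolume(sample(), 3).forEach(System.out::println);
    }
}
```

Run against the ten sample rows with n = 3, this prints ERBB, FITX, and AEGY, in that order.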
5. Conclusion
Enabling the multithreaded step makes a noticeable difference: the multithreaded run took 6.776 seconds, about 63.5% of the 10.677 seconds taken when multithreading is disabled. Both figures are the application times reported by Spring Boot in the logs below.
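The percentage can be checked with a few lines of arithmetic. The sketch below also derives the duration of the step itself from the "Executing step" and "Job ... completed" timestamps in the two logs, which leaves out application startup. TimingSketch and its methods are hypothetical helper names.

```java
import java.time.Duration;
import java.time.LocalTime;

// Hypothetical sketch: reproduces the timing comparison from the two log excerpts.
class TimingSketch {

    // Milliseconds between two log timestamps of the form HH:mm:ss.SSS.
    static long elapsedMillis(String start, String end) {
        return Duration.between(LocalTime.parse(start), LocalTime.parse(end)).toMillis();
    }

    // part / whole as a percentage, rounded to one decimal place.
    static double percent(double part, double whole) {
        return Math.round(part / whole * 1000.0) / 10.0;
    }

    public static void main(String[] args) {
        // Total application times reported by Spring Boot
        System.out.println(percent(6.776, 10.677)); // prints 63.5

        // Step-only times: "Executing step" to "Job ... completed"
        long multithreaded = elapsedMillis("21:37:15.877", "21:37:21.015"); // 5138 ms
        long singleThreaded = elapsedMillis("21:38:57.327", "21:39:06.319"); // 8992 ms
        System.out.println(percent(multithreaded, singleThreaded)); // prints 57.1
    }
}
```

Measured on the step alone, the multithreaded run takes about 57.1% of the single-threaded time. These are single runs on one machine, so the numbers are indicative rather than a benchmark.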
5.1. Job running with multithreaded step
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.3.3.RELEASE)

2016-06-04 21:37:14.419 INFO 3312 --- [ main] com.quantvalley.batch.Application : Starting Application on HP-ProBook with PID 3312 (started by ashraf in /home/ashraf/jcg/examples/Spring Batch Multithreading Example/spring-batch-multithreading-example)
2016-06-04 21:37:14.422 INFO 3312 --- [ main] com.quantvalley.batch.Application : No active profile set, falling back to default profiles: default
2016-06-04 21:37:14.453 INFO 3312 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@45afc369: startup date [Sat Jun 04 21:37:14 EET 2016]; root of context hierarchy
2016-06-04 21:37:14.926 WARN 3312 --- [ main] o.s.c.a.ConfigurationClassEnhancer : @Bean method ScopeConfiguration.stepScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
2016-06-04 21:37:14.938 WARN 3312 --- [ main] o.s.c.a.ConfigurationClassEnhancer : @Bean method ScopeConfiguration.jobScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
2016-06-04 21:37:15.085 INFO 3312 --- [ main] o.s.j.d.e.EmbeddedDatabaseFactory : Starting embedded database: url='jdbc:hsqldb:mem:testdb', username='sa'
2016-06-04 21:37:15.608 INFO 3312 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executing SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql]
2016-06-04 21:37:15.616 INFO 3312 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executed SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] in 7 ms.
2016-06-04 21:37:15.663 INFO 3312 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2016-06-04 21:37:15.673 INFO 3312 --- [ main] o.s.b.a.b.JobLauncherCommandLineRunner : Running default command line with: []
2016-06-04 21:37:15.678 INFO 3312 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: HSQL
2016-06-04 21:37:15.804 INFO 3312 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2016-06-04 21:37:15.856 INFO 3312 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=FxMarket Volume ETL Job]] launched with the following parameters: [{run.id=1}]
2016-06-04 21:37:15.877 INFO 3312 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [Extract -> Transform -> Aggregate -> Load]
2016-06-04 21:37:21.015 INFO 3312 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=FxMarket Volume ETL Job]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
2016-06-04 21:37:21.016 INFO 3312 --- [ main] com.quantvalley.batch.Application : Started Application in 6.776 seconds (JVM running for 7.108)
2016-06-04 21:37:21.017 INFO 3312 --- [ Thread-1] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@45afc369: startup date [Sat Jun 04 21:37:14 EET 2016]; root of context hierarchy
2016-06-04 21:37:21.018 INFO 3312 --- [ Thread-1] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown
2016-06-04 21:37:21.019 INFO 3312 --- [ Thread-1] o.s.j.d.e.EmbeddedDatabaseFactory : Shutting down embedded database: url='jdbc:hsqldb:mem:testdb'
5.2. Job running without multithreaded step
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.3.3.RELEASE)

2016-06-04 21:38:55.821 INFO 3484 --- [ main] com.quantvalley.batch.Application : Starting Application on HP-ProBook with PID 3484 (started by ashraf in /home/ashraf/jcg/examples/Spring Batch Multithreading Example/spring-batch-multithreading-example)
2016-06-04 21:38:55.823 INFO 3484 --- [ main] com.quantvalley.batch.Application : No active profile set, falling back to default profiles: default
2016-06-04 21:38:55.861 INFO 3484 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@45afc369: startup date [Sat Jun 04 21:38:55 EET 2016]; root of context hierarchy
2016-06-04 21:38:56.348 WARN 3484 --- [ main] o.s.c.a.ConfigurationClassEnhancer : @Bean method ScopeConfiguration.stepScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
2016-06-04 21:38:56.360 WARN 3484 --- [ main] o.s.c.a.ConfigurationClassEnhancer : @Bean method ScopeConfiguration.jobScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
2016-06-04 21:38:56.498 INFO 3484 --- [ main] o.s.j.d.e.EmbeddedDatabaseFactory : Starting embedded database: url='jdbc:hsqldb:mem:testdb', username='sa'
2016-06-04 21:38:57.018 INFO 3484 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executing SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql]
2016-06-04 21:38:57.026 INFO 3484 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executed SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] in 7 ms.
2016-06-04 21:38:57.081 INFO 3484 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2016-06-04 21:38:57.096 INFO 3484 --- [ main] o.s.b.a.b.JobLauncherCommandLineRunner : Running default command line with: []
2016-06-04 21:38:57.104 INFO 3484 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: HSQL
2016-06-04 21:38:57.253 INFO 3484 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2016-06-04 21:38:57.307 INFO 3484 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=FxMarket Volume ETL Job]] launched with the following parameters: [{run.id=1}]
2016-06-04 21:38:57.327 INFO 3484 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [Extract -> Transform -> Aggregate -> Load]
2016-06-04 21:39:06.319 INFO 3484 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=FxMarket Volume ETL Job]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
2016-06-04 21:39:06.321 INFO 3484 --- [ main] com.quantvalley.batch.Application : Started Application in 10.677 seconds (JVM running for 11.016)
2016-06-04 21:39:06.322 INFO 3484 --- [ Thread-1] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@45afc369: startup date [Sat Jun 04 21:38:55 EET 2016]; root of context hierarchy
2016-06-04 21:39:06.324 INFO 3484 --- [ Thread-1] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown
2016-06-04 21:39:06.324 INFO 3484 --- [ Thread-1] o.s.j.d.e.EmbeddedDatabaseFactory : Shutting down embedded database: url='jdbc:hsqldb:mem:testdb'
6. Download the Source Code
This was an example of how to use a multithreaded step in a Spring Batch job.
You can download the full source code of this example here: SpringBatchMultithreadingExampleCode.zip