Continuing from the first part of this article, let’s look a little further into asynchronous processing with ExecutorService. In this second part, we will tackle the following use case.
Our software will receive and process a batch of transactions. After processing the batch, the system must create a “summary” of the processing, reporting separately the number of transactions that were processed successfully and the number that failed. This summary could then be sent to another system, for example one that analyzes the success rate of the transaction batch.
Let’s start by defining the class that represents a transaction. It is a very simple class that holds an id and a method that processes the transaction and returns a boolean indicating whether it was executed successfully.
Transaction.java
package com.wordpress.lucianomolinari.taskexecutor.relatedtask;

import org.apache.log4j.Logger;

public final class Transaction {

    private final Long id;
    // the logger is bound to this class (the flattened listing pointed at SimpleTask by mistake)
    private static final Logger logger = Logger.getLogger(Transaction.class);

    public Transaction(Long id) {
        this.id = id;
    }

    /**
     * Returns a boolean indicating if the transaction was processed
     * successfully
     *
     * @return true if the transaction succeeded, false otherwise
     */
    public boolean process() {
        // waits 5 milliseconds (simulates a slow task, such as accessing a remote system)
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            // restores the interrupted status instead of swallowing it
            Thread.currentThread().interrupt();
        }
        // error every 10 transactions
        boolean success = id % 10 != 0;
        logger.debug("Transaction processed: " + id + "; Result: " + success);
        return success;
    }

    public Long getId() {
        return id;
    }

    @Override
    public String toString() {
        return "Transaction [id=" + id + "]";
    }
}
As the final result of our system must be a summary of the batch execution, let’s create a class to hold this data.
ExecutionSummary.java
package com.wordpress.lucianomolinari.taskexecutor.relatedtask;

public class ExecutionSummary {

    private int numberOfOk;
    private int numberOfError;

    public void addOk() {
        numberOfOk++;
    }

    public void addError() {
        numberOfError++;
    }

    public void addExecutionSummary(ExecutionSummary executionSummary) {
        numberOfOk += executionSummary.getNumberOfOk();
        numberOfError += executionSummary.getNumberOfError();
    }

    public int getNumberOfOk() {
        return numberOfOk;
    }

    public int getNumberOfError() {
        return numberOfError;
    }

    @Override
    public String toString() {
        return "ExecutionSummary [numberOfOk=" + numberOfOk + ", numberOfError=" + numberOfError + "]";
    }
}
Now that we have the classes used as input and output of the system, we can start on the processing code. As the main goal of this article is to show the differences between sequential and parallel processing, let’s define a common interface and one implementation for each processing mode. The interface is straightforward: it receives a list of transactions to be processed and returns the execution summary.
TransactionExecutor.java
package com.wordpress.lucianomolinari.taskexecutor.relatedtask;

import java.util.List;

public interface TransactionExecutor {

    /**
     * Executes a {@link List} of {@link Transaction} and returns its execution
     * summary, containing the number of transactions which were executed with
     * success and with error
     *
     * @param transactions the transactions to be executed
     * @return the summary of the execution
     */
    ExecutionSummary execute(List<Transaction> transactions);
}
The sequential implementation simply iterates over the list, processing each transaction and updating the summary as it goes.
TransactionExecutorSequential.java
package com.wordpress.lucianomolinari.taskexecutor.relatedtask;

import java.util.List;

public class TransactionExecutorSequential implements TransactionExecutor {

    @Override
    public ExecutionSummary execute(List<Transaction> transactions) {
        ExecutionSummary summary = new ExecutionSummary();
        for (Transaction transaction : transactions) {
            if (transaction.process()) {
                summary.addOk();
            } else {
                summary.addError();
            }
        }
        return summary;
    }
}
The parallel implementation is somewhat more complex. The idea is to split the main transaction batch into smaller batches, execute each of these batches in a different thread, collect the partial result of each batch and, finally, compute the final result from these partial results. The picture below illustrates this idea.
The key point is that, since the sub-batches are executed in parallel, we need some mechanism to know when the threads have finished, and only then collect the partial results to consolidate them.
The ExecutorService API provides everything we need, as the class below shows. Just as in the first part of this post, we create a pool of 10 threads to execute the tasks. At the beginning of the execute() method we therefore calculate how many transactions each sub-batch should contain, assuming 10 sub-batches (one for each thread in the pool). Then we submit each sub-batch for execution. Unlike the first part of this post, where we used the Runnable interface to execute the tasks, we now use the Callable interface. The main difference is that the latter allows a task to return a result at the end of its execution, like a regular method (remember that we need the partial result of each sub-batch), and also to throw checked exceptions.
But this raises another question: if the tasks are executed in other threads, how do we get their results? The “magic” lies in the Future object returned by ExecutorService.submit(). As the name implies, this object lets us retrieve the result in the “future”. The main thread therefore stores each of the 10 Future objects in a list so that it can collect the results later.
And that’s exactly what the last loop does. The get() method of Future blocks until the associated task has completed and then returns its result. As a consequence, this simple loop only finishes once all the tasks have finished their execution. Finally, the program consolidates all the partial results into a single final result.
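Splitting a batch into sub-batches is easy to get subtly wrong when the batch size is not a multiple of the number of threads: with plain integer division, the leftover elements can be silently skipped. As one possible approach, here is a minimal sketch (the names Partitioner and partition are hypothetical, not from the article's repository) that spreads the remainder over the first sub-lists so that every element lands in exactly one of them.

```java
import java.util.ArrayList;
import java.util.List;

public class Partitioner {

    // Splits 'items' into 'parts' contiguous sub-lists whose sizes differ by at most one.
    // The sub-lists are views (List.subList), so 'items' must not be structurally modified afterwards.
    public static <T> List<List<T>> partition(List<T> items, int parts) {
        if (parts <= 0) {
            throw new IllegalArgumentException("parts must be positive");
        }
        int baseSize = items.size() / parts;  // minimum size of each sub-list
        int remainder = items.size() % parts; // the first 'remainder' sub-lists get one extra element
        List<List<T>> result = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < parts; i++) {
            int size = baseSize + (i < remainder ? 1 : 0);
            result.add(items.subList(start, start + size));
            start += size;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 23; i++) {
            ids.add(i);
        }
        // 23 ids in 10 parts: three sub-lists of 3 elements, seven of 2
        for (List<Integer> part : partition(ids, 10)) {
            System.out.println(part);
        }
    }
}
```

Each resulting sub-list could then be handed to its own task, exactly as the sub-batches are in the parallel executor.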
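To make the Runnable/Callable difference concrete, here is a small self-contained sketch (CallableDemo and sumWithCallable are hypothetical names, and Java 8 lambdas are used for brevity instead of the anonymous classes of the article). A Runnable submitted to an ExecutorService yields a Future<?> whose get() merely signals completion, whereas a Callable<Integer> yields a Future<Integer> that carries the computed value.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableDemo {

    public static int sumWithCallable(int upTo) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Runnable: run() returns void, so the Future only tells us the task is done
            Runnable runnable = () -> System.out.println("Runnable executed");
            Future<?> runnableFuture = executor.submit(runnable);
            runnableFuture.get(); // returns null once run() has finished

            // Callable: call() returns a value (and may throw checked exceptions)
            Callable<Integer> callable = () -> {
                int sum = 0;
                for (int i = 1; i <= upTo; i++) {
                    sum += i;
                }
                return sum;
            };
            Future<Integer> callableFuture = executor.submit(callable);
            return callableFuture.get(); // blocks until call() has returned its result
        } finally {
            executor.shutdown(); // lets the pool threads terminate
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Sum: " + sumWithCallable(100)); // prints "Sum: 5050"
    }
}
```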
TransactionExecutorParallel.java
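package com.wordpress.lucianomolinari.taskexecutor.relatedtask;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TransactionExecutorParallel implements TransactionExecutor {

    private static final int NUMBER_OF_THREADS = 10;

    private final ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);

    @Override
    public ExecutionSummary execute(List<Transaction> transactions) {
        int numberOfElementsPerList = transactions.size() / NUMBER_OF_THREADS;
        List<Future<ExecutionSummary>> futureSummaries = new ArrayList<Future<ExecutionSummary>>();

        int iniPos = 0;
        for (int i = 0; i < NUMBER_OF_THREADS; i++) {
            // the last sub-batch also takes the remainder, so no transaction is skipped
            // when the batch size is not a multiple of NUMBER_OF_THREADS
            int endPos = (i == NUMBER_OF_THREADS - 1) ? transactions.size() : iniPos + numberOfElementsPerList;
            final List<Transaction> subList = transactions.subList(iniPos, endPos);
            // each task fills its own ExecutionSummary, so no mutable state is shared between threads
            Future<ExecutionSummary> future = executor.submit(new Callable<ExecutionSummary>() {
                @Override
                public ExecutionSummary call() throws Exception {
                    ExecutionSummary summary = new ExecutionSummary();
                    for (Transaction transaction : subList) {
                        if (transaction.process()) {
                            summary.addOk();
                        } else {
                            summary.addError();
                        }
                    }
                    return summary;
                }
            });
            futureSummaries.add(future);
            iniPos = endPos;
        }

        // get() blocks until the corresponding sub-batch has been processed
        ExecutionSummary summaryFinal = new ExecutionSummary();
        for (Future<ExecutionSummary> futureSummary : futureSummaries) {
            try {
                summaryFinal.addExecutionSummary(futureSummary.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the partial results", e);
            } catch (ExecutionException e) {
                // a failed sub-batch would otherwise silently produce a wrong summary
                throw new IllegalStateException("A sub-batch failed", e.getCause());
            }
        }
        return summaryFinal;
    }

    /** Releases the pool threads; call it once this executor is no longer needed. */
    public void shutdown() {
        executor.shutdown();
    }
}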
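The loop of get() calls at the end of execute() is one way to wait for every task. A standard alternative is ExecutorService.invokeAll(), which submits a whole collection of Callables and only returns once all of them have completed, so every Future in the returned list is already done. The sketch below is illustrative only: InvokeAllDemo and countSuccesses are hypothetical names, Java 8 lambdas are assumed, and plain integer ids replace real transactions under the same rule as Transaction.process() (every 10th id fails). It also shuts the pool down in a finally block, which is easy to forget with a long-lived executor field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllDemo {

    // Counts the ids 1..totalIds that "succeed" (id % 10 != 0),
    // processing up to 'chunks' contiguous ranges of ids in parallel.
    public static int countSuccesses(int totalIds, int chunks)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(chunks);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            int chunkSize = (totalIds + chunks - 1) / chunks; // ceiling division, so no id is skipped
            for (int start = 1; start <= totalIds; start += chunkSize) {
                final int from = start;
                final int to = Math.min(start + chunkSize - 1, totalIds);
                tasks.add(() -> {
                    int ok = 0;
                    for (int id = from; id <= to; id++) {
                        if (id % 10 != 0) {
                            ok++;
                        }
                    }
                    return ok;
                });
            }
            // invokeAll blocks until every task has completed (normally or exceptionally)
            List<Future<Integer>> futures = executor.invokeAll(tasks);
            int total = 0;
            for (Future<Integer> future : futures) {
                total += future.get(); // does not wait here: all futures are already done
            }
            return total;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(countSuccesses(100, 10)); // prints 90
    }
}
```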
In order to check the difference between the sequential and parallel implementations, we will use a test class.
TestTransactionExecutor.java
package com.wordpress.lucianomolinari.taskexecutor.relatedtask;

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;
import org.junit.Test;

public class TestTransactionExecutor {

    private final Logger logger = Logger.getLogger(TestTransactionExecutor.class);

    @Test
    public void testSequential() {
        doTest(new TransactionExecutorSequential());
    }

    @Test
    public void testParallel() {
        doTest(new TransactionExecutorParallel());
    }

    private void doTest(TransactionExecutor transactionExecutor) {
        logger.debug("--------------");
        List<Transaction> transactions = getListOfTransactions();
        logger.debug("Executing transactions using " + transactionExecutor.getClass().getSimpleName());
        ExecutionSummary summary = transactionExecutor.execute(transactions);
        logger.debug("Transactions executed successfully");
        assertEquals(90, summary.getNumberOfOk());
        assertEquals(10, summary.getNumberOfError());
    }

    private List<Transaction> getListOfTransactions() {
        List<Transaction> transactions = new ArrayList<Transaction>();
        for (int i = 1; i <= 100; i++) {
            transactions.add(new Transaction(Long.valueOf(i)));
        }
        return transactions;
    }
}
The following output is obtained when running the test class.
23:59:05,668 - --------------
23:59:05,670 - Executing transactions using TransactionExecutorSequential
23:59:05,675 - Transaction processed: 1; Result: true
23:59:05,680 - Transaction processed: 2; Result: true
23:59:05,686 - Transaction processed: 3; Result: true
23:59:05,691 - Transaction processed: 4; Result: true
....
23:59:06,188 - Transaction processed: 97; Result: true
23:59:06,194 - Transaction processed: 98; Result: true
23:59:06,199 - Transaction processed: 99; Result: true
23:59:06,204 - Transaction processed: 100; Result: false
23:59:06,204 - Transactions executed successfully
23:59:06,217 - --------------
23:59:06,217 - Executing transactions using TransactionExecutorParallel
23:59:06,224 - Transaction processed: 1; Result: true
23:59:06,224 - Transaction processed: 11; Result: true
23:59:06,224 - Transaction processed: 21; Result: true
23:59:06,224 - Transaction processed: 31; Result: true
...
23:59:06,272 - Transaction processed: 80; Result: false
23:59:06,272 - Transaction processed: 20; Result: false
23:59:06,272 - Transaction processed: 90; Result: false
23:59:06,273 - Transaction processed: 100; Result: false
23:59:06,273 - Transactions executed successfully
Just like in the first part of the article, the parallel processing (56 ms, measured from the log timestamps) was much faster than the sequential one (534 ms), roughly ten times faster.
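The timings above come from the log timestamps. To reproduce the comparison programmatically, one option is to wrap each run with System.nanoTime(). The sketch below is hypothetical and self-contained (TimingDemo and its methods are not part of the article's code): it simulates 100 tasks of 5 ms each with Thread.sleep, like Transaction.process(), and runs them sequentially and then on a 10-thread pool. Absolute numbers depend on the machine and on the scheduler, so only expect the parallel run to be substantially faster, not a specific ratio.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TimingDemo {

    // Simulates a slow task, like Transaction.process() in the article
    static void slowTask() {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static long sequentialMillis(int tasks) {
        long start = System.nanoTime();
        for (int i = 0; i < tasks; i++) {
            slowTask();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static long parallelMillis(int tasks, int threads) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Void>> work = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                work.add(() -> {
                    slowTask();
                    return null;
                });
            }
            long start = System.nanoTime();
            executor.invokeAll(work); // returns once every task has finished
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Sequential: " + sequentialMillis(100) + " ms");
        System.out.println("Parallel:   " + parallelMillis(100, 10) + " ms");
    }
}
```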
Conclusion
The technique applied in this second part of the article resembles the MapReduce concept, where the processing of a task is split among several servers and, at the end, all the results are consolidated. However, MapReduce is much more powerful when you need to process large amounts of data, as it distributes the processing across multiple servers, while the example shown here only uses multiple threads on a single server. Nevertheless, as we have seen, the implementation described here is quite simple and can be used efficiently in scenarios where the data volume is not that large!
Note also that this technique is known as fork/join, and it became a native feature in Java 7 through the Fork/Join framework (ForkJoinPool and RecursiveTask). I preferred to use ExecutorService throughout this article, but the concept is the same.
I hope this two-part article has helped you understand how ExecutorService can be useful for making use of all the resources available on machines with multiple CPUs/cores. As we have seen, the performance gains can be significant.
The complete source code is available on GitHub.
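Within a single JVM, the “map” and “reduce” halves of that analogy can be written out explicitly. The sketch below is an illustration, not part of the article's code: it assumes Java 8+ streams and uses a hypothetical immutable Summary class standing in for ExecutionSummary. Each transaction id is mapped to a one-element summary, and the partial summaries are merged with an associative function, which is the role addExecutionSummary() plays in the consolidation loop.

```java
import java.util.stream.LongStream;

public class MapReduceSketch {

    // Minimal immutable counterpart of the article's ExecutionSummary
    static final class Summary {
        final long ok;
        final long error;

        Summary(long ok, long error) {
            this.ok = ok;
            this.error = error;
        }

        // "Reduce" step: merges two partial summaries (associative, identity = Summary(0, 0))
        Summary merge(Summary other) {
            return new Summary(ok + other.ok, error + other.error);
        }
    }

    public static Summary run(long numberOfTransactions) {
        return LongStream.rangeClosed(1, numberOfTransactions)
                .parallel()
                // "Map" step: each id becomes a one-element summary (every 10th id fails)
                .mapToObj(id -> id % 10 != 0 ? new Summary(1, 0) : new Summary(0, 1))
                // partial summaries may be combined on different threads, hence immutability
                .reduce(new Summary(0, 0), Summary::merge);
    }

    public static void main(String[] args) {
        Summary summary = run(100);
        System.out.println("ok=" + summary.ok + ", error=" + summary.error); // prints "ok=90, error=10"
    }
}
```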
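For comparison, here is a minimal sketch of the same counting written against the Fork/Join framework. It is not the article's code: the class names, the threshold of 10 and the long[] result pair are assumptions chosen for brevity, and ForkJoinPool.commonPool() requires Java 8 (on Java 7 you would create a new ForkJoinPool() instead). Rather than fixing ten sub-batches up front, a RecursiveTask keeps splitting its range in half until it is small enough, and the partial results are combined on the way back up.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSummary {

    // Result pair: index 0 = number of successes, index 1 = number of errors
    static final class CountTask extends RecursiveTask<long[]> {
        private static final int THRESHOLD = 10; // ranges this small are processed directly
        private final long from; // inclusive
        private final long to;   // inclusive

        CountTask(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected long[] compute() {
            if (to - from + 1 <= THRESHOLD) {
                long ok = 0;
                long error = 0;
                for (long id = from; id <= to; id++) {
                    if (id % 10 != 0) { // same rule as Transaction.process(): every 10th id fails
                        ok++;
                    } else {
                        error++;
                    }
                }
                return new long[] {ok, error};
            }
            long mid = (from + to) / 2;
            CountTask left = new CountTask(from, mid);
            CountTask right = new CountTask(mid + 1, to);
            left.fork();                          // schedule the left half asynchronously
            long[] rightResult = right.compute(); // process the right half in the current thread
            long[] leftResult = left.join();      // wait for the forked half
            return new long[] {leftResult[0] + rightResult[0], leftResult[1] + rightResult[1]};
        }
    }

    public static long[] count(long numberOfTransactions) {
        return ForkJoinPool.commonPool().invoke(new CountTask(1, numberOfTransactions));
    }

    public static void main(String[] args) {
        long[] result = count(100);
        System.out.println("ok=" + result[0] + ", error=" + result[1]); // prints "ok=90, error=10"
    }
}
```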