Introduction
On modern machines with many processors, it’s essential to build software that gets the most out of those resources. What’s the point of having a machine with 16 processors if your software runs sequentially on a single thread? Messaging technologies such as JMS and AMQP can be really useful for distributing processing. In some scenarios, however, they are overkill: adding a message broker to the architecture brings overhead, and we may simply want to use all the processors of one machine efficiently by splitting a task that would run on a single thread into a set of smaller tasks that multiple threads can run in parallel.
Prior to Java 5, we had to create and manage threads manually, which wasn’t a pleasant task. Java 5 made things easier with the new executor services in java.util.concurrent.
Examples
To make the use of this resource clearer, we’ll walk through two examples.
The first one will handle the execution of a list of unrelated tasks, while the second one will handle the execution of a list of related tasks, meaning that the final result depends on the execution of all the tasks.
In this post we’ll be talking about the first example and, in the next one, the second.
Project
For this project we’ll use Maven to manage the dependencies and the build. The pom.xml must be defined as below:
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.wordpress.lucianomolinari</groupId>
    <artifactId>taskexecutor</artifactId>
    <version>1.0.0</version>
    <properties>
        <junit.version>4.8.2</junit.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
Now let’s create the class that represents the task to be executed. Its processing logic is quite simple: it receives a String and, to simulate a heavy task, pauses execution for a few milliseconds and then logs the received String.
SimpleTask.java
package com.wordpress.lucianomolinari.taskexecutor.unrelatedtask;

import org.apache.log4j.Logger;

public final class SimpleTask {

    private static final Logger logger = Logger.getLogger(SimpleTask.class);

    private final String message;

    public SimpleTask(String message) {
        this.message = message;
    }

    public void execute() {
        // waits 5 milliseconds
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.debug("Message processed: " + message);
    }

}
To test and compare processing times, the same set of tasks will be executed both sequentially and in parallel. So we’ll define a common interface to be implemented by both executors, sequential and parallel.
SimpleTaskExecutor.java
package com.wordpress.lucianomolinari.taskexecutor.unrelatedtask;

public interface SimpleTaskExecutor {

    void execute(final SimpleTask simpleTask);

}
The sequential implementation is quite simple and just delegates the task execution to the previously defined SimpleTask class.
SimpleTaskExecutorSequential.java
package com.wordpress.lucianomolinari.taskexecutor.unrelatedtask;

public class SimpleTaskExecutorSequential implements SimpleTaskExecutor {

    @Override
    public void execute(final SimpleTask simpleTask) {
        simpleTask.execute();
    }

}
The implementation that executes tasks in parallel uses Java’s ExecutorService, as we can see below:
SimpleTaskExecutorParallel.java
package com.wordpress.lucianomolinari.taskexecutor.unrelatedtask;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimpleTaskExecutorParallel implements SimpleTaskExecutor {

    /**
     * Creates a pool with 10 threads to execute jobs
     */
    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    @Override
    public void execute(final SimpleTask simpleTask) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                simpleTask.execute();
            }
        });
    }

}
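One thing SimpleTaskExecutorParallel leaves open is how the pool ends: the threads created by Executors.newFixedThreadPool() are non-daemon by default, so a standalone program would keep running until the pool is shut down. As a minimal sketch (the class name ShutdownSketch and the method runTasks are made up for illustration, and the task is simulated inline rather than using SimpleTask), shutdown() followed by awaitTermination() stops the pool and waits for the submitted tasks to finish:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class, not part of the original project.
class ShutdownSketch {

    /** Runs taskCount simulated tasks on a pool of 10 threads and returns how many completed. */
    static int runTasks(int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        final AtomicInteger processed = new AtomicInteger();

        for (int index = 1; index <= taskCount; index++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(5); // stands in for the "heavy" work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    processed.incrementAndGet();
                }
            });
        }

        // Stop accepting new tasks; the ones already submitted still run.
        executor.shutdown();
        try {
            // Block until every task has finished, or give up after 5 seconds.
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("Tasks processed: " + runTasks(50));
    }
}
```

The 5-second timeout is an arbitrary choice for this sketch; awaitTermination() returns false if the tasks are still running when it expires.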
Notice that using the Executors class and the ExecutorService interface is quite simple. The newFixedThreadPool() method creates a thread pool with the number of threads given as its parameter. There are other types of pool; you can find more details in the JavaDoc of the Executors class. Once you have the executor, you can hand it both Runnable and Callable objects: execute() accepts a Runnable, while submit() accepts either. The main difference between them is that a Runnable’s run() method returns nothing, while a Callable’s call() method returns a value. The usage of Callable will be described in the next post about this subject.
To compare the execution times of the two implementations, we’ll create a JUnit test class that executes 50 tasks with each of them. For the sake of simplicity, the execution time of each approach will be checked through log messages. Hence, let’s configure log4j so that it shows all the messages in the console with a timestamp.
log4j.properties
# Root logger option
log4j.rootLogger=DEBUG, stdout

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} - %m%n
TestSimpleTaskExecutor.java
package com.wordpress.lucianomolinari.taskexecutor.unrelatedtask;

import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;
import org.junit.Test;

public class TestSimpleTaskExecutor {

    private final Logger logger = Logger.getLogger(TestSimpleTaskExecutor.class);

    @Test
    public void testSequential() {
        logger.debug("--------------");
        logger.debug("Executing tasks in a sequential manner...");
        SimpleTaskExecutor executor = new SimpleTaskExecutorSequential();
        for (SimpleTask task : getListToBeExecuted()) {
            executor.execute(task);
        }
    }

    @Test
    public void testParallel() {
        logger.debug("--------------");
        logger.debug("Executing tasks in a parallel manner...");
        SimpleTaskExecutor executor = new SimpleTaskExecutorParallel();
        for (SimpleTask task : getListToBeExecuted()) {
            executor.execute(task);
        }
        // it is necessary to wait for a moment so the threads can finish their
        // jobs
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private List<SimpleTask> getListToBeExecuted() {
        List<SimpleTask> tasks = new ArrayList<SimpleTask>();
        for (int index = 1; index <= 50; index++) {
            tasks.add(new SimpleTask("Message " + index));
        }
        return tasks;
    }

}
We’ll get the following output after running this class.
23:21:55,062 - --------------
23:21:55,063 - Executing tasks in a sequential manner...
23:21:55,069 - Message processed: Message 1
23:21:55,074 - Message processed: Message 2
23:21:55,079 - Message processed: Message 3
23:21:55,085 - Message processed: Message 4
23:21:55,090 - Message processed: Message 5
23:21:55,095 - Message processed: Message 6
....
23:21:55,307 - Message processed: Message 45
23:21:55,312 - Message processed: Message 46
23:21:55,318 - Message processed: Message 47
23:21:55,323 - Message processed: Message 48
23:21:55,328 - Message processed: Message 49
23:21:55,333 - Message processed: Message 50
23:21:55,334 - --------------
23:21:55,334 - Executing tasks in a parallel manner...
23:21:55,344 - Message processed: Message 1
23:21:55,344 - Message processed: Message 2
23:21:55,344 - Message processed: Message 3
23:21:55,344 - Message processed: Message 4
23:21:55,344 - Message processed: Message 5
...
23:21:55,365 - Message processed: Message 42
23:21:55,365 - Message processed: Message 48
23:21:55,365 - Message processed: Message 45
23:21:55,365 - Message processed: Message 49
23:21:55,365 - Message processed: Message 50
Notice that in the sequential test all the messages are logged in the same order as they were created, which doesn’t happen in the parallel test, since there are 10 threads executing the tasks.
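What really stands out is the execution time for each scenario. In the sequential test, 270 ms elapse between the start of the test and the last message (23:21:55,063 and 23:21:55,333), while in the parallel test this interval is only 31 ms (23:21:55,334 and 23:21:55,365). Both figures are close to what the arithmetic predicts: 50 tasks of 5 ms each take at least 250 ms one after another, and at least 25 ms when spread over 10 threads.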
It’s important to note that the results depend on the machine’s configuration and the number of processors it has, so these times can be worse or even better on another machine.
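To see how the pool size affects these numbers on your own machine, something like the following can help. It is only a sketch: the class PoolSizeTiming and the method timeTasks are hypothetical names, the 5 ms sleep stands in for SimpleTask, and a CountDownLatch is used to wait for the tasks instead of a fixed pause. Runtime.getRuntime().availableProcessors() reports how many processors the JVM sees.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class, not part of the original project.
class PoolSizeTiming {

    /** Runs taskCount 5 ms tasks on a pool of poolSize threads and returns the elapsed milliseconds. */
    static long timeTasks(int poolSize, int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        // Each task counts the latch down once, so await() returns when all are done.
        final CountDownLatch done = new CountDownLatch(taskCount);

        long start = System.nanoTime();
        for (int i = 0; i < taskCount; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(5); // simulated work, as in SimpleTask
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();
                    }
                }
            });
        }

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long elapsedMillis = (System.nanoTime() - start) / 1000000L;

        executor.shutdown();
        return elapsedMillis;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("Available processors: " + cores);
        System.out.println("Pool of 1: " + timeTasks(1, 50) + " ms");
        System.out.println("Pool of " + cores + ": " + timeTasks(cores, 50) + " ms");
        System.out.println("Pool of 10: " + timeTasks(10, 50) + " ms");
    }
}
```

Because the simulated task sleeps instead of computing, a pool of 10 threads tends to help even on a machine with fewer processors; for CPU-bound work, threads beyond the processor count would be expected to add little.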
The source code can be found on GitHub.
In the next post we’ll keep talking about this subject. See you!
Hi Luciano, how are you?
I really liked your article about asynchronous processing with the ExecutorService interface, and I even implemented an example based on your explanation using newSingleThreadExecutor(). However, I ran into a small problem and I can’t understand what is happening. Could you help me? The problem is this: I put the “heavy” processing in submit(), implementing Callable, and another thread (Swing’s) calls the get() method. I noticed that if execution reaches get() before the work passed to submit() has finished, both of them freeze. If the work passed to submit() finishes before get() is called, there is no problem.
Thanks for your attention,
Lintz.
Hi Lintz,
Is the heavy processing inside the call() method of the Callable?
Your problem is very strange; I’ve never run into it.