Asynchronous processing with ExecutorServices – Part 1

Introduction

With modern machines shipping with many processors, it’s essential to build software that gets the most out of these resources. What’s the point of having a supercomputer with 16 processors if your software runs sequentially in a single thread? Messaging technologies such as JMS and AMQP can be really useful for distributing your processing. In some scenarios, however, they may be overkill: adding a message broker to the architecture brings overhead, and we may simply want to use all the processors of the machine efficiently by splitting a task that would run in a single thread into a set of smaller tasks that multiple threads can execute in parallel. Prior to Java 5, we had to create and manage threads manually, which wasn’t a pleasant task. Java 5 made things easier by introducing executor services.

Examples

To make the use of this API clearer, we’ll walk through 2 examples.
The first one handles the execution of a list of unrelated tasks, while the second one handles a list of related tasks, meaning that the final result depends on the execution of all the tasks.
This post covers the first example; the next one covers the second.

Project

For this project we’ll use Maven to manage the dependencies and the build. The pom.xml must be defined as below:

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.wordpress.lucianomolinari</groupId>
	<artifactId>taskexecutor</artifactId>
	<version>1.0.0</version>

	<properties>
		<junit.version>4.8.2</junit.version>
		<log4j.version>1.2.17</log4j.version>
	</properties>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.3.2</version>
				<configuration>
					<source>1.6</source>
					<target>1.6</target>
				</configuration>
			</plugin>
		</plugins>
	</build>

	<dependencies>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>${log4j.version}</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>${junit.version}</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

</project>

Now let’s create the class that represents the task to be executed. Its processing logic is quite simple: it receives a String and, to simulate a heavy task, pauses the execution for a few milliseconds before logging the received String.

SimpleTask.java
package com.wordpress.lucianomolinari.taskexecutor.unrelatedtask;

import org.apache.log4j.Logger;

public final class SimpleTask {
	private static final Logger logger = Logger.getLogger(SimpleTask.class);
	private final String message;

	public SimpleTask(String message) {
		this.message = message;
	}

	public void execute() {
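		// simulates a heavy task by pausing for 5 milliseconds
		try {
			Thread.sleep(5);
		} catch (InterruptedException e) {
			// restore the interrupt flag so the calling thread can react to it
			Thread.currentThread().interrupt();
		}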
		logger.debug("Message processed: " + message);
	}

}

As a test, and to analyse the difference in processing time, the same set of tasks will be executed both sequentially and in parallel. So we’ll define a common interface to be implemented by both executors, sequential and parallel.

SimpleTaskExecutor.java
package com.wordpress.lucianomolinari.taskexecutor.unrelatedtask;

public interface SimpleTaskExecutor {

	void execute(final SimpleTask simpleTask);

}

The sequential implementation is quite simple and just delegates the task execution to the previously defined SimpleTask class.

SimpleTaskExecutorSequential.java
package com.wordpress.lucianomolinari.taskexecutor.unrelatedtask;

public class SimpleTaskExecutorSequential implements SimpleTaskExecutor {

	@Override
	public void execute(final SimpleTask simpleTask) {
		simpleTask.execute();
	}

}

The implementation that executes tasks in parallel makes use of a Java ExecutorService, as we can see below:

SimpleTaskExecutorParallel.java
package com.wordpress.lucianomolinari.taskexecutor.unrelatedtask;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimpleTaskExecutorParallel implements SimpleTaskExecutor {

	/**
	 * Creates a pool with 10 threads to execute jobs
	 */
	private final ExecutorService executor = Executors.newFixedThreadPool(10);

	@Override
	public void execute(final SimpleTask simpleTask) {
		executor.execute(new Runnable() {
			@Override
			public void run() {
				simpleTask.execute();
			}
		});
	}

}

Notice that using the Executors class and the ExecutorService interface is quite simple. The newFixedThreadPool() method creates a thread pool with the number of threads given as a parameter. There are other types of pool, and you can find more details in the JavaDoc of the Executors class. Once you have the executor, you can submit both Runnable and Callable objects to it. The main difference between them is that a Runnable executes a task that returns nothing (void), while a Callable can return a value. The usage of Callable will be described in the next post about this subject.

To compare the execution times of the 2 implementations, we’ll create a JUnit test class that executes 50 tasks with each of them. For the sake of simplicity, the execution time of each approach will be checked through log messages. Hence, let’s configure log4j so that it shows all the messages in the console with the proper timestamp.

log4j.properties
# Root logger option
log4j.rootLogger=DEBUG, stdout
 
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} - %m%n

TestSimpleTaskExecutor.java
package com.wordpress.lucianomolinari.taskexecutor.unrelatedtask;

import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;
import org.junit.Test;

public class TestSimpleTaskExecutor {
	private final Logger logger = Logger.getLogger(TestSimpleTaskExecutor.class);

	@Test
	public void testSequential() {
		logger.debug("--------------");
		logger.debug("Executing tasks in a sequential manner...");
		SimpleTaskExecutor executor = new SimpleTaskExecutorSequential();
		for (SimpleTask task : getListToBeExecuted()) {
			executor.execute(task);
		}
	}

	@Test
	public void testParallel() {
		logger.debug("--------------");
		logger.debug("Executing tasks in a parallel manner...");
		SimpleTaskExecutor executor = new SimpleTaskExecutorParallel();
		for (SimpleTask task : getListToBeExecuted()) {
			executor.execute(task);
		}
		// crude wait so the pool threads can finish their jobs before the
		// test method returns
		try {
			Thread.sleep(100);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}

	private List<SimpleTask> getListToBeExecuted() {
		List<SimpleTask> tasks = new ArrayList<SimpleTask>();
		for (int index = 1; index <= 50; index++) {
			tasks.add(new SimpleTask("Message " + index));
		}
		return tasks;
	}

}
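
The fixed 100 ms pause in testParallel() works for this small demo, but it is only a guess at how long the tasks take. A more robust option is to shut the pool down and wait for it to drain with awaitTermination(). The sketch below is a minimal, self-contained illustration and not part of the original project: the class AwaitTerminationSketch and the method runAndWait() are hypothetical names, and the tasks are inlined instead of using SimpleTask so that the example has no log4j dependency.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class used only for illustration; it is not part of the project.
final class AwaitTerminationSketch {

	/**
	 * Submits taskCount short tasks to a pool of 10 threads and blocks until
	 * they have all finished or a 5 second timeout expires.
	 *
	 * @return the number of tasks that ran to completion
	 */
	static int runAndWait(int taskCount) {
		ExecutorService executor = Executors.newFixedThreadPool(10);
		final AtomicInteger completed = new AtomicInteger();

		for (int index = 1; index <= taskCount; index++) {
			executor.execute(new Runnable() {
				@Override
				public void run() {
					try {
						Thread.sleep(5); // simulates the heavy work
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt();
						return;
					}
					completed.incrementAndGet();
				}
			});
		}

		// No new tasks are accepted, but the ones already queued still run
		executor.shutdown();
		try {
			// Blocks until every task has finished or the timeout elapses
			if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
				executor.shutdownNow(); // gives up and interrupts the workers
			}
		} catch (InterruptedException e) {
			executor.shutdownNow();
			Thread.currentThread().interrupt();
		}
		return completed.get();
	}

	public static void main(String[] args) {
		System.out.println("Tasks completed: " + runAndWait(50));
	}
}
```

Applying the same idea to the project would mean exposing some kind of shutdown method on SimpleTaskExecutorParallel, since its ExecutorService field is private. The test class above is shown as originally written.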

Running TestSimpleTaskExecutor produces output like the following.

23:21:55,062 - --------------
23:21:55,063 - Executing tasks in a sequential manner...
23:21:55,069 - Message processed: Message 1
23:21:55,074 - Message processed: Message 2
23:21:55,079 - Message processed: Message 3
23:21:55,085 - Message processed: Message 4
23:21:55,090 - Message processed: Message 5
23:21:55,095 - Message processed: Message 6
....
23:21:55,307 - Message processed: Message 45
23:21:55,312 - Message processed: Message 46
23:21:55,318 - Message processed: Message 47
23:21:55,323 - Message processed: Message 48
23:21:55,328 - Message processed: Message 49
23:21:55,333 - Message processed: Message 50

23:21:55,334 - --------------
23:21:55,334 - Executing tasks in a parallel manner...
23:21:55,344 - Message processed: Message 1
23:21:55,344 - Message processed: Message 2
23:21:55,344 - Message processed: Message 3
23:21:55,344 - Message processed: Message 4
23:21:55,344 - Message processed: Message 5
...
23:21:55,365 - Message processed: Message 42
23:21:55,365 - Message processed: Message 48
23:21:55,365 - Message processed: Message 45
23:21:55,365 - Message processed: Message 49
23:21:55,365 - Message processed: Message 50

Notice that in the sequential test all the messages are logged in the same order as they were created, which doesn’t happen in the parallel test, since there are 10 threads executing the tasks.
What really stands out is the execution time for each scenario. In the sequential test there’s an interval of 270ms between the first and last log messages (23:21:55,063 to 23:21:55,333), while in the parallel test this interval is only 31ms (23:21:55,334 to 23:21:55,365).
Keep in mind that the results depend on the machine’s configuration and the number of processors it has, so these times can be worse or even better on your machine.
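
Since the number of processors matters, one option is to size the pool from what the JVM reports instead of hard-coding 10 threads. The sketch below is only an illustration under that assumption: PoolSizingSketch and its methods are hypothetical names, not part of the project. One thread per processor is a common starting point for CPU-bound work. The tasks in this post mostly sleep, so for them a larger pool can still finish sooner.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class used only for illustration; it is not part of the project.
final class PoolSizingSketch {

	/** Returns the number of processors the JVM reports as available. */
	static int availableProcessors() {
		return Runtime.getRuntime().availableProcessors();
	}

	/** Creates a fixed pool with one thread per available processor. */
	static ExecutorService newProcessorSizedPool() {
		return Executors.newFixedThreadPool(availableProcessors());
	}

	public static void main(String[] args) {
		ExecutorService executor = newProcessorSizedPool();
		System.out.println("Threads in pool: " + availableProcessors());
		// Always shut the pool down, otherwise its threads keep the JVM alive
		executor.shutdown();
	}
}
```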

The source code can be found on GitHub.

In the next post we’ll keep talking about this subject. See you then.

3 thoughts on “Asynchronous processing with ExecutorServices – Part 1”

  1. Hello Luciano, how are you?

    I really liked your article on asynchronous processing with this ExecutorServices interface, and I even implemented an example based on your explanation using newSingleThreadExecutor(). However, I ran into a small problem and can’t understand what is happening. Could you help me? The problem is the following: I put the “heavy” processing in submit() by implementing Callable, and another thread (the Swing one) calls the get() method. I noticed that if execution reaches get() before the work submitted through submit() has finished, both threads hang. If the submit() work finishes before get() is called, there is no problem.

    Thanks for your attention.

    Lintz.
