Building a service using Akka HTTP and Redis – Part 1 of 2

This two-part article shows how to create a small application (or a microservice, if you prefer) using Akka HTTP (Scala) and the Redis database. The application is deliberately narrow in scope: it lets a customer be added, removed and retrieved.

In terms of technologies/tools/libs, the following ones will be used:

  • Scala
  • SBT
  • Akka HTTP/Akka HTTP Json Support/Akka HTTP Test kit
  • Rediscala
  • Scalatest
  • Redis database

In this first part, we’ll focus on the layer responsible for communicating with Redis, while the second part will focus on Akka HTTP.

The project is based on SBT and its build.sbt file looks like this:

build.sbt
organization := "com.lucianomolinari"

name := "akkahttp_redis"

version := "1.0"

scalaVersion := "2.11.7"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http-core" % "2.4.7",
  "com.typesafe.akka" %% "akka-http-experimental" % "2.4.7",
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % "2.4.7",
  "com.typesafe.akka" %% "akka-http-testkit" % "2.4.7" % "test",
  "org.scalatest" %% "scalatest" % "2.2.6" % "test",
  "com.github.etaty" %% "rediscala" % "1.6.0"
)

Now that we have the project setup ready, we can start to create our app by looking into the repository layer.

Managing customers on a Redis Database

Redis is a key-value in-memory database that natively supports many data structures and offers features such as replication, on-disk persistence and high availability, among others. Throughout this article we’ll be using just a few structures, like counters and hashes. You can download Redis and check the installation instructions on the official Redis website. From now on, this article assumes you have Redis up and running in your environment.

In order to connect to Redis using Scala, the rediscala client will be used. This client provides non-blocking and asynchronous I/O operations and uses Akka Actors at its core. This means that all the operations executed on Redis using rediscala will wrap the response in Scala Futures and our application will be responsible for dealing with them.

Before creating our repository layer, let’s define our simple domain model.

Customer.scala
package com.lucianomolinari.akkahttpredis

/**
  * Represents the domain model of a Customer.
  * <p>
  * A Customer only has an id once he's persisted, so his id field is of type Option.
  */
case class Customer(id: Option[Long], name: String, age: Int) {

  def this(name: String, age: Int) = this(None, name, age)

}

The Customer case class contains the attributes id, name and age. The id is only filled in when the customer is saved in the DB, which is why it’s mapped as an Option and why a second constructor is also provided.
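To make the two states of a customer concrete, here is a minimal sketch. The Customer class is repeated so the snippet compiles on its own; the CustomerExample object and the id value 1 are only illustrative and are not part of the project.

```scala
// Stand-alone copy of the Customer model shown above.
case class Customer(id: Option[Long], name: String, age: Int) {
  def this(name: String, age: Int) = this(None, name, age)
}

// Hypothetical holder object, used only for this illustration.
object CustomerExample {
  // Not yet persisted: the auxiliary constructor leaves id as None.
  val unsaved = new Customer("John", 30)

  // Once an id is known, a copy is created with the id filled in.
  // The value 1L is an arbitrary id chosen for the example.
  val saved = unsaved.copy(id = Some(1L))

  def main(args: Array[String]): Unit = {
    println(unsaved) // Customer(None,John,30)
    println(saved)   // Customer(Some(1),John,30)
  }
}
```

Note that, in the Scala 2.11 used by this project, the second constructor has to be called with new, because the generated apply method of a case class only covers the primary constructor.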

With the Customer class defined, we can proceed with the creation of the CustomerRepository class, as seen below:

package com.lucianomolinari.akkahttpredis

import akka.actor.ActorSystem
import akka.util.ByteString
import redis.ByteStringSerializer._
import redis.RedisClient

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._

/**
  * Class responsible for managing [[Customer]] instances in a Redis Database.
  * <p>
  * It's important to notice that all operations performed in this class are non-blocking and, thus,
  * all of the methods wrap their responses in [[Future]] objects. This is because rediscala supports
  * non-blocking operations.
  *
  * @param actorSystem Implicit [[ActorSystem]]. Required by rediscala client.
  */
class CustomerRepository(implicit actorSystem: ActorSystem) {

  /**
    * Client holding a connection towards Redis Database server.
    */
  val redis = RedisClient()

  /**
    * Responsible for persisting a [[Customer]]. It first finds out the next ID to be used, sets it in the
    * customer and then persists him in the DB.
    *
    * @param customer The [[Customer]] to be persisted.
    * @return A [[Future]] with the [[Customer]] persisted, including his ID.
    */
  def add(customer: Customer): Future[Customer] = {
    getNextId flatMap { id =>
      val customerWithId = customer.copy(id = Some(id))
      val futureAdded = redis.hmset(getCustomerKey(id), Map("id" -> id.toString, "name" -> customerWithId.name,
        "age" -> customerWithId.age.toString))
      futureAdded map { res => customerWithId }
    }
  }

  /**
    * @param id The ID of the [[Customer]] to be found.
    * @return A [[Future]] with an [[Option]] holding the [[Customer]] found or None.
    */
  def find(id: Long): Future[Option[Customer]] = {
    redis.hgetall(getCustomerKey(id)) map { keysAndValues =>
      if (keysAndValues.isEmpty) None else Some(mapToCustomer(keysAndValues))
    }
  }

  /**
    * Removes the [[Customer]] given by the id.
    *
    * @param id The ID of the [[Customer]] to be removed.
    * @return A [[Future]] wrapping a Boolean value indicating whether the customer was removed or not.
    */
  def remove(id: Long): Future[Boolean] = {
    redis.del(getCustomerKey(id)) map { rowsDeleted =>
      rowsDeleted == 1
    }
  }

  private def getNextId(): Future[Long] = {
    redis.incr("next_customer_id")
  }

  private def mapToCustomer(keysAndValues: Map[String, ByteString]): Customer = {
    Customer(Some(keysAndValues("id").utf8String.toLong), keysAndValues("name").utf8String,
      keysAndValues("age").utf8String.toInt)
  }

  private def getCustomerKey(id: Long) = s"customer:$id"

}

There’s a lot going on in this class, so let’s go through it step by step:

  • In the class definition, an implicit ActorSystem is received. This ActorSystem is required by the rediscala library.
  • After that, a RedisClient is instantiated. In this case we’re using the default values, but you can specify things like host, port, password and so on.
  • The method getNextId is responsible for finding out the next ID to be used when inserting a new customer. For this, we use a counter with the Redis command INCR. The call returns a Future wrapping a Long, which holds the value of the counter after the increment.
  • The idea of the add method is: get the next available ID and set it in the customer; persist the customer and then return him wrapped in a Future. In order to do that, we need to create a dependency between Future objects, as we must wait until getNextId completes before proceeding to the next step. For this we can use the flatMap operation, which “takes a function that maps the value to a new future g, and then returns a future which is completed once g is completed.” Within the flatMap operation, we:
    • Create a copy of the customer to be persisted and set his id with the value returned by getNextId.
    • Store the customer object using a Hash on Redis, by using the hmset operation. The key used to persist the customer is a string in the format “customer:<id>”, which makes it easy to retrieve a customer by his id later on.
    • As the previous operation will return a Future wrapping a Boolean (indicating whether Customer was added or not) and we want to return a Future with the persisted customer, we use the map operation to return a Future with the desired type.
  • The find operation returns a Future wrapping an Option that might contain the customer for the given id. We first use the hgetall operation, which returns a Map with all the field names and values of the customer. Then we again map the returned Future into something more useful for our use case: if the Map is empty, None is returned; otherwise it is converted to a Customer instance and returned.
  • The remove operation uses the del command, which returns a Future with the number of keys removed. As we’re removing a single key, the result is compared with 1 to confirm that the customer was removed. If rowsDeleted is 0, it means no customer was found for that id.
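If you want to experiment with the steps above without a running Redis server, the same flow can be sketched with plain Scala Futures. Everything here is an assumption made for illustration: InMemoryCustomerRepository, its counter and its hashes map are hypothetical stand-ins for the INCR counter and the Redis hashes, and none of it is part of rediscala.

```scala
import java.util.concurrent.atomic.AtomicLong

import scala.collection.concurrent.TrieMap
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

case class Customer(id: Option[Long], name: String, age: Int)

// Hypothetical in-memory stand-in for the Redis-backed repository.
object InMemoryCustomerRepository {
  // Plays the role of the counter incremented with INCR.
  private val counter = new AtomicLong(0)
  // Plays the role of the Redis hashes, keyed by "customer:<id>".
  private val hashes = TrieMap.empty[String, Map[String, String]]

  private def key(id: Long) = s"customer:$id"
  private def nextId(): Future[Long] = Future(counter.incrementAndGet())
  private def hmset(k: String, fields: Map[String, String]): Future[Boolean] =
    Future { hashes.put(k, fields); true }
  private def hgetall(k: String): Future[Map[String, String]] =
    Future(hashes.getOrElse(k, Map.empty[String, String]))
  private def del(k: String): Future[Long] =
    Future(if (hashes.remove(k).isDefined) 1L else 0L)

  // flatMap waits for the id, map turns the Boolean result into the customer.
  def add(customer: Customer): Future[Customer] =
    nextId() flatMap { id =>
      val withId = customer.copy(id = Some(id))
      val fields = Map("id" -> id.toString, "name" -> withId.name, "age" -> withId.age.toString)
      hmset(key(id), fields) map { _ => withId }
    }

  // An empty map means that no hash exists for the key.
  def find(id: Long): Future[Option[Customer]] =
    hgetall(key(id)) map { fields =>
      if (fields.isEmpty) None
      else Some(Customer(Some(fields("id").toLong), fields("name"), fields("age").toInt))
    }

  // Exactly one key removed means the customer existed.
  def remove(id: Long): Future[Boolean] = del(key(id)) map (_ == 1)

  def main(args: Array[String]): Unit = {
    val added = Await.result(add(Customer(None, "John", 30)), 1.second)
    val id = added.id.get
    println(Await.result(find(id), 1.second))   // the customer that was just added
    println(Await.result(remove(id), 1.second)) // true
    println(Await.result(find(id), 1.second))   // None
  }
}
```

The blocking Await calls are there only so the small main method can print results; the repository methods themselves never block.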

It’s interesting to notice how we can use operations like map and flatMap on Future objects to run chained operations and to convert them to other types of objects. Scala shows that working with Futures can be quite simple and concise.
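As a side note, a chain of flatMap and map calls can also be written as a for-comprehension, which the compiler translates back into those same calls. The sketch below assumes two hypothetical asynchronous steps, nextId and save, that stand in for calls to a remote store; they are not rediscala methods.

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object FutureChaining {
  // Hypothetical asynchronous steps returning fixed values.
  def nextId(): Future[Long] = Future(42L)
  def save(id: Long, name: String): Future[Boolean] = Future(true)

  // Explicit flatMap/map, in the style used by the repository's add method.
  def addExplicit(name: String): Future[(Long, String)] =
    nextId() flatMap { id =>
      save(id, name) map { _ => (id, name) }
    }

  // The same chain as a for-comprehension.
  def addWithFor(name: String): Future[(Long, String)] =
    for {
      id <- nextId()
      _ <- save(id, name)
    } yield (id, name)

  def main(args: Array[String]): Unit = {
    println(Await.result(addExplicit("John"), 1.second)) // (42,John)
    println(Await.result(addWithFor("John"), 1.second))  // (42,John)
  }
}
```

Which form reads better is largely a matter of taste; the for-comprehension tends to pay off once more than two steps depend on each other.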

We could say our repository layer is ready, but that’s not quite true until we have it properly tested, which is what we’ll do in the following class:

CustomerRepositorySpec.scala
package com.lucianomolinari.akkahttpredis


import akka.actor.ActorSystem
import akka.testkit.TestKit
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time._
import org.scalatest.{BeforeAndAfter, Matchers, WordSpecLike}
import redis.RedisClient

import scala.concurrent.Await
import scala.concurrent.duration._

class CustomerRepositorySpec extends TestKit(ActorSystem()) with WordSpecLike with Matchers with ScalaFutures
  with BeforeAndAfter {

  // Overrides default timeout used by ScalaFutures operations, like whenReady
  override implicit def patienceConfig = PatienceConfig(timeout = Span(1, Second))

  // ActorSystem is automatically provided by TestKit
  val customerRepository = new CustomerRepository()

  // DB is cleaned before each test case
  before {
    val redis = RedisClient()
    Await.ready(redis.flushdb(), 1 second)
  }

  "The customer repository" should {

    "return the customer when he exists" in {
      addCustomer(new Customer("John", 30), 1)
      whenReady(customerRepository.find(1)) {
        customer => customer shouldBe Some(Customer(Some(1), "John", 30))
      }
    }

    "return None when the customer doesn't exist" in {
      whenReady(customerRepository.find(1)) {
        customer => customer shouldBe None
      }
    }

    "return true when the customer to be removed exists" in {
      addCustomer(new Customer("John", 30), 1)
      whenReady(customerRepository.remove(1)) {
        res => res shouldBe true
      }
      whenReady(customerRepository.find(1)) {
        customer => customer shouldBe None
      }
    }

    "return false when the customer to be removed doesn't exist" in {
      whenReady(customerRepository.remove(1)) {
        res => res shouldBe false
      }
    }

    "continue to increase the id value even if other customers are removed" in {
      addCustomer(new Customer("John", 30), 1)
      addCustomer(new Customer("Mary", 25), 2)
      whenReady(customerRepository.remove(2)) {
        res => res shouldBe true
      }
      addCustomer(new Customer("Carl", 25), 3)
    }

  }

  def addCustomer(customer: Customer, expectedId: Long) = {
    whenReady(customerRepository.add(customer)) {
      customerAdded => customerAdded shouldBe customer.copy(id = Some(expectedId))
    }
  }

}

The class CustomerRepositorySpec extends a class and mixes in a few traits:

  • TestKit: It’s part of the Akka Test library.
  • WordSpecLike: Allows test cases to be written in something close to BDD style. Scalatest provides other styles in case you want to try something different.
  • Matchers: Trait providing a lot of matchers to be used in the tests assertions.
  • ScalaFutures: Makes it easy to test code that returns Future objects.
  • BeforeAndAfter: Provides a hook to allow code to be executed before/after each test case.

A few points to mention about this test class:

  • The patienceConfig is used to override the default timeout used by ScalaFutures’ functions, like whenReady.
  • In the before block, we use the flushdb command to clean up the database before running each one of our test cases. That’s useful to make sure the tests are independent of each other.
  • The whenReady function used in all of the test cases takes a Future object and invokes the callback once it is completed. It waits up to a timeout (in this case 1 second, as defined in the patienceConfig) and, if the Future is not completed within this time, the test fails.
  • Notice how Scalatest and its traits provide a neat and concise way of writing test cases and asserting their results.
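To get a feeling for what waiting on a Future with a timeout means, here is a rough analogy that uses only the Scala standard library. It is not how whenReady is implemented; TimeoutSketch and its waitFor helper are hypothetical names introduced for this example.

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.util.Try

object TimeoutSketch {
  // Completes quickly, so waiting up to one second succeeds.
  def fast(): Future[Int] = Future(1 + 1)

  // A Future that is never completed, used to provoke the timeout.
  def never(): Future[Int] = Promise[Int]().future

  // Blocks the caller for at most `limit`. The result is a Failure
  // (holding a TimeoutException) when the Future is not completed in time.
  def waitFor[A](f: Future[A], limit: FiniteDuration): Try[A] =
    Try(Await.result(f, limit))

  def main(args: Array[String]): Unit = {
    println(waitFor(fast(), 1.second))              // Success(2)
    println(waitFor(never(), 100.millis).isFailure) // true
  }
}
```

In the test class above, whenReady plays a similar role, with the difference that a timeout is reported as a test failure instead of being returned as a value.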

Summary

That’s all for the first part of the article. In the second and last part, we’ll see how to use Akka HTTP to expose all of the services created here via REST. See you there.
