At OVO we rely heavily on Spark, in both its batch and stream processing capabilities, to master the challenge of smart meter and real-time energy usage data and to provide ever more insights for our customers and our business.

Streaming solutions come with their own challenges, the biggest being the need to operate them continuously, 24/7, while still retaining the ability to update them. While we can (and will) reason about message delivery patterns, failure handling, state recovery and other, more sophisticated aspects of building a resilient stream-based solution, let's start with the most important one: how can we structure our Spark application so that it is easy to test?

Unit Testing

Let's look at the anatomy of a simple Spark stream that takes the incoming electricity consumption of our customers from Kafka for the last time window, produces a grand total and publishes the result:

object TotalElectricityConsumption {
    def consumptionSource: DStream[String] = ???  // raw messages from Kafka
    def publishElectricityConsumptionTotal: (RDD[BigDecimal], Time) => Unit = ???

    def runStream(): Unit = {
        consumptionSource
            .flatMap(Consumption.deserialize)             // drop messages that cannot be parsed
            .filter(Consumption.isElectricityConsumption) // keep electricity readings only
            .map(Consumption.extractConsumption)          // Consumption => BigDecimal
            .reduce(_ + _)                                // grand total for the current batch
            .foreachRDD(publishElectricityConsumptionTotal)
    }
}

Please note that the actual sink and source definitions, as well as the initial Spark setup, have been omitted for clarity. When we structure our Spark stream like this, we end up with several pure functions that contain most of our application logic, which are then naturally combined using Spark's collection-like APIs. Pure functions that have no dependency on the streaming framework are very easy to test - and they come with additional perks: we can reuse most of our code in streams that produce different metrics, and we document the stream's functionality by giving every step a clear name.
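As a rough illustration, the pure helpers used above could be defined on a simple Consumption model along these lines - the field names, fuel types and deserialization format here are assumptions, not our actual implementation:

// Hypothetical Consumption model - purely illustrative
sealed trait FuelType
case object Electricity extends FuelType
case object Gas extends FuelType

case class Consumption(customerId: String, fuel: FuelType, kwh: BigDecimal)

object Consumption {
    // Parse a raw Kafka payload, dropping anything we cannot read
    def deserialize(raw: String): Option[Consumption] = ???

    // Pure predicate used by the filter step
    def isElectricityConsumption(consumption: Consumption): Boolean =
        consumption.fuel == Electricity

    // Pure projection from a consumption to the value we want to sum up
    def extractConsumption(consumption: Consumption): BigDecimal =
        consumption.kwh
}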

Let’s look at an example of a unit test for our filtering method:

"Filtering only electricity consumption" should {
    "return true for electricity" in {
        Consumption.isElectricityConsumption(electricityConsumption) mustEqual true
    }
    
    "return false for gas" in {
        Consumption.isElectricityConsumption(gasConsumption) mustEqual false
    }
}

The test becomes almost trivial, without a single dependency on the Spark framework. This structure rewards us with reduced complexity and faster test execution times.

Stream Tests

Now that we know that the individual building blocks of our application work as intended, the next step is to tie it all together and test the way data flows through our stream. For our example, this means verifying that, given a source RDD of consumptions, we produce an RDD containing a single grand total of all the electricity being consumed right now. With our initial code this would be quite hard to achieve. But there is a way around it: let's factor out the stream transformation logic from the concrete sink and source implementations:

object TotalElectricityConsumption {
    def consumptionSource: DStream[String] = ???  // raw messages from Kafka
    def publishElectricityConsumptionTotal: (RDD[BigDecimal], Time) => Unit = ???

    // Pure stream transformation: from consumptions to a grand total per batch
    def createStream(source: DStream[Consumption]): DStream[BigDecimal] = {
        source
            .filter(Consumption.isElectricityConsumption)
            .map(Consumption.extractConsumption)
            .reduce(_ + _)
    }

    def runStream(): Unit = {
        createStream(consumptionSource.flatMap(Consumption.deserialize))
            .foreachRDD(publishElectricityConsumptionTotal)
    }
}

Now we can pass an arbitrary source of consumptions to our stream definition. We just need a test source that emits a defined set of consumptions and compare the result with our expectations. Spark Testing Base by Holden Karau (https://github.com/holdenk/spark-testing-base) gives us an easy way to do this, along with a couple of other great tools for testing Spark:

import com.holdenkarau.spark.testing.StreamingSuiteBase

class TotalElectricityConsumptionStreamSpec extends StreamingSuiteBase {

  test("TotalElectricityConsumptionStream must calculate the correct total electricity consumption for a time window") {
    val consumptions: List[List[Consumption]] = ConsumptionFixture.mixedEnergyStream
    val expectations: List[List[BigDecimal]] = List(List(1),List(2),List(4))

    testOperation(consumptions, TotalElectricityConsumption.createStream, expectations)
  }
}

StreamingSuiteBase makes it really easy to set up our inputs, where each List[Consumption] represents one micro-batch of the Spark stream. It also takes care of setting up a Spark context that can be shared across multiple test cases - albeit only sequentially - to speed up test runs.
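The fixture itself is just ordinary test data. Assuming the hypothetical Consumption model sketched earlier, a fixture producing the expected totals of 1, 2 and 4 might look roughly like this:

// Hypothetical fixture - three micro-batches of mixed gas and electricity readings
object ConsumptionFixture {
    val mixedEnergyStream: List[List[Consumption]] = List(
        // batch 1: only the electricity reading counts => total 1
        List(Consumption("customer-1", Electricity, 1), Consumption("customer-2", Gas, 5)),
        // batch 2 => total 2
        List(Consumption("customer-1", Electricity, 2)),
        // batch 3 => total 1 + 3 = 4
        List(Consumption("customer-1", Electricity, 1), Consumption("customer-3", Electricity, 3))
    )
}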

With our new program structure, we can assert that our small function building blocks combine into a stream transformation that produces the correct grand total of electricity consumption. These tests are the core layer for verifying our business expectations of the stream.

Service Tests

With most of our application logic now covered by either unit or stream tests, a small gap still remains - namely the configuration and the interoperability with surrounding systems. In the world of streaming pipelines this is a significant factor - as the complexity of the pipeline grows, so does the number of different integrations and configuration options.

In the ideal case, we can produce a test that runs both against a synthetic test environment, spun up on the fly with tools such as Docker, and against a real production deployment. The easiest way to achieve this is to look for a natural key in the streaming output data and generate synthetic data only for it. For instance, if we were looking at the individual energy consumption of customers, we could generate a fake customer, inject data for it into our stream, and assert that results are produced for it, as sketched below.
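As a rough sketch of that approach - the topic name, key and payload format below are assumptions, not our production setup - injecting synthetic readings for a fake customer could look something like this:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical service test helper: publish a synthetic consumption event
// for a fake customer onto the input topic the stream reads from.
object SyntheticConsumptionProducer {
    def publishFakeCustomerReading(bootstrapServers: String, topic: String): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", bootstrapServers)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)
        try {
            // The JSON shape is purely illustrative
            val payload = """{"customerId":"service-test-customer","fuel":"electricity","kwh":1.5}"""
            producer.send(new ProducerRecord[String, String](topic, "service-test-customer", payload))
            producer.flush()
        } finally {
            producer.close()
        }
    }
}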

For our stream example this is unfortunately not easily possible, as we would be altering the result of our computation. But at this level of testing we don't even want to look at the results in all their detail - remember that we already covered most of the details in our unit and stream tests. Essentially there are two things we want to assert: (1) our stream continuously produces a total electricity consumption across all our customers, and (2) the result is somewhere within the range we expect for our user base - although we could arguably omit requirement (2).
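A minimal sketch of such a check, assuming the stream publishes its totals to a Kafka topic as plain string values - the topic name, consumer settings and the plausible range are all assumptions:

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

// Hypothetical service test: read the published totals and check both assertions
object TotalElectricityConsumptionServiceTest {
    def assertStreamIsHealthy(bootstrapServers: String, outputTopic: String): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", bootstrapServers)
        props.put("group.id", "total-consumption-service-test")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

        val consumer = new KafkaConsumer[String, String](props)
        try {
            consumer.subscribe(Collections.singletonList(outputTopic))

            // (1) the stream keeps producing totals: expect at least one within a few batches
            val records = consumer.poll(Duration.ofSeconds(60)).asScala.toList
            assert(records.nonEmpty, "no electricity consumption total was published")

            // (2) the totals are in a plausible range for our user base (bounds are made up)
            val totals = records.map(record => BigDecimal(record.value))
            assert(totals.forall(total => total > 0 && total < BigDecimal("1000000")),
                s"totals outside the expected range: $totals")
        } finally {
            consumer.close()
        }
    }
}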

With this test we can confirm that all our configuration is correct and that our setup works as intended. Service tests become very helpful when we make changes to our streaming application and deploy new versions, as they assert that the deployment was successful. We use high level service tests for every single one of our streaming pipelines to support our continuous delivery process at OVO.

Key takeaways

With the right structure for our Spark Streaming application and a couple of great tools, there is no reason to rely solely on fully fledged test environments with complex test and service setups. Most of our application can be tested in small parts that are easy to reason about, with test execution speeds that give us fast feedback during test driven development.