Testing Utils

Base TestCases

There are two main test cases available in Sparkly:
  • SparklyTest creates a new session for each test case.
  • SparklyGlobalSessionTest uses a single Sparkly session for all test cases to boost performance.

Both are used in the same way, for example:
from pyspark.sql import types as T

from sparkly import SparklySession
from sparkly.testing import SparklyTest, SparklyGlobalSessionTest


class MyTestCase(SparklyTest):
    session = SparklySession

    def test(self):
        df = self.spark.read_ext.by_url(...)

        # Compare all fields
        self.assertRowsEqual(
            df.collect(),
            [
                T.Row(col1='row1', col2=1),
                T.Row(col1='row2', col2=2),
            ],
        )

...

class MyTestWithReusableSession(SparklyGlobalSessionTest):
    session = SparklySession

    def test(self):
        df = self.spark.read_ext.by_url(...)

...

DataFrame Assertions

Asserting that the dataframe produced by your transformation is equal to some expected output can be unnecessarily complicated at times. Common issues include:

  • Ignoring the order in which elements appear in an array. This could be particularly useful when that array is generated as part of a groupBy aggregation, and you only care about all elements being part of the end result, rather than the order in which Spark encountered them.
  • Comparing floats that could be arbitrarily nested in complicated datatypes within a given tolerance; exact matching is either fragile or impossible.
  • Ignoring whether a field of a complex datatype is nullable. Spark infers this based on the applied transformations, but it is oftentimes inaccurate. As a result, assertions on complex data types might fail, even though in theory they shouldn’t have.
  • Rows with different field names comparing equal when their values happen to match in the alphabetical order of the field names (see the unit tests for an example).
  • Unhelpful diffs in case of mismatches.

Sparkly addresses these issues by providing assertRowsEqual:

from pyspark.sql import types as T

from sparkly import SparklySession
from sparkly.testing import SparklyTest


def my_transformation(spark):
    return spark.createDataFrame(
        data=[
            ('row1', {'field': 'value_1'}, [1.1, 2.2, 3.3]),
            ('row2', {'field': 'value_2'}, [4.1, 5.2, 6.3]),
        ],
        schema=T.StructType([
            T.StructField('id', T.StringType()),
            T.StructField(
                'st',
                T.StructType([
                    T.StructField('field', T.StringType()),
                ]),
            ),
            T.StructField('ar', T.ArrayType(T.FloatType())),
        ]),
    )


class MyTestCase(SparklyTest):
    session = SparklySession

    def test(self):
        df = my_transformation(self.spark)

        self.assertRowsEqual(
            df.collect(),
            [
                T.Row(id='row2', st=T.Row(field='value_2'), ar=[6.0, 5.0, 4.0]),
                T.Row(id='row1', st=T.Row(field='value_1'), ar=[2.0, 3.0, 1.0]),
            ],
            atol=0.5,
        )

Instant Iterative Development

The slowest part of Spark integration testing is context initialisation. SparklyGlobalSessionTest allows you to keep the same Spark context instance across test cases, but it still shuts the context down at the end of the run. This is especially annoying if you work in a TDD fashion, because on each run you have to wait 25-30 seconds until a new context is ready. To avoid this, we added a tool, the sparkly-testing CLI, to preserve the Spark context between multiple test runs.
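
A rough sketch of the workflow; the up and down subcommands are assumptions (only refresh is mentioned in the note below), and the test command is a placeholder:

# Activate instant testing mode: the Spark context is kept alive between test runs
# (assumed subcommand; see the note below for the documented refresh command).
sparkly-testing up

# Run your tests as usual; only the first run pays the context start-up cost
# (placeholder test command and path).
python -m pytest tests/

# Deactivate instant testing mode when you are done (assumed subcommand).
sparkly-testing down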

Note

If you change the SparklySession definition (new options, jars or packages), you have to refresh the context via sparkly-testing refresh. However, you don't need to refresh the context if only UDFs are changed.

Fixtures

“Fixture” is a term borrowed from the Django framework. Fixtures load data into a database before test execution.

There are several storages supported in Sparkly:
  • Elastic
  • Cassandra (requires cassandra-driver)
  • MySQL (requires PyMySQL)
  • Kafka (requires kafka-python)

For example, to load data into MySQL before your tests run:
from sparkly.testing import MysqlFixture, SparklyTest


class MyTestCase(SparklyTest):
    ...
    fixtures = [
        MysqlFixture('mysql.host',
                     'user',
                     'password',
                     '/path/to/setup_data.sql',
                     '/path/to/remove_data.sql')
    ]
    ...
class sparkly.testing.CassandraFixture(host, setup_file, teardown_file)[source]

Fixture to load data into cassandra.

Notes

  • Depends on cassandra-driver.

Examples

>>> class MyTestCase(SparklyTest):
...      fixtures = [
...          CassandraFixture(
...              'cassandra.host',
...              absolute_path(__file__, 'resources', 'setup.cql'),
...              absolute_path(__file__, 'resources', 'teardown.cql'),
...          )
...      ]
...
>>> class MyTestCase(SparklyTest):
...      data = CassandraFixture(
...          'cassandra.host',
...          absolute_path(__file__, 'resources', 'setup.cql'),
...          absolute_path(__file__, 'resources', 'teardown.cql'),
...      )
...      def setUp(self):
...          self.data.setup_data()
...      def tearDown(self):
...          self.data.teardown_data()
...
>>> def test():
...     fixture = CassandraFixture(...)
...     with fixture:
...        test_stuff()
...
class sparkly.testing.ElasticFixture(host, es_index, es_type, mapping=None, data=None, port=None)[source]

Fixture for elastic integration tests.

Examples

>>> class MyTestCase(SparklyTest):
...      fixtures = [
...          ElasticFixture(
...              'elastic.host',
...              'es_index',
...              'es_type',
...              '/path/to/mapping.json',
...              '/path/to/data.json',
...          )
...      ]
...
class sparkly.testing.Fixture[source]

Base class for fixtures.

Fixture is a term borrowed from Django tests: it is data loaded into a database for integration testing.

setup_data()[source]

Method called to load data into the database.

teardown_data()[source]

Method called to remove the data from the database that was loaded by setup_data.
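
For illustration, a minimal sketch of a custom fixture; the class name and the file-copying logic are hypothetical and not part of Sparkly:

>>> import os
>>> import shutil
>>> from sparkly.testing import Fixture
>>> class CopyFileFixture(Fixture):
...     # Hypothetical fixture that stages a local data file before a test
...     # and removes it again afterwards.
...     def __init__(self, source_path, target_path):
...         self.source_path = source_path
...         self.target_path = target_path
...     def setup_data(self):
...         # Called before the test: put the data in place.
...         shutil.copyfile(self.source_path, self.target_path)
...     def teardown_data(self):
...         # Called after the test: clean up what setup_data created.
...         os.remove(self.target_path)
...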

class sparkly.testing.KafkaFixture(host, port=9092, topic=None, key_serializer=None, value_serializer=None, data=None)[source]

Fixture for kafka integration tests.

Notes

  • Depends on the kafka-python library.
  • The JSON data file should contain an array of dicts: [{'key': ..., 'value': ...}]

Examples

>>> class MyTestCase(SparklyTest):
...     fixtures = [
...         KafkaFixture(
...             'kafka.host', 'topic',
...             key_serializer=..., value_serializer=...,
...             data='/path/to/data.json',
...         )
...     ]
class sparkly.testing.KafkaWatcher(spark, df_schema, key_deserializer, value_deserializer, host, topic, port=9092)[source]

Context manager that tracks Kafka data published to a topic

Provides access to the new items that were written to a kafka topic by code running within this context.

NOTE: This is mainly useful in integration test cases and may produce unexpected results in production environments, since there are no guarantees about who else may be publishing to a kafka topic.

Usage:

my_deserializer = lambda item: json.loads(item.decode('utf-8'))
kafka_watcher = KafkaWatcher(
    my_sparkly_session,
    expected_output_dataframe_schema,
    my_deserializer,
    my_deserializer,
    'my.kafkaserver.net',
    'my_kafka_topic',
)
with kafka_watcher:
    # do stuff that publishes messages to 'my_kafka_topic'
    ...

self.assertEqual(kafka_watcher.count, expected_number_of_new_messages)
self.assertDataFrameEqual(kafka_watcher.df, expected_df)

class sparkly.testing.MysqlFixture(host, user, password=None, data=None, teardown=None)[source]

Fixture for mysql integration tests.

Notes

  • Depends on the PyMySQL library.

Examples

>>> class MyTestCase(SparklyTest):
...      fixtures = [
...          MysqlFixture('mysql.host', 'user', 'password', '/path/to/data.sql')
...      ]
...      def test(self):
...          pass
...
class sparkly.testing.SparklyGlobalSessionTest(methodName='runTest')[source]

Base test case that keeps a single instance of the given session class across all tests.

Integration tests are slow, especially when you have to start/stop the Spark context for each test case. This class allows you to reuse the Spark session across multiple test cases.
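
Example

A minimal sketch for comparison with the SparklyTest example below; the shared session is reused across all tests in the class:

>>> from sparkly import SparklySession
>>> from pyspark.sql import types as T
>>> class MyTestCase(SparklyGlobalSessionTest):
...     session = SparklySession
...     def test(self):
...         self.assertRowsEqual(
...             self.spark.sql('SELECT 1 as one').collect(),
...             [T.Row(one=1)],
...         )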

class sparkly.testing.SparklyTest(methodName='runTest')[source]

Base test case for Spark script tests.

Initializes and shuts down the session specified in the session attribute.

Example

>>> from pyspark.sql import types as T
>>> class MyTestCase(SparklyTest):
...     def test(self):
...         self.assertRowsEqual(
...              self.spark.sql('SELECT 1 as one').collect(),
...              [T.Row(one=1)],
...         )
assertDataFrameEqual(actual_df, expected_data, fields=None, ordered=False)[source]

Ensure that DataFrame has the right data inside.

assertDataFrameEqual is being deprecated. Please use assertRowsEqual instead.

Parameters:
  • actual_df (pyspark.sql.DataFrame|list[pyspark.sql.Row]) – Dataframe to test data in.
  • expected_data (list[dict]) – Expected dataframe rows defined as dicts.
  • fields (list[str]) – Compare only certain fields.
  • ordered (bool) – Does order of rows matter?
assertRowsEqual(first, second, msg=None, ignore_order=True, ignore_order_depth=None, atol=0, rtol=1e-07, equal_nan=True, ignore_nullability=True)[source]

Assert equal on steroids.

Extend this classic function signature to work better with comparisons involving rows, datatypes, dictionaries, lists and floats by:

  • ignoring the order of lists and datatypes recursively,
  • comparing floats within a given tolerance,
  • assuming NaNs are equal,
  • ignoring the nullability requirements of datatypes (since Spark can be inaccurate when inferring it),
  • providing better diffs for rows and datatypes.

Float comparisons are inspired by NumPy’s assert_allclose. The main formula used is | float1 - float2 | <= atol + rtol * float2.

Parameters:
  • first – see unittest.TestCase.assertEqual.
  • second – see unittest.TestCase.assertEqual.
  • msg – see unittest.TestCase.assertEqual.
  • ignore_order (bool|True) – ignore the order in lists and datatypes (rows, dicts are inherently orderless).
  • ignore_order_depth (int|None) – if ignore_order is true, do ignore order up to this level of nested lists or datatypes (exclusive). Setting this to 0 or None means ignore order infinitely, 1 means ignore order only at the top level, 2 will ignore order within lists of lists and so on. Default is ignore order arbitrarily deep.
  • atol (int, float|0) – Absolute tolerance in float comparisons.
  • rtol (int, float|1e-07) – Relative tolerance in float comparisons.
  • equal_nan (bool|True) – If set, NaNs will compare equal.
  • ignore_nullability (bool|True) – If set, ignore all nullability fields in dataTypes. This includes containsNull in arrays, valueContainsNull in maps and nullable in struct fields.
Returns:

None iff the two objects are equal.

Raises:
AssertionError: iff the two objects are not equal. See unittest.TestCase.assertEqual for details.
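
For illustration, a short sketch of the tolerance-based, order-insensitive comparison described above, to be run inside a SparklyTest subclass (assuming from pyspark.sql import types as T, as in the examples above; the row contents are made up):

>>> self.assertRowsEqual(
...     [T.Row(id='a', scores=[1.0, 2.0])],
...     [T.Row(id='a', scores=[2.0, 1.0001])],
...     atol=0.001,
... )
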
session

alias of SparklySession