Testing Utils

Base TestCases

There are two main test cases available in Sparkly:
  • SparklyTest creates a new session for each test case.
  • SparklyGlobalSessionTest uses a single sparkly session for all test cases to boost performance.

from sparkly import SparklySession
from sparkly.testing import SparklyTest


class MyTestCase(SparklyTest):
    session = SparklySession

    def test(self):
        df = self.spark.read_ext.by_url(...)

        # Compare all fields
        self.assertDataFrameEqual(
            actual_df=df,
            expected_data=[
                {'col1': 'row1', 'col2': 1},
                {'col1': 'row2', 'col2': 2},
            ],
        )

        # Compare a subset of fields
        self.assertDataFrameEqual(
            actual_df=df,
            expected_data=[
                {'col1': 'row1'},
                {'col1': 'row2'},
            ],
            fields=['col1'],
        )

...

class MyTestWithReusableSession(SparklyGlobalSessionTest):
    session = SparklySession

    def test(self):
        df = self.spark.read_ext.by_url(...)

...

Instant Iterative Development

The slowest part of Spark integration testing is context initialisation. SparklyGlobalSessionTest lets you keep the same Spark context across test cases within a single run, but it still kills the context at the end. This is especially annoying if you work in a TDD fashion, because on each run you have to wait 25-30 seconds for a new context to become ready. We added a tool that preserves the Spark context between multiple test runs.

Note

If you change the SparklySession definition (new options, jars or packages), you have to refresh the preserved context via sparkly-testing refresh. However, you don't need to refresh the context if only UDFs change.
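
For example, a typical TDD loop with a preserved context might look like the sketch below (the test path and runner invocation are hypothetical; only sparkly-testing refresh comes from the note above):

# Refresh the preserved context after changing the SparklySession definition
sparkly-testing refresh

# Re-run the affected tests; the preserved context is reused, so there is no 25-30 second wait
py.test tests/integration/test_my_job.py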

Fixtures

“Fixture” is a term borrowed from the Django framework. Fixtures load data into a database before test execution.

There are several storages supported in Sparkly:
  • Elastic
  • Cassandra (requires cassandra-driver)
  • Mysql (requires PyMySql)
  • Kafka (requires kafka-python)

from sparkly.testing import MysqlFixture, SparklyTest


class MyTestCase(SparklyTest):
    ...
    fixtures = [
        MysqlFixture('mysql.host',
                     'user',
                     'password',
                     '/path/to/setup_data.sql',
                     '/path/to/remove_data.sql')
    ]
    ...
class sparkly.testing.CassandraFixture(host, setup_file, teardown_file)

Fixture to load data into Cassandra.

Notes

  • Depends on cassandra-driver.

Examples

>>> class MyTestCase(SparklyTest):
...      fixtures = [
...          CassandraFixture(
...              'cassandra.host',
...              absolute_path(__file__, 'resources', 'setup.cql'),
...              absolute_path(__file__, 'resources', 'teardown.cql'),
...          )
...      ]
...
>>> class MyTestCase(SparklyTest):
...      data = CassandraFixture(
...          'cassandra.host',
...          absolute_path(__file__, 'resources', 'setup.cql'),
...          absolute_path(__file__, 'resources', 'teardown.cql'),
...      )
...      def setUp(self):
...          self.data.setup_data()
...      def tearDown(self):
...          self.data.teardown_data()
...
>>> def test():
...     fixture = CassandraFixture(...)
...     with fixture:
...        test_stuff()
...
class sparkly.testing.ElasticFixture(host, es_index, es_type, mapping=None, data=None, port=None)

Fixture for Elastic integration tests.

Examples

>>> class MyTestCase(SparklyTest):
...      fixtures = [
...          ElasticFixture(
...              'elastic.host',
...              'es_index',
...              'es_type',
...              '/path/to/mapping.json',
...              '/path/to/data.json',
...          )
...      ]
...
class sparkly.testing.Fixture

Base class for fixtures.

Fixture is a term borrowed from Django tests: it's data loaded into a database for integration testing.

setup_data()

Method called to load data into the database.

teardown_data()

Method called to remove data from the database that was loaded by setup_data.
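
Because the whole contract is these two methods, you can implement a fixture for a storage Sparkly does not support out of the box. Below is a minimal sketch; the file-based fixture and its name are hypothetical and only illustrate the interface:

import os

from sparkly.testing import Fixture


class LocalFileFixture(Fixture):
    """Hypothetical fixture that provisions a local file before a test runs."""

    def __init__(self, path, content):
        self.path = path
        self.content = content

    def setup_data(self):
        # Called before the test: create the data the test expects to find.
        with open(self.path, 'w') as f:
            f.write(self.content)

    def teardown_data(self):
        # Called after the test: remove whatever setup_data created.
        if os.path.exists(self.path):
            os.remove(self.path)

Such a fixture can then be listed in the fixtures attribute of a SparklyTest subclass, just like the built-in ones.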

class sparkly.testing.KafkaFixture(host, port=9092, topic=None, key_serializer=None, value_serializer=None, data=None)

Fixture for Kafka integration tests.

Notes

  • Depends on the kafka-python lib.
  • The JSON file should contain an array of dicts: [{'key': ..., 'value': ...}]

Examples

>>> class MyTestCase(SparklyTest):
...     fixtures = [
...         KafkaFixture(
...             'kafka.host', 'topic',
...             key_serializer=..., value_serializer=...,
...             data='/path/to/data.json',
...         )
...     ]
class sparkly.testing.KafkaWatcher(spark, df_schema, key_deserializer, value_deserializer, host, topic, port=9092)

Context manager that tracks Kafka data published to a topic.

Provides access to the new items that were written to a Kafka topic by code running within this context.

NOTE: This is mainly useful in integration test cases and may produce unexpected results in production environments, since there are no guarantees about who else may be publishing to a Kafka topic.

Usage:

    my_deserializer = lambda item: json.loads(item.decode('utf-8'))
    kafka_watcher = KafkaWatcher(
        my_sparkly_session,
        expected_output_dataframe_schema,
        my_deserializer,
        my_deserializer,
        'my.kafkaserver.net',
        'my_kafka_topic',
    )
    with kafka_watcher:
        # do stuff that publishes messages to 'my_kafka_topic'
        ...

    self.assertEqual(kafka_watcher.count, expected_number_of_new_messages)
    self.assertDataFrameEqual(kafka_watcher.df, expected_df)

class sparkly.testing.MysqlFixture(host, user, password=None, data=None, teardown=None)

Fixture for MySQL integration tests.

Notes

  • Depends on the PyMySql lib.

Examples

>>> class MyTestCase(SparklyTest):
...      fixtures = [
...          MysqlFixture('mysql.host', 'user', 'password', '/path/to/data.sql')
...      ]
...      def test(self):
...          pass
...
class sparkly.testing.SparklyGlobalSessionTest(methodName='runTest')

Base test case that keeps a single instance of the given session class across all tests.

Integration tests are slow, especially when you have to start/stop the Spark context for each test case. This class allows you to reuse the Spark session across multiple test cases.

class sparkly.testing.SparklyTest(methodName='runTest')

Base test case for Spark script tests.

Initializes and shuts down the session specified in the session attribute.

Example

>>> class MyTestCase(SparklyTest):
...     def test(self):
...         self.assertDataFrameEqual(
...              self.spark.sql('SELECT 1 as one').collect(),
...              [{'one': 1}],
...         )
assertDataFrameEqual(actual_df, expected_data, fields=None, ordered=False)

Ensure that the DataFrame contains the right data.

Parameters:
  • actual_df (pyspark.sql.DataFrame|list[pyspark.sql.Row]) – Dataframe to test data in.
  • expected_data (list[dict]) – Expected dataframe rows defined as dicts.
  • fields (list[str]) – Compare only certain fields.
  • ordered (bool) – Does order of rows matter?
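
For example, reusing the data from the first example in this document, a check that also enforces row order could look like this sketch (by default ordered is False, so row order is ignored):

>>> self.assertDataFrameEqual(
...     actual_df=df,
...     expected_data=[
...         {'col1': 'row1', 'col2': 1},
...         {'col1': 'row2', 'col2': 2},
...     ],
...     ordered=True,
... )
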
session

alias of SparklySession