Testing Utils

Base TestCases

There are two main test cases available in Sparkly:
  • SparklyTest creates a new session for each test case.
  • SparklyGlobalSessionTest uses a single sparkly session for all test cases to boost performance.
from sparkly import SparklySession
from sparkly.test import SparklyTest


class MyTestCase(SparklyTest):
    session = SparklySession

    def test(self):
        df = self.spark.read_ext.by_url(...)

        # Compare all fields
        self.assertDataFrameEqual(
            actual_df=df,
            expected_data=[
                {'col1': 'row1', 'col2': 1},
                {'col1': 'row2', 'col2': 2},
            ],
        )

        # Compare a subset of fields
        self.assertDataFrameEqual(
            actual_df=df,
            expected_data=[
                {'col1': 'row1'},
                {'col1': 'row2'},
            ],
            fields=['col1'],
        )

...

class MyTestWithReusableSession(SparklyGlobalSessionTest):
    context = SparklySession

    def test(self):
        df = self.spark.read_ext.by_url(...)

...

Fixtures

“Fixture” is a term borrowed from Django framework. Fixtures load data to a database before the test execution.

There are several storages supported in Sparkly:
  • Elastic
  • Cassandra (requires cassandra-driver)
  • Mysql (requires PyMySql)

_ Kafka (requires kafka-python)

from sparkly.test import MysqlFixture, SparklyTest


class MyTestCase(SparklyTest):
    ...
    fixtures = [
        MysqlFixture('mysql.host',
                     'user',
                     'password',
                     '/path/to/setup_data.sql',
                     '/path/to/remove_data.sql')
    ]
    ...
class sparkly.testing.CassandraFixture(host, setup_file, teardown_file)[source]

Fixture to load data into cassandra.

Notes

  • Depends on cassandra-driver.

Examples

>>> class MyTestCase(SparklyTest):
...      fixtures = [
...          CassandraFixture(
...              'cassandra.host',
...              absolute_path(__file__, 'resources', 'setup.cql'),
...              absolute_path(__file__, 'resources', 'teardown.cql'),
...          )
...      ]
...
>>> class MyTestCase(SparklyTest):
...      data = CassandraFixture(
...          'cassandra.host',
...          absolute_path(__file__, 'resources', 'setup.cql'),
...          absolute_path(__file__, 'resources', 'teardown.cql'),
...      )
...      def setUp(self):
...          data.setup_data()
...      def tearDown(self):
...          data.teardown_data()
...
>>> def test():
...     fixture = CassandraFixture(...)
...     with fixture:
...        test_stuff()
...
class sparkly.testing.ElasticFixture(host, es_index, es_type, mapping=None, data=None, port=None)[source]

Fixture for elastic integration tests.

Examples

>>> class MyTestCase(SparklyTest):
...      fixtures = [
...          ElasticFixture(
...              'elastic.host',
...              'es_index',
...              'es_type',
...              '/path/to/mapping.json',
...              '/path/to/data.json',
...          )
...      ]
...
class sparkly.testing.Fixture[source]

Base class for fixtures.

Fixture is a term borrowed from Django tests, it’s data loaded into database for integration testing.

setup_data()[source]

Method called to load data into database.

teardown_data()[source]

Method called to remove data from database which was loaded by setup_data.

class sparkly.testing.KafkaFixture(host, port=9092, topic=None, key_serializer=None, value_serializer=None, data=None)[source]

Fixture for kafka integration tests.

Notes

  • depends on kafka-python lib.
  • json file should contain array of dicts: [{‘key’: ..., ‘value’: ...}]

Examples

>>> class MyTestCase(SparklySession):
...     fixtures = [
...         KafkaFixture(
...             'kafka.host', 'topic',
...             key_serializer=..., value_serializer=...,
...             data='/path/to/data.json',
...         )
...     ]
class sparkly.testing.KafkaWatcher(spark, df_schema, key_deserializer, value_deserializer, host, topic, port=9092)[source]

Context manager that tracks Kafka data published to a topic

Provides access to the new items that were written to a kafka topic by code running within this context.

NOTE: This is mainly useful in integration test cases and may produce unexpected results in production environments, since there are no guarantees about who else may be publishing to a kafka topic.

Usage:

my_deserializer = lambda item: json.loads(item.decode(‘utf-8’)) kafka_watcher = KafkaWatcher(

my_sparkly_session, expected_output_dataframe_schema, my_deserializer, my_deserializer, ‘my.kafkaserver.net’, ‘my_kafka_topic’,

) with kafka_watcher:

# do stuff that publishes messages to ‘my_kafka_topic’

self.assertEqual(kafka_watcher.count, expected_number_of_new_messages) self.assertDataFrameEqual(kafka_watcher.df, expected_df)

class sparkly.testing.MysqlFixture(host, user, password=None, data=None, teardown=None)[source]

Fixture for mysql integration tests.

Notes

  • depends on PyMySql lib.

Examples

>>> class MyTestCase(SparklyTest):
...      fixtures = [
...          MysqlFixture('mysql.host', 'user', 'password', '/path/to/data.sql')
...      ]
...      def test(self):
...          pass
...
class sparkly.testing.SparklyGlobalSessionTest(methodName='runTest')[source]

Base test case that keeps a single instance for the given session class across all tests.

Integration tests are slow, especially when you have to start/stop Spark context for each test case. This class allows you to reuse Spark session across multiple test cases.

class sparkly.testing.SparklyTest(methodName='runTest')[source]

Base test for spark scrip tests.

Initialize and shut down Session specified in session attribute.

Example

>>> class MyTestCase(SparklyTest):
...     def test(self):
...         self.assertDataFrameEqual(
...              self.spark.sql('SELECT 1 as one').collect(),
...              [{'one': 1}],
...         )
assertDataFrameEqual(actual_df, expected_data, fields=None, ordered=False)[source]

Ensure that DataFrame has the right data inside.

Parameters:
  • actual_df (pyspark.sql.DataFrame|list[pyspark.sql.Row]) – Dataframe to test data in.
  • expected_data (list[dict]) – Expected dataframe rows defined as dicts.
  • fields (list[str]) – Compare only certain fields.
  • ordered (bool) – Does order of rows matter?
session

alias of SparklySession