Integration Testing Base Classes

Base testing classes

There are two main testing classes in Sparkly:
  • SparklyTest:
    • Instantiates the Sparkly context specified in the context attribute.
    • The context is available via self.hc.
  • SparklyGlobalContextTest:
    • Reuses a single SparklyContext across all tests for a performance boost.

Example:

from sparkly import SparklyContext
from sparkly.test import SparklyTest

class MyTestCase(SparklyTest):
    context = SparklyContext

    def test(self):
        df = self.hc.read_ext.by_url(...)
        self.assertDataFrameEqual(
            df, [('test_data', 1)], ['name', 'number']
        )

...

class MyTestWithReusableContext(SparklyGlobalContextTest):
    context = SparklyContext

    def test(self):
        df = self.hc.read_ext.by_url(...)

...

Fixtures

Fixture is a term borrowed from testing in the Django framework. A fixture loads data into a database upon test execution.

Several data stores are supported in Sparkly:
  • MySQL (requires: PyMySql)
  • Elastic
  • Cassandra (requires: cassandra-driver)
  • Kafka (requires: kafka-python)

Example:

from sparkly.test import MysqlFixture, SparklyTest

class MyTestCase(SparklyTest):
    ...
    fixtures = [
        MysqlFixture('mysql.host',
                     'user',
                     'password',
                     '/path/to/setup_data.sql',
                     '/path/to/remove_data.sql')
    ]
    ...
class sparkly.testing.CassandraFixture(host, setup_file, teardown_file)

Fixture to load data into cassandra.

Notes

  • Depends on cassandra-driver.

Examples

>>> class MyTestCase(SparklyTest):
...      fixtures = [
...          CassandraFixture(
...              'cassandra.host',
...              absolute_path(__file__, 'resources', 'setup.cql'),
...              absolute_path(__file__, 'resources', 'teardown.cql'),
...          )
...      ]
...
>>> class MyTestCase(SparklyTest):
...      data = CassandraFixture(
...          'cassandra.host',
...          absolute_path(__file__, 'resources', 'setup.cql'),
...          absolute_path(__file__, 'resources', 'teardown.cql'),
...      )
...      def setUp(self):
...          self.data.setup_data()
...      def tearDown(self):
...          self.data.teardown_data()
...
>>> def test():
...     fixture = CassandraFixture(...)
...     with fixture:
...        test_stuff()
...
class sparkly.testing.ElasticFixture(host, es_index, es_type, mapping=None, data=None, port=None)

Fixture for elastic integration tests.

Notes

  • Data upload uses the bulk API.

Examples

>>> class MyTestCase(SparklyTest):
...      fixtures = [
...          ElasticFixture(
...              'elastic.host',
...              'es_index',
...              'es_type',
...              '/path/to/mapping.json',
...              '/path/to/data.json',
...          )
...      ]
...
class sparkly.testing.Fixture

Base class for fixtures.

Fixture is a term borrowed from Django tests: it is data loaded into a database for integration testing.

setup_data()

Method called to load data into the database.

teardown_data()

Method called to remove the data that setup_data loaded into the database.
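To illustrate the contract, here is a minimal sketch of a custom fixture. TempFileFixture, its arguments, and its file handling are hypothetical; only the setup_data/teardown_data hooks come from the base class:

import os

from sparkly.testing import Fixture

class TempFileFixture(Fixture):
    """Hypothetical fixture that materializes a local file for a test."""
    def __init__(self, path, content):
        self.path = path
        self.content = content

    def setup_data(self):
        # Called before the test runs: create the data the test depends on.
        with open(self.path, 'w') as f:
            f.write(self.content)

    def teardown_data(self):
        # Called after the test runs: remove whatever setup_data created.
        if os.path.exists(self.path):
            os.remove(self.path)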

class sparkly.testing.KafkaFixture(host, port=9092, topic=None, key_serializer=None, value_serializer=None, data=None)

Fixture for kafka integration tests.

Notes

  • Depends on the kafka-python library.
  • The JSON data file should contain an array of dicts: [{'key': ..., 'value': ...}]

Examples

>>> class MyTestCase(SparklyTest):
...     fixtures = [
...         KafkaFixture(
...             'kafka.host', 'topic',
...             key_serializer=..., value_serializer=...,
...             data='/path/to/data.json',
...         )
...     ]
...
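The data file itself is plain JSON; here is a sketch of producing one in the expected layout (the keys and values below are illustrative):

import json

# KafkaFixture expects an array of dicts with 'key' and 'value' entries.
messages = [
    {'key': {'user_id': 1}, 'value': {'event': 'signup'}},
    {'key': {'user_id': 2}, 'value': {'event': 'login'}},
]

with open('/path/to/data.json', 'w') as f:
    json.dump(messages, f)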
class sparkly.testing.MysqlFixture(host, user, password=None, data=None, teardown=None)

Fixture for mysql integration tests.

Notes

  • Depends on the PyMySql library.

Examples

>>> class MyTestCase(SparklyTest):
...      fixtures = [
...          MysqlFixture('mysql.host', 'user', 'password', '/path/to/data.sql')
...      ]
...      def test(self):
...          pass
...
class sparkly.testing.SparklyGlobalContextTest(methodName='runTest')

Base test case that keeps a single instance for the given context class across all tests.

Integration tests are slow, especially when you have to start and stop a Spark context for each test case. This class lets you reuse a single Spark context across multiple test cases.
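Example

A minimal sketch mirroring the SparklyTest usage below; it assumes the global-context test exposes the same self.hc and assertion helpers:

>>> class MyTestCase(SparklyGlobalContextTest):
...     context = SparklyContext
...     def test(self):
...         self.assertDataFrameEqual(
...             self.hc.sql('SELECT 1 as one').collect(),
...             [{'one': 1}],
...         )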

class sparkly.testing.SparklyTest(methodName='runTest')

Base test case for Spark script tests.

Initializes and shuts down the context specified in the context attribute.

Example

>>> class MyTestCase(SparklyTest):
...     def test(self):
...         self.assertDataFrameEqual(
...              self.hc.sql('SELECT 1 as one').collect(),
...              [{'one': 1}],
...         )
assertDataFrameEqual(actual_df, expected_data, fields=None, ordered=False)

Ensure that the DataFrame contains the expected data.

Parameters:
  • actual_df (pyspark.sql.DataFrame|list[pyspark.sql.Row]) – DataFrame (or list of rows) whose data is checked.
  • expected_data (list[dict]) – Expected rows, defined as dicts.
  • fields (list[str]) – Compare only the given fields.
  • ordered (bool) – Whether the order of rows matters.
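For instance, to compare a subset of columns while enforcing row order (the column names are illustrative; actual_df stands for the DataFrame under test):

>>> self.assertDataFrameEqual(
...     actual_df,
...     [{'name': 'alice'}, {'name': 'bob'}],
...     fields=['name'],  # ignore all other columns
...     ordered=True,     # rows must appear in this order
... )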
context

alias of SparklyContext