Testing Utils¶
Base TestCases¶
There are two main test cases available in Sparkly:
- SparklyTest creates a new session for each test case.
- SparklyGlobalSessionTest uses a single sparkly session for all test cases to boost performance.
from sparkly import SparklySession
from sparkly.testing import SparklyGlobalSessionTest, SparklyTest


class MyTestCase(SparklyTest):
    session = SparklySession

    def test(self):
        df = self.spark.read_ext.by_url(...)

        # Compare all fields
        self.assertDataFrameEqual(
            actual_df=df,
            expected_data=[
                {'col1': 'row1', 'col2': 1},
                {'col1': 'row2', 'col2': 2},
            ],
        )

        # Compare a subset of fields
        self.assertDataFrameEqual(
            actual_df=df,
            expected_data=[
                {'col1': 'row1'},
                {'col1': 'row2'},
            ],
            fields=['col1'],
        )

    ...


class MyTestWithReusableSession(SparklyGlobalSessionTest):
    context = SparklySession

    def test(self):
        df = self.spark.read_ext.by_url(...)

    ...
Instant Iterative Development¶
The slowest part of Spark integration testing is context initialisation.
SparklyGlobalSessionTest
allows you to keep the same instance of the Spark context between different test cases,
but it still kills the context at the end of the run. This is especially annoying if you work in a TDD fashion:
on each run you have to wait 25-30 seconds until a new context is ready.
We added a tool to preserve spark context between multiple test runs.
Note

If you change the SparklySession definition (new options, jars or packages),
you have to refresh the context via sparkly-testing refresh.
However, you don't need to refresh the context if only udfs are changed.
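A typical instant-testing workflow might look like the sketch below. Only the refresh command is described above; the up and down subcommands and the test runner invocation are assumptions about the CLI, so check sparkly-testing --help for the exact names.

```shell
# Assumed workflow; only `refresh` is confirmed by the note above.
sparkly-testing up       # start a long-lived Spark context for tests (assumed)
py.test tests/           # subsequent runs reuse the preserved context
sparkly-testing refresh  # after changing the SparklySession definition
sparkly-testing down     # tear the preserved context down (assumed)
```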
Fixtures¶
"Fixture" is a term borrowed from the Django framework. Fixtures load data into a database before test execution.
There are several storages supported in Sparkly:
- Elastic
- Cassandra (requires cassandra-driver)
- Mysql (requires PyMySql)
- Kafka (requires kafka-python)
from sparkly.testing import MysqlFixture, SparklyTest


class MyTestCase(SparklyTest):
    ...
    fixtures = [
        MysqlFixture(
            'mysql.host',
            'user',
            'password',
            '/path/to/setup_data.sql',
            '/path/to/remove_data.sql',
        )
    ]
    ...
class sparkly.testing.CassandraFixture(host, setup_file, teardown_file)¶

Fixture to load data into cassandra.

Notes
- Depends on cassandra-driver.

Examples

>>> class MyTestCase(SparklyTest):
...     fixtures = [
...         CassandraFixture(
...             'cassandra.host',
...             absolute_path(__file__, 'resources', 'setup.cql'),
...             absolute_path(__file__, 'resources', 'teardown.cql'),
...         )
...     ]
...

>>> class MyTestCase(SparklyTest):
...     data = CassandraFixture(
...         'cassandra.host',
...         absolute_path(__file__, 'resources', 'setup.cql'),
...         absolute_path(__file__, 'resources', 'teardown.cql'),
...     )
...     def setUp(self):
...         data.setup_data()
...     def tearDown(self):
...         data.teardown_data()
...

>>> def test():
...     fixture = CassandraFixture(...)
...     with fixture:
...         test_stuff()
...
class sparkly.testing.ElasticFixture(host, es_index, es_type, mapping=None, data=None, port=None)¶

Fixture for elastic integration tests.

Examples

>>> class MyTestCase(SparklyTest):
...     fixtures = [
...         ElasticFixture(
...             'elastic.host',
...             'es_index',
...             'es_type',
...             '/path/to/mapping.json',
...             '/path/to/data.json',
...         )
...     ]
...
class sparkly.testing.Fixture¶

Base class for fixtures.

Fixture is a term borrowed from Django tests: it's data loaded into a database for integration testing.
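The contract the fixture examples rely on can be sketched in plain Python: subclasses implement setup_data/teardown_data (the hooks shown in the CassandraFixture example), and the context-manager form wraps them. This is a simplified model, not sparkly's actual implementation; InMemoryFixture is a hypothetical toy subclass.

```python
# Simplified sketch of the Fixture contract; not sparkly's implementation.
class Fixture:
    def setup_data(self):
        raise NotImplementedError()

    def teardown_data(self):
        raise NotImplementedError()

    def __enter__(self):
        # `with fixture:` loads data on entry...
        self.setup_data()
        return self

    def __exit__(self, exc_type, exc, tb):
        # ...and removes it on exit, even if the test body raised.
        self.teardown_data()


class InMemoryFixture(Fixture):
    """Hypothetical toy fixture writing to a dict instead of a real storage."""

    def __init__(self, db, key, value):
        self.db, self.key, self.value = db, key, value

    def setup_data(self):
        self.db[self.key] = self.value

    def teardown_data(self):
        self.db.pop(self.key, None)
```

This mirrors the `with fixture:` usage shown for CassandraFixture above.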
class sparkly.testing.KafkaFixture(host, port=9092, topic=None, key_serializer=None, value_serializer=None, data=None)¶

Fixture for kafka integration tests.

Notes
- Depends on the kafka-python lib.
- The json file should contain an array of dicts: [{'key': ..., 'value': ...}]

Examples

>>> class MyTestCase(SparklyTest):
...     fixtures = [
...         KafkaFixture(
...             'kafka.host', 'topic',
...             key_serializer=..., value_serializer=...,
...             data='/path/to/data.json',
...         )
...     ]
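To make the note about the data file format concrete, the snippet below writes a file of the shape KafkaFixture expects: an array of {'key': ..., 'value': ...} dicts. The message contents and file location are illustrative only.

```python
# Illustrative shape of the KafkaFixture data file: an array of
# {'key': ..., 'value': ...} dicts, per the note above.
import json
import tempfile

messages = [
    {'key': {'id': 1}, 'value': {'name': 'john'}},
    {'key': {'id': 2}, 'value': {'name': 'paula'}},
]

# Write to a temp file; in a real test suite this would live under resources/.
path = tempfile.mkstemp(suffix='.json')[1]
with open(path, 'w') as f:
    json.dump(messages, f)
```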
class sparkly.testing.KafkaWatcher(spark, df_schema, key_deserializer, value_deserializer, host, topic, port=9092)¶

Context manager that tracks Kafka data published to a topic.

Provides access to the new items that were written to a kafka topic by code running within this context.

NOTE: This is mainly useful in integration test cases and may produce unexpected results in production environments, since there are no guarantees about who else may be publishing to a kafka topic.

Usage:

my_deserializer = lambda item: json.loads(item.decode('utf-8'))
kafka_watcher = KafkaWatcher(
    my_sparkly_session,
    expected_output_dataframe_schema,
    my_deserializer,
    my_deserializer,
    'my.kafkaserver.net',
    'my_kafka_topic',
)
with kafka_watcher:
    # do stuff that publishes messages to 'my_kafka_topic'
    ...
self.assertEqual(kafka_watcher.count, expected_number_of_new_messages)
self.assertDataFrameEqual(kafka_watcher.df, expected_df)
class sparkly.testing.MysqlFixture(host, user, password=None, data=None, teardown=None)¶

Fixture for mysql integration tests.

Notes
- Depends on the PyMySql lib.

Examples

>>> class MyTestCase(SparklyTest):
...     fixtures = [
...         MysqlFixture('mysql.host', 'user', 'password', '/path/to/data.sql')
...     ]
...     def test(self):
...         pass
...
class sparkly.testing.SparklyGlobalSessionTest(methodName='runTest')¶

Base test case that keeps a single instance of the given session class across all tests.

Integration tests are slow, especially when you have to start/stop the Spark context for each test case. This class allows you to reuse a Spark session across multiple test cases.
class sparkly.testing.SparklyTest(methodName='runTest')¶

Base test case for Spark script tests.

Initializes and shuts down the Session specified in the session attribute.

Example

>>> class MyTestCase(SparklyTest):
...     def test(self):
...         self.assertDataFrameEqual(
...             self.spark.sql('SELECT 1 as one').collect(),
...             [{'one': 1}],
...         )
assertDataFrameEqual(actual_df, expected_data, fields=None, ordered=False)¶

Ensure that the DataFrame has the right data inside.

Parameters:
- actual_df (pyspark.sql.DataFrame | list[pyspark.sql.Row]) – Dataframe to test data in.
- expected_data (list[dict]) – Expected dataframe rows defined as dicts.
- fields (list[str]) – Compare only certain fields.
- ordered (bool) – Whether the order of rows matters.
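The semantics of fields and ordered can be sketched in plain Python. This is a simplified model of the comparison, not sparkly's actual implementation; rows_equal is a hypothetical helper operating on rows as dicts.

```python
# Simplified model of assertDataFrameEqual's `fields`/`ordered` semantics.
# `rows_equal` is a hypothetical helper, not part of sparkly's API.
def rows_equal(actual_rows, expected_rows, fields=None, ordered=False):
    def project(row):
        # Keep only the requested fields (all fields when fields is None).
        return {k: v for k, v in row.items() if fields is None or k in fields}

    actual = [project(r) for r in actual_rows]
    expected = [project(r) for r in expected_rows]
    if ordered:
        # Row order matters: compare the lists directly.
        return actual == expected
    # Row order does not matter: compare sorted canonical forms.
    key = lambda r: sorted(r.items())
    return sorted(actual, key=key) == sorted(expected, key=key)
```

Under this model, comparing with fields=['col1'] ignores every other column, and ordered=False (the default) accepts rows in any order.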
session¶

alias of SparklySession