Testing Utils¶

Base TestCases¶

There are two main test cases available in Sparkly:

- SparklyTest creates a new session for each test case.
- SparklyGlobalSessionTest uses a single Sparkly session for all test cases to boost performance.
from sparkly import SparklySession
from sparkly.test import SparklyTest


class MyTestCase(SparklyTest):
    session = SparklySession

    def test(self):
        df = self.spark.read_ext.by_url(...)

        # Compare all fields
        self.assertDataFrameEqual(
            actual_df=df,
            expected_data=[
                {'col1': 'row1', 'col2': 1},
                {'col1': 'row2', 'col2': 2},
            ],
        )

        # Compare a subset of fields
        self.assertDataFrameEqual(
            actual_df=df,
            expected_data=[
                {'col1': 'row1'},
                {'col1': 'row2'},
            ],
            fields=['col1'],
        )

    ...


class MyTestWithReusableSession(SparklyGlobalSessionTest):
    context = SparklySession

    def test(self):
        df = self.spark.read_ext.by_url(...)

    ...
Fixtures¶

"Fixture" is a term borrowed from the Django framework. Fixtures load data into a database before test execution.

There are several storages supported in Sparkly:

- Elastic
- Cassandra (requires cassandra-driver)
- Mysql (requires PyMySql)
- Kafka (requires kafka-python)
from sparkly.test import MysqlFixture, SparklyTest


class MyTestCase(SparklyTest):
    ...
    fixtures = [
        MysqlFixture(
            'mysql.host',
            'user',
            'password',
            '/path/to/setup_data.sql',
            '/path/to/remove_data.sql',
        )
    ]
    ...
class sparkly.testing.CassandraFixture(host, setup_file, teardown_file)[source]¶

Fixture to load data into Cassandra.

Notes

- Depends on cassandra-driver.

Examples

>>> class MyTestCase(SparklyTest):
...     fixtures = [
...         CassandraFixture(
...             'cassandra.host',
...             absolute_path(__file__, 'resources', 'setup.cql'),
...             absolute_path(__file__, 'resources', 'teardown.cql'),
...         )
...     ]
...

>>> class MyTestCase(SparklyTest):
...     data = CassandraFixture(
...         'cassandra.host',
...         absolute_path(__file__, 'resources', 'setup.cql'),
...         absolute_path(__file__, 'resources', 'teardown.cql'),
...     )
...     def setUp(self):
...         self.data.setup_data()
...     def tearDown(self):
...         self.data.teardown_data()
...

>>> def test():
...     fixture = CassandraFixture(...)
...     with fixture:
...         test_stuff()
...
class sparkly.testing.ElasticFixture(host, es_index, es_type, mapping=None, data=None, port=None)[source]¶

Fixture for Elastic integration tests.

Examples

>>> class MyTestCase(SparklyTest):
...     fixtures = [
...         ElasticFixture(
...             'elastic.host',
...             'es_index',
...             'es_type',
...             '/path/to/mapping.json',
...             '/path/to/data.json',
...         )
...     ]
...
class sparkly.testing.Fixture[source]¶

Base class for fixtures.

"Fixture" is a term borrowed from Django tests: data loaded into a database for integration testing.
class sparkly.testing.KafkaFixture(host, port=9092, topic=None, key_serializer=None, value_serializer=None, data=None)[source]¶

Fixture for Kafka integration tests.

Notes

- Depends on the kafka-python lib.
- The JSON data file should contain an array of dicts: [{'key': ..., 'value': ...}]

Examples

>>> class MyTestCase(SparklyTest):
...     fixtures = [
...         KafkaFixture(
...             'kafka.host', 'topic',
...             key_serializer=..., value_serializer=...,
...             data='/path/to/data.json',
...         )
...     ]
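The data file layout described in the notes above (a JSON array of key/value dicts) can be produced with the standard library alone. The record contents below are made up for illustration; only the array-of-dicts shape is what KafkaFixture expects:

```python
import json
import tempfile

# An array of {'key': ..., 'value': ...} dicts, as the KafkaFixture
# data file format requires. The payloads themselves are illustrative.
records = [
    {'key': {'user_id': 1}, 'value': {'event': 'signup'}},
    {'key': {'user_id': 2}, 'value': {'event': 'login'}},
]

# Write the fixture file the way a test resource would be prepared.
with tempfile.NamedTemporaryFile('w', suffix='.json', delete=False) as f:
    json.dump(records, f)
    path = f.name

# Round-trip check: the file parses back to the same array of dicts.
with open(path) as f:
    loaded = json.load(f)

print(len(loaded))  # 2
```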
class sparkly.testing.KafkaWatcher(spark, df_schema, key_deserializer, value_deserializer, host, topic, port=9092)[source]¶

Context manager that tracks Kafka data published to a topic.

Provides access to the new items that were written to a Kafka topic by code running within this context.

NOTE: This is mainly useful in integration test cases and may produce unexpected results in production environments, since there are no guarantees about who else may be publishing to a Kafka topic.

Usage:

    my_deserializer = lambda item: json.loads(item.decode('utf-8'))
    kafka_watcher = KafkaWatcher(
        my_sparkly_session,
        expected_output_dataframe_schema,
        my_deserializer,
        my_deserializer,
        'my.kafkaserver.net',
        'my_kafka_topic',
    )
    with kafka_watcher:
        # do stuff that publishes messages to 'my_kafka_topic'
        ...

    self.assertEqual(kafka_watcher.count, expected_number_of_new_messages)
    self.assertDataFrameEqual(kafka_watcher.df, expected_df)
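The before/after bookkeeping a watcher like this performs can be shown with a storage-agnostic sketch. The ListWatcher class below is hypothetical (not part of Sparkly); it uses a plain list in place of a Kafka topic to illustrate how only items published inside the context are counted:

```python
class ListWatcher:
    """Hypothetical stand-in for KafkaWatcher's bookkeeping.

    Snapshots the length of a list on entry, then exposes only the
    items appended while the context was active.
    """

    def __init__(self, source):
        self.source = source
        self.new_items = []

    def __enter__(self):
        # Remember where the "topic" ends before the tested code runs.
        self._offset = len(self.source)
        return self

    def __exit__(self, exc_type, exc, tb):
        # Everything appended after the snapshot counts as new.
        self.new_items = self.source[self._offset:]
        return False

    @property
    def count(self):
        return len(self.new_items)


topic = ['old message']
with ListWatcher(topic) as watcher:
    topic.append('fresh message')

print(watcher.count)  # 1 -- the pre-existing message is not counted
```

The same caveat from the note above applies: if someone else appends to the source concurrently, those items are indistinguishable from the ones your code produced.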
class sparkly.testing.MysqlFixture(host, user, password=None, data=None, teardown=None)[source]¶

Fixture for MySQL integration tests.

Notes

- Depends on the PyMySql lib.

Examples

>>> class MyTestCase(SparklyTest):
...     fixtures = [
...         MysqlFixture('mysql.host', 'user', 'password', '/path/to/data.sql')
...     ]
...     def test(self):
...         pass
...
class sparkly.testing.SparklyGlobalSessionTest(methodName='runTest')[source]¶

Base test case that keeps a single instance of the given session class across all tests.

Integration tests are slow, especially when you have to start/stop the Spark context for each test case. This class allows you to reuse a Spark session across multiple test cases.
class sparkly.testing.SparklyTest(methodName='runTest')[source]¶

Base test case for Spark script tests.

Initializes and shuts down the session specified in the session attribute.

Example

>>> class MyTestCase(SparklyTest):
...     def test(self):
...         self.assertDataFrameEqual(
...             self.spark.sql('SELECT 1 as one').collect(),
...             [{'one': 1}],
...         )
assertDataFrameEqual(actual_df, expected_data, fields=None, ordered=False)[source]¶

Ensure that the DataFrame has the right data inside.

Parameters:

- actual_df (pyspark.sql.DataFrame | list[pyspark.sql.Row]) – DataFrame to test the data in.
- expected_data (list[dict]) – Expected DataFrame rows defined as dicts.
- fields (list[str]) – Compare only certain fields.
- ordered (bool) – Does the order of rows matter?
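The comparison semantics the parameters above describe (project to fields, ignore row order unless ordered=True) can be sketched in plain Python. This is an illustration of the documented behavior, not Sparkly's actual implementation:

```python
def rows_equal(actual, expected, fields=None, ordered=False):
    """Compare rows-as-dicts following the assertDataFrameEqual parameters.

    fields:  if given, project both sides down to just those keys.
    ordered: if False, compare as unordered collections; if True,
             position matters.
    """
    if fields is not None:
        actual = [{k: row[k] for k in fields} for row in actual]
        expected = [{k: row[k] for k in fields} for row in expected]
    if not ordered:
        # Sort both sides by a canonical key so row order is irrelevant.
        key = lambda row: sorted(row.items())
        actual, expected = sorted(actual, key=key), sorted(expected, key=key)
    return actual == expected


actual = [{'col1': 'row2', 'col2': 2}, {'col1': 'row1', 'col2': 1}]
expected = [{'col1': 'row1'}, {'col1': 'row2'}]

print(rows_equal(actual, expected, fields=['col1']))                # True
print(rows_equal(actual, expected, fields=['col1'], ordered=True))  # False
```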
session¶

alias of SparklySession