Welcome to sparkly’s documentation!

Sparkly is a library that makes using pyspark more convenient and consistent.

A brief tour of Sparkly's features:

# The main entry point is SparklySession; you can think of it
# as a combination of SparkSession and SparkSession.builder.
from sparkly import SparklySession


# Define dependencies in the code instead of messing with `spark-submit`.
class MySession(SparklySession):
    # Spark packages and dependencies from Maven.
    packages = [
        'datastax:spark-cassandra-connector:2.0.0-M2-s_2.11',
        'mysql:mysql-connector-java:5.1.39',
    ]

    # Jars and Hive UDFs
    jars = ['/path/to/brickhouse-0.7.1.jar']
    udfs = {
        'collect_max': 'brickhouse.udf.collect.CollectMaxUDAF',
    }


spark = MySession()
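
# The Hive UDF registered above is now callable from Spark SQL.
# (The table and column names below are placeholders for illustration,
# not part of the original example.)
spark.sql('SELECT collect_max(key_column, value_column) FROM some_table')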

# Read and write data through interchangeable URL-like data source definitions:
df = spark.read_ext.by_url('mysql://<my-sql.host>/<my-database>/<my-table>')
df.write_ext.by_url('parquet:s3://<my-bucket>/<path>/data?partition_by=<field_name1>')
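
# Other schemes follow the same pattern; for example, a Cassandra read
# via the connector package declared above (host, keyspace and table
# names here are placeholders):
df = spark.read_ext.by_url('cassandra://<cassandra.host>/<keyspace>/<table>')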

# Interact with the Hive Metastore via a convenient Python API
# instead of verbose SQL queries:
spark.catalog_ext.has_table('my_custom_table')
spark.catalog_ext.get_table_properties('my_custom_table')
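
# Table properties can be written back as well; a minimal sketch,
# assuming a setter, set_table_property, that mirrors the getter above:
spark.catalog_ext.set_table_property('my_custom_table', 'last_updated', '2023-01-01')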

# Easy integration testing with fixtures and base test classes.
from pyspark.sql import types as T
from sparkly.testing import MysqlFixture, SparklyTest


class TestMyShinySparkScript(SparklyTest):
    session = MySession

    fixtures = [
        MysqlFixture(
            '<my-testing-host>', '<test-user>', '<test-pass>',
            '/path/to/data.sql', '/path/to/clear.sql',
        ),
    ]

    def test_job_works_with_mysql(self):
        df = self.spark.read_ext.by_url(
            'mysql://<my-testing-host>/<test-db>/<test-table>'
            '?user=<test-user>&password=<test-pass>'
        )
        res_df = my_shiny_script(df)
        self.assertRowsEqual(
            res_df.collect(),
            [T.Row(fieldA='DataA', fieldB='DataB', fieldC='DataC')],
        )
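
# When many tests share one session, creating it per test class is slow.
# A minimal sketch using SparklyGlobalSessionTest, which reuses a single
# session across the whole test run (the test body here is illustrative):
from sparkly.testing import SparklyGlobalSessionTest


class TestWithSharedSession(SparklyGlobalSessionTest):
    session = MySession

    def test_shared_session_sees_my_table(self):
        self.assertTrue(self.spark.catalog_ext.has_table('my_custom_table'))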

Indices and tables