Sparkly Context

About Sparkly Context

The SparklyContext class is the main entry point of the Sparkly library and encompasses all of its functionality. Most of the time you will want to subclass it and define the options you need through class attributes.

A Sparkly context also links to the other extras of the library:

  • read_ext – Read/write utilities for DataFrames
  • hms – Hive Metastore Utils

The PySpark DataFrame class is also monkey-patched with a write_ext attribute (Read/write utilities for DataFrames) for convenient writing.
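
A minimal sketch of how these attributes are typically accessed. The Cassandra URL is illustrative and would require the connector package shown later in this page, and by_url on write_ext is assumed to mirror the reader extension; check the read/write documentation for the exact methods:

from sparkly import SparklyContext

ctx = SparklyContext()

# Read a DataFrame through the reader extension.
df = ctx.read_ext.by_url('cassandra://<cassandra-host>/<db>/<table>?consistency=QUORUM')

# Hive Metastore utilities are exposed as `ctx.hms` (see the Hive Metastore Utils docs).

# DataFrames gain a `write_ext` attribute for writing back.
df.write_ext.by_url('cassandra://<cassandra-host>/<db>/<table>?consistency=QUORUM')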

Use cases

Setup Custom options

Why: Sometimes you need to customize your Spark context beyond the defaults. We prefer to define Spark options declaratively rather than calling a setter for each option.

For example, some useful use cases are:

  • Tuning shuffle options, such as spark.sql.shuffle.partitions
  • Setting up a remote Hive Metastore instead of the local one
  • Package-specific options, such as spark.hadoop.avro.mapred.ignore.inputs.without.extension

from sparkly import SparklyContext


class OwnSparklyContext(SparklyContext):
    options = {
        # Increase the default number of partitions used for shuffling.
        'spark.sql.shuffle.partitions': '1000',
        # Set up a remote Hive Metastore.
        'hive.metastore.uris': 'thrift://<host1>:9083,thrift://<host2>:9083',
        # Tell the Avro reader not to ignore files without the `avro` extension.
        'spark.hadoop.avro.mapred.ignore.inputs.without.extension': 'false',
    }

# You can also overwrite or add options at initialization time.
ctx = OwnSparklyContext({ ...initialize-time options... })

# You can still update options later if you need to.
ctx.setConf('key', 'value')
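
If you want to verify that an option took effect, the standard PySpark getConf works on the context as well. A small sketch, reusing the option from the example above:

# Returns the current value of the option, or the given default if it is not set.
ctx.getConf('spark.sql.shuffle.partitions', '200')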

Installing spark dependencies

Why: The default mechanism requires that dependencies be declared when the Spark job is submitted, typically on the command line. We prefer a code-first approach where dependencies are declared as part of the job.

For example: you want to install the Cassandra connector to read data from one of your tables.

from sparkly import SparklyContext


class OwnSparklyContext(SparklyContext):
    # Spark packages to fetch and install.
    packages = [
        'datastax:spark-cassandra-connector:1.5.0-M3-s_2.10',
    ]

# Dependencies will be installed during context initialization.
ctx = OwnSparklyContext()

# Here is how you can now obtain a DataFrame representing your Cassandra table.
df = ctx.read_ext.by_url('cassandra://<cassandra-host>'
                         '/<db>/<table>?consistency=QUORUM&parallelism=16')

Using UDFs

Why: By default, to use UDFs in Hive queries you have to add the JARs and register each UDF through verbose Hive statements.

For example: you want to import UDFs from the [brickhouse](https://github.com/klout/brickhouse) Hive UDF library.

from pyspark.sql.types import IntegerType
from sparkly import SparklyContext

def my_own_udf(item):
    return len(item)

class OwnSparklyContext(SparklyContext):
    # JARs that contain the Hive UDF implementations.
    jars = [
        '/path/to/brickhouse.jar',
    ]
    # UDFs: either a class name from a JAR or a (Python function, return type) tuple.
    udfs = {
        'collect_max': 'brickhouse.udf.collect.CollectMaxUDAF',
        'my_udf': (my_own_udf, IntegerType()),
    }

# JARs are added and UDFs are registered during context initialization.
ctx = OwnSparklyContext()

ctx.sql('SELECT collect_max(amount) FROM my_data GROUP BY ...')
ctx.sql('SELECT my_udf(amount) FROM my_data')
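
Registered UDFs are resolved by the SQL parser, so they can also be used from the DataFrame API through pyspark.sql.functions.expr. A small sketch, reusing the my_data table from the queries above:

from pyspark.sql import functions as F

# `expr` parses a SQL expression, so UDFs registered on the context are available here too.
df = ctx.table('my_data')
df.select(F.expr('my_udf(amount)').alias('amount_length')).show()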

class sparkly.context.SparklyContext(additional_options=None)

Wrapper around HiveContext to simplify definition of options, packages, JARs and UDFs.

Example:

from pyspark.sql.types import IntegerType
import sparkly


class MyContext(sparkly.SparklyContext):
    options = {'spark.sql.shuffle.partitions': '2000'}
    packages = ['com.databricks:spark-csv_2.10:1.4.0']
    jars = ['../path/to/brickhouse-0.7.1.jar']
    udfs = {
        'collect_max': 'brickhouse.udf.collect.CollectMaxUDAF',
        'my_python_udf': (lambda x: len(x), IntegerType()),
    }


hc = MyContext()
hc.read_ext.cassandra(...)

options

dict[str,str] – Configuration options that are passed to SparkConf. See the Spark configuration documentation for the list of possible options.

packages

list[str] – Spark packages that should be installed. See https://spark-packages.org/

jars

list[str] – Full paths to JAR files to include in the context. E.g. a JDBC connector or a library with UDFs.

udfs

dict[str,str|typing.Callable] – UDFs to register within the context. Key – the name of the function; value – either a class name imported from a JAR file or a tuple of a Python function and its return type.

has_jar(jar_name)

Check if the jar is available in the context.

Parameters: jar_name (str) – E.g. “mysql-connector-java”
Returns: bool

has_package(package_prefix)

Check if the package is available in the context.

Parameters: package_prefix (str) – E.g. “org.elasticsearch:elasticsearch-spark”
Returns: bool
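
A quick sanity-check sketch for both helpers, assuming a context subclass that declares both the Cassandra package and the brickhouse JAR as in the examples above:

ctx = OwnSparklyContext()

# Check that the Cassandra connector package was requested for this context.
assert ctx.has_package('datastax:spark-cassandra-connector')

# Check that the brickhouse JAR was included.
assert ctx.has_jar('brickhouse')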