Read/write utilities for DataFrames

Sparkly isn’t trying to replace any of the existing storage connectors. The goal is to provide a simplified and consistent API across a wide array of storage connectors. We also added a way to work with abstract data sources, so you can keep your code agnostic to the storages you use.

Cassandra

Sparkly relies on the official Spark Cassandra connector and was successfully tested in production using version 2.0.0-M2.

Package https://spark-packages.org/package/datastax/spark-cassandra-connector
Configuration https://github.com/datastax/spark-cassandra-connector/blob/v2.0.0-M2/doc/reference.md
from sparkly import SparklySession


class MySession(SparklySession):
    # Feel free to play with other versions
    packages = ['datastax:spark-cassandra-connector:2.0.0-M2-s_2.11']

spark = MySession()

# To read data
df = spark.read_ext.cassandra('localhost', 'my_keyspace', 'my_table')
# To write data
df.write_ext.cassandra('localhost', 'my_keyspace', 'my_table')

Elastic

Sparkly relies on the official Elasticsearch Spark connector and was successfully tested in production using version 5.1.1.

Package https://spark-packages.org/package/elastic/elasticsearch-hadoop
Configuration https://www.elastic.co/guide/en/elasticsearch/hadoop/5.1/configuration.html
from sparkly import SparklySession


class MySession(SparklySession):
    # Feel free to play with other versions
    packages = ['org.elasticsearch:elasticsearch-spark-20_2.11:5.1.1']

spark = MySession()

# To read data
df = spark.read_ext.elastic('localhost', 'my_index', 'my_type', query='?q=awesomeness')
# To write data
df.write_ext.elastic('localhost', 'my_index', 'my_type')

Kafka

Sparkly’s reader and writer for Kafka are built on top of the official Spark package for Kafka and the Python library kafka-python. The former lets us read data efficiently; the latter covers the lack of writing functionality in the official distribution.

Package https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.1.0
Configuration http://spark.apache.org/docs/2.1.0/streaming-kafka-0-8-integration.html

Note

  • To interact with Kafka, sparkly needs the kafka-python library. You can get it via: pip install sparkly[kafka]
  • Sparkly was tested in production using Apache Kafka 0.10.x.
import json

from pyspark.sql.types import StringType, StructField, StructType

from sparkly import SparklySession


class MySession(SparklySession):
    packages = [
        'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0',
    ]

spark = MySession()

# To read JSON messages from Kafka into a dataframe:

#   1. Define a schema of the messages you read.
df_schema = StructType([
    StructField('key', StructType([
        StructField('id', StringType(), True)
    ])),
    StructField('value', StructType([
        StructField('name', StringType(), True),
        StructField('surname', StringType(), True),
    ]))
])

#   2. Specify the schema as a reader parameter.
df = spark.read_ext.kafka(
    'kafka.host',
    topic='my.topic',
    key_deserializer=lambda item: json.loads(item.decode('utf-8')),
    value_deserializer=lambda item: json.loads(item.decode('utf-8')),
    schema=df_schema,
)

# To write a dataframe to Kafka in JSON format:
df.write_ext.kafka(
    'kafka.host',
    topic='my.topic',
    key_serializer=lambda item: json.dumps(item).encode('utf-8'),
    value_serializer=lambda item: json.dumps(item).encode('utf-8'),
)

MySQL

Basically, it’s just a high-level API on top of the native JDBC reader and JDBC writer.

Jars https://mvnrepository.com/artifact/mysql/mysql-connector-java
Configuration https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-configuration-properties.html
from sparkly import SparklySession


class MySession(SparklySession):
    # Feel free to play with other versions.
    packages = ['mysql:mysql-connector-java:5.1.39']


spark = MySession()

# To read data
df = spark.read_ext.mysql('localhost', 'my_database', 'my_table',
                          options={'user': 'root', 'password': 'root'})
# To write data
df.write_ext.mysql('localhost', 'my_database', 'my_table', options={
    'user': 'root',
    'password': 'root',
    'rewriteBatchedStatements': 'true',  # improves write throughput dramatically
})

Universal reader/writer

The DataFrame abstraction is really powerful when it comes to transformations. You can shape your data from various storages using exactly the same API. For instance, you can join data from Cassandra with data from Elasticsearch and write the result to MySQL.

The only problem is that you have to explicitly define sources (or destinations) in order to create (or export) a DataFrame. But the source/destination of the data doesn’t really change the logic of the transformations (as long as the schema is preserved). To solve this, we added a universal API to read/write DataFrames:

from sparkly import SparklySession

class MySession(SparklySession):
    packages = [
        'datastax:spark-cassandra-connector:2.0.0-M2-s_2.11',
        'org.elasticsearch:elasticsearch-spark-20_2.11:5.1.1',
        # CSV and Parquet support is built into Spark 2.x,
        # so no extra packages are needed for them.
    ]

spark = MySession()

# To read data
df = spark.read_ext.by_url('cassandra://localhost/my_keyspace/my_table?consistency=ONE')
df = spark.read_ext.by_url('csv:s3://my-bucket/my-data?header=true')
df = spark.read_ext.by_url('elastic://localhost/my_index/my_type?q=awesomeness')
df = spark.read_ext.by_url('parquet:hdfs://my.name.node/path/on/hdfs')

# To write data
df.write_ext.by_url('cassandra://localhost/my_keyspace/my_table?consistency=QUORUM&parallelism=8')
df.write_ext.by_url('csv:hdfs://my.name.node/path/on/hdfs')
df.write_ext.by_url('elastic://localhost/my_index/my_type?parallelism=4')
df.write_ext.by_url('parquet:s3://my-bucket/my-data?header=false')

Controlling the load

From the official documentation:

Don’t create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.

link: <https://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/sql/DataFrameReader.html>

It’s very good advice, but in practice it’s hard to track the number of partitions. For instance, if you write the result of a join operation to a database, the number of splits might have been changed implicitly via spark.sql.shuffle.partitions.

To avoid shooting ourselves in the foot, we decided to add a parallelism option to all our readers and writers. The option is designed to control the load on the source we write to / read from. It’s especially useful when you are working with data storages like Cassandra, MySQL or Elastic. However, the implementation of the throttling has some drawbacks, and you should be aware of them.

The way we implemented it is pretty simple: we use coalesce on a dataframe to reduce the number of tasks that will be executed in parallel. Let’s say you have a dataframe with 1000 splits and you want to run no more than 10 tasks in parallel. In this case, coalesce will create a dataframe that has 10 splits with 100 original tasks in each. The outcome: if any of those 100 tasks fails, the whole pack of 100 tasks has to be retried.
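
Conceptually, the throttling boils down to the simplified sketch below (not the actual implementation; the host, keyspace and table mirror the Cassandra example above):

# A simplified sketch of the throttling idea: coalesce squashes
# 1000 splits into 10, so at most 10 tasks talk to the external
# storage at the same time.
throttled_df = df.coalesce(10)
throttled_df.write_ext.cassandra('localhost', 'my_keyspace', 'my_table')

# Which is roughly what the writer does for you when you pass parallelism:
df.write_ext.cassandra('localhost', 'my_keyspace', 'my_table', parallelism=10)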

Read more about coalesce

Reader API documentation

class sparkly.reader.SparklyReader(spark)[source]

A set of tools to create DataFrames from the external storages.

Note

This is a private class of the library. You should not use it directly. The instance of the class is available on a SparklySession via the read_ext attribute.

by_url(url)[source]

Create a dataframe using url.

The main idea behind the method is to unify the data access interface for different formats and locations. A generic schema looks like:

format:[protocol:]//host[:port][/location][?configuration]

Supported formats:

  • CSV csv://
  • Cassandra cassandra://
  • Elastic elastic://
  • MySQL mysql://
  • Parquet parquet://
  • Hive Metastore table table://

Query string arguments are passed as parameters to the relevant reader.

For instance, the following data source URL:

cassandra://localhost:9042/my_keyspace/my_table?consistency=ONE
    &parallelism=3&spark.cassandra.connection.compression=LZ4

is equivalent to:

spark.read_ext.cassandra(
    host='localhost',
    port=9042,
    keyspace='my_keyspace',
    table='my_table',
    consistency='ONE',
    parallelism=3,
    options={'spark.cassandra.connection.compression': 'LZ4'},
)

More examples:

table://table_name
csv:s3://some-bucket/some_directory?header=true
csv://path/on/local/file/system?header=false
parquet:s3://some-bucket/some_directory
elastic://elasticsearch.host/es_index/es_type?parallelism=8
cassandra://cassandra.host/keyspace/table?consistency=QUORUM
mysql://mysql.host/database/table
Parameters: url (str) – Data source URL.
Returns: pyspark.sql.DataFrame
cassandra(host, keyspace, table, consistency=None, port=None, parallelism=None, options=None)[source]

Create a dataframe from a Cassandra table.

Parameters:
  • host (str) – Cassandra server host.
  • keyspace (str) – Cassandra keyspace to read from.
  • table (str) – Cassandra table to read from.
  • consistency (str) – Read consistency level: ONE, QUORUM, ALL, etc.
  • port (int|None) – Cassandra server port.
  • parallelism (int|None) – The max number of parallel tasks that could be executed during the read stage (see Controlling the load).
  • options (dict[str,str]|None) – Additional options for org.apache.spark.sql.cassandra format (see configuration for Cassandra).
Returns:

pyspark.sql.DataFrame

elastic(host, es_index, es_type, query='', fields=None, port=None, parallelism=None, options=None)[source]

Create a dataframe from an ElasticSearch index.

Parameters:
  • host (str) – Elastic server host.
  • es_index (str) – Elastic index.
  • es_type (str) – Elastic type.
  • query (str) – Pre-filter es documents, e.g. ‘?q=views:>10’.
  • fields (list[str]|None) – Select only specified fields.
  • port (int|None) – Elastic server port.
  • parallelism (int|None) – The max number of parallel tasks that could be executed during the read stage (see Controlling the load).
  • options (dict[str,str]) – Additional options for org.elasticsearch.spark.sql format (see configuration for Elastic).
Returns:

pyspark.sql.DataFrame
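
A short sketch of a filtered read that also limits the returned fields (the field names and parallelism value below are just placeholders):

df = spark.read_ext.elastic(
    'localhost', 'my_index', 'my_type',
    query='?q=views:>10',          # pre-filter documents on the Elastic side
    fields=['name', 'surname'],    # fetch only these fields
    parallelism=4,                 # throttle the read stage
)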

kafka(host, topic, offset_ranges=None, key_deserializer=None, value_deserializer=None, schema=None, port=9092, parallelism=None, options=None)[source]

Creates a dataframe from a specified set of messages from a Kafka topic.

Defining ranges:
  • If offset_ranges is specified, it defines which specific ranges to read (see the sketch at the end of this entry).
  • If offset_ranges is omitted, the reader will auto-discover the topic’s partitions.

The schema parameter, if specified, should contain two top level fields: key and value.

Parameters key_deserializer and value_deserializer are callables which take bytes as input and should return Python structures as output.

Parameters:
  • host (str) – Kafka host.
  • topic (str|None) – Kafka topic to read from.
  • offset_ranges (list[(int, int, int)]|None) – List of partition ranges [(partition, start_offset, end_offset)].
  • key_deserializer (function) – Function used to deserialize the key.
  • value_deserializer (function) – Function used to deserialize the value.
  • schema (pyspark.sql.types.StructType) – Schema to apply to create a Dataframe.
  • port (int) – Kafka port.
  • parallelism (int|None) – The max number of parallel tasks that could be executed during the read stage (see Controlling the load).
  • options (dict|None) – Additional kafka parameters, see KafkaUtils.createRDD docs.
Returns:

pyspark.sql.DataFrame

Raises:

InvalidArgumentError
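
A short sketch of a ranged read, building on the Kafka example above (it reuses its deserializers and df_schema; the partition numbers and offsets are placeholders):

df = spark.read_ext.kafka(
    'kafka.host',
    topic='my.topic',
    # (partition, start_offset, end_offset): read a fixed slice
    # of partitions 0 and 1 instead of auto-discovering them.
    offset_ranges=[(0, 0, 100), (1, 0, 50)],
    key_deserializer=lambda item: json.loads(item.decode('utf-8')),
    value_deserializer=lambda item: json.loads(item.decode('utf-8')),
    schema=df_schema,
)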

mysql(host, database, table, port=None, parallelism=None, options=None)[source]

Create a dataframe from a MySQL table.

Options should include user and password.

Parameters:
  • host (str) – MySQL server address.
  • database (str) – Database to connect to.
  • table (str) – Table to read rows from.
  • port (int|None) – MySQL server port.
  • parallelism (int|None) – The max number of parallel tasks that could be executed during the read stage (see Controlling the load).
  • options (dict[str,str]|None) – Additional options for JDBC reader (see configuration for MySQL).
Returns:

pyspark.sql.DataFrame

Writer API documentation

class sparkly.writer.SparklyWriter(df)[source]

A set of tools to write DataFrames to external storages.

Note

We don’t expect you to use the class directly. The instance of the class is available on a DataFrame via the write_ext attribute.

by_url(url)[source]

Write a dataframe to a destination specified by url.

The main idea behind the method is to unify the data export interface for different formats and locations. A generic schema looks like:

format:[protocol:]//host[:port][/location][?configuration]

Supported formats:

  • CSV csv://
  • Cassandra cassandra://
  • Elastic elastic://
  • MySQL mysql://
  • Parquet parquet://

Query string arguments are passed as parameters to the relevant writer.

For instance, the following data export URL:

elastic://localhost:9200/my_index/my_type?parallelism=3&mode=overwrite
    &es.write.operation=upsert

is equivalent to:

df.write_ext.elastic(
    host='localhost',
    port=9200,
    es_index='my_index',
    es_type='my_type',
    parallelism=3,
    mode='overwrite',
    options={'es.write.operation': 'upsert'},
)

More examples:

csv:s3://some-s3-bucket/some-s3-key?partitionBy=date,platform
cassandra://cassandra.host/keyspace/table?consistency=ONE&mode=append
parquet:///var/log/?partitionBy=date
elastic://elastic.host/es_index/es_type
mysql://mysql.host/database/table
Parameters: url (str) – Destination URL.
cassandra(host, keyspace, table, consistency=None, port=None, mode=None, parallelism=None, options=None)[source]

Write a dataframe to a Cassandra table.

Parameters:
  • host (str) – Cassandra server host.
  • keyspace (str) – Cassandra keyspace to write to.
  • table (str) – Cassandra table to write to.
  • consistency (str|None) – Write consistency level: ONE, QUORUM, ALL, etc.
  • port (int|None) – Cassandra server port.
  • mode (str|None) – Spark save mode, http://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes
  • parallelism (int|None) – The max number of parallel tasks that could be executed during the write stage (see Controlling the load).
  • options (dict[str, str]) – Additional options to org.apache.spark.sql.cassandra format (see configuration for Cassandra).
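
A short sketch of a throttled, append-mode write (same host, keyspace and table as the Cassandra example above):

df.write_ext.cassandra(
    'localhost', 'my_keyspace', 'my_table',
    consistency='QUORUM',   # write consistency level
    mode='append',          # Spark save mode
    parallelism=8,          # throttle the write stage
)
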
elastic(host, es_index, es_type, port=None, mode=None, parallelism=None, options=None)[source]

Write a dataframe into an ElasticSearch index.

Parameters:
  • host (str) – Elastic server host.
  • es_index (str) – Elastic index.
  • es_type (str) – Elastic type.
  • port (int|None) – Elastic server port.
  • mode (str|None) – Spark save mode, http://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes
  • parallelism (int|None) – The max number of parallel tasks that could be executed during the write stage (see Controlling the load).
  • options (dict[str,str]|None) – Additional options for org.elasticsearch.spark.sql format (see configuration for Elastic).
kafka(host, topic, key_serializer, value_serializer, port=9092, parallelism=None, options=None)[source]

Writes a dataframe to a Kafka topic.

The schema of the dataframe should conform to the pattern:

>>>  StructType([
...     StructField('key', ...),
...     StructField('value', ...),
...  ])

Parameters key_serializer and value_serializer are callables which take a Python structure as input and should return bytes of encoded data as output.

Parameters:
  • host (str) – Kafka host.
  • topic (str) – Topic to write to.
  • key_serializer (function) – Function to serialize key.
  • value_serializer (function) – Function to serialize value.
  • port (int) – Kafka port.
  • parallelism (int|None) – The max number of parallel tasks that could be executed during the write stage (see Controlling the load).
  • options (dict|None) – Additional options.
mysql(host, database, table, port=None, mode=None, parallelism=None, options=None)[source]

Write a dataframe to a MySQL table.

Options should include user and password.

Parameters:
  • host (str) – MySQL server address.
  • database (str) – Database to connect to.
  • table (str) – Table to write rows to.
  • port (int|None) – MySQL server port.
  • mode (str|None) – Spark save mode, http://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes
  • parallelism (int|None) – The max number of parallel tasks that could be executed during the write stage (see Controlling the load).
  • options (dict[str,str]|None) – Additional options for JDBC writer (see configuration for MySQL).
sparkly.writer.attach_writer_to_dataframe()[source]

A tiny amount of magic to attach write extensions.