Read/write utilities for DataFrames¶
Sparkly isn’t trying to replace any existing storage connectors. The goal is to provide a simplified and consistent API across a wide array of storage connectors. We also added a way to work with abstract data sources, so you can keep your code agnostic to the storage you use.
Cassandra¶
Sparkly relies on the official spark cassandra connector and was successfully tested in production using version 2.4.0.
Package | https://spark-packages.org/package/datastax/spark-cassandra-connector
Configuration | https://github.com/datastax/spark-cassandra-connector/blob/v2.4.0/doc/reference.md
To use overwrite mode, you must set confirm.truncate to true. Otherwise, use append mode to update existing data.
from sparkly import SparklySession

class MySession(SparklySession):
    # Feel free to play with other versions
    packages = ['datastax:spark-cassandra-connector:2.4.0-s_2.11']

spark = MySession()

# To read data
df = spark.read_ext.cassandra('localhost', 'my_keyspace', 'my_table')

# To write data
df.write_ext.cassandra('localhost', 'my_keyspace', 'my_table')
Elastic¶
Sparkly relies on the official elastic spark connector and was successfully tested in production using version 6.5.4.
Package | https://spark-packages.org/package/elastic/elasticsearch-hadoop
Configuration | https://www.elastic.co/guide/en/elasticsearch/hadoop/7.3/configuration.html
from sparkly import SparklySession

class MySession(SparklySession):
    # Feel free to play with other versions
    packages = ['org.elasticsearch:elasticsearch-spark-20_2.11:7.3.0']

spark = MySession()

# To read data
df = spark.read_ext.elastic('localhost', 'my_index', 'my_type', query='?q=awesomeness')

# To write data
df.write_ext.elastic('localhost', 'my_index', 'my_type')
Kafka¶
Sparkly’s reader and writer for Kafka are built on top of the official spark package for Kafka-SQL.
Package | https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.11/2.4.0
Configuration | https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html
import json

from pyspark.sql.types import StringType, StructField, StructType
from sparkly import SparklySession

class MySession(SparklySession):
    packages = [
        'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0',
    ]

spark = MySession()

# To read JSON messages from Kafka into a dataframe:
# 1. Define a schema of the messages you read.
df_schema = StructType([
    StructField('key', StructType([
        StructField('id', StringType(), True)
    ])),
    StructField('value', StructType([
        StructField('name', StringType(), True),
        StructField('surname', StringType(), True),
    ]))
])

# 2. Specify the schema as a reader parameter.
df = spark.read_ext.kafka(
    'kafka.host',
    topic='my.topic',
    # key & value deserialization is optional; if not provided,
    # then the user will have to deal with decoding the binary directly.
    key_deserializer=lambda item: json.loads(item.decode('utf-8')),
    value_deserializer=lambda item: json.loads(item.decode('utf-8')),
    # if deserializers are used, the schema must be provided:
    schema=df_schema,
)

# To write a dataframe to Kafka in JSON format:
df.write_ext.kafka(
    'kafka.host',
    topic='my.topic',
    # key & value serialization is optional; if not provided,
    # the `key` and `value` columns MUST already be StringType or BinaryType
    key_serializer=lambda item: json.dumps(item).encode('utf-8'),
    value_serializer=lambda item: json.dumps(item).encode('utf-8'),
)
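The serializer and deserializer callables shown for Kafka are plain Python functions applied to one key or value at a time. As a minimal sketch that runs without Spark or Kafka, the round trip for a single message value looks roughly like this. The sample record and the function names `serialize` and `deserialize` are made up for illustration.

```python
import json


def serialize(item):
    """Same logic as the value_serializer lambda: Python object -> UTF-8 JSON bytes."""
    return json.dumps(item).encode('utf-8')


def deserialize(item):
    """Same logic as the value_deserializer lambda: UTF-8 JSON bytes -> Python object."""
    return json.loads(item.decode('utf-8'))


# Hypothetical record shaped like the `value` struct in the schema.
record = {'name': 'John', 'surname': 'Smith'}

payload = serialize(record)      # bytes, the form a Kafka message value takes
restored = deserialize(payload)  # dict again, to be matched against the schema

print(type(payload).__name__, restored == record)
```

Because the deserialized value is an ordinary Python structure, the reader needs the schema to know which fields and types to expect.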
MySQL¶
It’s a thin, high-level API on top of Spark’s native JDBC reader and JDBC writer.
Jars | https://mvnrepository.com/artifact/mysql/mysql-connector-java
Configuration | https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-configuration-properties.html
from sparkly import SparklySession

class MySession(SparklySession):
    # Feel free to play with other versions.
    packages = ['mysql:mysql-connector-java:6.0.6']

spark = MySession()

# To read data
df = spark.read_ext.mysql('localhost', 'my_database', 'my_table',
                          options={'user': 'root', 'password': 'root'})

# To write data
df.write_ext.mysql('localhost', 'my_database', 'my_table', options={
    'user': 'root',
    'password': 'root',
    'rewriteBatchedStatements': 'true',  # improves write throughput dramatically
})
Redis¶
Sparkly provides a writer for Redis that is built on top of the official Redis Python library, redis-py. It is currently capable of exporting your DataFrame as a JSON blob per row or group of rows.
Note
To interact with Redis, sparkly needs the redis library. You can get it via: pip install sparkly[redis]
from sparkly import SparklySession

spark = SparklySession()

# Write JSON.gz data indexed by col1.col2 that will expire in a day
df.write_ext.redis(
    host='localhost',
    port=6379,
    key_by=['col1', 'col2'],
    exclude_key_columns=True,
    expire=24 * 60 * 60,
    compression='gzip',
)
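To make the Redis writer options more concrete, here is a rough, Spark-free sketch of what one exported entry could look like. It is an assumption based on the comment in the example (key built by joining the key_by values with a dot, value stored as gzip-compressed JSON), not a description of Sparkly's actual implementation, and `build_entry` is a hypothetical helper.

```python
import gzip
import json


def build_entry(row, key_by, exclude_key_columns=True):
    """Hypothetical helper: turn one row (a dict) into a (key, value) pair."""
    # Assumed key layout: key column values joined with '.'
    key = '.'.join(str(row[col]) for col in key_by)
    # Optionally drop the key columns from the stored payload.
    payload = {
        name: value for name, value in row.items()
        if not (exclude_key_columns and name in key_by)
    }
    # JSON blob, gzip-compressed (mirrors compression='gzip').
    value = gzip.compress(json.dumps(payload).encode('utf-8'))
    return key, value


row = {'col1': 'a', 'col2': 7, 'col3': 'payload'}
key, value = build_entry(row, key_by=['col1', 'col2'])

print(key)                                                   # a.7
print(json.loads(gzip.decompress(value).decode('utf-8')))    # {'col3': 'payload'}
```

A consumer reading such an entry would have to decompress and decode the value the same way before using it.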
Universal reader/writer¶
The DataFrame abstraction is really powerful when it comes to transformations. You can shape your data from various storages using exactly the same api. For instance, you can join data from Cassandra with data from Elasticsearch and write the result to MySQL.
The only problem is that you have to explicitly define sources (or destinations) in order to create (or export) a DataFrame. But the source or destination of the data doesn’t really change the logic of the transformations (as long as the schema is preserved). To solve the problem, we decided to add a universal API to read and write DataFrames:
from sparkly import SparklyContext

class MyContext(SparklyContext):
    packages = [
        'datastax:spark-cassandra-connector:1.6.1-s_2.10',
        'com.databricks:spark-csv_2.10:1.4.0',
        'org.elasticsearch:elasticsearch-spark_2.10:6.5.4',
    ]

hc = MyContext()

# To read data
df = hc.read_ext.by_url('cassandra://localhost/my_keyspace/my_table?consistency=ONE')
df = hc.read_ext.by_url('csv:s3://my-bucket/my-data?header=true')
df = hc.read_ext.by_url('elastic://localhost/my_index/my_type?q=awesomeness')
df = hc.read_ext.by_url('parquet:hdfs://my.name.node/path/on/hdfs')

# To write data
df.write_ext.by_url('cassandra://localhost/my_keyspace/my_table?consistency=QUORUM&parallelism=8')
df.write_ext.by_url('csv:hdfs://my.name.node/path/on/hdfs')
df.write_ext.by_url('elastic://localhost/my_index/my_type?parallelism=4')
df.write_ext.by_url('parquet:s3://my-bucket/my-data?header=false')
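Each of these URLs packs the format, the location and the options into a single string. The sketch below uses only the Python standard library to show how such a URL breaks down. It illustrates the URL layout and is not Sparkly's own resolver; `describe_url` is a hypothetical helper.

```python
from urllib.parse import parse_qsl, urlsplit


def describe_url(url):
    """Hypothetical helper: split a data source URL into its visible parts."""
    parts = urlsplit(url)
    return {
        'format': parts.scheme,    # e.g. 'cassandra', 'csv', 'parquet'
        'host': parts.netloc,      # empty for nested URLs like 'csv:s3://...'
        'path': parts.path,
        'options': dict(parse_qsl(parts.query)),
    }


info = describe_url('cassandra://localhost/my_keyspace/my_table?consistency=ONE')
print(info)

nested = describe_url('csv:s3://my-bucket/my-data?header=true')
print(nested)
```

For the nested form, the part after the first colon is itself a location (here an S3 URL), which is why it ends up in the path rather than in the host.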
Controlling the load¶
From the official documentation:
Don’t create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.
Link: https://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/sql/DataFrameReader.html
It’s very good advice, but in practice it’s hard to track the number of partitions. For instance, if you write the result of a join operation to a database, the number of splits might be changed implicitly via spark.sql.shuffle.partitions.
To keep us from shooting ourselves in the foot, we decided to add a parallelism option to all our readers and writers. The option is designed to control the load on the source we read from or write to. It’s especially useful when you are working with data storages like Cassandra, MySQL or Elastic. However, the implementation of the throttling has some drawbacks and you should be aware of them.
The way we implemented it is pretty simple: we use coalesce on a dataframe to reduce the number of tasks that will be executed in parallel. Let’s say you have a dataframe with 1000 splits and you want to run no more than 10 tasks in parallel. In that case coalesce will create a dataframe that has 10 splits with 100 original tasks in each. The consequence: if any of these 100 tasks fails, we have to retry the whole pack of 100 tasks.
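The arithmetic behind this trade-off can be sketched without Spark. The snippet below only models the grouping described here (it does not call coalesce itself), it assumes an even spread of splits, and the function name `coalesce_groups` is made up for illustration.

```python
def coalesce_groups(num_splits, parallelism):
    """Model how many original splits end up in each coalesced split.

    Assumes an even spread, which is a simplification of what Spark does.
    """
    base, extra = divmod(num_splits, parallelism)
    # The first `extra` groups take one additional split each.
    return [base + 1 if i < extra else base for i in range(parallelism)]


groups = coalesce_groups(num_splits=1000, parallelism=10)
print(len(groups), groups[0])    # 10 100

# If one original split fails, the whole coalesced task is retried,
# so the amount of redone work is the size of its group.
redone_after_one_failure = max(groups)
print(redone_after_one_failure)  # 100
```

A lower parallelism value therefore protects the external storage more, but makes each retry more expensive.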