Generic Utils

These are generic utils used in Sparkly.

sparkly.utils.absolute_path(file_path, *rel_path)[source]

Return an absolute path built from the directory of file_path and the given rel_path parts.

Usage:
>>> absolute_path('/my/current/dir/x.txt', '..', 'x.txt')
'/my/current/x.txt'
>>> absolute_path('/my/current/dir/x.txt', 'relative', 'path')
'/my/current/dir/relative/path'
>>> import os
>>> absolute_path('x.txt', 'relative/path') == os.getcwd() + '/relative/path'
True
Parameters:
  • file_path (str) – file
  • rel_path (list[str]) – path parts
Returns:

str

sparkly.utils.kafka_get_topics_offsets(host, topic, port=9092)[source]

Return available partitions and their offsets for the given topic.

Parameters:
  • host (str) – Kafka host.
  • topic (str) – Kafka topic.
  • port (int) – Kafka port.
Returns:

[(partition, start_offset, end_offset)].

Return type:

[(int, int, int)]
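
Example (a sketch based on the signature and return format above; the host and topic are placeholders, and a reachable Kafka broker is assumed):
>>> from sparkly.utils import kafka_get_topics_offsets
>>> offsets = kafka_get_topics_offsets('kafka.docker', 'my-topic', port=9092)
>>> for partition, start, end in offsets:  # each item is (partition, start_offset, end_offset)
...     print('partition {} holds {} messages'.format(partition, end - start))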

class sparkly.utils.lru_cache(maxsize=128, storage_level=StorageLevel(False, True, False, False, 1))[source]

LRU cache that supports DataFrames.

Enables caching of both the dataframe object and the data it contains, by persisting the dataframe at the requested storage level. It is the user’s responsibility to make sure that the dataframe contents are not evicted from memory and/or disk if this feature is overused.

Parameters:
  • maxsize (int|128) – maximum number of items to cache.
  • storage_level (pyspark.StorageLevel|MEMORY_ONLY) – how to cache the contents of a dataframe (only used when the cached function results in a dataframe).
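
Example (a minimal sketch assuming lru_cache is applied as a function decorator, which its name and constructor arguments suggest; the session, path, and arguments are hypothetical):
>>> import pyspark
>>> from sparkly.utils import lru_cache
>>> @lru_cache(maxsize=32, storage_level=pyspark.StorageLevel.MEMORY_AND_DISK)
... def load_events(spark, day):
...     # hypothetical expensive read; the returned dataframe object is memoized
...     # and its contents persisted at the requested storage level
...     return spark.read.parquet('/data/events/{}'.format(day))
>>> df = load_events(spark, '2017-01-01')  # repeated calls with the same args return the cached dataframe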
sparkly.utils.parse_schema(schema)[source]

Generate a schema from its string definition.

It is essentially the inverse of the DataType.simpleString method. Supports all atomic types (such as string, int, float) and complex types (array, map, struct), except DecimalType.

Usage:
>>> parse_schema('string')
StringType
>>> parse_schema('int')
IntegerType
>>> parse_schema('array<int>')
ArrayType(IntegerType,true)
>>> parse_schema('map<string,int>')
MapType(StringType,IntegerType,true)
>>> parse_schema('struct<a:int,b:string>')
StructType(List(StructField(a,IntegerType,true),StructField(b,StringType,true)))
>>> parse_schema('unsupported')
Traceback (most recent call last):
...
sparkly.exceptions.UnsupportedDataType: Cannot parse type from string: "unsupported"
sparkly.utils.schema_has(t, required_fields)[source]

Check whether a complex dataType has specific fields.

Parameters:
  • t (pyspark.sql.types.ArrayType, MapType, StructType) – type to check.
  • required_fields (same as t, or dict[str, pyspark.sql.DataType]) – fields that need to be present in t. For convenience, a dict can be passed in place of a pyspark.sql.types.StructType; otherwise this argument must have the same type as t.
Raises:
  • AssertionError – if t and required_fields cannot be compared because they aren’t instances of the same complex dataType.
  • KeyError – if a required field is not found in the struct.
  • TypeError – if a required field exists but its actual type does not match the required one.
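
Example (a hedged illustration of the dict shorthand for checking a StructType, based on the behavior documented above):
>>> from pyspark.sql import types as T
>>> from sparkly.utils import schema_has
>>> schema = T.StructType([
...     T.StructField('uid', T.StringType()),
...     T.StructField('amount', T.LongType()),
... ])
>>> schema_has(schema, {'uid': T.StringType()})  # passes silently: field present with matching type
>>> schema_has(schema, {'email': T.StringType()})  # raises KeyError: 'email' is not a field of the struct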