Source code for sparkly.utils

#
# Copyright 2017 Tubular Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import functools
import inspect
from itertools import islice
import os
import re

try:
    from kafka import SimpleClient
    from kafka.structs import OffsetRequestPayload
except ImportError:
    pass
import pylru
from pyspark import StorageLevel
from pyspark.sql import DataFrame
from pyspark.sql import types as T

from sparkly.exceptions import UnsupportedDataType


[docs]def absolute_path(file_path, *rel_path): """Return absolute path to file. Usage: >>> absolute_path('/my/current/dir/x.txt', '..', 'x.txt') '/my/current/x.txt' >>> absolute_path('/my/current/dir/x.txt', 'relative', 'path') '/my/current/dir/relative/path' >>> import os >>> absolute_path('x.txt', 'relative/path') == os.getcwd() + '/relative/path' True Args: file_path (str): file rel_path (list[str]): path parts Returns: str """ return os.path.abspath( os.path.join( os.path.dirname( os.path.realpath(file_path) ), *rel_path ) )
[docs]def kafka_get_topics_offsets(host, topic, port=9092): """Return available partitions and their offsets for the given topic. Args: host (str): Kafka host. topic (str): Kafka topic. port (int): Kafka port. Returns: [(int, int, int)]: [(partition, start_offset, end_offset)]. """ brokers = ['{}:{}'.format(host, port)] client = SimpleClient(brokers) offsets = [] partitions = client.get_partition_ids_for_topic(topic) offsets_responses_end = client.send_offset_request( [OffsetRequestPayload(topic, partition, -1, 1) for partition in partitions] ) offsets_responses_start = client.send_offset_request( [OffsetRequestPayload(topic, partition, -2, 1) for partition in partitions] ) for start_offset, end_offset in zip(offsets_responses_start, offsets_responses_end): offsets.append((start_offset.partition, start_offset.offsets[0], end_offset.offsets[0])) return offsets
[docs]class lru_cache(object): """LRU cache that supports DataFrames. Enables caching of both the dataframe object and the data that df contains by persisting it according to user specs. It's the user's responsibility to make sure that the dataframe contents are not evicted from memory and/or disk should this feature get overused. Args: maxsize (int|128): maximum number of items to cache. storage_level (pyspark.StorageLevel|MEMORY_ONLY): how to cache the contents of a dataframe (only used when the cached function results in a dataframe). """ def __init__(self, maxsize=128, storage_level=StorageLevel.MEMORY_ONLY): self.maxsize = maxsize self.storage_level = storage_level def __call__(self, func): # Whenever an object is evicted from the cache we want to # unpersist its contents too if it's a dataframe def eviction_callback(key, value): if isinstance(value, DataFrame): value.unpersist() lru_decorator = pylru.lrudecorator(self.maxsize) lru_decorator.cache.callback = eviction_callback @lru_decorator @functools.wraps(func) def func_and_persist(*args, **kwargs): result = func(*args, **kwargs) if isinstance(result, DataFrame): result.persist(self.storage_level) return result return func_and_persist
[docs]def parse_schema(schema): """Generate schema by its string definition. It's basically an opposite action to `DataType.simpleString` method. Supports all atomic types (like string, int, float...) and complex types (array, map, struct) except DecimalType. Usages: >>> parse_schema('string') StringType >>> parse_schema('int') IntegerType >>> parse_schema('array<int>') ArrayType(IntegerType,true) >>> parse_schema('map<string,int>') MapType(StringType,IntegerType,true) >>> parse_schema('struct<a:int,b:string>') StructType(List(StructField(a,IntegerType,true),StructField(b,StringType,true))) >>> parse_schema('unsupported') Traceback (most recent call last): ... sparkly.exceptions.UnsupportedDataType: Cannot parse type from string: "unsupported" """ field_type, args_string = re.match('(\w+)<?(.*)>?$', schema).groups() args = _parse_args(args_string) if args_string else [] if field_type in ATOMIC_TYPES: return ATOMIC_TYPES[field_type]() elif field_type in COMPLEX_TYPES: return COMPLEX_TYPES[field_type](*args) else: message = 'Cannot parse type from string: "{}"'.format(field_type) raise UnsupportedDataType(message)
def _parse_args(args_string): args = [] balance = 0 pos = 0 for i, ch in enumerate(args_string): if ch == '<': balance += 1 elif ch == '>': balance -= 1 elif ch == ',' and balance == 0: args.append(args_string[pos:i]) pos = i + 1 args.append(args_string[pos:]) return args def _is_atomic_type(obj): return inspect.isclass(obj) and issubclass(obj, T.AtomicType) and obj is not T.DecimalType ATOMIC_TYPES = { _type[1]().simpleString(): _type[1] for _type in inspect.getmembers(T, _is_atomic_type) } def _init_map(*args): return T.MapType( keyType=parse_schema(args[0]), valueType=parse_schema(args[1]), ) def _init_struct(*args): struct = T.StructType() for item in args: field_name, field_type = item.split(':', 1) field_type = parse_schema(field_type) struct.add(field_name, field_type) return struct def _init_array(*args): return T.ArrayType(parse_schema(args[0])) COMPLEX_TYPES = { 'map': _init_map, 'struct': _init_struct, 'array': _init_array, }
[docs]def schema_has(t, required_fields): """Check whether a complex dataType has specific fields. Args: t (pyspark.sql.types.ArrayType, MapType, StructType): type to check. required_fields (same with t or dict[str, pyspark.sql.DataType]): fields that need to be present in t. For convenience, a user can define a ``dict`` in place of a ``pyspark.sql.types.StructType``, but other than that this argument must have the same type as t. Raises: AssertionError: if t and required_fields cannot be compared because they aren't instances of the same complex dataType. KeyError: if a required field is not found in the struct. TypeError: if a required field exists but its actual type does not match the required one. """ if isinstance(required_fields, dict): required_fields = T.StructType([ T.StructField(*field_def) for field_def in required_fields.items() ]) assert type(t) == type(required_fields), 'Cannot compare heterogeneous types' def _unpack(t): if isinstance(t, T.ArrayType): return {'element': t.elementType} elif isinstance(t, T.MapType): return {'key': t.keyType, 'value': t.valueType} elif isinstance(t, T.StructType): return {field.name: field.dataType for field in t.fields} return {} def _is_complex(t): return isinstance(t, (T.ArrayType, T.MapType, T.StructType)) existing_fields = _unpack(t) required_fields = _unpack(required_fields) for required_field, required_type in required_fields.items(): try: current_type = existing_fields[required_field] except KeyError: raise KeyError(required_field) if _is_complex(current_type): try: schema_has(current_type, required_type) except (KeyError, TypeError) as e: raise type(e)('{}.{}'.format(required_field, e.args[0])) except AssertionError: pass else: continue if required_type != current_type: raise TypeError( '{} is {}, expected {}'.format(required_field, current_type, required_type) ) return True