# Source code for sparkly.catalog

#
# Copyright 2017 Tubular Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


class SparklyCatalog(object):
    """A set of tools to interact with HiveMetastore.

    Most methods build a SQL statement and run it through the ``sql`` method
    of the session given to the constructor; ``has_table`` uses the session's
    ``catalog`` attribute instead.
    """

    def __init__(self, spark):
        """Constructor.

        Args:
            spark (sparkly.SparklySession)
        """
        self._spark = spark

    @staticmethod
    def _quote_identifier(name):
        """Backtick-quote a table name, part by part.

        Quoting the whole string at once would turn ``db.table`` into the
        single identifier ```db.table```, so every dot-separated part is
        quoted on its own instead.

        Args:
            name (str): A table name, optionally prefixed with a db name.
                Parts that are already wrapped in backticks are accepted.

        Returns:
            str: E.g. ```db`.`table```.
        """
        quoted_parts = []
        for part in name.split('.'):
            # Drop quotes the caller may have added, then double any backtick
            # left inside the part so it can't terminate the identifier early.
            bare = part.strip('`').replace('`', '``')
            quoted_parts.append('`{}`'.format(bare))
        return '.'.join(quoted_parts)

    def drop_table(self, table_name, checkfirst=True):
        """Drop table from the metastore.

        Note:
            Follow the official documentation to understand `DROP TABLE` semantic.
            https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-DropTable

        Args:
            table_name (str): A table name. Might contain a db name.
                E.g. "my_table" or "default.my_table".
            checkfirst (bool): Only issue DROPs for tables that are presented
                in the database (adds `IF EXISTS` to the statement).

        Returns:
            Whatever the session's ``sql`` call returns for the statement.
        """
        drop_statement = 'DROP TABLE IF EXISTS' if checkfirst else 'DROP TABLE'
        return self._spark.sql(
            '{} {}'.format(drop_statement, self._quote_identifier(table_name))
        )

    def has_table(self, table_name, db_name=None):
        """Check if table is available in the metastore.

        Args:
            table_name (str): A table name. Might contain a db name.
                E.g. "my_table" or "default.my_table".
            db_name (str): A database name. When given, it takes precedence
                over a db name embedded in `table_name`.

        Returns:
            bool
        """
        # The catalog lists bare table names, so a qualified name has to be
        # split before the comparison; otherwise it could never match.
        qualifier, _, bare_name = table_name.rpartition('.')
        if db_name is None and qualifier:
            db_name = qualifier

        return any(
            table.name == bare_name
            for table in self._spark.catalog.listTables(db_name)
        )

    def rename_table(self, old_table_name, new_table_name):
        """Rename table in the metastore.

        Note:
            Follow the official documentation to understand `ALTER TABLE` semantic.
            https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-RenameTable

        Args:
            old_table_name (str): The current table name. Might contain a db name.
            new_table_name (str): An expected table name. Might contain a db name.
        """
        self._spark.sql(
            'ALTER TABLE {} RENAME TO {}'.format(
                self._quote_identifier(old_table_name),
                self._quote_identifier(new_table_name),
            )
        )

    def get_table_property(self, table_name, property_name, to_type=None):
        """Get table property value from the metastore.

        Args:
            table_name (str): A table name. Might contain a db name.
                E.g. "my_table" or "default.my_table".
            property_name (str): A property name to read value for.
            to_type (function): Cast value to the given type. E.g. `int` or `float`.
                Defaults to `str`.

        Returns:
            Any: The stripped and converted value, or None if the table
                has no such property.
        """
        if not to_type:
            to_type = str

        # Look the key up in the full property listing instead of parsing the
        # "does not have property" message: the message wording is owned by
        # the SQL engine, and a real value may legitimately contain it.
        raw_value = self.get_table_properties(table_name).get(property_name)
        if raw_value is None:
            return None

        return to_type(raw_value.strip())

    def get_table_properties(self, table_name):
        """Get table properties from the metastore.

        Args:
            table_name (str): A table name. Might contain a db name.

        Returns:
            dict[str,str]: Key/value for properties.
        """
        statement = 'SHOW TBLPROPERTIES {}'.format(self._quote_identifier(table_name))
        rows = self._spark.sql(statement).collect()
        return {row.key: row.value for row in rows}

    def set_table_property(self, table_name, property_name, value):
        """Set value for table property.

        Note:
            `property_name` and `value` are placed between single quotes
            as-is, without escaping, so they must not contain single quotes.

        Args:
            table_name (str): A table name. Might contain a db name.
            property_name (str): A property name to set value for.
            value (Any): Will be automatically casted to string.
        """
        self._spark.sql(
            "ALTER TABLE {} SET TBLPROPERTIES ('{}'='{}')".format(
                self._quote_identifier(table_name), property_name, value
            )
        )