/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hive;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.PartitionValueExtractor;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import org.apache.thrift.TException;

/**
 * Hive-facing client used while syncing a Hudi table to Hive.
 *
 * <p>DDL statements are executed either over a JDBC {@link Connection} or through an
 * in-process Hive {@link Driver}, depending on {@code syncConfig.useJdbc}. Table metadata
 * (existence checks, partitions, table parameters) is read and written through the
 * {@link IMetaStoreClient} obtained from {@code Hive.get(configuration).getMSC()}.
 *
 * <p>{@code metaClient} and {@code closeQuietly(ResultSet, Statement)} are not declared in
 * this class; they are presumably inherited from {@link AbstractSyncHoodieClient}.
 */
public class HoodieHiveClient extends AbstractSyncHoodieClient {
    /** Table-parameter key under which the last synced commit timestamp is stored. */
    private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
    /** Back-tick used to quote identifiers in generated Hive SQL. */
    private static final String HIVE_ESCAPE_CHARACTER = "`";
    private static final Logger LOG = LogManager.getLogger(HoodieHiveClient.class);

    private final PartitionValueExtractor partitionValueExtractor;
    private final HiveSyncConfig syncConfig;
    private final FileSystem fs;
    private final HoodieTimeline activeTimeline;
    private final HiveConf configuration;
    /** Set to {@code null} by {@link #close()}. */
    private IMetaStoreClient client;
    /** Only created when {@code syncConfig.useJdbc} is true; may stay {@code null} (see {@link #createHiveConnection()}). */
    private Connection connection;

    /**
     * Builds the client: optionally opens the JDBC connection, obtains the metastore client,
     * instantiates the configured {@link PartitionValueExtractor} reflectively and captures the
     * completed-commits timeline.
     *
     * @param cfg           sync settings (database, JDBC url/credentials, partition fields, ...)
     * @param configuration Hive configuration used for the metastore client and the Hive driver
     * @param fs            file system handed to the super class and used to resolve partition paths
     * @throws HoodieHiveSyncException if the metastore client or the partition value extractor
     *                                 cannot be created, or the JDBC connection cannot be opened
     */
    public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
        super(cfg.basePath, cfg.assumeDatePartitioning, cfg.useFileListingFromMetadata, cfg.verifyMetadataFileListing, fs);
        this.syncConfig = cfg;
        this.fs = fs;
        this.configuration = configuration;
        if (cfg.useJdbc) {
            LOG.info("Creating hive connection " + cfg.jdbcUrl);
            this.createHiveConnection();
        }
        try {
            this.client = Hive.get(configuration).getMSC();
        } catch (MetaException | HiveException e) {
            throw new HoodieHiveSyncException("Failed to create HiveMetaStoreClient", e);
        }
        try {
            this.partitionValueExtractor = (PartitionValueExtractor) Class.forName(cfg.partitionValueExtractorClass).newInstance();
        } catch (Exception e) {
            throw new HoodieHiveSyncException("Failed to initialize PartitionValueExtractor class " + cfg.partitionValueExtractorClass, e);
        }
        this.activeTimeline = this.metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
    }

    /**
     * @return the completed-commits timeline captured when this client was constructed
     */
    public HoodieTimeline getActiveTimeline() {
        return this.activeTimeline;
    }

    /**
     * Registers the given storage partitions on the Hive table with a single
     * {@code ALTER TABLE ... ADD IF NOT EXISTS} statement. Does nothing for an empty list.
     *
     * @param tableName       Hive table name (database comes from the sync config)
     * @param partitionsToAdd relative partition paths to add
     */
    @Override
    public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
        if (partitionsToAdd.isEmpty()) {
            LOG.info("No partitions to add for " + tableName);
            return;
        }
        LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
        this.updateHiveSQL(this.constructAddPartitions(tableName, partitionsToAdd));
    }

    /**
     * Re-points the given partitions of the Hive table to their current storage location,
     * executing one statement per partition (preceded by a {@code USE} statement).
     * Does nothing for an empty list.
     *
     * @param tableName         Hive table name
     * @param changedPartitions relative partition paths whose location changed
     */
    @Override
    public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
        if (changedPartitions.isEmpty()) {
            LOG.info("No partitions to change for " + tableName);
            return;
        }
        LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName);
        for (String sql : this.constructChangePartitions(tableName, changedPartitions)) {
            this.updateHiveSQL(sql);
        }
    }

    /**
     * Builds the {@code ALTER TABLE `db`.`table` ADD IF NOT EXISTS PARTITION (...) LOCATION '...'}
     * statement covering every partition in {@code partitions}.
     */
    private String constructAddPartitions(String tableName, List<String> partitions) {
        StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
        alterSQL.append(HIVE_ESCAPE_CHARACTER).append(this.syncConfig.databaseName).append(HIVE_ESCAPE_CHARACTER)
            .append(".")
            .append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER)
            .append(" ADD IF NOT EXISTS ");
        for (String partition : partitions) {
            String partitionClause = this.getPartitionClause(partition);
            String fullPartitionPath = FSUtils.getPartitionPath(this.syncConfig.basePath, partition).toString();
            alterSQL.append("  PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPath).append("' ");
        }
        return alterSQL.toString();
    }

    /**
     * Turns a relative partition path into a comma-separated {@code `field`='value'} list.
     * Values are URL-decoded (UTF-8) first when {@code syncConfig.decodePartition} is set.
     *
     * @throws IllegalArgumentException presumably, via {@code ValidationUtils.checkArgument}, when
     *                                  the number of extracted values differs from the number of
     *                                  configured partition fields — confirm against ValidationUtils
     */
    private String getPartitionClause(String partition) {
        List<String> partitionValues = this.partitionValueExtractor.extractPartitionValuesInPath(partition);
        ValidationUtils.checkArgument(this.syncConfig.partitionFields.size() == partitionValues.size(), "Partition key parts " + this.syncConfig.partitionFields + " does not match with partition values " + partitionValues + ". Check partition strategy. ");
        List<String> partBuilder = new ArrayList<>();
        for (int i = 0; i < this.syncConfig.partitionFields.size(); ++i) {
            String partitionValue = partitionValues.get(i);
            if (this.syncConfig.decodePartition) {
                try {
                    partitionValue = URLDecoder.decode(partitionValue, StandardCharsets.UTF_8.toString());
                } catch (UnsupportedEncodingException e) {
                    throw new HoodieHiveSyncException("error in decode partition: " + partitionValue, e);
                }
            }
            partBuilder.add(HIVE_ESCAPE_CHARACTER + this.syncConfig.partitionFields.get(i) + HIVE_ESCAPE_CHARACTER + "='" + partitionValue + "'");
        }
        return String.join(",", partBuilder);
    }

    /**
     * Builds a {@code USE `db`} statement followed by one
     * {@code ALTER TABLE `table` PARTITION (...) SET LOCATION '...'} statement per partition.
     * For paths whose scheme equals {@code StorageSchemes.HDFS}, the location is resolved through
     * {@code FSUtils.getDFSFullPartitionPath}; otherwise the path's string form is used as-is.
     */
    private List<String> constructChangePartitions(String tableName, List<String> partitions) {
        List<String> changePartitions = new ArrayList<>();
        changePartitions.add("USE " + HIVE_ESCAPE_CHARACTER + this.syncConfig.databaseName + HIVE_ESCAPE_CHARACTER);
        String alterTable = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + tableName + HIVE_ESCAPE_CHARACTER;
        for (String partition : partitions) {
            String partitionClause = this.getPartitionClause(partition);
            Path partitionPath = FSUtils.getPartitionPath(this.syncConfig.basePath, partition);
            String partitionScheme = partitionPath.toUri().getScheme();
            String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
                ? FSUtils.getDFSFullPartitionPath(this.fs, partitionPath)
                : partitionPath.toString();
            changePartitions.add(alterTable + " PARTITION (" + partitionClause + ") SET LOCATION '" + fullPartitionPath + "'");
        }
        return changePartitions;
    }

    /**
     * Compares the partitions known to Hive with the partitions found on storage.
     *
     * <p>Partitions are matched on their values joined with {@code ", "}. A storage partition
     * whose values are unknown to Hive yields an add event; one whose values are known but whose
     * scheme-less path differs yields an update event. Storage partitions for which the extractor
     * returns no values are skipped.
     *
     * @param tablePartitions            partitions currently registered in Hive
     * @param partitionStoragePartitions relative partition paths present on storage
     * @return add/update events, in the iteration order of {@code partitionStoragePartitions}
     */
    List<AbstractSyncHoodieClient.PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions) {
        // Hive side: joined partition values -> location path without scheme/authority.
        Map<String, String> paths = new HashMap<>();
        for (Partition tablePartition : tablePartitions) {
            List<String> hivePartitionValues = tablePartition.getValues();
            String fullTablePartitionPath = Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getSd().getLocation())).toUri().getPath();
            paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath);
        }
        List<AbstractSyncHoodieClient.PartitionEvent> events = new ArrayList<>();
        for (String storagePartition : partitionStoragePartitions) {
            Path storagePartitionPath = FSUtils.getPartitionPath(this.syncConfig.basePath, storagePartition);
            String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
            List<String> storagePartitionValues = this.partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
            if (storagePartitionValues.isEmpty()) {
                continue;
            }
            String storageValue = String.join(", ", storagePartitionValues);
            if (!paths.containsKey(storageValue)) {
                events.add(AbstractSyncHoodieClient.PartitionEvent.newPartitionAddEvent(storagePartition));
            } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
                events.add(AbstractSyncHoodieClient.PartitionEvent.newPartitionUpdateEvent(storagePartition));
            }
        }
        return events;
    }

    /**
     * Lists the partitions of the table through the metastore client, passing {@code -1} as the
     * maximum count.
     *
     * @throws TException if the metastore call fails
     */
    public List<Partition> scanTablePartitions(String tableName) throws TException {
        return this.client.listPartitions(this.syncConfig.databaseName, tableName, (short) -1);
    }

    /**
     * Replaces the table's columns with the ones derived from {@code newSchema} using
     * {@code ALTER TABLE ... REPLACE COLUMNS(...)}; {@code cascade} is appended when the table has
     * partition fields configured.
     *
     * @throws HoodieHiveSyncException if the schema string cannot be generated
     */
    void updateTableDefinition(String tableName, MessageType newSchema) {
        try {
            String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, this.syncConfig.partitionFields, this.syncConfig.supportTimestamp);
            String cascadeClause = this.syncConfig.partitionFields.isEmpty() ? "" : " cascade";
            StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ")
                .append(HIVE_ESCAPE_CHARACTER).append(this.syncConfig.databaseName).append(HIVE_ESCAPE_CHARACTER)
                .append(".")
                .append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER)
                .append(" REPLACE COLUMNS(").append(newSchemaStr).append(" )")
                .append(cascadeClause);
            LOG.info("Updating table definition with " + sqlBuilder);
            this.updateHiveSQL(sqlBuilder.toString());
        } catch (IOException e) {
            throw new HoodieHiveSyncException("Failed to update table for " + tableName, e);
        }
    }

    /**
     * Creates the Hive table by executing the DDL produced by
     * {@code HiveSchemaUtil.generateCreateDDL}.
     *
     * @throws HoodieHiveSyncException if the DDL cannot be generated
     */
    @Override
    public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) {
        try {
            String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, this.syncConfig, inputFormatClass, outputFormatClass, serdeClass);
            LOG.info("Creating table with " + createSQLQuery);
            this.updateHiveSQL(createSQLQuery);
        } catch (IOException e) {
            throw new HoodieHiveSyncException("Failed to create table " + tableName, e);
        }
    }

    /**
     * Returns the table's schema as a column-name to type-name map.
     *
     * <p>With {@code useJdbc} enabled the columns are read from JDBC {@link DatabaseMetaData};
     * {@code DECIMAL} types get their {@code (COLUMN_SIZE,DECIMAL_DIGITS)} suffix appended.
     * Otherwise this delegates to {@link #getTableSchemaUsingMetastoreClient(String)}.
     *
     * @throws IllegalArgumentException if JDBC is used and the table does not exist
     * @throws HoodieHiveSyncException  if reading the metadata fails
     */
    @Override
    public Map<String, String> getTableSchema(String tableName) {
        if (!this.syncConfig.useJdbc) {
            return this.getTableSchemaUsingMetastoreClient(tableName);
        }
        if (!this.doesTableExist(tableName)) {
            throw new IllegalArgumentException("Failed to get schema for table " + tableName + " does not exist");
        }
        Map<String, String> schema = new HashMap<>();
        ResultSet result = null;
        try {
            DatabaseMetaData databaseMetaData = this.connection.getMetaData();
            result = databaseMetaData.getColumns(null, this.syncConfig.databaseName, tableName, null);
            while (result.next()) {
                // Per the JDBC getColumns contract, ordinal 4 is COLUMN_NAME and ordinal 6 is TYPE_NAME.
                String columnName = result.getString(4);
                String columnType = result.getString(6);
                if ("DECIMAL".equals(columnType)) {
                    int columnSize = result.getInt("COLUMN_SIZE");
                    int decimalDigits = result.getInt("DECIMAL_DIGITS");
                    columnType = columnType + String.format("(%s,%s)", columnSize, decimalDigits);
                }
                schema.put(columnName, columnType);
            }
            return schema;
        } catch (SQLException e) {
            throw new HoodieHiveSyncException("Failed to get table schema for " + tableName, e);
        } finally {
            // Release the result set on every exit path, including unexpected runtime failures.
            this.closeQuietly(result, null);
        }
    }

    /**
     * Reads the table's schema through the metastore client: data columns plus partition keys,
     * with type names upper-cased. Partition keys win if a name appears in both.
     *
     * @throws HoodieHiveSyncException if the table cannot be read
     */
    public Map<String, String> getTableSchemaUsingMetastoreClient(String tableName) {
        try {
            long start = System.currentTimeMillis();
            Table table = this.client.getTable(this.syncConfig.databaseName, tableName);
            Map<String, String> partitionKeysMap = table.getPartitionKeys().stream()
                .collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
            Map<String, String> columnsMap = table.getSd().getCols().stream()
                .collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
            Map<String, String> schema = new HashMap<>(columnsMap);
            schema.putAll(partitionKeysMap);
            long end = System.currentTimeMillis();
            LOG.info(String.format("Time taken to getTableSchema: %s ms", end - start));
            return schema;
        } catch (Exception e) {
            throw new HoodieHiveSyncException("Failed to get table schema for : " + tableName, e);
        }
    }

    /**
     * @return whether the metastore reports the table as existing in the configured database
     * @throws HoodieHiveSyncException if the metastore call fails
     */
    @Override
    public boolean doesTableExist(String tableName) {
        try {
            return this.client.tableExists(this.syncConfig.databaseName, tableName);
        } catch (TException e) {
            throw new HoodieHiveSyncException("Failed to check if table exists " + tableName, e);
        }
    }

    /**
     * Checks whether the metastore returns a database whose name equals {@code databaseName}.
     *
     * <p>NOTE(review): any {@link TException} is rethrown as {@link HoodieHiveSyncException}. If the
     * metastore client signals a missing database by throwing rather than returning {@code null},
     * callers get an exception instead of {@code false} — confirm against
     * {@code IMetaStoreClient.getDatabase}.
     *
     * @throws HoodieHiveSyncException if the metastore call fails
     */
    public boolean doesDataBaseExist(String databaseName) {
        try {
            Database database = this.client.getDatabase(databaseName);
            return database != null && databaseName.equals(database.getName());
        } catch (TException e) {
            throw new HoodieHiveSyncException("Failed to check if database exists " + databaseName, e);
        }
    }

    /**
     * Executes one SQL statement, over JDBC when {@code useJdbc} is set and through the Hive
     * driver otherwise.
     *
     * @param s the statement to run
     * @throws HoodieHiveSyncException if execution fails
     */
    public void updateHiveSQL(String s) {
        if (!this.syncConfig.useJdbc) {
            this.updateHiveSQLUsingHiveDriver(s);
            return;
        }
        Statement stmt = null;
        try {
            stmt = this.connection.createStatement();
            LOG.info("Executing SQL " + s);
            stmt.execute(s);
        } catch (SQLException e) {
            throw new HoodieHiveSyncException("Failed in executing SQL " + s, e);
        } finally {
            this.closeQuietly(null, stmt);
        }
    }

    /**
     * Runs a single statement through the in-process Hive driver.
     *
     * @return the driver's response for the statement
     */
    public CommandProcessorResponse updateHiveSQLUsingHiveDriver(String sql) {
        List<CommandProcessorResponse> responses = this.updateHiveSQLs(Collections.singletonList(sql));
        return responses.get(responses.size() - 1);
    }

    /**
     * Starts a Hive session on the configured database, runs every statement in order through a
     * fresh {@link Driver} and collects the responses. Session and driver are closed afterwards;
     * failures while closing are only logged.
     *
     * <p>NOTE(review): the returned responses are not inspected here, so a statement the driver
     * reports as failed via its response (rather than by throwing) goes unnoticed — confirm how
     * the Hive version in use reports failures.
     *
     * @throws HoodieHiveSyncException if starting the session or running a statement throws
     */
    private List<CommandProcessorResponse> updateHiveSQLs(List<String> sqls) {
        SessionState ss = null;
        Driver hiveDriver = null;
        List<CommandProcessorResponse> responses = new ArrayList<>();
        try {
            long startTime = System.currentTimeMillis();
            ss = SessionState.start(this.configuration);
            ss.setCurrentDatabase(this.syncConfig.databaseName);
            hiveDriver = new Driver(this.configuration);
            long endTime = System.currentTimeMillis();
            LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", endTime - startTime));
            for (String sql : sqls) {
                long start = System.currentTimeMillis();
                responses.add(hiveDriver.run(sql));
                long end = System.currentTimeMillis();
                LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, end - start));
            }
        } catch (Exception e) {
            throw new HoodieHiveSyncException("Failed in executing SQL", e);
        } finally {
            if (ss != null) {
                try {
                    ss.close();
                } catch (IOException ie) {
                    LOG.error("Error while closing SessionState", ie);
                }
            }
            if (hiveDriver != null) {
                try {
                    hiveDriver.close();
                } catch (Exception e) {
                    LOG.error("Error while closing hiveDriver", e);
                }
            }
        }
        return responses;
    }

    /**
     * Opens the JDBC connection to Hive if none exists yet.
     *
     * <p>NOTE(review): when the Hive JDBC driver class cannot be loaded this only logs and
     * returns, leaving {@code connection} null; later JDBC-based calls would then fail with a
     * {@link NullPointerException}. Kept as-is to preserve existing behaviour.
     *
     * @throws HoodieHiveSyncException if the connection attempt fails
     */
    private void createHiveConnection() {
        if (this.connection != null) {
            return;
        }
        try {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
        } catch (ClassNotFoundException e) {
            LOG.error("Unable to load Hive driver class", e);
            return;
        }
        try {
            this.connection = DriverManager.getConnection(this.syncConfig.jdbcUrl, this.syncConfig.hiveUser, this.syncConfig.hivePass);
            LOG.info("Successfully established Hive connection to  " + this.syncConfig.jdbcUrl);
        } catch (SQLException e) {
            throw new HoodieHiveSyncException("Cannot create hive connection " + this.getHiveJdbcUrlWithDefaultDBName(), e);
        }
    }

    /**
     * Returns the configured JDBC url with a trailing {@code /} ensured on the part before the
     * first {@code ;}; anything from that {@code ;} onwards is re-appended unchanged.
     */
    private String getHiveJdbcUrlWithDefaultDBName() {
        String hiveJdbcUrl = this.syncConfig.jdbcUrl;
        String urlAppend = "";
        int paramsStart = hiveJdbcUrl.indexOf(';');
        if (paramsStart >= 0) {
            urlAppend = hiveJdbcUrl.substring(paramsStart);
            hiveJdbcUrl = hiveJdbcUrl.substring(0, paramsStart);
        }
        if (!hiveJdbcUrl.endsWith("/")) {
            hiveJdbcUrl = hiveJdbcUrl + "/";
        }
        return hiveJdbcUrl + urlAppend;
    }

    /**
     * Reads the last synced commit time from the table's parameters.
     *
     * @return the stored timestamp, or an empty option when the parameter is absent
     * @throws HoodieHiveSyncException if the table cannot be read
     */
    @Override
    public Option<String> getLastCommitTimeSynced(String tableName) {
        try {
            Table table = this.client.getTable(this.syncConfig.databaseName, tableName);
            return Option.ofNullable(table.getParameters().get(HOODIE_LAST_COMMIT_TIME_SYNC));
        } catch (Exception e) {
            throw new HoodieHiveSyncException("Failed to get the last commit time synced from the database", e);
        }
    }

    /**
     * Closes the JDBC connection (if any) and the current thread's Hive instance.
     * The two are released independently, so a failure closing the connection does not
     * prevent the metastore client from being released.
     */
    public void close() {
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (SQLException e) {
                LOG.error("Could not close connection ", e);
            }
        }
        if (this.client != null) {
            Hive.closeCurrent();
            this.client = null;
        }
    }

    /**
     * @return the names of all tables the metastore reports for {@code db}
     */
    List<String> getAllTables(String db) throws Exception {
        return this.client.getAllTables(db);
    }

    /**
     * Stores the timestamp of the timeline's last completed instant in the table's parameters.
     * When the timeline has no instant there is nothing to record, so the call is skipped with a
     * warning instead of failing on an empty option.
     *
     * @throws HoodieHiveSyncException if the table cannot be read or altered
     */
    @Override
    public void updateLastCommitTimeSynced(String tableName) {
        Option<String> lastCommitSynced = this.activeTimeline.lastInstant().map(instant -> instant.getTimestamp());
        if (!lastCommitSynced.isPresent()) {
            LOG.warn("No completed commit found on the timeline, skipping last commit time sync for " + tableName);
            return;
        }
        try {
            Table table = this.client.getTable(this.syncConfig.databaseName, tableName);
            table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced.get());
            this.client.alter_table(this.syncConfig.databaseName, tableName, table);
        } catch (Exception e) {
            throw new HoodieHiveSyncException("Failed to get update last commit time synced to " + lastCommitSynced.get(), e);
        }
    }
}

