/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.spark.source;

import com.clearspring.analytics.util.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.ErrorCodeSupplier;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.engine.spark.source.NSparkTableMeta;
import org.apache.kylin.engine.spark.source.NSparkTableMetaExplorer;
import org.apache.kylin.engine.spark.source.SparkSqlUtil;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.source.ISampleDataDeployer;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalog.Database;
import org.apache.spark.sql.catalog.Table;
import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
import org.apache.spark.sql.internal.SQLConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NSparkMetadataExplorer
implements ISourceMetadataExplorer,
ISampleDataDeployer,
Serializable {
    /** SLF4J logger shared by every method of this explorer. */
    private static final Logger logger = LoggerFactory.getLogger(NSparkMetadataExplorer.class);

    /**
     * Builds the statement that creates a database unless it already exists.
     *
     * @param schemaName database name; it is placed into the statement verbatim, without quoting
     * @return the text {@code CREATE DATABASE IF NOT EXISTS } followed by {@code schemaName}
     */
    public static String generateCreateSchemaSql(String schemaName) {
        final String createPrefix = "CREATE DATABASE IF NOT EXISTS ";
        return createPrefix + schemaName;
    }

    /**
     * Builds the two statements used to (re)create a table from its descriptor.
     *
     * <p>The column list is rendered one column per line as {@code name datatype}; every column
     * after the first is preceded by a comma.
     *
     * @param tableDesc descriptor supplying the table identity and its columns
     * @return a two-element array: the {@code DROP TABLE IF EXISTS} statement followed by the
     *         {@code CREATE TABLE ... USING com.databricks.spark.csv} statement
     */
    public static String[] generateCreateTableSql(TableDesc tableDesc) {
        String identity = tableDesc.getIdentity();
        String dropStatement = "DROP TABLE IF EXISTS " + identity;

        StringBuilder createStatement = new StringBuilder();
        createStatement.append("CREATE TABLE ").append(identity).append("\n");
        createStatement.append("(\n");

        // Empty before the first column, a comma before each following one.
        String separator = "";
        for (ColumnDesc column : tableDesc.getColumns()) {
            createStatement.append(separator)
                    .append(column.getName())
                    .append(" ")
                    .append(column.getDatatype())
                    .append("\n");
            separator = ",";
        }

        createStatement.append(")\n").append("USING com.databricks.spark.csv");
        return new String[] {dropStatement, createStatement.toString()};
    }

    /**
     * Creates the helper used to read table-level metadata.
     *
     * @return a new {@link NSparkTableMetaExplorer}; a fresh instance is created on every call
     */
    public NSparkTableMetaExplorer getTableMetaExplorer() {
        NSparkTableMetaExplorer explorer = new NSparkTableMetaExplorer();
        return explorer;
    }

    /**
     * Lists the databases reported by {@code show databases} on the Sparder session.
     *
     * <p>When the DDL logical-view feature is enabled in the Kylin configuration, the configured
     * logical-view database is placed at the front of the result. In that mode a catalog database
     * with the same name (ignoring case) is rejected.
     *
     * @return database names; with logical views enabled, the logical-view database comes first
     * @throws KylinException with {@code DDL_CHECK_ERROR} if a catalog database has the same name
     *         as the logical-view database
     * @throws Exception declared by the implemented interface; failures of the Spark query propagate
     */
    public List<String> listDatabases() throws Exception {
        Dataset<Row> dataset = SparderEnv.getSparkSession().sql("show databases").select("namespace");
        List<String> databases = dataset.collectAsList().stream()
                .map(row -> row.getString(0))
                .collect(Collectors.toList());

        KylinConfig config = KylinConfig.getInstanceFromEnv();
        if (!config.isDDLLogicalViewEnabled()) {
            return databases;
        }

        String logicalViewDB = config.getDDLLogicalViewDB();
        for (String db : databases) {
            if (db.equalsIgnoreCase(logicalViewDB)) {
                throw new KylinException((ErrorCodeSupplier) ServerErrorCode.DDL_CHECK_ERROR,
                        "Logical view database should not be duplicated with normal hive database!!!");
            }
        }

        // The logical-view database is listed ahead of the catalog databases.
        List<String> databasesWithLogicalDB = new ArrayList<>(databases.size() + 1);
        databasesWithLogicalDB.add(logicalViewDB);
        databasesWithLogicalDB.addAll(databases);
        return databasesWithLogicalDB;
    }

    /**
     * Lists the tables of a database via {@code show tables}.
     *
     * <p>When table access filtering and project-level Kerberos are both enabled in the
     * configuration and Hadoop security is on, only the tables that pass
     * {@link #checkTableAccess(String)} are returned.
     *
     * <p>This is best effort: a failure while listing or filtering is logged together with its
     * cause, and whatever was collected up to that point (possibly an empty list) is returned.
     *
     * @param database database to list; when blank, {@code show tables} runs without an
     *        {@code in} clause and table names are access-checked unqualified
     * @return table names, possibly empty
     * @throws Exception if the current user or the Spark session cannot be obtained
     */
    public List<String> listTables(String database) throws Exception {
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        SparkSession spark = SparderEnv.getSparkSession();
        List<String> tables = new ArrayList<>();
        try {
            boolean hasDatabase = StringUtils.isNotBlank(database);
            String sql = hasDatabase ? "show tables in " + database : "show tables";
            Dataset<Row> dataset = spark.sql(sql).select("tableName");
            tables = dataset.collectAsList().stream()
                    .map(row -> row.getString(0))
                    .collect(Collectors.toList());

            boolean filterByAccess = config.getTableAccessFilterEnable()
                    && config.getKerberosProjectLevelEnable()
                    && UserGroupInformation.isSecurityEnabled();
            if (filterByAccess) {
                List<String> accessTables = new ArrayList<>();
                for (String table : tables) {
                    // Qualify only with a real database name; a blank one would yield an
                    // unresolvable identifier and filter out every table.
                    String tableName = hasDatabase ? database + "." + table : table;
                    if (this.checkTableAccess(tableName)) {
                        accessTables.add(table);
                    }
                }
                return accessTables;
            }
        } catch (Exception e) {
            // The trailing throwable makes SLF4J log the cause instead of dropping it.
            logger.error("List hive tables failed. user: {}, db: {}", ugi.getUserName(), database, e);
        }
        return tables;
    }

    /**
     * Checks whether the current user can list the storage location of a table.
     *
     * <p>For a view, the tables returned by {@code SparkSqlUtil.getViewOrignalTables} are checked
     * instead of the view itself. Each location is probed with {@code FileSystem.listStatus}.
     * Any exception raised along the way is logged and reported as "no access".
     *
     * @param tableName table identifier as accepted by the Spark catalog
     * @return {@code true} if every relevant location could be listed, {@code false} if any step
     *         threw an exception
     */
    public boolean checkTableAccess(String tableName) {
        try {
            SparkSession spark = SparderEnv.getSparkSession();
            Table sparkTable = spark.catalog().getTable(tableName);

            Set<String> needCheckTables;
            if (sparkTable.tableType().equals(CatalogTableType.VIEW().name())) {
                needCheckTables = SparkSqlUtil.getViewOrignalTables(tableName, spark);
            } else {
                needCheckTables = Collections.singleton(tableName);
            }

            String hiveSpecFsLocation = (String) spark.sessionState().conf().getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION());
            FileSystem fs = hiveSpecFsLocation == null
                    ? HadoopUtil.getWorkingFileSystem()
                    : HadoopUtil.getFileSystem(hiveSpecFsLocation);

            for (String table : needCheckTables) {
                String loc = this.getLoc(spark, table, hiveSpecFsLocation);
                // Reuse the default file system when the location matches its scheme or is a
                // scheme-less absolute path; otherwise resolve one from the location itself.
                boolean onDefaultFs = loc.startsWith(fs.getScheme()) || loc.startsWith("/");
                FileSystem targetFs = onDefaultFs ? fs : HadoopUtil.getFileSystem(loc);
                targetFs.listStatus(new Path(loc));
            }
            return true;
        } catch (Exception e) {
            try {
                logger.error("Read hive table {} error:{}, ugi name: {}.", tableName, e.getMessage(),
                        UserGroupInformation.getCurrentUser().getUserName());
            } catch (IOException ex) {
                // Keep the original failure as the primary error and attach the user lookup
                // failure so that neither is lost.
                e.addSuppressed(ex);
                logger.error("fetch user curr ugi info error.", e);
            }
            return false;
        }
    }

    /**
     * Quick access probe for a database.
     *
     * <p>Opens a listing iterator on the database location and, if it has at least one entry,
     * lists that first entry as well. Later entries are not examined.
     *
     * @param database database name as known to the Spark catalog
     * @return {@code true} if the probe finished without an exception, {@code false} otherwise
     * @throws Exception if the Spark session cannot be obtained; failures during the probe itself
     *         are logged and turned into {@code false}
     */
    public boolean checkDatabaseHadoopAccessFast(String database) throws Exception {
        SparkSession spark = SparderEnv.getSparkSession();
        try {
            String databaseLocation = spark.catalog().getDatabase(database).locationUri();
            RemoteIterator<FileStatus> tablesIterator = this.getFilesIterator(databaseLocation, false);
            if (tablesIterator.hasNext()) {
                Path tablePath = tablesIterator.next().getPath();
                // The returned iterator is irrelevant; the call is made for its access check.
                this.getFilesIterator(tablePath.toString(), true);
            }
            return true;
        } catch (Exception e) {
            try {
                logger.error("Read hive database {} error:{}, ugi name: {}.", database, e.getMessage(),
                        UserGroupInformation.getCurrentUser().getUserName());
            } catch (IOException ex) {
                // Keep the original failure as the primary error and attach the user lookup
                // failure so that neither is lost.
                e.addSuppressed(ex);
                logger.error("fetch user curr ugi info error.", e);
            }
            return false;
        }
    }

    /**
     * Opens a listing iterator over a location.
     *
     * <p>If the Spark SQL conf contains {@code spark.sql.hive.specific.fs.location}, the
     * {@code hdfs://hacluster} prefix in {@code location} is replaced by that value and the file
     * system is resolved from that value; otherwise it is resolved from {@code location}.
     *
     * @param location path or URI to list
     * @param checkList when {@code true}, {@code listStatus} is called on the location before the
     *        iterator is created
     * @return the iterator produced by {@code FileSystem.listStatusIterator}
     * @throws IOException if the file system cannot be obtained or the listing fails
     */
    private RemoteIterator<FileStatus> getFilesIterator(String location, boolean checkList) throws IOException {
        SQLConf sqlConf = SparderEnv.getSparkSession().sessionState().conf();

        String resolvedLocation = location;
        String fileSystemUri = location;
        if (sqlConf.contains("spark.sql.hive.specific.fs.location")) {
            String specificFsLocation = (String) sqlConf.getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION());
            resolvedLocation = location.replace("hdfs://hacluster", specificFsLocation);
            fileSystemUri = specificFsLocation;
        }

        FileSystem fileSystem = HadoopUtil.getFileSystem(fileSystemUri);
        Path target = new Path(resolvedLocation);
        if (checkList) {
            fileSystem.listStatus(target);
        }
        return fileSystem.listStatusIterator(target);
    }

    /**
     * Loads a table's metadata through the table meta explorer and converts it into Kylin
     * descriptors.
     *
     * <p>If the project already has a descriptor for {@code database.tableName}, a copy of it is
     * updated; otherwise a new descriptor is created with upper-cased database and table names.
     * A missing ({@code null}) partition column list is treated as empty throughout.
     *
     * @param database source database name
     * @param tableName source table name
     * @param prj project whose metadata manager is consulted and in which the ext descriptor is
     *        initialised
     * @return the table descriptor paired with a freshly created ext descriptor holding the data
     *         source properties
     * @throws Exception declared by the implemented interface; failures while reading the source
     *         metadata propagate
     */
    public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String tableName, String prj) throws Exception {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        NTableMetadataManager metaMgr = NTableMetadataManager.getInstance(config, prj);
        NSparkTableMeta tableMeta = this.getTableMetaExplorer().getSparkTableMeta(database, tableName);
        // Normalise once so every later use is null-safe.
        List<NSparkTableMeta.SparkTableColumnMeta> partitionColumns =
                Optional.ofNullable(tableMeta.partitionColumns).orElseGet(Collections::emptyList);

        TableDesc existing = metaMgr.getTableDesc(database + "." + tableName);
        TableDesc tableDesc;
        if (existing == null) {
            tableDesc = new TableDesc();
            tableDesc.setDatabase(database.toUpperCase(Locale.ROOT));
            tableDesc.setName(tableName.toUpperCase(Locale.ROOT));
            tableDesc.setUuid(RandomUtil.randomUUIDStr());
            tableDesc.setLastModified(0L);
        } else {
            // Work on a copy so the instance held by the metadata manager is not mutated.
            tableDesc = new TableDesc(existing);
        }
        if (tableMeta.tableType != null) {
            tableDesc.setTableType(tableMeta.tableType);
        }
        // NOTE(review): 9 is presumably the Spark source-type id; confirm against the source-type constants.
        tableDesc.setSourceType(9);
        tableDesc.setTransactional(tableMeta.isTransactional);
        tableDesc.setRangePartition(tableMeta.isRangePartition);
        tableDesc.setColumns(buildColumnDescs(tableMeta.allColumns, partitionColumns));
        // Only the first partition column is recorded on the table descriptor.
        tableDesc.setPartitionColumn(
                partitionColumns.isEmpty() ? null : partitionColumns.get(0).name.toUpperCase(Locale.ROOT));

        String partitionColumnNames = partitionColumns.stream()
                .map(col -> col.name.toUpperCase(Locale.ROOT))
                .collect(Collectors.joining(", "));
        TableExtDesc tableExtDesc = buildTableExtDesc(tableDesc, tableMeta, partitionColumnNames, prj);
        return Pair.newPair(tableDesc, tableExtDesc);
    }

    /** Converts source columns into column descriptors with 1-based ids and upper-cased names. */
    private ColumnDesc[] buildColumnDescs(List<NSparkTableMeta.SparkTableColumnMeta> allColumns,
            List<NSparkTableMeta.SparkTableColumnMeta> partitionColumns) {
        Set<String> partitionColumnNames = partitionColumns.stream()
                .map(field -> field.name)
                .collect(Collectors.toSet());
        List<ColumnDesc> columns = new ArrayList<>(allColumns.size());
        for (int i = 0; i < allColumns.size(); ++i) {
            NSparkTableMeta.SparkTableColumnMeta field = allColumns.get(i);
            ColumnDesc cdesc = new ColumnDesc();
            cdesc.setName(field.name.toUpperCase(Locale.ROOT));
            cdesc.setCaseSensitiveName(field.name);
            // float columns are recorded as double.
            cdesc.setDatatype("float".equalsIgnoreCase(field.dataType) ? "double" : field.dataType);
            cdesc.setId(String.valueOf(i + 1));
            cdesc.setComment(field.comment);
            cdesc.setPartitioned(partitionColumnNames.contains(field.name));
            columns.add(cdesc);
        }
        return columns.toArray(new ColumnDesc[0]);
    }

    /** Creates the ext descriptor for a table and fills in its data source properties. */
    private TableExtDesc buildTableExtDesc(TableDesc tableDesc, NSparkTableMeta tableMeta,
            String partitionColumnNames, String prj) {
        TableExtDesc tableExtDesc = new TableExtDesc();
        tableExtDesc.setIdentity(tableDesc.getIdentity());
        tableExtDesc.setUuid(RandomUtil.randomUUIDStr());
        tableExtDesc.setLastModified(0L);
        tableExtDesc.init(prj);
        tableExtDesc.addDataSourceProp("location", tableMeta.sdLocation);
        tableExtDesc.addDataSourceProp("owner", tableMeta.owner);
        tableExtDesc.addDataSourceProp("create_time", tableMeta.createTime);
        tableExtDesc.addDataSourceProp("last_access_time", tableMeta.lastAccessTime);
        tableExtDesc.addDataSourceProp("partition_column", partitionColumnNames);
        tableExtDesc.addDataSourceProp("total_file_size", String.valueOf(tableMeta.fileSize));
        tableExtDesc.addDataSourceProp("total_file_number", String.valueOf(tableMeta.fileNum));
        tableExtDesc.addDataSourceProp("hive_inputFormat", tableMeta.sdInputFormat);
        tableExtDesc.addDataSourceProp("hive_outputFormat", tableMeta.sdOutputFormat);
        tableExtDesc.addDataSourceProp("s3_role", tableMeta.s3Role);
        tableExtDesc.addDataSourceProp("s3_endpoint", tableMeta.s3Endpoint);
        return tableExtDesc;
    }

    /**
     * Reports the Kylin resources related to a table.
     *
     * @param table table descriptor; not consulted by this implementation
     * @return always an empty list
     */
    public List<String> getRelatedKylinResources(TableDesc table) {
        final List<String> noResources = Collections.emptyList();
        return noResources;
    }

    /**
     * Checks whether a database may be accessed.
     *
     * <p>The configured logical-view database is always accessible. For any other database, when
     * the DB access filter is enabled, the database is looked up in the Spark catalog and an
     * {@link AnalysisException} from that lookup is reported as "no access". With the filter
     * disabled every database is accessible.
     *
     * @param database database name to check
     * @return {@code false} only if the filter is enabled and the catalog lookup failed with an
     *         {@link AnalysisException}; {@code true} otherwise
     * @throws Exception if the current user cannot be determined while reporting a failed lookup,
     *         or if the lookup fails with something other than {@link AnalysisException}
     */
    public boolean checkDatabaseAccess(String database) throws Exception {
        boolean hiveDBAccessFilterEnable = KapConfig.getInstanceFromEnv().getDBAccessFilterEnable();
        String viewDB = KylinConfig.getInstanceFromEnv().getDDLLogicalViewDB();
        // Null-safe: an unset logical-view database matches nothing.
        if (viewDB != null && viewDB.equalsIgnoreCase(database)) {
            return true;
        }
        if (!hiveDBAccessFilterEnable) {
            return true;
        }

        logger.info("Check database {} access start.", (Object) database);
        try {
            // Only the success or failure of the lookup matters; the result is not used.
            SparderEnv.getSparkSession().catalog().getDatabase(database);
        } catch (AnalysisException e) {
            UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
            // Log the cause too, since the lookup can fail for reasons other than permissions.
            logger.error("The current user: {} does not have permission to access database {}",
                    ugi.getUserName(), database, e);
            return false;
        }
        return true;
    }

    /**
     * Checks access to a set of tables.
     *
     * @param tables table identifiers, each passed to {@link #checkTableAccess(String)}
     * @return {@code true} if every table passes (also for an empty set); checking stops at the
     *         first table that fails
     */
    public boolean checkTablesAccess(Set<String> tables) {
        for (String table : tables) {
            if (!this.checkTableAccess(table)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the partitions of a table for a given partition column by delegating to
     * {@code NSparkTableMetaExplorer.checkAndGetTablePartitions}.
     *
     * @param database database name
     * @param table table name
     * @param prj project name; not used by this implementation
     * @param partCol partition column to inspect
     * @return whatever the table meta explorer returns for these arguments
     */
    public Set<String> getTablePartitions(String database, String table, String prj, String partCol) {
        NSparkTableMetaExplorer explorer = this.getTableMetaExplorer();
        Set<String> partitions = explorer.checkAndGetTablePartitions(database, table, partCol);
        return partitions;
    }

    /**
     * Creates the sample database if it does not exist yet, by running the statement from
     * {@link #generateCreateSchemaSql(String)} on the Sparder session.
     *
     * @param database name of the database to create
     * @throws Exception declared by the implemented interface; Spark failures propagate
     */
    public void createSampleDatabase(String database) throws Exception {
        String createSchemaSql = NSparkMetadataExplorer.generateCreateSchemaSql(database);
        SparderEnv.getSparkSession().sql(createSchemaSql);
    }

    /**
     * Recreates a sample table by running, in order, the statements produced by
     * {@link #generateCreateTableSql(TableDesc)}.
     *
     * @param table descriptor of the table to create
     * @throws Exception declared by the implemented interface; Spark failures propagate
     */
    public void createSampleTable(TableDesc table) throws Exception {
        String[] statements = NSparkMetadataExplorer.generateCreateTableSql(table);
        for (int i = 0; i < statements.length; i++) {
            // The session is looked up again for every statement.
            SparderEnv.getSparkSession().sql(statements[i]);
        }
    }

    /**
     * Reads {@code <tableFileDir>/<tableName>.csv} and registers it as a temporary view.
     *
     * <p>The view is named after the part of {@code tableName} following its first dot. If the
     * name has no dot, or starts with one, the whole name is used.
     *
     * @param tableName table name, optionally qualified; also used as the CSV file name
     * @param tableFileDir directory containing the CSV file
     * @throws Exception declared by the implemented interface; Spark failures propagate
     */
    public void loadSampleData(String tableName, String tableFileDir) throws Exception {
        String csvPath = tableFileDir + "/" + tableName + ".csv";
        Dataset<Row> sampleRows = SparderEnv.getSparkSession().read().csv(csvPath).toDF();

        int dotIndex = tableName.indexOf(".");
        String viewName = dotIndex > 0 ? tableName.substring(dotIndex + 1) : tableName;
        sampleRows.createOrReplaceTempView(viewName);
    }

    /**
     * Wrapper views are not supported by this implementation.
     *
     * @param origTableName ignored
     * @param viewName ignored
     * @throws UnsupportedOperationException always
     */
    public void createWrapperView(String origTableName, String viewName) throws Exception {
        final String message = "unsupport create wrapper view";
        throw new UnsupportedOperationException(message);
    }

    /**
     * Reads a table's location from the {@code Location} row of {@code desc formatted}.
     *
     * @param spark session used to run the statement
     * @param table table identifier, appended to the statement verbatim
     * @param hiveSpecFsLocation replacement for the {@code hdfs://hacluster} prefix, or
     *        {@code null} to return the location unchanged
     * @return the second column of the {@code Location} row, with {@code hdfs://hacluster}
     *         replaced by {@code hiveSpecFsLocation} when both values are non-null
     */
    public String getLoc(SparkSession spark, String table, String hiveSpecFsLocation) {
        Dataset<Row> locationRows = spark.sql("desc formatted " + table).where("col_name == 'Location'");
        Row locationRow = locationRows.head();
        String loc = locationRow.getString(1);

        boolean canRewrite = hiveSpecFsLocation != null && loc != null;
        return canRewrite ? loc.replace("hdfs://hacluster", hiveSpecFsLocation) : loc;
    }
}

