/*
 * Decompiled with CFR 0.152.
 */
package com.netease.arctic.hive.utils;

import com.netease.arctic.hive.HMSClientPool;
import com.netease.arctic.hive.utils.HivePartitionUtil;
import com.netease.arctic.hive.utils.HiveSchemaUtil;
import com.netease.arctic.io.ArcticHadoopFileIO;
import com.netease.arctic.op.OverwriteBaseFiles;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.KeyedTable;
import com.netease.arctic.table.TableIdentifier;
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.utils.TablePropertyUtil;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.data.TableMigrationUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveMetaSynchronizer {
    private static final Logger LOG = LoggerFactory.getLogger(HiveMetaSynchronizer.class);

    public static void syncHiveSchemaToArctic(ArcticTable table, HMSClientPool hiveClient) {
        try {
            Table hiveTable = (Table)hiveClient.run(client -> client.getTable(table.id().getDatabase(), table.id().getTableName()));
            Schema hiveSchema = HiveSchemaUtil.convertHiveSchemaToIcebergSchema(hiveTable, table.isKeyedTable() ? table.asKeyedTable().primaryKeySpec().fieldNames() : new ArrayList());
            UpdateSchema updateSchema = table.updateSchema();
            boolean update = HiveMetaSynchronizer.updateStructSchema(table.id(), updateSchema, null, table.schema().asStruct(), hiveSchema.asStruct());
            if (update) {
                updateSchema.commit();
            }
        }
        catch (InterruptedException | TException e) {
            throw new RuntimeException("Failed to get hive table:" + table.id(), e);
        }
    }

    private static boolean updateStructSchema(TableIdentifier tableIdentifier, UpdateSchema updateSchema, String parentName, Types.StructType icebergStruct, Types.StructType hiveStruct) {
        boolean update = false;
        for (int i = 0; i < hiveStruct.fields().size(); ++i) {
            String columnName;
            Types.NestedField hiveField = (Types.NestedField)hiveStruct.fields().get(i);
            Types.NestedField icebergField = icebergStruct.field(hiveField.name());
            if (icebergField == null) {
                updateSchema.addColumn(parentName, hiveField.name(), hiveField.type(), hiveField.doc());
                update = true;
                LOG.info("Table {} sync new hive column {} to arctic", (Object)tableIdentifier, (Object)hiveField);
                continue;
            }
            if (icebergField.type().equals(hiveField.type()) && Objects.equals(icebergField.doc(), hiveField.doc())) continue;
            if (hiveField.type().isPrimitiveType() && icebergField.type().isPrimitiveType()) {
                if (TypeUtil.isPromotionAllowed((Type)icebergField.type().asPrimitiveType(), (Type.PrimitiveType)hiveField.type().asPrimitiveType())) {
                    columnName = parentName == null ? hiveField.name() : parentName + "." + hiveField.name();
                    updateSchema.updateColumn(columnName, hiveField.type().asPrimitiveType(), hiveField.doc());
                    update = true;
                    LOG.info("Table {} sync hive column {} to arctic", (Object)tableIdentifier, (Object)hiveField);
                    continue;
                }
                LOG.warn("Table {} sync hive column {} to arctic failed, because of type mismatch", (Object)tableIdentifier, (Object)hiveField);
                continue;
            }
            if (hiveField.type().isStructType() && icebergField.type().isStructType()) {
                columnName = parentName == null ? hiveField.name() : parentName + "." + hiveField.name();
                update = update || HiveMetaSynchronizer.updateStructSchema(tableIdentifier, updateSchema, columnName, icebergField.type().asStructType(), hiveField.type().asStructType());
                continue;
            }
            LOG.warn("Table {} sync hive column {} to arctic failed, because of type mismatch", (Object)tableIdentifier, (Object)hiveField);
        }
        return update;
    }

    public static void syncHiveDataToArctic(ArcticTable table, HMSClientPool hiveClient) {
        block35: {
            Object baseStore = table.isKeyedTable() ? table.asKeyedTable().baseTable() : table.asUnkeyedTable();
            try {
                if (table.spec().isUnpartitioned()) {
                    Table hiveTable = (Table)hiveClient.run(client -> client.getTable(table.id().getDatabase(), table.id().getTableName()));
                    String hiveTransientTime = (String)hiveTable.getParameters().get("transient_lastDdlTime");
                    StructLikeMap structLikeMap = baseStore.partitionProperty();
                    String arcticTransientTime = null;
                    if (structLikeMap.get((Object)TablePropertyUtil.EMPTY_STRUCT) != null) {
                        arcticTransientTime = (String)((Map)structLikeMap.get((Object)TablePropertyUtil.EMPTY_STRUCT)).get("transient-time");
                    }
                    if (arcticTransientTime != null && arcticTransientTime.equals(hiveTransientTime)) break block35;
                    List<DataFile> hiveDataFiles = HiveMetaSynchronizer.listHivePartitionFiles(table, Maps.newHashMap(), hiveTable.getSd().getLocation());
                    ArrayList deleteFiles = Lists.newArrayList();
                    try (CloseableIterable fileScanTasks = baseStore.newScan().planFiles();){
                        fileScanTasks.forEach(fileScanTask -> deleteFiles.add(fileScanTask.file()));
                    }
                    catch (IOException e) {
                        throw new UncheckedIOException("Failed to close table scan of " + table.name(), e);
                    }
                    HiveMetaSynchronizer.overwriteTable(table, deleteFiles, hiveDataFiles);
                    break block35;
                }
                List hivePartitions = (List)hiveClient.run(client -> client.listPartitions(table.id().getDatabase(), table.id().getTableName(), (short)Short.MAX_VALUE));
                ListMultimap filesGroupedByPartition = Multimaps.newListMultimap((Map)Maps.newHashMap(), Lists::newArrayList);
                TableScan tableScan = baseStore.newScan();
                try (CloseableIterable fileScanTasks = tableScan.planFiles();){
                    for (FileScanTask fileScanTask2 : fileScanTasks) {
                        filesGroupedByPartition.put((Object)fileScanTask2.file().partition(), (Object)fileScanTask2.file());
                    }
                }
                catch (IOException e) {
                    throw new UncheckedIOException("Failed to close table scan of " + table.name(), e);
                }
                Map filesMap = filesGroupedByPartition.asMap();
                ArrayList filesToDelete = Lists.newArrayList();
                ArrayList filesToAdd = Lists.newArrayList();
                ArrayList icebergPartitions = Lists.newArrayList(filesMap.keySet());
                for (Partition hivePartition : hivePartitions) {
                    String arcticTransientTime;
                    StructLike partitionData = HivePartitionUtil.buildPartitionData(hivePartition.getValues(), table.spec());
                    icebergPartitions.remove(partitionData);
                    String hiveTransientTime = (String)hivePartition.getParameters().get("transient_lastDdlTime");
                    String string = arcticTransientTime = baseStore.partitionProperty().containsKey((Object)partitionData) ? (String)((Map)baseStore.partitionProperty().get((Object)partitionData)).get("transient-time") : null;
                    if (arcticTransientTime != null && arcticTransientTime.equals(hiveTransientTime)) continue;
                    List<DataFile> hiveDataFiles = HiveMetaSynchronizer.listHivePartitionFiles(table, HiveMetaSynchronizer.buildPartitionValueMap(hivePartition.getValues(), table.spec()), hivePartition.getSd().getLocation());
                    if (filesMap.get(partitionData) != null) {
                        filesToDelete.addAll((Collection)filesMap.get(partitionData));
                        filesToAdd.addAll(hiveDataFiles);
                        continue;
                    }
                    if (hivePartition.getParameters().get("arctic.enabled") != null || hivePartition.getParameters().get("arctic.enable") != null) continue;
                    filesToAdd.addAll(hiveDataFiles);
                }
                icebergPartitions.forEach(partition -> {
                    ArrayList dataFiles = Lists.newArrayList((Iterable)((Iterable)filesMap.get(partition)));
                    if (dataFiles.size() > 0 && !table.io().exists(((DataFile)dataFiles.get(0)).path().toString())) {
                        filesToDelete.addAll((Collection)filesMap.get(partition));
                    }
                });
                HiveMetaSynchronizer.overwriteTable(table, filesToDelete, filesToAdd);
            }
            catch (InterruptedException | TException e) {
                throw new RuntimeException("Failed to get hive table:" + table.id(), e);
            }
        }
    }

    private static List<DataFile> listHivePartitionFiles(ArcticTable arcticTable, Map<String, String> partitionValueMap, String partitionLocation) {
        return (List)arcticTable.io().doAs(() -> TableMigrationUtil.listPartition((Map)partitionValueMap, (String)partitionLocation, (String)arcticTable.properties().getOrDefault("write.format.default", "parquet"), (PartitionSpec)arcticTable.spec(), (Configuration)((ArcticHadoopFileIO)arcticTable.io()).getTableMetaStore().getConfiguration(), (MetricsConfig)MetricsConfig.fromProperties((Map)arcticTable.properties()), (NameMapping)NameMappingParser.fromJson((String)((String)arcticTable.properties().get("schema.name-mapping.default")))));
    }

    private static Map<String, String> buildPartitionValueMap(List<String> partitionValues, PartitionSpec spec) {
        HashMap partitionValueMap = Maps.newHashMap();
        for (int i = 0; i < partitionValues.size(); ++i) {
            partitionValueMap.put(((PartitionField)spec.fields().get(i)).name(), partitionValues.get(i));
        }
        return partitionValueMap;
    }

    private static void overwriteTable(ArcticTable table, List<DataFile> filesToDelete, List<DataFile> filesToAdd) {
        if (filesToDelete.size() > 0 || filesToAdd.size() > 0) {
            LOG.info("Table {} sync hive data change to arctic, delete files: {}, add files {}", new Object[]{table.id(), filesToDelete.stream().map(ContentFile::path).collect(Collectors.toList()), filesToAdd.stream().map(ContentFile::path).collect(Collectors.toList())});
            if (table.isKeyedTable()) {
                long legacyTxId = table.asKeyedTable().beginTransaction(null);
                long txId = TablePropertyUtil.allocateTransactionId((KeyedTable)table.asKeyedTable());
                OverwriteBaseFiles overwriteBaseFiles = table.asKeyedTable().newOverwriteBaseFiles();
                overwriteBaseFiles.set("validate-location", "false");
                filesToDelete.forEach(arg_0 -> ((OverwriteBaseFiles)overwriteBaseFiles).deleteFile(arg_0));
                filesToAdd.forEach(arg_0 -> ((OverwriteBaseFiles)overwriteBaseFiles).addFile(arg_0));
                overwriteBaseFiles.withTransactionIdForChangedPartition(txId);
                overwriteBaseFiles.commit();
            } else {
                OverwriteFiles overwriteFiles = table.asUnkeyedTable().newOverwrite();
                overwriteFiles.set("validate-location", "false");
                filesToDelete.forEach(arg_0 -> ((OverwriteFiles)overwriteFiles).deleteFile(arg_0));
                filesToAdd.forEach(arg_0 -> ((OverwriteFiles)overwriteFiles).addFile(arg_0));
                overwriteFiles.commit();
            }
        }
    }
}

