/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import com.codahale.metrics.Timer;
import java.time.Duration;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieFlinkTableServiceClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndexFactory;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.FlinkWriteHandleFactory;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.MiniBatchHandle;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.util.WriteStatMerger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink-engine write client for Hudi tables.
 *
 * <p>Inputs and outputs are plain {@link List}s: records are {@code List<HoodieRecord<T>>},
 * keys are {@code List<HoodieKey>} and results are {@code List<WriteStatus>}. Most write
 * operations obtain a write handle through {@link FlinkWriteHandleFactory}, delegate the
 * actual write to {@link HoodieFlinkTable} and then pass the result through
 * {@link #postWrite(HoodieWriteMetadata, String, HoodieTable)}.
 *
 * @param <T> payload type of the records handled by this client
 */
public class HoodieFlinkWriteClient<T>
extends BaseHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkWriteClient.class);

    /**
     * Map handed to the write-handle factory on every handle creation; cleared by
     * {@link #cleanHandles()}.
     */
    private final Map<String, Path> bucketToHandles = new HashMap<String, Path>();

    /**
     * Creates the client and its {@link HoodieFlinkTableServiceClient}.
     *
     * @param context     engine context used for all operations
     * @param writeConfig write configuration
     */
    public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
        super(context, writeConfig, (SupportsUpgradeDowngrade)FlinkUpgradeDowngradeHelper.getInstance());
        this.tableServiceClient = new HoodieFlinkTableServiceClient(context, writeConfig, (Option<EmbeddedTimelineService>)this.getTimelineServer());
    }

    /**
     * Creates the Flink index. Note that the client's own {@code config} is used, not the
     * {@code writeConfig} argument.
     */
    protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
        return FlinkHoodieIndexFactory.createIndex((HoodieFlinkEngineContext)this.context, this.config);
    }

    /**
     * Commits the given write statuses.
     *
     * <p>Write stats that share the same partition path and file path are merged into a single
     * stat via {@link WriteStatMerger#merge} before being handed to {@code commitStats}.
     *
     * @return the result of {@code commitStats}
     */
    public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds, Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
        List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
        // Several statuses may describe the same file; collapse them to one stat per file.
        List<HoodieWriteStat> merged = writeStats.stream()
            .collect(Collectors.groupingBy(writeStat -> writeStat.getPartitionPath() + writeStat.getPath()))
            .values().stream()
            .map(duplicates -> duplicates.stream().reduce(WriteStatMerger::merge).get())
            .collect(Collectors.toList());
        return this.commitStats(instantTime, (HoodieData)HoodieListData.eager(writeStatuses), merged, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
    }

    /** Creates a {@link HoodieFlinkTable}; {@code hadoopConf} is not used. */
    protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
        return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext)this.context);
    }

    /** Creates a {@link HoodieFlinkTable} on the given meta client; {@code hadoopConf} is not used. */
    protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
        return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext)this.context, metaClient);
    }

    /**
     * Tags the records with the index and returns only those whose current location is
     * not known. The lookup time is reported under the {@code "lookup"} index metric.
     *
     * @param hoodieRecords records to check
     * @return the records without a known current location
     */
    public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
        HoodieFlinkTable<T> table = this.getHoodieTable();
        Timer.Context indexTimer = this.metrics.getIndexCtx();
        List<HoodieRecord<T>> recordsWithLocation = this.getIndex().tagLocation((HoodieData)HoodieListData.eager(hoodieRecords), this.context, table).collectAsList();
        // The timer context may be null; report a zero duration in that case.
        this.metrics.updateIndexMetrics("lookup", this.metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
        return recordsWithLocation.stream().filter(record -> !record.isCurrentLocationKnown()).collect(Collectors.toList());
    }

    /**
     * Not supported.
     *
     * @throws HoodieNotSupportedException always
     */
    public void bootstrap(Option<Map<String, String>> extraMetadata) {
        throw new HoodieNotSupportedException("Bootstrap operation is not supported yet");
    }

    /**
     * Upserts the records at the given instant.
     *
     * <p>The write handle is derived from the first record, so {@code records} must not be empty.
     *
     * @return the write statuses of the operation
     */
    public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
        HoodieWriteMetadata<List<WriteStatus>> result;
        HoodieTable table = this.initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
        table.validateUpsertSchema();
        this.preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
        try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table);){
            result = ((HoodieFlinkTable)table).upsert(this.context, closeableHandle.getWriteHandle(), instantTime, records);
        }
        if (result.getIndexLookupDuration().isPresent()) {
            this.metrics.updateIndexMetrics("lookup", result.getIndexLookupDuration().get().toMillis());
        }
        return this.postWrite(result, instantTime, table);
    }

    /**
     * Upserts records that already carry their current location.
     *
     * <p>Records are grouped by the file id of their current location and each group is written
     * with its own write handle; groups are processed on a parallel stream.
     *
     * @return the concatenated write statuses of all groups
     */
    public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
        HoodieTable table = this.initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
        table.validateUpsertSchema();
        this.preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
        Map<String, List<HoodieRecord<T>>> preppedRecordsByFileId = preppedRecords.stream().parallel()
            .collect(Collectors.groupingBy(r -> r.getCurrentLocation().getFileId()));
        return preppedRecordsByFileId.values().stream().parallel().map(records -> {
            HoodieWriteMetadata<List<WriteStatus>> result;
            try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table);){
                result = ((HoodieFlinkTable)table).upsertPrepped(this.context, (HoodieWriteHandle)closeableHandle.getWriteHandle(), instantTime, (List)records);
            }
            return this.postWrite(result, instantTime, table);
        }).flatMap(Collection::stream).collect(Collectors.toList());
    }

    /**
     * Inserts the records at the given instant.
     *
     * <p>The write handle is derived from the first record, so {@code records} must not be empty.
     *
     * @return the write statuses of the operation
     */
    public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
        HoodieWriteMetadata<List<WriteStatus>> result;
        HoodieTable table = this.initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime));
        table.validateInsertSchema();
        this.preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
        try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table);){
            result = ((HoodieFlinkTable)table).insert(this.context, closeableHandle.getWriteHandle(), instantTime, records);
        }
        if (result.getIndexLookupDuration().isPresent()) {
            this.metrics.updateIndexMetrics("lookup", result.getIndexLookupDuration().get().toMillis());
        }
        return this.postWrite(result, instantTime, table);
    }

    /**
     * Runs an INSERT_OVERWRITE with the given records, using an overwrite write handle.
     *
     * @return the write statuses of the operation
     */
    public List<WriteStatus> insertOverwrite(List<HoodieRecord<T>> records, String instantTime) {
        HoodieWriteMetadata<List<WriteStatus>> result;
        HoodieTable table = this.initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime));
        table.validateInsertSchema();
        this.preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient());
        try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table, true);){
            result = ((HoodieFlinkTable)table).insertOverwrite(this.context, closeableHandle.getWriteHandle(), instantTime, records);
        }
        return this.postWrite(result, instantTime, table);
    }

    /**
     * Runs an INSERT_OVERWRITE_TABLE with the given records, using an overwrite write handle.
     *
     * @return the write statuses of the operation
     */
    public List<WriteStatus> insertOverwriteTable(List<HoodieRecord<T>> records, String instantTime) {
        HoodieWriteMetadata<List<WriteStatus>> result;
        HoodieTable table = this.initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime));
        table.validateInsertSchema();
        this.preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient());
        try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table, true);){
            result = ((HoodieFlinkTable)table).insertOverwriteTable(this.context, closeableHandle.getWriteHandle(), instantTime, records);
        }
        return this.postWrite(result, instantTime, table);
    }

    /**
     * Not supported.
     *
     * @throws HoodieNotSupportedException always
     */
    public List<WriteStatus> insertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
        throw new HoodieNotSupportedException("InsertPrepped operation is not supported yet");
    }

    /**
     * Not supported.
     *
     * @throws HoodieNotSupportedException always
     */
    public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime) {
        throw new HoodieNotSupportedException("BulkInsert operation is not supported yet");
    }

    /**
     * Not supported.
     *
     * @throws HoodieNotSupportedException always
     */
    public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
        throw new HoodieNotSupportedException("BulkInsert operation is not supported yet");
    }

    /**
     * Bulk-inserts records that already carry their current location.
     *
     * <p>Records are grouped by the file id of their current location. Each group is sorted in
     * place by record key and written with its own overwrite write handle; groups are processed
     * on a parallel stream. The {@code bulkInsertPartitioner} argument is not used.
     *
     * @return the concatenated write statuses of all groups
     */
    public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
        HoodieTable table = this.initTable(WriteOperationType.BULK_INSERT_PREPPED, Option.ofNullable(instantTime));
        table.validateInsertSchema();
        this.preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, table.getMetaClient());
        Map<String, List<HoodieRecord<T>>> preppedRecordsByFileId = preppedRecords.stream().parallel()
            .collect(Collectors.groupingBy(r -> r.getCurrentLocation().getFileId()));
        return preppedRecordsByFileId.values().stream().parallel().map(records -> {
            HoodieWriteMetadata<List<WriteStatus>> result;
            records.sort(Comparator.comparing(HoodieRecord::getRecordKey));
            // NOTE(review): "I" presumably flags the first record's bucket as an insert for the
            // write-handle factory - confirm against FlinkWriteHandleFactory.
            records.get(0).getCurrentLocation().setInstantTime("I");
            try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table, true);){
                result = ((HoodieFlinkTable)table).bulkInsertPrepped(this.context, (HoodieWriteHandle)closeableHandle.getWriteHandle(), instantTime, (List)records);
            }
            return this.postWrite(result, instantTime, table);
        }).flatMap(Collection::stream).collect(Collectors.toList());
    }

    /**
     * Deletes the records identified by the given keys.
     *
     * @return the write statuses of the operation
     */
    public List<WriteStatus> delete(List<HoodieKey> keys, String instantTime) {
        HoodieTable table = this.initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
        this.preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
        HoodieWriteMetadata result = table.delete(this.context, instantTime, keys);
        return this.postWrite(result, instantTime, table);
    }

    /**
     * Deletes the given prepped records.
     *
     * @return the write statuses of the operation
     */
    public List<WriteStatus> deletePrepped(List<HoodieRecord<T>> preppedRecords, String instantTime) {
        HoodieTable table = this.initTable(WriteOperationType.DELETE_PREPPED, Option.ofNullable(instantTime));
        this.preWrite(instantTime, WriteOperationType.DELETE_PREPPED, table.getMetaClient());
        HoodieWriteMetadata result = table.deletePrepped(this.context, instantTime, preppedRecords);
        return this.postWrite(result, instantTime, table);
    }

    /**
     * Deletes the given partitions.
     *
     * @return the write statuses of the operation
     */
    public List<WriteStatus> deletePartitions(List<String> partitions, String instantTime) {
        HoodieTable table = this.initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime));
        this.preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient());
        HoodieWriteMetadata result = table.deletePartitions(this.context, instantTime, partitions);
        return this.postWrite(result, instantTime, table);
    }

    /**
     * Records the operation type on this client. {@code instantTime} and {@code metaClient}
     * are not used here.
     */
    public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
        this.setOperationType(writeOperationType);
    }

    /**
     * When the transaction manager requires a lock, reloads the active timeline and captures
     * the last completed transaction and the pending inflight/requested instants. Does nothing
     * otherwise.
     */
    public void preTxn(HoodieTableMetaClient metaClient) {
        if (this.txnManager.isLockRequired()) {
            metaClient.reloadActiveTimeline();
            this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata((HoodieTableMetaClient)metaClient);
            this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants((HoodieTableMetaClient)metaClient);
        }
    }

    /** Delegates metadata table initialization to the Flink table service client. */
    public void initMetadataTable() {
        ((HoodieFlinkTableServiceClient)this.tableServiceClient).initMetadataTable();
    }

    /** Starts the async cleaner service through the table service client. */
    public void startAsyncCleaning() {
        this.tableServiceClient.startAsyncCleanerService((BaseHoodieWriteClient)this);
    }

    /** If an async cleaner service exists, invokes {@code asyncClean()} on the table service client. */
    public void waitForCleaningFinish() {
        if (this.tableServiceClient.asyncCleanerService != null) {
            LOG.info("Cleaner has been spawned already. Waiting for it to finish");
            this.tableServiceClient.asyncClean();
            LOG.info("Cleaner has finished");
        }
    }

    /**
     * Reports the index update duration, when available, under the current operation type's
     * name and returns the write statuses of the result.
     *
     * @param result      metadata produced by the write
     * @param instantTime instant of the write (unused here)
     * @param hoodieTable table written to (unused here)
     * @return the write statuses carried by {@code result}
     */
    public List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result, String instantTime, HoodieTable hoodieTable) {
        // Guard on the same option that is dereferenced: checking the lookup duration here
        // could fail on get() or silently skip an available update duration.
        if (result.getIndexUpdateDuration().isPresent()) {
            this.metrics.updateIndexMetrics(this.getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
        }
        return result.getWriteStatuses();
    }

    /** Only runs auto-archiving on commit; no cleaning is triggered from here. */
    protected void mayBeCleanAndArchive(HoodieTable table) {
        this.autoArchiveOnCommit(table);
    }

    /** Delegates compaction to the table service client. */
    protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
        return this.tableServiceClient.compact(compactionInstantTime, shouldComplete);
    }

    /**
     * Not supported.
     *
     * @throws HoodieNotSupportedException always
     */
    public HoodieWriteMetadata<List<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
        throw new HoodieNotSupportedException("Clustering is not supported yet");
    }

    /** Delegates clustering completion to the Flink table service client. */
    private void completeClustering(HoodieReplaceCommitMetadata metadata, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String clusteringCommitTime, Option<HoodieData<WriteStatus>> writeStatuses) {
        ((HoodieFlinkTableServiceClient)this.tableServiceClient).completeClustering(metadata, table, clusteringCommitTime, writeStatuses);
    }

    /** No-op for this client. */
    protected void doInitTable(WriteOperationType operationType, HoodieTableMetaClient metaClient, Option<String> instantTime) {
    }

    /**
     * Completes a clustering or compaction table service.
     *
     * @throws IllegalArgumentException for any other table service type
     */
    public void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String commitInstant, Option<HoodieData<WriteStatus>> writeStatuses) {
        switch (tableServiceType) {
            case CLUSTER: {
                this.completeClustering((HoodieReplaceCommitMetadata)metadata, table, commitInstant, writeStatuses);
                break;
            }
            case COMPACT: {
                this.completeCompaction(metadata, table, commitInstant);
                break;
            }
            default: {
                throw new IllegalArgumentException("This table service is not valid " + tableServiceType);
            }
        }
    }

    /** Runs upgrade/downgrade of the table to {@link HoodieTableVersion#current()}. */
    public void upgradeDowngrade(String instantTime, HoodieTableMetaClient metaClient) {
        new UpgradeDowngrade(metaClient, this.config, this.context, (SupportsUpgradeDowngrade)FlinkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.current(), instantTime);
    }

    /** Clears the bucket-to-handle map. */
    public void cleanHandles() {
        this.bucketToHandles.clear();
    }

    /** Closes the parent client, then clears the bucket-to-handle map. */
    public void close() {
        super.close();
        this.cleanHandles();
    }

    /**
     * Obtains a write handle for the given record from the factory selected by the table
     * config, the write config and the {@code overwrite} flag.
     */
    private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(HoodieRecord<T> record, HoodieWriteConfig config, String instantTime, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, Iterator<HoodieRecord<T>> recordItr, boolean overwrite) {
        FlinkWriteHandleFactory.Factory<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> writeHandleFactory = FlinkWriteHandleFactory.getFactory(table.getMetaClient().getTableConfig(), config, overwrite);
        return writeHandleFactory.create(this.bucketToHandles, record, config, instantTime, table, recordItr);
    }

    /** Creates a new {@link HoodieFlinkTable} from this client's config and context on each call. */
    public HoodieFlinkTable<T> getHoodieTable() {
        return HoodieFlinkTable.create(this.config, (HoodieFlinkEngineContext)this.context);
    }

    /**
     * Computes, per partition, the ids of the existing file groups replaced by an overwrite.
     *
     * <p>For {@code INSERT_OVERWRITE} only partitions touched by {@code writeStatuses} are
     * considered; for {@code INSERT_OVERWRITE_TABLE} all partitions of the table are.
     *
     * @throws AssertionError for any other operation type
     */
    public Map<String, List<String>> getPartitionToReplacedFileIds(WriteOperationType writeOperationType, List<WriteStatus> writeStatuses) {
        HoodieFlinkTable<T> table = this.getHoodieTable();
        switch (writeOperationType) {
            case INSERT_OVERWRITE: {
                return writeStatuses.stream().map(status -> status.getStat().getPartitionPath()).distinct().collect(Collectors.toMap(partition -> partition, partitionPath -> this.getAllExistingFileIds(table, partitionPath)));
            }
            case INSERT_OVERWRITE_TABLE: {
                Map<String, List<String>> partitionToExistingFileIds = new HashMap<String, List<String>>();
                List<String> partitionPaths = FSUtils.getAllPartitionPaths((HoodieEngineContext)this.context, (HoodieStorage)table.getStorage(), (HoodieMetadataConfig)this.config.getMetadataConfig(), (String)table.getMetaClient().getBasePath());
                if (partitionPaths != null && !partitionPaths.isEmpty()) {
                    this.context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions: " + this.config.getTableName());
                    partitionToExistingFileIds = partitionPaths.stream().parallel().collect(Collectors.toMap(partition -> partition, partition -> this.getAllExistingFileIds(table, partition)));
                }
                return partitionToExistingFileIds;
            }
        }
        throw new AssertionError("Unsupported write operation type: " + writeOperationType);
    }

    /** Returns the distinct file ids of the latest file slices in the given partition. */
    private List<String> getAllExistingFileIds(HoodieFlinkTable<T> table, String partitionPath) {
        return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
    }

    /**
     * Wraps a write handle so it can be used in try-with-resources; closing calls
     * {@link MiniBatchHandle#closeGracefully()} on the handle.
     */
    private final class AutoCloseableWriteHandle
    implements AutoCloseable {
        private final HoodieWriteHandle<?, ?, ?, ?> writeHandle;

        /** Creates a non-overwrite handle for the given records. */
        AutoCloseableWriteHandle(List<HoodieRecord<T>> records, String instantTime, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table) {
            this(records, instantTime, table, false);
        }

        /**
         * Creates a handle keyed off the first record; {@code records} must therefore be
         * non-empty, otherwise {@code records.get(0)} throws.
         */
        AutoCloseableWriteHandle(List<HoodieRecord<T>> records, String instantTime, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, boolean overwrite) {
            this.writeHandle = HoodieFlinkWriteClient.this.getOrCreateWriteHandle(records.get(0), HoodieFlinkWriteClient.this.getConfig(), instantTime, table, records.listIterator(), overwrite);
        }

        HoodieWriteHandle<?, ?, ?, ?> getWriteHandle() {
            return this.writeHandle;
        }

        @Override
        public void close() {
            ((MiniBatchHandle)this.writeHandle).closeGracefully();
        }
    }
}

