/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import com.codahale.metrics.Timer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndexFactory;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.io.FlinkConcatAndReplaceHandle;
import org.apache.hudi.io.FlinkConcatHandle;
import org.apache.hudi.io.FlinkCreateHandle;
import org.apache.hudi.io.FlinkMergeAndReplaceHandle;
import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.MiniBatchHandle;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.util.FlinkClientUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieFlinkWriteClient<T extends HoodieRecordPayload>
extends BaseHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkWriteClient.class);
    private final Map<String, Path> bucketToHandles = new HashMap<String, Path>();

    /**
     * Creates a Flink write client for the given engine context and write configuration.
     *
     * <p>The Flink upgrade/downgrade helper singleton is passed to the base client.
     *
     * @param context engine context forwarded to the base write client
     * @param writeConfig write configuration forwarded to the base write client
     */
    public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
        super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance());
    }

    /**
     * Builds the index through {@link FlinkHoodieIndexFactory}.
     *
     * <p>Note that the {@code writeConfig} argument is not read here; the client's own
     * {@code config} field is what gets passed to the factory.
     *
     * @param writeConfig unused by this implementation
     * @return the index created by the Flink index factory
     */
    protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
        final HoodieFlinkEngineContext flinkContext = (HoodieFlinkEngineContext)this.context;
        return FlinkHoodieIndexFactory.createIndex(flinkContext, this.config);
    }

    /**
     * Commits the given write statuses by extracting their write stats and delegating to
     * {@code commitStats}.
     *
     * @param instantTime instant being committed
     * @param writeStatuses write statuses whose stats are committed
     * @param extraMetadata optional extra metadata forwarded to {@code commitStats}
     * @param commitActionType commit action type forwarded to {@code commitStats}
     * @param partitionToReplacedFileIds replaced file ids per partition, forwarded as-is
     * @return the result of {@code commitStats}
     */
    public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
        final List<HoodieWriteStat> stats = writeStatuses.parallelStream()
            .map(status -> status.getStat())
            .collect(Collectors.toList());
        return this.commitStats(instantTime, stats, extraMetadata, commitActionType, partitionToReplacedFileIds);
    }

    /**
     * Creates a {@link HoodieFlinkTable} for the given config using this client's engine context.
     *
     * @param config write configuration used to create the table
     * @param hadoopConf not read by this implementation
     * @return the newly created Flink table
     */
    protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
        final HoodieFlinkEngineContext flinkContext = (HoodieFlinkEngineContext)this.context;
        return HoodieFlinkTable.create(config, flinkContext);
    }

    /**
     * Creates a {@link HoodieFlinkTable} for the given config and meta client using this
     * client's engine context.
     *
     * @param config write configuration used to create the table
     * @param hadoopConf not read by this implementation
     * @param metaClient meta client passed to the table factory
     * @return the newly created Flink table
     */
    protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
        final HoodieFlinkEngineContext flinkContext = (HoodieFlinkEngineContext)this.context;
        return HoodieFlinkTable.create(config, flinkContext, metaClient);
    }

    /**
     * Returns the subset of the given records whose current location is not known after the
     * index has tagged them.
     *
     * <p>The time spent is reported through {@code metrics.updateIndexMetrics} under the
     * {@code "lookup"} key; when no timer context is available a value of {@code 0} is used.
     *
     * @param hoodieRecords records to check against the index
     * @return records for which {@code isCurrentLocationKnown()} is false
     */
    public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
        final HoodieFlinkTable<T> hoodieTable = this.getHoodieTable();
        final Timer.Context timerCtx = this.metrics.getIndexCtx();
        final List<HoodieRecord<T>> taggedRecords = this.getIndex()
            .tagLocation(HoodieListData.eager(hoodieRecords), this.context, hoodieTable)
            .collectAsList();
        // Stop the timer only if one was handed out; otherwise report zero elapsed time.
        final long elapsed = timerCtx == null ? 0L : timerCtx.stop();
        this.metrics.updateIndexMetrics("lookup", this.metrics.getDurationInMs(elapsed));
        return taggedRecords.stream()
            .filter(tagged -> !tagged.isCurrentLocationKnown())
            .collect(Collectors.toList());
    }

    /**
     * Bootstrap is not implemented by this client.
     *
     * @param extraMetadata ignored
     * @throws HoodieNotSupportedException always
     */
    public void bootstrap(Option<Map<String, String>> extraMetadata) {
        final String reason = "Bootstrap operation is not supported yet";
        throw new HoodieNotSupportedException(reason);
    }

    /**
     * Upserts the given records at the given instant.
     *
     * <p>Steps: initialize the table for {@code UPSERT}, validate the upsert schema, record the
     * operation type, open a write handle for the batch (closed gracefully when the write
     * returns or throws), delegate to {@link HoodieFlinkTable}, report the index lookup duration
     * under {@code "lookup"} when present, and hand the result to {@code postWrite}.
     *
     * @param records records to upsert; the first record determines the write handle
     * @param instantTime instant of the write
     * @return write statuses of the operation
     */
    public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
        final HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable =
            this.initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
        hoodieTable.validateUpsertSchema();
        this.preWrite(instantTime, WriteOperationType.UPSERT, hoodieTable.getMetaClient());

        final HoodieWriteMetadata<List<WriteStatus>> writeResult;
        try (AutoCloseableWriteHandle handle = new AutoCloseableWriteHandle(records, instantTime, hoodieTable)) {
            writeResult = ((HoodieFlinkTable<T>)hoodieTable).upsert(this.context, handle.getWriteHandle(), instantTime, records);
        }

        final Option<Duration> lookupDuration = writeResult.getIndexLookupDuration();
        if (lookupDuration.isPresent()) {
            this.metrics.updateIndexMetrics("lookup", lookupDuration.get().toMillis());
        }
        return this.postWrite(writeResult, instantTime, hoodieTable);
    }

    /**
     * Upserts records that already carry their target location.
     *
     * <p>The table is initialized for {@code UPSERT} while the recorded operation type is
     * {@code UPSERT_PREPPED}. Records are grouped by the file id of their current location and
     * each group is written through its own write handle; the per-group write statuses are
     * flattened into one list.
     *
     * <p>NOTE(review): both streams are parallel, and for tables that are not MERGE_ON_READ
     * {@code getOrCreateWriteHandle} reads and writes the {@code bucketToHandles} map, which is
     * a plain {@code HashMap}. Confirm callers only reach this path with MERGE_ON_READ tables
     * (or otherwise single-threaded) before relying on it elsewhere.
     *
     * @param preppedRecords records whose current location is already set
     * @param instantTime instant of the write
     * @return write statuses of all file groups
     */
    public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
        final HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable =
            this.initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
        hoodieTable.validateUpsertSchema();
        this.preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, hoodieTable.getMetaClient());

        final Map<String, List<HoodieRecord<T>>> recordsByFileId = preppedRecords.stream()
            .parallel()
            .collect(Collectors.groupingBy(record -> record.getCurrentLocation().getFileId()));

        return recordsByFileId.values().stream()
            .parallel()
            .map(fileRecords -> {
                final HoodieWriteMetadata<List<WriteStatus>> writeResult;
                try (AutoCloseableWriteHandle handle = new AutoCloseableWriteHandle(fileRecords, instantTime, hoodieTable)) {
                    writeResult = ((HoodieFlinkTable<T>)hoodieTable).upsertPrepped(this.context, handle.getWriteHandle(), instantTime, fileRecords);
                }
                return this.postWrite(writeResult, instantTime, hoodieTable);
            })
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
    }

    /**
     * Inserts the given records at the given instant.
     *
     * <p>Steps: initialize the table for {@code INSERT}, validate the insert schema, record the
     * operation type, open a write handle for the batch (closed gracefully afterwards), delegate
     * to {@link HoodieFlinkTable}, report the index lookup duration under {@code "lookup"} when
     * present, and hand the result to {@code postWrite}.
     *
     * @param records records to insert; the first record determines the write handle
     * @param instantTime instant of the write
     * @return write statuses of the operation
     */
    public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
        final HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable =
            this.initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime));
        hoodieTable.validateInsertSchema();
        this.preWrite(instantTime, WriteOperationType.INSERT, hoodieTable.getMetaClient());

        final HoodieWriteMetadata<List<WriteStatus>> writeResult;
        try (AutoCloseableWriteHandle handle = new AutoCloseableWriteHandle(records, instantTime, hoodieTable)) {
            writeResult = ((HoodieFlinkTable<T>)hoodieTable).insert(this.context, handle.getWriteHandle(), instantTime, records);
        }

        final Option<Duration> lookupDuration = writeResult.getIndexLookupDuration();
        if (lookupDuration.isPresent()) {
            this.metrics.updateIndexMetrics("lookup", lookupDuration.get().toMillis());
        }
        return this.postWrite(writeResult, instantTime, hoodieTable);
    }

    /**
     * Runs an insert-overwrite of the given records at the given instant.
     *
     * <p>The table is initialized for {@code INSERT_OVERWRITE}, the insert schema is validated,
     * and the write goes through a batch write handle that is closed gracefully afterwards.
     *
     * @param records records to write; the first record determines the write handle
     * @param instantTime instant of the write
     * @return write statuses of the operation
     */
    public List<WriteStatus> insertOverwrite(List<HoodieRecord<T>> records, String instantTime) {
        final HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable =
            this.initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime));
        hoodieTable.validateInsertSchema();
        this.preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, hoodieTable.getMetaClient());

        final HoodieWriteMetadata<List<WriteStatus>> writeResult;
        try (AutoCloseableWriteHandle handle = new AutoCloseableWriteHandle(records, instantTime, hoodieTable)) {
            writeResult = ((HoodieFlinkTable<T>)hoodieTable).insertOverwrite(this.context, handle.getWriteHandle(), instantTime, records);
        }
        return this.postWrite(writeResult, instantTime, hoodieTable);
    }

    /**
     * Runs an insert-overwrite-table of the given records at the given instant.
     *
     * <p>The table is initialized for {@code INSERT_OVERWRITE_TABLE}, the insert schema is
     * validated, and the write goes through a batch write handle that is closed gracefully
     * afterwards.
     *
     * @param records records to write; the first record determines the write handle
     * @param instantTime instant of the write
     * @return write statuses of the operation
     */
    public List<WriteStatus> insertOverwriteTable(List<HoodieRecord<T>> records, String instantTime) {
        final HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable =
            this.initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime));
        hoodieTable.validateInsertSchema();
        this.preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, hoodieTable.getMetaClient());

        final HoodieWriteMetadata<List<WriteStatus>> writeResult;
        try (AutoCloseableWriteHandle handle = new AutoCloseableWriteHandle(records, instantTime, hoodieTable)) {
            writeResult = ((HoodieFlinkTable<T>)hoodieTable).insertOverwriteTable(this.context, handle.getWriteHandle(), instantTime, records);
        }
        return this.postWrite(writeResult, instantTime, hoodieTable);
    }

    /**
     * Insert of prepped records is not implemented by this client.
     *
     * @param preppedRecords ignored
     * @param instantTime ignored
     * @return never returns normally
     * @throws HoodieNotSupportedException always
     */
    public List<WriteStatus> insertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
        final String reason = "InsertPrepped operation is not supported yet";
        throw new HoodieNotSupportedException(reason);
    }

    /**
     * Bulk insert is not implemented by this client.
     *
     * @param records ignored
     * @param instantTime ignored
     * @return never returns normally
     * @throws HoodieNotSupportedException always
     */
    public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime) {
        final String reason = "BulkInsert operation is not supported yet";
        throw new HoodieNotSupportedException(reason);
    }

    /**
     * Bulk insert with a user-defined partitioner is not implemented by this client.
     *
     * @param records ignored
     * @param instantTime ignored
     * @param userDefinedBulkInsertPartitioner ignored
     * @return never returns normally
     * @throws HoodieNotSupportedException always
     */
    public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
        final String reason = "BulkInsert operation is not supported yet";
        throw new HoodieNotSupportedException(reason);
    }

    /**
     * Bulk insert of prepped records is not implemented by this client.
     *
     * @param preppedRecords ignored
     * @param instantTime ignored
     * @param bulkInsertPartitioner ignored
     * @return never returns normally
     * @throws HoodieNotSupportedException always
     */
    public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
        final String reason = "BulkInsertPrepped operation is not supported yet";
        throw new HoodieNotSupportedException(reason);
    }

    /**
     * Deletes the records identified by the given keys at the given instant.
     *
     * <p>The table is initialized for {@code DELETE}, the operation type is recorded, and the
     * delete is delegated to the table; the result is passed through {@code postWrite}.
     *
     * @param keys keys of the records to delete
     * @param instantTime instant of the delete
     * @return write statuses of the operation
     */
    public List<WriteStatus> delete(List<HoodieKey> keys, String instantTime) {
        final HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable =
            this.initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
        this.preWrite(instantTime, WriteOperationType.DELETE, hoodieTable.getMetaClient());
        final HoodieWriteMetadata<List<WriteStatus>> deleteResult = hoodieTable.delete(this.context, instantTime, keys);
        return this.postWrite(deleteResult, instantTime, hoodieTable);
    }

    /**
     * Records the operation type of the upcoming write on this client.
     *
     * <p>Only {@code writeOperationType} is used; {@code instantTime} and {@code metaClient}
     * are not read by this implementation.
     *
     * @param instantTime unused here
     * @param writeOperationType operation type to set on the client
     * @param metaClient unused here
     */
    public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
        this.setOperationType(writeOperationType);
    }

    /**
     * Applies the given commit metadata to the metadata table through a freshly created
     * metadata writer, which is closed afterwards.
     *
     * <p>Whether the action counts as a table service is asked of a table obtained from
     * {@code getHoodieTable()}; the {@code table} argument itself is not read here.
     *
     * @param table unused by this implementation
     * @param instantTime instant whose metadata is written
     * @param actionType action type of the instant
     * @param metadata commit metadata to apply
     * @throws HoodieException wrapping any exception raised while creating, updating or
     *     closing the metadata writer
     */
    protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
        try (HoodieBackedTableMetadataWriter writer = this.initMetadataWriter()) {
            final boolean tableServiceAction = this.getHoodieTable().isTableServiceAction(actionType, instantTime);
            writer.update(metadata, instantTime, tableServiceAction);
        } catch (Exception cause) {
            throw new HoodieException("Failed to update metadata", cause);
        }
    }

    /**
     * Creates a Flink metadata table writer from the Hadoop configuration supplied by
     * {@link FlinkClientUtil}, this client's write config and the default Flink engine context.
     *
     * @return the newly created metadata writer; the caller is responsible for closing it
     */
    public HoodieBackedTableMetadataWriter initMetadataWriter() {
        final Configuration flinkHadoopConf = FlinkClientUtil.getHadoopConf();
        return (HoodieBackedTableMetadataWriter)FlinkHoodieBackedTableMetadataWriter.create(
            flinkHadoopConf, this.config, HoodieFlinkEngineContext.DEFAULT);
    }

    /**
     * Initializes the metadata table when it is enabled, or asks the table to delete it when it
     * is disabled.
     *
     * <p>When enabled, a metadata writer is created and immediately closed while holding the
     * transaction manager's lock; the lock manager's {@code unlock()} is always invoked
     * afterwards. Finally the table is asked to delete the metadata index if necessary.
     *
     * @throws HoodieException if locking or creating/closing the metadata writer fails
     */
    public void initMetadataTable() {
        final HoodieFlinkTable<T> hoodieTable = this.getHoodieTable();
        if (!this.config.isMetadataTableEnabled()) {
            hoodieTable.maybeDeleteMetadataTable();
            return;
        }

        try {
            this.txnManager.getLockManager().lock();
            // Creating the writer is what initializes the metadata table; nothing is written here.
            this.initMetadataWriter().close();
        } catch (Exception cause) {
            throw new HoodieException("Failed to initialize metadata table", cause);
        } finally {
            this.txnManager.getLockManager().unlock();
        }
        hoodieTable.deleteMetadataIndexIfNecessary();
    }

    /**
     * Starts asynchronous cleaning.
     *
     * <p>If a cleaner service already exists it is started again with a {@code null} argument;
     * otherwise one is obtained from
     * {@link AsyncCleanerService#startAsyncCleaningIfEnabled} and stored on this client.
     */
    public void startAsyncCleaning() {
        if (this.asyncCleanerService != null) {
            this.asyncCleanerService.start(null);
            return;
        }
        this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
    }

    /**
     * Waits for the asynchronous cleaner to complete, if one has been created.
     * Does nothing when no cleaner service is set.
     */
    public void waitForCleaningFinish() {
        if (this.asyncCleanerService == null) {
            return;
        }
        LOG.info("Cleaner has been spawned already. Waiting for it to finish");
        AsyncCleanerService.waitForCompletion(this.asyncCleanerService);
        LOG.info("Cleaner has finished");
    }

    /**
     * Post-processes a write result: reports the index update duration and returns the write
     * statuses.
     *
     * <p>The metric is reported under the current operation type's name, and only when the
     * result actually carries an index update duration. The previous implementation tested the
     * index <em>lookup</em> duration but then read the index <em>update</em> duration, so it
     * could fail on an absent value when only the lookup duration was set, and skipped the
     * metric when only the update duration was set.
     *
     * @param result write result to post-process
     * @param instantTime unused by this implementation
     * @param hoodieTable unused by this implementation
     * @return the write statuses carried by {@code result}
     */
    protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result, String instantTime, HoodieTable hoodieTable) {
        // Guard on the same Option that is dereferenced below.
        if (result.getIndexUpdateDuration().isPresent()) {
            this.metrics.updateIndexMetrics(this.getOperationType().name(), ((Duration)result.getIndexUpdateDuration().get()).toMillis());
        }
        return result.getWriteStatuses();
    }

    /**
     * Cleans up after a commit: quietly deletes the marker directory of the instant and then
     * stops the heartbeat for it.
     *
     * <p>The marker cleanup uses a table freshly created via {@code createTable}; the
     * {@code table}, {@code metadata} and {@code extraMetadata} arguments are not read here.
     * The heartbeat is stopped even if the marker cleanup throws.
     *
     * @param table unused by this implementation
     * @param metadata unused by this implementation
     * @param instantTime instant whose markers are deleted and whose heartbeat is stopped
     * @param extraMetadata unused by this implementation
     */
    protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
        try {
            final MarkerType markerType = this.config.getMarkersType();
            final HoodieTable freshTable = this.createTable(this.config, this.hadoopConf);
            WriteMarkersFactory.get(markerType, freshTable, instantTime)
                .quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());
        } finally {
            this.heartbeatClient.stop(instantTime);
        }
    }

    /**
     * Triggers archiving for the given table via {@code autoArchiveOnCommit}.
     *
     * <p>Despite the method name, no cleaning call is made here; only archiving is invoked.
     *
     * @param table table passed to {@code autoArchiveOnCommit}
     */
    protected void mayBeCleanAndArchive(HoodieTable table) {
        this.autoArchiveOnCommit(table);
    }

    /**
     * Commits a compaction: merges the optional extra metadata into the commit metadata and
     * completes the compaction on a freshly obtained table.
     *
     * @param compactionInstantTime instant of the compaction
     * @param metadata commit metadata of the compaction; extra metadata entries are added to it
     * @param extraMetadata optional key/value pairs to add to {@code metadata}
     */
    public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
        final HoodieFlinkTable<T> hoodieTable = this.getHoodieTable();
        extraMetadata.ifPresent(entries -> entries.forEach(metadata::addMetadata));
        this.completeCompaction(metadata, hoodieTable, compactionInstantTime);
    }

    /**
     * Completes an inflight compaction.
     *
     * <p>Inside a transaction on the compaction's inflight instant this finalizes the write,
     * updates the metadata table and transitions the inflight compaction to complete; the
     * transaction is always ended. Afterwards the marker directory is quietly deleted and, when
     * a compaction timer is set, commit metrics are reported under {@code "compaction"}.
     *
     * @param metadata commit metadata of the compaction
     * @param table table the compaction ran on
     * @param compactionCommitTime instant time of the compaction
     * @throws HoodieCommitException if the instant time cannot be parsed for metrics reporting
     */
    public void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
        this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + this.config.getTableName());
        final List<HoodieWriteStat> stats = metadata.getWriteStats();
        final HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);

        try {
            this.txnManager.beginTransaction(Option.of(inflightInstant), Option.empty());
            this.finalizeWrite(table, compactionCommitTime, stats);
            // Metadata table is updated before the data-table timeline transition below.
            this.writeTableMetadata(table, compactionCommitTime, inflightInstant.getAction(), metadata);
            LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
            CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
        } finally {
            this.txnManager.endTransaction(Option.of(inflightInstant));
        }

        WriteMarkersFactory.get(this.config.getMarkersType(), table, compactionCommitTime)
            .quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());

        if (this.compactionTimer != null) {
            final long elapsedMs = this.metrics.getDurationInMs(this.compactionTimer.stop());
            try {
                final long commitEpochMs = HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime();
                this.metrics.updateCommitMetrics(commitEpochMs, elapsedMs, metadata, "compaction");
            } catch (ParseException cause) {
                throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " + this.config.getBasePath() + " at time " + compactionCommitTime, cause);
            }
        }
        LOG.info("Compacted successfully on commit " + compactionCommitTime);
    }

    /**
     * Runs the compaction at the given instant and commits it.
     *
     * <p>The compaction is always committed (with no extra metadata); the
     * {@code shouldComplete} flag is not read by this implementation.
     *
     * @param compactionInstantTime instant of the compaction to run
     * @param shouldComplete unused by this implementation
     * @return the write metadata produced by the compaction
     */
    protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
        final HoodieWriteMetadata<List<WriteStatus>> compactionResult =
            this.getHoodieTable().compact(this.context, compactionInstantTime);
        final HoodieCommitMetadata commitMetadata = compactionResult.getCommitMetadata().get();
        this.commitCompaction(compactionInstantTime, commitMetadata, Option.empty());
        return compactionResult;
    }

    /**
     * Clustering is not implemented by this client.
     *
     * @param clusteringInstant ignored
     * @param shouldComplete ignored
     * @return never returns normally
     * @throws HoodieNotSupportedException always
     */
    public HoodieWriteMetadata<List<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
        final String reason = "Clustering is not supported yet";
        throw new HoodieNotSupportedException(reason);
    }

    /**
     * Completes an inflight clustering (replace commit).
     *
     * <p>Fails fast when any write stat reports write errors. Otherwise, inside a transaction on
     * the inflight replace-commit instant, this finalizes the write, updates the metadata table
     * and transitions the inflight replace commit to complete; the transaction is always ended.
     * Afterwards the marker directory is quietly deleted and, when a clustering timer is set,
     * commit metrics are reported under {@code "replacecommit"}.
     *
     * <p>The metrics failure message now names clustering; it previously said "compaction",
     * copied from the compaction path, which pointed operators at the wrong table service.
     *
     * @param metadata replace-commit metadata of the clustering
     * @param table table the clustering ran on
     * @param clusteringCommitTime instant time of the clustering
     * @throws HoodieClusteringException if any file had write errors or the timeline transition
     *     fails with an {@link IOException}
     * @throws HoodieCommitException if the instant time cannot be parsed for metrics reporting
     */
    private void completeClustering(HoodieReplaceCommitMetadata metadata, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String clusteringCommitTime) {
        this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering");
        final HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, "replacecommit", clusteringCommitTime);
        final List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream()
            .flatMap(e -> e.getValue().stream())
            .collect(Collectors.toList());

        if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0L) {
            throw new HoodieClusteringException("Clustering failed to write to files:" + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
        }

        try {
            this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
            this.finalizeWrite(table, clusteringCommitTime, writeStats);
            // Metadata table is updated before the data-table timeline transition below.
            this.writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata);
            LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
            table.getActiveTimeline().transitionReplaceInflightToComplete(HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            throw new HoodieClusteringException("Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e);
        } finally {
            this.txnManager.endTransaction(Option.of(clusteringInstant));
        }

        WriteMarkersFactory.get(this.config.getMarkersType(), table, clusteringCommitTime)
            .quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());

        if (this.clusteringTimer != null) {
            final long durationInMs = this.metrics.getDurationInMs(this.clusteringTimer.stop());
            try {
                this.metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(), durationInMs, metadata, "replacecommit");
            } catch (ParseException e) {
                // Message fixed: this is the clustering path, not compaction.
                throw new HoodieCommitException("Commit time is not of valid format. Failed to commit clustering " + this.config.getBasePath() + " at time " + clusteringCommitTime, e);
            }
        }
        LOG.info("Clustering successfully on commit " + clusteringCommitTime);
    }

    /**
     * Table-initialization hook; this client performs no work here.
     *
     * @param metaClient unused
     * @param instantTime unused
     * @param initialMetadataTableIfNecessary unused
     */
    protected void doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
        // Intentionally empty in this implementation.
    }

    /**
     * Completes the given table service by dispatching to the matching completion routine.
     *
     * @param tableServiceType {@code CLUSTER} or {@code COMPACT}
     * @param metadata commit metadata; must be a {@link HoodieReplaceCommitMetadata} for
     *     {@code CLUSTER}
     * @param table table the service ran on
     * @param commitInstant instant time of the table service
     * @throws IllegalArgumentException for any other table service type
     */
    public void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String commitInstant) {
        switch (tableServiceType) {
            case CLUSTER:
                this.completeClustering((HoodieReplaceCommitMetadata)metadata, table, commitInstant);
                return;
            case COMPACT:
                this.completeCompaction(metadata, table, commitInstant);
                return;
            default:
                throw new IllegalArgumentException("This table service is not valid " + tableServiceType);
        }
    }

    /**
     * Runs the upgrade/downgrade procedure towards the current table version.
     *
     * @param instantTime instant time passed to the upgrade/downgrade run
     * @param metaClient meta client of the table to upgrade or downgrade
     */
    public void upgradeDowngrade(String instantTime, HoodieTableMetaClient metaClient) {
        final UpgradeDowngrade runner = new UpgradeDowngrade(metaClient, this.config, this.context, FlinkUpgradeDowngradeHelper.getInstance());
        runner.run(HoodieTableVersion.current(), instantTime);
    }

    /**
     * Forgets all remembered write paths by clearing the {@code bucketToHandles} map.
     */
    public void cleanHandles() {
        this.bucketToHandles.clear();
    }

    /**
     * Closes the base client and then clears the remembered write handles.
     */
    public void close() {
        super.close();
        // Drop the fileId -> write path bookkeeping once the base client is closed.
        this.cleanHandles();
    }

    /**
     * Chooses and creates the write handle for the file group the given record points at.
     *
     * <ul>
     *   <li>MERGE_ON_READ tables always get an append handle, and nothing is remembered.</li>
     *   <li>If a write path is already remembered for the file id, a "replace" handle is built
     *       on top of that path: concat-and-replace when duplicate inserts are allowed,
     *       merge-and-replace otherwise.</li>
     *   <li>Otherwise a create handle is used when the location's instant time equals
     *       {@code "I"}; else a concat handle when duplicate inserts are allowed, or a merge
     *       handle.</li>
     * </ul>
     *
     * <p>For the non-append cases the new handle's write path is stored in
     * {@code bucketToHandles} under the file id.
     *
     * @param record record whose current location and partition path select the file group
     * @param config write configuration passed to the handle
     * @param instantTime instant of the write
     * @param table table being written
     * @param recordItr iterator over the records of the batch
     * @return the write handle to use for the batch
     */
    private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(HoodieRecord<T> record, HoodieWriteConfig config, String instantTime, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, Iterator<HoodieRecord<T>> recordItr) {
        final HoodieRecordLocation location = record.getCurrentLocation();
        final String fileId = location.getFileId();
        final String partition = record.getPartitionPath();
        final boolean allowDuplicates = config.allowDuplicateInserts();
        final boolean mergeOnRead = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);

        if (mergeOnRead) {
            return new FlinkAppendHandle<>(config, instantTime, table, partition, fileId, recordItr, table.getTaskContextSupplier());
        }

        final HoodieWriteHandle<?, ?, ?, ?> handle;
        if (this.bucketToHandles.containsKey(fileId)) {
            // A previous mini-batch already wrote this file group in the current round.
            final Path previousPath = this.bucketToHandles.get(fileId);
            if (allowDuplicates) {
                handle = new FlinkConcatAndReplaceHandle<>(config, instantTime, table, recordItr, partition, fileId, table.getTaskContextSupplier(), previousPath);
            } else {
                handle = new FlinkMergeAndReplaceHandle<>(config, instantTime, table, recordItr, partition, fileId, table.getTaskContextSupplier(), previousPath);
            }
        } else if (location.getInstantTime().equals("I")) {
            handle = new FlinkCreateHandle<>(config, instantTime, table, partition, fileId, table.getTaskContextSupplier());
        } else if (allowDuplicates) {
            handle = new FlinkConcatHandle<>(config, instantTime, table, recordItr, partition, fileId, table.getTaskContextSupplier());
        } else {
            handle = new FlinkMergeHandle<>(config, instantTime, table, recordItr, partition, fileId, table.getTaskContextSupplier());
        }

        this.bucketToHandles.put(fileId, ((MiniBatchHandle)handle).getWritePath());
        return handle;
    }

    /**
     * Creates a new {@link HoodieFlinkTable} from this client's config and engine context.
     * A new table is created on every call.
     *
     * @return a freshly created Flink table
     */
    public HoodieFlinkTable<T> getHoodieTable() {
        final HoodieFlinkEngineContext flinkContext = (HoodieFlinkEngineContext)this.context;
        return HoodieFlinkTable.create(this.config, flinkContext);
    }

    /**
     * Computes, per partition, the existing file ids that an overwrite operation replaces.
     *
     * <ul>
     *   <li>{@code INSERT_OVERWRITE}: only the distinct partitions touched by the given write
     *       statuses are considered.</li>
     *   <li>{@code INSERT_OVERWRITE_TABLE}: all partition paths of the table are considered;
     *       an empty map is returned when there are none.</li>
     * </ul>
     *
     * @param writeOperationType the overwrite operation type
     * @param writeStatuses write statuses of the operation (used for {@code INSERT_OVERWRITE})
     * @return map from partition path to the file ids currently present in that partition
     * @throws AssertionError for any other operation type
     */
    public Map<String, List<String>> getPartitionToReplacedFileIds(WriteOperationType writeOperationType, List<WriteStatus> writeStatuses) {
        final HoodieFlinkTable<T> hoodieTable = this.getHoodieTable();
        switch (writeOperationType) {
            case INSERT_OVERWRITE: {
                return writeStatuses.stream()
                    .map(status -> status.getStat().getPartitionPath())
                    .distinct()
                    .collect(Collectors.toMap(path -> path, path -> this.getAllExistingFileIds(hoodieTable, path)));
            }
            case INSERT_OVERWRITE_TABLE: {
                final List<String> allPartitions = FSUtils.getAllPartitionPaths(this.context, this.config.getMetadataConfig(), hoodieTable.getMetaClient().getBasePath());
                if (allPartitions == null || allPartitions.isEmpty()) {
                    return new HashMap<String, List<String>>();
                }
                this.context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions: " + this.config.getTableName());
                return allPartitions.stream()
                    .parallel()
                    .collect(Collectors.toMap(path -> path, path -> this.getAllExistingFileIds(hoodieTable, path)));
            }
            default:
                throw new AssertionError();
        }
    }

    /**
     * Lists the distinct file ids of the latest file slices in the given partition.
     *
     * @param table table whose slice view is queried
     * @param partitionPath partition to inspect
     * @return distinct file ids of the partition's latest file slices
     */
    private List<String> getAllExistingFileIds(HoodieFlinkTable<T> table, String partitionPath) {
        final Stream<String> fileIds = table.getSliceView()
            .getLatestFileSlices(partitionPath)
            .map(FileSlice::getFileId);
        return fileIds.distinct().collect(Collectors.toList());
    }

    /**
     * Wraps the write handle of one record batch so it can be used in try-with-resources;
     * closing the wrapper closes the handle gracefully.
     */
    private final class AutoCloseableWriteHandle implements AutoCloseable {
        private final HoodieWriteHandle<?, ?, ?, ?> writeHandle;

        /**
         * Obtains the write handle for the batch from the enclosing client.
         *
         * @param records records of the batch; the first element selects the file group, so the
         *     list must not be empty
         * @param instantTime instant of the write
         * @param table table being written
         */
        AutoCloseableWriteHandle(List<HoodieRecord<T>> records, String instantTime, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table) {
            final HoodieRecord<T> firstRecord = records.get(0);
            final HoodieWriteConfig writeConfig = HoodieFlinkWriteClient.this.getConfig();
            final Iterator<HoodieRecord<T>> recordItr = records.listIterator();
            this.writeHandle = HoodieFlinkWriteClient.this.getOrCreateWriteHandle(firstRecord, writeConfig, instantTime, table, recordItr);
        }

        /** Returns the wrapped write handle. */
        HoodieWriteHandle<?, ?, ?, ?> getWriteHandle() {
            return this.writeHandle;
        }

        /** Closes the wrapped handle via {@code MiniBatchHandle.closeGracefully()}. */
        @Override
        public void close() {
            final MiniBatchHandle miniBatchHandle = (MiniBatchHandle)this.writeHandle;
            miniBatchHandle.closeGracefully();
        }
    }
}

