/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.CommitSummary;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
import org.apache.iceberg.flink.sink.FlinkManifestUtil;
import org.apache.iceberg.flink.sink.IcebergCommittable;
import org.apache.iceberg.flink.sink.IcebergFilesCommitterMetrics;
import org.apache.iceberg.flink.sink.SinkUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class IcebergCommitter
implements Committer<IcebergCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergCommitter.class);
    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
    public static final WriteResult EMPTY_WRITE_RESULT = WriteResult.builder().addDataFiles((Iterable)Lists.newArrayList()).addDeleteFiles((Iterable)Lists.newArrayList()).build();
    @VisibleForTesting
    static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
    private final String branch;
    private final Map<String, String> snapshotProperties;
    private final boolean replacePartitions;
    private IcebergFilesCommitterMetrics committerMetrics;
    private Table table;
    private final TableLoader tableLoader;
    private int maxContinuousEmptyCommits;
    private ExecutorService workerPool;
    private int continuousEmptyCheckpoints = 0;
    private boolean compactMode = false;

    IcebergCommitter(TableLoader tableLoader, String branch, Map<String, String> snapshotProperties, boolean replacePartitions, int workerPoolSize, String sinkId, IcebergFilesCommitterMetrics committerMetrics, boolean compactMode) {
        this.branch = branch;
        this.snapshotProperties = snapshotProperties;
        this.replacePartitions = replacePartitions;
        this.committerMetrics = committerMetrics;
        this.tableLoader = tableLoader;
        if (!tableLoader.isOpen()) {
            tableLoader.open();
        }
        this.table = tableLoader.loadTable();
        this.maxContinuousEmptyCommits = PropertyUtil.propertyAsInt((Map)this.table.properties(), (String)MAX_CONTINUOUS_EMPTY_COMMITS, (int)10);
        Preconditions.checkArgument((this.maxContinuousEmptyCommits > 0 ? 1 : 0) != 0, (Object)"flink.max-continuous-empty-commits must be positive");
        this.workerPool = ThreadPools.newFixedThreadPool((String)("iceberg-committer-pool-" + this.table.name() + "-" + sinkId), (int)workerPoolSize);
        this.continuousEmptyCheckpoints = 0;
        this.compactMode = compactMode;
    }

    public void commit(Collection<Committer.CommitRequest<IcebergCommittable>> commitRequests) throws IOException, InterruptedException {
        if (commitRequests.isEmpty()) {
            return;
        }
        TreeMap commitRequestMap = Maps.newTreeMap();
        for (Committer.CommitRequest<IcebergCommittable> request : commitRequests) {
            commitRequestMap.put(((IcebergCommittable)request.getCommittable()).checkpointId(), request);
        }
        IcebergCommittable last = (IcebergCommittable)((Committer.CommitRequest)commitRequestMap.lastEntry().getValue()).getCommittable();
        long maxCommittedCheckpointId = SinkUtil.getMaxCommittedCheckpointId(this.table, last.jobId(), last.operatorId(), this.branch);
        commitRequestMap.headMap(maxCommittedCheckpointId, true).values().forEach(Committer.CommitRequest::signalAlreadyCommitted);
        NavigableMap<Long, Committer.CommitRequest<IcebergCommittable>> uncommitted = commitRequestMap.tailMap(maxCommittedCheckpointId, false);
        if (!uncommitted.isEmpty()) {
            this.commitPendingRequests(uncommitted, last.jobId(), last.operatorId());
        }
    }

    private void commitPendingRequests(NavigableMap<Long, Committer.CommitRequest<IcebergCommittable>> commitRequestMap, String newFlinkJobId, String operatorId) throws IOException {
        long checkpointId = (Long)commitRequestMap.lastKey();
        ArrayList manifests = Lists.newArrayList();
        TreeMap pendingResults = Maps.newTreeMap();
        for (Map.Entry e : commitRequestMap.entrySet()) {
            if (Arrays.equals(EMPTY_MANIFEST_DATA, ((IcebergCommittable)((Committer.CommitRequest)e.getValue()).getCommittable()).manifest())) {
                pendingResults.put((Long)e.getKey(), EMPTY_WRITE_RESULT);
                continue;
            }
            DeltaManifests deltaManifests = (DeltaManifests)SimpleVersionedSerialization.readVersionAndDeSerialize((SimpleVersionedSerializer)DeltaManifestsSerializer.INSTANCE, (byte[])((IcebergCommittable)((Committer.CommitRequest)e.getValue()).getCommittable()).manifest());
            pendingResults.put((Long)e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, this.table.io(), this.table.specs()));
            manifests.addAll(deltaManifests.manifests());
        }
        CommitSummary summary = new CommitSummary(pendingResults);
        this.commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId);
        if (this.committerMetrics != null) {
            this.committerMetrics.updateCommitSummary(summary);
        }
        if (!this.compactMode) {
            FlinkManifestUtil.deleteCommittedManifests(this.table, manifests, newFlinkJobId, checkpointId);
        }
    }

    private void logCommitSummary(CommitSummary summary, String description) {
        LOG.info("Preparing for commit: {} on table: {} branch: {} with summary: {}.", new Object[]{description, this.table, this.branch, summary});
    }

    private void commitPendingResult(NavigableMap<Long, WriteResult> pendingResults, CommitSummary summary, String newFlinkJobId, String operatorId) {
        long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
        int n = this.continuousEmptyCheckpoints = totalFiles == 0L ? this.continuousEmptyCheckpoints + 1 : 0;
        if (totalFiles != 0L || this.continuousEmptyCheckpoints % this.maxContinuousEmptyCommits == 0) {
            if (this.replacePartitions) {
                this.replacePartitions(pendingResults, summary, newFlinkJobId, operatorId);
            } else {
                this.commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId);
            }
            this.continuousEmptyCheckpoints = 0;
        } else {
            long checkpointId = (Long)pendingResults.lastKey();
            LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", (Object)checkpointId);
        }
    }

    private void replacePartitions(NavigableMap<Long, WriteResult> pendingResults, CommitSummary summary, String newFlinkJobId, String operatorId) {
        long checkpointId = (Long)pendingResults.lastKey();
        Preconditions.checkState((summary.deleteFilesCount() == 0L ? 1 : 0) != 0, (Object)"Cannot overwrite partitions with delete files.");
        ReplacePartitions dynamicOverwrite = (ReplacePartitions)this.table.newReplacePartitions().scanManifestsWith(this.workerPool);
        for (WriteResult result : pendingResults.values()) {
            Preconditions.checkState((result.referencedDataFiles().length == 0 ? 1 : 0) != 0, (Object)"Should have no referenced data files.");
            Arrays.stream(result.dataFiles()).forEach(arg_0 -> ((ReplacePartitions)dynamicOverwrite).addFile(arg_0));
        }
        String description = "dynamic partition overwrite";
        this.logCommitSummary(summary, description);
        this.commitOperation((SnapshotUpdate<?>)dynamicOverwrite, description, newFlinkJobId, operatorId, checkpointId);
    }

    private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, CommitSummary summary, String newFlinkJobId, String operatorId) {
        long checkpointId = (Long)pendingResults.lastKey();
        if (summary.deleteFilesCount() == 0L) {
            AppendFiles appendFiles = (AppendFiles)this.table.newAppend().scanManifestsWith(this.workerPool);
            for (WriteResult result : pendingResults.values()) {
                Preconditions.checkState((result.referencedDataFiles().length == 0 ? 1 : 0) != 0, (Object)"Should have no referenced data files for append.");
                Arrays.stream(result.dataFiles()).forEach(arg_0 -> ((AppendFiles)appendFiles).appendFile(arg_0));
            }
            String description = "append";
            this.logCommitSummary(summary, description);
            this.commitOperation((SnapshotUpdate<?>)appendFiles, description, newFlinkJobId, operatorId, checkpointId);
        } else {
            for (Map.Entry e : pendingResults.entrySet()) {
                WriteResult result = (WriteResult)e.getValue();
                RowDelta rowDelta = (RowDelta)this.table.newRowDelta().scanManifestsWith(this.workerPool);
                Arrays.stream(result.dataFiles()).forEach(arg_0 -> ((RowDelta)rowDelta).addRows(arg_0));
                Arrays.stream(result.deleteFiles()).forEach(arg_0 -> ((RowDelta)rowDelta).addDeletes(arg_0));
                String description = "rowDelta";
                this.logCommitSummary(summary, description);
                this.commitOperation((SnapshotUpdate<?>)rowDelta, description, newFlinkJobId, operatorId, (Long)e.getKey());
            }
        }
    }

    private void commitOperation(SnapshotUpdate<?> operation, String description, String newFlinkJobId, String operatorId, long checkpointId) {
        this.snapshotProperties.forEach((arg_0, arg_1) -> operation.set(arg_0, arg_1));
        operation.set("flink.max-committed-checkpoint-id", Long.toString(checkpointId));
        operation.set("flink.job-id", newFlinkJobId);
        operation.set("flink.operator-id", operatorId);
        operation.toBranch(this.branch);
        long startNano = System.nanoTime();
        operation.commit();
        long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
        LOG.info("Committed {} to table: {}, branch: {}, checkpointId {} in {} ms", new Object[]{description, this.table.name(), this.branch, checkpointId, durationMs});
        if (this.committerMetrics != null) {
            this.committerMetrics.commitDuration(durationMs);
        }
    }

    public void close() throws IOException {
        this.tableLoader.close();
        this.workerPool.shutdown();
    }
}

