/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.dynamic;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.sink.CommitSummary;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
import org.apache.iceberg.flink.sink.FlinkManifestUtil;
import org.apache.iceberg.flink.sink.dynamic.DynamicCommittable;
import org.apache.iceberg.flink.sink.dynamic.DynamicCommitterMetrics;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
class DynamicCommitter
implements Committer<DynamicCommittable> {
    private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
    private static final Logger LOG = LoggerFactory.getLogger(DynamicCommitter.class);
    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
    private static final WriteResult EMPTY_WRITE_RESULT = WriteResult.builder().addDataFiles((Iterable)Lists.newArrayList()).addDeleteFiles((Iterable)Lists.newArrayList()).build();
    private static final long INITIAL_CHECKPOINT_ID = -1L;
    @VisibleForTesting
    static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
    private static final String FLINK_JOB_ID = "flink.job-id";
    private static final String OPERATOR_ID = "flink.operator-id";
    private final Map<String, String> snapshotProperties;
    private final boolean replacePartitions;
    private final DynamicCommitterMetrics committerMetrics;
    private final Catalog catalog;
    private final Map<TableKey, Integer> maxContinuousEmptyCommitsMap;
    private final Map<TableKey, Integer> continuousEmptyCheckpointsMap;
    private final ExecutorService workerPool;

    DynamicCommitter(Catalog catalog, Map<String, String> snapshotProperties, boolean replacePartitions, int workerPoolSize, String sinkId, DynamicCommitterMetrics committerMetrics) {
        this.snapshotProperties = snapshotProperties;
        this.replacePartitions = replacePartitions;
        this.committerMetrics = committerMetrics;
        this.catalog = catalog;
        this.maxContinuousEmptyCommitsMap = Maps.newHashMap();
        this.continuousEmptyCheckpointsMap = Maps.newHashMap();
        this.workerPool = ThreadPools.newFixedThreadPool((String)("iceberg-committer-pool-" + sinkId), (int)workerPoolSize);
    }

    public void commit(Collection<Committer.CommitRequest<DynamicCommittable>> commitRequests) throws IOException, InterruptedException {
        if (commitRequests.isEmpty()) {
            return;
        }
        HashMap commitRequestMap = Maps.newHashMap();
        for (Committer.CommitRequest<DynamicCommittable> commitRequest : commitRequests) {
            NavigableMap committables = commitRequestMap.computeIfAbsent(new TableKey((DynamicCommittable)commitRequest.getCommittable()), unused -> Maps.newTreeMap());
            committables.computeIfAbsent(((DynamicCommittable)commitRequest.getCommittable()).checkpointId(), unused -> Lists.newArrayList()).add(commitRequest);
        }
        for (Map.Entry entry : commitRequestMap.entrySet()) {
            Table table = this.catalog.loadTable(TableIdentifier.parse((String)((TableKey)entry.getKey()).tableName()));
            DynamicCommittable last = (DynamicCommittable)((Committer.CommitRequest)((List)((NavigableMap)entry.getValue()).lastEntry().getValue()).get(0)).getCommittable();
            long maxCommittedCheckpointId = DynamicCommitter.getMaxCommittedCheckpointId(table, last.jobId(), last.operatorId(), ((TableKey)entry.getKey()).branch());
            ((NavigableMap)entry.getValue()).headMap(maxCommittedCheckpointId, true).values().forEach(list -> list.forEach(Committer.CommitRequest::signalAlreadyCommitted));
            NavigableMap<Long, List<Committer.CommitRequest<DynamicCommittable>>> uncommitted = ((NavigableMap)entry.getValue()).tailMap(maxCommittedCheckpointId, false);
            if (uncommitted.isEmpty()) continue;
            this.commitPendingRequests(table, ((TableKey)entry.getKey()).branch(), uncommitted, last.jobId(), last.operatorId());
        }
    }

    private static long getMaxCommittedCheckpointId(Table table, String flinkJobId, String operatorId, String branch) {
        Snapshot snapshot = table.snapshot(branch);
        long lastCommittedCheckpointId = -1L;
        while (snapshot != null) {
            String value;
            Map summary = snapshot.summary();
            String snapshotFlinkJobId = (String)summary.get(FLINK_JOB_ID);
            String snapshotOperatorId = (String)summary.get(OPERATOR_ID);
            if (flinkJobId.equals(snapshotFlinkJobId) && (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId)) && (value = (String)summary.get(MAX_COMMITTED_CHECKPOINT_ID)) != null) {
                lastCommittedCheckpointId = Long.parseLong(value);
                break;
            }
            Long parentSnapshotId = snapshot.parentId();
            snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId.longValue()) : null;
        }
        return lastCommittedCheckpointId;
    }

    private void commitPendingRequests(Table table, String branch, NavigableMap<Long, List<Committer.CommitRequest<DynamicCommittable>>> commitRequestMap, String newFlinkJobId, String operatorId) throws IOException {
        long checkpointId = (Long)commitRequestMap.lastKey();
        ArrayList manifests = Lists.newArrayList();
        TreeMap pendingResults = Maps.newTreeMap();
        for (Map.Entry e : commitRequestMap.entrySet()) {
            for (Committer.CommitRequest committable : (List)e.getValue()) {
                if (Arrays.equals(EMPTY_MANIFEST_DATA, ((DynamicCommittable)committable.getCommittable()).manifest())) {
                    pendingResults.computeIfAbsent((Long)e.getKey(), unused -> Lists.newArrayList()).add(EMPTY_WRITE_RESULT);
                    continue;
                }
                DeltaManifests deltaManifests = (DeltaManifests)SimpleVersionedSerialization.readVersionAndDeSerialize((SimpleVersionedSerializer)DeltaManifestsSerializer.INSTANCE, (byte[])((DynamicCommittable)committable.getCommittable()).manifest());
                pendingResults.computeIfAbsent((Long)e.getKey(), unused -> Lists.newArrayList()).add(FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
                manifests.addAll(deltaManifests.manifests());
            }
        }
        CommitSummary summary = new CommitSummary();
        summary.addAll(pendingResults);
        this.commitPendingResult(table, branch, pendingResults, summary, newFlinkJobId, operatorId);
        if (this.committerMetrics != null) {
            this.committerMetrics.updateCommitSummary(table.name(), summary);
        }
        FlinkManifestUtil.deleteCommittedManifests(table, manifests, newFlinkJobId, checkpointId);
    }

    private void commitPendingResult(Table table, String branch, NavigableMap<Long, List<WriteResult>> pendingResults, CommitSummary summary, String newFlinkJobId, String operatorId) {
        long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
        TableKey key = new TableKey(table.name(), branch);
        int continuousEmptyCheckpoints = this.continuousEmptyCheckpointsMap.computeIfAbsent(key, unused -> 0);
        int maxContinuousEmptyCommits = this.maxContinuousEmptyCommitsMap.computeIfAbsent(key, unused -> {
            int result = PropertyUtil.propertyAsInt((Map)table.properties(), (String)MAX_CONTINUOUS_EMPTY_COMMITS, (int)10);
            Preconditions.checkArgument((result > 0 ? 1 : 0) != 0, (Object)"flink.max-continuous-empty-commits must be positive");
            return result;
        });
        int n = continuousEmptyCheckpoints = totalFiles == 0L ? continuousEmptyCheckpoints + 1 : 0;
        if (totalFiles != 0L || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
            if (this.replacePartitions) {
                this.replacePartitions(table, branch, pendingResults, summary, newFlinkJobId, operatorId);
            } else {
                this.commitDeltaTxn(table, branch, pendingResults, summary, newFlinkJobId, operatorId);
            }
            continuousEmptyCheckpoints = 0;
        } else {
            long checkpointId = (Long)pendingResults.lastKey();
            LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", (Object)checkpointId);
        }
        this.continuousEmptyCheckpointsMap.put(key, continuousEmptyCheckpoints);
    }

    private void replacePartitions(Table table, String branch, NavigableMap<Long, List<WriteResult>> pendingResults, CommitSummary summary, String newFlinkJobId, String operatorId) {
        for (Map.Entry e : pendingResults.entrySet()) {
            for (WriteResult result : (List)e.getValue()) {
                ReplacePartitions dynamicOverwrite = (ReplacePartitions)table.newReplacePartitions().scanManifestsWith(this.workerPool);
                Arrays.stream(result.dataFiles()).forEach(arg_0 -> ((ReplacePartitions)dynamicOverwrite).addFile(arg_0));
                this.commitOperation(table, branch, (SnapshotUpdate<?>)dynamicOverwrite, summary, "dynamic partition overwrite", newFlinkJobId, operatorId, (Long)e.getKey());
            }
        }
    }

    private void commitDeltaTxn(Table table, String branch, NavigableMap<Long, List<WriteResult>> pendingResults, CommitSummary summary, String newFlinkJobId, String operatorId) {
        for (Map.Entry e : pendingResults.entrySet()) {
            for (WriteResult result : (List)e.getValue()) {
                RowDelta rowDelta = (RowDelta)table.newRowDelta().scanManifestsWith(this.workerPool);
                Arrays.stream(result.dataFiles()).forEach(arg_0 -> ((RowDelta)rowDelta).addRows(arg_0));
                Arrays.stream(result.deleteFiles()).forEach(arg_0 -> ((RowDelta)rowDelta).addDeletes(arg_0));
                this.commitOperation(table, branch, (SnapshotUpdate<?>)rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, (Long)e.getKey());
            }
        }
    }

    @VisibleForTesting
    void commitOperation(Table table, String branch, SnapshotUpdate<?> operation, CommitSummary summary, String description, String newFlinkJobId, String operatorId, long checkpointId) {
        LOG.info("Committing {} for checkpoint {} to table {} branch {} with summary: {}", new Object[]{description, checkpointId, table.name(), branch, summary});
        this.snapshotProperties.forEach((arg_0, arg_1) -> operation.set(arg_0, arg_1));
        operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
        operation.set(FLINK_JOB_ID, newFlinkJobId);
        operation.set(OPERATOR_ID, operatorId);
        operation.toBranch(branch);
        long startNano = System.nanoTime();
        operation.commit();
        long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
        LOG.info("Committed {} to table: {}, branch: {}, checkpointId {} in {} ms", new Object[]{description, table.name(), branch, checkpointId, durationMs});
        if (this.committerMetrics != null) {
            this.committerMetrics.commitDuration(table.name(), durationMs);
        }
    }

    public void close() throws IOException {
        this.workerPool.shutdown();
    }

    private static class TableKey
    implements Serializable {
        private String tableName;
        private String branch;

        TableKey(String tableName, String branch) {
            this.tableName = tableName;
            this.branch = branch;
        }

        TableKey(DynamicCommittable committable) {
            this.tableName = committable.key().tableName();
            this.branch = committable.key().branch();
        }

        String tableName() {
            return this.tableName;
        }

        String branch() {
            return this.branch;
        }

        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (other == null || this.getClass() != other.getClass()) {
                return false;
            }
            TableKey that = (TableKey)other;
            return this.tableName.equals(that.tableName) && this.branch.equals(that.branch);
        }

        public int hashCode() {
            return Objects.hash(this.tableName, this.branch);
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("tableName", (Object)this.tableName).add("branch", (Object)this.branch).toString();
        }
    }
}

