/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.metrics.MetricGroup;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterMetrics;
import org.apache.paimon.flink.sink.LogOffsetCommittable;
import org.apache.paimon.flink.sink.partition.PartitionListeners;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.TableCommit;
import org.apache.paimon.table.sink.TableCommitImpl;

/**
 * {@link Committer} implementation that folds {@link Committable}s into
 * {@link ManifestCommittable}s and commits them through a {@link TableCommitImpl}.
 *
 * <p>After a commit, the configured {@link PartitionListeners} are notified with the committed
 * {@link ManifestCommittable}s. When a metric group is available from the context, the bytes and
 * records of newly written data files are reported through {@link CommitterMetrics}.
 */
public class StoreCommitter implements Committer<Committable, ManifestCommittable> {
    private final TableCommitImpl commit;
    /** {@code null} when the context supplies no metric group. */
    @Nullable
    private final CommitterMetrics committerMetrics;
    private final PartitionListeners partitionListeners;
    /** {@code true} when the table's bucket mode is {@link BucketMode#BUCKET_UNAWARE}. */
    private final boolean allowLogOffsetDuplicate;

    /**
     * Creates a committer for the given table.
     *
     * @param table the table being written; used to create the partition listeners and to read
     *     the bucket mode
     * @param commit the table commit; must be a {@link TableCommitImpl}, otherwise a
     *     {@link ClassCastException} is thrown
     * @param context the committer context supplying the (possibly {@code null}) metric group
     * @throws RuntimeException wrapping any exception raised while creating the partition
     *     listeners
     */
    public StoreCommitter(FileStoreTable table, TableCommit commit, Committer.Context context) {
        this.commit = (TableCommitImpl) commit;
        if (context.metricGroup() != null) {
            this.commit.withMetricRegistry(
                    new FlinkMetricRegistry((MetricGroup) context.metricGroup()));
            this.committerMetrics =
                    new CommitterMetrics(context.metricGroup().getIOMetricGroup());
        } else {
            this.committerMetrics = null;
        }
        try {
            this.partitionListeners = PartitionListeners.create(context, table);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        this.allowLogOffsetDuplicate = table.bucketMode() == BucketMode.BUCKET_UNAWARE;
    }

    /**
     * Returns the committer metrics.
     *
     * @return the metrics, or {@code null} if the context had no metric group at construction
     */
    @VisibleForTesting
    @Nullable
    public CommitterMetrics getCommitterMetrics() {
        return this.committerMetrics;
    }

    /** Delegates to {@link TableCommitImpl#forceCreatingSnapshot()}. */
    @Override
    public boolean forceCreatingSnapshot() {
        return this.commit.forceCreatingSnapshot();
    }

    /**
     * Combines the committables into a newly created {@link ManifestCommittable}.
     *
     * @param checkpointId identifier passed to the new {@link ManifestCommittable}
     * @param watermark watermark passed to the new {@link ManifestCommittable}
     * @param committables committables to fold in
     * @return the new manifest committable holding all given committables
     */
    @Override
    public ManifestCommittable combine(
            long checkpointId, long watermark, List<Committable> committables) {
        ManifestCommittable manifestCommittable = new ManifestCommittable(checkpointId, watermark);
        return this.combine(checkpointId, watermark, manifestCommittable, committables);
    }

    /**
     * Adds the committables to an existing {@link ManifestCommittable}.
     *
     * <p>{@code FILE} committables are added as file committables; {@code LOG_OFFSET}
     * committables are added as log offsets. Any other kind is ignored. {@code checkpointId} and
     * {@code watermark} are not read by this overload.
     *
     * @param checkpointId unused here
     * @param watermark unused here
     * @param manifestCommittable the target that is mutated and returned
     * @param committables committables to fold in
     * @return {@code manifestCommittable}, after the additions
     */
    @Override
    public ManifestCommittable combine(
            long checkpointId,
            long watermark,
            ManifestCommittable manifestCommittable,
            List<Committable> committables) {
        for (Committable committable : committables) {
            switch (committable.kind()) {
                case FILE: {
                    CommitMessage file = (CommitMessage) committable.wrappedCommittable();
                    manifestCommittable.addFileCommittable(file);
                    break;
                }
                case LOG_OFFSET: {
                    LogOffsetCommittable offset =
                            (LogOffsetCommittable) committable.wrappedCommittable();
                    manifestCommittable.addLogOffset(
                            offset.bucket(), offset.offset(), this.allowLogOffsetDuplicate);
                    // Explicit break so a case added below cannot be reached by fall-through.
                    break;
                }
            }
        }
        return manifestCommittable;
    }

    /**
     * Commits the given committables, then updates the output metrics and notifies the partition
     * listeners.
     *
     * @param committables manifest committables to commit
     */
    @Override
    public void commit(List<ManifestCommittable> committables)
            throws IOException, InterruptedException {
        this.commit.commitMultiple(committables, false);
        this.calcNumBytesAndRecordsOut(committables);
        this.partitionListeners.notifyCommittable(committables);
    }

    /**
     * Delegates to {@link TableCommitImpl#filterAndCommitMultiple} and then notifies the partition
     * listeners with the full input list.
     *
     * @param globalCommittables candidate committables
     * @param checkAppendFiles forwarded unchanged to the table commit
     * @return the value returned by the table commit
     */
    @Override
    public int filterAndCommit(
            List<ManifestCommittable> globalCommittables, boolean checkAppendFiles) {
        int committed = this.commit.filterAndCommitMultiple(globalCommittables, checkAppendFiles);
        this.partitionListeners.notifyCommittable(globalCommittables);
        return committed;
    }

    /**
     * Groups the committables by their checkpoint id.
     *
     * <p>Before grouping, the partition listeners are asked to snapshot their state.
     *
     * @param committables committables to group
     * @return a new mutable map from checkpoint id to the committables carrying that id, each
     *     list in the iteration order of {@code committables}
     * @throws RuntimeException wrapping any exception raised by the listeners' state snapshot
     */
    @Override
    public Map<Long, List<Committable>> groupByCheckpoint(Collection<Committable> committables) {
        try {
            this.partitionListeners.snapshotState();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        Map<Long, List<Committable>> grouped = new HashMap<>();
        for (Committable c : committables) {
            grouped.computeIfAbsent(c.checkpointId(), k -> new ArrayList<>()).add(c);
        }
        return grouped;
    }

    /**
     * Closes the table commit and then the partition listeners.
     *
     * <p>The listeners are closed even if closing the table commit fails, so neither resource is
     * leaked. The first exception is rethrown; a failure from the second close is attached to it
     * as a suppressed exception.
     *
     * @throws Exception the first exception raised by either close call
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        try {
            this.commit.close();
        } catch (Exception e) {
            failure = e;
        }
        try {
            this.partitionListeners.close();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Returns whether duplicate log offsets are allowed when combining committables.
     *
     * @return {@code true} if the table's bucket mode is {@link BucketMode#BUCKET_UNAWARE}
     */
    public boolean allowLogOffsetDuplicate() {
        return this.allowLogOffsetDuplicate;
    }

    /**
     * Adds the total size and row count of all newly written data files in the given
     * committables to the committer metrics. Does nothing when metrics are unavailable.
     */
    private void calcNumBytesAndRecordsOut(List<ManifestCommittable> committables) {
        if (this.committerMetrics == null) {
            return;
        }
        long bytesOut = 0L;
        long recordsOut = 0L;
        for (ManifestCommittable committable : committables) {
            for (CommitMessage commitMessage : committable.fileCommittables()) {
                // Resolve the new-file list once; it feeds both totals.
                List<DataFileMeta> newFiles =
                        ((CommitMessageImpl) commitMessage).newFilesIncrement().newFiles();
                bytesOut += StoreCommitter.calcTotalFileSize(newFiles);
                recordsOut += StoreCommitter.calcTotalFileRowCount(newFiles);
            }
        }
        this.committerMetrics.increaseNumBytesOut(bytesOut);
        this.committerMetrics.increaseNumRecordsOut(recordsOut);
    }

    /** Sums {@link DataFileMeta#fileSize()} over the files; {@code 0} for an empty list. */
    private static long calcTotalFileSize(List<DataFileMeta> files) {
        return files.stream().mapToLong(DataFileMeta::fileSize).sum();
    }

    /** Sums {@link DataFileMeta#rowCount()} over the files; {@code 0} for an empty list. */
    private static long calcTotalFileRowCount(List<DataFileMeta> files) {
        return files.stream().mapToLong(DataFileMeta::rowCount).sum();
    }
}

