/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterMetrics;
import org.apache.paimon.flink.sink.LogOffsetCommittable;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.TableCommit;
import org.apache.paimon.table.sink.TableCommitImpl;

/**
 * {@link Committer} that turns per-checkpoint {@link Committable}s into {@link
 * ManifestCommittable}s and hands them to a {@link TableCommitImpl}.
 *
 * <p>When a {@link CommitterMetrics} instance is supplied, every call to {@link #commit(List)}
 * also reports the total size and row count of the newly added data files.
 */
public class StoreCommitter
implements Committer<Committable, ManifestCommittable> {
    private final TableCommitImpl commit;
    @Nullable
    private final CommitterMetrics metrics;

    /**
     * Creates a committer backed by the given table commit.
     *
     * @param commit the table commit to delegate to; it is cast to {@link TableCommitImpl}, so
     *     any other implementation fails with a {@link ClassCastException}
     * @param metrics optional metrics sink; when {@code null} no metrics are reported
     */
    public StoreCommitter(TableCommit commit, @Nullable CommitterMetrics metrics) {
        this.commit = (TableCommitImpl) commit;
        this.metrics = metrics;
    }

    /**
     * Merges the committables of one checkpoint into a single {@link ManifestCommittable}.
     *
     * @param checkpointId identifier passed to the new {@link ManifestCommittable}
     * @param watermark watermark passed to the new {@link ManifestCommittable}
     * @param committables committables to fold in; {@code FILE} entries are added as file
     *     committables, {@code LOG_OFFSET} entries as bucket/offset pairs
     * @return the combined manifest committable
     */
    @Override
    public ManifestCommittable combine(long checkpointId, long watermark, List<Committable> committables) {
        ManifestCommittable manifestCommittable = new ManifestCommittable(checkpointId, watermark);
        for (Committable committable : committables) {
            switch (committable.kind()) {
                case FILE: {
                    CommitMessage file = (CommitMessage) committable.wrappedCommittable();
                    manifestCommittable.addFileCommittable(file);
                    break;
                }
                case LOG_OFFSET: {
                    LogOffsetCommittable offset = (LogOffsetCommittable) committable.wrappedCommittable();
                    manifestCommittable.addLogOffset(offset.bucket(), offset.offset());
                    break;
                }
                default:
                    // Any other kind (if one exists) is skipped, as it always has been here.
                    break;
            }
        }
        return manifestCommittable;
    }

    /**
     * Commits all given manifest committables, then updates the output metrics (if any).
     *
     * <p>Metrics are updated only after the underlying commit call has returned normally.
     *
     * @param committables manifest committables to commit
     * @throws IOException declared by the {@link Committer} contract
     * @throws InterruptedException declared by the {@link Committer} contract
     */
    @Override
    public void commit(List<ManifestCommittable> committables) throws IOException, InterruptedException {
        this.commit.commitMultiple(committables);
        this.calcNumBytesAndRecordsOut(committables);
    }

    /**
     * Delegates to {@link TableCommitImpl#filterAndCommitMultiple(List)}.
     *
     * <p>Unlike {@link #commit(List)}, this path does not update metrics.
     *
     * @param globalCommittables candidate manifest committables
     * @return the value returned by the underlying table commit
     */
    @Override
    public int filterAndCommit(List<ManifestCommittable> globalCommittables) {
        return this.commit.filterAndCommitMultiple(globalCommittables);
    }

    /**
     * Groups committables by their checkpoint id, preserving encounter order within each group.
     *
     * @param committables committables to group
     * @return a new mutable map from checkpoint id to the committables carrying that id
     */
    @Override
    public Map<Long, List<Committable>> groupByCheckpoint(Collection<Committable> committables) {
        Map<Long, List<Committable>> grouped = new HashMap<>();
        for (Committable c : committables) {
            grouped.computeIfAbsent(c.checkpointId(), k -> new ArrayList<>()).add(c);
        }
        return grouped;
    }

    /** Closes the underlying table commit. */
    @Override
    public void close() throws Exception {
        this.commit.close();
    }

    /**
     * Adds the total file size and row count of all newly added data files in the given
     * committables to the metrics. Does nothing when no metrics instance was supplied.
     */
    private void calcNumBytesAndRecordsOut(List<ManifestCommittable> committables) {
        if (this.metrics == null) {
            return;
        }
        long bytesOut = 0L;
        long recordsOut = 0L;
        for (ManifestCommittable committable : committables) {
            for (CommitMessage commitMessage : committable.fileCommittables()) {
                // Resolve the new-file list once; it feeds both the size and the row-count totals.
                List<DataFileMeta> newFiles = ((CommitMessageImpl) commitMessage).newFilesIncrement().newFiles();
                bytesOut += StoreCommitter.calcTotalFileSize(newFiles);
                recordsOut += StoreCommitter.calcTotalFileRowCount(newFiles);
            }
        }
        // Reported unconditionally, even when the totals are zero.
        this.metrics.increaseNumBytesOut(bytesOut);
        this.metrics.increaseNumRecordsOut(recordsOut);
    }

    /** Returns the sum of {@code fileSize()} over the given files, or 0 for an empty list. */
    private static long calcTotalFileSize(List<DataFileMeta> files) {
        return files.stream().mapToLong(DataFileMeta::fileSize).sum();
    }

    /** Returns the sum of {@code rowCount()} over the given files, or 0 for an empty list. */
    private static long calcTotalFileRowCount(List<DataFileMeta> files) {
        return files.stream().mapToLong(DataFileMeta::rowCount).sum();
    }
}

