/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.LogOffsetCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.StoreCommitter;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;

public class StoreMultiCommitter
implements Committer<MultiTableCommittable, WrappedManifestCommittable> {
    private final Catalog catalog;
    private final String commitUser;
    @Nullable
    private final OperatorMetricGroup flinkMetricGroup;
    private final Map<Identifier, StoreCommitter> tableCommitters;
    private final boolean isCompactJob;

    public StoreMultiCommitter(Catalog.Loader catalogLoader, String commitUser, @Nullable OperatorMetricGroup flinkMetricGroup) {
        this(catalogLoader, commitUser, flinkMetricGroup, false);
    }

    public StoreMultiCommitter(Catalog.Loader catalogLoader, String commitUser, @Nullable OperatorMetricGroup flinkMetricGroup, boolean isCompactJob) {
        this.catalog = catalogLoader.load();
        this.commitUser = commitUser;
        this.flinkMetricGroup = flinkMetricGroup;
        this.tableCommitters = new HashMap<Identifier, StoreCommitter>();
        this.isCompactJob = isCompactJob;
    }

    @Override
    public boolean forceCreatingSnapshot() {
        return true;
    }

    @Override
    public WrappedManifestCommittable combine(long checkpointId, long watermark, List<MultiTableCommittable> committables) {
        WrappedManifestCommittable wrappedManifestCommittable = new WrappedManifestCommittable(checkpointId, watermark);
        for (MultiTableCommittable committable : committables) {
            ManifestCommittable manifestCommittable = wrappedManifestCommittable.computeCommittableIfAbsent(Identifier.create(committable.getDatabase(), committable.getTable()), checkpointId, watermark);
            switch (committable.kind()) {
                case FILE: {
                    CommitMessage file = (CommitMessage)committable.wrappedCommittable();
                    manifestCommittable.addFileCommittable(file);
                    break;
                }
                case LOG_OFFSET: {
                    LogOffsetCommittable offset = (LogOffsetCommittable)committable.wrappedCommittable();
                    manifestCommittable.addLogOffset(offset.bucket(), offset.offset());
                }
            }
        }
        return wrappedManifestCommittable;
    }

    @Override
    public void commit(List<WrappedManifestCommittable> committables) throws IOException, InterruptedException {
        if (committables.isEmpty()) {
            return;
        }
        Map<Identifier, List<ManifestCommittable>> committableMap = this.groupByTable(committables);
        committableMap.keySet().forEach(this::getStoreCommitter);
        long checkpointId = committables.get(0).checkpointId();
        long watermark = committables.get(0).watermark();
        for (Map.Entry<Identifier, StoreCommitter> entry : this.tableCommitters.entrySet()) {
            List<ManifestCommittable> committableList = committableMap.get(entry.getKey());
            StoreCommitter committer = entry.getValue();
            if (committableList != null) {
                committer.commit(committableList);
                continue;
            }
            if (!committer.forceCreatingSnapshot()) continue;
            Object combine = committer.combine(checkpointId, watermark, Collections.emptyList());
            committer.commit((List<ManifestCommittable>)Collections.singletonList(combine));
        }
    }

    @Override
    public int filterAndCommit(List<WrappedManifestCommittable> globalCommittables) throws IOException {
        int result = 0;
        for (Map.Entry<Identifier, List<ManifestCommittable>> entry : this.groupByTable(globalCommittables).entrySet()) {
            result += this.getStoreCommitter(entry.getKey()).filterAndCommit(entry.getValue());
        }
        return result;
    }

    private Map<Identifier, List<ManifestCommittable>> groupByTable(List<WrappedManifestCommittable> committables) {
        return committables.stream().flatMap(wrapped -> {
            Map<Identifier, ManifestCommittable> manifestCommittables = wrapped.manifestCommittables();
            return manifestCommittables.entrySet().stream().map(entry -> Tuple2.of(entry.getKey(), entry.getValue()));
        }).collect(Collectors.groupingBy(t -> (Identifier)t.f0, Collectors.mapping(t -> (ManifestCommittable)t.f1, Collectors.toList())));
    }

    @Override
    public Map<Long, List<MultiTableCommittable>> groupByCheckpoint(Collection<MultiTableCommittable> committables) {
        HashMap<Long, List<MultiTableCommittable>> grouped = new HashMap<Long, List<MultiTableCommittable>>();
        for (MultiTableCommittable c : committables) {
            grouped.computeIfAbsent(c.checkpointId(), k -> new ArrayList()).add(c);
        }
        return grouped;
    }

    private StoreCommitter getStoreCommitter(Identifier tableId) {
        StoreCommitter committer = this.tableCommitters.get(tableId);
        if (committer == null) {
            Table table;
            try {
                table = (FileStoreTable)this.catalog.getTable(tableId);
                if (this.isCompactJob) {
                    table = table.copy((Map)Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
                }
            }
            catch (Catalog.TableNotExistException e) {
                throw new RuntimeException(String.format("Failed to get committer for table %s", tableId.getFullName()), e);
            }
            committer = new StoreCommitter(table.newCommit(this.commitUser).ignoreEmptyCommit(this.isCompactJob), this.flinkMetricGroup);
            this.tableCommitters.put(tableId, committer);
        }
        return committer;
    }

    @Override
    public void close() throws Exception {
        for (StoreCommitter committer : this.tableCommitters.values()) {
            committer.close();
        }
    }
}

