/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.LogOffsetCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableChannelComputer;
import org.apache.paimon.flink.sink.StoreCommitter;
import org.apache.paimon.flink.sink.TableFilter;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;

public class StoreMultiCommitter
implements Committer<MultiTableCommittable, WrappedManifestCommittable> {
    private final Catalog catalog;
    private final Committer.Context context;
    private final Map<Identifier, StoreCommitter> tableCommitters;
    private final boolean ignoreEmptyCommit;
    private final Map<String, String> dynamicOptions;
    private final TableFilter tableFilter;

    public StoreMultiCommitter(CatalogLoader catalogLoader, Committer.Context context) {
        this(catalogLoader, context, false, Collections.emptyMap());
    }

    public StoreMultiCommitter(CatalogLoader catalogLoader, Committer.Context context, boolean ignoreEmptyCommit, Map<String, String> dynamicOptions) {
        this(catalogLoader, context, ignoreEmptyCommit, dynamicOptions, false, null);
    }

    public StoreMultiCommitter(CatalogLoader catalogLoader, Committer.Context context, boolean ignoreEmptyCommit, Map<String, String> dynamicOptions, boolean eagerInit, TableFilter tableFilter) {
        this.catalog = catalogLoader.load();
        this.context = context;
        this.ignoreEmptyCommit = ignoreEmptyCommit;
        this.dynamicOptions = dynamicOptions;
        this.tableCommitters = new HashMap<Identifier, StoreCommitter>();
        this.tableFilter = tableFilter;
        int parallelism = context.getParallelism();
        int index = context.getSubtaskIndex();
        if (eagerInit) {
            List tableIds = this.filterTables().stream().filter(identifier -> MultiTableCommittableChannelComputer.computeChannel(identifier.getDatabaseName(), identifier.getTableName(), parallelism) == index).collect(Collectors.toList());
            tableIds.stream().forEach(this::getStoreCommitter);
        }
    }

    @Override
    public boolean forceCreatingSnapshot() {
        return true;
    }

    @Override
    public WrappedManifestCommittable combine(long checkpointId, long watermark, List<MultiTableCommittable> committables) {
        WrappedManifestCommittable wrappedManifestCommittable = new WrappedManifestCommittable(checkpointId, watermark);
        return this.combine(checkpointId, watermark, wrappedManifestCommittable, committables);
    }

    @Override
    public WrappedManifestCommittable combine(long checkpointId, long watermark, WrappedManifestCommittable wrappedManifestCommittable, List<MultiTableCommittable> committables) {
        for (MultiTableCommittable committable : committables) {
            Identifier identifier = Identifier.create(committable.getDatabase(), committable.getTable());
            ManifestCommittable manifestCommittable = wrappedManifestCommittable.computeCommittableIfAbsent(identifier, checkpointId, watermark);
            switch (committable.kind()) {
                case FILE: {
                    CommitMessage file = (CommitMessage)committable.wrappedCommittable();
                    manifestCommittable.addFileCommittable(file);
                    break;
                }
                case LOG_OFFSET: {
                    LogOffsetCommittable offset = (LogOffsetCommittable)committable.wrappedCommittable();
                    StoreCommitter committer = this.tableCommitters.get(identifier);
                    manifestCommittable.addLogOffset(offset.bucket(), offset.offset(), committer.allowLogOffsetDuplicate());
                }
            }
        }
        return wrappedManifestCommittable;
    }

    @Override
    public void commit(List<WrappedManifestCommittable> committables) throws IOException, InterruptedException {
        if (committables.isEmpty()) {
            return;
        }
        Map<Identifier, List<ManifestCommittable>> committableMap = this.groupByTable(committables);
        committableMap.keySet().forEach(this::getStoreCommitter);
        long checkpointId = committables.get(0).checkpointId();
        long watermark = committables.get(0).watermark();
        for (Map.Entry<Identifier, StoreCommitter> entry : this.tableCommitters.entrySet()) {
            List<ManifestCommittable> committableList = committableMap.get(entry.getKey());
            StoreCommitter committer = entry.getValue();
            if (committableList != null) {
                committer.commit(committableList);
                continue;
            }
            if (!committer.forceCreatingSnapshot()) continue;
            Object combine = committer.combine(checkpointId, watermark, Collections.emptyList());
            committer.commit((List<ManifestCommittable>)Collections.singletonList(combine));
        }
    }

    @Override
    public int filterAndCommit(List<WrappedManifestCommittable> globalCommittables, boolean checkAppendFiles) throws IOException {
        int result = 0;
        for (Map.Entry<Identifier, List<ManifestCommittable>> entry : this.groupByTable(globalCommittables).entrySet()) {
            result += this.getStoreCommitter(entry.getKey()).filterAndCommit(entry.getValue(), checkAppendFiles);
        }
        return result;
    }

    private Map<Identifier, List<ManifestCommittable>> groupByTable(List<WrappedManifestCommittable> committables) {
        return committables.stream().flatMap(wrapped -> {
            Map<Identifier, ManifestCommittable> manifestCommittables = wrapped.manifestCommittables();
            return manifestCommittables.entrySet().stream().map(entry -> Tuple2.of(entry.getKey(), entry.getValue()));
        }).collect(Collectors.groupingBy(t -> (Identifier)t.f0, Collectors.mapping(t -> (ManifestCommittable)t.f1, Collectors.toList())));
    }

    @Override
    public Map<Long, List<MultiTableCommittable>> groupByCheckpoint(Collection<MultiTableCommittable> committables) {
        HashMap<Long, List<MultiTableCommittable>> grouped = new HashMap<Long, List<MultiTableCommittable>>();
        for (MultiTableCommittable c : committables) {
            grouped.computeIfAbsent(c.checkpointId(), k -> new ArrayList()).add(c);
        }
        return grouped;
    }

    private StoreCommitter getStoreCommitter(Identifier tableId) {
        StoreCommitter committer = this.tableCommitters.get(tableId);
        if (committer == null) {
            FileStoreTable table;
            try {
                table = (FileStoreTable)this.catalog.getTable(tableId).copy(this.dynamicOptions);
            }
            catch (Catalog.TableNotExistException e) {
                throw new RuntimeException(String.format("Failed to get committer for table %s", tableId.getFullName()), e);
            }
            committer = new StoreCommitter(table, table.newCommit(this.context.commitUser()).ignoreEmptyCommit(this.ignoreEmptyCommit), this.context);
            this.tableCommitters.put(tableId, committer);
        }
        return committer;
    }

    @Override
    public void close() throws Exception {
        for (StoreCommitter committer : this.tableCommitters.values()) {
            committer.close();
        }
        if (this.catalog != null) {
            this.catalog.close();
        }
    }

    private List<Identifier> filterTables() {
        List<String> allTables = null;
        try {
            allTables = this.catalog.listTables(this.tableFilter.getDbName());
        }
        catch (Catalog.DatabaseNotExistException e) {
            allTables = Collections.emptyList();
        }
        List<String> tblList = this.tableFilter.filterTables(allTables);
        return tblList.stream().map(t -> Identifier.create(this.tableFilter.getDbName(), t)).collect(Collectors.toList());
    }
}

