/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.cdc.base.source.enumerator;

import io.debezium.relational.TableId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.SplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.base.source.split.CompletedSnapshotSplitInfo;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IncrementalSplitAssigner<C extends SourceConfig>
implements SplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSplitAssigner.class);
    protected static final String INCREMENTAL_SPLIT_ID = "incremental-split-%d";
    private final SplitAssigner.Context<C> context;
    private final int incrementalParallelism;
    private final OffsetFactory offsetFactory;
    private final Map<TableId, Offset> tableWatermarks = new HashMap<TableId, Offset>();
    private boolean splitAssigned = false;
    private final List<IncrementalSplit> remainingSplits = new ArrayList<IncrementalSplit>();
    private final Map<String, IncrementalSplit> assignedSplits = new HashMap<String, IncrementalSplit>();

    public IncrementalSplitAssigner(SplitAssigner.Context<C> context, int incrementalParallelism, OffsetFactory offsetFactory) {
        this.context = context;
        this.incrementalParallelism = incrementalParallelism;
        this.offsetFactory = offsetFactory;
    }

    @Override
    public void open() {
    }

    @Override
    public Optional<SourceSplitBase> getNext() {
        if (!this.remainingSplits.isEmpty()) {
            Iterator<IncrementalSplit> iterator = this.remainingSplits.iterator();
            IncrementalSplit split = iterator.next();
            iterator.remove();
            this.assignedSplits.put(split.splitId(), split);
            return Optional.of(split);
        }
        if (this.splitAssigned) {
            return Optional.empty();
        }
        List<IncrementalSplit> incrementalSplits = this.createIncrementalSplits();
        this.remainingSplits.addAll(incrementalSplits);
        this.splitAssigned = true;
        return this.getNext();
    }

    public boolean noMoreSplits() {
        return this.getRemainingTables().isEmpty() && this.remainingSplits.isEmpty();
    }

    private Set<TableId> getRemainingTables() {
        HashSet<TableId> allTables = new HashSet<TableId>(this.context.getCapturedTables());
        this.assignedSplits.values().forEach(split -> split.getTableIds().forEach(allTables::remove));
        return allTables;
    }

    @Override
    public boolean waitingForCompletedSplits() {
        return false;
    }

    @Override
    public void onCompletedSplits(List<SnapshotSplitWatermark> completedSplitWatermarks) {
        completedSplitWatermarks.forEach(watermark -> this.context.getSplitCompletedOffsets().put(watermark.getSplitId(), watermark.getHighWatermark()));
    }

    @Override
    public void addSplits(Collection<SourceSplitBase> splits) {
        splits.stream().map(SourceSplitBase::asIncrementalSplit).forEach(incrementalSplit -> {
            Offset startupOffset = incrementalSplit.getStartupOffset();
            List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos = incrementalSplit.getCompletedSnapshotSplitInfos();
            for (CompletedSnapshotSplitInfo info : completedSnapshotSplitInfos) {
                this.context.getSplitCompletedOffsets().put(info.getSplitId(), info.getWatermark());
                this.context.getAssignedSnapshotSplit().put(info.getSplitId(), info.asSnapshotSplit());
            }
            for (TableId tableId : incrementalSplit.getTableIds()) {
                this.tableWatermarks.put(tableId, startupOffset);
            }
        });
    }

    @Override
    public IncrementalPhaseState snapshotState(long checkpointId) {
        return new IncrementalPhaseState();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
    }

    public List<IncrementalSplit> createIncrementalSplits() {
        HashSet<TableId> allTables = new HashSet<TableId>(this.context.getCapturedTables());
        this.assignedSplits.values().forEach(split -> split.getTableIds().forEach(allTables::remove));
        List[] capturedTables = new List[this.incrementalParallelism];
        int i = 0;
        for (TableId tableId : allTables) {
            int index = i % this.incrementalParallelism;
            if (capturedTables[index] == null) {
                capturedTables[index] = new ArrayList();
            }
            capturedTables[index].add(tableId);
            ++i;
        }
        i = 0;
        ArrayList<IncrementalSplit> incrementalSplits = new ArrayList<IncrementalSplit>();
        for (List capturedTable : capturedTables) {
            incrementalSplits.add(this.createIncrementalSplit(capturedTable, i++));
        }
        return incrementalSplits;
    }

    private IncrementalSplit createIncrementalSplit(List<TableId> capturedTables, int index) {
        List assignedSnapshotSplit = this.context.getAssignedSnapshotSplit().values().stream().filter(split -> capturedTables.contains(split.getTableId())).sorted(Comparator.comparing(SourceSplitBase::splitId)).collect(Collectors.toList());
        Map<String, Offset> splitCompletedOffsets = this.context.getSplitCompletedOffsets();
        ArrayList<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos = new ArrayList<CompletedSnapshotSplitInfo>();
        Offset minOffset = null;
        for (SnapshotSplit split2 : assignedSnapshotSplit) {
            Offset changeLogOffset = splitCompletedOffsets.get(split2.splitId());
            if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
                minOffset = changeLogOffset;
            }
            completedSnapshotSplitInfos.add(new CompletedSnapshotSplitInfo(split2.splitId(), split2.getTableId(), split2.getSplitKeyType(), split2.getSplitStart(), split2.getSplitEnd(), changeLogOffset));
        }
        for (TableId tableId : capturedTables) {
            Offset watermark = this.tableWatermarks.get(tableId);
            if (minOffset != null && (watermark == null || !watermark.isBefore(minOffset))) continue;
            minOffset = watermark;
        }
        C sourceConfig = this.context.getSourceConfig();
        return new IncrementalSplit(String.format(INCREMENTAL_SPLIT_ID, index), capturedTables, minOffset != null ? minOffset : sourceConfig.getStartupConfig().getStartupOffset(this.offsetFactory), sourceConfig.getStopConfig().getStopOffset(this.offsetFactory), completedSnapshotSplitInfos);
    }
}

