/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.cdc.base.source.reader;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SnapshotSplitState;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IncrementalSourceReader<T, C extends SourceConfig>
extends SingleThreadMultiplexSourceReaderBase<SourceRecords, T, SourceSplitBase, SourceSplitStateBase> {
    private static final Logger log = LoggerFactory.getLogger(IncrementalSourceReader.class);
    private final Map<String, SnapshotSplit> finishedUnackedSplits;
    private volatile boolean running = false;
    private final int subtaskId;
    private final C sourceConfig;
    private final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
    private final DataSourceDialect<C> dataSourceDialect;
    private volatile transient Offset snapshotChangeLogOffset;
    private final AtomicBoolean needSendSplitRequest = new AtomicBoolean(false);

    public IncrementalSourceReader(DataSourceDialect<C> dataSourceDialect, BlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue, Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier, RecordEmitter<SourceRecords, T, SourceSplitStateBase> recordEmitter, SourceReaderOptions options, SourceReader.Context context, C sourceConfig, DebeziumDeserializationSchema<T> debeziumDeserializationSchema) {
        super(elementsQueue, new SingleThreadFetcherManager(elementsQueue, splitReaderSupplier::get), recordEmitter, options, context);
        this.dataSourceDialect = dataSourceDialect;
        this.sourceConfig = sourceConfig;
        this.finishedUnackedSplits = new HashMap<String, SnapshotSplit>();
        this.subtaskId = context.getIndexOfSubtask();
        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
    }

    @Override
    public void pollNext(Collector<T> output) throws Exception {
        if (!this.running) {
            if (this.getNumberOfCurrentlyAssignedSplits() == 0) {
                this.context.sendSplitRequest();
            }
            this.running = true;
        }
        if (this.needSendSplitRequest.get()) {
            this.context.sendSplitRequest();
            this.needSendSplitRequest.compareAndSet(true, false);
        }
        if (this.isNoMoreSplitsAssignment() && this.isNoMoreElement()) {
            log.info("Reader {} send NoMoreElement event", (Object)this.context.getIndexOfSubtask());
            this.context.signalNoMoreElement();
        } else {
            super.pollNext(output);
        }
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.dataSourceDialect.commitChangeLogOffset(this.snapshotChangeLogOffset);
    }

    @Override
    public void addSplits(List<SourceSplitBase> splits) {
        ArrayList<SourceSplitBase> unfinishedSplits = new ArrayList<SourceSplitBase>();
        log.info("subtask {} add splits: {}", (Object)this.subtaskId, (Object)splits.stream().map(SourceSplitBase::splitId).collect(Collectors.joining(",")));
        for (SourceSplitBase split : splits) {
            if (split.isSnapshotSplit()) {
                SnapshotSplit snapshotSplit = split.asSnapshotSplit();
                if (snapshotSplit.isSnapshotReadFinished()) {
                    this.finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
                    log.info("subtask {} add finished split: {}", (Object)this.subtaskId, (Object)snapshotSplit.splitId());
                    continue;
                }
                unfinishedSplits.add(split);
                continue;
            }
            unfinishedSplits.add(split.asIncrementalSplit());
        }
        this.reportFinishedSnapshotSplitsIfNeed();
        if (!unfinishedSplits.isEmpty()) {
            super.addSplits(unfinishedSplits);
        } else {
            this.needSendSplitRequest.set(true);
        }
    }

    @Override
    protected void onSplitFinished(Map<String, SourceSplitStateBase> finishedSplitIds) {
        for (SourceSplitStateBase splitState : finishedSplitIds.values()) {
            SourceSplitBase sourceSplit = splitState.toSourceSplit();
            Preconditions.checkState(sourceSplit.isSnapshotSplit() && sourceSplit.asSnapshotSplit().isSnapshotReadFinished(), String.format("Only snapshot split could finish, but the actual split is incremental split %s", sourceSplit));
            this.finishedUnackedSplits.put(sourceSplit.splitId(), sourceSplit.asSnapshotSplit());
        }
        this.reportFinishedSnapshotSplitsIfNeed();
        this.context.sendSplitRequest();
    }

    private void reportFinishedSnapshotSplitsIfNeed() {
        if (!this.finishedUnackedSplits.isEmpty()) {
            ArrayList<SnapshotSplitWatermark> completedSnapshotSplitWatermarks = new ArrayList<SnapshotSplitWatermark>();
            for (SnapshotSplit split : this.finishedUnackedSplits.values()) {
                completedSnapshotSplitWatermarks.add(new SnapshotSplitWatermark(split.splitId(), split.getLowWatermark(), split.getHighWatermark()));
            }
            CompletedSnapshotSplitsReportEvent reportEvent = new CompletedSnapshotSplitsReportEvent();
            reportEvent.setCompletedSnapshotSplitWatermarks(completedSnapshotSplitWatermarks);
            this.context.sendSourceEventToEnumerator((SourceEvent)reportEvent);
            this.finishedUnackedSplits.clear();
            log.debug("The subtask {} reports offsets of finished snapshot splits {}.", (Object)this.subtaskId, completedSnapshotSplitWatermarks);
        }
    }

    @Override
    protected SourceSplitStateBase initializedState(SourceSplitBase split) {
        IncrementalSplitState splitState;
        if (split.isSnapshotSplit()) {
            return new SnapshotSplitState(split.asSnapshotSplit());
        }
        IncrementalSplit incrementalSplit = split.asIncrementalSplit();
        if (incrementalSplit.getCheckpointDataType() != null) {
            log.info("The incremental split[{}] has checkpoint datatype {} for restore.", (Object)incrementalSplit.splitId(), (Object)incrementalSplit.getCheckpointDataType());
            this.debeziumDeserializationSchema.restoreCheckpointProducedType(incrementalSplit.getCheckpointDataType());
        }
        if ((splitState = new IncrementalSplitState(incrementalSplit)).autoEnterPureIncrementPhaseIfAllowed()) {
            log.info("The incremental split[{}] startup position {} is equal the maxSnapshotSplitsHighWatermark {}, auto enter pure increment phase.", new Object[]{incrementalSplit.splitId(), splitState.getStartupOffset(), splitState.getMaxSnapshotSplitsHighWatermark()});
            log.info("Clean the IncrementalSplit#completedSnapshotSplitInfos to empty.");
            CompletedSnapshotPhaseEvent event = new CompletedSnapshotPhaseEvent(splitState.getTableIds());
            this.context.sendSourceEventToEnumerator((SourceEvent)event);
        }
        return splitState;
    }

    @Override
    public List<SourceSplitBase> snapshotState(long checkpointId) {
        List stateSplits = super.snapshotState(checkpointId);
        List<SourceSplitBase> unfinishedSplits = stateSplits.stream().filter(split -> !this.finishedUnackedSplits.containsKey(split.splitId())).collect(Collectors.toList());
        unfinishedSplits.addAll(this.finishedUnackedSplits.values());
        if (this.isIncrementalSplitPhase(unfinishedSplits)) {
            IncrementalSplit incrementalSplit = unfinishedSplits.get(0).asIncrementalSplit();
            this.snapshotChangeLogOffset = incrementalSplit.getStartupOffset();
            return this.snapshotCheckpointDataType(incrementalSplit);
        }
        return unfinishedSplits;
    }

    @Override
    protected SourceSplitBase toSplitType(String splitId, SourceSplitStateBase splitState) {
        return splitState.toSourceSplit();
    }

    private boolean isIncrementalSplitPhase(List<SourceSplitBase> stateSplits) {
        return stateSplits.size() == 1 && stateSplits.get(0).isIncrementalSplit();
    }

    private List<SourceSplitBase> snapshotCheckpointDataType(IncrementalSplit incrementalSplit) {
        SeaTunnelDataType<T> checkpointDataType = this.debeziumDeserializationSchema.getProducedType();
        IncrementalSplit newIncrementalSplit = new IncrementalSplit(incrementalSplit, checkpointDataType);
        log.debug("Snapshot checkpoint datatype {} into split[{}] state.", checkpointDataType, (Object)incrementalSplit.splitId());
        return Arrays.asList(newIncrementalSplit);
    }
}

