/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.cdc.base.source.reader.external;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.Fetcher;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Fetcher} that runs a snapshot-split read task on a dedicated single-thread executor and
 * hands the records the task puts on the context's {@link ChangeEventQueue} back to the caller
 * through {@link #pollSplitRecords()}.
 *
 * <p>The read task runs on the executor thread while polling happens on the caller's thread; a
 * failure of the task is stored in a volatile field and rethrown from the polling side.
 */
public class IncrementalSourceScanFetcher
        implements Fetcher<SourceRecords, SourceSplitBase> {
    private static final Logger log = LoggerFactory.getLogger(IncrementalSourceScanFetcher.class);

    /** Maximum time {@link #close()} waits for the reader thread before forcing a shutdown. */
    private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L;

    /** {@code true} while the current split may still deliver records. */
    public AtomicBoolean hasNextElement;

    /** Set to {@code true} by {@link #pollSplitRecords()} once it finds nothing left to poll. */
    public AtomicBoolean reachEnd;

    private final FetchTask.Context taskContext;
    private final ExecutorService executorService;
    private volatile ChangeEventQueue<DataChangeEvent> queue;
    /** Failure raised by the read task on the executor thread; {@code null} if none so far. */
    private volatile Throwable readException;
    private FetchTask<SourceSplitBase> snapshotSplitReadTask;
    private SnapshotSplit currentSnapshotSplit;

    /**
     * Creates a fetcher whose read task runs on a single thread named
     * {@code debezium-snapshot-reader-<subtaskId>}.
     *
     * @param taskContext context handed to the read task and used to obtain the event queue
     * @param subtaskId   index used only to name the reader thread
     */
    public IncrementalSourceScanFetcher(FetchTask.Context taskContext, int subtaskId) {
        this.taskContext = taskContext;
        ThreadFactory threadFactory =
                new ThreadFactoryBuilder()
                        .setNameFormat("debezium-snapshot-reader-" + subtaskId)
                        .build();
        this.executorService = Executors.newSingleThreadExecutor(threadFactory);
        this.hasNextElement = new AtomicBoolean(false);
        this.reachEnd = new AtomicBoolean(false);
    }

    /**
     * Configures the context for the task's snapshot split, resets the progress flags and submits
     * the task to the reader thread.
     *
     * @param fetchTask task whose split is a snapshot split
     */
    @Override
    public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
        this.snapshotSplitReadTask = fetchTask;
        this.currentSnapshotSplit = fetchTask.getSplit().asSnapshotSplit();
        this.taskContext.configure(this.currentSnapshotSplit);
        this.queue = this.taskContext.getQueue();
        this.hasNextElement.set(true);
        this.reachEnd.set(false);
        this.executorService.submit(
                () -> {
                    try {
                        log.info(
                                "Start snapshot read task for snapshot split: {} exactly-once: {}",
                                this.currentSnapshotSplit,
                                this.taskContext.isExactlyOnce());
                        this.snapshotSplitReadTask.execute(this.taskContext);
                    } catch (Throwable e) {
                        // Catch Throwable, not just Exception: the Future returned by submit()
                        // is discarded, so anything not recorded here would never reach the
                        // polling thread and the failure would go unnoticed.
                        log.error(
                                "Execute snapshot read task for snapshot split {} fail",
                                this.currentSnapshotSplit,
                                e);
                        this.readException = e;
                    }
                });
    }

    /**
     * @return {@code true} if no split has been submitted yet, or if the read task is no longer
     *     running, no further element is expected and the end has been observed by
     *     {@link #pollSplitRecords()}
     */
    @Override
    public boolean isFinished() {
        if (this.currentSnapshotSplit == null) {
            return true;
        }
        return !this.snapshotSplitReadTask.isRunning()
                && !this.hasNextElement.get()
                && this.reachEnd.get();
    }

    /**
     * Polls the next group of records of the current split.
     *
     * @return an iterator over the polled records, or {@code null} once the split has no further
     *     elements (in which case {@link #reachEnd} is set)
     * @throws InterruptedException if interrupted while polling the queue
     * @throws SeaTunnelException   if the read task has failed
     */
    @Override
    public Iterator<SourceRecords> pollSplitRecords()
            throws InterruptedException, SeaTunnelException {
        this.checkReadException();
        if (!this.hasNextElement.get()) {
            this.reachEnd.compareAndSet(false, true);
            return null;
        }
        return this.taskContext.isExactlyOnce()
                ? this.pollSplitRecordsIfExactlyOnce()
                : this.pollSplitRecordsIfNotExactlyOnce();
    }

    /**
     * Polls a single batch from the queue and forwards every record of it unchanged. Seeing a
     * high watermark event in the batch marks the split as having no further elements.
     *
     * @return an iterator over one {@link SourceRecords} holding the polled batch
     * @throws InterruptedException if interrupted while polling the queue
     */
    public Iterator<SourceRecords> pollSplitRecordsIfNotExactlyOnce() throws InterruptedException {
        List<SourceRecord> sendRecords = new ArrayList<>();
        for (DataChangeEvent event : this.queue.poll()) {
            SourceRecord record = event.getRecord();
            sendRecords.add(record);
            if (WatermarkEvent.isHighWatermarkEvent(record)) {
                this.hasNextElement.set(false);
            }
        }
        List<SourceRecords> sourceRecordsSet = new ArrayList<>();
        sourceRecordsSet.add(new SourceRecords(sendRecords));
        return sourceRecordsSet.iterator();
    }

    /**
     * Keeps polling the queue until the end watermark that follows the high watermark is seen,
     * then returns the whole split as one group: low watermark, buffered records, high watermark.
     *
     * <p>The first record must be the low watermark. Records seen before the high watermark are
     * buffered by record key (a later record with the same key replaces the earlier one). After
     * the high watermark, data change records that fall inside the split's key range are passed
     * to {@code taskContext.rewriteOutputBuffer} together with the buffer.
     *
     * @return an iterator over one {@link SourceRecords} holding the normalized split data
     * @throws InterruptedException if interrupted while polling the queue
     */
    public Iterator<SourceRecords> pollSplitRecordsIfExactlyOnce() throws InterruptedException {
        boolean reachChangeLogStart = false;
        boolean reachChangeLogEnd = false;
        SourceRecord lowWatermark = null;
        SourceRecord highWatermark = null;
        LinkedHashMap<Struct, SourceRecord> outputBuffer = new LinkedHashMap<>();

        while (!reachChangeLogEnd) {
            // Re-check on every round so a failed read task ends the loop instead of spinning.
            this.checkReadException();
            for (DataChangeEvent event : this.queue.poll()) {
                SourceRecord record = event.getRecord();
                if (lowWatermark == null) {
                    lowWatermark = record;
                    this.assertLowWatermark(lowWatermark);
                } else if (highWatermark == null && WatermarkEvent.isHighWatermarkEvent(record)) {
                    highWatermark = record;
                    reachChangeLogStart = true;
                } else if (reachChangeLogStart && WatermarkEvent.isEndWatermarkEvent(record)) {
                    // Anything left in this batch after the end watermark is ignored.
                    reachChangeLogEnd = true;
                    break;
                } else if (!reachChangeLogStart) {
                    outputBuffer.put((Struct) record.key(), record);
                } else if (this.isChangeRecordInChunkRange(record)) {
                    this.taskContext.rewriteOutputBuffer(outputBuffer, record);
                }
            }
        }
        this.hasNextElement.set(false);

        List<SourceRecord> normalizedRecords = new ArrayList<>();
        normalizedRecords.add(lowWatermark);
        normalizedRecords.addAll(this.taskContext.formatMessageTimestamp(outputBuffer.values()));
        normalizedRecords.add(highWatermark);

        List<SourceRecords> sourceRecordsSet = new ArrayList<>();
        sourceRecordsSet.add(new SourceRecords(normalizedRecords));
        return sourceRecordsSet.iterator();
    }

    /**
     * Verifies that the given record is a low watermark event.
     *
     * @throws IllegalStateException if it is not
     */
    private void assertLowWatermark(SourceRecord lowWatermark) {
        // Template form: the message is only built when the check actually fails.
        Preconditions.checkState(
                WatermarkEvent.isLowWatermarkEvent(lowWatermark),
                "The first record should be low watermark signal event, but actual is %s",
                lowWatermark);
    }

    /**
     * Rethrows a failure recorded by the read task, wrapped with the current split as context.
     *
     * @throws SeaTunnelException if the read task has failed
     */
    private void checkReadException() {
        Throwable failure = this.readException;
        if (failure != null) {
            throw new SeaTunnelException(
                    String.format(
                            "Read split %s error due to %s.",
                            this.currentSnapshotSplit, failure.getMessage()),
                    failure);
        }
    }

    /**
     * Closes the task context, shuts the read task down and stops the reader thread, waiting up to
     * {@link #READER_CLOSE_TIMEOUT_SECONDS} seconds before forcing termination.
     *
     * <p>Each step is attempted independently and failures are logged rather than thrown, so a
     * failure in one step does not prevent the remaining resources from being released.
     */
    @Override
    public void close() {
        try {
            if (this.taskContext != null) {
                this.taskContext.close();
            }
        } catch (Exception e) {
            log.error("Close scan fetcher error", e);
        }

        try {
            if (this.snapshotSplitReadTask != null) {
                this.snapshotSplitReadTask.shutdown();
            }
        } catch (Exception e) {
            log.error("Close scan fetcher error", e);
        }

        if (this.executorService == null) {
            return;
        }
        try {
            this.executorService.shutdown();
            if (!this.executorService.awaitTermination(
                    READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                log.warn(
                        "Failed to close the scan fetcher in {} seconds. Service will execute force close(ExecutorService.shutdownNow)",
                        READER_CLOSE_TIMEOUT_SECONDS);
                this.executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Stop waiting, cancel the reader and keep the interrupt visible to the caller.
            this.executorService.shutdownNow();
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for the scan fetcher to close", e);
        } catch (Exception e) {
            log.error("Close scan fetcher error", e);
        }
    }

    /**
     * @return {@code true} if the record is a data change record whose key lies between the start
     *     and end of the current snapshot split, as decided by the task context
     */
    private boolean isChangeRecordInChunkRange(SourceRecord record) {
        if (!this.taskContext.isDataChangeRecord(record)) {
            return false;
        }
        return this.taskContext.isRecordBetween(
                record,
                this.currentSnapshotSplit.getSplitStart(),
                this.currentSnapshotSplit.getSplitEnd());
    }
}

