/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.translation.spark.source.reader.batch;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Handover;
import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.apache.seatunnel.translation.source.ParallelSource;
import org.apache.seatunnel.translation.spark.serialization.InternalRowCollector;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
import org.apache.spark.sql.catalyst.InternalRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelBatchPartitionReader {
    private static final Logger log = LoggerFactory.getLogger(ParallelBatchPartitionReader.class);
    protected static final Integer INTERVAL = 100;
    protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
    protected final Integer parallelism;
    protected final Integer subtaskId;
    protected final ExecutorService executorService;
    protected final Handover<InternalRow> handover;
    protected final Object checkpointLock = new Object();
    protected volatile boolean running = true;
    protected volatile boolean prepare = true;
    protected volatile BaseSourceFunction<SeaTunnelRow> internalSource;
    protected volatile InternalRowCollector internalRowCollector;

    public ParallelBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer parallelism, Integer subtaskId) {
        this.source = source;
        this.parallelism = parallelism;
        this.subtaskId = subtaskId;
        this.executorService = ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor((int)1, (String)this.getEnumeratorThreadName());
        this.handover = new Handover();
    }

    protected String getEnumeratorThreadName() {
        return String.format("parallel-split-enumerator-executor-%s", this.subtaskId);
    }

    public boolean next() throws IOException {
        this.prepare();
        while (this.running && this.handover.isEmpty()) {
            try {
                Thread.sleep(INTERVAL.intValue());
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return this.running || !this.handover.isEmpty();
    }

    protected void prepare() {
        if (!this.prepare) {
            return;
        }
        this.internalSource = this.createInternalSource();
        try {
            this.internalSource.open();
        }
        catch (Exception e) {
            this.running = false;
            throw new RuntimeException("Failed to open internal source.", e);
        }
        this.internalRowCollector = new InternalRowCollector(this.handover, this.checkpointLock, this.source.getProducedType());
        this.executorService.execute(() -> {
            try {
                this.internalSource.run((Collector)this.internalRowCollector);
            }
            catch (Exception e) {
                this.handover.reportError((Throwable)e);
                log.error("BatchPartitionReader execute failed.", (Throwable)e);
                this.running = false;
            }
        });
        this.prepare = false;
    }

    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
        return new InternalParallelSource(this.source, null, this.parallelism, this.subtaskId);
    }

    public InternalRow get() {
        try {
            return (InternalRow)this.handover.pollNext().get();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void close() throws IOException {
        this.running = false;
        try {
            if (this.internalSource != null) {
                this.internalSource.close();
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        this.executorService.shutdown();
    }

    public class InternalParallelSource<SplitT extends SourceSplit, StateT extends Serializable>
    extends ParallelSource<SeaTunnelRow, SplitT, StateT> {
        public InternalParallelSource(SeaTunnelSource<SeaTunnelRow, SplitT, StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism, int subtaskId) {
            super(source, restoredState, parallelism, subtaskId);
        }

        protected void handleNoMoreElement() {
            super.handleNoMoreElement();
            ParallelBatchPartitionReader.this.running = false;
        }
    }
}

