/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.translation.source;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.apache.seatunnel.translation.source.ParallelEnumeratorContext;
import org.apache.seatunnel.translation.source.ParallelReaderContext;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelSource<T, SplitT extends SourceSplit, StateT extends Serializable>
implements BaseSourceFunction<T> {
    private static final Logger LOG = LoggerFactory.getLogger(ParallelSource.class);
    protected final SeaTunnelSource<T, SplitT, StateT> source;
    protected final ParallelEnumeratorContext<SplitT> parallelEnumeratorContext;
    protected final ParallelReaderContext readerContext;
    protected final String jobId;
    protected final Integer subtaskId;
    protected final Integer parallelism;
    protected final Serializer<SplitT> splitSerializer;
    protected final Serializer<StateT> enumeratorStateSerializer;
    protected final List<SplitT> restoredSplitState;
    protected final SourceSplitEnumerator<SplitT, StateT> splitEnumerator;
    protected final SourceReader<T, SplitT> reader;
    protected volatile transient ScheduledThreadPoolExecutor executorService;
    private volatile boolean running = true;

    public ParallelSource(SeaTunnelSource<T, SplitT, StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism, String jobId, int subtaskId) {
        this.source = source;
        this.jobId = jobId;
        this.subtaskId = subtaskId;
        this.parallelism = parallelism;
        this.splitSerializer = source.getSplitSerializer();
        this.enumeratorStateSerializer = source.getEnumeratorStateSerializer();
        this.parallelEnumeratorContext = new ParallelEnumeratorContext(this, parallelism, jobId, subtaskId);
        this.readerContext = new ParallelReaderContext(this, source.getBoundedness(), jobId, subtaskId);
        try {
            if (restoredState != null && restoredState.size() > 0) {
                Serializable restoredEnumeratorState = null;
                if (restoredState.containsKey(-1)) {
                    restoredEnumeratorState = (Serializable)this.enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
                }
                this.restoredSplitState = new ArrayList<SplitT>(restoredState.get(subtaskId).size());
                for (byte[] splitBytes : restoredState.get(subtaskId)) {
                    this.restoredSplitState.add((SourceSplit)this.splitSerializer.deserialize(splitBytes));
                }
                this.splitEnumerator = source.restoreEnumerator(this.parallelEnumeratorContext, restoredEnumeratorState);
            } else {
                this.restoredSplitState = Collections.emptyList();
                this.splitEnumerator = source.createEnumerator(this.parallelEnumeratorContext);
            }
            this.reader = source.createReader((SourceReader.Context)this.readerContext);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void open() throws Exception {
        this.executorService = ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1, String.format("parallel-split-enumerator-executor-%s", this.subtaskId));
        this.splitEnumerator.open();
        if (this.restoredSplitState.size() > 0) {
            this.splitEnumerator.addSplitsBack(this.restoredSplitState, this.subtaskId.intValue());
        }
        this.reader.open();
        this.readerContext.getEventListener().onEvent((Event)new ReaderOpenEvent());
        this.parallelEnumeratorContext.register();
        this.parallelEnumeratorContext.getEventListener().onEvent((Event)new EnumeratorOpenEvent());
        this.splitEnumerator.registerReader(this.subtaskId.intValue());
    }

    @Override
    public void run(Collector<T> collector) throws Exception {
        Future<?> future = this.executorService.submit(() -> {
            try {
                this.splitEnumerator.run();
            }
            catch (Exception e) {
                throw new RuntimeException("SourceSplitEnumerator run failed.", e);
            }
        });
        while (this.running) {
            if (future.isDone()) {
                future.get();
            }
            this.reader.pollNext(collector);
            if (collector.isEmptyThisPollNext()) {
                Thread.sleep(100L);
                continue;
            }
            collector.resetEmptyThisPollNext();
            Thread.sleep(0L);
        }
        LOG.debug("Parallel source runs complete.");
    }

    @Override
    public void close() throws IOException {
        this.running = false;
        if (this.executorService != null) {
            LOG.debug("Close the thread pool resource.");
            this.executorService.shutdown();
        }
        if (this.splitEnumerator != null) {
            LOG.debug("Close the split enumerator for the Apache SeaTunnel source.");
            this.splitEnumerator.close();
        }
        if (this.reader != null) {
            LOG.debug("Close the data reader for the Apache SeaTunnel source.");
            this.reader.close();
            this.readerContext.getEventListener().onEvent((Event)new ReaderCloseEvent());
            this.parallelEnumeratorContext.getEventListener().onEvent((Event)new EnumeratorCloseEvent());
        }
    }

    protected void handleNoMoreElement() {
        this.running = false;
    }

    protected void handleSplitRequest(int subtaskId) {
        this.splitEnumerator.handleSplitRequest(subtaskId);
    }

    protected void addSplits(List<SplitT> splits) {
        this.reader.addSplits(splits);
    }

    protected void handleNoMoreSplits() {
        this.reader.handleNoMoreSplits();
    }

    @Override
    public Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws Exception {
        List splitStates;
        HashMap<Integer, List<byte[]>> allStates = new HashMap<Integer, List<byte[]>>(2);
        Serializable enumeratorState = (Serializable)this.splitEnumerator.snapshotState(checkpointId);
        if (enumeratorState != null) {
            byte[] enumeratorStateBytes = this.enumeratorStateSerializer.serialize((Object)enumeratorState);
            allStates.put(-1, Collections.singletonList(enumeratorStateBytes));
        }
        if ((splitStates = this.reader.snapshotState(checkpointId)) != null) {
            ArrayList<byte[]> readerStateBytes = new ArrayList<byte[]>(splitStates.size());
            for (SourceSplit splitState : splitStates) {
                readerStateBytes.add(this.splitSerializer.serialize((Object)splitState));
            }
            allStates.put(this.subtaskId, readerStateBytes);
        }
        return allStates;
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.splitEnumerator.notifyCheckpointComplete(checkpointId);
        this.reader.notifyCheckpointComplete(checkpointId);
    }

    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        this.splitEnumerator.notifyCheckpointAborted(checkpointId);
        this.reader.notifyCheckpointAborted(checkpointId);
    }
}

