/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.translation.source;

import java.io.IOException;
import java.io.Serializable;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.apache.seatunnel.translation.source.CoordinatedEnumeratorContext;
import org.apache.seatunnel.translation.source.CoordinatedReaderContext;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CoordinatedSource<T, SplitT extends SourceSplit, StateT extends Serializable>
implements BaseSourceFunction<T> {
    private static final Logger log = LoggerFactory.getLogger(CoordinatedSource.class);
    protected static final long SLEEP_TIME_INTERVAL = 5L;
    protected final SeaTunnelSource<T, SplitT, StateT> source;
    protected final Map<Integer, List<byte[]>> restoredState;
    protected final Integer parallelism;
    protected final String jobId;
    protected final Serializer<SplitT> splitSerializer;
    protected final Serializer<StateT> enumeratorStateSerializer;
    protected final CoordinatedEnumeratorContext<SplitT> coordinatedEnumeratorContext;
    protected final Map<Integer, CoordinatedReaderContext> readerContextMap;
    protected final Map<Integer, List<SplitT>> restoredSplitStateMap = new HashMap<Integer, List<SplitT>>();
    protected volatile transient SourceSplitEnumerator<SplitT, StateT> splitEnumerator;
    protected transient Map<Integer, SourceReader<T, SplitT>> readerMap = new ConcurrentHashMap<Integer, SourceReader<T, SplitT>>();
    protected final Map<Integer, AtomicBoolean> readerRunningMap;
    protected final AtomicInteger completedReader = new AtomicInteger(0);
    protected volatile transient ScheduledThreadPoolExecutor executorService;
    protected volatile boolean running = true;

    public CoordinatedSource(SeaTunnelSource<T, SplitT, StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism, String jobId) {
        this.source = source;
        this.restoredState = restoredState;
        this.parallelism = parallelism;
        this.jobId = jobId;
        this.splitSerializer = source.getSplitSerializer();
        this.enumeratorStateSerializer = source.getEnumeratorStateSerializer();
        this.coordinatedEnumeratorContext = new CoordinatedEnumeratorContext(this, jobId);
        this.readerContextMap = new ConcurrentHashMap<Integer, CoordinatedReaderContext>(parallelism);
        this.readerRunningMap = new ConcurrentHashMap<Integer, AtomicBoolean>(parallelism);
        try {
            this.createSplitEnumerator();
            this.createReaders();
        }
        catch (Exception e) {
            log.warn("create split enumerator or readers failed", (Throwable)e);
        }
    }

    private void createSplitEnumerator() throws Exception {
        if (this.restoredState != null && this.restoredState.size() > 0) {
            Serializable restoredEnumeratorState = null;
            if (this.restoredState.containsKey(-1)) {
                restoredEnumeratorState = (Serializable)this.enumeratorStateSerializer.deserialize(this.restoredState.get(-1).get(0));
            }
            this.splitEnumerator = this.source.restoreEnumerator(this.coordinatedEnumeratorContext, restoredEnumeratorState);
            this.restoredState.forEach((subtaskId, splitBytes) -> {
                if (subtaskId == -1) {
                    return;
                }
                ArrayList<Object> restoredSplitState = new ArrayList<Object>(splitBytes.size());
                for (byte[] splitByte : splitBytes) {
                    try {
                        restoredSplitState.add(this.splitSerializer.deserialize(splitByte));
                    }
                    catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
                this.restoredSplitStateMap.put((Integer)subtaskId, (List<SplitT>)restoredSplitState);
            });
        } else {
            this.splitEnumerator = this.source.createEnumerator(this.coordinatedEnumeratorContext);
        }
    }

    private void createReaders() throws Exception {
        for (int subtaskId = 0; subtaskId < this.parallelism; ++subtaskId) {
            CoordinatedReaderContext readerContext = new CoordinatedReaderContext(this, this.source.getBoundedness(), this.jobId, subtaskId);
            this.readerContextMap.put(subtaskId, readerContext);
            this.readerRunningMap.put(subtaskId, new AtomicBoolean(true));
            SourceReader reader = this.source.createReader((SourceReader.Context)readerContext);
            this.readerMap.put(subtaskId, reader);
        }
    }

    @Override
    public void open() throws Exception {
        this.executorService = ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(this.parallelism, "parallel-split-enumerator-executor");
        this.splitEnumerator.open();
        this.coordinatedEnumeratorContext.getEventListener().onEvent((Event)new EnumeratorOpenEvent());
        this.restoredSplitStateMap.forEach((subtaskId, splits) -> this.splitEnumerator.addSplitsBack(splits, subtaskId.intValue()));
        this.readerMap.forEach((key, value) -> {
            try {
                value.open();
                this.readerContextMap.get(key).getEventListener().onEvent((Event)new ReaderOpenEvent());
                this.splitEnumerator.registerReader(key.intValue());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Override
    public void run(Collector<T> collector) throws Exception {
        this.readerMap.entrySet().parallelStream().forEach(entry -> {
            AtomicBoolean flag = this.readerRunningMap.get(entry.getKey());
            SourceReader reader = (SourceReader)entry.getValue();
            this.executorService.execute(() -> {
                while (flag.get()) {
                    try {
                        reader.pollNext(collector);
                        if (collector.isEmptyThisPollNext()) {
                            Thread.sleep(100L);
                            continue;
                        }
                        collector.resetEmptyThisPollNext();
                        Thread.sleep(0L);
                    }
                    catch (Exception e) {
                        this.running = false;
                        flag.set(false);
                        throw new RuntimeException(e);
                    }
                }
            });
        });
        this.splitEnumerator.run();
        while (this.running) {
            Thread.sleep(5L);
        }
    }

    @Override
    public void close() throws IOException {
        this.running = false;
        for (Map.Entry<Integer, SourceReader<T, SplitT>> entry : this.readerMap.entrySet()) {
            this.readerRunningMap.get(entry.getKey()).set(false);
            entry.getValue().close();
            this.readerContextMap.get(entry.getKey()).getEventListener().onEvent((Event)new ReaderCloseEvent());
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        try (SourceSplitEnumerator<SplitT, StateT> closed = this.splitEnumerator;){
            this.coordinatedEnumeratorContext.getEventListener().onEvent((Event)new EnumeratorCloseEvent());
        }
    }

    @Override
    public Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws Exception {
        Map<Integer, List<byte[]>> allStates = this.readerMap.entrySet().parallelStream().collect(Collectors.toMap(Map.Entry::getKey, readerEntry -> {
            try {
                List splitStates = ((SourceReader)readerEntry.getValue()).snapshotState(checkpointId);
                ArrayList<byte[]> rawValues = new ArrayList<byte[]>(splitStates.size());
                for (SourceSplit splitState : splitStates) {
                    rawValues.add(this.splitSerializer.serialize((Object)splitState));
                }
                return rawValues;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }));
        Serializable enumeratorState = (Serializable)this.splitEnumerator.snapshotState(checkpointId);
        if (enumeratorState != null) {
            byte[] enumeratorStateBytes = this.enumeratorStateSerializer.serialize((Object)enumeratorState);
            allStates.put(-1, Collections.singletonList(enumeratorStateBytes));
        }
        return allStates;
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.splitEnumerator.notifyCheckpointComplete(checkpointId);
        this.readerMap.values().parallelStream().forEach(reader -> {
            try {
                reader.notifyCheckpointComplete(checkpointId);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        this.splitEnumerator.notifyCheckpointAborted(checkpointId);
        this.readerMap.values().parallelStream().forEach(reader -> {
            try {
                reader.notifyCheckpointAborted(checkpointId);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    protected void handleNoMoreElement(int subtaskId) {
        this.readerRunningMap.get(subtaskId).set(false);
        this.readerContextMap.remove(subtaskId);
        if (this.completedReader.incrementAndGet() == this.parallelism.intValue()) {
            this.running = false;
        }
    }

    protected void handleSplitRequest(int subtaskId) {
        this.splitEnumerator.handleSplitRequest(subtaskId);
    }

    protected void handleReaderEvent(int subtaskId, SourceEvent event) {
        this.splitEnumerator.handleSourceEvent(subtaskId, event);
    }

    public int currentReaderCount() {
        return this.readerContextMap.size();
    }

    public Set<Integer> registeredReaders() {
        return this.readerMap.keySet();
    }

    protected void addSplits(int subtaskId, List<SplitT> splits) {
        this.readerMap.get(subtaskId).addSplits(splits);
    }

    protected void handleNoMoreSplits(int subtaskId) {
        this.readerMap.get(subtaskId).handleNoMoreSplits();
    }

    protected void handleEnumeratorEvent(int subtaskId, SourceEvent event) {
        this.readerMap.get(subtaskId).handleSourceEvent(event);
    }

    static {
        DriverManager.getDrivers();
    }
}

