/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.translation.spark.source.reader.batch;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.apache.seatunnel.translation.source.CoordinatedSource;
import org.apache.seatunnel.translation.spark.serialization.InternalRowCollector;
import org.apache.seatunnel.translation.spark.source.reader.batch.ParallelBatchPartitionReader;

public class CoordinatedBatchPartitionReader
extends ParallelBatchPartitionReader {
    protected final Map<Integer, InternalRowCollector> collectorMap;

    public CoordinatedBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer parallelism, Integer subtaskId) {
        super(source, parallelism, subtaskId);
        this.collectorMap = new HashMap<Integer, InternalRowCollector>(parallelism);
        for (int i = 0; i < parallelism; ++i) {
            this.collectorMap.put(i, new InternalRowCollector(this.handover, new Object(), source.getProducedType()));
        }
    }

    @Override
    protected String getEnumeratorThreadName() {
        return "coordinated-split-enumerator-executor";
    }

    @Override
    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
        return new InternalCoordinatedSource(this.source, null, this.parallelism);
    }

    public class InternalCoordinatedSource<SplitT extends SourceSplit, StateT extends Serializable>
    extends CoordinatedSource<SeaTunnelRow, SplitT, StateT> {
        public InternalCoordinatedSource(SeaTunnelSource<SeaTunnelRow, SplitT, StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism) {
            super(source, restoredState, parallelism);
        }

        public void run(Collector<SeaTunnelRow> collector) throws Exception {
            this.readerMap.entrySet().parallelStream().forEach(entry -> {
                AtomicBoolean flag = (AtomicBoolean)this.readerRunningMap.get(entry.getKey());
                SourceReader reader = (SourceReader)entry.getValue();
                Collector rowCollector = (Collector)CoordinatedBatchPartitionReader.this.collectorMap.get(entry.getKey());
                this.executorService.execute(() -> {
                    while (flag.get()) {
                        try {
                            reader.pollNext(rowCollector);
                            Thread.sleep(5L);
                        }
                        catch (Exception e) {
                            this.running = false;
                            flag.set(false);
                            throw new RuntimeException(e);
                        }
                    }
                });
            });
            this.splitEnumerator.run();
            while (this.running) {
                Thread.sleep(5L);
            }
        }

        protected void handleNoMoreElement(int subtaskId) {
            super.handleNoMoreElement(subtaskId);
            if (!this.running) {
                CoordinatedBatchPartitionReader.this.running = false;
            }
        }
    }
}

