/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.source;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.JobMode;

/**
 * Test helper that drives a SeaTunnel source through a complete bounded read on the calling
 * thread and returns every row the readers emitted.
 *
 * <p>The helper wires one split enumerator to {@code parallelism} readers through in-memory
 * contexts: split assignments and events are delivered by direct method calls, and the readers
 * are polled round-robin until each of them has signalled that it has no more elements.
 */
public class SourceFlowTestUtils {

    /**
     * Runs the source created by {@code factory} as a single-reader batch job with checkpointing
     * disabled.
     *
     * @param options configuration handed to the factory
     * @param factory factory that creates the source under test
     * @return all rows collected from the reader, in emission order
     * @throws Exception if creating, running or closing the source fails
     */
    public static List<SeaTunnelRow> runBatchWithCheckpointDisabled(ReadonlyConfig options, TableSourceFactory factory) throws Exception {
        return SourceFlowTestUtils.runWithContext(options, factory, SourceFlowTestUtils.newBatchContext(false), Boundedness.BOUNDED, 1);
    }

    /**
     * Runs the source created by {@code factory} as a single-reader batch job with checkpointing
     * enabled.
     *
     * @param options configuration handed to the factory
     * @param factory factory that creates the source under test
     * @return all rows collected from the reader, in emission order
     * @throws Exception if creating, running or closing the source fails
     */
    public static List<SeaTunnelRow> runBatchWithCheckpointEnabled(ReadonlyConfig options, TableSourceFactory factory) throws Exception {
        return SourceFlowTestUtils.runWithContext(options, factory, SourceFlowTestUtils.newBatchContext(true), Boundedness.BOUNDED, 1);
    }

    /**
     * Runs the source created by {@code factory} as a batch job with {@code parallelism} readers
     * and checkpointing disabled.
     *
     * @param options configuration handed to the factory
     * @param factory factory that creates the source under test
     * @param parallelism number of readers to create; subtask ids are {@code 0..parallelism-1}
     * @return all rows collected from all readers, in the order they were emitted while polling
     * @throws Exception if creating, running or closing the source fails
     */
    public static List<SeaTunnelRow> runParallelSubtasksBatchWithCheckpointDisabled(ReadonlyConfig options, TableSourceFactory factory, int parallelism) throws Exception {
        return SourceFlowTestUtils.runWithContext(options, factory, SourceFlowTestUtils.newBatchContext(false), Boundedness.BOUNDED, parallelism);
    }

    /**
     * Builds a batch-mode job context whose job id is the current wall-clock time in milliseconds.
     *
     * @param enableCheckpoint value passed to {@code JobContext#setEnableCheckpoint}
     * @return the configured context
     */
    private static JobContext newBatchContext(boolean enableCheckpoint) {
        JobContext context = new JobContext(Long.valueOf(System.currentTimeMillis()));
        context.setJobMode(JobMode.BATCH);
        context.setEnableCheckpoint(enableCheckpoint);
        return context;
    }

    /**
     * Creates the source, its enumerator and {@code parallelism} readers, runs the enumerator and
     * then polls the readers round-robin until every one of them has called
     * {@code signalNoMoreElement()}.
     *
     * <p>The enumerator and all readers created so far are closed on every exit path. If the run
     * itself fails, close failures are attached to that failure as suppressed exceptions instead
     * of replacing it.
     *
     * <p>Note: the polling loop only ends when every reader signals completion, so a reader that
     * never does will make this method spin forever.
     *
     * @param options configuration handed to the factory
     * @param factory factory that creates the source under test
     * @param context job context installed on the source before anything else is created
     * @param boundedness boundedness reported to the readers
     * @param parallelism number of readers to create
     * @return every record passed to the collector, cast to {@link SeaTunnelRow}
     * @throws Exception if creating, running or closing the source fails
     */
    // The generic parameters of the SeaTunnel types are not visible in this (decompiled) file, so
    // they are used raw here; the only unchecked operations are calls through those raw types.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static List<SeaTunnelRow> runWithContext(ReadonlyConfig options, TableSourceFactory factory, JobContext context, final Boundedness boundedness, final int parallelism) throws Exception {
        SeaTunnelSource source = factory.createSource(new TableSourceFactoryContext(options, Thread.currentThread().getContextClassLoader())).createSource();
        source.setJobContext(context);
        final Set<Integer> registeredReaders = new HashSet<Integer>();
        final List<SourceReader> readers = new ArrayList<SourceReader>();
        final Set<Integer> unfinishedReaders = new HashSet<Integer>();
        final SourceSplitEnumerator enumerator = source.createEnumerator(new SourceSplitEnumerator.Context<SourceSplit>(){

            @Override
            public int currentParallelism() {
                return parallelism;
            }

            @Override
            public Set<Integer> registeredReaders() {
                return registeredReaders;
            }

            @Override
            public void assignSplit(int subtaskId, List<SourceSplit> splits) {
                // Assignments made before any reader has been registered are dropped.
                if (this.registeredReaders().isEmpty()) {
                    return;
                }
                SourceReader reader = readers.get(subtaskId);
                // An empty assignment is forwarded as "no more splits" for that reader.
                if (splits.isEmpty()) {
                    reader.handleNoMoreSplits();
                } else {
                    reader.addSplits(splits);
                }
            }

            @Override
            public void signalNoMoreSplits(int subtask) {
                readers.get(subtask).handleNoMoreSplits();
            }

            @Override
            public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
                readers.get(subtaskId).handleSourceEvent(event);
            }

            @Override
            public MetricsContext getMetricsContext() {
                return new AbstractMetricsContext(){};
            }

            @Override
            public EventListener getEventListener() {
                return (EventListener & Serializable)event -> {};
            }
        });
        final List<SeaTunnelRow> rows = new ArrayList<SeaTunnelRow>();
        try {
            enumerator.open();
            for (int i = 0; i < parallelism; ++i) {
                final int subtaskId = i;
                SourceReader reader = source.createReader(new SourceReader.Context(){

                    @Override
                    public int getIndexOfSubtask() {
                        return subtaskId;
                    }

                    @Override
                    public Boundedness getBoundedness() {
                        return boundedness;
                    }

                    @Override
                    public void signalNoMoreElement() {
                        // Takes this reader out of the polling loop below.
                        unfinishedReaders.remove(subtaskId);
                    }

                    @Override
                    public void sendSplitRequest() {
                        enumerator.handleSplitRequest(subtaskId);
                    }

                    @Override
                    public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
                        enumerator.handleSourceEvent(subtaskId, sourceEvent);
                    }

                    @Override
                    public MetricsContext getMetricsContext() {
                        return new AbstractMetricsContext(){};
                    }

                    @Override
                    public EventListener getEventListener() {
                        return (EventListener & Serializable)event -> {};
                    }
                });
                // The reader must be visible to the enumerator context before registerReader is
                // called, because registration may trigger an immediate split assignment.
                unfinishedReaders.add(i);
                registeredReaders.add(i);
                readers.add(reader);
                enumerator.registerReader(i);
            }
            enumerator.run();
            while (!unfinishedReaders.isEmpty()) {
                for (int i = 0; i < parallelism; ++i) {
                    if (!unfinishedReaders.contains(i)) {
                        continue;
                    }
                    // A fresh final local per iteration so the collector can capture it.
                    final SourceReader reader = readers.get(i);
                    reader.pollNext(new Collector(){

                        @Override
                        public void collect(Object record) {
                            rows.add((SeaTunnelRow)record);
                        }

                        @Override
                        public Object getCheckpointLock() {
                            return reader;
                        }
                    });
                }
            }
        } catch (Throwable primary) {
            // Still release everything, but keep the original failure as the one reported.
            Exception closeFailure = SourceFlowTestUtils.closeAll(enumerator, readers);
            if (closeFailure != null) {
                primary.addSuppressed(closeFailure);
            }
            throw primary;
        }
        Exception closeFailure = SourceFlowTestUtils.closeAll(enumerator, readers);
        if (closeFailure != null) {
            throw closeFailure;
        }
        return rows;
    }

    /**
     * Closes the enumerator and then every reader, continuing past individual failures.
     *
     * @param enumerator enumerator to close first
     * @param readers readers to close afterwards, in list order
     * @return the first exception raised by a {@code close()} call with any later ones attached
     *     as suppressed, or {@code null} if everything closed cleanly
     */
    @SuppressWarnings("rawtypes")
    private static Exception closeAll(SourceSplitEnumerator enumerator, List<SourceReader> readers) {
        Exception first = null;
        try {
            enumerator.close();
        } catch (Exception e) {
            first = e;
        }
        for (SourceReader reader : readers) {
            try {
                reader.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        return first;
    }
}

