/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.translation.flink.source;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
import org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext;
import org.apache.seatunnel.translation.flink.source.NoMoreElementEvent;
import org.apache.seatunnel.translation.flink.source.SourceEventWrapper;
import org.apache.seatunnel.translation.flink.source.SplitWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Adapts a SeaTunnel {@link SourceSplitEnumerator} to Flink's {@link SplitEnumerator} interface.
 *
 * <p>Every Flink callback is forwarded to the wrapped SeaTunnel enumerator. Splits arriving from
 * Flink are unwrapped from {@link SplitWrapper}, and reader events are unwrapped from
 * {@link SourceEventWrapper}, before they are handed over.
 *
 * @param <SplitT> the SeaTunnel split type
 * @param <EnumStateT> the checkpoint state type produced by the wrapped enumerator
 */
public class FlinkSourceEnumerator<SplitT extends SourceSplit, EnumStateT>
        implements SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> {
    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkSourceEnumerator.class);

    /** The SeaTunnel enumerator that all callbacks are delegated to. */
    private final SourceSplitEnumerator<SplitT, EnumStateT> sourceSplitEnumerator;

    /** The Flink-side enumerator context supplied at construction. */
    private final SplitEnumeratorContext<SplitWrapper<SplitT>> enumeratorContext;

    /** SeaTunnel-side context built around {@link #enumeratorContext}; used here to emit lifecycle events. */
    private final SourceSplitEnumerator.Context<SplitT> context;

    /** Parallelism read from the Flink context once, in the constructor. */
    private final int parallelism;

    /** Guards {@link #currentRegisterReaders} and {@link #isRun} inside {@link #addReader(int)}. */
    private final Object lock = new Object();

    /** Becomes {@code true} after the wrapped enumerator's {@code run()} has returned normally. */
    private volatile boolean isRun = false;

    /** Number of {@link #addReader(int)} calls seen so far. */
    private volatile int currentRegisterReaders = 0;

    /**
     * Creates the adapter.
     *
     * @param enumerator the SeaTunnel enumerator to delegate to
     * @param enumContext the Flink enumerator context; its current parallelism is captured here
     */
    public FlinkSourceEnumerator(SourceSplitEnumerator<SplitT, EnumStateT> enumerator, SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) {
        this.sourceSplitEnumerator = enumerator;
        this.enumeratorContext = enumContext;
        this.context = new FlinkSourceSplitEnumeratorContext<>(enumContext);
        this.parallelism = enumContext.currentParallelism();
    }

    /** Opens the wrapped enumerator and then publishes an {@link EnumeratorOpenEvent}. */
    public void start() {
        this.sourceSplitEnumerator.open();
        this.context.getEventListener().onEvent(new EnumeratorOpenEvent());
    }

    /**
     * Forwards a split request to the wrapped enumerator.
     *
     * @param subtaskId the requesting reader's subtask id
     * @param requesterHostname not used by this adapter
     */
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        this.sourceSplitEnumerator.handleSplitRequest(subtaskId);
    }

    /**
     * Returns splits to the wrapped enumerator after unwrapping them.
     *
     * @param splits the wrapped splits being handed back
     * @param subtaskId the subtask the splits were assigned to
     */
    public void addSplitsBack(List<SplitWrapper<SplitT>> splits, int subtaskId) {
        List<SplitT> unwrapped = splits.stream()
                .map(SplitWrapper::getSourceSplit)
                .collect(Collectors.toList());
        this.sourceSplitEnumerator.addSplitsBack(unwrapped, subtaskId);
    }

    /**
     * Registers a reader with the wrapped enumerator and, when the number of registrations
     * reaches the parallelism captured at construction, invokes the wrapped enumerator's
     * {@code run()}.
     *
     * <p>{@code run()} is invoked at most once: the counter equals the parallelism on exactly
     * one call, and {@link #isRun} is only set after {@code run()} returns normally.
     *
     * @param subtaskId the subtask id of the reader being added
     * @throws RuntimeException wrapping any exception thrown by the wrapped enumerator's
     *     {@code run()}
     */
    public void addReader(int subtaskId) {
        this.sourceSplitEnumerator.registerReader(subtaskId);
        synchronized (this.lock) {
            ++this.currentRegisterReaders;
            if (!this.isRun && this.currentRegisterReaders == this.parallelism) {
                try {
                    this.sourceSplitEnumerator.run();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                this.isRun = true;
            }
        }
    }

    /**
     * Delegates state snapshotting to the wrapped enumerator.
     *
     * @param checkpointId the id of the checkpoint being taken
     * @return the state returned by the wrapped enumerator
     * @throws Exception if the wrapped enumerator fails to snapshot
     */
    public EnumStateT snapshotState(long checkpointId) throws Exception {
        return this.sourceSplitEnumerator.snapshotState(checkpointId);
    }

    /**
     * Closes the wrapped enumerator and publishes an {@link EnumeratorCloseEvent}.
     *
     * <p>The event is published even when the wrapped enumerator's {@code close()} throws, so
     * listeners that saw the open event from {@link #start()} also see a close event. The
     * original exception still propagates to the caller.
     *
     * @throws IOException if the wrapped enumerator fails to close
     */
    public void close() throws IOException {
        try {
            this.sourceSplitEnumerator.close();
        } finally {
            this.context.getEventListener().onEvent(new EnumeratorCloseEvent());
        }
    }

    /**
     * Handles an event sent by a reader.
     *
     * <p>A {@link NoMoreElementEvent} is logged and sent straight back to the reader that
     * emitted it. A {@link SourceEventWrapper} is unwrapped and passed to the wrapped enumerator.
     * Any other event type is ignored.
     *
     * @param subtaskId the subtask id of the reader that sent the event
     * @param sourceEvent the event received from the reader
     */
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof NoMoreElementEvent) {
            // The second placeholder is filled with the context's current parallelism,
            // not with the currentRegisterReaders counter.
            LOGGER.info("Received NoMoreElementEvent from reader [{}], total registered readers [{}]", subtaskId, this.enumeratorContext.currentParallelism());
            this.enumeratorContext.sendEventToSourceReader(subtaskId, sourceEvent);
        }
        if (sourceEvent instanceof SourceEventWrapper) {
            this.sourceSplitEnumerator.handleSourceEvent(subtaskId, ((SourceEventWrapper) sourceEvent).getSourceEvent());
        }
    }

    /**
     * Forwards a checkpoint-complete notification to the wrapped enumerator.
     *
     * @param checkpointId the id of the completed checkpoint
     * @throws Exception if the wrapped enumerator fails while handling the notification
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.sourceSplitEnumerator.notifyCheckpointComplete(checkpointId);
    }

    /**
     * Forwards a checkpoint-aborted notification to the wrapped enumerator.
     *
     * @param checkpointId the id of the aborted checkpoint
     * @throws Exception if the wrapped enumerator fails while handling the notification
     */
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        this.sourceSplitEnumerator.notifyCheckpointAborted(checkpointId);
    }
}

