/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.translation.flink.source;

import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.DefaultEventProcessor;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.translation.flink.metric.FlinkMetricContext;
import org.apache.seatunnel.translation.flink.source.NoMoreElementEvent;
import org.apache.seatunnel.translation.flink.source.SourceEventWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SourceReader.Context} implementation that delegates to a Flink
 * {@link SourceReaderContext} and the SeaTunnel source it was created for.
 *
 * <p>Reader-to-enumerator communication is forwarded to the Flink coordinator through
 * {@link SourceReaderContext#sendSourceEventToCoordinator}. The "no more element" signal is
 * guarded by an atomic flag so it is forwarded at most once per context instance.
 */
public class FlinkSourceReaderContext
implements SourceReader.Context {
    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkSourceReaderContext.class);

    /** Flipped to {@code true} by the first call to {@link #signalNoMoreElement()}. */
    private final AtomicBoolean isSendNoMoreElementEvent = new AtomicBoolean(false);
    private final SourceReaderContext readerContext;
    private final SeaTunnelSource source;
    protected final EventListener eventListener;

    /**
     * Creates a context wrapping the given Flink reader context.
     *
     * <p>The event listener is a {@link DefaultEventProcessor} built from the Flink job id; the
     * id is {@code null} when it cannot be resolved (see {@link #getFlinkJobId}).
     *
     * @param readerContext the Flink reader context all calls are delegated to
     * @param source the SeaTunnel source whose boundedness is reported by this context
     */
    public FlinkSourceReaderContext(SourceReaderContext readerContext, SeaTunnelSource source) {
        this.readerContext = readerContext;
        this.source = source;
        this.eventListener = new DefaultEventProcessor(FlinkSourceReaderContext.getFlinkJobId(readerContext));
    }

    /**
     * @return the subtask index reported by the underlying Flink reader context
     */
    public int getIndexOfSubtask() {
        return this.readerContext.getIndexOfSubtask();
    }

    /**
     * @return the boundedness reported by the wrapped SeaTunnel source
     */
    public Boundedness getBoundedness() {
        return this.source.getBoundedness();
    }

    /**
     * Sends a {@link NoMoreElementEvent} carrying this reader's subtask index to the coordinator.
     *
     * <p>Only the first call has an effect; subsequent calls are no-ops.
     */
    public void signalNoMoreElement() {
        // Use the CAS result as the guard: a separate get() followed by a set would let two
        // concurrent callers both pass the check and send the event twice.
        if (this.isSendNoMoreElementEvent.compareAndSet(false, true)) {
            int subtaskIndex = this.readerContext.getIndexOfSubtask();
            LOGGER.info("Reader [{}] send no more element event to enumerator", subtaskIndex);
            this.readerContext.sendSourceEventToCoordinator(new NoMoreElementEvent(subtaskIndex));
        }
    }

    /** Forwards a split request to the underlying Flink reader context. */
    public void sendSplitRequest() {
        this.readerContext.sendSplitRequest();
    }

    /**
     * Wraps a SeaTunnel source event in a {@link SourceEventWrapper} and sends it to the
     * coordinator.
     *
     * @param sourceEvent the SeaTunnel event to forward
     */
    public void sendSourceEventToEnumerator(org.apache.seatunnel.api.source.SourceEvent sourceEvent) {
        this.readerContext.sendSourceEventToCoordinator(new SourceEventWrapper(sourceEvent));
    }

    /**
     * Builds a metrics context on top of the Flink streaming runtime context.
     *
     * <p>A new {@link FlinkMetricContext} is created, and the runtime context is looked up
     * reflectively, on every call.
     *
     * @return a new metrics context
     * @throws IllegalStateException if the runtime context cannot be obtained
     */
    public MetricsContext getMetricsContext() {
        return new FlinkMetricContext(FlinkSourceReaderContext.getStreamingRuntimeContext(this.readerContext));
    }

    /**
     * @return {@code true} once {@link #signalNoMoreElement()} has been called
     */
    public boolean isSendNoMoreElementEvent() {
        return this.isSendNoMoreElementEvent.get();
    }

    /**
     * @return the event listener created in the constructor
     */
    public EventListener getEventListener() {
        return this.eventListener;
    }

    /**
     * Resolves the Flink job id as a string, on a best-effort basis.
     *
     * @param readerContext the Flink reader context to inspect
     * @return the job id, or {@code null} if it could not be determined (the failure is logged
     *     at WARN level and not propagated)
     */
    private static String getFlinkJobId(SourceReaderContext readerContext) {
        try {
            return FlinkSourceReaderContext.getStreamingRuntimeContext(readerContext).getJobId().toString();
        }
        catch (Exception e) {
            // Deliberately non-fatal: the context is still constructed without a job id.
            LOGGER.warn("Get flink job id failed", e);
            return null;
        }
    }

    /**
     * Extracts the {@link StreamingRuntimeContext} from the operator that owns the reader context.
     *
     * <p>Reads the field named {@code this$0} declared on the reader context's class and casts
     * its value to {@link AbstractStreamOperator}. NOTE(review): this presumably relies on the
     * reader context being a non-static inner class of a stream operator, so that
     * {@code this$0} is the compiler-generated enclosing-instance reference; confirm against
     * the Flink version in use.
     *
     * @param readerContext the Flink reader context to inspect
     * @return the runtime context of the enclosing operator
     * @throws IllegalStateException if the field is missing, inaccessible, or of an unexpected type
     */
    private static StreamingRuntimeContext getStreamingRuntimeContext(SourceReaderContext readerContext) {
        try {
            Field field = readerContext.getClass().getDeclaredField("this$0");
            field.setAccessible(true);
            AbstractStreamOperator operator = (AbstractStreamOperator)field.get(readerContext);
            return operator.getRuntimeContext();
        }
        catch (Exception e) {
            throw new IllegalStateException("Initialize flink context failed", e);
        }
    }
}

