/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.translation.flink.source;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext;
import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.DefaultEventProcessor;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.translation.flink.source.SourceEventWrapper;
import org.apache.seatunnel.translation.flink.source.SplitWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkSourceSplitEnumeratorContext<SplitT extends SourceSplit>
implements SourceSplitEnumerator.Context<SplitT> {
    private static final Logger log = LoggerFactory.getLogger(FlinkSourceSplitEnumeratorContext.class);
    private final SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext;
    protected final EventListener eventListener;

    public FlinkSourceSplitEnumeratorContext(SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) {
        this.enumContext = enumContext;
        this.eventListener = new DefaultEventProcessor(FlinkSourceSplitEnumeratorContext.getFlinkJobId(enumContext));
    }

    public int currentParallelism() {
        return this.enumContext.currentParallelism();
    }

    public Set<Integer> registeredReaders() {
        return this.enumContext.registeredReaders().keySet();
    }

    public void assignSplit(int subtaskId, List<SplitT> splits) {
        splits.forEach(split -> this.enumContext.assignSplit(new SplitWrapper<SourceSplit>((SourceSplit)split), subtaskId));
    }

    public void signalNoMoreSplits(int subtask) {
        this.enumContext.signalNoMoreSplits(subtask);
    }

    public void sendEventToSourceReader(int subtaskId, org.apache.seatunnel.api.source.SourceEvent event) {
        this.enumContext.sendEventToSourceReader(subtaskId, (SourceEvent)new SourceEventWrapper(event));
    }

    public MetricsContext getMetricsContext() {
        return new AbstractMetricsContext(){};
    }

    public EventListener getEventListener() {
        return this.eventListener;
    }

    private static String getFlinkJobId(SplitEnumeratorContext enumContext) {
        try {
            return FlinkSourceSplitEnumeratorContext.getJobIdForV15(enumContext);
        }
        catch (Exception e) {
            log.warn("Get flink job id failed", (Throwable)e);
            return null;
        }
    }

    private static String getJobIdForV15(SplitEnumeratorContext enumContext) {
        try {
            SourceCoordinatorContext coordinatorContext = (SourceCoordinatorContext)enumContext;
            Field field = coordinatorContext.getClass().getDeclaredField("operatorCoordinatorContext");
            field.setAccessible(true);
            OperatorCoordinator.Context operatorCoordinatorContext = (OperatorCoordinator.Context)field.get(coordinatorContext);
            Field[] fields = operatorCoordinatorContext.getClass().getDeclaredFields();
            Optional<Field> fieldOptional = Arrays.stream(fields).filter(f -> f.getName().equals("globalFailureHandler")).findFirst();
            if (!fieldOptional.isPresent()) {
                fieldOptional = Arrays.stream(fields).filter(f -> f.getName().equals("context")).findFirst();
                field = fieldOptional.get();
                field.setAccessible(true);
                operatorCoordinatorContext = (OperatorCoordinator.Context)field.get(operatorCoordinatorContext);
            }
            field = Arrays.stream(operatorCoordinatorContext.getClass().getDeclaredFields()).filter(f -> f.getName().equals("globalFailureHandler")).findFirst().get();
            field.setAccessible(true);
            Object obj = field.get(operatorCoordinatorContext);
            fields = obj.getClass().getDeclaredFields();
            field = Arrays.stream(fields).filter(f -> f.getName().equals("arg$1")).findFirst().get();
            field.setAccessible(true);
            SchedulerBase schedulerBase = (SchedulerBase)field.get(obj);
            return schedulerBase.getExecutionGraph().getJobID().toString();
        }
        catch (Exception e) {
            throw new IllegalStateException("Initialize flink job-id failed", e);
        }
    }
}

