/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.translation.flink.sink;

import java.lang.reflect.Field;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.DefaultEventProcessor;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.translation.flink.metric.FlinkMetricContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exposes a Flink {@link Sink.InitContext} through the SeaTunnel
 * {@link SinkWriter.Context} interface.
 *
 * <p>Subtask index and subtask count are delegated straight to the wrapped
 * Flink context. The metrics context and the Flink job id are derived from a
 * {@link StreamingRuntimeContext} that is pulled out of the wrapped context
 * reflectively (see {@link #getStreamingRuntimeContextForV15(Sink.InitContext)}).
 */
public class FlinkSinkWriterContext implements SinkWriter.Context {
    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkSinkWriterContext.class);

    /** Flink-side writer context that this adapter delegates to. */
    private final Sink.InitContext writerContext;

    /** Event listener built once in the constructor from the resolved job id. */
    private final EventListener eventListener;

    /** Value handed to the constructor; no method of this class reads it. */
    private final int parallelism;

    /**
     * Wraps the given Flink writer context.
     *
     * <p>The event listener is created here. If the job id cannot be resolved,
     * a warning is logged and the listener is constructed with {@code null}.
     *
     * @param writerContext Flink writer initialization context to wrap
     * @param parallelism   stored as-is; not consulted by any method of this class
     */
    public FlinkSinkWriterContext(Sink.InitContext writerContext, int parallelism) {
        this.writerContext = writerContext;
        this.parallelism = parallelism;
        String jobId = getFlinkJobId(writerContext);
        this.eventListener = new DefaultEventProcessor(jobId);
    }

    /**
     * @return the subtask id reported by the wrapped Flink context
     */
    public int getIndexOfSubtask() {
        return writerContext.getSubtaskId();
    }

    /**
     * @return the number of parallel subtasks reported by the wrapped Flink
     *         context (the {@code parallelism} constructor argument is not used)
     */
    public int getNumberOfParallelSubtasks() {
        return writerContext.getNumberOfParallelSubtasks();
    }

    /**
     * Builds a new {@link FlinkMetricContext} on every call; the runtime
     * context is looked up reflectively each time.
     *
     * @return a fresh metrics context backed by the Flink runtime context
     * @throws IllegalStateException if the runtime context cannot be extracted
     */
    public MetricsContext getMetricsContext() {
        StreamingRuntimeContext runtimeContext = getStreamingRuntimeContextForV15(writerContext);
        return new FlinkMetricContext(runtimeContext);
    }

    /**
     * @return the event listener created in the constructor
     */
    public EventListener getEventListener() {
        return eventListener;
    }

    /**
     * Resolves the Flink job id as a string on a best-effort basis.
     *
     * @param writerContext Flink writer context to inspect
     * @return the job id string, or {@code null} (after logging a warning) when
     *         any exception is raised during the lookup
     */
    private static String getFlinkJobId(Sink.InitContext writerContext) {
        String jobId = null;
        try {
            StreamingRuntimeContext runtimeContext = getStreamingRuntimeContextForV15(writerContext);
            jobId = runtimeContext.getJobId().toString();
        } catch (Exception e) {
            // Deliberately non-fatal: the caller proceeds with a null job id.
            LOGGER.warn("Get flink job id failed", e);
        }
        return jobId;
    }

    /**
     * Extracts the {@link StreamingRuntimeContext} from the writer context by
     * reading its declared field {@code context} and, on that object, the
     * declared field {@code runtimeContext}.
     *
     * <p>NOTE(review): the "V15" suffix presumably refers to the internal field
     * layout of Flink 1.15 - confirm before using against other Flink versions.
     *
     * @param writerContext Flink writer context to inspect
     * @return the runtime context stored inside the writer context
     * @throws IllegalStateException if any exception occurs during the lookup
     *         (missing field, inaccessible field, null value, wrong type, ...)
     */
    private static StreamingRuntimeContext getStreamingRuntimeContextForV15(Sink.InitContext writerContext) {
        try {
            Object contextImpl = readDeclaredField(writerContext, "context");
            Object runtimeContext = readDeclaredField(contextImpl, "runtimeContext");
            // The cast stays inside the try so a type mismatch is wrapped as well.
            return (StreamingRuntimeContext) runtimeContext;
        } catch (Exception e) {
            throw new IllegalStateException("Initialize flink context failed", e);
        }
    }

    /**
     * Reads a field declared directly on the runtime class of {@code owner},
     * making it accessible first.
     *
     * @param owner     object whose field is read
     * @param fieldName name of the declared field
     * @return the current value of the field on {@code owner}
     * @throws ReflectiveOperationException if the field does not exist or cannot be read
     */
    private static Object readDeclaredField(Object owner, String fieldName) throws ReflectiveOperationException {
        Field field = owner.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        return field.get(owner);
    }
}

