/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.operator;

import java.util.ArrayList;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.source.SimpleSourceSplitSerializer;
import org.apache.paimon.flink.source.operator.AbstractTestingSourceOperator;

/**
 * A source operator for tests that wraps an already-constructed {@link SourceReader} over
 * {@link SimpleSourceSplit}s and reports a mocked runtime context and execution config.
 *
 * <p>The reader is created and the operator is set up inside the constructor, so an instance is
 * ready for {@code initializeState}/{@code open} as soon as it is built. Use
 * {@link #createTestOperator(SourceReader, WatermarkStrategy, boolean)} to obtain an operator
 * that has already been initialized and opened.
 *
 * @param <T> type of the records emitted by the wrapped reader
 */
public class TestingSourceOperator<T>
extends AbstractTestingSourceOperator<T, SimpleSourceSplit> {
    private static final long serialVersionUID = 1L;

    /** Subtask index reported when the caller does not supply one. */
    private static final int DEFAULT_SUBTASK_INDEX = 1;

    /** Parallelism reported when the caller does not supply one. */
    private static final int DEFAULT_PARALLELISM = 5;

    /** Auto-watermark interval placed on every {@link ExecutionConfig} handed out. */
    private static final long AUTO_WATERMARK_INTERVAL = 100L;

    private final int subtaskIndex;
    private final int parallelism;

    /**
     * Creates an operator with a no-op event gateway, subtask index {@code 1} and parallelism
     * {@code 5}.
     *
     * @param parameters supplies the containing task, stream config and output used for setup
     * @param reader the reader instance this operator will drive
     * @param watermarkStrategy watermark strategy forwarded to the parent operator
     * @param timeService processing-time service forwarded to the parent operator
     * @param emitProgressiveWatermarks forwarded unchanged to the parent operator
     */
    public TestingSourceOperator(StreamOperatorParameters<T> parameters, SourceReader<T, SimpleSourceSplit> reader, WatermarkStrategy<T> watermarkStrategy, ProcessingTimeService timeService, boolean emitProgressiveWatermarks) {
        this(parameters, reader, watermarkStrategy, timeService, new TestingOperatorEventGateway(), DEFAULT_SUBTASK_INDEX, DEFAULT_PARALLELISM, emitProgressiveWatermarks);
    }

    /**
     * Creates an operator with an explicit event gateway, subtask index and parallelism.
     *
     * @param parameters supplies the containing task, stream config and output used for setup
     * @param reader the reader instance this operator will drive
     * @param watermarkStrategy watermark strategy forwarded to the parent operator
     * @param timeService processing-time service forwarded to the parent operator
     * @param eventGateway gateway forwarded to the parent operator
     * @param subtaskIndex subtask index reported by {@link #getRuntimeContext()}
     * @param parallelism parallelism reported by {@link #getRuntimeContext()}
     * @param emitProgressiveWatermarks forwarded unchanged to the parent operator
     * @throws RuntimeException wrapping any exception thrown while initializing the reader
     */
    public TestingSourceOperator(StreamOperatorParameters<T> parameters, SourceReader<T, SimpleSourceSplit> reader, WatermarkStrategy<T> watermarkStrategy, ProcessingTimeService timeService, OperatorEventGateway eventGateway, int subtaskIndex, int parallelism, boolean emitProgressiveWatermarks) {
        // The reader factory ignores its context and always hands back the supplied reader; the
        // trailing supplier always answers false.
        super(context -> reader, eventGateway, new SimpleSourceSplitSerializer(), watermarkStrategy, timeService, new Configuration(), "localhost", emitProgressiveWatermarks, () -> false);
        this.subtaskIndex = subtaskIndex;
        this.parallelism = parallelism;
        // Order matters: metrics are installed before the reader is initialized, and setup runs last.
        this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
        this.initSourceMetricGroup();
        try {
            this.initReader();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        this.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
    }

    /**
     * Returns a freshly created mock runtime context built from this operator's parallelism and
     * subtask index. A new instance is produced on every call.
     */
    @Override
    public StreamingRuntimeContext getRuntimeContext() {
        return new MockStreamingRuntimeContext(false, this.parallelism, this.subtaskIndex);
    }

    /**
     * Returns a freshly created {@link ExecutionConfig} whose auto-watermark interval is set to
     * {@code 100}. A new instance is produced on every call.
     */
    @Override
    public ExecutionConfig getExecutionConfig() {
        ExecutionConfig cfg = new ExecutionConfig();
        cfg.setAutoWatermarkInterval(AUTO_WATERMARK_INTERVAL);
        return cfg;
    }

    /**
     * Builds a {@link TestingSourceOperator} around {@code reader}, initializes its state from an
     * empty operator state backend and opens it.
     *
     * <p>The cancel-stream registry created here is intentionally left open: it is handed to the
     * state backend, which remains in use by the returned operator.
     *
     * @param reader the reader instance the operator will drive
     * @param watermarkStrategy watermark strategy for the operator
     * @param emitProgressiveWatermarks forwarded unchanged to the operator
     * @param <T> type of the records emitted by the reader
     * @return the initialized and opened operator
     * @throws Exception if creating the state backend or task, or initializing or opening the
     *     operator, fails
     */
    public static <T> SourceOperator<T, SimpleSourceSplit> createTestOperator(SourceReader<T, SimpleSourceSplit> reader, WatermarkStrategy<T> watermarkStrategy, boolean emitProgressiveWatermarks) throws Exception {
        HashMapStateBackend stateBackend = new HashMapStateBackend();
        Environment env = new MockEnvironmentBuilder().build();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        OperatorStateStore operatorStateStore = stateBackend.createOperatorStateBackend(
                new OperatorStateBackendParametersImpl(env, "test-operator", Collections.emptyList(), cancelStreamRegistry));
        StateInitializationContext stateContext =
                new StateInitializationContextImpl(null, operatorStateStore, null, null, null);

        TestProcessingTimeService timeService = new TestProcessingTimeService();
        // Start the test clock at a non-zero time.
        timeService.setCurrentTime(Integer.MAX_VALUE);

        StreamOperatorParameters<T> parameters = new StreamOperatorParameters<T>(
                new SourceOperatorStreamTask<T>(new DummyEnvironment()),
                new MockStreamConfig(new Configuration(), 1),
                new MockOutput<T>(new ArrayList<>()),
                null,
                null,
                null);
        TestingSourceOperator<T> sourceOperator =
                new TestingSourceOperator<>(parameters, reader, watermarkStrategy, timeService, emitProgressiveWatermarks);
        sourceOperator.initializeState(stateContext);
        sourceOperator.open();
        return sourceOperator;
    }

    /** Event gateway that silently discards every event sent to the coordinator. */
    private static final class TestingOperatorEventGateway
    implements OperatorEventGateway {
        private TestingOperatorEventGateway() {
        }

        /** Intentionally a no-op: the event is dropped. */
        @Override
        public void sendEventToCoordinator(OperatorEvent event) {
        }
    }
}

