/*
 * Decompiled with CFR 0.152.
 */
package com.king.bravo.testing;

import com.king.bravo.testing.CollectingSink;
import com.king.bravo.testing.MiniClusterResourceFactory;
import com.king.bravo.testing.PipelineAction;
import com.king.bravo.testing.actions.CancelJob;
import com.king.bravo.testing.actions.NextWatermark;
import com.king.bravo.testing.actions.Process;
import com.king.bravo.testing.actions.Sleep;
import com.king.bravo.testing.actions.TestPipelineSource;
import com.king.bravo.testing.actions.TriggerFailure;
import com.king.bravo.testing.actions.TriggerSavepoint;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BravoTestPipeline
extends TestLogger
implements Serializable {
    private static final long serialVersionUID = 1L;
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Rule
    public final TemporaryFolder folder = new TemporaryFolder();
    public static JobGraph jobGraph;
    public static ClusterClient<?> client;
    public static JobID jobID;
    public static LinkedList<PipelineAction> actions;

    @Before
    public void cleanOutputs() {
        CollectingSink.OUTPUT.clear();
        actions.clear();
    }

    public Path getLastCheckpointPath() throws IOException {
        FileStatus[] listStatus = FileSystem.getLocalFileSystem().listStatus(new Path(this.getCheckpointDir(), jobID.toString()));
        return Arrays.stream(listStatus).filter(s -> s.getPath().getName().startsWith("chk")).sorted((s1, s2) -> -Integer.compare(Integer.parseInt(s1.getPath().getName().split("-")[1]), Integer.parseInt(s2.getPath().getName().split("-")[1]))).findFirst().map(s -> s.getPath()).orElseThrow(() -> new IllegalStateException("Cannot find any checkpoints"));
    }

    public List<String> runTestPipeline(Function<DataStream<String>, DataStream<String>> pipelinerBuilder) throws Exception {
        return this.runTestPipeline(2, null, pipelinerBuilder);
    }

    public List<String> restoreTestPipelineFromSnapshot(String savepoint, Function<DataStream<String>, DataStream<String>> pipelinerBuilder) throws Exception {
        return this.runTestPipeline(2, savepoint, pipelinerBuilder);
    }

    public List<String> restoreTestPipelineFromLastCheckpoint(Function<DataStream<String>, DataStream<String>> pipelinerBuilder) throws Exception {
        return this.restoreTestPipelineFromSnapshot(this.getLastCheckpointPath().getPath(), pipelinerBuilder);
    }

    public List<String> restoreTestPipelineFromLastSavepoint(Function<DataStream<String>, DataStream<String>> pipelinerBuilder) throws Exception {
        if (TriggerSavepoint.lastSavepointPath == null) {
            throw new RuntimeException("triggerSavepoint must be called to obtain a valid savepoint");
        }
        return this.restoreTestPipelineFromSnapshot(TriggerSavepoint.lastSavepointPath, pipelinerBuilder);
    }

    private StreamExecutionEnvironment createJobGraph(int parallelism, Function<DataStream<String>, DataStream<String>> pipelinerBuilder) throws Exception {
        Path checkpointDir = this.getCheckpointDir();
        Path savepointRootDir = this.getSavepointDir();
        checkpointDir.getFileSystem().mkdirs(checkpointDir);
        savepointRootDir.getFileSystem().mkdirs(savepointRootDir);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setBufferTimeout(0L);
        env.setParallelism(parallelism);
        env.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend((StateBackend)new RocksDBStateBackend(checkpointDir.toString(), true));
        SingleOutputStreamOperator sourceData = env.addSource((SourceFunction)new TestPipelineSource()).uid("TestSource").name("TestSource").setParallelism(1);
        pipelinerBuilder.apply((DataStream<String>)sourceData).addSink((SinkFunction)new CollectingSink()).name("Output").uid("Output").setParallelism(1);
        return env;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<String> runTestPipeline(int parallelism, String savepoint, Function<DataStream<String>, DataStream<String>> pipelinerBuilder) throws Exception {
        if (!actions.isEmpty() && actions.getLast() instanceof CancelJob && ((CancelJob)actions.getLast()).isClusterActionTriggered()) {
            this.cancelJob();
        }
        jobGraph = this.createJobGraph(parallelism, pipelinerBuilder).getStreamGraph().getJobGraph();
        if (savepoint != null) {
            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)savepoint));
        }
        jobID = jobGraph.getJobID();
        MiniClusterResourceFactory clusterFactory = this.createCluster(1, 2);
        MiniClusterResource cluster = clusterFactory.get();
        cluster.before();
        client = cluster.getClusterClient();
        try {
            client.submitJob(jobGraph, BravoTestPipeline.class.getClassLoader());
        }
        catch (ProgramInvocationException pie) {
            if (!pie.getMessage().contains("Job was cancelled") && !pie.getCause().getMessage().contains("Job was cancelled")) {
                throw pie;
            }
        }
        finally {
            cluster.after();
        }
        return CollectingSink.OUTPUT;
    }

    protected Path getCheckpointDir() {
        return new Path("file://" + this.folder.getRoot().getAbsolutePath(), "checkpoints");
    }

    protected Path getSavepointDir() {
        return new Path("file://" + this.folder.getRoot().getAbsolutePath(), "savepoints");
    }

    protected Path getLastSavepointPath() {
        return new Path(TriggerSavepoint.lastSavepointPath);
    }

    protected Savepoint getLastCheckpoint() throws IOException {
        return BravoTestPipeline.loadSavepoint(this.getLastCheckpointPath().getPath());
    }

    protected Savepoint getLastSavepoint() throws IOException {
        return BravoTestPipeline.loadSavepoint(this.getLastSavepointPath().getPath());
    }

    public void process(String element) {
        actions.add(new Process(element, 0L));
    }

    public void process(String element, long ts) {
        actions.add(new Process(element, ts));
    }

    public void triggerFailure() {
        actions.add(new TriggerFailure());
    }

    public void triggerSavepoint() {
        actions.add(new TriggerSavepoint());
    }

    public void cancelJob() {
        actions.add(new CancelJob());
    }

    public void processWatermark(long timestamp) {
        actions.add(new NextWatermark(timestamp));
    }

    public void sleep(long millis) {
        actions.add(new Sleep(millis));
    }

    public void sleep(Time time) {
        this.sleep(time.toMilliseconds());
    }

    public static Savepoint loadSavepoint(String checkpointPointer) {
        try {
            Method resolveCheckpointPointer = AbstractFsCheckpointStorage.class.getDeclaredMethod("resolveCheckpointPointer", String.class);
            resolveCheckpointPointer.setAccessible(true);
            CompletedCheckpointStorageLocation loc = (CompletedCheckpointStorageLocation)resolveCheckpointPointer.invoke(null, checkpointPointer);
            return Checkpoints.loadCheckpointMetadata((DataInputStream)new DataInputStream((InputStream)loc.getMetadataHandle().openInputStream()), (ClassLoader)BravoTestPipeline.class.getClassLoader());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private MiniClusterResourceFactory createCluster(int numTaskManagers, int numSlotsPerTaskManager) {
        Configuration config = new Configuration();
        config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, this.getCheckpointDir().toUri().toString());
        config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, this.getSavepointDir().toUri().toString());
        MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(numTaskManagers, numSlotsPerTaskManager, config);
        return clusterFactory;
    }

    static {
        actions = new LinkedList();
    }
}

