/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.utils.FailingFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Integration test that runs a streaming {@code INSERT INTO} job against a Paimon table, stops it
 * with a savepoint at random moments, restores it from that savepoint, and finally verifies the
 * table contents through both a batch read and a streaming (changelog) read.
 */
public class SinkSavepointITCase extends AbstractTestBase {

    /**
     * Number of rows produced by the datagen source. Must stay in sync with the sequence bounds
     * ({@code 0} .. {@code 99999}) declared in the source table DDL.
     */
    private static final int NUM_RECORDS = 100000;

    /**
     * Number of distinct primary keys. Must stay in sync with the modulus used in the
     * {@code INSERT INTO} statement.
     */
    private static final int NUM_KEYS = 15000;

    /** Temporary directory used as warehouse, checkpoint and savepoint location. */
    private String path;

    /** Random name under which this test registers itself with {@link FailingFileIO}. */
    private String failingName;

    @BeforeEach
    public void before() throws Exception {
        path = getTempDirPath();
        failingName = UUID.randomUUID().toString();
    }

    /**
     * Repeatedly starts the insert job, waits a random amount of time, and stops it with a
     * savepoint; the next round restores from that savepoint. The loop ends once a job run is
     * observed in {@link JobStatus#FINISHED}, after which the table contents are verified.
     */
    @Test
    @Timeout(value = 180L)
    public void testRecoverFromSavepoint() throws Exception {
        String failingPath = FailingFileIO.getFailingPath(failingName, path);
        ThreadLocalRandom random = ThreadLocalRandom.current();

        // Randomly choose between the two FailingFileIO configurations.
        // NOTE(review): presumably (100, 500) enables artificial failures and (0, 1) disables
        // them -- confirm against FailingFileIO.reset.
        boolean enableFailure = random.nextBoolean();
        if (enableFailure) {
            FailingFileIO.reset(failingName, 100, 500);
        } else {
            FailingFileIO.reset(failingName, 0, 1);
        }

        String savepointPath = null;
        jobRounds:
        while (true) {
            JobClient jobClient = runRecoverFromSavepointJob(failingPath, savepointPath);

            while (true) {
                // Wait for a random amount of time before trying to stop with a savepoint.
                Thread.sleep(random.nextInt(5000));
                if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
                    // The job ran to completion; go check the results.
                    break jobRounds;
                }
                try {
                    savepointPath =
                            jobClient
                                    .stopWithSavepoint(
                                            false, path + "/savepoint", SavepointFormatType.DEFAULT)
                                    .get();
                    break;
                } catch (InterruptedException e) {
                    // Never swallow an interruption (e.g. one triggered by the test timeout).
                    Thread.currentThread().interrupt();
                    throw e;
                } catch (Exception e) {
                    Optional<StopWithSavepointStoppingException> stopping =
                            ExceptionUtils.findThrowable(
                                    e, StopWithSavepointStoppingException.class);
                    if (stopping.isPresent()) {
                        // The exception itself carries a savepoint path; restore from it.
                        savepointPath = stopping.get().getSavepointPath();
                        break;
                    }
                    // Any other failure: wait for a while and try again.
                }
            }

            // Wait for the job to reach a terminal state before starting the next round.
            while (!jobClient.getJobStatus().get().isGloballyTerminalState()) {
                Thread.sleep(1000L);
            }
        }

        checkRecoverFromSavepointBatchResult();
        checkRecoverFromSavepointStreamingResult();
    }

    /**
     * Submits the streaming insert job and waits until it has left {@link JobStatus#INITIALIZING}.
     *
     * @param failingPath warehouse path used for the catalog (obtained from {@link FailingFileIO})
     * @param savepointPath savepoint to restore from, or {@code null} to start from scratch
     * @return the client of the submitted job
     */
    private JobClient runRecoverFromSavepointJob(String failingPath, String savepointPath)
            throws Exception {
        Configuration conf = new Configuration();
        if (savepointPath != null) {
            SavepointRestoreSettings restoreSettings =
                    SavepointRestoreSettings.forPath(savepointPath, false);
            SavepointRestoreSettings.toConfiguration(restoreSettings, conf);
        }

        TableEnvironment tEnv =
                tableEnvironmentBuilder()
                        .streamingMode()
                        .checkpointIntervalMs(500)
                        .parallelism(1)
                        .allowRestart()
                        .setConf(conf)
                        .setConf(StateBackendOptions.STATE_BACKEND, "filesystem")
                        .setConf(
                                CheckpointingOptions.CHECKPOINTS_DIRECTORY,
                                "file://" + path + "/checkpoint")
                        .build();

        String createCatalogSql =
                String.join(
                        "\n",
                        "CREATE CATALOG my_catalog WITH (",
                        "  'type' = 'paimon',",
                        "  'warehouse' = '" + failingPath + "'",
                        ")");
        FailingFileIO.retryArtificialException(() -> tEnv.executeSql(createCatalogSql));
        tEnv.executeSql("USE CATALOG my_catalog");

        tEnv.executeSql(
                String.join(
                        "\n",
                        "CREATE TEMPORARY TABLE S (",
                        "  a INT",
                        ") WITH (",
                        "  'connector' = 'datagen',",
                        "  'rows-per-second' = '10000',",
                        "  'fields.a.kind' = 'sequence',",
                        "  'fields.a.start' = '0',",
                        "  'fields.a.end' = '99999'",
                        ")"));

        String createSinkSql =
                String.join(
                        "\n",
                        "CREATE TABLE IF NOT EXISTS T (",
                        "  k INT,",
                        "  v INT,",
                        "  PRIMARY KEY (k) NOT ENFORCED",
                        ") WITH (",
                        "  'bucket' = '4',",
                        "  'file.format' = 'avro',",
                        "  'changelog-producer' = 'full-compaction',",
                        "  'full-compaction.delta-commits' = '3'",
                        ")");
        FailingFileIO.retryArtificialException(() -> tEnv.executeSql(createSinkSql));

        // The sink parallelism is picked at random in [2, 4] for every (re)start.
        String insertIntoSql =
                String.format(
                        "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '%d') */ SELECT (a %% 15000) AS k, a AS v FROM S",
                        ThreadLocalRandom.current().nextInt(3) + 2);
        TableResult insertResult =
                FailingFileIO.retryArtificialException(() -> tEnv.executeSql(insertIntoSql));
        JobClient jobClient = insertResult.getJobClient().get();

        while (jobClient.getJobStatus().get() == JobStatus.INITIALIZING) {
            Thread.sleep(1000L);
        }
        return jobClient;
    }

    /** Reads table {@code T} in batch mode and asserts it equals the expected key/value mapping. */
    private void checkRecoverFromSavepointBatchResult() throws Exception {
        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
        useCatalogOnPlainPath(tEnv);

        HashMap<Integer, Integer> expected = expectedResult();
        HashMap<Integer, Integer> actual = new HashMap<>();
        try (CloseableIterator<Row> it = tEnv.executeSql("SELECT * FROM T").collect()) {
            while (it.hasNext()) {
                Row row = it.next();
                Assertions.assertThat(row.getArity()).isEqualTo(2);
                actual.put((Integer) row.getField(0), (Integer) row.getField(1));
            }
        }
        Assertions.assertThat(actual).isEqualTo(expected);
    }

    /**
     * Reads the changelog of table {@code T} in streaming mode, replays it into a map while
     * checking that inserts never hit an existing key and retractions always hit one, and asserts
     * the replayed map equals the expected key/value mapping.
     */
    private void checkRecoverFromSavepointStreamingResult() throws Exception {
        TableEnvironment tEnv = tableEnvironmentBuilder().streamingMode().build();
        useCatalogOnPlainPath(tEnv);

        HashMap<Integer, Integer> expected = expectedResult();
        HashSet<Integer> expectedValues = new HashSet<>(expected.values());

        // The streaming query never ends on its own, so stop reading once as many rows carrying
        // a final value have been seen as there are distinct final values.
        int endCount = 0;
        HashMap<Integer, Integer> actual = new HashMap<>();
        try (CloseableIterator<Row> it =
                tEnv.executeSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id' = '2') */")
                        .collect()) {
            while (it.hasNext()) {
                Row row = it.next();
                Assertions.assertThat(row.getArity()).isEqualTo(2);
                int k = (Integer) row.getField(0);
                int v = (Integer) row.getField(1);
                switch (row.getKind()) {
                    case INSERT:
                    case UPDATE_AFTER:
                        Assertions.assertThat(actual).doesNotContainKey(k);
                        actual.put(k, v);
                        break;
                    case DELETE:
                    case UPDATE_BEFORE:
                        Assertions.assertThat(actual).containsKey(k);
                        actual.remove(k);
                        break;
                    default:
                        throw new UnsupportedOperationException(
                                "Unknown row kind " + row.getKind());
                }
                if (expectedValues.contains(v)) {
                    endCount++;
                }
                if (endCount >= expectedValues.size()) {
                    break;
                }
            }
        }
        Assertions.assertThat(actual).isEqualTo(expected);
    }

    /** Creates {@code my_catalog} on the plain (non-failing) warehouse path and switches to it. */
    private void useCatalogOnPlainPath(TableEnvironment tEnv) {
        tEnv.executeSql(
                String.join(
                        "\n",
                        "CREATE CATALOG my_catalog WITH (",
                        "  'type' = 'paimon',",
                        "  'warehouse' = '" + path + "'",
                        ")"));
        tEnv.executeSql("USE CATALOG my_catalog");
    }

    /**
     * Builds the expected final table contents: for every key, the last value of the source
     * sequence that maps onto it.
     */
    private static HashMap<Integer, Integer> expectedResult() {
        HashMap<Integer, Integer> expected = new HashMap<>();
        for (int i = 0; i < NUM_RECORDS; i++) {
            // Later values overwrite earlier ones, mirroring primary-key upserts.
            expected.put(i % NUM_KEYS, i);
        }
        return expected;
    }
}

