/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.types.Row;
import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.utils.Pair;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

public class FlinkJobRecoveryITCase
extends CatalogITCaseBase {
    private static final String MINI_CLUSTER_FIELD = "miniCluster";

    @Override
    @BeforeEach
    public void before() throws IOException {
        super.before();
        this.sEnv.getConfig().getConfiguration().set(CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION, (Object)ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION).removeKey("execution.checkpointing.interval");
        this.batchSql("INSERT INTO source_table1 VALUES (1, 'test-1', '20241030')", new Object[0]);
        this.batchSql("INSERT INTO source_table1 VALUES (2, 'test-2', '20241030')", new Object[0]);
        this.batchSql("INSERT INTO source_table1 VALUES (3, 'test-3', '20241030')", new Object[0]);
        this.batchSql("INSERT INTO source_table2 VALUES (4, 'test-4', '20241031'), (5, 'test-5', '20241031'), (6, 'test-6', '20241031')", new Object[0]);
    }

    @Override
    protected List<String> ddl() {
        return Arrays.asList(String.format("CREATE CATALOG `fs_catalog` WITH ('type'='paimon', 'warehouse'='%s')", this.path), "CREATE TABLE IF NOT EXISTS `source_table1` (k INT, f1 STRING, dt STRING) WITH ('bucket'='1', 'bucket-key'='k')", "CREATE TABLE IF NOT EXISTS `source_table2` (k INT, f1 STRING, dt STRING) WITH ('bucket'='1', 'bucket-key'='k')");
    }

    @ParameterizedTest
    @EnumSource(value=BucketMode.class)
    @Timeout(value=300L)
    public void testRestoreFromSavepointWithJobGraphChange(BucketMode bucketMode) throws Exception {
        if (bucketMode == BucketMode.POSTPONE_MODE) {
            return;
        }
        this.createTargetTable("target_table", bucketMode);
        String beforeRecoverSql = "INSERT INTO `target_table` /*+ OPTIONS('sink.operator-uid.suffix'='test-uid') */ SELECT * FROM source_table1 /*+ OPTIONS('source.operator-uid.suffix'='test-uid') */";
        String beforeRecoverCheckSql = "SELECT * FROM target_table";
        List<Row> beforeRecoverExpectedRows = Arrays.asList(Row.of((Object[])new Object[]{1, "test-1", "20241030"}), Row.of((Object[])new Object[]{2, "test-2", "20241030"}), Row.of((Object[])new Object[]{3, "test-3", "20241030"}));
        String afterRecoverSql = "INSERT INTO `target_table` /*+ OPTIONS('sink.operator-uid.suffix'='test-uid') */ (SELECT * FROM source_table1 /*+ OPTIONS('source.operator-uid.suffix'='test-uid') */ UNION ALL SELECT * FROM source_table2)";
        String afterRecoverCheckSql = "SELECT * FROM target_table";
        List<Row> afterRecoverExpectedRows = Arrays.asList(Row.of((Object[])new Object[]{1, "test-1", "20241030"}), Row.of((Object[])new Object[]{2, "test-2", "20241030"}), Row.of((Object[])new Object[]{3, "test-3", "20241030"}), Row.of((Object[])new Object[]{4, "test-4", "20241031"}), Row.of((Object[])new Object[]{5, "test-5", "20241031"}), Row.of((Object[])new Object[]{6, "test-6", "20241031"}));
        this.testRecoverFromSavepoint(beforeRecoverSql, beforeRecoverCheckSql, beforeRecoverExpectedRows, afterRecoverSql, afterRecoverCheckSql, afterRecoverExpectedRows, Collections.emptyList(), (Pair<String, String>)Pair.of((Object)"target_table", (Object)"target_table"), Collections.emptyMap());
    }

    @Test
    @Timeout(value=300L)
    public void testRestoreFromSavepointWithIgnoreSourceState() throws Exception {
        this.createTargetTable("target_table", BucketMode.HASH_FIXED);
        String beforeRecoverSql = "INSERT INTO `target_table` SELECT * FROM source_table1";
        String beforeRecoverCheckSql = "SELECT * FROM target_table";
        List<Row> beforeRecoverExpectedRows = Arrays.asList(Row.of((Object[])new Object[]{1, "test-1", "20241030"}), Row.of((Object[])new Object[]{2, "test-2", "20241030"}), Row.of((Object[])new Object[]{3, "test-3", "20241030"}));
        String afterRecoverSql = "INSERT INTO `target_table` SELECT * FROM source_table2 /*+ OPTIONS('source.operator-uid.suffix'='test-uid') */";
        String afterRecoverCheckSql = "SELECT * FROM target_table";
        List<Row> afterRecoverExpectedRows = Arrays.asList(Row.of((Object[])new Object[]{1, "test-1", "20241030"}), Row.of((Object[])new Object[]{2, "test-2", "20241030"}), Row.of((Object[])new Object[]{3, "test-3", "20241030"}), Row.of((Object[])new Object[]{4, "test-4", "20241031"}), Row.of((Object[])new Object[]{5, "test-5", "20241031"}), Row.of((Object[])new Object[]{6, "test-6", "20241031"}));
        this.testRecoverFromSavepoint(beforeRecoverSql, beforeRecoverCheckSql, beforeRecoverExpectedRows, afterRecoverSql, afterRecoverCheckSql, afterRecoverExpectedRows, Collections.emptyList(), (Pair<String, String>)Pair.of((Object)"target_table", (Object)"target_table"), Collections.singletonMap(StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), "true"));
    }

    @Test
    @Timeout(value=300L)
    public void testRestoreFromSavepointWithIgnoreSinkState() throws Exception {
        this.createTargetTable("target_table", BucketMode.HASH_FIXED);
        this.createTargetTable("target_table2", BucketMode.HASH_FIXED);
        String beforeRecoverSql = "INSERT INTO `target_table` SELECT * FROM source_table1";
        String beforeRecoverCheckSql = "SELECT * FROM target_table";
        List<Row> beforeRecoverExpectedRows = Arrays.asList(Row.of((Object[])new Object[]{1, "test-1", "20241030"}), Row.of((Object[])new Object[]{2, "test-2", "20241030"}), Row.of((Object[])new Object[]{3, "test-3", "20241030"}));
        String afterRecoverSql = "INSERT INTO `target_table2` /*+ OPTIONS('sink.operator-uid.suffix'='test-uid') */ SELECT * FROM source_table1";
        String afterRecoverCheckSql = "SELECT * FROM target_table2";
        List<Row> afterRecoverExpectedRows = Arrays.asList(Row.of((Object[])new Object[]{7, "test-7", "20241030"}), Row.of((Object[])new Object[]{8, "test-8", "20241030"}), Row.of((Object[])new Object[]{9, "test-9", "20241030"}));
        String updateSql = "INSERT INTO source_table1 VALUES (7, 'test-7', '20241030'), (8, 'test-8', '20241030'), (9, 'test-9', '20241030')";
        this.testRecoverFromSavepoint(beforeRecoverSql, beforeRecoverCheckSql, beforeRecoverExpectedRows, afterRecoverSql, afterRecoverCheckSql, afterRecoverExpectedRows, Collections.singletonList(updateSql), (Pair<String, String>)Pair.of((Object)"target_table", (Object)"target_table2"), Collections.singletonMap(StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), "true"));
    }

    private void testRecoverFromSavepoint(String beforeRecoverSql, String beforeRecoverCheckSql, List<Row> beforeRecoverExpectedRows, String afterRecoverSql, String afterRecoverCheckSql, List<Row> afterRecoverExpectedRows, List<String> updateSql, Pair<String, String> targetTables, Map<String, String> recoverOptions) throws Exception {
        JobClient jobClient = (JobClient)this.sEnv.executeSql(beforeRecoverSql).getJobClient().get();
        String checkpointPath = this.triggerCheckpointAndWaitForWrites(jobClient, (String)targetTables.getLeft(), beforeRecoverExpectedRows.size());
        jobClient.cancel().get();
        List<Row> rows = this.batchSql(beforeRecoverCheckSql, new Object[0]);
        Assertions.assertThat((int)rows.size()).isEqualTo(beforeRecoverExpectedRows.size());
        Assertions.assertThat(rows).containsExactlyInAnyOrder((Object[])beforeRecoverExpectedRows.toArray(new Row[0]));
        for (String sql : updateSql) {
            this.batchSql(sql, new Object[0]);
        }
        Configuration config = this.sEnv.getConfig().getConfiguration();
        config.setString("execution.state-recovery.path", checkpointPath);
        for (Map.Entry<String, String> entry : recoverOptions.entrySet()) {
            config.setString(entry.getKey(), entry.getValue());
        }
        jobClient = (JobClient)this.sEnv.executeSql(afterRecoverSql).getJobClient().get();
        this.triggerCheckpointAndWaitForWrites(jobClient, (String)targetTables.getRight(), afterRecoverExpectedRows.size());
        jobClient.cancel().get();
        rows = this.batchSql(afterRecoverCheckSql, new Object[0]);
        Assertions.assertThat((int)rows.size()).isEqualTo(afterRecoverExpectedRows.size());
        Assertions.assertThat(rows).containsExactlyInAnyOrder((Object[])afterRecoverExpectedRows.toArray(new Row[0]));
    }

    private void createTargetTable(String tableName, BucketMode bucketMode) {
        switch (bucketMode) {
            case HASH_FIXED: {
                this.batchSql(String.format("CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 STRING, pt STRING, PRIMARY KEY(k, pt) NOT ENFORCED) WITH ('bucket'='2', 'commit.force-create-snapshot'='true')", tableName), new Object[0]);
                return;
            }
            case HASH_DYNAMIC: {
                this.batchSql(String.format("CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 STRING, pt STRING, PRIMARY KEY(k, pt) NOT ENFORCED) WITH ('bucket'='-1', 'commit.force-create-snapshot'='true')", tableName), new Object[0]);
                return;
            }
            case CROSS_PARTITION: {
                this.batchSql(String.format("CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 STRING, pt STRING, PRIMARY KEY(k) NOT ENFORCED) WITH ('bucket'='-1', 'commit.force-create-snapshot'='true')", tableName), new Object[0]);
                return;
            }
            case BUCKET_UNAWARE: {
                this.batchSql(String.format("CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 STRING, pt STRING) WITH ('bucket'='-1', 'commit.force-create-snapshot'='true')", tableName), new Object[0]);
                return;
            }
        }
        throw new IllegalArgumentException("Unsupported bucket mode: " + bucketMode);
    }

    private Snapshot waitForNewSnapshot(String tableName, long initialSnapshot) throws InterruptedException {
        Snapshot snapshot = this.findLatestSnapshot(tableName);
        while (snapshot == null || snapshot.id() == initialSnapshot) {
            Thread.sleep(2000L);
            snapshot = this.findLatestSnapshot(tableName);
        }
        return snapshot;
    }

    private <T> T reflectGetMiniCluster(Object instance) throws NoSuchFieldException, IllegalAccessException {
        Field field = instance.getClass().getDeclaredField(MINI_CLUSTER_FIELD);
        field.setAccessible(true);
        return (T)field.get(instance);
    }

    private String triggerCheckpointAndWaitForWrites(JobClient jobClient, String targetTable, long totalRecords) throws Exception {
        MiniCluster miniCluster = (MiniCluster)this.reflectGetMiniCluster(jobClient);
        JobID jobID = jobClient.getJobID();
        JobStatus jobStatus = (JobStatus)jobClient.getJobStatus().get();
        while (jobStatus == JobStatus.INITIALIZING || jobStatus == JobStatus.CREATED) {
            Thread.sleep(2000L);
            jobStatus = (JobStatus)jobClient.getJobStatus().get();
        }
        if (jobStatus != JobStatus.RUNNING) {
            throw new IllegalStateException("Job status is not RUNNING");
        }
        AtomicBoolean allTaskRunning = new AtomicBoolean(false);
        while (!allTaskRunning.get()) {
            allTaskRunning.set(true);
            Thread.sleep(2000L);
            ((CompletableFuture)miniCluster.getExecutionGraph(jobID).thenAccept(eg -> eg.getAllExecutionVertices().forEach(ev -> {
                if (ev.getExecutionState() != ExecutionState.RUNNING) {
                    allTaskRunning.set(false);
                }
            }))).get();
        }
        String checkpointPath = (String)miniCluster.triggerCheckpoint(jobID).get();
        Snapshot snapshot = this.waitForNewSnapshot(targetTable, -1L);
        while (snapshot.totalRecordCount() < totalRecords) {
            checkpointPath = (String)miniCluster.triggerCheckpoint(jobID).get();
            snapshot = this.waitForNewSnapshot(targetTable, snapshot.id());
        }
        return checkpointPath;
    }
}

