/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.types.Row;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration test for changing a table's {@code bucket} option with {@code ALTER TABLE} and then
 * rewriting the existing data with {@code INSERT OVERWRITE}.
 *
 * <p>Tables are created in a filesystem-backed catalog named {@code fs_catalog}; schema and
 * snapshot metadata are inspected through {@link SchemaManager} and {@link SnapshotManager}.
 */
public class RescaleBucketITCase
extends CatalogITCaseBase {
    /** Statement template that sets the {@code bucket} option: table name, new bucket number. */
    private static final String ALTER_TABLE_SQL = "ALTER TABLE %s SET ('bucket' = '%d')";

    /** Statement template that overwrites a table from a query: target table, source table. */
    private static final String RESCALE_OVERWRITE_SQL = "INSERT OVERWRITE %s SELECT * FROM %s";

    /** Pause between two polls while waiting for a snapshot or for a job to terminate. */
    private static final long POLL_INTERVAL_MILLIS = 2000L;

    /**
     * Returns the DDL statements for this test: the {@code fs_catalog} catalog rooted at the
     * warehouse path, and table {@code T1} with two buckets keyed on {@code f0}.
     */
    @Override
    protected List<String> ddl() {
        return Arrays.asList(
                String.format(
                        "CREATE CATALOG `fs_catalog` WITH ('type' = 'paimon', 'warehouse' = '%s')",
                        path),
                "CREATE TABLE IF NOT EXISTS `fs_catalog`.`default`.`T1` (f0 INT) WITH ('bucket' = '2', 'bucket-key' = 'f0')");
    }

    /** Runs the rescale scenario against table {@code T1} in {@code fs_catalog}. */
    @Test
    public void testRescaleCatalogTable() {
        innerTest("fs_catalog", "T1");
    }

    /**
     * Stops a streaming job with a savepoint, rescales {@code T3} from 2 to 4 buckets in batch
     * mode, resumes the job from the savepoint, and checks that every snapshot committed after
     * the rescale uses the new schema and that {@code T3} and {@code T4} hold the same rows.
     */
    @Test
    public void testSuspendAndRecoverAfterRescaleOverwrite() throws Exception {
        executeBoth(
                Arrays.asList(
                        "USE CATALOG fs_catalog",
                        "CREATE TEMPORARY TABLE IF NOT EXISTS `S0` (f0 INT) WITH ('connector' = 'datagen')",
                        "CREATE TABLE IF NOT EXISTS `T3` (f0 INT) WITH ('bucket' = '2', 'bucket-key' = 'f0')",
                        "CREATE TABLE IF NOT EXISTS `T4` (f0 INT)"));
        SchemaManager schemaManager =
                new SchemaManager(LocalFileIO.create(), getTableDirectory("T3"));
        assertLatestSchema(schemaManager, 0L, 2);

        // Step 1: write both tables from the same source, then suspend with a savepoint.
        String streamSql =
                "EXECUTE STATEMENT SET BEGIN\n"
                        + " INSERT INTO `T3` SELECT * FROM `S0`;\n"
                        + " INSERT INTO `T4` SELECT * FROM `S0`;\n"
                        + "END";
        JobClient jobClient = startJobAndCommitSnapshot(streamSql, null);
        String savepointPath = stopJobSafely(jobClient);

        Snapshot snapshotBeforeRescale = findLatestSnapshot("T3");
        Assertions.assertThat(snapshotBeforeRescale).isNotNull();
        assertSnapshotSchema(schemaManager, snapshotBeforeRescale.schemaId(), 0L, 2);
        List<Row> committedData = batchSql("SELECT * FROM T3");

        // Step 2: change the bucket number; this must produce schema 1.
        batchSql(ALTER_TABLE_SQL, "T3", 4);
        assertLatestSchema(schemaManager, 1L, 4);

        // Step 3: rewrite the table onto itself and verify the overwrite snapshot.
        batchSql(RESCALE_OVERWRITE_SQL, "T3", "T3");
        Snapshot snapshotAfterRescale = findLatestSnapshot("T3");
        Assertions.assertThat(snapshotAfterRescale).isNotNull();
        Assertions.assertThat(snapshotAfterRescale.id()).isEqualTo(snapshotBeforeRescale.id() + 1L);
        Assertions.assertThat(snapshotAfterRescale.commitKind())
                .isEqualTo(Snapshot.CommitKind.OVERWRITE);
        assertSnapshotSchema(schemaManager, snapshotAfterRescale.schemaId(), 1L, 4);
        Assertions.assertThat(batchSql("SELECT * FROM T3"))
                .containsExactlyInAnyOrderElementsOf(committedData);

        // Step 4: resume the streaming job from the savepoint and wait for a newer snapshot.
        sEnv.getConfig().getConfiguration().setString("execution.state-recovery.path", savepointPath);
        JobClient resumedJobClient = startJobAndCommitSnapshot(streamSql, snapshotAfterRescale.id());
        stopJobSafely(resumedJobClient);

        // Step 5: every snapshot newer than the overwrite must reference the rescaled schema.
        Snapshot lastSnapshot = findLatestSnapshot("T3");
        Assertions.assertThat(lastSnapshot).isNotNull();
        SnapshotManager snapshotManager =
                new SnapshotManager(LocalFileIO.create(), getTableDirectory("T3"));
        for (long snapshotId = lastSnapshot.id(); snapshotId > snapshotAfterRescale.id(); --snapshotId) {
            assertSnapshotSchema(
                    schemaManager, snapshotManager.snapshot(snapshotId).schemaId(), 1L, 4);
        }
        Assertions.assertThat(batchSql("SELECT * FROM T3"))
                .containsExactlyInAnyOrderElementsOf(batchSql("SELECT * FROM T4"));
    }

    /**
     * Blocks until table {@code T3} has a latest snapshot whose id differs from
     * {@code initSnapshotId}.
     *
     * <p>The loop polls every {@link #POLL_INTERVAL_MILLIS} milliseconds and has no upper bound
     * on the total wait.
     *
     * @param initSnapshotId snapshot id to wait past, or {@code null} to wait for any snapshot
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void waitForTheNextSnapshot(@Nullable Long initSnapshotId) throws InterruptedException {
        Snapshot snapshot = findLatestSnapshot("T3");
        // A null initSnapshotId never matches, so any existing snapshot ends the wait.
        while (snapshot == null
                || (initSnapshotId != null && snapshot.id() == initSnapshotId.longValue())) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
            snapshot = findLatestSnapshot("T3");
        }
    }

    /**
     * Submits {@code sql} on the streaming environment and waits until {@code T3} has a snapshot
     * other than {@code initSnapshotId}.
     *
     * @param sql statement to execute
     * @param initSnapshotId snapshot id to wait past, or {@code null} to wait for any snapshot
     * @return the client of the submitted job
     * @throws Exception if the wait is interrupted; {@code NoSuchElementException} if the
     *     statement result carries no job client
     */
    private JobClient startJobAndCommitSnapshot(String sql, @Nullable Long initSnapshotId) throws Exception {
        JobClient jobClient = sEnv.executeSql(sql).getJobClient().get();
        waitForTheNextSnapshot(initSnapshotId);
        return jobClient;
    }

    /**
     * Stops the job with a savepoint written under the warehouse path and waits until the job
     * reports a globally terminal status.
     *
     * @param client client of the job to stop
     * @return the savepoint path reported by the stop request
     * @throws ExecutionException if the savepoint or a status query fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private String stopJobSafely(JobClient client) throws ExecutionException, InterruptedException {
        CompletableFuture<String> savepointFuture =
                client.stopWithSavepoint(true, path, SavepointFormatType.DEFAULT);
        // Await the savepoint first so that a failed savepoint surfaces as an ExecutionException
        // instead of leaving the status poll below spinning on a job that may never terminate.
        String savepointPath = savepointFuture.get();
        while (!client.getJobStatus().get().isGloballyTerminalState()) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
        return savepointPath;
    }

    /**
     * Asserts that the latest schema exists, has the expected id, and carries the expected
     * {@code bucket} option value.
     */
    private void assertLatestSchema(SchemaManager schemaManager, long expectedSchemaId, int expectedBucketNum) {
        Assertions.assertThat(schemaManager.latest()).isPresent();
        TableSchema tableSchema = schemaManager.latest().get();
        Assertions.assertThat(tableSchema.id()).isEqualTo(expectedSchemaId);
        Assertions.assertThat(tableSchema.options())
                .containsEntry(CoreOptions.BUCKET.key(), String.valueOf(expectedBucketNum));
    }

    /**
     * Asserts that a snapshot's schema id equals the expected id and that the referenced schema
     * carries the expected {@code bucket} option value.
     */
    private void assertSnapshotSchema(SchemaManager schemaManager, long schemaIdFromSnapshot, long expectedSchemaId, int expectedBucketNum) {
        Assertions.assertThat(schemaIdFromSnapshot).isEqualTo(expectedSchemaId);
        TableSchema tableSchema = schemaManager.schema(schemaIdFromSnapshot);
        Assertions.assertThat(tableSchema.options())
                .containsEntry(CoreOptions.BUCKET.key(), String.valueOf(expectedBucketNum));
    }

    /**
     * Batch-mode rescale scenario: insert five rows, raise the bucket number from 2 to 4, verify
     * that a plain insert is rejected until the data is overwritten, then overwrite and insert
     * again.
     *
     * @param catalogName catalog to switch to
     * @param tableName table to rescale
     */
    private void innerTest(String catalogName, String tableName) {
        batchSql("USE CATALOG %s", catalogName);
        batchSql("INSERT INTO %s VALUES (1), (2), (3), (4), (5)", tableName);
        Snapshot snapshot = findLatestSnapshot(tableName);
        Assertions.assertThat(snapshot).isNotNull();

        SchemaManager schemaManager =
                new SchemaManager(LocalFileIO.create(), getTableDirectory(tableName));
        assertSnapshotSchema(schemaManager, snapshot.schemaId(), 0L, 2);

        // Changing the option alone creates schema 1 but leaves the data readable.
        batchSql(ALTER_TABLE_SQL, tableName, 4);
        assertLatestSchema(schemaManager, 1L, 4);
        List<Row> expected = Arrays.asList(Row.of(1), Row.of(2), Row.of(3), Row.of(4), Row.of(5));
        Assertions.assertThat(batchSql("SELECT * FROM %s", tableName))
                .containsExactlyInAnyOrderElementsOf(expected);

        // Writing before the overwrite must fail with the bucket-mismatch message.
        Assertions.assertThatThrownBy(() -> batchSql("INSERT INTO %s VALUES (6)", tableName))
                .rootCause()
                .isInstanceOf(RuntimeException.class)
                .hasMessage("Try to write table with a new bucket num 4, but the previous bucket num is 2. Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.");

        batchSql(RESCALE_OVERWRITE_SQL, tableName, tableName);
        snapshot = findLatestSnapshot(tableName);
        Assertions.assertThat(snapshot).isNotNull();
        Assertions.assertThat(snapshot.id()).isEqualTo(2L);
        Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
        assertSnapshotSchema(schemaManager, snapshot.schemaId(), 1L, 4);
        Assertions.assertThat(batchSql("SELECT * FROM %s", tableName))
                .containsExactlyInAnyOrderElementsOf(expected);

        // After the overwrite, regular inserts are accepted again.
        batchSql("INSERT INTO %s VALUES(6)", tableName);
        expected = Arrays.asList(Row.of(1), Row.of(2), Row.of(3), Row.of(4), Row.of(5), Row.of(6));
        Assertions.assertThat(batchSql("SELECT * FROM %s", tableName))
                .containsExactlyInAnyOrderElementsOf(expected);
    }

    /** Executes each statement on the streaming environment and then on the batch environment. */
    private void executeBoth(List<String> sqlList) {
        for (String sql : sqlList) {
            sEnv.executeSql(sql);
            tEnv.executeSql(sql);
        }
    }
}

