/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.flink.action.ResetConsumerAction;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class ConsumerActionITCase
extends ActionITCaseBase {
    @ParameterizedTest
    @Timeout(value=60L)
    @ValueSource(strings={"action", "procedure_indexed", "procedure_named"})
    public void testResetConsumer(String invoker) throws Exception {
        ReadWriteTableTestUtil.init(this.warehouse);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.BIGINT(), DataTypes.STRING()}, (String[])new String[]{"pk1", "col1"});
        FileStoreTable table = this.createFileStoreTable(rowType, Collections.emptyList(), Collections.singletonList("pk1"), Collections.emptyList(), Collections.emptyMap());
        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(this.commitUser);
        this.write = writeBuilder.newWrite();
        this.commit = writeBuilder.newCommit();
        this.writeData(this.rowData(1L, BinaryString.fromString((String)"Hi")));
        this.writeData(this.rowData(2L, BinaryString.fromString((String)"Hello")));
        this.writeData(this.rowData(3L, BinaryString.fromString((String)"Paimon")));
        BlockingIterator<Row, Row> iterator = ReadWriteTableTestUtil.testStreamingRead("SELECT * FROM `" + this.tableName + "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */", Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1L, "Hi"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2L, "Hello"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3L, "Paimon"})));
        ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location());
        while (!consumerManager.consumer("myid").isPresent()) {
            Thread.sleep(1000L);
        }
        iterator.close();
        Optional consumer1 = consumerManager.consumer("myid");
        Assertions.assertThat((Optional)consumer1).isPresent();
        Assertions.assertThat((long)((Consumer)consumer1.get()).nextSnapshot()).isEqualTo(4L);
        List<String> args = Arrays.asList("reset_consumer", "--warehouse", this.warehouse, "--database", this.database, "--table", this.tableName, "--consumer_id", "myid", "--next_snapshot", "1");
        switch (invoker) {
            case "action": {
                this.createAction(ResetConsumerAction.class, args).run();
                break;
            }
            case "procedure_indexed": {
                this.executeSQL(String.format("CALL sys.reset_consumer('%s.%s', 'myid', 1)", this.database, this.tableName));
                break;
            }
            case "procedure_named": {
                this.executeSQL(String.format("CALL sys.reset_consumer(`table` => '%s.%s', consumer_id => 'myid', next_snapshot_id => cast(1 as bigint))", this.database, this.tableName));
                break;
            }
            default: {
                throw new UnsupportedOperationException(invoker);
            }
        }
        Optional consumer2 = consumerManager.consumer("myid");
        Assertions.assertThat((Optional)consumer2).isPresent();
        Assertions.assertThat((long)((Consumer)consumer2.get()).nextSnapshot()).isEqualTo(1L);
        switch (invoker) {
            case "action": {
                this.createAction(ResetConsumerAction.class, args.subList(0, 9)).run();
                break;
            }
            case "procedure_indexed": {
                this.executeSQL(String.format("CALL sys.reset_consumer('%s.%s', 'myid')", this.database, this.tableName));
                break;
            }
            case "procedure_named": {
                this.executeSQL(String.format("CALL sys.reset_consumer(`table` => '%s.%s', consumer_id => 'myid')", this.database, this.tableName));
                break;
            }
            default: {
                throw new UnsupportedOperationException(invoker);
            }
        }
        Optional consumer3 = consumerManager.consumer("myid");
        Assertions.assertThat((Optional)consumer3).isNotPresent();
        List<String> args1 = Arrays.asList("reset_consumer", "--warehouse", this.warehouse, "--database", this.database, "--table", this.tableName, "--consumer_id", "myid", "--next_snapshot", "10");
        switch (invoker) {
            case "action": {
                org.junit.jupiter.api.Assertions.assertThrows(RuntimeException.class, () -> this.createAction(ResetConsumerAction.class, args1).run());
                break;
            }
            case "procedure_indexed": {
                org.junit.jupiter.api.Assertions.assertThrows(TableException.class, () -> this.executeSQL(String.format("CALL sys.reset_consumer('%s.%s', 'myid', 10)", this.database, this.tableName)));
                break;
            }
            case "procedure_named": {
                org.junit.jupiter.api.Assertions.assertThrows(TableException.class, () -> this.executeSQL(String.format("CALL sys.reset_consumer(`table` => '%s.%s', consumer_id => 'myid', next_snapshot_id => cast(10 as bigint))", this.database, this.tableName)));
                break;
            }
            default: {
                throw new UnsupportedOperationException(invoker);
            }
        }
    }

    @ParameterizedTest
    @Timeout(value=60L)
    @ValueSource(strings={"action", "procedure_indexed", "procedure_named"})
    public void testResetBranchConsumer(String invoker) throws Exception {
        ReadWriteTableTestUtil.init(this.warehouse);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.BIGINT(), DataTypes.STRING()}, (String[])new String[]{"pk1", "col1"});
        FileStoreTable table = this.createFileStoreTable(rowType, Collections.emptyList(), Collections.singletonList("pk1"), Collections.emptyList(), Collections.emptyMap());
        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(this.commitUser);
        this.write = writeBuilder.newWrite();
        this.commit = writeBuilder.newCommit();
        this.writeData(this.rowData(1L, BinaryString.fromString((String)"Hi")));
        this.writeData(this.rowData(2L, BinaryString.fromString((String)"Hello")));
        this.writeData(this.rowData(3L, BinaryString.fromString((String)"Paimon")));
        table.createTag("tag", 3L);
        String branchName = "b1";
        table.createBranch("b1", "tag");
        String branchTableName = this.tableName + "$branch_b1";
        BlockingIterator<Row, Row> iterator = ReadWriteTableTestUtil.testStreamingRead("SELECT * FROM `" + branchTableName + "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */", Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1L, "Hi"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2L, "Hello"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3L, "Paimon"})));
        ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location(), branchName);
        while (!consumerManager.consumer("myid").isPresent()) {
            Thread.sleep(1000L);
        }
        iterator.close();
        Optional consumer1 = consumerManager.consumer("myid");
        Assertions.assertThat((Optional)consumer1).isPresent();
        Assertions.assertThat((long)((Consumer)consumer1.get()).nextSnapshot()).isEqualTo(4L);
        List<String> args = Arrays.asList("reset_consumer", "--warehouse", this.warehouse, "--database", this.database, "--table", branchTableName, "--consumer_id", "myid", "--next_snapshot", "3");
        switch (invoker) {
            case "action": {
                this.createAction(ResetConsumerAction.class, args).run();
                break;
            }
            case "procedure_indexed": {
                this.executeSQL(String.format("CALL sys.reset_consumer('%s.%s', 'myid', 3)", this.database, branchTableName));
                break;
            }
            case "procedure_named": {
                this.executeSQL(String.format("CALL sys.reset_consumer(`table` => '%s.%s', consumer_id => 'myid', next_snapshot_id => cast(3 as bigint))", this.database, branchTableName));
                break;
            }
            default: {
                throw new UnsupportedOperationException(invoker);
            }
        }
        Optional consumer2 = consumerManager.consumer("myid");
        Assertions.assertThat((Optional)consumer2).isPresent();
        Assertions.assertThat((long)((Consumer)consumer2.get()).nextSnapshot()).isEqualTo(3L);
        switch (invoker) {
            case "action": {
                this.createAction(ResetConsumerAction.class, args.subList(0, 9)).run();
                break;
            }
            case "procedure_indexed": {
                this.executeSQL(String.format("CALL sys.reset_consumer('%s.%s', 'myid')", this.database, branchTableName));
                break;
            }
            case "procedure_named": {
                this.executeSQL(String.format("CALL sys.reset_consumer(`table` => '%s.%s', consumer_id => 'myid')", this.database, branchTableName));
                break;
            }
            default: {
                throw new UnsupportedOperationException(invoker);
            }
        }
        Optional consumer3 = consumerManager.consumer("myid");
        Assertions.assertThat((Optional)consumer3).isNotPresent();
    }
}

