/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.flink.CatalogTestBase;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.Timeout;

@Timeout(value=60L)
public class TestStreamScanSql
extends CatalogTestBase {
    private static final String TABLE = "test_table";
    private static final FileFormat FORMAT = FileFormat.PARQUET;
    private TableEnvironment tEnv;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected TableEnvironment getTableEnv() {
        TableEnvironment tableEnv = this.tEnv;
        if (tableEnv != null) {
            return tableEnv;
        }
        TestStreamScanSql testStreamScanSql = this;
        synchronized (testStreamScanSql) {
            if (this.tEnv == null) {
                EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance().inStreamingMode();
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
                env.enableCheckpointing(400L);
                StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)env, (EnvironmentSettings)settingsBuilder.build());
                streamTableEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, (Object)true);
                this.tEnv = streamTableEnv;
            }
        }
        return this.tEnv;
    }

    @Override
    @BeforeEach
    public void before() {
        super.before();
        this.sql("CREATE DATABASE %s", this.flinkDatabase);
        this.sql("USE CATALOG %s", this.catalogName);
        this.sql("USE %s", "db");
    }

    @Override
    @AfterEach
    public void clean() {
        this.sql("DROP TABLE IF EXISTS %s.%s", this.flinkDatabase, TABLE);
        this.sql("DROP DATABASE IF EXISTS %s", this.flinkDatabase);
        super.clean();
    }

    private void insertRows(String partition, Table table, Row ... rows) throws IOException {
        this.insertRows(partition, "main", table, rows);
    }

    private void insertRows(String partition, String branch, Table table, Row ... rows) throws IOException {
        GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, this.temporaryDirectory);
        GenericRecord gRecord = GenericRecord.create((Schema)table.schema());
        ArrayList records = Lists.newArrayList();
        for (Row row : rows) {
            records.add(gRecord.copy("id", row.getField(0), "data", row.getField(1), "dt", row.getField(2)));
        }
        if (partition != null) {
            appender.appendToTable((StructLike)TestHelpers.Row.of((Object[])new Object[]{partition, 0}), branch, (List)records);
        } else {
            appender.appendToTable(branch, (List)records);
        }
    }

    private void insertRowsInBranch(String branch, Table table, Row ... rows) throws IOException {
        this.insertRows(null, branch, table, rows);
    }

    private void insertRows(Table table, Row ... rows) throws IOException {
        this.insertRows(null, table, rows);
    }

    private void assertRows(List<Row> expectedRows, Iterator<Row> iterator) {
        for (Row expectedRow : expectedRows) {
            Assertions.assertThat(iterator).hasNext();
            Row actualRow = iterator.next();
            Assertions.assertThat((int)actualRow.getArity()).isEqualTo(3);
            Assertions.assertThat((Object)actualRow.getField(0)).isEqualTo(expectedRow.getField(0));
            Assertions.assertThat((Object)actualRow.getField(1)).isEqualTo(expectedRow.getField(1));
            Assertions.assertThat((Object)actualRow.getField(2)).isEqualTo(expectedRow.getField(2));
        }
    }

    @TestTemplate
    public void testUnPartitionedTable() throws Exception {
        this.sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
        Table table = this.validationCatalog.loadTable(TableIdentifier.of((Namespace)this.icebergNamespace, (String)TABLE));
        TableResult result = this.exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);
        try (CloseableIterator iterator = result.collect();){
            Row row1 = Row.of((Object[])new Object[]{1, "aaa", "2021-01-01"});
            this.insertRows(table, row1);
            this.assertRows((List<Row>)ImmutableList.of((Object)row1), (Iterator<Row>)iterator);
            Row row2 = Row.of((Object[])new Object[]{2, "bbb", "2021-01-01"});
            this.insertRows(table, row2);
            this.assertRows((List<Row>)ImmutableList.of((Object)row2), (Iterator<Row>)iterator);
        }
        result.getJobClient().ifPresent(JobClient::cancel);
    }

    @TestTemplate
    public void testPartitionedTable() throws Exception {
        this.sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR) PARTITIONED BY (dt)", TABLE);
        Table table = this.validationCatalog.loadTable(TableIdentifier.of((Namespace)this.icebergNamespace, (String)TABLE));
        TableResult result = this.exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);
        try (CloseableIterator iterator = result.collect();){
            Row row1 = Row.of((Object[])new Object[]{1, "aaa", "2021-01-01"});
            this.insertRows("2021-01-01", table, row1);
            this.assertRows((List<Row>)ImmutableList.of((Object)row1), (Iterator<Row>)iterator);
            Row row2 = Row.of((Object[])new Object[]{2, "bbb", "2021-01-02"});
            this.insertRows("2021-01-02", table, row2);
            this.assertRows((List<Row>)ImmutableList.of((Object)row2), (Iterator<Row>)iterator);
            Row row3 = Row.of((Object[])new Object[]{1, "aaa", "2021-01-02"});
            this.insertRows("2021-01-02", table, row3);
            this.assertRows((List<Row>)ImmutableList.of((Object)row3), (Iterator<Row>)iterator);
            Row row4 = Row.of((Object[])new Object[]{2, "bbb", "2021-01-01"});
            this.insertRows("2021-01-01", table, row4);
            this.assertRows((List<Row>)ImmutableList.of((Object)row4), (Iterator<Row>)iterator);
        }
        result.getJobClient().ifPresent(JobClient::cancel);
    }

    @TestTemplate
    public void testConsumeFromBeginning() throws Exception {
        this.sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
        Table table = this.validationCatalog.loadTable(TableIdentifier.of((Namespace)this.icebergNamespace, (String)TABLE));
        Row row1 = Row.of((Object[])new Object[]{1, "aaa", "2021-01-01"});
        Row row2 = Row.of((Object[])new Object[]{2, "bbb", "2021-01-01"});
        this.insertRows(table, row1, row2);
        TableResult result = this.exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);
        try (CloseableIterator iterator = result.collect();){
            this.assertRows((List<Row>)ImmutableList.of((Object)row1, (Object)row2), (Iterator<Row>)iterator);
            Row row3 = Row.of((Object[])new Object[]{3, "ccc", "2021-01-01"});
            this.insertRows(table, row3);
            this.assertRows((List<Row>)ImmutableList.of((Object)row3), (Iterator<Row>)iterator);
            Row row4 = Row.of((Object[])new Object[]{4, "ddd", "2021-01-01"});
            this.insertRows(table, row4);
            this.assertRows((List<Row>)ImmutableList.of((Object)row4), (Iterator<Row>)iterator);
        }
        result.getJobClient().ifPresent(JobClient::cancel);
    }

    @TestTemplate
    public void testConsumeFilesFromMainBranch() throws Exception {
        this.sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
        Table table = this.validationCatalog.loadTable(TableIdentifier.of((Namespace)this.icebergNamespace, (String)TABLE));
        Row row1 = Row.of((Object[])new Object[]{1, "aaa", "2021-01-01"});
        Row row2 = Row.of((Object[])new Object[]{2, "bbb", "2021-01-01"});
        this.insertRows(table, row1, row2);
        String branchName = "b1";
        table.manageSnapshots().createBranch(branchName).commit();
        Row row3 = Row.of((Object[])new Object[]{3, "ccc", "2021-01-01"});
        Row row4 = Row.of((Object[])new Object[]{4, "ddd", "2021-01-01"});
        this.insertRowsInBranch(branchName, table, row3, row4);
        TableResult result = this.exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);
        try (CloseableIterator iterator = result.collect();){
            this.assertRows((List<Row>)ImmutableList.of((Object)row1, (Object)row2), (Iterator<Row>)iterator);
            Row row5 = Row.of((Object[])new Object[]{5, "eee", "2021-01-01"});
            Row row6 = Row.of((Object[])new Object[]{6, "fff", "2021-01-01"});
            this.insertRows(table, row5, row6);
            this.assertRows((List<Row>)ImmutableList.of((Object)row5, (Object)row6), (Iterator<Row>)iterator);
            Row row7 = Row.of((Object[])new Object[]{7, "ggg", "2021-01-01"});
            this.insertRows(table, row7);
            this.assertRows((List<Row>)ImmutableList.of((Object)row7), (Iterator<Row>)iterator);
        }
        result.getJobClient().ifPresent(JobClient::cancel);
    }

    @TestTemplate
    public void testConsumeFilesFromBranch() throws Exception {
        this.sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
        Table table = this.validationCatalog.loadTable(TableIdentifier.of((Namespace)this.icebergNamespace, (String)TABLE));
        Row row1 = Row.of((Object[])new Object[]{1, "aaa", "2021-01-01"});
        Row row2 = Row.of((Object[])new Object[]{2, "bbb", "2021-01-01"});
        this.insertRows(table, row1, row2);
        String branchName = "b1";
        table.manageSnapshots().createBranch(branchName).commit();
        TableResult result = this.exec("SELECT * FROM %s  /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ", TABLE, branchName);
        try (CloseableIterator iterator = result.collect();){
            this.assertRows((List<Row>)ImmutableList.of((Object)row1, (Object)row2), (Iterator<Row>)iterator);
            Row row3 = Row.of((Object[])new Object[]{3, "ccc", "2021-01-01"});
            Row row4 = Row.of((Object[])new Object[]{4, "ddd", "2021-01-01"});
            this.insertRowsInBranch(branchName, table, row3, row4);
            this.assertRows((List<Row>)ImmutableList.of((Object)row3, (Object)row4), (Iterator<Row>)iterator);
        }
        result.getJobClient().ifPresent(JobClient::cancel);
    }

    @TestTemplate
    public void testConsumeFilesFromTwoBranches() throws Exception {
        this.sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
        Table table = this.validationCatalog.loadTable(TableIdentifier.of((Namespace)this.icebergNamespace, (String)TABLE));
        String branch1 = "b1";
        String branch2 = "b2";
        table.manageSnapshots().createBranch(branch1).commit();
        table.manageSnapshots().createBranch(branch2).commit();
        Row row1Branch1 = Row.of((Object[])new Object[]{1, "b1", "2021-01-01"});
        Row row2Branch1 = Row.of((Object[])new Object[]{2, "b1", "2021-01-01"});
        Row row1Branch2 = Row.of((Object[])new Object[]{2, "b2", "2021-01-01"});
        Row row2Branch2 = Row.of((Object[])new Object[]{3, "b3", "2021-01-01"});
        this.insertRowsInBranch(branch1, table, row1Branch1, row2Branch1);
        this.insertRowsInBranch(branch2, table, row1Branch2, row2Branch2);
        TableResult resultBranch1 = this.exec("SELECT * FROM %s  /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ", TABLE, branch1);
        try (CloseableIterator iterator = resultBranch1.collect();){
            this.assertRows((List<Row>)ImmutableList.of((Object)row1Branch1, (Object)row2Branch1), (Iterator<Row>)iterator);
            Row another = Row.of((Object[])new Object[]{4, "ccc", "2021-01-01"});
            this.insertRowsInBranch(branch1, table, another);
            this.assertRows((List<Row>)ImmutableList.of((Object)another), (Iterator<Row>)iterator);
        }
        TableResult resultBranch2 = this.exec("SELECT * FROM %s  /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ", TABLE, branch2);
        try (CloseableIterator iterator = resultBranch2.collect();){
            this.assertRows((List<Row>)ImmutableList.of((Object)row1Branch2, (Object)row2Branch2), (Iterator<Row>)iterator);
            Row another = Row.of((Object[])new Object[]{4, "ccc", "2021-01-01"});
            this.insertRowsInBranch(branch2, table, another);
            this.assertRows((List<Row>)ImmutableList.of((Object)another), (Iterator<Row>)iterator);
        }
        resultBranch1.getJobClient().ifPresent(JobClient::cancel);
        resultBranch2.getJobClient().ifPresent(JobClient::cancel);
    }

    @TestTemplate
    public void testConsumeFromStartSnapshotId() throws Exception {
        this.sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
        Table table = this.validationCatalog.loadTable(TableIdentifier.of((Namespace)this.icebergNamespace, (String)TABLE));
        Row row1 = Row.of((Object[])new Object[]{1, "aaa", "2021-01-01"});
        Row row2 = Row.of((Object[])new Object[]{2, "bbb", "2021-01-01"});
        this.insertRows(table, row1);
        this.insertRows(table, row2);
        long startSnapshotId = table.currentSnapshot().snapshotId();
        Row row3 = Row.of((Object[])new Object[]{3, "ccc", "2021-01-01"});
        Row row4 = Row.of((Object[])new Object[]{4, "ddd", "2021-01-01"});
        this.insertRows(table, row3, row4);
        TableResult result = this.exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='%d')*/", TABLE, startSnapshotId);
        try (CloseableIterator iterator = result.collect();){
            this.assertRows((List<Row>)ImmutableList.of((Object)row3, (Object)row4), (Iterator<Row>)iterator);
            Row row5 = Row.of((Object[])new Object[]{5, "eee", "2021-01-01"});
            Row row6 = Row.of((Object[])new Object[]{6, "fff", "2021-01-01"});
            this.insertRows(table, row5, row6);
            this.assertRows((List<Row>)ImmutableList.of((Object)row5, (Object)row6), (Iterator<Row>)iterator);
            Row row7 = Row.of((Object[])new Object[]{7, "ggg", "2021-01-01"});
            this.insertRows(table, row7);
            this.assertRows((List<Row>)ImmutableList.of((Object)row7), (Iterator<Row>)iterator);
        }
        result.getJobClient().ifPresent(JobClient::cancel);
    }

    @TestTemplate
    public void testConsumeFromStartTag() throws Exception {
        this.sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
        Table table = this.validationCatalog.loadTable(TableIdentifier.of((Namespace)this.icebergNamespace, (String)TABLE));
        Row row1 = Row.of((Object[])new Object[]{1, "aaa", "2021-01-01"});
        Row row2 = Row.of((Object[])new Object[]{2, "bbb", "2021-01-01"});
        this.insertRows(table, row1);
        this.insertRows(table, row2);
        String tagName = "t1";
        long startSnapshotId = table.currentSnapshot().snapshotId();
        table.manageSnapshots().createTag(tagName, startSnapshotId).commit();
        Row row3 = Row.of((Object[])new Object[]{3, "ccc", "2021-01-01"});
        Row row4 = Row.of((Object[])new Object[]{4, "ddd", "2021-01-01"});
        this.insertRows(table, row3, row4);
        TableResult result = this.exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='%s')*/", TABLE, tagName);
        try (CloseableIterator iterator = result.collect();){
            this.assertRows((List<Row>)ImmutableList.of((Object)row3, (Object)row4), (Iterator<Row>)iterator);
            Row row5 = Row.of((Object[])new Object[]{5, "eee", "2021-01-01"});
            Row row6 = Row.of((Object[])new Object[]{6, "fff", "2021-01-01"});
            this.insertRows(table, row5, row6);
            this.assertRows((List<Row>)ImmutableList.of((Object)row5, (Object)row6), (Iterator<Row>)iterator);
            Row row7 = Row.of((Object[])new Object[]{7, "ggg", "2021-01-01"});
            this.insertRows(table, row7);
            this.assertRows((List<Row>)ImmutableList.of((Object)row7), (Iterator<Row>)iterator);
        }
        result.getJobClient().ifPresent(JobClient::cancel);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='%s', 'start-snapshot-id'='%d' )*/", TABLE, tagName, startSnapshotId)).isInstanceOf(IllegalArgumentException.class)).hasMessage("START_SNAPSHOT_ID and START_TAG cannot both be set.");
    }
}

