/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.source.BoundedTableFactory;
import org.apache.iceberg.flink.source.ChangeLogTableTestBase;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.StructLikeSet;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
public class TestChangeLogTable
extends ChangeLogTableTestBase {
    private static final Configuration CONF = new Configuration();
    private static final String SOURCE_TABLE = "default_catalog.default_database.source_change_logs";
    private static final String CATALOG_NAME = "test_catalog";
    private static final String DATABASE_NAME = "test_db";
    private static final String TABLE_NAME = "test_table";
    private static String warehouse;
    @Parameter
    private boolean partitioned;

    @Parameters(name="PartitionedTable={0}")
    public static Iterable<Object[]> parameters() {
        return ImmutableList.of((Object)new Object[]{true}, (Object)new Object[]{false});
    }

    @BeforeEach
    public void before() throws IOException {
        File warehouseFile = File.createTempFile("junit", null, this.temporaryDirectory.toFile());
        Assertions.assertThat((boolean)warehouseFile.delete()).isTrue();
        warehouse = String.format("file:%s", warehouseFile);
        this.sql("CREATE CATALOG %s WITH ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", CATALOG_NAME, warehouse);
        this.sql("USE CATALOG %s", CATALOG_NAME);
        this.sql("CREATE DATABASE %s", DATABASE_NAME);
        this.sql("USE %s", DATABASE_NAME);
        this.getTableEnv().getConfig().set("table.exec.sink.upsert-materialize", "NONE");
    }

    @Override
    @AfterEach
    public void clean() {
        this.sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
        this.sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
        this.dropCatalog(CATALOG_NAME, true);
        BoundedTableFactory.clearDataSets();
    }

    @TestTemplate
    public void testSqlChangeLogOnIdKey() throws Exception {
        ImmutableList inputRowsPerCheckpoint = ImmutableList.of((Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "aaa"), (Object)TestChangeLogTable.deleteRow(1, "aaa"), (Object)TestChangeLogTable.insertRow(1, "bbb"), (Object)TestChangeLogTable.insertRow(2, "aaa"), (Object)TestChangeLogTable.deleteRow(2, "aaa"), (Object)TestChangeLogTable.insertRow(2, "bbb")), (Object)ImmutableList.of((Object)TestChangeLogTable.updateBeforeRow(2, "bbb"), (Object)TestChangeLogTable.updateAfterRow(2, "ccc"), (Object)TestChangeLogTable.deleteRow(2, "ccc"), (Object)TestChangeLogTable.insertRow(2, "ddd")), (Object)ImmutableList.of((Object)TestChangeLogTable.deleteRow(1, "bbb"), (Object)TestChangeLogTable.insertRow(1, "ccc"), (Object)TestChangeLogTable.deleteRow(1, "ccc"), (Object)TestChangeLogTable.insertRow(1, "ddd")));
        ImmutableList expectedRecordsPerCheckpoint = ImmutableList.of((Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "bbb"), (Object)TestChangeLogTable.insertRow(2, "bbb")), (Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "bbb"), (Object)TestChangeLogTable.insertRow(2, "ddd")), (Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "ddd"), (Object)TestChangeLogTable.insertRow(2, "ddd")));
        this.testSqlChangeLog(TABLE_NAME, (List<String>)ImmutableList.of((Object)"id"), (List<List<Row>>)inputRowsPerCheckpoint, (List<List<Row>>)expectedRecordsPerCheckpoint);
    }

    @TestTemplate
    public void testChangeLogOnDataKey() throws Exception {
        ImmutableList elementsPerCheckpoint = ImmutableList.of((Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "aaa"), (Object)TestChangeLogTable.deleteRow(1, "aaa"), (Object)TestChangeLogTable.insertRow(2, "bbb"), (Object)TestChangeLogTable.insertRow(1, "bbb"), (Object)TestChangeLogTable.insertRow(2, "aaa")), (Object)ImmutableList.of((Object)TestChangeLogTable.updateBeforeRow(2, "aaa"), (Object)TestChangeLogTable.updateAfterRow(1, "ccc"), (Object)TestChangeLogTable.insertRow(1, "aaa")), (Object)ImmutableList.of((Object)TestChangeLogTable.deleteRow(1, "bbb"), (Object)TestChangeLogTable.insertRow(2, "aaa"), (Object)TestChangeLogTable.insertRow(2, "ccc")));
        ImmutableList expectedRecords = ImmutableList.of((Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "bbb"), (Object)TestChangeLogTable.insertRow(2, "aaa")), (Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "aaa"), (Object)TestChangeLogTable.insertRow(1, "bbb"), (Object)TestChangeLogTable.insertRow(1, "ccc")), (Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "aaa"), (Object)TestChangeLogTable.insertRow(1, "ccc"), (Object)TestChangeLogTable.insertRow(2, "aaa"), (Object)TestChangeLogTable.insertRow(2, "ccc")));
        this.testSqlChangeLog(TABLE_NAME, (List<String>)ImmutableList.of((Object)"data"), (List<List<Row>>)elementsPerCheckpoint, (List<List<Row>>)expectedRecords);
    }

    @TestTemplate
    public void testChangeLogOnIdDataKey() throws Exception {
        ImmutableList elementsPerCheckpoint = ImmutableList.of((Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "aaa"), (Object)TestChangeLogTable.deleteRow(1, "aaa"), (Object)TestChangeLogTable.insertRow(2, "bbb"), (Object)TestChangeLogTable.insertRow(1, "bbb"), (Object)TestChangeLogTable.insertRow(2, "aaa")), (Object)ImmutableList.of((Object)TestChangeLogTable.updateBeforeRow(2, "aaa"), (Object)TestChangeLogTable.updateAfterRow(1, "ccc"), (Object)TestChangeLogTable.insertRow(1, "aaa")), (Object)ImmutableList.of((Object)TestChangeLogTable.deleteRow(1, "bbb"), (Object)TestChangeLogTable.insertRow(2, "aaa")));
        ImmutableList expectedRecords = ImmutableList.of((Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "bbb"), (Object)TestChangeLogTable.insertRow(2, "aaa"), (Object)TestChangeLogTable.insertRow(2, "bbb")), (Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "aaa"), (Object)TestChangeLogTable.insertRow(1, "bbb"), (Object)TestChangeLogTable.insertRow(1, "ccc"), (Object)TestChangeLogTable.insertRow(2, "bbb")), (Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "aaa"), (Object)TestChangeLogTable.insertRow(1, "ccc"), (Object)TestChangeLogTable.insertRow(2, "aaa"), (Object)TestChangeLogTable.insertRow(2, "bbb")));
        this.testSqlChangeLog(TABLE_NAME, (List<String>)ImmutableList.of((Object)"data", (Object)"id"), (List<List<Row>>)elementsPerCheckpoint, (List<List<Row>>)expectedRecords);
    }

    @TestTemplate
    public void testPureInsertOnIdKey() throws Exception {
        ImmutableList elementsPerCheckpoint = ImmutableList.of((Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "aaa"), (Object)TestChangeLogTable.insertRow(2, "bbb")), (Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(3, "ccc"), (Object)TestChangeLogTable.insertRow(4, "ddd")), (Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(5, "eee"), (Object)TestChangeLogTable.insertRow(6, "fff")));
        ImmutableList expectedRecords = ImmutableList.of((Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "aaa"), (Object)TestChangeLogTable.insertRow(2, "bbb")), (Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "aaa"), (Object)TestChangeLogTable.insertRow(2, "bbb"), (Object)TestChangeLogTable.insertRow(3, "ccc"), (Object)TestChangeLogTable.insertRow(4, "ddd")), (Object)ImmutableList.of((Object)TestChangeLogTable.insertRow(1, "aaa"), (Object)TestChangeLogTable.insertRow(2, "bbb"), (Object)TestChangeLogTable.insertRow(3, "ccc"), (Object)TestChangeLogTable.insertRow(4, "ddd"), (Object)TestChangeLogTable.insertRow(5, "eee"), (Object)TestChangeLogTable.insertRow(6, "fff")));
        this.testSqlChangeLog(TABLE_NAME, (List<String>)ImmutableList.of((Object)"data"), (List<List<Row>>)elementsPerCheckpoint, (List<List<Row>>)expectedRecords);
    }

    private static Record record(int id, String data) {
        return SimpleDataUtil.createRecord(id, data);
    }

    private Table createTable(String tableName, List<String> key, boolean isPartitioned) {
        String partitionByCause = isPartitioned ? "PARTITIONED BY (data)" : "";
        this.sql("CREATE TABLE %s(id INT, data VARCHAR, PRIMARY KEY(%s) NOT ENFORCED) %s", tableName, Joiner.on((char)',').join(key), partitionByCause);
        CatalogLoader loader = CatalogLoader.hadoop((String)"my_catalog", (Configuration)CONF, (Map)ImmutableMap.of((Object)"warehouse", (Object)warehouse));
        Table table = loader.loadCatalog().loadTable(TableIdentifier.of((String[])new String[]{DATABASE_NAME, TABLE_NAME}));
        TableOperations ops = ((BaseTable)table).operations();
        TableMetadata meta = ops.current();
        ops.commit(meta, meta.upgradeToFormatVersion(2));
        return table;
    }

    private void testSqlChangeLog(String tableName, List<String> key, List<List<Row>> inputRowsPerCheckpoint, List<List<Row>> expectedRecordsPerCheckpoint) throws Exception {
        String dataId = BoundedTableFactory.registerDataSet(inputRowsPerCheckpoint);
        this.sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL) WITH ('connector'='BoundedSource', 'data-id'='%s')", SOURCE_TABLE, dataId);
        Assertions.assertThat(this.sql("SELECT * FROM %s", SOURCE_TABLE)).isEqualTo(TestChangeLogTable.listJoin(inputRowsPerCheckpoint));
        Table table = this.createTable(tableName, key, this.partitioned);
        this.sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
        table.refresh();
        List<Snapshot> snapshots = this.findValidSnapshots(table);
        int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
        ((ListAssert)Assertions.assertThat(snapshots).as("Should have the expected snapshot number", new Object[0])).hasSameSizeAs(expectedRecordsPerCheckpoint);
        for (int i = 0; i < expectedSnapshotNum; ++i) {
            long snapshotId = snapshots.get(i).snapshotId();
            List<Row> expectedRows = expectedRecordsPerCheckpoint.get(i);
            ((AbstractCollectionAssert)Assertions.assertThat((Collection)TestChangeLogTable.actualRowSet(table, snapshotId)).as("Should have the expected records for the checkpoint#" + i, new Object[0])).isEqualTo((Object)TestChangeLogTable.expectedRowSet(table, expectedRows));
        }
        if (expectedSnapshotNum > 0) {
            ((ListAssert)Assertions.assertThat(this.sql("SELECT * FROM %s", tableName)).as("Should have the expected rows in the final table", new Object[0])).containsExactlyInAnyOrderElementsOf((Iterable)expectedRecordsPerCheckpoint.get(expectedSnapshotNum - 1));
        }
    }

    private List<Snapshot> findValidSnapshots(Table table) {
        ArrayList validSnapshots = Lists.newArrayList();
        for (Snapshot snapshot : table.snapshots()) {
            if (!snapshot.allManifests(table.io()).stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId().longValue())) continue;
            validSnapshots.add(snapshot);
        }
        return validSnapshots;
    }

    private static StructLikeSet expectedRowSet(Table table, List<Row> rows) {
        Record[] records = new Record[rows.size()];
        for (int i = 0; i < records.length; ++i) {
            records[i] = TestChangeLogTable.record((Integer)rows.get(i).getField(0), (String)rows.get(i).getField(1));
        }
        return SimpleDataUtil.expectedRowSet(table, records);
    }

    private static StructLikeSet actualRowSet(Table table, long snapshotId) throws IOException {
        return SimpleDataUtil.actualRowSet(table, snapshotId, "*");
    }
}

