/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopCatalogResource;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkV2Base;
import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized tests for {@link FlinkSink} writing into a format-version 2 table.
 *
 * <p>Every test runs once per combination returned by {@link #parameters()}: file format, job
 * parallelism, whether the table is partitioned by {@code data}, and the table's
 * {@code write.distribution-mode}. The scenario bodies that take a single {@code String}
 * argument are inherited from {@link TestFlinkIcebergSinkV2Base}.
 */
@RunWith(Parameterized.class)
public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
        MiniClusterResource.createWithClassloaderCheckDisabled();

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    @Rule
    public final HadoopCatalogResource catalogResource =
        new HadoopCatalogResource(TEMPORARY_FOLDER, "default", "t");

    /**
     * Supplies the parameter matrix.
     *
     * @return rows of {@code {file format, parallelism, partitioned, write distribution mode}}
     */
    @Parameterized.Parameters(name="FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
    public static Object[][] parameters() {
        return new Object[][] {
            {"avro", 1, true, "none"},
            {"avro", 1, false, "none"},
            {"avro", 4, true, "none"},
            {"avro", 4, false, "none"},
            {"orc", 1, true, "hash"},
            {"orc", 1, false, "hash"},
            {"orc", 4, true, "hash"},
            {"orc", 4, false, "hash"},
            {"parquet", 1, true, "range"},
            {"parquet", 1, false, "range"},
            {"parquet", 4, true, "range"},
            {"parquet", 4, false, "range"}
        };
    }

    /**
     * Stores one row of {@link #parameters()} in the inherited fields.
     *
     * @param format file format name, resolved through {@link FileFormat#fromString(String)}
     * @param parallelism parallelism applied to the execution environment and the sink writers
     * @param partitioned whether the table is identity-partitioned on {@code data}
     * @param writeDistributionMode value stored in the {@code write.distribution-mode} property
     */
    public TestFlinkIcebergSinkV2(String format, int parallelism, boolean partitioned, String writeDistributionMode) {
        this.format = FileFormat.fromString(format);
        this.parallelism = parallelism;
        this.partitioned = partitioned;
        this.writeDistributionMode = writeDistributionMode;
    }

    /**
     * Creates the test table, applies the format and distribution-mode properties, and prepares
     * the checkpointing execution environment and the table loader used by the tests.
     */
    @Before
    public void setupTable() {
        PartitionSpec spec = this.partitioned
            ? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build()
            : PartitionSpec.unpartitioned();
        Map<String, String> tableProperties =
            ImmutableMap.of(
                "write.format.default", this.format.name(),
                "format-version", String.valueOf(2));

        this.table =
            this.catalogResource
                .catalog()
                .createTable(TestFixtures.TABLE_IDENTIFIER, SimpleDataUtil.SCHEMA, spec, tableProperties);

        this.table
            .updateProperties()
            .set("write.format.default", this.format.name())
            .set("write.distribution-mode", this.writeDistributionMode)
            .commit();

        this.env =
            StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
                .enableCheckpointing(100L)
                .setParallelism(this.parallelism)
                .setMaxParallelism(this.parallelism);

        this.tableLoader = this.catalogResource.tableLoader();
    }

    /**
     * Checks which field ids the builder reports as equality fields: the schema's identifier
     * fields when nothing is configured, otherwise the ids of the configured columns.
     */
    @Test
    public void testCheckAndGetEqualityFieldIds() {
        this.table
            .updateSchema()
            .allowIncompatibleChanges()
            .addRequiredColumn("type", Types.StringType.get())
            .setIdentifierFields("type")
            .commit();

        DataStream<Row> dataStream =
            this.env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
        FlinkSink.Builder builder =
            FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(this.table);

        // No equality columns configured: expect the schema's identifier field ids.
        Assert.assertEquals(
            this.table.schema().identifierFieldIds(),
            Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));

        // Explicitly configured columns replace the identifier fields.
        builder.equalityFieldColumns(Lists.newArrayList("id"));
        Assert.assertEquals(
            Sets.newHashSet(this.table.schema().findField("id").fieldId()),
            Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));

        builder.equalityFieldColumns(Lists.newArrayList("type"));
        Assert.assertEquals(
            Sets.newHashSet(this.table.schema().findField("type").fieldId()),
            Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
    }

    /** Runs the inherited id-key changelog scenario with {@code "main"}. */
    @Test
    public void testChangeLogOnIdKey() throws Exception {
        this.testChangeLogOnIdKey("main");
    }

    /**
     * Feeds one insert in the first checkpoint and two deletes in the second, keyed on
     * {@code data}, and expects a single record after the first checkpoint and none after the
     * second.
     */
    @Test
    public void testUpsertOnlyDeletesOnDataKey() throws Exception {
        List<List<Row>> elementsPerCheckpoint =
            ImmutableList.of(
                ImmutableList.of(this.row("+I", 1, "aaa")),
                ImmutableList.of(this.row("-D", 1, "aaa"), this.row("-D", 2, "bbb")));

        List<List<Record>> expectedRecords =
            ImmutableList.of(ImmutableList.of(this.record(1, "aaa")), ImmutableList.of());

        // The cast must be parameterized: against a raw KeySelector the lambda parameter would be
        // typed as Object and getField would not resolve.
        // NOTE(review): field index 1 presumably holds the "data" column, and the `true` argument
        // presumably enables upsert mode - confirm against TestFlinkIcebergSinkV2Base.
        this.testChangeLogs(
            ImmutableList.of("data"),
            (KeySelector<Row, Object> & Serializable) row -> row.getField(1),
            true,
            elementsPerCheckpoint,
            expectedRecords,
            "main");
    }

    /** Runs the inherited data-key changelog scenario with {@code "main"}. */
    @Test
    public void testChangeLogOnDataKey() throws Exception {
        this.testChangeLogOnDataKey("main");
    }

    /** Runs the inherited id+data-key changelog scenario with {@code "main"}. */
    @Test
    public void testChangeLogOnIdDataKey() throws Exception {
        this.testChangeLogOnIdDataKey("main");
    }

    /** Runs the inherited same-key changelog scenario with {@code "main"}. */
    @Test
    public void testChangeLogOnSameKey() throws Exception {
        this.testChangeLogOnSameKey("main");
    }

    /**
     * Verifies that an upsert-enabled builder rejects overwrite mode and rejects an empty
     * equality-column list, each with an {@link IllegalStateException} carrying a fixed message.
     */
    @Test
    public void testUpsertModeCheck() throws Exception {
        DataStream<Row> dataStream =
            this.env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
        FlinkSink.Builder builder =
            FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
                .tableLoader(this.tableLoader)
                .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
                .writeParallelism(this.parallelism)
                .upsert(true);

        Assertions.assertThatThrownBy(
                () -> builder.equalityFieldColumns(ImmutableList.of("id", "data")).overwrite(true).append())
            .isInstanceOf(IllegalStateException.class)
            .hasMessage("OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");

        Assertions.assertThatThrownBy(
                () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
            .isInstanceOf(IllegalStateException.class)
            .hasMessage("Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
    }

    /** Runs the inherited id-key upsert scenario with {@code "main"}. */
    @Test
    public void testUpsertOnIdKey() throws Exception {
        this.testUpsertOnIdKey("main");
    }

    /** Runs the inherited data-key upsert scenario with {@code "main"}. */
    @Test
    public void testUpsertOnDataKey() throws Exception {
        this.testUpsertOnDataKey("main");
    }

    /** Runs the inherited id+data-key upsert scenario with {@code "main"}. */
    @Test
    public void testUpsertOnIdDataKey() throws Exception {
        this.testUpsertOnIdDataKey("main");
    }
}

