/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeSet;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.Assert;

/**
 * Shared scenarios for tests that write a change-log stream of Flink {@link Row}s through
 * {@link FlinkSink} and then compare the contents of each resulting table snapshot with an
 * expected set of records.
 *
 * <p>None of the instance fields below are assigned in this class apart from the default
 * {@link #parallelism}; the scenarios read them, so they have to be populated before any scenario
 * method is invoked (presumably by a concrete subclass in its set-up code).
 */
public class TestFlinkIcebergSinkV2Base {
    /** Constant {@code 2}; not referenced inside this class (presumably the table format version). */
    protected static final int FORMAT_V2 = 2;

    /** Flink type information built from the field types of {@link SimpleDataUtil#FLINK_SCHEMA}. */
    protected static final TypeInformation<Row> ROW_TYPE_INFO = new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());

    /** Position of the {@code id} field inside the rows created by {@link #row(String, int, String)}. */
    protected static final int ROW_ID_POS = 0;

    /** Position of the {@code data} field inside the rows created by {@link #row(String, int, String)}. */
    protected static final int ROW_DATA_POS = 1;

    /** Maps the short change-log markers used by the scenarios to Flink {@link RowKind}s. */
    protected static final Map<String, RowKind> ROW_KIND_MAP =
        ImmutableMap.of(
            "+I", RowKind.INSERT,
            "-D", RowKind.DELETE,
            "-U", RowKind.UPDATE_BEFORE,
            "+U", RowKind.UPDATE_AFTER);

    /** Write parallelism handed to the sink builder. */
    protected int parallelism = 1;

    /** Loader handed to the sink builder. */
    protected TableLoader tableLoader;

    /** Table that is refreshed and read back when verifying results. */
    protected Table table;

    /** Execution environment on which the source is registered and the job is executed. */
    protected StreamExecutionEnvironment env;

    /** Not read inside this class. */
    protected FileFormat format;

    /** Selects which branch of the id-key scenarios is exercised. */
    protected boolean partitioned;

    /** Compared against {@code "hash"} in {@link #testChangeLogOnIdKey(String)}; may be {@code null}. */
    protected String writeDistributionMode;

    /**
     * Builds a two-field row ({@code id}, {@code data}) of the given kind.
     *
     * @param rowKind one of the keys of {@link #ROW_KIND_MAP} ({@code "+I"}, {@code "-D"},
     *     {@code "-U"}, {@code "+U"})
     * @param id value stored at {@link #ROW_ID_POS}
     * @param data value stored at {@link #ROW_DATA_POS}
     * @return the new row
     * @throws IllegalArgumentException if {@code rowKind} is not a key of {@link #ROW_KIND_MAP}
     */
    protected Row row(String rowKind, int id, String data) {
        RowKind kind = ROW_KIND_MAP.get(rowKind);
        if (kind == null) {
            throw new IllegalArgumentException("Unknown row kind: " + rowKind);
        }
        return Row.ofKind(kind, id, data);
    }

    /**
     * Upsert scenario with equality fields {@code (id, data)}.
     *
     * @param branch branch name passed to the sink
     * @throws Exception if running or verifying the job fails
     */
    protected void testUpsertOnIdDataKey(String branch) throws Exception {
        List<List<Row>> elementsPerCheckpoint =
            ImmutableList.of(
                ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")),
                ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")),
                ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa")));

        List<List<Record>> expectedRecords =
            ImmutableList.of(
                ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
                ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
                ImmutableList.of(record(1, "bbb"), record(2, "ccc")));

        testChangeLogs(
            ImmutableList.of("id", "data"),
            row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
            true,
            elementsPerCheckpoint,
            expectedRecords,
            branch);
    }

    /**
     * Change-log (non-upsert) scenario with equality fields {@code (data, id)}.
     *
     * @param branch branch name passed to the sink
     * @throws Exception if running or verifying the job fails
     */
    protected void testChangeLogOnIdDataKey(String branch) throws Exception {
        List<List<Row>> elementsPerCheckpoint =
            ImmutableList.of(
                ImmutableList.of(
                    row("+I", 1, "aaa"),
                    row("-D", 1, "aaa"),
                    row("+I", 2, "bbb"),
                    row("+I", 1, "bbb"),
                    row("+I", 2, "aaa")),
                ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
                ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa")));

        List<List<Record>> expectedRecords =
            ImmutableList.of(
                ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
                ImmutableList.of(
                    record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
                ImmutableList.of(
                    record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")));

        testChangeLogs(
            ImmutableList.of("data", "id"),
            row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
            false,
            elementsPerCheckpoint,
            expectedRecords,
            branch);
    }

    /**
     * Change-log (non-upsert) scenario in which every row is {@code (1, "aaa")} and the equality
     * fields are {@code (id, data)}.
     *
     * @param branch branch name passed to the sink
     * @throws Exception if running or verifying the job fails
     */
    protected void testChangeLogOnSameKey(String branch) throws Exception {
        List<List<Row>> elementsPerCheckpoint =
            ImmutableList.of(
                // Checkpoint #1
                ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")),
                // Checkpoint #2
                ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")),
                // Checkpoint #3
                ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")),
                // Checkpoint #4
                ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa")));

        List<List<Record>> expectedRecords =
            ImmutableList.of(
                ImmutableList.of(record(1, "aaa")),
                ImmutableList.of(record(1, "aaa")),
                ImmutableList.of(record(1, "aaa")),
                ImmutableList.of(record(1, "aaa"), record(1, "aaa")));

        testChangeLogs(
            ImmutableList.of("id", "data"),
            row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
            false,
            elementsPerCheckpoint,
            expectedRecords,
            branch);
    }

    /**
     * Change-log (non-upsert) scenario with the single equality field {@code data}.
     *
     * @param branch branch name passed to the sink
     * @throws Exception if running or verifying the job fails
     */
    protected void testChangeLogOnDataKey(String branch) throws Exception {
        List<List<Row>> elementsPerCheckpoint =
            ImmutableList.of(
                ImmutableList.of(
                    row("+I", 1, "aaa"),
                    row("-D", 1, "aaa"),
                    row("+I", 2, "bbb"),
                    row("+I", 1, "bbb"),
                    row("+I", 2, "aaa")),
                ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
                ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa"), row("+I", 2, "ccc")));

        List<List<Record>> expectedRecords =
            ImmutableList.of(
                ImmutableList.of(record(1, "bbb"), record(2, "aaa")),
                ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")),
                ImmutableList.of(
                    record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc")));

        testChangeLogs(
            ImmutableList.of("data"),
            row -> row.getField(ROW_DATA_POS),
            false,
            elementsPerCheckpoint,
            expectedRecords,
            branch);
    }

    /**
     * Upsert scenario with the single equality field {@code data}.
     *
     * @param branch branch name passed to the sink
     * @throws Exception if running or verifying the job fails
     */
    protected void testUpsertOnDataKey(String branch) throws Exception {
        List<List<Row>> elementsPerCheckpoint =
            ImmutableList.of(
                ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")),
                ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")),
                ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb")));

        List<List<Record>> expectedRecords =
            ImmutableList.of(
                ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
                ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
                ImmutableList.of(record(6, "aaa"), record(7, "bbb")));

        testChangeLogs(
            ImmutableList.of("data"),
            row -> row.getField(ROW_DATA_POS),
            true,
            elementsPerCheckpoint,
            expectedRecords,
            branch);
    }

    /**
     * Change-log (non-upsert) scenario with the single equality field {@code id}.
     *
     * <p>When {@link #partitioned} is set and {@link #writeDistributionMode} equals {@code "hash"},
     * the run is expected to fail with an {@link IllegalStateException}; otherwise the snapshots
     * are verified as usual.
     *
     * @param branch branch name passed to the sink
     * @throws Exception if running or verifying the job fails
     */
    protected void testChangeLogOnIdKey(String branch) throws Exception {
        List<List<Row>> elementsPerCheckpoint =
            ImmutableList.of(
                ImmutableList.of(
                    row("+I", 1, "aaa"),
                    row("-D", 1, "aaa"),
                    row("+I", 1, "bbb"),
                    row("+I", 2, "aaa"),
                    row("-D", 2, "aaa"),
                    row("+I", 2, "bbb")),
                ImmutableList.of(
                    row("-U", 2, "bbb"),
                    row("+U", 2, "ccc"),
                    row("-D", 2, "ccc"),
                    row("+I", 2, "ddd")),
                ImmutableList.of(
                    row("-D", 1, "bbb"),
                    row("+I", 1, "ccc"),
                    row("-D", 1, "ccc"),
                    row("+I", 1, "ddd")));

        List<List<Record>> expectedRecords =
            ImmutableList.of(
                ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
                ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
                ImmutableList.of(record(1, "ddd"), record(2, "ddd")));

        // Constant-first comparison so an unset distribution mode selects the normal path
        // instead of raising a NullPointerException.
        if (partitioned && "hash".equals(writeDistributionMode)) {
            Assertions.assertThatThrownBy(
                    () ->
                        testChangeLogs(
                            ImmutableList.of("id"),
                            row -> row.getField(ROW_ID_POS),
                            false,
                            elementsPerCheckpoint,
                            expectedRecords,
                            branch))
                .isInstanceOf(IllegalStateException.class)
                .hasMessageStartingWith(
                    "In 'hash' distribution mode with equality fields set, partition field")
                .hasMessageContaining("should be included in equality fields:");
        } else {
            testChangeLogs(
                ImmutableList.of("id"),
                row -> row.getField(ROW_ID_POS),
                false,
                elementsPerCheckpoint,
                expectedRecords,
                branch);
        }
    }

    /**
     * Upsert scenario with the single equality field {@code id}.
     *
     * <p>When {@link #partitioned} is set the run is expected to fail with an
     * {@link IllegalStateException}; otherwise the snapshots are verified as usual.
     *
     * @param branch branch name passed to the sink
     * @throws Exception if running or verifying the job fails
     */
    protected void testUpsertOnIdKey(String branch) throws Exception {
        List<List<Row>> elementsPerCheckpoint =
            ImmutableList.of(
                ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "bbb")),
                ImmutableList.of(row("+I", 1, "ccc")),
                ImmutableList.of(row("+U", 1, "ddd"), row("+I", 1, "eee")));

        List<List<Record>> expectedRecords =
            ImmutableList.of(
                ImmutableList.of(record(1, "bbb")),
                ImmutableList.of(record(1, "ccc")),
                ImmutableList.of(record(1, "eee")));

        if (!partitioned) {
            testChangeLogs(
                ImmutableList.of("id"),
                row -> row.getField(ROW_ID_POS),
                true,
                elementsPerCheckpoint,
                expectedRecords,
                branch);
        } else {
            Assertions.assertThatThrownBy(
                    () ->
                        testChangeLogs(
                            ImmutableList.of("id"),
                            row -> row.getField(ROW_ID_POS),
                            true,
                            elementsPerCheckpoint,
                            expectedRecords,
                            branch))
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("should be included in equality fields:");
        }
    }

    /**
     * Feeds the given rows through a {@link FlinkSink} built on {@link #env}, executes the job,
     * and asserts that the table ends up with one valid snapshot per entry of
     * {@code expectedRecordsPerCheckpoint}, each containing exactly the expected records.
     *
     * @param equalityFieldColumns column names passed to the sink as equality fields
     * @param keySelector not read by this method; kept so existing callers keep compiling
     * @param insertAsUpsert value passed to the sink builder's {@code upsert} option
     * @param elementsPerCheckpoint rows handed to {@link BoundedTestSource}, one inner list per
     *     checkpoint
     * @param expectedRecordsPerCheckpoint expected table content for each valid snapshot, in
     *     snapshot iteration order
     * @param branch branch name passed to the sink builder's {@code toBranch} option
     * @throws Exception if job execution or reading a snapshot fails
     */
    protected void testChangeLogs(List<String> equalityFieldColumns, KeySelector<Row, Object> keySelector, boolean insertAsUpsert, List<List<Row>> elementsPerCheckpoint, List<List<Record>> expectedRecordsPerCheckpoint, String branch) throws Exception {
        DataStream<Row> dataStream =
            env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);

        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
            .tableLoader(tableLoader)
            .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
            .writeParallelism(parallelism)
            .equalityFieldColumns(equalityFieldColumns)
            .upsert(insertAsUpsert)
            .toBranch(branch)
            .append();

        // Execute the program.
        env.execute("Test Iceberg Change-Log DataStream.");

        table.refresh();
        List<Snapshot> snapshots = findValidSnapshots();
        int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
        Assert.assertEquals(
            "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());

        for (int i = 0; i < expectedSnapshotNum; i++) {
            long snapshotId = snapshots.get(i).snapshotId();
            List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
            Assert.assertEquals(
                "Should have the expected records for the checkpoint#" + i,
                expectedRowSet(expectedRecords.toArray(new Record[0])),
                actualRowSet(snapshotId, "*"));
        }
    }

    /**
     * Creates an expected record via {@link SimpleDataUtil#createRecord}.
     *
     * @param id record id
     * @param data record data
     * @return the created record
     */
    protected Record record(int id, String data) {
        return SimpleDataUtil.createRecord(id, data);
    }

    /**
     * Collects, in iteration order of {@code table.snapshots()}, every snapshot that has at least
     * one manifest whose snapshot id equals the snapshot's own id.
     */
    private List<Snapshot> findValidSnapshots() {
        List<Snapshot> validSnapshots = Lists.newArrayList();
        for (Snapshot snapshot : table.snapshots()) {
            long ownId = snapshot.snapshotId();
            boolean hasOwnManifest =
                snapshot.allManifests(table.io()).stream().anyMatch(m -> ownId == m.snapshotId());
            if (hasOwnManifest) {
                validSnapshots.add(snapshot);
            }
        }
        return validSnapshots;
    }

    /** Delegates to {@link SimpleDataUtil#expectedRowSet} for {@link #table}. */
    private StructLikeSet expectedRowSet(Record... records) {
        return SimpleDataUtil.expectedRowSet(table, records);
    }

    /**
     * Refreshes {@link #table} and reads the selected columns of the given snapshot into a set.
     *
     * @param snapshotId snapshot to read
     * @param columns column names to select
     * @return the records read from that snapshot
     * @throws IOException if closing the reader fails
     */
    private StructLikeSet actualRowSet(long snapshotId, String... columns) throws IOException {
        table.refresh();
        StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
        try (CloseableIterable<Record> reader =
            IcebergGenerics.read(table).useSnapshot(snapshotId).select(columns).build()) {
            reader.forEach(set::add);
        }
        return set;
    }
}

