/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Structured Streaming read tests for Iceberg tables, executed once per catalog configuration
 * provided through the {@link SparkCatalogTestBase} constructor parameters.
 *
 * <p>Before each test a fresh table {@code (id INT, data STRING)} partitioned by
 * {@code bucket(3, id)} is created. Most tests stream that table into a Spark in-memory sink
 * (see {@code MEMORY_TABLE}) and compare the rows that arrived with the rows that were appended.
 */
@RunWith(value = Parameterized.class)
public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
    private Table table;

    /** Three batches of records; each inner list is committed as its own snapshot. */
    private static final List<List<SimpleRecord>> TEST_DATA_MULTIPLE_SNAPSHOTS =
            Lists.newArrayList(
                    Lists.newArrayList(
                            new SimpleRecord(1, "one"),
                            new SimpleRecord(2, "two"),
                            new SimpleRecord(3, "three")),
                    Lists.newArrayList(new SimpleRecord(4, "four"), new SimpleRecord(5, "five")),
                    Lists.newArrayList(new SimpleRecord(6, "six"), new SimpleRecord(7, "seven")));

    /**
     * Three rounds of writes; in each round every inner-most list is committed as its own
     * snapshot before a (restarted) stream picks the round up.
     */
    private static final List<List<List<SimpleRecord>>> TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS =
            Lists.newArrayList(
                    Lists.newArrayList(
                            Lists.newArrayList(
                                    new SimpleRecord(1, "one"),
                                    new SimpleRecord(2, "two"),
                                    new SimpleRecord(3, "three")),
                            Lists.newArrayList(new SimpleRecord(4, "four"), new SimpleRecord(5, "five"))),
                    Lists.newArrayList(
                            Lists.newArrayList(new SimpleRecord(6, "six"), new SimpleRecord(7, "seven")),
                            Lists.newArrayList(new SimpleRecord(8, "eight"), new SimpleRecord(9, "nine"))),
                    Lists.newArrayList(
                            Lists.newArrayList(
                                    new SimpleRecord(10, "ten"),
                                    new SimpleRecord(11, "eleven"),
                                    new SimpleRecord(12, "twelve")),
                            Lists.newArrayList(
                                    new SimpleRecord(13, "thirteen"), new SimpleRecord(14, "fourteen")),
                            Lists.newArrayList(
                                    new SimpleRecord(15, "fifteen"), new SimpleRecord(16, "sixteen"))));

    /** Query name of the in-memory sink; it is also the name the sink's rows are queried under. */
    private static final String MEMORY_TABLE = "_stream_view_mem";

    public TestStructuredStreamingRead3(String catalogName, String implementation, Map<String, String> config) {
        super(catalogName, implementation, config);
    }

    /** Creates the test table and caches a handle to it from the validation catalog. */
    @Before
    public void setupTable() {
        sql("CREATE TABLE %s (id INT, data STRING) USING iceberg PARTITIONED BY (bucket(3, id))", tableName);
        table = validationCatalog.loadTable(tableIdent);
    }

    /** Stops every streaming query still active in the shared Spark session. */
    @After
    public void stopStreams() throws TimeoutException {
        for (StreamingQuery query : spark.streams().active()) {
            query.stop();
        }
    }

    /** Drops the table created by {@link #setupTable()}. */
    @After
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", tableName);
    }

    /** A stream started after several snapshots exist reads all of them. */
    @Test
    public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception {
        List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(expected);

        StreamingQuery query = startStream();

        List<SimpleRecord> actual = rowsAvailable(query);
        Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
    }

    /** A stream started on an empty table picks up snapshots appended afterwards. */
    @Test
    public void testReadStreamOnIcebergThenAddData() throws Exception {
        List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;

        StreamingQuery query = startStream();
        appendDataAsMultipleSnapshots(expected);

        List<SimpleRecord> actual = rowsAvailable(query);
        Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
    }

    /** With {@code stream-from-timestamp} after the last commit, only later appends are read. */
    @Test
    public void testReadingStreamFromTimestamp() throws Exception {
        List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList(
                new SimpleRecord(-2, "minustwo"), new SimpleRecord(-1, "minusone"), new SimpleRecord(0, "zero"));
        appendData(dataBeforeTimestamp);

        table.refresh();
        // One millisecond past the latest commit, so the existing snapshot is excluded.
        long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1L;

        StreamingQuery query = startStream("stream-from-timestamp", Long.toString(streamStartTimestamp));
        Assertions.assertThat(rowsAvailable(query)).isEmpty();

        List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(expected);

        List<SimpleRecord> actual = rowsAvailable(query);
        Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
    }

    /** Appends committed before a future start timestamp are skipped; appends after it are read. */
    @Test
    public void testReadingStreamFromFutureTimetsamp() throws Exception {
        long futureTimestamp = System.currentTimeMillis() + 10000L;

        StreamingQuery query = startStream("stream-from-timestamp", Long.toString(futureTimestamp));
        Assertions.assertThat(rowsAvailable(query)).isEmpty();

        List<SimpleRecord> data = Lists.newArrayList(
                new SimpleRecord(-2, "minustwo"), new SimpleRecord(-1, "minusone"), new SimpleRecord(0, "zero"));

        // Commits made before the start timestamp must not surface in the stream.
        IntStream.range(0, 3).forEach(x -> {
            appendData(data);
            Assertions.assertThat(rowsAvailable(query)).isEmpty();
        });

        waitUntilAfter(futureTimestamp);

        appendData(data);
        List<SimpleRecord> actual = rowsAvailable(query);
        Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(data);
    }

    /** Existing snapshots older than a future start timestamp are not replayed. */
    @Test
    public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws Exception {
        List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList(
                new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three"));
        appendData(dataBeforeTimestamp);

        long streamStartTimestamp = System.currentTimeMillis() + 2000L;
        StreamingQuery query = startStream("stream-from-timestamp", Long.toString(streamStartTimestamp));

        List<SimpleRecord> actual = rowsAvailable(query);
        Assert.assertEquals(Collections.emptyList(), actual);

        waitUntilAfter(streamStartTimestamp);

        List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(expected);
        Assertions.assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
    }

    /** A start timestamp equal to an existing snapshot's commit time includes that snapshot. */
    @Test
    public void testReadingStreamFromTimestampOfExistingSnapshot() throws Exception {
        List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;

        appendData(expected.get(0));
        table.refresh();
        long firstSnapshotTime = table.currentSnapshot().timestampMillis();

        StreamingQuery stream = startStream("stream-from-timestamp", Long.toString(firstSnapshotTime));
        for (int i = 1; i < expected.size(); ++i) {
            appendData(expected.get(i));
        }

        List<SimpleRecord> actual = rowsAvailable(stream);
        Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
    }

    /**
     * Starting from the commit time of a snapshot that has since been expired yields only the
     * records of the remaining snapshots.
     */
    @Test
    public void testReadingStreamWithExpiredSnapshotFromTimestamp() throws TimeoutException {
        List<SimpleRecord> firstSnapshotRecordList = Lists.newArrayList(new SimpleRecord(1, "one"));
        List<SimpleRecord> secondSnapshotRecordList = Lists.newArrayList(new SimpleRecord(2, "two"));
        List<SimpleRecord> thirdSnapshotRecordList = Lists.newArrayList(new SimpleRecord(3, "three"));

        List<SimpleRecord> expectedRecordList = new ArrayList<>();
        expectedRecordList.addAll(secondSnapshotRecordList);
        expectedRecordList.addAll(thirdSnapshotRecordList);

        appendData(firstSnapshotRecordList);
        table.refresh();
        long firstSnapshotid = table.currentSnapshot().snapshotId();
        long firstSnapshotCommitTime = table.currentSnapshot().timestampMillis();

        appendData(secondSnapshotRecordList);
        appendData(thirdSnapshotRecordList);

        table.expireSnapshots().expireSnapshotId(firstSnapshotid).commit();

        StreamingQuery query = startStream("stream-from-timestamp", String.valueOf(firstSnapshotCommitTime));
        List<SimpleRecord> actual = rowsAvailable(query);
        Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRecordList);
    }

    /**
     * A stream restarted from the same checkpoint after each round of writes ends up with every
     * record exactly once in its parquet output.
     */
    @Test
    public void testResumingStreamReadFromCheckpoint() throws Exception {
        File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
        File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
        File output = temp.newFolder();

        DataStreamWriter<?> querySource = spark.readStream()
                .format("iceberg")
                .load(tableName)
                .writeStream()
                .option("checkpointLocation", writerCheckpoint.toString())
                .format("parquet")
                .queryName("checkpoint_test")
                .option("path", output.getPath());

        StreamingQuery startQuery = querySource.start();
        startQuery.processAllAvailable();
        startQuery.stop();

        List<SimpleRecord> expected = new ArrayList<>();
        for (List<List<SimpleRecord>> expectedCheckpoint : TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
            appendDataAsMultipleSnapshots(expectedCheckpoint);
            for (List<SimpleRecord> batch : expectedCheckpoint) {
                expected.addAll(batch);
            }

            StreamingQuery restartedQuery = querySource.start();
            restartedQuery.processAllAvailable();
            restartedQuery.stop();

            // Everything written to the sink so far, across all restarts.
            List<SimpleRecord> actual =
                    spark.read().load(output.getPath()).as(Encoders.bean(SimpleRecord.class)).collectAsList();
            Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
        }
    }

    /** Resuming from a checkpoint whose offset points at an expired snapshot fails. */
    @Test
    public void testFailReadingCheckpointInvalidSnapshot() throws IOException, TimeoutException {
        File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
        File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
        File output = temp.newFolder();

        DataStreamWriter<?> querySource = spark.readStream()
                .format("iceberg")
                .load(tableName)
                .writeStream()
                .option("checkpointLocation", writerCheckpoint.toString())
                .format("parquet")
                .queryName("checkpoint_test")
                .option("path", output.getPath());

        List<SimpleRecord> firstSnapshotRecordList = Lists.newArrayList(new SimpleRecord(1, "one"));
        List<SimpleRecord> secondSnapshotRecordList = Lists.newArrayList(new SimpleRecord(2, "two"));

        StreamingQuery startQuery = querySource.start();

        appendData(firstSnapshotRecordList);
        table.refresh();
        long firstSnapshotid = table.currentSnapshot().snapshotId();
        startQuery.processAllAvailable();
        startQuery.stop();

        appendData(secondSnapshotRecordList);

        table.expireSnapshots().expireSnapshotId(firstSnapshotid).commit();

        StreamingQuery restartedQuery = querySource.start();
        AssertionsForClassTypes.assertThatThrownBy(() -> restartedQuery.processAllAvailable())
                .hasCauseInstanceOf(IllegalStateException.class)
                .hasMessageContaining(String.format(
                        "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
                        firstSnapshotid));
    }

    /** Data files written as parquet, orc and avro in the same table are all streamed. */
    @Test
    public void testParquetOrcAvroDataInOneTable() throws Exception {
        List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
                new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three"));
        List<SimpleRecord> orcFileRecords =
                Lists.newArrayList(new SimpleRecord(4, "four"), new SimpleRecord(5, "five"));
        List<SimpleRecord> avroFileRecords =
                Lists.newArrayList(new SimpleRecord(6, "six"), new SimpleRecord(7, "seven"));

        appendData(parquetFileRecords);
        appendData(orcFileRecords, "orc");
        appendData(avroFileRecords, "avro");

        StreamingQuery query = startStream();
        Assertions.assertThat(rowsAvailable(query))
                .containsExactlyInAnyOrderElementsOf(
                        Iterables.concat(parquetFileRecords, orcFileRecords, avroFileRecords));
    }

    /** Streaming an empty table produces no rows. */
    @Test
    public void testReadStreamFromEmptyTable() throws Exception {
        StreamingQuery stream = startStream();
        List<SimpleRecord> actual = rowsAvailable(stream);
        Assert.assertEquals(Collections.emptyList(), actual);
    }

    /** An "overwrite" snapshot produced by a row-level equality delete makes the stream fail. */
    @Test
    public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception {
        // Row-level deletes require format v2.
        TableOperations ops = ((BaseTable) table).operations();
        TableMetadata meta = ops.current();
        ops.commit(meta, meta.upgradeToFormatVersion(2));

        List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(dataAcrossSnapshots);

        Schema deleteRowSchema = table.schema().select("data");
        GenericRecord dataDelete = GenericRecord.create(deleteRowSchema);
        List<Record> dataDeletes = Lists.newArrayList(dataDelete.copy("data", "one"));

        OutputFile deleteOutput = Files.localOutput(temp.newFile());
        StructLike partition = TestHelpers.Row.of(0);
        DeleteFile eqDeletes =
                FileHelpers.writeDeleteFile(table, deleteOutput, partition, dataDeletes, deleteRowSchema);

        table.newRowDelta().addDeletes(eqDeletes).commit();

        // Check the expected precondition: the row delta was committed as an overwrite.
        Assert.assertEquals("overwrite", table.currentSnapshot().operation());

        StreamingQuery query = startStream();

        AssertHelpers.assertThrowsCause(
                "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
                IllegalStateException.class,
                "Cannot process overwrite snapshot",
                () -> query.processAllAvailable());
    }

    /** A "replace" snapshot produced by rewriting manifests is ignored by the stream. */
    @Test
    public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception {
        List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(expected);

        table.rewriteManifests().clusterBy(f -> 1).commit();

        // Check the expected precondition: the manifest rewrite was committed as a replace.
        Assert.assertEquals("replace", table.currentSnapshot().operation());

        StreamingQuery query = startStream();
        List<SimpleRecord> actual = rowsAvailable(query);
        Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
    }

    /** A "delete" snapshot makes the stream fail when no skip option is set. */
    @Test
    public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception {
        replaceBucketWithIdentityPartition();

        List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(dataAcrossSnapshots);

        Expression idIsFour = Expressions.equal("id", 4);
        table.newDelete().deleteFromRowFilter(idIsFour).commit();

        // Check the expected precondition: the row filter was committed as a delete.
        Assert.assertEquals("delete", table.currentSnapshot().operation());

        StreamingQuery query = startStream();

        AssertHelpers.assertThrowsCause(
                "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
                IllegalStateException.class,
                "Cannot process delete snapshot",
                () -> query.processAllAvailable());
    }

    /** With {@code streaming-skip-delete-snapshots=true}, "delete" snapshots are skipped. */
    @Test
    public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exception {
        replaceBucketWithIdentityPartition();

        List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(dataAcrossSnapshots);

        Expression idIsFour = Expressions.equal("id", 4);
        table.newDelete().deleteFromRowFilter(idIsFour).commit();

        // Check the expected precondition: the row filter was committed as a delete.
        Assert.assertEquals("delete", table.currentSnapshot().operation());

        StreamingQuery query = startStream("streaming-skip-delete-snapshots", "true");
        Assertions.assertThat(rowsAvailable(query))
                .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
    }

    /** With {@code streaming-skip-overwrite-snapshots=true}, "overwrite" snapshots are skipped. */
    @Test
    public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws Exception {
        replaceBucketWithIdentityPartition();

        List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(dataAcrossSnapshots);

        Expression idAboveFour = Expressions.greaterThan("id", 4);
        table.newOverwrite().overwriteByRowFilter(idAboveFour).commit();

        // Check the expected precondition: the row filter was committed as an overwrite.
        Assert.assertEquals("overwrite", table.currentSnapshot().operation());

        StreamingQuery query = startStream("streaming-skip-overwrite-snapshots", "true");
        Assertions.assertThat(rowsAvailable(query))
                .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
    }

    /**
     * Replaces the table's bucket partition field ({@code id_bucket}) with an identity partition
     * on {@code id}.
     */
    private void replaceBucketWithIdentityPartition() {
        Term identityOnId = Expressions.ref("id");
        table.updateSpec().removeField("id_bucket").addField(identityOnId).commit();
    }

    /** Appends each inner list with a separate write, one per element of {@code data}. */
    private void appendDataAsMultipleSnapshots(List<List<SimpleRecord>> data) {
        for (List<SimpleRecord> batch : data) {
            appendData(batch);
        }
    }

    /** Appends {@code data} to the table using the parquet write format. */
    private void appendData(List<SimpleRecord> data) {
        appendData(data, "parquet");
    }

    /**
     * Appends {@code data} to the table in a single Spark write.
     *
     * @param data   records to write
     * @param format value passed as the {@code write-format} option (e.g. parquet, orc, avro)
     */
    private void appendData(List<SimpleRecord> data, String format) {
        Dataset<?> df = spark.createDataFrame(data, SimpleRecord.class);
        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format)
                .mode("append")
                .save(tableName);
    }

    /**
     * Starts a stream from the table into the in-memory sink named {@link #MEMORY_TABLE}.
     *
     * @param options applied both to the stream reader and to the stream writer
     * @return the running query
     */
    private StreamingQuery startStream(Map<String, String> options) throws TimeoutException {
        return spark.readStream()
                .options(options)
                .format("iceberg")
                .load(tableName)
                .writeStream()
                .options(options)
                .format("memory")
                .queryName(MEMORY_TABLE)
                .outputMode(OutputMode.Append())
                .start();
    }

    /** Starts a stream with no extra options. */
    private StreamingQuery startStream() throws TimeoutException {
        return startStream(Collections.emptyMap());
    }

    /** Starts a stream with a single option {@code key=value}. */
    private StreamingQuery startStream(String key, String value) throws TimeoutException {
        return startStream(ImmutableMap.of(key, value));
    }

    /**
     * Processes all data currently available to {@code query}, then returns every row the
     * in-memory sink holds so far.
     */
    private List<SimpleRecord> rowsAvailable(StreamingQuery query) {
        query.processAllAvailable();
        return spark.sql("select * from " + MEMORY_TABLE)
                .as(Encoders.bean(SimpleRecord.class))
                .collectAsList();
    }
}

