/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
public final class TestStructuredStreamingRead3
extends CatalogTestBase {
    /** Iceberg table under test; loaded from the validation catalog in {@code setupTable}. */
    private Table table;

    /** Number of micro-batches seen by {@code microBatchCount}; reset to zero in {@code setupTable}. */
    private final AtomicInteger microBatches = new AtomicInteger();

    /**
     * Three groups of records. {@code appendDataAsMultipleSnapshots} appends each inner list with
     * its own write.
     *
     * <p>The element type is inferred from the assignment target; no array casts are used because
     * an {@code (Object[])} cast forces the element type to {@code Object} and breaks inference.
     */
    private static final List<List<SimpleRecord>> TEST_DATA_MULTIPLE_SNAPSHOTS =
        Lists.newArrayList(
            Lists.newArrayList(
                new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three")),
            Lists.newArrayList(new SimpleRecord(4, "four"), new SimpleRecord(5, "five")),
            Lists.newArrayList(new SimpleRecord(6, "six"), new SimpleRecord(7, "seven")));

    /**
     * Three rounds of writes; each round is itself a list of record groups in the same shape as
     * {@link #TEST_DATA_MULTIPLE_SNAPSHOTS}.
     */
    private static final List<List<List<SimpleRecord>>> TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS =
        Lists.newArrayList(
            Lists.newArrayList(
                Lists.newArrayList(
                    new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three")),
                Lists.newArrayList(new SimpleRecord(4, "four"), new SimpleRecord(5, "five"))),
            Lists.newArrayList(
                Lists.newArrayList(new SimpleRecord(6, "six"), new SimpleRecord(7, "seven")),
                Lists.newArrayList(new SimpleRecord(8, "eight"), new SimpleRecord(9, "nine"))),
            Lists.newArrayList(
                Lists.newArrayList(
                    new SimpleRecord(10, "ten"),
                    new SimpleRecord(11, "eleven"),
                    new SimpleRecord(12, "twelve")),
                Lists.newArrayList(new SimpleRecord(13, "thirteen"), new SimpleRecord(14, "fourteen")),
                Lists.newArrayList(new SimpleRecord(15, "fifteen"), new SimpleRecord(16, "sixteen"))));

    /** Query name of the in-memory sink that the streaming helpers write to and read back from. */
    private static final String MEMORY_TABLE = "_stream_view_mem";

    /** Sets the adaptive-execution SQL conf key to {@code "false"} on the shared Spark session. */
    @BeforeAll
    public static void setupSpark() {
        String adaptiveExecutionKey = SQLConf.ADAPTIVE_EXECUTION_ENABLED().key();
        spark.conf().set(adaptiveExecutionKey, "false");
    }

    /**
     * Creates the bucketed Iceberg table used by every test, loads it through the validation
     * catalog and resets the micro-batch counter.
     */
    @BeforeEach
    public void setupTable() {
        String createTableSql =
            "CREATE TABLE %s (id INT, data STRING) "
                + "USING iceberg "
                + "PARTITIONED BY (bucket(3, id)) "
                + "TBLPROPERTIES ("
                + "'commit.manifest.min-count-to-merge'='3', "
                + "'commit.manifest-merge.enabled'='true')";
        sql(createTableSql, tableName);

        table = validationCatalog.loadTable(tableIdent);
        microBatches.set(0);
    }

    /**
     * Stops every streaming query that is still active on the shared Spark session.
     *
     * @throws TimeoutException propagated from {@code StreamingQuery.stop()}
     */
    @AfterEach
    public void stopStreams() throws TimeoutException {
        StreamingQuery[] activeQueries = spark.streams().active();
        for (int i = 0; i < activeQueries.length; i++) {
            activeQueries[i].stop();
        }
    }

    /** Drops the table created by {@code setupTable}, if it still exists. */
    @AfterEach
    public void removeTables() {
        String dropTableSql = "DROP TABLE IF EXISTS %s";
        sql(dropTableSql, tableName);
    }

    /** Appends all test data first, then starts a stream and expects every appended row. */
    @TestTemplate
    public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception {
        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        StreamingQuery stream = startStream();
        List<SimpleRecord> streamedRows = rowsAvailable(stream);

        Iterable<SimpleRecord> allAppendedRows = Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS);
        Assertions.assertThat(streamedRows).containsExactlyInAnyOrderElementsOf(allAppendedRows);
    }

    /**
     * With {@code streaming-max-files-per-micro-batch=1}, asserts that the appended test data is
     * consumed in six micro-batches.
     */
    @TestTemplate
    public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1() throws Exception {
        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        // Uncast arguments let the call site infer Map<String, String> directly.
        int batches = microBatchCount(ImmutableMap.of("streaming-max-files-per-micro-batch", "1"));
        Assertions.assertThat(batches).isEqualTo(6);
    }

    /**
     * With {@code streaming-max-files-per-micro-batch=2}, asserts that the appended test data is
     * consumed in three micro-batches.
     */
    @TestTemplate
    public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_2() throws Exception {
        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        // Uncast arguments let the call site infer Map<String, String> directly.
        int batches = microBatchCount(ImmutableMap.of("streaming-max-files-per-micro-batch", "2"));
        Assertions.assertThat(batches).isEqualTo(3);
    }

    /**
     * With {@code streaming-max-rows-per-micro-batch=1}, asserts that exactly one micro-batch is
     * produced and that a stream started with the same limit exposes only the first record of the
     * first appended group.
     */
    @TestTemplate
    public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1() throws Exception {
        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        int batches = microBatchCount(ImmutableMap.of("streaming-max-rows-per-micro-batch", "1"));
        Assertions.assertThat(batches).isEqualTo(1);

        StreamingQuery query = startStream("streaming-max-rows-per-micro-batch", "1");
        List<SimpleRecord> actual = rowsAvailable(query);

        SimpleRecord firstRecord = TEST_DATA_MULTIPLE_SNAPSHOTS.get(0).get(0);
        Assertions.assertThat(actual)
            .containsExactlyInAnyOrderElementsOf(Collections.singletonList(firstRecord));
    }

    /**
     * With {@code streaming-max-rows-per-micro-batch=4}, asserts that the appended test data is
     * consumed in two micro-batches.
     */
    @TestTemplate
    public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4() throws Exception {
        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        // Uncast arguments let the call site infer Map<String, String> directly.
        int batches = microBatchCount(ImmutableMap.of("streaming-max-rows-per-micro-batch", "4"));
        Assertions.assertThat(batches).isEqualTo(2);
    }

    /** Starts the stream on the empty table, appends data afterwards and expects every row. */
    @TestTemplate
    public void testReadStreamOnIcebergThenAddData() throws Exception {
        StreamingQuery stream = startStream();

        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        List<SimpleRecord> streamedRows = rowsAvailable(stream);
        Iterable<SimpleRecord> allAppendedRows = Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS);
        Assertions.assertThat(streamedRows).containsExactlyInAnyOrderElementsOf(allAppendedRows);
    }

    /**
     * Starts a stream from one millisecond after the only existing snapshot, expects no rows at
     * first, and then expects exactly the rows appended afterwards.
     */
    @TestTemplate
    public void testReadingStreamFromTimestamp() throws Exception {
        List<SimpleRecord> dataBeforeTimestamp =
            Lists.newArrayList(
                new SimpleRecord(-2, "minustwo"),
                new SimpleRecord(-1, "minusone"),
                new SimpleRecord(0, "zero"));
        appendData(dataBeforeTimestamp);

        table.refresh();
        long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1L;

        StreamingQuery query = startStream("stream-from-timestamp", Long.toString(streamStartTimestamp));

        // Nothing has been committed at or after the start timestamp yet.
        List<SimpleRecord> empty = rowsAvailable(query);
        Assertions.assertThat(empty).isEmpty();

        List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(expected);

        List<SimpleRecord> actual = rowsAvailable(query);
        Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
    }

    /**
     * Starts a stream from a timestamp ten seconds in the future. Appends made before that
     * timestamp must not appear; the append made after waiting past it must.
     */
    @TestTemplate
    public void testReadingStreamFromFutureTimetsamp() throws Exception {
        long futureTimestamp = System.currentTimeMillis() + 10000L;

        StreamingQuery query = startStream("stream-from-timestamp", Long.toString(futureTimestamp));
        Assertions.assertThat(rowsAvailable(query)).isEmpty();

        List<SimpleRecord> data =
            Lists.newArrayList(
                new SimpleRecord(-2, "minustwo"),
                new SimpleRecord(-1, "minusone"),
                new SimpleRecord(0, "zero"));

        // Three appends before the start timestamp: none of them may reach the stream.
        for (int attempt = 0; attempt < 3; attempt++) {
            appendData(data);
            Assertions.assertThat(rowsAvailable(query)).isEmpty();
        }

        waitUntilAfter(futureTimestamp);

        // The first append after the start timestamp is the only one the stream should emit.
        appendData(data);
        List<SimpleRecord> actual = rowsAvailable(query);
        Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(data);
    }

    /**
     * With one snapshot already committed, starts a stream from two seconds in the future, expects
     * no rows, then waits past that timestamp and expects only the rows appended afterwards.
     */
    @TestTemplate
    public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws Exception {
        List<SimpleRecord> dataBeforeTimestamp =
            Lists.newArrayList(
                new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three"));
        appendData(dataBeforeTimestamp);

        long streamStartTimestamp = System.currentTimeMillis() + 2000L;
        StreamingQuery query = startStream("stream-from-timestamp", Long.toString(streamStartTimestamp));

        List<SimpleRecord> actual = rowsAvailable(query);
        Assertions.assertThat(actual).isEmpty();

        waitUntilAfter(streamStartTimestamp);

        List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(expected);
        Assertions.assertThat(rowsAvailable(query))
            .containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
    }

    /**
     * Starts a stream from the exact commit timestamp of the first snapshot and expects rows from
     * that snapshot as well as from the snapshots appended afterwards.
     */
    @TestTemplate
    public void testReadingStreamFromTimestampOfExistingSnapshot() throws Exception {
        List<List<SimpleRecord>> snapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;

        // Commit only the first group, then capture its snapshot timestamp.
        appendData(snapshots.get(0));
        table.refresh();
        String startTimestamp = Long.toString(table.currentSnapshot().timestampMillis());

        StreamingQuery query = startStream("stream-from-timestamp", startTimestamp);

        // Commit the remaining groups while the stream is running.
        for (List<SimpleRecord> laterSnapshot : snapshots.subList(1, snapshots.size())) {
            appendData(laterSnapshot);
        }

        List<SimpleRecord> streamedRows = rowsAvailable(query);
        Assertions.assertThat(streamedRows)
            .containsExactlyInAnyOrderElementsOf(Iterables.concat(snapshots));
    }

    /**
     * Expires the first of three snapshots, starts a stream from that snapshot's commit timestamp
     * and expects only the rows of the second and third snapshots.
     */
    @TestTemplate
    public void testReadingStreamWithExpiredSnapshotFromTimestamp() throws TimeoutException {
        List<SimpleRecord> firstSnapshotRecordList = Lists.newArrayList(new SimpleRecord(1, "one"));
        List<SimpleRecord> secondSnapshotRecordList = Lists.newArrayList(new SimpleRecord(2, "two"));
        List<SimpleRecord> thirdSnapshotRecordList = Lists.newArrayList(new SimpleRecord(3, "three"));

        List<SimpleRecord> expectedRecordList = Lists.newArrayList();
        expectedRecordList.addAll(secondSnapshotRecordList);
        expectedRecordList.addAll(thirdSnapshotRecordList);

        appendData(firstSnapshotRecordList);
        table.refresh();
        long firstSnapshotid = table.currentSnapshot().snapshotId();
        long firstSnapshotCommitTime = table.currentSnapshot().timestampMillis();

        appendData(secondSnapshotRecordList);
        appendData(thirdSnapshotRecordList);

        table.expireSnapshots().expireSnapshotId(firstSnapshotid).commit();

        StreamingQuery query =
            startStream("stream-from-timestamp", String.valueOf(firstSnapshotCommitTime));
        List<SimpleRecord> actual = rowsAvailable(query);
        Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRecordList);
    }

    /**
     * Runs a checkpointed stream into a parquet sink, stops it, and then repeatedly appends data
     * and restarts the same writer. After each restart the sink must contain every row appended
     * so far.
     */
    @TestTemplate
    public void testResumingStreamReadFromCheckpoint() throws Exception {
        File writerCheckpointFolder = temp.resolve("writer-checkpoint-folder").toFile();
        File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
        File output = temp.resolve("junit").toFile();

        DataStreamWriter<?> querySource =
            spark.readStream()
                .format("iceberg")
                .load(tableName)
                .writeStream()
                .option("checkpointLocation", writerCheckpoint.toString())
                .format("parquet")
                .queryName("checkpoint_test")
                .option("path", output.getPath());

        // First run happens before any data is appended by this test.
        StreamingQuery startQuery = querySource.start();
        startQuery.processAllAvailable();
        startQuery.stop();

        List<SimpleRecord> expected = Lists.newArrayList();
        for (List<List<SimpleRecord>> expectedCheckpoint : TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
            // Data is appended while no query is running.
            appendDataAsMultipleSnapshots(expectedCheckpoint);
            for (List<SimpleRecord> snapshotRows : expectedCheckpoint) {
                expected.addAll(snapshotRows);
            }

            // Restart from the same checkpoint location and drain what is available.
            StreamingQuery restartedQuery = querySource.start();
            restartedQuery.processAllAvailable();
            restartedQuery.stop();

            List<SimpleRecord> actual =
                spark.read()
                    .load(output.getPath())
                    .as(Encoders.bean(SimpleRecord.class))
                    .collectAsList();
            Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
        }
    }

    /**
     * Processes one snapshot with a checkpointed stream, expires that snapshot, and asserts that
     * restarting from the checkpoint fails with an {@link IllegalStateException} cause naming the
     * expired snapshot id.
     */
    @TestTemplate
    public void testFailReadingCheckpointInvalidSnapshot() throws IOException, TimeoutException {
        File writerCheckpointFolder = temp.resolve("writer-checkpoint-folder").toFile();
        File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
        File output = temp.resolve("junit").toFile();

        DataStreamWriter<?> querySource =
            spark.readStream()
                .format("iceberg")
                .load(tableName)
                .writeStream()
                .option("checkpointLocation", writerCheckpoint.toString())
                .format("parquet")
                .queryName("checkpoint_test")
                .option("path", output.getPath());

        List<SimpleRecord> firstSnapshotRecordList = Lists.newArrayList(new SimpleRecord(1, "one"));
        List<SimpleRecord> secondSnapshotRecordList = Lists.newArrayList(new SimpleRecord(2, "two"));

        StreamingQuery startQuery = querySource.start();

        appendData(firstSnapshotRecordList);
        table.refresh();
        long firstSnapshotid = table.currentSnapshot().snapshotId();
        startQuery.processAllAvailable();
        startQuery.stop();

        appendData(secondSnapshotRecordList);

        // Remove the snapshot that was current when the first run was drained.
        table.expireSnapshots().expireSnapshotId(firstSnapshotid).commit();

        StreamingQuery restartedQuery = querySource.start();
        Assertions.assertThatThrownBy(restartedQuery::processAllAvailable)
            .hasCauseInstanceOf(IllegalStateException.class)
            .hasMessageContaining(
                String.format(
                    "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
                    firstSnapshotid));
    }

    /**
     * Appends three groups of rows using the default (parquet), {@code orc} and {@code avro} write
     * formats and expects the stream to return all of them.
     */
    @TestTemplate
    public void testParquetOrcAvroDataInOneTable() throws Exception {
        List<SimpleRecord> parquetFileRecords =
            Lists.newArrayList(
                new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three"));
        List<SimpleRecord> orcFileRecords =
            Lists.newArrayList(new SimpleRecord(4, "four"), new SimpleRecord(5, "five"));
        List<SimpleRecord> avroFileRecords =
            Lists.newArrayList(new SimpleRecord(6, "six"), new SimpleRecord(7, "seven"));

        appendData(parquetFileRecords);
        appendData(orcFileRecords, "orc");
        appendData(avroFileRecords, "avro");

        StreamingQuery query = startStream();
        Assertions.assertThat(rowsAvailable(query))
            .containsExactlyInAnyOrderElementsOf(
                Iterables.concat(parquetFileRecords, orcFileRecords, avroFileRecords));
    }

    /** Streams from the freshly created table and expects no rows. */
    @TestTemplate
    public void testReadStreamFromEmptyTable() throws Exception {
        StreamingQuery query = startStream();
        List<SimpleRecord> streamedRows = rowsAvailable(query);

        Assertions.assertThat(streamedRows).isEmpty();
    }

    /**
     * Upgrades the table to format version 2, commits a row delta carrying an equality delete file
     * (so the current snapshot's operation is {@code overwrite}), and asserts that streaming fails
     * with an {@link IllegalStateException} cause.
     */
    @TestTemplate
    public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception {
        // Upgrade the table metadata to format version 2 before writing the delete file.
        TableOperations ops = ((BaseTable) table).operations();
        TableMetadata meta = ops.current();
        ops.commit(meta, meta.upgradeToFormatVersion(2));

        List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(dataAcrossSnapshots);

        // Equality delete on data = "one".
        Schema deleteRowSchema = table.schema().select("data");
        GenericRecord dataDelete = GenericRecord.create(deleteRowSchema);
        List<Record> dataDeletes = Lists.newArrayList(dataDelete.copy("data", "one"));

        DeleteFile eqDeletes =
            FileHelpers.writeDeleteFile(
                table,
                Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
                TestHelpers.Row.of(0),
                dataDeletes,
                deleteRowSchema);

        // Data file entry that points at a freshly created temp file.
        DataFile dataFile =
            DataFiles.builder(table.spec())
                .withPath(File.createTempFile("junit", null, temp.toFile()).getPath())
                .withFileSizeInBytes(10L)
                .withRecordCount(1L)
                .withFormat(FileFormat.PARQUET)
                .build();

        table.newRowDelta().addRows(dataFile).addDeletes(eqDeletes).commit();

        // Precondition for the assertion below.
        Assertions.assertThat(table.currentSnapshot().operation()).isEqualTo("overwrite");

        StreamingQuery query = startStream();
        Assertions.assertThatThrownBy(query::processAllAvailable)
            .cause()
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("Cannot process overwrite snapshot");
    }

    /**
     * After one {@code makeRewriteDataFiles()} commit, asserts that a one-file-per-batch stream
     * still produces six micro-batches.
     */
    @TestTemplate
    public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplace() throws Exception {
        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        makeRewriteDataFiles();

        // Uncast arguments let the call site infer Map<String, String> directly.
        int batches = microBatchCount(ImmutableMap.of("streaming-max-files-per-micro-batch", "1"));
        Assertions.assertThat(batches).isEqualTo(6);
    }

    /**
     * After one {@code makeRewriteDataFiles()} commit, asserts that a four-rows-per-batch stream
     * still produces two micro-batches.
     */
    @TestTemplate
    public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceMaxRows() throws Exception {
        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        makeRewriteDataFiles();

        // Uncast arguments let the call site infer Map<String, String> directly.
        int batches = microBatchCount(ImmutableMap.of("streaming-max-rows-per-micro-batch", "4"));
        Assertions.assertThat(batches).isEqualTo(2);
    }

    /**
     * After one {@code makeRewriteDataFiles()} commit, asserts that a stream limited to both four
     * rows and one file per batch produces six micro-batches.
     */
    @TestTemplate
    public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceMaxFilesAndRows() throws Exception {
        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        makeRewriteDataFiles();

        // Uncast arguments let the call site infer Map<String, String> directly.
        int batches =
            microBatchCount(
                ImmutableMap.of(
                    "streaming-max-rows-per-micro-batch", "4",
                    "streaming-max-files-per-micro-batch", "1"));
        Assertions.assertThat(batches).isEqualTo(6);
    }

    /**
     * After two consecutive {@code makeRewriteDataFiles()} commits, asserts that a
     * one-file-per-batch stream still produces six micro-batches.
     */
    @TestTemplate
    public void testReadStreamWithSnapshotType2RewriteDataFilesIgnoresReplace() throws Exception {
        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        makeRewriteDataFiles();
        makeRewriteDataFiles();

        // Uncast arguments let the call site infer Map<String, String> directly.
        int batches = microBatchCount(ImmutableMap.of("streaming-max-files-per-micro-batch", "1"));
        Assertions.assertThat(batches).isEqualTo(6);
    }

    /**
     * Appends the test data, commits a rewrite, appends the same data again, and asserts that a
     * one-file-per-batch stream produces twelve micro-batches.
     */
    @TestTemplate
    public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceFollowedByAppend() throws Exception {
        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        makeRewriteDataFiles();

        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        // Uncast arguments let the call site infer Map<String, String> directly.
        int batches = microBatchCount(ImmutableMap.of("streaming-max-files-per-micro-batch", "1"));
        Assertions.assertThat(batches).isEqualTo(12);
    }

    /**
     * Commits a manifest rewrite (current snapshot operation {@code replace}) after appending the
     * test data and expects the stream to return exactly the appended rows.
     */
    @TestTemplate
    public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception {
        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        // Cluster every file under the same key.
        table.rewriteManifests().clusterBy(file -> 1).commit();

        // Precondition for the assertion below.
        String currentOperation = table.currentSnapshot().operation();
        Assertions.assertThat(currentOperation).isEqualTo("replace");

        StreamingQuery stream = startStream();
        List<SimpleRecord> streamedRows = rowsAvailable(stream);
        Iterable<SimpleRecord> allAppendedRows = Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS);
        Assertions.assertThat(streamedRows).containsExactlyInAnyOrderElementsOf(allAppendedRows);
    }

    /**
     * Re-partitions the table by {@code id}, appends data, deletes the rows with {@code id = 4}
     * (current snapshot operation {@code delete}) and asserts that streaming fails with an
     * {@link IllegalStateException} cause.
     */
    @TestTemplate
    public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception {
        table.updateSpec().removeField("id_bucket").addField(Expressions.ref("id")).commit();

        List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
        appendDataAsMultipleSnapshots(dataAcrossSnapshots);

        table.newDelete().deleteFromRowFilter(Expressions.equal("id", 4)).commit();

        // Precondition for the assertion below.
        Assertions.assertThat(table.currentSnapshot().operation()).isEqualTo("delete");

        StreamingQuery query = startStream();
        Assertions.assertThatThrownBy(query::processAllAvailable)
            .cause()
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("Cannot process delete snapshot");
    }

    /**
     * Same setup as the delete-errors-out case, but the stream is started with
     * {@code streaming-skip-delete-snapshots=true} and is expected to return all appended rows.
     */
    @TestTemplate
    public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exception {
        table.updateSpec().removeField("id_bucket").addField(Expressions.ref("id")).commit();

        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        table.newDelete().deleteFromRowFilter(Expressions.equal("id", 4)).commit();

        // Precondition for the assertion below.
        String currentOperation = table.currentSnapshot().operation();
        Assertions.assertThat(currentOperation).isEqualTo("delete");

        StreamingQuery stream = startStream("streaming-skip-delete-snapshots", "true");
        List<SimpleRecord> streamedRows = rowsAvailable(stream);
        Assertions.assertThat(streamedRows)
            .containsExactlyInAnyOrderElementsOf(Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS));
    }

    /**
     * Commits an overwrite (add one file, overwrite rows with {@code id > 4}) after appending the
     * test data, then streams with {@code streaming-skip-overwrite-snapshots=true} and expects all
     * appended rows.
     */
    @TestTemplate
    public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws Exception {
        table.updateSpec().removeField("id_bucket").addField(Expressions.ref("id")).commit();

        appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

        // Data file entry that points at a freshly created temp file.
        String dataFilePath = File.createTempFile("junit", null, temp.toFile()).getPath();
        DataFile addedFile =
            DataFiles.builder(table.spec())
                .withPath(dataFilePath)
                .withFileSizeInBytes(10L)
                .withRecordCount(1L)
                .withFormat(FileFormat.PARQUET)
                .build();

        table.newOverwrite()
            .addFile(addedFile)
            .overwriteByRowFilter(Expressions.greaterThan("id", 4))
            .commit();

        // Precondition for the assertion below.
        String currentOperation = table.currentSnapshot().operation();
        Assertions.assertThat(currentOperation).isEqualTo("overwrite");

        StreamingQuery stream = startStream("streaming-skip-overwrite-snapshots", "true");
        List<SimpleRecord> streamedRows = rowsAvailable(stream);
        Assertions.assertThat(streamedRows)
            .containsExactlyInAnyOrderElementsOf(Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS));
    }

    /**
     * Commits a single rewrite in which every data file added by an {@code append} snapshot is
     * registered both as added and as deleted.
     *
     * <p>NOTE(review): presumably this yields a {@code replace} snapshot that leaves the table's
     * data unchanged, which is what the "IgnoresReplace" tests rely on; confirm against the
     * Iceberg {@code RewriteFiles} contract.
     */
    public void makeRewriteDataFiles() {
        table.refresh();

        RewriteFiles rewrite = table.newRewrite();
        // Iterate the typed Iterables directly: a raw Iterable would only yield Object elements.
        for (Snapshot snapshot : table.snapshots()) {
            if (!snapshot.operation().equals("append")) {
                continue;
            }
            for (DataFile dataFile : snapshot.addedDataFiles(table.io())) {
                rewrite.addFile(dataFile);
                rewrite.deleteFile(dataFile);
            }
        }
        rewrite.commit();
    }

    /**
     * Appends each inner list with a separate {@code appendData} call, in list order.
     *
     * @param data groups of records; one write is issued per group
     */
    private void appendDataAsMultipleSnapshots(List<List<SimpleRecord>> data) {
        data.forEach(this::appendData);
    }

    /**
     * Appends the records to the test table using the {@code parquet} write format.
     *
     * @param data records to append
     */
    private void appendData(List<SimpleRecord> data) {
        final String defaultFormat = "parquet";
        appendData(data, defaultFormat);
    }

    /**
     * Appends the records to the test table through the Spark DataFrame writer.
     *
     * @param data records to append
     * @param format value passed as the {@code write-format} write option
     */
    private void appendData(List<SimpleRecord> data, String format) {
        // Chained so that no raw Dataset local is needed.
        spark.createDataFrame(data, SimpleRecord.class)
            .select("id", "data")
            .write()
            .format("iceberg")
            .option("write-format", format)
            .mode("append")
            .save(tableName);
    }

    /**
     * Starts a streaming read of the test table that writes, in append mode, into the in-memory
     * sink named by {@code MEMORY_TABLE}.
     *
     * @param options applied to both the stream reader and the stream writer
     * @return the started query
     * @throws TimeoutException propagated from starting the query
     */
    private StreamingQuery startStream(Map<String, String> options) throws TimeoutException {
        Dataset<?> source = spark.readStream().options(options).format("iceberg").load(tableName);

        return source
            .writeStream()
            .options(options)
            .format("memory")
            .queryName(MEMORY_TABLE)
            .outputMode(OutputMode.Append())
            .start();
    }

    /**
     * Starts the memory-sink stream with no extra options.
     *
     * @return the started query
     * @throws TimeoutException propagated from starting the query
     */
    private StreamingQuery startStream() throws TimeoutException {
        Map<String, String> noOptions = Collections.emptyMap();
        return startStream(noOptions);
    }

    /**
     * Starts the memory-sink stream with the given option plus
     * {@code streaming-max-files-per-micro-batch=1}.
     *
     * @param key option name
     * @param value option value
     * @return the started query
     * @throws TimeoutException propagated from starting the query
     */
    private StreamingQuery startStream(String key, String value) throws TimeoutException {
        // Uncast arguments let the call site infer Map<String, String> directly.
        return startStream(ImmutableMap.of(key, value, "streaming-max-files-per-micro-batch", "1"));
    }

    /**
     * Streams the test table through a {@code foreachBatch} sink that only increments
     * {@code microBatches}, drains all available data, stops every active stream and returns the
     * counter value.
     *
     * @param options applied to both the stream reader and the stream writer
     * @return the value of {@code microBatches} after the stream has been drained
     * @throws TimeoutException propagated from starting or stopping the query
     */
    private int microBatchCount(Map<String, String> options) throws TimeoutException {
        Dataset source = spark.readStream().options(options).format("iceberg").load(tableName);

        // NOTE(review): the cast presumably selects the Java-friendly foreachBatch overload; confirm.
        StreamingQuery countingQuery =
            source
                .writeStream()
                .options(options)
                .foreachBatch(
                    (VoidFunction2 & Serializable)
                        (batch, id) -> {
                            microBatches.getAndIncrement();
                        })
                .start();
        countingQuery.processAllAvailable();

        stopStreams();
        return microBatches.get();
    }

    /**
     * Blocks until the query has processed everything currently available, then reads the whole
     * in-memory sink back as {@code SimpleRecord} beans.
     *
     * @param query a query writing to the sink named by {@code MEMORY_TABLE}
     * @return all rows currently held by the in-memory sink
     */
    private List<SimpleRecord> rowsAvailable(StreamingQuery query) {
        query.processAllAvailable();
        // Build the statement from MEMORY_TABLE so it cannot drift from the name used in startStream.
        return spark.sql("select * from " + MEMORY_TABLE)
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();
    }
}

