/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.MemoryStream;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class TestStructuredStreaming {
    private static final Configuration CONF = new Configuration();
    private static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.optional((int)1, (String)"id", (Type)Types.IntegerType.get()), Types.NestedField.optional((int)2, (String)"data", (Type)Types.StringType.get())});
    private static SparkSession spark = null;
    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    @BeforeClass
    public static void startSpark() {
        spark = SparkSession.builder().master("local[2]").config("spark.sql.shuffle.partitions", 4L).getOrCreate();
    }

    @AfterClass
    public static void stopSpark() {
        SparkSession currentSpark = spark;
        spark = null;
        currentSpark.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStreamingWriteAppendMode() throws Exception {
        File parent = this.temp.newFolder("parquet");
        File location = new File(parent, "test-table");
        File checkpoint = new File(parent, "checkpoint");
        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.builderFor((Schema)SCHEMA).identity("data").build();
        Table table = tables.create(SCHEMA, spec, location.toString());
        ArrayList expected = Lists.newArrayList((Object[])new SimpleRecord[]{new SimpleRecord(1, "1"), new SimpleRecord(2, "2"), new SimpleRecord(3, "3"), new SimpleRecord(4, "4")});
        MemoryStream inputStream = this.newMemoryStream(1, spark.sqlContext(), Encoders.INT());
        DataStreamWriter streamWriter = inputStream.toDF().selectExpr(new String[]{"value AS id", "CAST (value AS STRING) AS data"}).writeStream().outputMode("append").format("iceberg").option("checkpointLocation", checkpoint.toString()).option("path", location.toString());
        try {
            StreamingQuery query = streamWriter.start();
            ArrayList batch1 = Lists.newArrayList((Object[])new Integer[]{1, 2});
            this.send(batch1, inputStream);
            query.processAllAvailable();
            ArrayList batch2 = Lists.newArrayList((Object[])new Integer[]{3, 4});
            this.send(batch2, inputStream);
            query.processAllAvailable();
            query.stop();
            File lastCommitFile = new File(checkpoint.toString() + "/commits/1");
            Assert.assertTrue((String)"The commit file must be deleted", (boolean)lastCommitFile.delete());
            StreamingQuery restartedQuery = streamWriter.start();
            restartedQuery.processAllAvailable();
            Dataset result = spark.read().format("iceberg").load(location.toString());
            List actual = result.orderBy("id", new String[0]).as(Encoders.bean(SimpleRecord.class)).collectAsList();
            Assert.assertEquals((String)"Number of rows should match", (long)expected.size(), (long)actual.size());
            Assert.assertEquals((String)"Result rows should match", (Object)expected, (Object)actual);
            Assert.assertEquals((String)"Number of snapshots should match", (long)2L, (long)Iterables.size((Iterable)table.snapshots()));
        }
        finally {
            for (StreamingQuery query : spark.streams().active()) {
                query.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStreamingWriteCompleteMode() throws Exception {
        File parent = this.temp.newFolder("parquet");
        File location = new File(parent, "test-table");
        File checkpoint = new File(parent, "checkpoint");
        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.builderFor((Schema)SCHEMA).identity("data").build();
        Table table = tables.create(SCHEMA, spec, location.toString());
        ArrayList expected = Lists.newArrayList((Object[])new SimpleRecord[]{new SimpleRecord(2, "1"), new SimpleRecord(3, "2"), new SimpleRecord(1, "3")});
        MemoryStream inputStream = this.newMemoryStream(1, spark.sqlContext(), Encoders.INT());
        DataStreamWriter streamWriter = inputStream.toDF().groupBy("value", new String[0]).count().selectExpr(new String[]{"CAST(count AS INT) AS id", "CAST (value AS STRING) AS data"}).writeStream().outputMode("complete").format("iceberg").option("checkpointLocation", checkpoint.toString()).option("path", location.toString());
        try {
            StreamingQuery query = streamWriter.start();
            ArrayList batch1 = Lists.newArrayList((Object[])new Integer[]{1, 2});
            this.send(batch1, inputStream);
            query.processAllAvailable();
            ArrayList batch2 = Lists.newArrayList((Object[])new Integer[]{1, 2, 2, 3});
            this.send(batch2, inputStream);
            query.processAllAvailable();
            query.stop();
            File lastCommitFile = new File(checkpoint.toString() + "/commits/1");
            Assert.assertTrue((String)"The commit file must be deleted", (boolean)lastCommitFile.delete());
            StreamingQuery restartedQuery = streamWriter.start();
            restartedQuery.processAllAvailable();
            Dataset result = spark.read().format("iceberg").load(location.toString());
            List actual = result.orderBy("data", new String[0]).as(Encoders.bean(SimpleRecord.class)).collectAsList();
            Assert.assertEquals((String)"Number of rows should match", (long)expected.size(), (long)actual.size());
            Assert.assertEquals((String)"Result rows should match", (Object)expected, (Object)actual);
            Assert.assertEquals((String)"Number of snapshots should match", (long)2L, (long)Iterables.size((Iterable)table.snapshots()));
        }
        finally {
            for (StreamingQuery query : spark.streams().active()) {
                query.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStreamingWriteCompleteModeWithProjection() throws Exception {
        File parent = this.temp.newFolder("parquet");
        File location = new File(parent, "test-table");
        File checkpoint = new File(parent, "checkpoint");
        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        Table table = tables.create(SCHEMA, spec, location.toString());
        ArrayList expected = Lists.newArrayList((Object[])new SimpleRecord[]{new SimpleRecord(1, null), new SimpleRecord(2, null), new SimpleRecord(3, null)});
        MemoryStream inputStream = this.newMemoryStream(1, spark.sqlContext(), Encoders.INT());
        DataStreamWriter streamWriter = inputStream.toDF().groupBy("value", new String[0]).count().selectExpr(new String[]{"CAST(count AS INT) AS id"}).writeStream().outputMode("complete").format("iceberg").option("checkpointLocation", checkpoint.toString()).option("path", location.toString());
        try {
            StreamingQuery query = streamWriter.start();
            ArrayList batch1 = Lists.newArrayList((Object[])new Integer[]{1, 2});
            this.send(batch1, inputStream);
            query.processAllAvailable();
            ArrayList batch2 = Lists.newArrayList((Object[])new Integer[]{1, 2, 2, 3});
            this.send(batch2, inputStream);
            query.processAllAvailable();
            query.stop();
            File lastCommitFile = new File(checkpoint.toString() + "/commits/1");
            Assert.assertTrue((String)"The commit file must be deleted", (boolean)lastCommitFile.delete());
            StreamingQuery restartedQuery = streamWriter.start();
            restartedQuery.processAllAvailable();
            Dataset result = spark.read().format("iceberg").load(location.toString());
            List actual = result.orderBy("id", new String[0]).as(Encoders.bean(SimpleRecord.class)).collectAsList();
            Assert.assertEquals((String)"Number of rows should match", (long)expected.size(), (long)actual.size());
            Assert.assertEquals((String)"Result rows should match", (Object)expected, (Object)actual);
            Assert.assertEquals((String)"Number of snapshots should match", (long)2L, (long)Iterables.size((Iterable)table.snapshots()));
        }
        finally {
            for (StreamingQuery query : spark.streams().active()) {
                query.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStreamingWriteUpdateMode() throws Exception {
        File parent = this.temp.newFolder("parquet");
        File location = new File(parent, "test-table");
        File checkpoint = new File(parent, "checkpoint");
        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.builderFor((Schema)SCHEMA).identity("data").build();
        tables.create(SCHEMA, spec, location.toString());
        MemoryStream inputStream = this.newMemoryStream(1, spark.sqlContext(), Encoders.INT());
        DataStreamWriter streamWriter = inputStream.toDF().selectExpr(new String[]{"value AS id", "CAST (value AS STRING) AS data"}).writeStream().outputMode("update").format("iceberg").option("checkpointLocation", checkpoint.toString()).option("path", location.toString());
        try {
            StreamingQuery query = streamWriter.start();
            ArrayList batch1 = Lists.newArrayList((Object[])new Integer[]{1, 2});
            this.send(batch1, inputStream);
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> ((StreamingQuery)query).processAllAvailable()).isInstanceOf(StreamingQueryException.class)).hasMessageContaining("does not support Update mode");
        }
        finally {
            for (StreamingQuery query : spark.streams().active()) {
                query.stop();
            }
        }
    }

    private <T> MemoryStream<T> newMemoryStream(int id, SQLContext sqlContext, Encoder<T> encoder) {
        return new MemoryStream(id, sqlContext, Option.empty(), encoder);
    }

    private <T> void send(List<T> records, MemoryStream<T> stream) {
        stream.addData((Seq)JavaConverters.asScalaBuffer(records));
    }
}

