/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.MemoryStream;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;

/**
 * Tests how the Spark source behaves for a table whose partition spec uses a transform
 * ("zero") that is not a supported transform: batch and streaming writes are expected to fail,
 * while reads are expected to succeed.
 */
public class TestForwardCompatibility {
    private static final Configuration CONF = new Configuration();

    private static final Schema SCHEMA = new Schema(
            Types.NestedField.optional(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    /** Spec with a single partition field "id_zero" on column id 1 that uses the unknown transform "zero". */
    private static final PartitionSpec UNKNOWN_SPEC = TestHelpers.newExpectedSpecBuilder()
            .withSchema(SCHEMA)
            .withSpecId(0)
            .addField("zero", 1, "id_zero")
            .build();

    /**
     * Same spec id, source column and field name as {@link #UNKNOWN_SPEC}, but with the "identity"
     * transform; used to build the data file and manifest for the read test.
     */
    private static final PartitionSpec FAKE_SPEC = TestHelpers.newExpectedSpecBuilder()
            .withSchema(SCHEMA)
            .withSpecId(0)
            .addField("identity", 1, "id_zero")
            .build();

    @TempDir
    private Path temp;

    private static SparkSession spark = null;

    /** Creates (or reuses) a local two-thread Spark session shared by all tests in this class. */
    @BeforeAll
    public static void startSpark() {
        spark = SparkSession.builder().master("local[2]").getOrCreate();
    }

    /** Stops the shared Spark session and clears the static reference. */
    @AfterAll
    public static void stopSpark() {
        SparkSession currentSpark = spark;
        spark = null;
        // Guard against startSpark() having failed, so teardown does not mask the real error with an NPE.
        if (currentSpark != null) {
            currentSpark.stop();
        }
    }

    /**
     * Appending a DataFrame to a table partitioned by the unknown transform must fail with
     * {@link UnsupportedOperationException}.
     */
    @Test
    public void testSparkWriteFailsUnknownTransform() throws IOException {
        File parent = temp.resolve("avro").toFile();
        File location = new File(parent, "test");
        File dataFolder = new File(location, "data");
        dataFolder.mkdirs();

        HadoopTables tables = new HadoopTables(CONF);
        tables.create(SCHEMA, UNKNOWN_SPEC, location.toString());

        List<SimpleRecord> expected = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

        Assertions.assertThatThrownBy(
                        () -> df.select("id", "data")
                                .write()
                                .format("iceberg")
                                .mode("append")
                                .save(location.toString()))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageEndingWith("Cannot write using unsupported transforms: zero");
    }

    /**
     * A streaming append to a table partitioned by the unknown transform must surface a
     * {@link StreamingQueryException} once the pending batch is processed.
     */
    @Test
    public void testSparkStreamingWriteFailsUnknownTransform() throws IOException, TimeoutException {
        File parent = temp.resolve("avro").toFile();
        File location = new File(parent, "test");
        File dataFolder = new File(location, "data");
        dataFolder.mkdirs();
        File checkpoint = new File(parent, "checkpoint");
        checkpoint.mkdirs();

        HadoopTables tables = new HadoopTables(CONF);
        tables.create(SCHEMA, UNKNOWN_SPEC, location.toString());

        MemoryStream<Integer> inputStream = newMemoryStream(1, spark.sqlContext(), Encoders.INT());
        StreamingQuery query = inputStream.toDF()
                .selectExpr("value AS id", "CAST (value AS STRING) AS data")
                .writeStream()
                .outputMode("append")
                .format("iceberg")
                .option("checkpointLocation", checkpoint.toString())
                .option("path", location.toString())
                .start();

        try {
            List<Integer> batch1 = Lists.newArrayList(1, 2);
            send(batch1, inputStream);

            Assertions.assertThatThrownBy(query::processAllAvailable)
                    .isInstanceOf(StreamingQueryException.class)
                    .hasMessageEndingWith("Cannot write using unsupported transforms: zero");
        } finally {
            // Release the query so it does not linger in the shared session after this test.
            query.stop();
        }
    }

    /**
     * Writes a Parquet data file and a manifest using {@link #FAKE_SPEC}, appends that manifest to a
     * table created with {@link #UNKNOWN_SPEC}, and verifies that Spark reads back all 100 rows.
     */
    @Test
    public void testSparkCanReadUnknownTransform() throws IOException {
        File parent = temp.resolve("avro").toFile();
        File location = new File(parent, "test");
        File dataFolder = new File(location, "data");
        dataFolder.mkdirs();

        HadoopTables tables = new HadoopTables(CONF);
        Table table = tables.create(SCHEMA, UNKNOWN_SPEC, location.toString());

        // NOTE(review): presumably enabled so the appended manifest is used as-is instead of being
        // rewritten with the unknown transform — confirm against the table property documentation.
        table.updateProperties().set("compatibility.snapshot-id-inheritance.enabled", "true").commit();

        List<GenericData.Record> expected = RandomData.generateList(table.schema(), 100, 1L);

        File parquetFile = new File(dataFolder, FileFormat.PARQUET.addExtension(UUID.randomUUID().toString()));

        // The appender is declared outside the try block because its metrics are read after it is closed.
        FileAppender<GenericData.Record> writer =
                Parquet.write(Files.localOutput(parquetFile)).schema(table.schema()).build();
        try {
            writer.addAll(expected);
        } finally {
            writer.close();
        }

        DataFile file = DataFiles.builder(FAKE_SPEC)
                .withInputFile(Files.localInput(parquetFile))
                .withMetrics(writer.metrics())
                .withPartitionPath("id_zero=0")
                .build();

        OutputFile manifestFile = Files.localOutput(FileFormat.AVRO.addExtension(temp.toFile().toString()));

        // Same pattern as above: the manifest writer is queried for its ManifestFile after closing.
        ManifestWriter<DataFile> manifestWriter = ManifestFiles.write(FAKE_SPEC, manifestFile);
        try {
            manifestWriter.add(file);
        } finally {
            manifestWriter.close();
        }

        table.newFastAppend().appendManifest(manifestWriter.toManifestFile()).commit();

        Dataset<Row> df = spark.read().format("iceberg").load(location.toString());
        List<Row> rows = df.collectAsList();

        Assertions.assertThat(rows).as("Should contain 100 rows").hasSize(100);

        for (int i = 0; i < expected.size(); ++i) {
            org.apache.iceberg.spark.data.TestHelpers.assertEqualsSafe(
                    table.schema().asStruct(), expected.get(i), rows.get(i));
        }
    }

    /** Creates an in-memory streaming source with the given id and element encoder. */
    private <T> MemoryStream<T> newMemoryStream(int id, SQLContext sqlContext, Encoder<T> encoder) {
        return new MemoryStream<>(id, sqlContext, Option.empty(), encoder);
    }

    /** Adds the given records to the memory stream as one batch of data. */
    private <T> void send(List<T> records, MemoryStream<T> stream) {
        stream.addData((Seq<T>) JavaConverters.asScalaBuffer(records));
    }
}

