/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.source.ManualSource;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.SparkException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

@RunWith(value=Parameterized.class)
public class TestSparkDataWrite {
    /** Hadoop configuration passed to every {@link HadoopTables} instance the tests create. */
    private static final Configuration CONF = new Configuration();

    /** Two optional columns: integer {@code id} (field id 1) and string {@code data} (field id 2). */
    private static final Schema SCHEMA = new Schema(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    /** Session shared by all tests; assigned in {@link #startSpark()}, reset in {@link #stopSpark()}. */
    private static SparkSession spark;

    /** File format under test, parsed from the first run parameter. */
    private final FileFormat format;

    /** Branch under test (second run parameter); {@code null} means no branch suffix is used. */
    private final String branch;

    /** Per-test scratch directory in which each test creates its table location. */
    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    /**
     * Supplies the (format, branch) pairs the suite is run with.
     *
     * @return an array whose elements are two-element {@code Object[]} rows: the file format
     *     name followed by the branch name, where a {@code null} branch means none is used
     */
    @Parameterized.Parameters(name="format = {0}, branch = {1}")
    public static Object[] parameters() {
        Object[] parquetNoBranch = {"parquet", null};
        Object[] parquetMain = {"parquet", "main"};
        Object[] parquetTestBranch = {"parquet", "testBranch"};
        Object[] avroNoBranch = {"avro", null};
        Object[] orcTestBranch = {"orc", "testBranch"};
        return new Object[] {
            parquetNoBranch, parquetMain, parquetTestBranch, avroNoBranch, orcTestBranch
        };
    }

    /** Creates (or reuses) a local two-thread Spark session shared by every test in the class. */
    @BeforeClass
    public static void startSpark() {
        SparkSession.Builder localBuilder = SparkSession.builder().master("local[2]");
        spark = localBuilder.getOrCreate();
    }

    /**
     * Hook annotated with {@code @Parameterized.AfterParam}: calls
     * {@link ManualSource#clearTables()} so tables registered through
     * {@code ManualSource.setTable} (see {@code testCommitUnknownException}) do not carry over.
     */
    @Parameterized.AfterParam
    public static void clearSourceCache() {
        ManualSource.clearTables();
    }

    /**
     * Stops the shared Spark session and clears the static reference.
     *
     * <p>The reference is cleared before {@code stop()} is called so the field never points at a
     * stopped session. The session may be {@code null} here if {@link #startSpark()} failed before
     * assigning it; in that case there is nothing to stop and the original failure must not be
     * masked by a {@link NullPointerException}.
     */
    @AfterClass
    public static void stopSpark() {
        SparkSession currentSpark = spark;
        spark = null;
        if (currentSpark != null) {
            currentSpark.stop();
        }
    }

    /**
     * Receives one row produced by {@link #parameters()}.
     *
     * @param format file format name, resolved with {@link FileFormat#fromString(String)}
     * @param branch branch to write to and read from, or {@code null} for none
     */
    public TestSparkDataWrite(String format, String branch) {
        this.branch = branch;
        this.format = FileFormat.fromString(format);
    }

    /**
     * Appends three rows to a table partitioned by {@code data}, reads them back from the target
     * location and checks the per-file metadata recorded in the manifests.
     *
     * <p>Every data file must report a record count of 1. Split offsets are required for all
     * formats except Avro, and the column metrics are only asserted for Parquet.
     *
     * @throws IOException if the temporary folder cannot be created or a manifest reader fails to
     *     close
     */
    @Test
    public void testBasicWrite() throws IOException {
        File parent = temp.newFolder(format.toString());
        File location = new File(parent, "test");
        String targetLocation = locationWithBranch(location);

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table table = tables.create(SCHEMA, spec, location.toString());

        List<SimpleRecord> expected = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        Dataset<?> df = spark.createDataFrame(expected, SimpleRecord.class);

        // The first write always goes to the plain location; the branch is created from it after.
        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(location.toString());
        createBranch(table);
        table.refresh();

        Dataset<?> result = spark.read().format("iceberg").load(targetLocation);
        List<SimpleRecord> actual =
                result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);

        for (ManifestFile manifest :
                SnapshotUtil.latestSnapshot(table, branch).allManifests(table.io())) {
            // The manifest reader holds an open file; close it as soon as the manifest is checked.
            try (CloseableIterable<DataFile> dataFiles = ManifestFiles.read(manifest, table.io())) {
                for (DataFile file : dataFiles) {
                    if (format != FileFormat.AVRO) {
                        Assert.assertNotNull("Split offsets not present", file.splitOffsets());
                    }
                    Assert.assertEquals(
                            "Should have reported record count as 1", 1L, file.recordCount());
                    if (format != FileFormat.PARQUET) {
                        continue;
                    }
                    Assert.assertNotNull("Column sizes metric not present", file.columnSizes());
                    Assert.assertNotNull("Counts metric not present", file.valueCounts());
                    Assert.assertNotNull(
                            "Null value counts metric not present", file.nullValueCounts());
                    Assert.assertNotNull("Lower bounds metric not present", file.lowerBounds());
                    Assert.assertNotNull("Upper bounds metric not present", file.upperBounds());
                }
            }
        }
    }

    /**
     * Writes three rows to the base location, then appends the same rows with {@code id + 3} to
     * the target location and expects all six rows back, ordered by id.
     *
     * @throws IOException if the temporary folder cannot be created
     */
    @Test
    public void testAppend() throws IOException {
        File parent = temp.newFolder(format.toString());
        File location = new File(parent, "test");
        String targetLocation = locationWithBranch(location);

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table table = tables.create(SCHEMA, spec, location.toString());

        List<SimpleRecord> records = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        List<SimpleRecord> expected = Lists.newArrayList(
                new SimpleRecord(1, "a"),
                new SimpleRecord(2, "b"),
                new SimpleRecord(3, "c"),
                new SimpleRecord(4, "a"),
                new SimpleRecord(5, "b"),
                new SimpleRecord(6, "c"));

        Dataset<?> df = spark.createDataFrame(records, SimpleRecord.class);
        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(location.toString());
        createBranch(table);

        // Second append shifts every id by 3 and goes to the (possibly branch-qualified) target.
        df.withColumn("id", df.col("id").plus(3))
                .select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(targetLocation);
        table.refresh();

        Dataset<?> result = spark.read().format("iceberg").load(targetLocation);
        List<SimpleRecord> actual =
                result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);
    }

    /**
     * Overwrites a table partitioned by {@code id} with an empty dataset using
     * {@code overwrite-mode=dynamic} and expects the previously written rows to be unchanged.
     *
     * @throws IOException if the temporary folder cannot be created
     */
    @Test
    public void testEmptyOverwrite() throws IOException {
        File parent = temp.newFolder(format.toString());
        File location = new File(parent, "test");
        String targetLocation = locationWithBranch(location);

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
        Table table = tables.create(SCHEMA, spec, location.toString());

        List<SimpleRecord> records = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        // The overwrite below carries no rows, so the expectation is the initial content itself.
        List<SimpleRecord> expected = records;

        Dataset<?> df = spark.createDataFrame(records, SimpleRecord.class);
        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(location.toString());
        createBranch(table);

        Dataset<?> empty = spark.createDataFrame(ImmutableList.of(), SimpleRecord.class);
        empty.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Overwrite)
                .option("overwrite-mode", "dynamic")
                .save(targetLocation);
        table.refresh();

        Dataset<?> result = spark.read().format("iceberg").load(targetLocation);
        List<SimpleRecord> actual =
                result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);
    }

    /**
     * Writes ids 1..3 to a table partitioned by {@code id}, then dynamically overwrites with the
     * same rows whose ids are doubled (2, 4, 6).
     *
     * <p>Expected result: id 2 now carries "a", ids 4 and 6 are added, ids 1 and 3 are untouched.
     *
     * @throws IOException if the temporary folder cannot be created
     */
    @Test
    public void testOverwrite() throws IOException {
        File parent = temp.newFolder(format.toString());
        File location = new File(parent, "test");
        String targetLocation = locationWithBranch(location);

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
        Table table = tables.create(SCHEMA, spec, location.toString());

        List<SimpleRecord> records = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        List<SimpleRecord> expected = Lists.newArrayList(
                new SimpleRecord(1, "a"),
                new SimpleRecord(2, "a"),
                new SimpleRecord(3, "c"),
                new SimpleRecord(4, "b"),
                new SimpleRecord(6, "c"));

        Dataset<?> df = spark.createDataFrame(records, SimpleRecord.class);
        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(location.toString());
        createBranch(table);

        df.withColumn("id", df.col("id").multiply(2))
                .select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Overwrite)
                .option("overwrite-mode", "dynamic")
                .save(targetLocation);
        table.refresh();

        Dataset<?> result = spark.read().format("iceberg").load(targetLocation);
        List<SimpleRecord> actual =
                result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);
    }

    /**
     * Appends three rows to an unpartitioned table, overwrites the target location with the same
     * rows, and expects exactly those three rows back.
     *
     * @throws IOException if the temporary folder cannot be created
     */
    @Test
    public void testUnpartitionedOverwrite() throws IOException {
        File parent = temp.newFolder(format.toString());
        File location = new File(parent, "test");
        String targetLocation = locationWithBranch(location);

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        Table table = tables.create(SCHEMA, spec, location.toString());

        List<SimpleRecord> expected = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        Dataset<?> df = spark.createDataFrame(expected, SimpleRecord.class);

        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(location.toString());
        createBranch(table);

        // Overwrite with the same data: the result must not contain the rows twice.
        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Overwrite)
                .save(targetLocation);
        table.refresh();

        Dataset<?> result = spark.read().format("iceberg").load(targetLocation);
        List<SimpleRecord> actual =
                result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);
    }

    /**
     * Sets the table property {@code write.target-file-size-bytes} to 4, writes 4000 rows to an
     * unpartitioned table and expects the data to be split into 4 data files of 1000 rows each.
     *
     * <p>NOTE(review): the 1000-row file size presumably reflects how often the writer checks the
     * target size; that is not visible in this file.
     *
     * @throws IOException if the temporary folder cannot be created or a manifest reader fails to
     *     close
     */
    @Test
    public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws IOException {
        File parent = temp.newFolder(format.toString());
        File location = new File(parent, "test");
        String targetLocation = locationWithBranch(location);

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        Table table = tables.create(SCHEMA, spec, location.toString());
        table.updateProperties().set("write.target-file-size-bytes", "4").commit();

        List<SimpleRecord> expected = Lists.newArrayListWithCapacity(4000);
        for (int i = 0; i < 4000; ++i) {
            expected.add(new SimpleRecord(i, "a"));
        }

        Dataset<?> df = spark.createDataFrame(expected, SimpleRecord.class);
        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(location.toString());
        createBranch(table);
        table.refresh();

        Dataset<?> result = spark.read().format("iceberg").load(targetLocation);
        List<SimpleRecord> actual =
                result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);

        // Typed list: the stream lambda below needs DataFile, not Object.
        List<DataFile> files = Lists.newArrayList();
        for (ManifestFile manifest :
                SnapshotUtil.latestSnapshot(table, branch).allManifests(table.io())) {
            // Close each manifest reader once its entries have been collected.
            try (CloseableIterable<DataFile> dataFiles = ManifestFiles.read(manifest, table.io())) {
                for (DataFile file : dataFiles) {
                    files.add(file);
                }
            }
        }

        Assert.assertEquals("Should have 4 DataFiles", 4L, files.size());
        Assert.assertTrue(
                "All DataFiles contain 1000 rows",
                files.stream().allMatch(d -> d.recordCount() == 1000L));
    }

    /**
     * Runs {@link #partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType)} with
     * {@link IcebergOptionsType#NONE}.
     *
     * @throws IOException propagated from the shared scenario
     */
    @Test
    public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOException {
        partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType.NONE);
    }

    /**
     * Runs {@link #partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType)} with
     * {@link IcebergOptionsType#TABLE}.
     *
     * @throws IOException propagated from the shared scenario
     */
    @Test
    public void testPartitionedFanoutCreateWithTargetFileSizeViaOption() throws IOException {
        partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType.TABLE);
    }

    /**
     * Runs {@link #partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType)} with
     * {@link IcebergOptionsType#JOB}.
     *
     * @throws IOException propagated from the shared scenario
     */
    @Test
    public void testPartitionedFanoutCreateWithTargetFileSizeViaOption2() throws IOException {
        partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType.JOB);
    }

    /**
     * Writes only the {@code id} column and expects {@code data} to read back as {@code null}.
     *
     * <p>Skipped unless the Spark version string starts with "2" (see the assumption message).
     *
     * @throws IOException if the temporary folder cannot be created
     */
    @Test
    public void testWriteProjection() throws IOException {
        Assume.assumeTrue(
                "Not supported in Spark 3; analysis requires all columns are present",
                spark.version().startsWith("2"));

        File parent = temp.newFolder(format.toString());
        File location = new File(parent, "test");
        String targetLocation = locationWithBranch(location);

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        Table table = tables.create(SCHEMA, spec, location.toString());

        List<SimpleRecord> expected = Lists.newArrayList(
                new SimpleRecord(1, null), new SimpleRecord(2, null), new SimpleRecord(3, null));
        Dataset<?> df = spark.createDataFrame(expected, SimpleRecord.class);

        // Only "id" is selected, so "data" is absent from the written frame.
        df.select("id")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(location.toString());
        createBranch(table);
        table.refresh();

        Dataset<?> result = spark.read().format("iceberg").load(targetLocation);
        List<SimpleRecord> actual =
                result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);
    }

    /**
     * Writes columns {@code c1} and {@code c3} of a three-column table, leaving out the middle
     * column {@code c2}, and expects {@code c2} to read back as {@code null}.
     *
     * <p>Skipped unless the Spark version string starts with "2" (see the assumption message).
     *
     * @throws IOException if the temporary folder cannot be created
     */
    @Test
    public void testWriteProjectionWithMiddle() throws IOException {
        Assume.assumeTrue(
                "Not supported in Spark 3; analysis requires all columns are present",
                spark.version().startsWith("2"));

        File parent = temp.newFolder(format.toString());
        File location = new File(parent, "test");
        String targetLocation = locationWithBranch(location);

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        Schema schema = new Schema(
                Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
                Types.NestedField.optional(2, "c2", Types.StringType.get()),
                Types.NestedField.optional(3, "c3", Types.StringType.get()));
        Table table = tables.create(schema, spec, location.toString());

        List<ThreeColumnRecord> expected = Lists.newArrayList(
                new ThreeColumnRecord(1, null, "hello"),
                new ThreeColumnRecord(2, null, "world"),
                new ThreeColumnRecord(3, null, null));
        Dataset<?> df = spark.createDataFrame(expected, ThreeColumnRecord.class);

        df.select("c1", "c3")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(location.toString());
        createBranch(table);
        table.refresh();

        Dataset<?> result = spark.read().format("iceberg").load(targetLocation);
        List<ThreeColumnRecord> actual =
                result.orderBy("c1").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);
    }

    /**
     * Registers a temp view over {@code id = 1} and checks that it reflects data appended after
     * the view was created: one matching row first, two after the same rows are appended again.
     *
     * @throws IOException if the temporary folder cannot be created
     */
    @Test
    public void testViewsReturnRecentResults() throws IOException {
        File parent = temp.newFolder(format.toString());
        File location = new File(parent, "test");
        String targetLocation = locationWithBranch(location);

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        tables.create(SCHEMA, spec, location.toString());

        List<SimpleRecord> records = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        Dataset<?> df = spark.createDataFrame(records, SimpleRecord.class);
        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(location.toString());

        // The table is loaded only after the first write, then used to create the branch.
        Table table = tables.load(location.toString());
        createBranch(table);

        Dataset<?> query = spark.read().format("iceberg").load(targetLocation).where("id = 1");
        query.createOrReplaceTempView("tmp");

        List<SimpleRecord> actual1 =
                spark.table("tmp").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        List<SimpleRecord> expected1 = Lists.newArrayList(new SimpleRecord(1, "a"));
        Assert.assertEquals("Number of rows should match", expected1.size(), actual1.size());
        Assert.assertEquals("Result rows should match", expected1, actual1);

        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(targetLocation);

        List<SimpleRecord> actual2 =
                spark.table("tmp").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        List<SimpleRecord> expected2 =
                Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(1, "a"));
        Assert.assertEquals("Number of rows should match", expected2.size(), actual2.size());
        Assert.assertEquals("Result rows should match", expected2, actual2);
    }

    /**
     * Shared scenario for the partitioned target-file-size tests.
     *
     * <p>Creates a table partitioned by {@code data} with {@code write.distribution-mode=none},
     * writes 8000 rows (ids 0..1999 for each of "a", "b", "c", "d") with the write option
     * {@code target-file-size-bytes} set to 4, and expects 8 data files of 1000 rows each.
     *
     * @param option how the write is configured: {@code NONE} sorts the input by {@code data}
     *     before writing, {@code TABLE} sets the table property
     *     {@code write.spark.fanout.enabled=true}, {@code JOB} passes the write option
     *     {@code fanout-enabled=true}
     * @throws IOException if the temporary folder cannot be created or a manifest reader fails to
     *     close
     */
    public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType option) throws IOException {
        File parent = temp.newFolder(format.toString());
        File location = new File(parent, "test");
        String targetLocation = locationWithBranch(location);

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Map<String, String> properties = ImmutableMap.of("write.distribution-mode", "none");
        Table table = tables.create(SCHEMA, spec, properties, location.toString());

        // Built in (id, data) order; the read below sorts on the same two columns.
        List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
        for (int i = 0; i < 2000; ++i) {
            expected.add(new SimpleRecord(i, "a"));
            expected.add(new SimpleRecord(i, "b"));
            expected.add(new SimpleRecord(i, "c"));
            expected.add(new SimpleRecord(i, "d"));
        }

        Dataset<?> df = spark.createDataFrame(expected, SimpleRecord.class);
        switch (option) {
            case NONE: {
                df.select("id", "data")
                        .sort("data")
                        .write()
                        .format("iceberg")
                        .option("write-format", format.toString())
                        .mode(SaveMode.Append)
                        .option("target-file-size-bytes", 4L)
                        .save(location.toString());
                break;
            }
            case TABLE: {
                table.updateProperties().set("write.spark.fanout.enabled", "true").commit();
                df.select("id", "data")
                        .write()
                        .format("iceberg")
                        .option("write-format", format.toString())
                        .mode(SaveMode.Append)
                        .option("target-file-size-bytes", 4L)
                        .save(location.toString());
                break;
            }
            case JOB: {
                df.select("id", "data")
                        .write()
                        .format("iceberg")
                        .option("write-format", format.toString())
                        .mode(SaveMode.Append)
                        .option("target-file-size-bytes", 4L)
                        .option("fanout-enabled", true)
                        .save(location.toString());
                break;
            }
        }
        createBranch(table);
        table.refresh();

        Dataset<?> result = spark.read().format("iceberg").load(targetLocation);
        // Every id occurs four times, so "id" alone leaves the order of ties unspecified; adding
        // "data" as a secondary key makes the list comparison below deterministic.
        List<SimpleRecord> actual =
                result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
        Assert.assertEquals("Result rows should match", expected, actual);

        // Typed list: the stream lambda below needs DataFile, not Object.
        List<DataFile> files = Lists.newArrayList();
        for (ManifestFile manifest :
                SnapshotUtil.latestSnapshot(table, branch).allManifests(table.io())) {
            // Close each manifest reader once its entries have been collected.
            try (CloseableIterable<DataFile> dataFiles = ManifestFiles.read(manifest, table.io())) {
                for (DataFile file : dataFiles) {
                    files.add(file);
                }
            }
        }

        Assert.assertEquals("Should have 8 DataFiles", 8L, files.size());
        Assert.assertTrue(
                "All DataFiles contain 1000 rows",
                files.stream().allMatch(d -> d.recordCount() == 1000L));
    }

    /**
     * Checks that a write whose commit succeeds but then reports
     * {@link CommitStateUnknownException} fails the Spark job while leaving the data readable.
     *
     * <p>A spied {@link AppendFiles} performs the real commit and then throws; a spied table
     * hands out that append from {@code newAppend()}. The write goes through
     * {@code ManualSource} so the spied table is the one Spark uses.
     *
     * @throws IOException if the temporary folder cannot be created
     */
    @Test
    public void testCommitUnknownException() throws IOException {
        File parent = temp.newFolder(format.toString());
        File location = new File(parent, "commitunknown");
        String targetLocation = locationWithBranch(location);

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table table = tables.create(SCHEMA, spec, location.toString());

        List<SimpleRecord> records = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        Dataset<?> df = spark.createDataFrame(records, SimpleRecord.class);
        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(location.toString());
        createBranch(table);
        table.refresh();

        List<SimpleRecord> records2 = Lists.newArrayList(
                new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
        Dataset<?> df2 = spark.createDataFrame(records2, SimpleRecord.class);

        AppendFiles append = table.newFastAppend();
        if (branch != null) {
            append.toBranch(branch);
        }

        // commit() on the spy runs the real commit first and only then reports an unknown state.
        AppendFiles spyAppend = Mockito.spy(append);
        Mockito.doAnswer(invocation -> {
            append.commit();
            throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
        }).when(spyAppend).commit();

        Table spyTable = Mockito.spy(table);
        Mockito.when(spyTable.newAppend()).thenReturn(spyAppend);
        SparkTable sparkTable = new SparkTable(spyTable, false);

        String manualTableName = "unknown_exception";
        ManualSource.setTable(manualTableName, sparkTable);

        AssertHelpers.assertThrowsWithCause(
                "Should throw a Commit State Unknown Exception",
                SparkException.class,
                "Writing job aborted",
                CommitStateUnknownException.class,
                "Datacenter on Fire",
                () -> df2.select("id", "data")
                        .sort("data")
                        .write()
                        .format("org.apache.iceberg.spark.source.ManualSource")
                        .option("TABLE_NAME", manualTableName)
                        .mode(SaveMode.Append)
                        .save(targetLocation));

        // The commit itself went through, so both batches must be visible.
        Dataset<?> result = spark.read().format("iceberg").load(targetLocation);
        List<SimpleRecord> actual =
                result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        Assert.assertEquals(
                "Number of rows should match", records.size() + records2.size(), actual.size());

        SimpleRecord[] allRecords = ImmutableList.<SimpleRecord>builder()
                .addAll(records)
                .addAll(records2)
                .build()
                .toArray(new SimpleRecord[0]);
        Assertions.assertThat(actual)
                .describedAs("Result rows should match")
                .containsExactlyInAnyOrder(allRecords);
    }

    /**
     * Builds the location string used for branch-aware reads and writes.
     *
     * @param location table directory
     * @return {@code location} as a string when no branch is configured, otherwise the location
     *     followed by {@code #branch_} and the branch name
     */
    private String locationWithBranch(File location) {
        return branch == null ? location.toString() : location + "#branch_" + branch;
    }

    /**
     * Creates the configured branch at the table's current snapshot.
     *
     * <p>Does nothing when no branch is configured or when the branch is {@code "main"}.
     *
     * @param table table on which the branch is created
     */
    private void createBranch(Table table) {
        if (branch == null || branch.equals("main")) {
            return;
        }
        table.manageSnapshots()
                .createBranch(branch, table.currentSnapshot().snapshotId())
                .commit();
    }

    /**
     * Selects how {@code partitionedCreateWithTargetFileSizeViaOption} configures its write.
     */
    public enum IcebergOptionsType {
        /** No fanout setting; the input is sorted by {@code data} before it is written. */
        NONE,
        /** The table property {@code write.spark.fanout.enabled} is set to {@code true}. */
        TABLE,
        /** The write option {@code fanout-enabled} is set to {@code true}. */
        JOB
    }
}

