/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.source.ManualSource;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.MapAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;

@ExtendWith(value={ParameterizedTestExtension.class})
public class TestSparkDataWrite {
    /** Hadoop configuration handed to every {@link HadoopTables} instance created by the tests. */
    private static final Configuration CONF = new Configuration();

    /** Two-column schema ({@code id} int, {@code data} string) used by most tests in this class. */
    private static final Schema SCHEMA = new Schema(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    /** Session assigned in {@link #startSpark()} and cleared in {@link #stopSpark()}. */
    private static SparkSession spark;

    /** File format under test (parameter index 0). */
    @Parameter(index = 0)
    private FileFormat format;

    /**
     * Branch under test (parameter index 1). When {@code null} the tests read and write the plain
     * table location; otherwise they use the branch-qualified location.
     */
    @Parameter(index = 1)
    private String branch;

    /** Temporary directory under which each test creates its table location. */
    @TempDir
    private Path temp;

    /**
     * Lists the (file format, branch) combinations every test template is run with.
     *
     * @return one row per combination, laid out as {@code {format, branch}}; the branch may be
     *     {@code null}
     */
    @Parameters(name = "format = {0}, branch = {1}")
    public static Object[][] parameters() {
        return new Object[][] {
            {FileFormat.PARQUET, null},
            {FileFormat.PARQUET, "main"},
            {FileFormat.PARQUET, "testBranch"},
            {FileFormat.AVRO, null},
            {FileFormat.ORC, "testBranch"}
        };
    }

    /** Obtains the Spark session (master {@code local[2]}) shared by all tests in this class. */
    @BeforeAll
    public static void startSpark() {
        SparkSession.Builder localBuilder = SparkSession.builder().master("local[2]");
        TestSparkDataWrite.spark = localBuilder.getOrCreate();
    }

    /**
     * Clears the tables held by {@link ManualSource} after each test, so that a table registered
     * by one test is not visible to the next.
     */
    @AfterEach
    public void clearSourceCache() {
        ManualSource.clearTables();
    }

    /**
     * Stops the shared Spark session and clears the static reference.
     *
     * <p>The reference is cleared before stopping so the field never points at a stopped session.
     */
    @AfterAll
    public static void stopSpark() {
        SparkSession currentSpark = spark;
        spark = null;
        // startSpark() may have failed before assigning the session; an NPE here would only hide
        // that original failure.
        if (currentSpark != null) {
            currentSpark.stop();
        }
    }

    /**
     * Writes three rows into a table partitioned by {@code data}, reads them back from the
     * (possibly branch-qualified) location, and checks the per-file metadata recorded in the
     * manifests of the latest snapshot.
     *
     * @throws IOException if a manifest reader cannot be closed
     */
    @TestTemplate
    public void testBasicWrite() throws IOException {
        File parent = temp.resolve(format.toString()).toFile();
        File location = new File(parent, "test");
        String targetLocation = locationWithBranch(location);

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table table = tables.create(SCHEMA, spec, location.toString());

        List<SimpleRecord> expected = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));

        // The initial write always goes to the plain location; the branch is created afterwards.
        spark.createDataFrame(expected, SimpleRecord.class)
                .select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(location.toString());

        createBranch(table);
        table.refresh();

        List<SimpleRecord> actual = spark.read()
                .format("iceberg")
                .load(targetLocation)
                .orderBy("id")
                .as(Encoders.bean(SimpleRecord.class))
                .collectAsList();
        Assertions.assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
        Assertions.assertThat(actual).as("Result rows should match").isEqualTo(expected);

        for (ManifestFile manifest :
                SnapshotUtil.latestSnapshot(table, branch).allManifests(table.io())) {
            // The reader is closeable; release it even when an assertion below fails.
            try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
                for (DataFile file : reader) {
                    // Split offsets are not checked for Avro files.
                    if (format != FileFormat.AVRO) {
                        Assertions.assertThat(file.splitOffsets())
                                .as("Split offsets not present")
                                .isNotNull();
                    }
                    Assertions.assertThat(file.recordCount())
                            .as("Should have reported record count as 1")
                            .isEqualTo(1L);
                    // Column-level metrics are only asserted for Parquet.
                    if (format == FileFormat.PARQUET) {
                        Assertions.assertThat(file.columnSizes())
                                .as("Column sizes metric not present")
                                .isNotNull();
                        Assertions.assertThat(file.valueCounts())
                                .as("Counts metric not present")
                                .isNotNull();
                        Assertions.assertThat(file.nullValueCounts())
                                .as("Null value counts metric not present")
                                .isNotNull();
                        Assertions.assertThat(file.lowerBounds())
                                .as("Lower bounds metric not present")
                                .isNotNull();
                        Assertions.assertThat(file.upperBounds())
                                .as("Upper bounds metric not present")
                                .isNotNull();
                    }
                }
            }
        }
    }

    /**
     * Appends one batch through the plain location and a second batch (ids shifted by 3) through
     * the target location, then expects all six rows when reading the target location.
     */
    @TestTemplate
    public void testAppend() throws IOException {
        File tableDir = new File(temp.resolve(format.toString()).toFile(), "test");
        String targetLocation = locationWithBranch(tableDir);

        HadoopTables hadoopTables = new HadoopTables(CONF);
        PartitionSpec byData = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table table = hadoopTables.create(SCHEMA, byData, tableDir.toString());

        List<SimpleRecord> firstBatch = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        List<SimpleRecord> expected = Lists.newArrayList(
                new SimpleRecord(1, "a"),
                new SimpleRecord(2, "b"),
                new SimpleRecord(3, "c"),
                new SimpleRecord(4, "a"),
                new SimpleRecord(5, "b"),
                new SimpleRecord(6, "c"));

        Dataset<?> df = spark.createDataFrame(firstBatch, SimpleRecord.class);

        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(tableDir.toString());

        createBranch(table);

        // Second batch reuses the same rows with every id increased by 3.
        df.withColumn("id", df.col("id").plus(3))
                .select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(targetLocation);

        table.refresh();

        List<SimpleRecord> actual = spark.read()
                .format("iceberg")
                .load(targetLocation)
                .orderBy("id")
                .as(Encoders.bean(SimpleRecord.class))
                .collectAsList();
        Assertions.assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
        Assertions.assertThat(actual).as("Result rows should match").isEqualTo(expected);
    }

    /**
     * Writes three rows into a table partitioned by {@code id}, then performs a dynamic overwrite
     * with an empty data frame and expects the original rows to still be returned.
     */
    @TestTemplate
    public void testEmptyOverwrite() throws IOException {
        File tableDir = new File(temp.resolve(format.toString()).toFile(), "test");
        String targetLocation = locationWithBranch(tableDir);

        HadoopTables hadoopTables = new HadoopTables(CONF);
        PartitionSpec byId = PartitionSpec.builderFor(SCHEMA).identity("id").build();
        Table table = hadoopTables.create(SCHEMA, byId, tableDir.toString());

        List<SimpleRecord> records = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        // The expected result is exactly what was written initially.
        List<SimpleRecord> expected = records;

        spark.createDataFrame(records, SimpleRecord.class)
                .select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(tableDir.toString());

        createBranch(table);

        spark.createDataFrame(ImmutableList.<SimpleRecord>of(), SimpleRecord.class)
                .select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Overwrite)
                .option("overwrite-mode", "dynamic")
                .save(targetLocation);

        table.refresh();

        List<SimpleRecord> actual = spark.read()
                .format("iceberg")
                .load(targetLocation)
                .orderBy("id")
                .as(Encoders.bean(SimpleRecord.class))
                .collectAsList();
        Assertions.assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
        Assertions.assertThat(actual).as("Result rows should match").isEqualTo(expected);
    }

    /**
     * Writes ids 1..3 into a table partitioned by {@code id}, then dynamically overwrites with the
     * same rows whose ids are doubled (2, 4, 6). The expected result keeps ids 1 and 3 from the
     * first write and takes ids 2, 4 and 6 from the second.
     */
    @TestTemplate
    public void testOverwrite() throws IOException {
        File tableDir = new File(temp.resolve(format.toString()).toFile(), "test");
        String targetLocation = locationWithBranch(tableDir);

        HadoopTables hadoopTables = new HadoopTables(CONF);
        PartitionSpec byId = PartitionSpec.builderFor(SCHEMA).identity("id").build();
        Table table = hadoopTables.create(SCHEMA, byId, tableDir.toString());

        List<SimpleRecord> initial = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
        List<SimpleRecord> expected = Lists.newArrayList(
                new SimpleRecord(1, "a"),
                new SimpleRecord(2, "a"),
                new SimpleRecord(3, "c"),
                new SimpleRecord(4, "b"),
                new SimpleRecord(6, "c"));

        Dataset<?> df = spark.createDataFrame(initial, SimpleRecord.class);

        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(tableDir.toString());

        createBranch(table);

        df.withColumn("id", df.col("id").multiply(2))
                .select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Overwrite)
                .option("overwrite-mode", "dynamic")
                .save(targetLocation);

        table.refresh();

        List<SimpleRecord> actual = spark.read()
                .format("iceberg")
                .load(targetLocation)
                .orderBy("id")
                .as(Encoders.bean(SimpleRecord.class))
                .collectAsList();
        Assertions.assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
        Assertions.assertThat(actual).as("Result rows should match").isEqualTo(expected);
    }

    /**
     * Appends three rows to an unpartitioned table and then overwrites the target location with
     * the same rows; reading back must return exactly those three rows.
     */
    @TestTemplate
    public void testUnpartitionedOverwrite() throws IOException {
        File tableDir = new File(temp.resolve(format.toString()).toFile(), "test");
        String targetLocation = locationWithBranch(tableDir);

        HadoopTables hadoopTables = new HadoopTables(CONF);
        Table table =
                hadoopTables.create(SCHEMA, PartitionSpec.unpartitioned(), tableDir.toString());

        List<SimpleRecord> expected = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));

        Dataset<?> df = spark.createDataFrame(expected, SimpleRecord.class);

        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(tableDir.toString());

        createBranch(table);

        // Overwrite with identical content.
        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Overwrite)
                .save(targetLocation);

        table.refresh();

        List<SimpleRecord> actual = spark.read()
                .format("iceberg")
                .load(targetLocation)
                .orderBy("id")
                .as(Encoders.bean(SimpleRecord.class))
                .collectAsList();
        Assertions.assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
        Assertions.assertThat(actual).as("Result rows should match").isEqualTo(expected);
    }

    /**
     * Sets {@code write.target-file-size-bytes} to 4 on an unpartitioned table, writes 4000 rows
     * and asserts that the latest snapshot references four data files of 1000 rows each.
     *
     * @throws IOException if a manifest reader cannot be closed
     */
    @TestTemplate
    public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws IOException {
        File parent = temp.resolve(format.toString()).toFile();
        File location = new File(parent, "test");
        String targetLocation = locationWithBranch(location);

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.unpartitioned();
        Table table = tables.create(SCHEMA, spec, location.toString());

        table.updateProperties().set("write.target-file-size-bytes", "4").commit();

        List<SimpleRecord> expected = Lists.newArrayListWithCapacity(4000);
        for (int i = 0; i < 4000; i++) {
            expected.add(new SimpleRecord(i, "a"));
        }

        spark.createDataFrame(expected, SimpleRecord.class)
                .select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(location.toString());

        createBranch(table);
        table.refresh();

        List<SimpleRecord> actual = spark.read()
                .format("iceberg")
                .load(targetLocation)
                .orderBy("id")
                .as(Encoders.bean(SimpleRecord.class))
                .collectAsList();
        Assertions.assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
        Assertions.assertThat(actual).as("Result rows should match").isEqualTo(expected);

        // Typed list so the row-count predicate below sees DataFile rather than Object.
        List<DataFile> files = Lists.newArrayList();
        for (ManifestFile manifest :
                SnapshotUtil.latestSnapshot(table, branch).allManifests(table.io())) {
            // The reader is closeable; release it once its entries have been collected.
            try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
                for (DataFile file : reader) {
                    files.add(file);
                }
            }
        }

        Assertions.assertThat(files).as("Should have 4 DataFiles").hasSize(4);
        Assertions.assertThat(files.stream())
                .as("All DataFiles contain 1000 rows")
                .allMatch(d -> d.recordCount() == 1000L);
    }

    /** Runs the shared target-file-size scenario with {@link IcebergOptionsType#NONE}. */
    @TestTemplate
    public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOException {
        partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType.NONE);
    }

    /** Runs the shared target-file-size scenario with {@link IcebergOptionsType#TABLE}. */
    @TestTemplate
    public void testPartitionedFanoutCreateWithTargetFileSizeViaOption() throws IOException {
        partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType.TABLE);
    }

    /** Runs the shared target-file-size scenario with {@link IcebergOptionsType#JOB}. */
    @TestTemplate
    public void testPartitionedFanoutCreateWithTargetFileSizeViaOption2() throws IOException {
        partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType.JOB);
    }

    /**
     * Writes only the {@code id} column into a two-column table and expects {@code data} to read
     * back as {@code null}. The test is skipped unless the Spark version string starts with "2".
     */
    @TestTemplate
    public void testWriteProjection() throws IOException {
        Assumptions.assumeThat(spark.version())
                .as("Not supported in Spark 3; analysis requires all columns are present")
                .startsWith("2");

        File tableDir = new File(temp.resolve(format.toString()).toFile(), "test");
        String targetLocation = locationWithBranch(tableDir);

        HadoopTables hadoopTables = new HadoopTables(CONF);
        Table table =
                hadoopTables.create(SCHEMA, PartitionSpec.unpartitioned(), tableDir.toString());

        List<SimpleRecord> expected = Lists.newArrayList(
                new SimpleRecord(1, null), new SimpleRecord(2, null), new SimpleRecord(3, null));

        // Only "id" is selected; "data" is left out of the write.
        spark.createDataFrame(expected, SimpleRecord.class)
                .select("id")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(tableDir.toString());

        createBranch(table);
        table.refresh();

        List<SimpleRecord> actual = spark.read()
                .format("iceberg")
                .load(targetLocation)
                .orderBy("id")
                .as(Encoders.bean(SimpleRecord.class))
                .collectAsList();
        Assertions.assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
        Assertions.assertThat(actual).as("Result rows should match").isEqualTo(expected);
    }

    /**
     * Writes columns {@code c1} and {@code c3} of a three-column table, omitting the middle column
     * {@code c2}, and expects {@code c2} to read back as {@code null}. The test is skipped unless
     * the Spark version string starts with "2".
     */
    @TestTemplate
    public void testWriteProjectionWithMiddle() throws IOException {
        Assumptions.assumeThat(spark.version())
                .as("Not supported in Spark 3; analysis requires all columns are present")
                .startsWith("2");

        File tableDir = new File(temp.resolve(format.toString()).toFile(), "test");
        String targetLocation = locationWithBranch(tableDir);

        HadoopTables hadoopTables = new HadoopTables(CONF);
        PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
        Schema threeColumns = new Schema(
                Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
                Types.NestedField.optional(2, "c2", Types.StringType.get()),
                Types.NestedField.optional(3, "c3", Types.StringType.get()));
        Table table = hadoopTables.create(threeColumns, unpartitioned, tableDir.toString());

        List<ThreeColumnRecord> expected = Lists.newArrayList(
                new ThreeColumnRecord(1, null, "hello"),
                new ThreeColumnRecord(2, null, "world"),
                new ThreeColumnRecord(3, null, null));

        spark.createDataFrame(expected, ThreeColumnRecord.class)
                .select("c1", "c3")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(tableDir.toString());

        createBranch(table);
        table.refresh();

        List<ThreeColumnRecord> actual = spark.read()
                .format("iceberg")
                .load(targetLocation)
                .orderBy("c1")
                .as(Encoders.bean(ThreeColumnRecord.class))
                .collectAsList();
        Assertions.assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
        Assertions.assertThat(actual).as("Result rows should match").isEqualTo(expected);
    }

    /**
     * Registers a temp view over a filtered read of the table, then appends the same rows again
     * and checks that querying the view returns the newly appended row as well.
     */
    @TestTemplate
    public void testViewsReturnRecentResults() throws IOException {
        File tableDir = new File(temp.resolve(format.toString()).toFile(), "test");
        String targetLocation = locationWithBranch(tableDir);

        HadoopTables hadoopTables = new HadoopTables(CONF);
        PartitionSpec byData = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        hadoopTables.create(SCHEMA, byData, tableDir.toString());

        List<SimpleRecord> records = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));

        Dataset<?> df = spark.createDataFrame(records, SimpleRecord.class);

        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(tableDir.toString());

        // The table is loaded only after the first write, then the branch is created from it.
        Table table = hadoopTables.load(tableDir.toString());
        createBranch(table);

        spark.read()
                .format("iceberg")
                .load(targetLocation)
                .where("id = 1")
                .createOrReplaceTempView("tmp");

        List<SimpleRecord> firstRead =
                spark.table("tmp").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        List<SimpleRecord> firstExpected = Lists.newArrayList(new SimpleRecord(1, "a"));
        Assertions.assertThat(firstRead)
                .as("Number of rows should match")
                .hasSameSizeAs(firstExpected);
        Assertions.assertThat(firstRead).as("Result rows should match").isEqualTo(firstExpected);

        df.select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(targetLocation);

        List<SimpleRecord> secondRead =
                spark.table("tmp").as(Encoders.bean(SimpleRecord.class)).collectAsList();
        List<SimpleRecord> secondExpected =
                Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(1, "a"));
        Assertions.assertThat(secondRead)
                .as("Number of rows should match")
                .hasSameSizeAs(secondExpected);
        Assertions.assertThat(secondRead).as("Result rows should match").isEqualTo(secondExpected);
    }

    /**
     * Shared scenario for the partitioned target-file-size tests: writes 8000 rows (2000 ids times
     * four {@code data} values) with the write option {@code target-file-size-bytes} set to 4, and
     * asserts that the latest snapshot references eight data files of 1000 rows each.
     *
     * @param option selects how the write is configured: {@code NONE} sorts the input by
     *     {@code data}, {@code TABLE} sets the table property {@code write.spark.fanout.enabled},
     *     {@code JOB} passes the write option {@code fanout-enabled}
     * @throws IOException if a manifest reader cannot be closed
     */
    public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType option) throws IOException {
        File parent = temp.resolve(format.toString()).toFile();
        File location = new File(parent, "test");
        String targetLocation = locationWithBranch(location);

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Map<String, String> properties = ImmutableMap.of("write.distribution-mode", "none");
        Table table = tables.create(SCHEMA, spec, properties, location.toString());

        List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
        for (int i = 0; i < 2000; i++) {
            expected.add(new SimpleRecord(i, "a"));
            expected.add(new SimpleRecord(i, "b"));
            expected.add(new SimpleRecord(i, "c"));
            expected.add(new SimpleRecord(i, "d"));
        }

        Dataset<?> df = spark.createDataFrame(expected, SimpleRecord.class);

        switch (option) {
            case NONE:
                // No fanout setting here; the input is sorted by the partition column.
                df.select("id", "data")
                        .sort("data")
                        .write()
                        .format("iceberg")
                        .option("write-format", format.toString())
                        .mode(SaveMode.Append)
                        .option("target-file-size-bytes", 4L)
                        .save(location.toString());
                break;
            case TABLE:
                table.updateProperties().set("write.spark.fanout.enabled", "true").commit();
                df.select("id", "data")
                        .write()
                        .format("iceberg")
                        .option("write-format", format.toString())
                        .mode(SaveMode.Append)
                        .option("target-file-size-bytes", 4L)
                        .save(location.toString());
                break;
            case JOB:
                df.select("id", "data")
                        .write()
                        .format("iceberg")
                        .option("write-format", format.toString())
                        .mode(SaveMode.Append)
                        .option("target-file-size-bytes", 4L)
                        .option("fanout-enabled", true)
                        .save(location.toString());
                break;
        }

        createBranch(table);
        table.refresh();

        // Every id occurs four times, so ordering by id alone leaves the order of ties
        // unspecified. Order by (id, data) to match the construction order of "expected".
        List<SimpleRecord> actual = spark.read()
                .format("iceberg")
                .load(targetLocation)
                .orderBy("id", "data")
                .as(Encoders.bean(SimpleRecord.class))
                .collectAsList();
        Assertions.assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
        Assertions.assertThat(actual).as("Result rows should match").isEqualTo(expected);

        // Typed list so the row-count predicate below sees DataFile rather than Object.
        List<DataFile> files = Lists.newArrayList();
        for (ManifestFile manifest :
                SnapshotUtil.latestSnapshot(table, branch).allManifests(table.io())) {
            // The reader is closeable; release it once its entries have been collected.
            try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
                for (DataFile file : reader) {
                    files.add(file);
                }
            }
        }

        Assertions.assertThat(files).as("Should have 8 DataFiles").hasSize(8);
        Assertions.assertThat(files.stream())
                .as("All DataFiles contain 1000 rows")
                .allMatch(d -> d.recordCount() == 1000L);
    }

    /**
     * Simulates a commit whose outcome is reported as unknown even though it succeeded: the spied
     * append performs the real commit and then throws {@link CommitStateUnknownException}. The
     * write must surface that exception, and a later read must return the rows of both writes.
     */
    @TestTemplate
    public void testCommitUnknownException() throws IOException {
        File tableDir = new File(temp.resolve(format.toString()).toFile(), "commitunknown");
        String targetLocation = locationWithBranch(tableDir);

        HadoopTables hadoopTables = new HadoopTables(CONF);
        PartitionSpec byData = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table table = hadoopTables.create(SCHEMA, byData, tableDir.toString());

        List<SimpleRecord> records = Lists.newArrayList(
                new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));

        spark.createDataFrame(records, SimpleRecord.class)
                .select("id", "data")
                .write()
                .format("iceberg")
                .option("write-format", format.toString())
                .mode(SaveMode.Append)
                .save(tableDir.toString());

        createBranch(table);
        table.refresh();

        List<SimpleRecord> records2 = Lists.newArrayList(
                new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
        Dataset<?> df2 = spark.createDataFrame(records2, SimpleRecord.class);

        AppendFiles append = table.newFastAppend();
        if (branch != null) {
            append.toBranch(branch);
        }

        // The spy delegates to the real commit first and only then reports an unknown state.
        AppendFiles spyAppend = Mockito.spy(append);
        Mockito.doAnswer(invocation -> {
                    append.commit();
                    throw new CommitStateUnknownException(
                            new RuntimeException("Datacenter on Fire"));
                })
                .when(spyAppend)
                .commit();

        Table spyTable = Mockito.spy(table);
        Mockito.when(spyTable.newAppend()).thenReturn(spyAppend);
        SparkTable sparkTable = new SparkTable(spyTable, false);

        String manualTableName = "unknown_exception";
        ManualSource.setTable(manualTableName, sparkTable);

        Assertions.assertThatThrownBy(() ->
                        df2.select("id", "data")
                                .sort("data")
                                .write()
                                .format("org.apache.iceberg.spark.source.ManualSource")
                                .option("TABLE_NAME", manualTableName)
                                .mode(SaveMode.Append)
                                .save(targetLocation))
                .isInstanceOf(CommitStateUnknownException.class)
                .hasMessageStartingWith("Datacenter on Fire");

        List<SimpleRecord> actual = spark.read()
                .format("iceberg")
                .load(targetLocation)
                .orderBy("id")
                .as(Encoders.bean(SimpleRecord.class))
                .collectAsList();
        Assertions.assertThat(actual)
                .as("Number of rows should match")
                .hasSize(records.size() + records2.size());
        Assertions.assertThat(actual)
                .describedAs("Result rows should match")
                .containsExactlyInAnyOrder(
                        ImmutableList.<SimpleRecord>builder()
                                .addAll(records)
                                .addAll(records2)
                                .build()
                                .toArray(new SimpleRecord[0]));
    }

    /**
     * Builds the location string the tests read from and write to.
     *
     * @param location directory of the table
     * @return {@code location} as a string when no branch is configured, otherwise
     *     {@code location} followed by {@code #branch_} and the branch name
     */
    private String locationWithBranch(File location) {
        return branch == null ? location.toString() : location + "#branch_" + branch;
    }

    /**
     * Creates the configured branch at the table's current snapshot. Does nothing when no branch
     * is configured or when the branch is {@code "main"}.
     *
     * @param table table on which the branch is created; its current snapshot is dereferenced, so
     *     callers write to the table before calling this
     */
    private void createBranch(Table table) {
        if (branch == null || "main".equals(branch)) {
            return;
        }
        table.manageSnapshots()
                .createBranch(branch, table.currentSnapshot().snapshotId())
                .commit();
    }

    public static enum IcebergOptionsType {
        NONE,
        TABLE,
        JOB;

    }
}

