/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.GenericManifestFile;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestBase;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.TestTableLoader;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
import org.apache.iceberg.flink.sink.FlinkManifestUtil;
import org.apache.iceberg.flink.sink.IcebergFilesCommitter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.ThreadPools;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
public class TestIcebergFilesCommitter
extends TestBase {
    private static final org.apache.hadoop.conf.Configuration CONF = new org.apache.hadoop.conf.Configuration();
    private File flinkManifestFolder;
    @Parameter(index=1)
    private FileFormat format;
    @Parameter(index=2)
    private String branch;

    @Parameters(name="formatVersion = {0}, fileFormat = {1}, branch = {2}")
    protected static List<Object> parameters() {
        return Arrays.asList(new Object[]{1, FileFormat.AVRO, "main"}, new Object[]{2, FileFormat.AVRO, "test-branch"}, new Object[]{1, FileFormat.PARQUET, "main"}, new Object[]{2, FileFormat.PARQUET, "test-branch"}, new Object[]{1, FileFormat.ORC, "main"}, new Object[]{2, FileFormat.ORC, "test-branch"});
    }

    @BeforeEach
    public void setupTable() throws IOException {
        this.flinkManifestFolder = Files.createTempDirectory(this.temp, "flink", new FileAttribute[0]).toFile();
        this.tableDir = Files.createTempDirectory(this.temp, "junit", new FileAttribute[0]).toFile();
        this.metadataDir = new File(this.tableDir, "metadata");
        Assertions.assertThat((boolean)this.tableDir.delete()).isTrue();
        this.table = this.create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());
        this.table.updateProperties().set("write.format.default", this.format.name()).set("flink.manifests.location", this.flinkManifestFolder.getAbsolutePath()).set("flink.max-continuous-empty-commits", "1").commit();
    }

    @TestTemplate
    public void testCommitTxnWithoutDataFiles() throws Exception {
        long checkpointId = 0L;
        long timestamp = 0L;
        JobID jobId = new JobID();
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(jobId);){
            harness.setup();
            harness.open();
            OperatorID operatorId = harness.getOperator().getOperatorID();
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)Lists.newArrayList(), this.branch);
            this.assertSnapshotSize(0);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            for (int i = 1; i <= 3; ++i) {
                harness.snapshot(++checkpointId, ++timestamp);
                this.assertFlinkManifests(0);
                harness.notifyOfCompletedCheckpoint(checkpointId);
                this.assertFlinkManifests(0);
                this.assertSnapshotSize(i);
                this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
            }
        }
    }

    @TestTemplate
    public void testMaxContinuousEmptyCommits() throws Exception {
        this.table.updateProperties().set("flink.max-continuous-empty-commits", "3").commit();
        JobID jobId = new JobID();
        long checkpointId = 0L;
        long timestamp = 0L;
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(jobId);){
            harness.setup();
            harness.open();
            this.assertSnapshotSize(0);
            for (int i = 1; i <= 9; ++i) {
                harness.snapshot(++checkpointId, ++timestamp);
                harness.notifyOfCompletedCheckpoint(checkpointId);
                this.assertSnapshotSize(i / 3);
            }
        }
    }

    private WriteResult of(DataFile dataFile) {
        return WriteResult.builder().addDataFiles(new DataFile[]{dataFile}).build();
    }

    @TestTemplate
    public void testCommitTxn() throws Exception {
        long timestamp = 0L;
        JobID jobID = new JobID();
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(jobID);){
            harness.setup();
            harness.open();
            OperatorID operatorId = harness.getOperator().getOperatorID();
            this.assertSnapshotSize(0);
            ArrayList rows = Lists.newArrayListWithExpectedSize((int)3);
            for (int i = 1; i <= 3; ++i) {
                RowData rowData = SimpleDataUtil.createRowData(i, "hello" + i);
                DataFile dataFile = this.writeDataFile("data-" + i, (List<RowData>)ImmutableList.of((Object)rowData));
                harness.processElement((Object)this.of(dataFile), ++timestamp);
                rows.add(rowData);
                harness.snapshot((long)i, ++timestamp);
                this.assertFlinkManifests(1);
                harness.notifyOfCompletedCheckpoint((long)i);
                this.assertFlinkManifests(0);
                SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)ImmutableList.copyOf((Collection)rows), this.branch);
                this.assertSnapshotSize(i);
                this.assertMaxCommittedCheckpointId(jobID, operatorId, i);
                Assertions.assertThat((Map)SimpleDataUtil.latestSnapshot((Table)this.table, this.branch).summary()).containsEntry((Object)"flink.test", (Object)TestIcebergFilesCommitter.class.getName());
            }
        }
    }

    @TestTemplate
    public void testOrderedEventsBetweenCheckpoints() throws Exception {
        long timestamp = 0L;
        JobID jobId = new JobID();
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(jobId);){
            harness.setup();
            harness.open();
            OperatorID operatorId = harness.getOperator().getOperatorID();
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            RowData row1 = SimpleDataUtil.createRowData(1, "hello");
            DataFile dataFile1 = this.writeDataFile("data-1", (List<RowData>)ImmutableList.of((Object)row1));
            harness.processElement((Object)this.of(dataFile1), ++timestamp);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            long firstCheckpointId = 1L;
            harness.snapshot(firstCheckpointId, ++timestamp);
            this.assertFlinkManifests(1);
            RowData row2 = SimpleDataUtil.createRowData(2, "world");
            DataFile dataFile2 = this.writeDataFile("data-2", (List<RowData>)ImmutableList.of((Object)row2));
            harness.processElement((Object)this.of(dataFile2), ++timestamp);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            long secondCheckpointId = 2L;
            harness.snapshot(secondCheckpointId, ++timestamp);
            this.assertFlinkManifests(2);
            harness.notifyOfCompletedCheckpoint(firstCheckpointId);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)ImmutableList.of((Object)row1), this.branch);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, firstCheckpointId);
            this.assertFlinkManifests(1);
            harness.notifyOfCompletedCheckpoint(secondCheckpointId);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)ImmutableList.of((Object)row1, (Object)row2), this.branch);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
            this.assertFlinkManifests(0);
        }
    }

    @TestTemplate
    public void testDisorderedEventsBetweenCheckpoints() throws Exception {
        long timestamp = 0L;
        JobID jobId = new JobID();
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(jobId);){
            harness.setup();
            harness.open();
            OperatorID operatorId = harness.getOperator().getOperatorID();
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            RowData row1 = SimpleDataUtil.createRowData(1, "hello");
            DataFile dataFile1 = this.writeDataFile("data-1", (List<RowData>)ImmutableList.of((Object)row1));
            harness.processElement((Object)this.of(dataFile1), ++timestamp);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            long firstCheckpointId = 1L;
            harness.snapshot(firstCheckpointId, ++timestamp);
            this.assertFlinkManifests(1);
            RowData row2 = SimpleDataUtil.createRowData(2, "world");
            DataFile dataFile2 = this.writeDataFile("data-2", (List<RowData>)ImmutableList.of((Object)row2));
            harness.processElement((Object)this.of(dataFile2), ++timestamp);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            long secondCheckpointId = 2L;
            harness.snapshot(secondCheckpointId, ++timestamp);
            this.assertFlinkManifests(2);
            harness.notifyOfCompletedCheckpoint(secondCheckpointId);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)ImmutableList.of((Object)row1, (Object)row2), this.branch);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
            this.assertFlinkManifests(0);
            harness.notifyOfCompletedCheckpoint(firstCheckpointId);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)ImmutableList.of((Object)row1, (Object)row2), this.branch);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
            this.assertFlinkManifests(0);
        }
    }

    @TestTemplate
    public void testRecoveryFromValidSnapshot() throws Exception {
        OperatorSubtaskState snapshot;
        RowData row;
        OperatorID operatorId;
        long checkpointId = 0L;
        long timestamp = 0L;
        ArrayList expectedRows = Lists.newArrayList();
        JobID jobId = new JobID();
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(jobId);){
            harness.setup();
            harness.open();
            operatorId = harness.getOperator().getOperatorID();
            this.assertSnapshotSize(0);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            row = SimpleDataUtil.createRowData(1, "hello");
            expectedRows.add(row);
            DataFile dataFile1 = this.writeDataFile("data-1", (List<RowData>)ImmutableList.of((Object)row));
            harness.processElement((Object)this.of(dataFile1), ++timestamp);
            snapshot = harness.snapshot(++checkpointId, ++timestamp);
            this.assertFlinkManifests(1);
            harness.notifyOfCompletedCheckpoint(checkpointId);
            this.assertFlinkManifests(0);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)ImmutableList.of((Object)row), this.branch);
            this.assertSnapshotSize(1);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
        }
        harness = this.createStreamSink(jobId);
        var10_6 = null;
        try {
            harness.getStreamConfig().setOperatorID(operatorId);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)expectedRows, this.branch);
            this.assertSnapshotSize(1);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
            row = SimpleDataUtil.createRowData(2, "world");
            expectedRows.add(row);
            DataFile dataFile = this.writeDataFile("data-2", (List<RowData>)ImmutableList.of((Object)row));
            harness.processElement((Object)this.of(dataFile), ++timestamp);
            harness.snapshot(++checkpointId, ++timestamp);
            this.assertFlinkManifests(1);
            harness.notifyOfCompletedCheckpoint(checkpointId);
            this.assertFlinkManifests(0);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)expectedRows, this.branch);
            this.assertSnapshotSize(2);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
        }
        catch (Throwable throwable) {
            var10_6 = throwable;
            throw throwable;
        }
        finally {
            if (harness != null) {
                TestIcebergFilesCommitter.$closeResource(var10_6, harness);
            }
        }
    }

    @TestTemplate
    public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Exception {
        OperatorSubtaskState snapshot;
        DataFile dataFile;
        RowData row;
        OperatorID operatorId;
        long checkpointId = 0L;
        long timestamp = 0L;
        ArrayList expectedRows = Lists.newArrayList();
        JobID jobId = new JobID();
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(jobId);){
            harness.setup();
            harness.open();
            operatorId = harness.getOperator().getOperatorID();
            this.assertSnapshotSize(0);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            row = SimpleDataUtil.createRowData(1, "hello");
            expectedRows.add(row);
            dataFile = this.writeDataFile("data-1", (List<RowData>)ImmutableList.of((Object)row));
            harness.processElement((Object)this.of(dataFile), ++timestamp);
            snapshot = harness.snapshot(++checkpointId, ++timestamp);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)ImmutableList.of(), this.branch);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            this.assertFlinkManifests(1);
        }
        harness = this.createStreamSink(jobId);
        var10_6 = null;
        try {
            harness.getStreamConfig().setOperatorID(operatorId);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();
            this.assertFlinkManifests(0);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)expectedRows, this.branch);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
            harness.snapshot(++checkpointId, ++timestamp);
            this.assertFlinkManifests(0);
            harness.notifyOfCompletedCheckpoint(checkpointId);
            this.assertFlinkManifests(0);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)expectedRows, this.branch);
            this.assertSnapshotSize(2);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
            row = SimpleDataUtil.createRowData(2, "world");
            expectedRows.add(row);
            dataFile = this.writeDataFile("data-2", (List<RowData>)ImmutableList.of((Object)row));
            harness.processElement((Object)this.of(dataFile), ++timestamp);
            snapshot = harness.snapshot(++checkpointId, ++timestamp);
            this.assertFlinkManifests(1);
        }
        catch (Throwable throwable) {
            var10_6 = throwable;
            throw throwable;
        }
        finally {
            if (harness != null) {
                TestIcebergFilesCommitter.$closeResource(var10_6, harness);
            }
        }
        JobID newJobId = new JobID();
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(newJobId);){
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();
            operatorId = harness.getOperator().getOperatorID();
            this.assertFlinkManifests(0);
            this.assertMaxCommittedCheckpointId(newJobId, operatorId, -1L);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)expectedRows, this.branch);
            this.assertSnapshotSize(3);
            RowData row2 = SimpleDataUtil.createRowData(3, "foo");
            expectedRows.add(row2);
            DataFile dataFile2 = this.writeDataFile("data-3", (List<RowData>)ImmutableList.of((Object)row2));
            harness.processElement((Object)this.of(dataFile2), ++timestamp);
            harness.snapshot(++checkpointId, ++timestamp);
            this.assertFlinkManifests(1);
            harness.notifyOfCompletedCheckpoint(checkpointId);
            this.assertFlinkManifests(0);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)expectedRows, this.branch);
            this.assertSnapshotSize(4);
            this.assertMaxCommittedCheckpointId(newJobId, operatorId, checkpointId);
        }
    }

    @TestTemplate
    public void testStartAnotherJobToWriteSameTable() throws Exception {
        OperatorID oldOperatorId;
        long checkpointId = 0L;
        long timestamp = 0L;
        ArrayList rows = Lists.newArrayList();
        ArrayList tableRows = Lists.newArrayList();
        JobID oldJobId = new JobID();
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(oldJobId);){
            harness.setup();
            harness.open();
            oldOperatorId = harness.getOperator().getOperatorID();
            this.assertSnapshotSize(0);
            this.assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, -1L);
            for (int i = 1; i <= 3; ++i) {
                rows.add(SimpleDataUtil.createRowData(i, "hello" + i));
                tableRows.addAll(rows);
                DataFile dataFile = this.writeDataFile(String.format("data-%d", i), rows);
                harness.processElement((Object)this.of(dataFile), ++timestamp);
                harness.snapshot(++checkpointId, ++timestamp);
                this.assertFlinkManifests(1);
                harness.notifyOfCompletedCheckpoint(checkpointId);
                this.assertFlinkManifests(0);
                SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)tableRows, this.branch);
                this.assertSnapshotSize(i);
                this.assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, checkpointId);
            }
        }
        checkpointId = 0L;
        timestamp = 0L;
        JobID newJobId = new JobID();
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(newJobId);){
            harness.setup();
            harness.open();
            OperatorID newOperatorId = harness.getOperator().getOperatorID();
            this.assertSnapshotSize(3);
            this.assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, 3L);
            this.assertMaxCommittedCheckpointId(newJobId, newOperatorId, -1L);
            rows.add(SimpleDataUtil.createRowData(2, "world"));
            tableRows.addAll(rows);
            DataFile dataFile = this.writeDataFile("data-new-1", rows);
            harness.processElement((Object)this.of(dataFile), ++timestamp);
            harness.snapshot(++checkpointId, ++timestamp);
            this.assertFlinkManifests(1);
            harness.notifyOfCompletedCheckpoint(checkpointId);
            this.assertFlinkManifests(0);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)tableRows, this.branch);
            this.assertSnapshotSize(4);
            this.assertMaxCommittedCheckpointId(newJobId, newOperatorId, checkpointId);
        }
    }

    @TestTemplate
    public void testMultipleJobsWriteSameTable() throws Exception {
        long timestamp = 0L;
        ArrayList tableRows = Lists.newArrayList();
        JobID[] jobs = new JobID[]{new JobID(), new JobID(), new JobID()};
        OperatorID[] operatorIds = new OperatorID[]{new OperatorID(), new OperatorID(), new OperatorID()};
        for (int i = 0; i < 20; ++i) {
            int jobIndex = i % 3;
            int checkpointId = i / 3;
            JobID jobId = jobs[jobIndex];
            OperatorID operatorId = operatorIds[jobIndex];
            try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(jobId);){
                harness.getStreamConfig().setOperatorID(operatorId);
                harness.setup();
                harness.open();
                this.assertSnapshotSize(i);
                this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId == 0 ? -1L : (long)checkpointId);
                ArrayList rows = Lists.newArrayList((Object[])new RowData[]{SimpleDataUtil.createRowData(i, "word-" + i)});
                tableRows.addAll(rows);
                DataFile dataFile = this.writeDataFile(String.format("data-%d", i), rows);
                harness.processElement((Object)this.of(dataFile), ++timestamp);
                harness.snapshot((long)(checkpointId + 1), ++timestamp);
                this.assertFlinkManifests(1);
                harness.notifyOfCompletedCheckpoint((long)(checkpointId + 1));
                this.assertFlinkManifests(0);
                SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)tableRows, this.branch);
                this.assertSnapshotSize(i + 1);
                this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId + 1);
                continue;
            }
        }
    }

    @TestTemplate
    public void testMultipleSinksRecoveryFromValidSnapshot() throws Exception {
        OperatorSubtaskState snapshot2;
        DataFile dataFile2;
        RowData row2;
        OperatorSubtaskState snapshot1;
        DataFile dataFile1;
        RowData row12;
        Throwable throwable;
        OneInputStreamOperatorTestHarness<WriteResult, Void> harness2;
        long checkpointId = 0L;
        long timestamp = 0L;
        ArrayList expectedRows = Lists.newArrayList();
        JobID jobId = new JobID();
        OperatorID operatorId1 = new OperatorID();
        OperatorID operatorId2 = new OperatorID();
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness1 = this.createStreamSink(jobId);){
            harness2 = this.createStreamSink(jobId);
            throwable = null;
            try {
                harness1.getStreamConfig().setOperatorID(operatorId1);
                harness1.setup();
                harness1.open();
                harness2.getStreamConfig().setOperatorID(operatorId2);
                harness2.setup();
                harness2.open();
                this.assertSnapshotSize(0);
                this.assertMaxCommittedCheckpointId(jobId, operatorId1, -1L);
                this.assertMaxCommittedCheckpointId(jobId, operatorId2, -1L);
                row12 = SimpleDataUtil.createRowData(1, "hello1");
                expectedRows.add(row12);
                dataFile1 = this.writeDataFile("data-1-1", (List<RowData>)ImmutableList.of((Object)row12));
                harness1.processElement((Object)this.of(dataFile1), ++timestamp);
                snapshot1 = harness1.snapshot(++checkpointId, ++timestamp);
                row2 = SimpleDataUtil.createRowData(1, "hello2");
                expectedRows.add(row2);
                dataFile2 = this.writeDataFile("data-1-2", (List<RowData>)ImmutableList.of((Object)row2));
                harness2.processElement((Object)this.of(dataFile2), ++timestamp);
                snapshot2 = harness2.snapshot(checkpointId, ++timestamp);
                this.assertFlinkManifests(2);
                harness1.notifyOfCompletedCheckpoint(checkpointId);
                this.assertFlinkManifests(1);
                SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)ImmutableList.of((Object)row12), this.branch);
                this.assertSnapshotSize(1);
                this.assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
                this.assertMaxCommittedCheckpointId(jobId, operatorId2, -1L);
            }
            catch (Throwable row12) {
                throwable = row12;
                throw row12;
            }
            finally {
                if (harness2 != null) {
                    TestIcebergFilesCommitter.$closeResource(throwable, harness2);
                }
            }
        }
        harness1 = this.createStreamSink(jobId);
        var12_8 = null;
        try {
            harness2 = this.createStreamSink(jobId);
            throwable = null;
            try {
                harness1.getStreamConfig().setOperatorID(operatorId1);
                harness1.setup();
                harness1.initializeState(snapshot1);
                harness1.open();
                harness2.getStreamConfig().setOperatorID(operatorId2);
                harness2.setup();
                harness2.initializeState(snapshot2);
                harness2.open();
                this.assertFlinkManifests(0);
                SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)expectedRows, this.branch);
                this.assertSnapshotSize(2);
                this.assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
                this.assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
                row12 = SimpleDataUtil.createRowData(2, "world1");
                expectedRows.add(row12);
                dataFile1 = this.writeDataFile("data-2-1", (List<RowData>)ImmutableList.of((Object)row12));
                harness1.processElement((Object)this.of(dataFile1), ++timestamp);
                harness1.snapshot(++checkpointId, ++timestamp);
                row2 = SimpleDataUtil.createRowData(2, "world2");
                expectedRows.add(row2);
                dataFile2 = this.writeDataFile("data-2-2", (List<RowData>)ImmutableList.of((Object)row2));
                harness2.processElement((Object)this.of(dataFile2), ++timestamp);
                harness2.snapshot(checkpointId, ++timestamp);
                this.assertFlinkManifests(2);
                harness1.notifyOfCompletedCheckpoint(checkpointId);
                harness2.notifyOfCompletedCheckpoint(checkpointId);
                this.assertFlinkManifests(0);
                SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)expectedRows, this.branch);
                this.assertSnapshotSize(4);
                this.assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
                this.assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (harness2 != null) {
                    TestIcebergFilesCommitter.$closeResource(throwable, harness2);
                }
            }
        }
        catch (Throwable throwable3) {
            var12_8 = throwable3;
            throw throwable3;
        }
        finally {
            if (harness1 != null) {
                TestIcebergFilesCommitter.$closeResource(var12_8, harness1);
            }
        }
    }

    @TestTemplate
    public void testBoundedStream() throws Exception {
        JobID jobId = new JobID();
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(jobId);){
            harness.setup();
            harness.open();
            OperatorID operatorId = harness.getOperator().getOperatorID();
            this.assertFlinkManifests(0);
            this.assertSnapshotSize(0);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            ArrayList tableRows = Lists.newArrayList((Object[])new RowData[]{SimpleDataUtil.createRowData(1, "word-1")});
            DataFile dataFile = this.writeDataFile("data-1", tableRows);
            harness.processElement((Object)this.of(dataFile), 1L);
            ((BoundedOneInput)harness.getOneInputOperator()).endInput();
            this.assertFlinkManifests(0);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)tableRows, this.branch);
            this.assertSnapshotSize(1);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, Long.MAX_VALUE);
            Assertions.assertThat((Map)SimpleDataUtil.latestSnapshot((Table)this.table, this.branch).summary()).containsEntry((Object)"flink.test", (Object)TestIcebergFilesCommitter.class.getName());
        }
    }

    @TestTemplate
    public void testFlinkManifests() throws Exception {
        long timestamp = 0L;
        long checkpoint = 10L;
        JobID jobId = new JobID();
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(jobId);){
            harness.setup();
            harness.open();
            OperatorID operatorId = harness.getOperator().getOperatorID();
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            RowData row1 = SimpleDataUtil.createRowData(1, "hello");
            DataFile dataFile1 = this.writeDataFile("data-1", (List<RowData>)ImmutableList.of((Object)row1));
            harness.processElement((Object)this.of(dataFile1), ++timestamp);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            harness.snapshot(10L, ++timestamp);
            List<Path> manifestPaths = this.assertFlinkManifests(1);
            Path manifestPath = manifestPaths.get(0);
            Assertions.assertThat((Path)manifestPath.getFileName()).asString().isEqualTo(String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, 10L, 1));
            List dataFiles = FlinkManifestUtil.readDataFiles((ManifestFile)this.createTestingManifestFile(manifestPath), (FileIO)this.table.io(), (Map)this.table.specs());
            Assertions.assertThat((List)dataFiles).hasSize(1);
            TestHelpers.assertEquals(dataFile1, (ContentFile)dataFiles.get(0));
            harness.notifyOfCompletedCheckpoint(10L);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)ImmutableList.of((Object)row1), this.branch);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, 10L);
            this.assertFlinkManifests(0);
        }
    }

    @TestTemplate
    public void testDeleteFiles() throws Exception {
        ((AbstractIntegerAssert)Assumptions.assumeThat((int)this.formatVersion).as("Only support equality-delete in format v2 or later.", new Object[0])).isGreaterThan(1);
        long timestamp = 0L;
        long checkpoint = 10L;
        JobID jobId = new JobID();
        FileAppenderFactory<RowData> appenderFactory = this.createDeletableAppenderFactory();
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(jobId);){
            harness.setup();
            harness.open();
            OperatorID operatorId = harness.getOperator().getOperatorID();
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            RowData row1 = SimpleDataUtil.createInsert(1, "aaa");
            DataFile dataFile1 = this.writeDataFile("data-file-1", (List<RowData>)ImmutableList.of((Object)row1));
            harness.processElement((Object)this.of(dataFile1), ++timestamp);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            harness.snapshot(checkpoint, ++timestamp);
            List<Path> manifestPaths = this.assertFlinkManifests(1);
            Path manifestPath = manifestPaths.get(0);
            Assertions.assertThat((Path)manifestPath.getFileName()).asString().isEqualTo(String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1));
            List dataFiles = FlinkManifestUtil.readDataFiles((ManifestFile)this.createTestingManifestFile(manifestPath), (FileIO)this.table.io(), (Map)this.table.specs());
            Assertions.assertThat((List)dataFiles).hasSize(1);
            TestHelpers.assertEquals(dataFile1, (ContentFile)dataFiles.get(0));
            harness.notifyOfCompletedCheckpoint(checkpoint);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)ImmutableList.of((Object)row1), this.branch);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
            this.assertFlinkManifests(0);
            RowData row2 = SimpleDataUtil.createInsert(2, "bbb");
            DataFile dataFile2 = this.writeDataFile("data-file-2", (List<RowData>)ImmutableList.of((Object)row2));
            RowData delete1 = SimpleDataUtil.createDelete(1, "aaa");
            DeleteFile deleteFile1 = this.writeEqDeleteFile(appenderFactory, "delete-file-1", (List<RowData>)ImmutableList.of((Object)delete1));
            harness.processElement((Object)WriteResult.builder().addDataFiles(new DataFile[]{dataFile2}).addDeleteFiles(new DeleteFile[]{deleteFile1}).build(), ++timestamp);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
            harness.snapshot(++checkpoint, ++timestamp);
            this.assertFlinkManifests(2);
            harness.notifyOfCompletedCheckpoint(checkpoint);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)ImmutableList.of((Object)row2), this.branch);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
            this.assertFlinkManifests(0);
        }
    }

    @TestTemplate
    public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
        ((AbstractIntegerAssert)Assumptions.assumeThat((int)this.formatVersion).as("Only support equality-delete in format v2 or later.", new Object[0])).isGreaterThan(1);
        long timestamp = 0L;
        long checkpoint = 10L;
        JobID jobId = new JobID();
        FileAppenderFactory<RowData> appenderFactory = this.createDeletableAppenderFactory();
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(jobId);){
            harness.setup();
            harness.open();
            OperatorID operatorId = harness.getOperator().getOperatorID();
            this.assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
            RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
            RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
            RowData delete3 = SimpleDataUtil.createDelete(3, "ccc");
            DataFile dataFile1 = this.writeDataFile("data-file-1", (List<RowData>)ImmutableList.of((Object)insert1, (Object)insert2));
            DeleteFile deleteFile1 = this.writeEqDeleteFile(appenderFactory, "delete-file-1", (List<RowData>)ImmutableList.of((Object)delete3));
            harness.processElement((Object)WriteResult.builder().addDataFiles(new DataFile[]{dataFile1}).addDeleteFiles(new DeleteFile[]{deleteFile1}).build(), ++timestamp);
            harness.snapshot(checkpoint, ++timestamp);
            RowData insert4 = SimpleDataUtil.createInsert(4, "ddd");
            RowData delete2 = SimpleDataUtil.createDelete(2, "bbb");
            DataFile dataFile2 = this.writeDataFile("data-file-2", (List<RowData>)ImmutableList.of((Object)insert4));
            DeleteFile deleteFile2 = this.writeEqDeleteFile(appenderFactory, "delete-file-2", (List<RowData>)ImmutableList.of((Object)delete2));
            harness.processElement((Object)WriteResult.builder().addDataFiles(new DataFile[]{dataFile2}).addDeleteFiles(new DeleteFile[]{deleteFile2}).build(), ++timestamp);
            harness.snapshot(++checkpoint, ++timestamp);
            harness.notifyOfCompletedCheckpoint(checkpoint);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)ImmutableList.of((Object)insert1, (Object)insert4), this.branch);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
            this.assertFlinkManifests(0);
            Assertions.assertThat((Iterable)this.table.snapshots()).hasSize(2);
        }
    }

    @TestTemplate
    public void testSpecEvolution() throws Exception {
        OperatorSubtaskState snapshot;
        int specId;
        DataFile dataFile;
        OperatorID operatorId;
        long timestamp = 0L;
        int checkpointId = 0;
        ArrayList rows = Lists.newArrayList();
        JobID jobId = new JobID();
        try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = this.createStreamSink(jobId);){
            harness.setup();
            harness.open();
            operatorId = harness.getOperator().getOperatorID();
            this.assertSnapshotSize(0);
            RowData rowData = SimpleDataUtil.createRowData(++checkpointId, "hello" + checkpointId);
            dataFile = this.writeDataFile("data-" + checkpointId, (List<RowData>)ImmutableList.of((Object)rowData));
            harness.processElement((Object)this.of(dataFile), ++timestamp);
            rows.add(rowData);
            harness.snapshot((long)checkpointId, ++timestamp);
            specId = this.getStagingManifestSpecId((OperatorStateStore)harness.getOperator().getOperatorStateBackend(), checkpointId);
            Assertions.assertThat((int)specId).isEqualTo(this.table.spec().specId());
            harness.notifyOfCompletedCheckpoint((long)checkpointId);
            this.table.refresh();
            PartitionSpec oldSpec = this.table.spec();
            this.table.updateSpec().addField("id").commit();
            rowData = SimpleDataUtil.createRowData(++checkpointId, "hello" + checkpointId);
            dataFile = this.writeDataFile("data-" + checkpointId, (List<RowData>)ImmutableList.of((Object)rowData), oldSpec, null);
            harness.processElement((Object)this.of(dataFile), ++timestamp);
            rows.add(rowData);
            snapshot = harness.snapshot((long)checkpointId, ++timestamp);
            specId = this.getStagingManifestSpecId((OperatorStateStore)harness.getOperator().getOperatorStateBackend(), checkpointId);
            Assertions.assertThat((int)specId).isEqualTo(oldSpec.specId());
            harness.notifyOfCompletedCheckpoint((long)checkpointId);
            this.assertFlinkManifests(0);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)ImmutableList.copyOf((Collection)rows), this.branch);
            this.assertSnapshotSize(checkpointId);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
        }
        harness = this.createStreamSink(jobId);
        var11_6 = null;
        try {
            harness.getStreamConfig().setOperatorID(operatorId);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)rows, this.branch);
            this.assertSnapshotSize(checkpointId);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
            RowData row = SimpleDataUtil.createRowData(++checkpointId, "world" + checkpointId);
            PartitionData partition = new PartitionData(this.table.spec().partitionType());
            partition.set(0, (Object)checkpointId);
            dataFile = this.writeDataFile("data-" + checkpointId, (List<RowData>)ImmutableList.of((Object)row), this.table.spec(), (StructLike)partition);
            harness.processElement((Object)this.of(dataFile), ++timestamp);
            rows.add(row);
            harness.snapshot((long)checkpointId, ++timestamp);
            this.assertFlinkManifests(1);
            specId = this.getStagingManifestSpecId((OperatorStateStore)harness.getOperator().getOperatorStateBackend(), checkpointId);
            Assertions.assertThat((int)specId).isEqualTo(this.table.spec().specId());
            harness.notifyOfCompletedCheckpoint((long)checkpointId);
            this.assertFlinkManifests(0);
            SimpleDataUtil.assertTableRows((Table)this.table, (List<RowData>)rows, this.branch);
            this.assertSnapshotSize(checkpointId);
            this.assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
        }
        catch (Throwable throwable) {
            var11_6 = throwable;
            throw throwable;
        }
        finally {
            if (harness != null) {
                TestIcebergFilesCommitter.$closeResource(var11_6, harness);
            }
        }
    }

    private int getStagingManifestSpecId(OperatorStateStore operatorStateStore, long checkPointId) throws Exception {
        ListState checkpointsState = operatorStateStore.getListState(IcebergFilesCommitter.buildStateDescriptor());
        TreeMap statedDataFiles = Maps.newTreeMap((SortedMap)((SortedMap)((Iterable)checkpointsState.get()).iterator().next()));
        DeltaManifests deltaManifests = (DeltaManifests)SimpleVersionedSerialization.readVersionAndDeSerialize((SimpleVersionedSerializer)DeltaManifestsSerializer.INSTANCE, (byte[])((byte[])statedDataFiles.get(checkPointId)));
        return deltaManifests.dataManifest().partitionSpecId();
    }

    private DeleteFile writeEqDeleteFile(FileAppenderFactory<RowData> appenderFactory, String filename, List<RowData> deletes) throws IOException {
        return SimpleDataUtil.writeEqDeleteFile((Table)this.table, this.format, filename, appenderFactory, deletes);
    }

    private DeleteFile writePosDeleteFile(FileAppenderFactory<RowData> appenderFactory, String filename, List<Pair<CharSequence, Long>> positions) throws IOException {
        return SimpleDataUtil.writePosDeleteFile((Table)this.table, this.format, filename, appenderFactory, positions);
    }

    private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
        int[] equalityFieldIds = new int[]{this.table.schema().findField("id").fieldId(), this.table.schema().findField("data").fieldId()};
        return new FlinkAppenderFactory((Table)this.table, this.table.schema(), FlinkSchemaUtil.convert((Schema)this.table.schema()), this.table.properties(), this.table.spec(), equalityFieldIds, this.table.schema(), null);
    }

    private ManifestFile createTestingManifestFile(Path manifestPath) {
        return new GenericManifestFile(manifestPath.toAbsolutePath().toString(), manifestPath.toFile().length(), 0, ManifestContent.DATA, 0L, 0L, Long.valueOf(0L), 0, 0L, 0, 0L, 0, 0L, null, null);
    }

    private List<Path> assertFlinkManifests(int expectedCount) throws IOException {
        List<Path> manifests = Files.list(this.flinkManifestFolder.toPath()).filter(p -> !p.toString().endsWith(".crc")).collect(Collectors.toList());
        Assertions.assertThat(manifests).hasSize(expectedCount);
        return manifests;
    }

    private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
        return SimpleDataUtil.writeFile((Table)this.table, this.table.schema(), this.table.spec(), CONF, this.table.location(), this.format.addExtension(filename), rows);
    }

    private DataFile writeDataFile(String filename, List<RowData> rows, PartitionSpec spec, StructLike partition) throws IOException {
        return SimpleDataUtil.writeFile((Table)this.table, this.table.schema(), spec, CONF, this.table.location(), this.format.addExtension(filename), rows, partition);
    }

    private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID operatorID, long expectedId) {
        this.table.refresh();
        long actualId = IcebergFilesCommitter.getMaxCommittedCheckpointId((Table)this.table, (String)jobID.toString(), (String)operatorID.toHexString(), (String)this.branch);
        Assertions.assertThat((long)actualId).isEqualTo(expectedId);
    }

    private void assertSnapshotSize(int expectedSnapshotSize) {
        this.table.refresh();
        Assertions.assertThat((Iterable)this.table.snapshots()).hasSize(expectedSnapshotSize);
    }

    private OneInputStreamOperatorTestHarness<WriteResult, Void> createStreamSink(JobID jobID) throws Exception {
        TestOperatorFactory factory = TestOperatorFactory.of(this.table.location(), this.branch, this.table.spec());
        return new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)factory, TestIcebergFilesCommitter.createEnvironment(jobID));
    }

    private static MockEnvironment createEnvironment(JobID jobID) {
        return new MockEnvironmentBuilder().setTaskName("test task").setManagedMemorySize(32768L).setInputSplitProvider(new MockInputSplitProvider()).setBufferSize(256).setTaskConfiguration(new Configuration()).setExecutionConfig(new ExecutionConfig()).setMaxParallelism(16).setJobID(jobID).build();
    }

    private static class TestOperatorFactory
    extends AbstractStreamOperatorFactory<Void>
    implements OneInputStreamOperatorFactory<WriteResult, Void> {
        private final String tablePath;
        private final String branch;
        private final PartitionSpec spec;

        private TestOperatorFactory(String tablePath, String branch, PartitionSpec spec) {
            this.tablePath = tablePath;
            this.branch = branch;
            this.spec = spec;
        }

        private static TestOperatorFactory of(String tablePath, String branch, PartitionSpec spec) {
            return new TestOperatorFactory(tablePath, branch, spec);
        }

        public <T extends StreamOperator<Void>> T createStreamOperator(StreamOperatorParameters<Void> param) {
            IcebergFilesCommitter committer = new IcebergFilesCommitter((TableLoader)new TestTableLoader(this.tablePath), false, Collections.singletonMap("flink.test", TestIcebergFilesCommitter.class.getName()), Integer.valueOf(ThreadPools.WORKER_THREAD_POOL_SIZE), this.branch, this.spec);
            committer.setup(param.getContainingTask(), param.getStreamConfig(), param.getOutput());
            return (T)committer;
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return IcebergFilesCommitter.class;
        }
    }
}

