/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterOperatorFactory;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
import org.apache.paimon.flink.sink.StoreMultiCommitter;
import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link StoreMultiCommitter} driven through a committer operator running inside a
 * {@link OneInputStreamOperatorTestHarness}.
 *
 * <p>Every test works against two tables created in {@link #beforeEach()}: {@code test_table1}
 * with columns {@code (a INT, b BIGINT)} and {@code test_table2} with columns
 * {@code (a INT, b DOUBLE, c VARCHAR(5))}.
 */
class StoreMultiCommitterTest {
    /** Message the restore path is expected to raise after committing restored checkpoints. */
    private static final String INTENTIONAL_FAILURE_MESSAGE =
            "This exception is intentionally thrown after committing the restored checkpoints. "
                    + "By restarting the job we hope that writers can start writing based on these new commits.";

    private String initialCommitUser;
    private Path warehouse;
    private CatalogLoader catalogLoader;
    private Catalog catalog;
    private Identifier firstTable;
    private Identifier secondTable;
    private Path firstTablePath;
    private Path secondTablePath;
    @TempDir
    public java.nio.file.Path tempDir;

    /**
     * Creates every table described by {@code tableSpecs} in {@code catalog}.
     *
     * @param catalog catalog that receives the tables
     * @param tableSpecs pairs of table identifier and schema
     * @throws Exception if the catalog rejects a table
     */
    @SafeVarargs
    private final void createTestTables(Catalog catalog, Tuple2<Identifier, Schema>... tableSpecs) throws Exception {
        for (Tuple2<Identifier, Schema> spec : tableSpecs) {
            // "false" is passed as the third argument exactly as before (ignore-if-exists flag,
            // presumably -- TODO confirm against Catalog#createTable).
            catalog.createTable(spec.f0, spec.f1, false);
        }
    }

    /** Builds a fresh warehouse under the JUnit temp directory and creates the two test tables. */
    @BeforeEach
    public void beforeEach() throws Exception {
        initialCommitUser = UUID.randomUUID().toString();
        warehouse = new Path("traceable://" + tempDir.toString());
        String databaseName = "test_db";
        firstTable = Identifier.create(databaseName, "test_table1");
        secondTable = Identifier.create(databaseName, "test_table2");
        catalogLoader = createCatalogLoader();
        catalog = catalogLoader.load();
        catalog.createDatabase(databaseName, true);

        RowType rowType1 = RowType.of(
                new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
                new String[] {"a", "b"});
        RowType rowType2 = RowType.of(
                new DataType[] {DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(5)},
                new String[] {"a", "b", "c"});

        Options firstOptions = new Options();
        firstOptions.set(CoreOptions.TAG_AUTOMATIC_CREATION, CoreOptions.TagCreationMode.PROCESS_TIME);
        firstOptions.setString("bucket", "-1");
        Schema firstTableSchema = new Schema(
                rowType1.getFields(), Collections.emptyList(), Collections.emptyList(), firstOptions.toMap(), "");

        Options secondOptions = new Options();
        secondOptions.setString("bucket", "1");
        secondOptions.setString("bucket-key", "a");
        secondOptions.set(CoreOptions.COMPACTION_MAX_FILE_NUM, 50);
        Schema secondTableSchema = new Schema(
                rowType2.getFields(), Collections.emptyList(), Collections.emptyList(), secondOptions.toMap(), "");

        createTestTables(
                catalog,
                Tuple2.of(firstTable, firstTableSchema),
                Tuple2.of(secondTable, secondTableSchema));
        firstTablePath = ((FileStoreTable) catalog.getTable(firstTable)).location();
        secondTablePath = ((FileStoreTable) catalog.getTable(secondTable)).location();
    }

    /** Closes the catalog opened in {@link #beforeEach()}, if any. */
    @AfterEach
    public void after() throws Exception {
        if (catalog != null) {
            catalog.close();
        }
    }

    /**
     * Restoring from a snapshot that still holds uncommitted committables must commit them and then
     * fail with the intentional exception; restoring the same snapshot a second time must open
     * normally. The scenario is run first for the first table, then for the second.
     */
    @Test
    public void testFailIntentionallyAfterRestore() throws Exception {
        FileStoreTable table = (FileStoreTable) catalog.getTable(firstTable);
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> testHarness =
                createRecoverableTestHarness();
        testHarness.open();

        StreamTableWrite firstWrite = table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
        firstWrite.write(GenericRow.of(1, 10L));
        firstWrite.write(GenericRow.of(2, 20L));
        long timestamp = 1L;
        for (CommitMessage committable : firstWrite.prepareCommit(false, 8L)) {
            testHarness.processElement(
                    getMultiTableCommittable(firstTable, new Committable(8L, Committable.Kind.FILE, committable)),
                    timestamp++);
        }
        // The checkpoint is snapshotted but never notified as complete, so nothing is committed yet.
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, timestamp++);
        Assertions.assertThat(table.snapshotManager().latestSnapshotId()).isNull();
        testHarness.close();

        testHarness = restoreExpectingIntentionalFailure(snapshot);
        assertResultsForFirstTable(table, "1, 10", "2, 20");
        testHarness.close();

        // A second restore of the same state opens without failing and leaves the data unchanged.
        testHarness = createRecoverableTestHarness();
        testHarness.initializeState(snapshot);
        testHarness.open();
        assertResultsForFirstTable(table, "1, 10", "2, 20");

        table = (FileStoreTable) catalog.getTable(secondTable);
        StreamTableWrite secondWrite = table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
        secondWrite.write(GenericRow.of(3, 30.0, BinaryString.fromString("s3")));
        secondWrite.write(GenericRow.of(4, 40.0, BinaryString.fromString("s4")));
        for (CommitMessage committable : secondWrite.prepareCommit(false, 9L)) {
            testHarness.processElement(
                    getMultiTableCommittable(secondTable, new Committable(9L, Committable.Kind.FILE, committable)),
                    timestamp++);
        }
        snapshot = testHarness.snapshot(1L, timestamp);
        Assertions.assertThat(table.snapshotManager().latestSnapshotId()).isNull();
        testHarness.close();

        testHarness = restoreExpectingIntentionalFailure(snapshot);
        assertResultsForSecondTable(table, "3, 30.0, s3", "4, 40.0, s4");
        testHarness.close();

        testHarness = createRecoverableTestHarness();
        testHarness.initializeState(snapshot);
        testHarness.open();
        assertResultsForSecondTable(table, "3, 30.0, s3", "4, 40.0, s4");
        testHarness.close();

        // Release the writers only after every assertion, so closing cannot influence the results.
        firstWrite.close();
        secondWrite.close();
    }

    /**
     * Feeds committables for ten checkpoint ids but snapshots and notifies only the last one, then
     * repeats with both tables, and checks the resulting latest snapshot ids of each table.
     */
    @Test
    public void testCheckpointAbort() throws Exception {
        FileStoreTable table1 = (FileStoreTable) catalog.getTable(firstTable);
        FileStoreTable table2 = (FileStoreTable) catalog.getTable(secondTable);
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> testHarness =
                createRecoverableTestHarness();
        testHarness.open();
        StreamTableWrite write1 = table1.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
        StreamTableWrite write2 = table2.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();

        long cpId = 0L;
        for (int i = 0; i < 10; ++i) {
            write1.write(GenericRow.of(1, 10L));
            write1.write(GenericRow.of(2, 20L));
            for (CommitMessage committable : write1.prepareCommit(false, ++cpId)) {
                testHarness.processElement(
                        getMultiTableCommittable(firstTable, new Committable(cpId, Committable.Kind.FILE, committable)),
                        1L);
            }
        }
        testHarness.snapshot(cpId, 1L);
        testHarness.notifyOfCompletedCheckpoint(cpId);

        SnapshotManager snapshotManager1 = new SnapshotManager(LocalFileIO.create(), firstTablePath);
        SnapshotManager snapshotManager2 = new SnapshotManager(LocalFileIO.create(), secondTablePath);
        Assertions.assertThat(snapshotManager1.latestSnapshotId()).isEqualTo(cpId);
        Assertions.assertThat(snapshotManager2.latestSnapshotId()).isNull();

        for (int i = 0; i < 10; ++i) {
            write1.write(GenericRow.of(3, 30L));
            write1.write(GenericRow.of(3, 40L));
            write2.write(GenericRow.of(3, 30.0, BinaryString.fromString("s3")));
            write2.write(GenericRow.of(3, 40.0, BinaryString.fromString("s4")));
            for (CommitMessage committable : write1.prepareCommit(false, ++cpId)) {
                testHarness.processElement(
                        getMultiTableCommittable(firstTable, new Committable(cpId, Committable.Kind.FILE, committable)),
                        1L);
            }
            for (CommitMessage committable : write2.prepareCommit(false, cpId)) {
                testHarness.processElement(
                        getMultiTableCommittable(secondTable, new Committable(cpId, Committable.Kind.FILE, committable)),
                        1L);
            }
        }
        testHarness.snapshot(cpId, 2L);
        testHarness.notifyOfCompletedCheckpoint(cpId);
        Assertions.assertThat(snapshotManager1.latestSnapshotId()).isEqualTo(20L);
        Assertions.assertThat(snapshotManager2.latestSnapshotId()).isEqualTo(10L);
        testHarness.close();

        write1.close();
        write2.close();
    }

    /**
     * Uses a state manager that persists nothing: committables of checkpoint 2 are snapshotted but
     * never notified, so after the restart only the data of checkpoints 1 and 3 is expected.
     */
    @Test
    public void testSnapshotLostWhenFailed() throws Exception {
        FileStoreTable table1 = (FileStoreTable) catalog.getTable(firstTable);
        FileStoreTable table2 = (FileStoreTable) catalog.getTable(secondTable);
        StreamTableWrite write1 = table1.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
        StreamTableWrite write2 = table2.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> testHarness =
                createLossyTestHarness();
        testHarness.open();
        long timestamp = 1L;
        StreamWriteBuilder streamWriteBuilder1 = table1.newStreamWriteBuilder().withCommitUser(initialCommitUser);
        StreamWriteBuilder streamWriteBuilder2 = table2.newStreamWriteBuilder().withCommitUser(initialCommitUser);

        // Checkpoint 1: first table only, fully completed.
        write1.write(GenericRow.of(1, 10L));
        write1.write(GenericRow.of(2, 20L));
        for (CommitMessage committable : write1.prepareCommit(false, 1L)) {
            testHarness.processElement(
                    getMultiTableCommittable(firstTable, new Committable(1L, Committable.Kind.FILE, committable)),
                    timestamp++);
        }
        testHarness.snapshot(1L, timestamp++);
        testHarness.notifyOfCompletedCheckpoint(1L);

        // Checkpoint 2: both tables, snapshotted but never notified as complete.
        write1.write(GenericRow.of(3, 30L));
        write1.write(GenericRow.of(4, 40L));
        write2.write(GenericRow.of(3, 30.0, BinaryString.fromString("s3")));
        write2.write(GenericRow.of(3, 40.0, BinaryString.fromString("s4")));
        for (CommitMessage committable : write1.prepareCommit(false, 2L)) {
            testHarness.processElement(
                    getMultiTableCommittable(firstTable, new Committable(2L, Committable.Kind.FILE, committable)),
                    timestamp++);
        }
        for (CommitMessage committable : write2.prepareCommit(false, 2L)) {
            testHarness.processElement(
                    getMultiTableCommittable(secondTable, new Committable(2L, Committable.Kind.FILE, committable)),
                    timestamp++);
        }
        OperatorSubtaskState snapshot = testHarness.snapshot(2L, timestamp++);
        write1.close();
        write2.close();
        testHarness.close();

        // Restart: the lossy state manager restores nothing from the snapshot.
        testHarness = createLossyTestHarness();
        testHarness.initializeState(snapshot);
        testHarness.open();
        write1 = streamWriteBuilder1.newWrite();
        write1.write(GenericRow.of(5, 50L));
        write1.write(GenericRow.of(6, 60L));
        write2 = streamWriteBuilder2.newWrite();
        write2.write(GenericRow.of(5, 50.0, BinaryString.fromString("s5")));
        write2.write(GenericRow.of(6, 60.0, BinaryString.fromString("s6")));
        for (CommitMessage committable : write1.prepareCommit(false, 3L)) {
            testHarness.processElement(
                    getMultiTableCommittable(firstTable, new Committable(3L, Committable.Kind.FILE, committable)),
                    timestamp++);
        }
        // NOTE(review): the second table reuses checkpoint id 2 here while the first table uses 3.
        // Kept as-is to preserve the existing scenario -- confirm whether 3 was intended.
        for (CommitMessage committable : write2.prepareCommit(false, 2L)) {
            testHarness.processElement(
                    getMultiTableCommittable(secondTable, new Committable(2L, Committable.Kind.FILE, committable)),
                    timestamp++);
        }
        testHarness.snapshot(3L, timestamp);
        testHarness.notifyOfCompletedCheckpoint(3L);
        write1.close();
        write2.close();
        testHarness.close();

        assertResultsForFirstTable(table1, "1, 10", "2, 20", "5, 50", "6, 60");
        assertResultsForSecondTable(table2, "5, 50.0, s5", "6, 60.0, s6");
    }

    /** Checks that the watermark seen by the operator is recorded on the committed snapshot. */
    @Test
    public void testWatermarkCommit() throws Exception {
        FileStoreTable table1 = (FileStoreTable) catalog.getTable(firstTable);
        FileStoreTable table2 = (FileStoreTable) catalog.getTable(secondTable);
        StreamTableWrite write1 = table1.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
        StreamTableWrite write2 = table2.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> testHarness =
                createRecoverableTestHarness();
        testHarness.open();
        long timestamp = 0L;
        long cpId = 1L;

        write1.write(GenericRow.of(1, 10L));
        testHarness.processElement(
                getMultiTableCommittable(
                        firstTable,
                        new Committable(cpId, Committable.Kind.FILE, write1.prepareCommit(true, cpId).get(0))),
                timestamp++);
        testHarness.processWatermark(new Watermark(1024L));
        testHarness.snapshot(cpId, timestamp++);
        testHarness.notifyOfCompletedCheckpoint(cpId);
        Assertions.assertThat(Objects.requireNonNull(table1.snapshotManager().latestSnapshot()).watermark())
                .isEqualTo(1024L);
        Assertions.assertThat(table2.snapshotManager().latestSnapshot()).isNull();

        cpId = 2L;
        write1.write(GenericRow.of(1, 20L));
        // Written to the second table but never turned into a committable in this test.
        write2.write(GenericRow.of(1, 20.0, BinaryString.fromString("s2")));
        testHarness.processElement(
                getMultiTableCommittable(
                        firstTable,
                        new Committable(cpId, Committable.Kind.FILE, write1.prepareCommit(true, cpId).get(0))),
                timestamp++);
        testHarness.processWatermark(new Watermark(2048L));
        testHarness.snapshot(cpId, timestamp);
        testHarness.notifyOfCompletedCheckpoint(cpId);
        testHarness.close();

        // NOTE(review): this assertion used to be repeated verbatim for table1; the duplicate may
        // have been meant for table2, but table2 receives no committable here -- confirm intent.
        Assertions.assertThat(Objects.requireNonNull(table1.snapshotManager().latestSnapshot()).watermark())
                .isEqualTo(2048L);

        write1.close();
        write2.close();
    }

    /** Checks that a checkpoint without any committables does not create a new snapshot. */
    @Test
    public void testEmptyCommit() throws Exception {
        FileStoreTable table1 = (FileStoreTable) catalog.getTable(firstTable);
        FileStoreTable table2 = (FileStoreTable) catalog.getTable(secondTable);
        StreamTableWrite write1 = table1.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
        StreamTableWrite write2 = table2.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> testHarness =
                createRecoverableTestHarness();
        testHarness.open();
        long timestamp = 0L;
        long cpId = 1L;

        write1.write(GenericRow.of(1, 20L));
        write2.write(GenericRow.of(1, 20.0, BinaryString.fromString("s2")));
        testHarness.processElement(
                getMultiTableCommittable(
                        firstTable,
                        new Committable(cpId, Committable.Kind.FILE, write1.prepareCommit(true, cpId).get(0))),
                timestamp++);
        testHarness.processElement(
                getMultiTableCommittable(
                        secondTable,
                        new Committable(cpId, Committable.Kind.FILE, write2.prepareCommit(true, cpId).get(0))),
                timestamp++);
        testHarness.processWatermark(new Watermark(2048L));
        testHarness.snapshot(cpId, timestamp++);
        testHarness.notifyOfCompletedCheckpoint(cpId);
        Assertions.assertThat(Objects.requireNonNull(table1.snapshotManager().latestSnapshot()).watermark())
                .isEqualTo(2048L);
        Assertions.assertThat(Objects.requireNonNull(table2.snapshotManager().latestSnapshot()).watermark())
                .isEqualTo(2048L);

        // Second checkpoint carries no committables: the latest snapshot id must stay at 1.
        testHarness.snapshot(++cpId, timestamp);
        testHarness.notifyOfCompletedCheckpoint(cpId);
        Assertions.assertThat(Objects.requireNonNull(table1.snapshotManager().latestSnapshot()).id()).isEqualTo(1L);
        Assertions.assertThat(Objects.requireNonNull(table2.snapshotManager().latestSnapshot()).id()).isEqualTo(1L);
        testHarness.close();

        write1.close();
        write2.close();
    }

    /** Checks the per-table commit gauges published under {@code paimon / table / commit}. */
    @Test
    public void testCommitMetrics() throws Exception {
        FileStoreTable table1 = (FileStoreTable) catalog.getTable(firstTable);
        FileStoreTable table2 = (FileStoreTable) catalog.getTable(secondTable);
        StreamTableWrite write1 = table1.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
        StreamTableWrite write2 = table2.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> testHarness =
                createRecoverableTestHarness();
        testHarness.open();
        long timestamp = 0L;
        long cpId = 1L;

        write1.write(GenericRow.of(1, 10L));
        write2.write(GenericRow.of(1, 1.1, BinaryString.fromString("AAA")));
        write2.compact(BinaryRow.EMPTY_ROW, 0, false);
        write2.write(GenericRow.of(1, 1.2, BinaryString.fromString("aaa")));
        write2.compact(BinaryRow.EMPTY_ROW, 0, false);
        write2.write(GenericRow.of(2, 2.1, BinaryString.fromString("BBB")));
        write2.compact(BinaryRow.EMPTY_ROW, 0, true);
        testHarness.processElement(
                getMultiTableCommittable(
                        firstTable,
                        new Committable(cpId, Committable.Kind.FILE, write1.prepareCommit(true, cpId).get(0))),
                timestamp++);
        testHarness.processElement(
                getMultiTableCommittable(
                        secondTable,
                        new Committable(cpId, Committable.Kind.FILE, write2.prepareCommit(true, cpId).get(0))),
                timestamp++);
        testHarness.snapshot(cpId, timestamp);
        testHarness.notifyOfCompletedCheckpoint(cpId);

        OperatorMetricGroup operatorMetricGroup = testHarness.getOperator().getRuntimeContext().getMetricGroup();
        MetricGroup commitMetricGroup1 =
                operatorMetricGroup.addGroup("paimon").addGroup("table", table1.name()).addGroup("commit");
        MetricGroup commitMetricGroup2 =
                operatorMetricGroup.addGroup("paimon").addGroup("table", table2.name()).addGroup("commit");

        assertGauge(commitMetricGroup1, "lastTableFilesAdded", 1L);
        assertGauge(commitMetricGroup1, "lastTableFilesDeleted", 0L);
        assertGauge(commitMetricGroup1, "lastTableFilesAppended", 1L);
        assertGauge(commitMetricGroup1, "lastTableFilesCommitCompacted", 0L);
        assertGauge(commitMetricGroup2, "lastTableFilesAdded", 4L);
        assertGauge(commitMetricGroup2, "lastTableFilesDeleted", 3L);
        assertGauge(commitMetricGroup2, "lastTableFilesAppended", 3L);
        assertGauge(commitMetricGroup2, "lastTableFilesCommitCompacted", 4L);

        testHarness.close();
        write1.close();
        write2.close();
    }

    /** Asserts that the gauge {@code name} in {@code group} currently reports {@code expected}. */
    private static void assertGauge(MetricGroup group, String name, long expected) {
        Object value = TestingMetricUtils.getGauge(group, name).getValue();
        // "expected" is boxed to a Long, matching the Long values the gauges were compared to before.
        Assertions.assertThat(value).isEqualTo(expected);
    }

    /**
     * Creates a recoverable harness, restores {@code snapshot} into it and asserts that restoring
     * or opening fails with {@link #INTENTIONAL_FAILURE_MESSAGE}.
     *
     * @param snapshot operator state to restore from
     * @return the harness that failed; the caller is responsible for closing it
     */
    private OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable>
            restoreExpectingIntentionalFailure(OperatorSubtaskState snapshot) throws Exception {
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> harness =
                createRecoverableTestHarness();
        try {
            harness.initializeState(snapshot);
            harness.open();
            // fail() raises an AssertionError, which the catch below deliberately does not intercept.
            Assertions.fail("Expecting intentional exception");
        } catch (Exception e) {
            Assertions.assertThat(e).hasMessageContaining(INTENTIONAL_FAILURE_MESSAGE);
        }
        return harness;
    }

    /** Creates a harness whose committer state is handled by {@link RestoreAndFailCommittableStateManager}. */
    private OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> createRecoverableTestHarness() throws Exception {
        // Capture the loader in a local so the serializable lambda does not capture this test instance.
        CatalogLoader loader = catalogLoader;
        CommittableStateManager<WrappedManifestCommittable> stateManager =
                new RestoreAndFailCommittableStateManager<>(WrappedManifestCommittableSerializer::new);
        CommitterOperatorFactory<MultiTableCommittable, WrappedManifestCommittable> operator =
                new CommitterOperatorFactory<>(
                        true,
                        false,
                        initialCommitUser,
                        (Committer.Factory<MultiTableCommittable, WrappedManifestCommittable> & Serializable)
                                context -> new StoreMultiCommitter(loader, context),
                        stateManager);
        return createTestHarness(operator);
    }

    /** Creates a harness whose state manager neither restores nor snapshots any committables. */
    private OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> createLossyTestHarness() throws Exception {
        CatalogLoader loader = catalogLoader;
        CommittableStateManager<WrappedManifestCommittable> stateManager =
                new CommittableStateManager<WrappedManifestCommittable>() {

                    @Override
                    public void initializeState(StateInitializationContext context, Committer<?, WrappedManifestCommittable> committer) {
                        // Intentionally empty: restored state is dropped.
                    }

                    @Override
                    public void snapshotState(StateSnapshotContext context, List<WrappedManifestCommittable> committables) {
                        // Intentionally empty: pending committables are not persisted.
                    }
                };
        CommitterOperatorFactory<MultiTableCommittable, WrappedManifestCommittable> operator =
                new CommitterOperatorFactory<>(
                        true,
                        false,
                        initialCommitUser,
                        (Committer.Factory<MultiTableCommittable, WrappedManifestCommittable> & Serializable)
                                context -> new StoreMultiCommitter(loader, context),
                        stateManager);
        return createTestHarness(operator);
    }

    /** Wraps {@code operatorFactory} in a harness that is set up, but not yet opened. */
    private OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> createTestHarness(CommitterOperatorFactory<MultiTableCommittable, WrappedManifestCommittable> operatorFactory) throws Exception {
        TypeSerializer<MultiTableCommittable> serializer =
                new MultiTableCommittableTypeInfo().createSerializer(new ExecutionConfig());
        OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> harness =
                new OneInputStreamOperatorTestHarness<>(operatorFactory, serializer);
        harness.setup(serializer);
        return harness;
    }

    /** Returns a loader that creates a catalog over {@link #warehouse} each time it is invoked. */
    private CatalogLoader createCatalogLoader() {
        Options catalogOptions = createCatalogOptions(warehouse);
        return (CatalogLoader & Serializable) () -> CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
    }

    /** Builds catalog options pointing at {@code warehouse} with an empty URI. */
    private Options createCatalogOptions(Path warehouse) {
        Options conf = new Options();
        conf.set(CatalogOptions.WAREHOUSE, warehouse.toString());
        conf.set(CatalogOptions.URI, "");
        return conf;
    }

    /**
     * Asserts the sorted content of the first table, each row rendered as {@code "a, b"}.
     *
     * @param table table to read
     * @param expected expected rows, already in sorted order
     */
    protected void assertResultsForFirstTable(FileStoreTable table, String... expected) {
        assertResults(table, row -> row.getInt(0) + ", " + row.getLong(1), expected);
    }

    /**
     * Asserts the sorted content of the second table, each row rendered as {@code "a, b, c"}.
     *
     * @param table table to read
     * @param expected expected rows, already in sorted order
     */
    private void assertResultsForSecondTable(FileStoreTable table, String... expected) {
        assertResults(table, row -> row.getInt(0) + ", " + row.getDouble(1) + ", " + row.getString(2), expected);
    }

    /**
     * Reads every split of {@code table}, renders each row with {@code formatter}, sorts the
     * rendered rows and compares them with {@code expected}.
     */
    private static void assertResults(
            FileStoreTable table, java.util.function.Function<InternalRow, String> formatter, String... expected) {
        TableRead read = table.newReadBuilder().newRead();
        List<String> actual = new ArrayList<>();
        table.newReadBuilder().newScan().plan().splits().forEach(split -> {
            try {
                RecordReader<InternalRow> recordReader = read.createReader(split);
                RecordReaderIterator<InternalRow> rows = new RecordReaderIterator<>(recordReader);
                try {
                    while (rows.hasNext()) {
                        actual.add(formatter.apply(rows.next()));
                    }
                } finally {
                    // Close even when reading fails so the reader is not leaked.
                    rows.close();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        Collections.sort(actual);
        Assertions.assertThat(actual).isEqualTo(Arrays.asList(expected));
    }

    /** Tags {@code committable} with the table it belongs to. */
    private MultiTableCommittable getMultiTableCommittable(Identifier tableId, Committable committable) {
        return MultiTableCommittable.fromCommittable(tableId, committable);
    }
}

