/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.AutoTagForSavepointCommitterOperator;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.CommitterOperator;
import org.apache.paimon.flink.sink.CommitterOperatorTest;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class AutoTagForSavepointCommitterOperatorTest
extends CommitterOperatorTest {
    @Test
    public void testAutoTagForSavepoint() throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness = this.createRecoverableTestHarness(table);
        testHarness.open();
        StreamTableWrite write = table.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        long checkpointId = 1L;
        long timestamp = 1L;
        this.processCommittable(testHarness, write, checkpointId, timestamp, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, 10L})});
        testHarness.snapshotWithLocalState(checkpointId, timestamp, (SnapshotType)CheckpointType.CHECKPOINT);
        Assertions.assertThat((Long)table.snapshotManager().latestSnapshotId()).isNull();
        testHarness.notifyOfCompletedCheckpoint(checkpointId);
        Assertions.assertThat((long)table.snapshotManager().snapshotCount()).isEqualTo(1L);
        Assertions.assertThat((long)table.tagManager().tagCount()).isEqualTo(0L);
        this.processCommittable(testHarness, write, ++checkpointId, ++timestamp, new InternalRow[]{GenericRow.of((Object[])new Object[]{2, 20L})});
        testHarness.snapshotWithLocalState(checkpointId, timestamp, (SnapshotType)SavepointType.savepoint((SavepointFormatType)SavepointFormatType.CANONICAL));
        Assertions.assertThat((long)table.snapshotManager().snapshotCount()).isEqualTo(1L);
        Assertions.assertThat((long)table.tagManager().tagCount()).isEqualTo(0L);
        this.processCommittable(testHarness, write, ++checkpointId, ++timestamp, new InternalRow[]{GenericRow.of((Object[])new Object[]{3, 20L})});
        testHarness.snapshotWithLocalState(checkpointId, timestamp, (SnapshotType)CheckpointType.CHECKPOINT);
        Assertions.assertThat((long)table.snapshotManager().snapshotCount()).isEqualTo(1L);
        testHarness.notifyOfCompletedCheckpoint(checkpointId);
        testHarness.close();
        Assertions.assertThat((long)table.snapshotManager().snapshotCount()).isEqualTo(3L);
        Assertions.assertThat((long)table.tagManager().tagCount()).isEqualTo(1L);
        Snapshot snapshot = table.snapshotManager().snapshot(2L);
        Assertions.assertThat((Object)snapshot).isNotNull();
        Assertions.assertThat((long)snapshot.id()).isEqualTo(2L);
        SortedMap tags = table.tagManager().tags();
        Assertions.assertThat((Map)tags).containsOnlyKeys((Object[])new Snapshot[]{snapshot});
        Assertions.assertThat((List)((List)tags.get(snapshot))).containsOnly((Object[])new String[]{"savepoint-2"});
    }

    @Test
    public void testRestore() throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness = this.createRecoverableTestHarness(table);
        testHarness.open();
        StreamTableWrite write = table.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        long checkpointId = 1L;
        long timestamp = 1L;
        this.processCommittable(testHarness, write, checkpointId, timestamp, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, 10L})});
        OperatorSubtaskState subtaskState = testHarness.snapshotWithLocalState(checkpointId, timestamp, (SnapshotType)SavepointType.savepoint((SavepointFormatType)SavepointFormatType.CANONICAL)).getJobManagerOwnedState();
        Assertions.assertThat((Object)table.snapshotManager().latestSnapshot()).isNull();
        Assertions.assertThat((long)table.tagManager().tagCount()).isEqualTo(0L);
        testHarness.close();
        testHarness = this.createRecoverableTestHarness(table);
        try {
            testHarness.initializeState(subtaskState);
            testHarness.open();
            Assertions.fail((String)"Expecting intentional exception");
        }
        catch (Exception e) {
            Assertions.assertThat((Throwable)e).hasMessageContaining("This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.");
        }
        testHarness.close();
        Snapshot snapshot = table.snapshotManager().latestSnapshot();
        Assertions.assertThat((Object)snapshot).isNotNull();
        Assertions.assertThat((long)snapshot.id()).isEqualTo(checkpointId);
        SortedMap tags = table.tagManager().tags();
        Assertions.assertThat((Map)tags).containsOnlyKeys((Object[])new Snapshot[]{snapshot});
        Assertions.assertThat((List)((List)tags.get(snapshot))).containsOnly((Object[])new String[]{"savepoint-" + checkpointId});
    }

    @Test
    public void testAbortSavepointAndCleanTag() throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness = this.createRecoverableTestHarness(table);
        testHarness.open();
        StreamTableWrite write = table.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        long checkpointId = 1L;
        long timestamp = 1L;
        this.processCommittable(testHarness, write, checkpointId, timestamp, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, 10L})});
        testHarness.snapshotWithLocalState(checkpointId, timestamp, (SnapshotType)SavepointType.savepoint((SavepointFormatType)SavepointFormatType.CANONICAL));
        Assertions.assertThat((long)table.snapshotManager().snapshotCount()).isEqualTo(0L);
        Assertions.assertThat((long)table.tagManager().tagCount()).isEqualTo(0L);
        this.processCommittable(testHarness, write, ++checkpointId, timestamp, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, 10L})});
        testHarness.snapshotWithLocalState(checkpointId, timestamp, (SnapshotType)CheckpointType.CHECKPOINT);
        testHarness.notifyOfCompletedCheckpoint(checkpointId);
        Assertions.assertThat((long)table.snapshotManager().snapshotCount()).isEqualTo(2L);
        Assertions.assertThat((long)table.tagManager().tagCount()).isEqualTo(1L);
        testHarness.getOneInputOperator().notifyCheckpointAborted(1L);
        testHarness.close();
        Assertions.assertThat((long)table.snapshotManager().snapshotCount()).isEqualTo(2L);
        Assertions.assertThat((long)table.tagManager().tagCount()).isEqualTo(0L);
    }

    private void processCommittable(OneInputStreamOperatorTestHarness<Committable, Committable> testHarness, StreamTableWrite write, long checkpointId, long timestamp, InternalRow ... rows) throws Exception {
        for (InternalRow row : rows) {
            write.write(row);
        }
        for (CommitMessage committable : write.prepareCommit(false, checkpointId)) {
            testHarness.processElement((Object)new Committable(checkpointId, Committable.Kind.FILE, (Object)committable), timestamp);
        }
    }

    @Override
    protected OneInputStreamOperator<Committable, Committable> createCommitterOperator(FileStoreTable table, String commitUser, CommittableStateManager<ManifestCommittable> committableStateManager) {
        return new AutoTagForSavepointCommitterOperator((CommitterOperator)super.createCommitterOperator(table, commitUser, committableStateManager), () -> ((FileStoreTable)table).snapshotManager(), () -> ((FileStoreTable)table).tagManager(), (SerializableSupplier & Serializable)() -> table.store().newTagDeletion(), (SerializableSupplier & Serializable)() -> table.store().createTagCallbacks(), table.store().options().tagDefaultTimeRetained());
    }

    @Override
    protected OneInputStreamOperator<Committable, Committable> createCommitterOperator(FileStoreTable table, String commitUser, CommittableStateManager<ManifestCommittable> committableStateManager, ThrowingConsumer<StateInitializationContext, Exception> initializeFunction) {
        return new AutoTagForSavepointCommitterOperator((CommitterOperator)super.createCommitterOperator(table, commitUser, committableStateManager, initializeFunction), () -> ((FileStoreTable)table).snapshotManager(), () -> ((FileStoreTable)table).tagManager(), (SerializableSupplier & Serializable)() -> table.store().newTagDeletion(), (SerializableSupplier & Serializable)() -> table.store().createTagCallbacks(), table.store().options().tagDefaultTimeRetained());
    }
}

