/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.compact;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.types.Either;
import org.apache.flink.util.Preconditions;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.AppendCompactTask;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.flink.compact.AppendPreCommitCompactCoordinatorOperator;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link AppendPreCommitCompactCoordinatorOperator}.
 *
 * <p>Every test configures the operator with a target file size of
 * {@link #TARGET_FILE_SIZE_MB} MiB, feeds it {@link Committable}s whose new files have the given
 * sizes (in MiB), triggers {@code prepareSnapshotPreBarrier} and then checks the records the
 * operator emitted: either a compaction task ({@code Either.right}) or a pass-through committable
 * ({@code Either.left}).
 *
 * <p>Each harness is opened inside a try-with-resources block so that it is closed even when an
 * assertion fails.
 */
public class AppendPreCommitCompactCoordinatorOperatorTest {

    /** Target file size, in MiB, used to configure the operator under test. */
    private static final long TARGET_FILE_SIZE_MB = 8L;

    /**
     * Feeds files of 3, 5, 1, 2, 3 MiB before checkpoint 1 and files of 3, 2 MiB before
     * checkpoint 2, and expects three compaction tasks: (3, 5) and (1, 2, 3) for checkpoint 1 and
     * (3, 2) for checkpoint 2.
     */
    @Test
    public void testPrepareSnapshotWithMultipleFiles() throws Exception {
        try (OneInputStreamOperatorTestHarness<
                        Committable, Either<Committable, Tuple2<Long, AppendCompactTask>>>
                testHarness = createTestHarness(createOperator())) {
            testHarness.open();

            testHarness.processElement(
                    new StreamRecord<>(createCommittable(1L, BinaryRow.EMPTY_ROW, 3, 5, 1, 2, 3)));
            testHarness.prepareSnapshotPreBarrier(1L);
            testHarness.processElement(
                    new StreamRecord<>(createCommittable(2L, BinaryRow.EMPTY_ROW, 3, 2)));
            testHarness.prepareSnapshotPreBarrier(2L);

            List<Object> output = new ArrayList<>(testHarness.getOutput());
            Assertions.assertThat(output).hasSize(3);
            assertCompactionTask(output.get(0), 1L, BinaryRow.EMPTY_ROW, 3, 5);
            assertCompactionTask(output.get(1), 1L, BinaryRow.EMPTY_ROW, 1, 2, 3);
            assertCompactionTask(output.get(2), 2L, BinaryRow.EMPTY_ROW, 3, 2);
        }
    }

    /**
     * Feeds files of 3, 5, 1 MiB before checkpoint 1 and a single 4 MiB file before checkpoint 2,
     * and expects one compaction task (3, 5) followed by two pass-through committables carrying
     * the lone 1 MiB and 4 MiB files.
     */
    @Test
    public void testPrepareSnapshotWithSingleFile() throws Exception {
        try (OneInputStreamOperatorTestHarness<
                        Committable, Either<Committable, Tuple2<Long, AppendCompactTask>>>
                testHarness = createTestHarness(createOperator())) {
            testHarness.open();

            testHarness.processElement(
                    new StreamRecord<>(createCommittable(1L, BinaryRow.EMPTY_ROW, 3, 5, 1)));
            testHarness.prepareSnapshotPreBarrier(1L);
            testHarness.processElement(
                    new StreamRecord<>(createCommittable(2L, BinaryRow.EMPTY_ROW, 4)));
            testHarness.prepareSnapshotPreBarrier(2L);

            List<Object> output = new ArrayList<>(testHarness.getOutput());
            Assertions.assertThat(output).hasSize(3);
            assertCompactionTask(output.get(0), 1L, BinaryRow.EMPTY_ROW, 3, 5);
            assertCommittable(output.get(1), 1L, BinaryRow.EMPTY_ROW, 1);
            assertCommittable(output.get(2), 2L, BinaryRow.EMPTY_ROW, 4);
        }
    }

    /**
     * Feeds files for two different partitions within checkpoint 1 and more files for the second
     * partition within checkpoint 2, and expects every emitted task or committable to carry the
     * partition of the files it was built from.
     */
    @Test
    public void testPrepareSnapshotWithMultiplePartitions() throws Exception {
        try (OneInputStreamOperatorTestHarness<
                        Committable, Either<Committable, Tuple2<Long, AppendCompactTask>>>
                testHarness = createTestHarness(createOperator())) {
            testHarness.open();

            testHarness.processElement(
                    new StreamRecord<>(createCommittable(1L, partitionOf(1), 3, 5, 1, 2, 3)));
            testHarness.processElement(
                    new StreamRecord<>(createCommittable(1L, partitionOf(2), 3, 2, 4, 3)));
            testHarness.prepareSnapshotPreBarrier(1L);
            testHarness.processElement(
                    new StreamRecord<>(createCommittable(2L, partitionOf(2), 3, 2)));
            testHarness.prepareSnapshotPreBarrier(2L);

            List<Object> output = new ArrayList<>(testHarness.getOutput());
            Assertions.assertThat(output).hasSize(5);
            assertCompactionTask(output.get(0), 1L, partitionOf(1), 3, 5);
            assertCompactionTask(output.get(1), 1L, partitionOf(2), 3, 2, 4);
            assertCompactionTask(output.get(2), 1L, partitionOf(1), 1, 2, 3);
            assertCommittable(output.get(3), 1L, partitionOf(2), 3);
            assertCompactionTask(output.get(4), 2L, partitionOf(2), 3, 2);
        }
    }

    /**
     * Feeds files of 8, 3, 5, 9 MiB and expects the 3 and 5 MiB files to form a compaction task
     * while the 8 and 9 MiB files (not smaller than the configured target size) are forwarded in
     * a plain committable.
     */
    @Test
    public void testSkipLargeFiles() throws Exception {
        try (OneInputStreamOperatorTestHarness<
                        Committable, Either<Committable, Tuple2<Long, AppendCompactTask>>>
                testHarness = createTestHarness(createOperator())) {
            testHarness.open();

            testHarness.processElement(
                    new StreamRecord<>(createCommittable(1L, BinaryRow.EMPTY_ROW, 8, 3, 5, 9)));
            testHarness.prepareSnapshotPreBarrier(1L);

            List<Object> output = new ArrayList<>(testHarness.getOutput());
            Assertions.assertThat(output).hasSize(2);
            assertCompactionTask(output.get(0), 1L, BinaryRow.EMPTY_ROW, 3, 5);
            assertCommittable(output.get(1), 1L, BinaryRow.EMPTY_ROW, 8, 9);
        }
    }

    /**
     * Asserts that an output record is a pass-through committable ({@code Either.left}).
     *
     * @param o output element taken from the harness; must be a {@link StreamRecord}
     * @param checkpointId expected checkpoint id of the committable
     * @param partition expected partition of the wrapped commit message
     * @param mbs expected sizes, in MiB, of the message's new files; compared with
     *     {@code hasSameElementsAs}, i.e. ignoring order and duplicates
     */
    private void assertCommittable(Object o, long checkpointId, BinaryRow partition, int... mbs) {
        Either<Committable, Tuple2<Long, AppendCompactTask>> value = outputValue(o);
        Assertions.assertThat(value.isLeft()).isTrue();

        Committable committable = value.left();
        Assertions.assertThat(committable.checkpointId()).isEqualTo(checkpointId);

        CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
        Assertions.assertThat(message.partition()).isEqualTo(partition);
        // A pass-through message must carry nothing but new data files.
        Assertions.assertThat(message.newFilesIncrement().deletedFiles()).isEmpty();
        Assertions.assertThat(message.newFilesIncrement().changelogFiles()).isEmpty();
        Assertions.assertThat(message.compactIncrement().isEmpty()).isTrue();
        Assertions.assertThat(message.indexIncrement().isEmpty()).isTrue();
        Assertions.assertThat(
                        message.newFilesIncrement().newFiles().stream().map(DataFileMeta::fileSize))
                .hasSameElementsAs(sizesInBytes(mbs));
    }

    /**
     * Asserts that an output record is a compaction task ({@code Either.right}).
     *
     * @param o output element taken from the harness; must be a {@link StreamRecord}
     * @param checkpointId expected checkpoint id paired with the task
     * @param partition expected partition of the task
     * @param mbs expected sizes, in MiB, of the files to be compacted; compared with
     *     {@code hasSameElementsAs}, i.e. ignoring order and duplicates
     */
    private void assertCompactionTask(
            Object o, long checkpointId, BinaryRow partition, int... mbs) {
        Either<Committable, Tuple2<Long, AppendCompactTask>> value = outputValue(o);
        Assertions.assertThat(value.isRight()).isTrue();

        Tuple2<Long, AppendCompactTask> checkpointAndTask = value.right();
        Assertions.assertThat(checkpointAndTask.f0).isEqualTo(checkpointId);

        AppendCompactTask task = checkpointAndTask.f1;
        Assertions.assertThat(task.partition()).isEqualTo(partition);
        Assertions.assertThat(task.compactBefore().stream().map(DataFileMeta::fileSize))
                .hasSameElementsAs(sizesInBytes(mbs));
    }

    /**
     * Builds a {@code FILE} committable whose commit message contains one new append file per
     * entry of {@code mbs} and no deleted, changelog or compaction files.
     *
     * @param checkpointId checkpoint id stored in the committable
     * @param partition partition of the commit message
     * @param mbs sizes, in MiB, of the new files
     */
    private Committable createCommittable(long checkpointId, BinaryRow partition, int... mbs) {
        List<DataFileMeta> newFiles =
                Arrays.stream(mbs)
                        .mapToObj(this::createDataFileMetaOfSize)
                        .collect(Collectors.toList());
        DataIncrement newFilesIncrement =
                new DataIncrement(newFiles, Collections.emptyList(), Collections.emptyList());
        CommitMessageImpl message =
                new CommitMessageImpl(
                        partition, 0, -1, newFilesIncrement, CompactIncrement.emptyIncrement());
        return new Committable(checkpointId, Committable.Kind.FILE, message);
    }

    /**
     * Creates append-file metadata with a random file name and the given size.
     *
     * @param mb file size in MiB
     */
    private DataFileMeta createDataFileMetaOfSize(int mb) {
        return DataFileMeta.forAppend(
                UUID.randomUUID().toString(),
                MemorySize.ofMebiBytes(mb).getBytes(),
                0L,
                SimpleStats.EMPTY_STATS,
                0L,
                0L,
                1L,
                Collections.emptyList(),
                null,
                null,
                null,
                null);
    }

    /**
     * Wraps the operator in a single-subtask test harness (max parallelism 1, parallelism 1,
     * subtask index 0) and registers the {@code Either<Committable, Tuple2<Long,
     * AppendCompactTask>>} serializer with it. The caller is responsible for opening and closing
     * the returned harness.
     */
    private OneInputStreamOperatorTestHarness<
                    Committable, Either<Committable, Tuple2<Long, AppendCompactTask>>>
            createTestHarness(AppendPreCommitCompactCoordinatorOperator operator)
                    throws Exception {
        TypeSerializer<Either<Committable, Tuple2<Long, AppendCompactTask>>> serializer =
                new EitherSerializer<>(
                        new CommittableTypeInfo().createSerializer(new ExecutionConfig()),
                        new TupleTypeInfo<Tuple2<Long, AppendCompactTask>>(
                                        BasicTypeInfo.LONG_TYPE_INFO, new CompactionTaskTypeInfo())
                                .createSerializer(new SerializerConfigImpl()));

        OneInputStreamOperatorTestHarness<
                        Committable, Either<Committable, Tuple2<Long, AppendCompactTask>>>
                harness = new OneInputStreamOperatorTestHarness<>(operator, 1, 1, 0);
        harness.getStreamConfig().setupNetworkInputs(serializer);
        harness.getStreamConfig().serializeAllConfigs();
        harness.setup(serializer);
        return harness;
    }

    /** Creates the operator under test with the target file size shared by all tests. */
    private static AppendPreCommitCompactCoordinatorOperator createOperator() {
        Options options = new Options();
        options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(TARGET_FILE_SIZE_MB));
        return new AppendPreCommitCompactCoordinatorOperator(new CoreOptions(options));
    }

    /** Builds a one-field partition row whose single int field holds {@code value}. */
    private static BinaryRow partitionOf(int value) {
        BinaryRow row = new BinaryRow(1);
        BinaryRowWriter writer = new BinaryRowWriter(row);
        writer.writeInt(0, value);
        writer.complete();
        return row;
    }

    /** Converts sizes given in MiB to the corresponding byte counts. */
    private static List<Long> sizesInBytes(int... mbs) {
        return Arrays.stream(mbs)
                .mapToObj(mb -> MemorySize.ofMebiBytes(mb).getBytes())
                .collect(Collectors.toList());
    }

    /**
     * Extracts the payload of a harness output element.
     *
     * <p>The harness exposes its output as untyped objects, so the cast is unchecked; the element
     * type is fixed by the operator under test.
     */
    @SuppressWarnings("unchecked")
    private static Either<Committable, Tuple2<Long, AppendCompactTask>> outputValue(Object o) {
        return ((StreamRecord<Either<Committable, Tuple2<Long, AppendCompactTask>>>) o).getValue();
    }
}

