/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.compact.changelog;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.types.Either;
import org.apache.flink.util.Preconditions;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.flink.compact.changelog.ChangelogCompactCoordinateOperator;
import org.apache.paimon.flink.compact.changelog.ChangelogCompactTask;
import org.apache.paimon.flink.compact.changelog.ChangelogTaskTypeInfo;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link ChangelogCompactCoordinateOperator}.
 *
 * <p>Every test configures an 8 MiB target file size, feeds committables whose changelog files
 * have known sizes (expressed in MiB) into the operator, triggers {@code
 * prepareSnapshotPreBarrier}, and then inspects selected elements of the harness output. Only the
 * output indices that are explicitly asserted are checked; the remaining elements are covered
 * solely by the total-size assertion.
 */
public class ChangelogCompactCoordinateOperatorTest {

    /** Target file size, in MiB, configured for every operator under test. */
    private static final long TARGET_FILE_SIZE_MB = 8L;

    @Test
    public void testPrepareSnapshotWithMultipleFiles() throws Exception {
        // try-with-resources guarantees the harness is closed even when an assertion fails.
        try (OneInputStreamOperatorTestHarness<Committable, Either<Committable, ChangelogCompactTask>>
                testHarness = createTestHarness(createOperator())) {
            testHarness.open();

            testHarness.processElement(
                    new StreamRecord<>(
                            createCommittable(
                                    1L,
                                    BinaryRow.EMPTY_ROW,
                                    0,
                                    Collections.emptyList(),
                                    Arrays.asList(3, 2, 5, 4))));
            testHarness.processElement(
                    new StreamRecord<>(
                            createCommittable(
                                    1L,
                                    BinaryRow.EMPTY_ROW,
                                    1,
                                    Collections.emptyList(),
                                    Arrays.asList(3, 3, 2, 2))));
            testHarness.prepareSnapshotPreBarrier(1L);

            testHarness.processElement(
                    new StreamRecord<>(
                            createCommittable(
                                    2L,
                                    BinaryRow.EMPTY_ROW,
                                    0,
                                    Collections.emptyList(),
                                    Arrays.asList(2, 3))));
            testHarness.prepareSnapshotPreBarrier(2L);

            List<Object> output = new ArrayList<>(testHarness.getOutput());
            Assertions.assertThat(output).hasSize(7);

            assertCompactionTask(
                    output.get(0),
                    1L,
                    BinaryRow.EMPTY_ROW,
                    Collections.emptyMap(),
                    Collections.singletonMap(0, Arrays.asList(3, 2, 5)));

            // This task is expected to span two buckets.
            Map<Integer, List<Integer>> acrossBuckets = new HashMap<>();
            acrossBuckets.put(0, Collections.singletonList(4));
            acrossBuckets.put(1, Arrays.asList(3, 3));
            assertCompactionTask(
                    output.get(2),
                    1L,
                    BinaryRow.EMPTY_ROW,
                    Collections.emptyMap(),
                    acrossBuckets);

            assertCompactionTask(
                    output.get(4),
                    1L,
                    BinaryRow.EMPTY_ROW,
                    Collections.emptyMap(),
                    Collections.singletonMap(1, Arrays.asList(2, 2)));
            assertCompactionTask(
                    output.get(6),
                    2L,
                    BinaryRow.EMPTY_ROW,
                    Collections.emptyMap(),
                    Collections.singletonMap(0, Arrays.asList(2, 3)));
        }
    }

    @Test
    public void testPrepareSnapshotWithSingleFile() throws Exception {
        try (OneInputStreamOperatorTestHarness<Committable, Either<Committable, ChangelogCompactTask>>
                testHarness = createTestHarness(createOperator())) {
            testHarness.open();

            testHarness.processElement(
                    new StreamRecord<>(
                            createCommittable(
                                    1L,
                                    BinaryRow.EMPTY_ROW,
                                    0,
                                    Arrays.asList(3, 5, 2),
                                    Collections.emptyList())));
            testHarness.prepareSnapshotPreBarrier(1L);

            List<Object> output = new ArrayList<>(testHarness.getOutput());
            Assertions.assertThat(output).hasSize(3);

            assertCompactionTask(
                    output.get(0),
                    1L,
                    BinaryRow.EMPTY_ROW,
                    Collections.singletonMap(0, Arrays.asList(3, 5)),
                    Collections.emptyMap());
            // The remaining 2 MiB file is expected to be forwarded in a plain committable.
            assertCommittable(
                    output.get(2),
                    BinaryRow.EMPTY_ROW,
                    Collections.singletonList(2),
                    Collections.emptyList());
        }
    }

    @Test
    public void testPrepareSnapshotWithMultiplePartitions() throws Exception {
        try (OneInputStreamOperatorTestHarness<Committable, Either<Committable, ChangelogCompactTask>>
                testHarness = createTestHarness(createOperator())) {
            testHarness.open();

            testHarness.processElement(
                    new StreamRecord<>(
                            createCommittable(
                                    1L,
                                    partitionOf(1),
                                    0,
                                    Collections.emptyList(),
                                    Arrays.asList(3, 2, 5, 4))));
            testHarness.processElement(
                    new StreamRecord<>(
                            createCommittable(
                                    1L,
                                    partitionOf(2),
                                    1,
                                    Collections.emptyList(),
                                    Arrays.asList(3, 3, 2, 2, 3))));
            testHarness.prepareSnapshotPreBarrier(1L);

            testHarness.processElement(
                    new StreamRecord<>(
                            createCommittable(
                                    2L,
                                    partitionOf(1),
                                    0,
                                    Collections.emptyList(),
                                    Arrays.asList(2, 3))));
            testHarness.prepareSnapshotPreBarrier(2L);

            List<Object> output = new ArrayList<>(testHarness.getOutput());
            Assertions.assertThat(output).hasSize(8);

            assertCompactionTask(
                    output.get(0),
                    1L,
                    partitionOf(1),
                    Collections.emptyMap(),
                    Collections.singletonMap(0, Arrays.asList(3, 2, 5)));
            assertCompactionTask(
                    output.get(2),
                    1L,
                    partitionOf(2),
                    Collections.emptyMap(),
                    Collections.singletonMap(1, Arrays.asList(3, 3, 2)));
            assertCommittable(
                    output.get(4),
                    partitionOf(1),
                    Collections.emptyList(),
                    Collections.singletonList(4));
            assertCompactionTask(
                    output.get(5),
                    1L,
                    partitionOf(2),
                    Collections.emptyMap(),
                    Collections.singletonMap(1, Arrays.asList(2, 3)));
            assertCompactionTask(
                    output.get(7),
                    2L,
                    partitionOf(1),
                    Collections.emptyMap(),
                    Collections.singletonMap(0, Arrays.asList(2, 3)));
        }
    }

    @Test
    public void testSkipLargeFiles() throws Exception {
        try (OneInputStreamOperatorTestHarness<Committable, Either<Committable, ChangelogCompactTask>>
                testHarness = createTestHarness(createOperator())) {
            testHarness.open();

            // 10 MiB and 9 MiB both exceed the configured 8 MiB target file size.
            testHarness.processElement(
                    new StreamRecord<>(
                            createCommittable(
                                    1L,
                                    BinaryRow.EMPTY_ROW,
                                    0,
                                    Collections.emptyList(),
                                    Arrays.asList(3, 10, 5, 9))));
            testHarness.prepareSnapshotPreBarrier(1L);

            List<Object> output = new ArrayList<>(testHarness.getOutput());
            Assertions.assertThat(output).hasSize(2);

            assertCompactionTask(
                    output.get(0),
                    1L,
                    BinaryRow.EMPTY_ROW,
                    Collections.emptyMap(),
                    Collections.singletonMap(0, Arrays.asList(3, 5)));
            assertCommittable(
                    output.get(1),
                    BinaryRow.EMPTY_ROW,
                    Collections.emptyList(),
                    Arrays.asList(10, 9));
        }
    }

    /** Creates the operator under test, configured with the {@link #TARGET_FILE_SIZE_MB} target. */
    private static ChangelogCompactCoordinateOperator createOperator() {
        Options options = new Options();
        options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(TARGET_FILE_SIZE_MB));
        return new ChangelogCompactCoordinateOperator(new CoreOptions(options));
    }

    /** Builds a single-field partition row whose only (int) field holds {@code value}. */
    private static BinaryRow partitionOf(int value) {
        BinaryRow row = new BinaryRow(1);
        BinaryRowWriter writer = new BinaryRowWriter(row);
        writer.writeInt(0, value);
        writer.complete();
        return row;
    }

    /**
     * Extracts the payload of one harness output element.
     *
     * @param element an element of the harness output; must be a {@link StreamRecord}
     * @return the {@link Either} carried by the record
     */
    @SuppressWarnings("unchecked")
    private static Either<Committable, ChangelogCompactTask> unwrap(Object element) {
        // The harness exposes its output as plain Objects, so the generic cast cannot be checked.
        return ((StreamRecord<Either<Committable, ChangelogCompactTask>>) element).getValue();
    }

    /**
     * Asserts that {@code o} is a pass-through committable (left side of the {@link Either}).
     *
     * <p>Note that this helper always expects checkpoint id 1 and bucket 0.
     *
     * @param o the harness output element to check
     * @param partition expected partition of the wrapped commit message
     * @param newFilesChangelogMbs expected sizes (MiB, any order) of new-file changelog files
     * @param compactChangelogMbs expected sizes (MiB, any order) of compact changelog files
     */
    private void assertCommittable(Object o, BinaryRow partition, List<Integer> newFilesChangelogMbs, List<Integer> compactChangelogMbs) {
        Either<Committable, ChangelogCompactTask> value = unwrap(o);
        Assertions.assertThat(value.isLeft()).isTrue();

        Committable committable = value.left();
        Assertions.assertThat(committable.checkpointId()).isEqualTo(1L);

        CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
        Assertions.assertThat(message.partition()).isEqualTo(partition);
        Assertions.assertThat(message.bucket()).isEqualTo(0);
        assertSameSizes(message.newFilesIncrement().changelogFiles(), newFilesChangelogMbs);
        assertSameSizes(message.compactIncrement().changelogFiles(), compactChangelogMbs);
    }

    /**
     * Asserts that {@code o} is a compaction task (right side of the {@link Either}).
     *
     * @param o the harness output element to check
     * @param checkpointId expected checkpoint id of the task
     * @param partition expected partition of the task
     * @param newFilesChangelogMbs expected new-file changelog sizes (MiB, any order) per bucket
     * @param compactChangelogMbs expected compact changelog sizes (MiB, any order) per bucket
     */
    private void assertCompactionTask(Object o, long checkpointId, BinaryRow partition, Map<Integer, List<Integer>> newFilesChangelogMbs, Map<Integer, List<Integer>> compactChangelogMbs) {
        Either<Committable, ChangelogCompactTask> value = unwrap(o);
        Assertions.assertThat(value.isRight()).isTrue();

        ChangelogCompactTask task = value.right();
        Assertions.assertThat(task.checkpointId()).isEqualTo(checkpointId);
        Assertions.assertThat(task.partition()).isEqualTo(partition);
        assertSameSizesPerBucket(task.newFileChangelogFiles(), newFilesChangelogMbs);
        assertSameSizesPerBucket(task.compactChangelogFiles(), compactChangelogMbs);
    }

    /** Asserts both maps have the same buckets and, per bucket, the same file sizes. */
    private void assertSameSizesPerBucket(Map<Integer, List<DataFileMeta>> actual, Map<Integer, List<Integer>> expectedMbs) {
        Assertions.assertThat(actual.keySet()).isEqualTo(expectedMbs.keySet());
        for (Integer bucket : actual.keySet()) {
            assertSameSizes(actual.get(bucket), expectedMbs.get(bucket));
        }
    }

    /** Asserts the file sizes of {@code metas} equal {@code mbs} (given in MiB), in any order. */
    private void assertSameSizes(List<DataFileMeta> metas, List<Integer> mbs) {
        long[] actualBytes = metas.stream().mapToLong(DataFileMeta::fileSize).toArray();
        long[] expectedBytes =
                mbs.stream().mapToLong(mb -> MemorySize.ofMebiBytes(mb).getBytes()).toArray();
        Assertions.assertThat(actualBytes).containsExactlyInAnyOrder(expectedBytes);
    }

    /**
     * Creates a {@code FILE} committable wrapping a commit message whose only content is
     * changelog files of the given sizes.
     *
     * @param checkpointId checkpoint id of the committable
     * @param partition partition of the commit message
     * @param bucket bucket of the commit message
     * @param newFilesChangelogMbs sizes (MiB) of changelog files in the new-files increment
     * @param compactChangelogMbs sizes (MiB) of changelog files in the compact increment
     */
    private Committable createCommittable(long checkpointId, BinaryRow partition, int bucket, List<Integer> newFilesChangelogMbs, List<Integer> compactChangelogMbs) {
        List<DataFileMeta> newFilesChangelog =
                newFilesChangelogMbs.stream()
                        .map(this::createDataFileMetaOfSize)
                        .collect(Collectors.toList());
        List<DataFileMeta> compactChangelog =
                compactChangelogMbs.stream()
                        .map(this::createDataFileMetaOfSize)
                        .collect(Collectors.toList());

        CommitMessageImpl message =
                new CommitMessageImpl(
                        partition,
                        bucket,
                        2,
                        new DataIncrement(
                                Collections.emptyList(), Collections.emptyList(), newFilesChangelog),
                        new CompactIncrement(
                                Collections.emptyList(), Collections.emptyList(), compactChangelog));
        return new Committable(checkpointId, Committable.Kind.FILE, message);
    }

    /** Creates file metadata with a random UUID file name and a file size of {@code mb} MiB. */
    private DataFileMeta createDataFileMetaOfSize(int mb) {
        return DataFileMeta.forAppend(
                UUID.randomUUID().toString(),
                MemorySize.ofMebiBytes(mb).getBytes(),
                0L,
                SimpleStats.EMPTY_STATS,
                0L,
                0L,
                1L,
                Collections.emptyList(),
                null,
                null,
                null,
                null,
                null,
                null);
    }

    /**
     * Wraps {@code operator} in a test harness (max parallelism 1, parallelism 1, subtask 0) that
     * is set up with an {@link EitherSerializer} but not yet opened. The caller is responsible for
     * opening and closing the returned harness.
     */
    private OneInputStreamOperatorTestHarness<Committable, Either<Committable, ChangelogCompactTask>> createTestHarness(ChangelogCompactCoordinateOperator operator) throws Exception {
        TypeSerializer<Either<Committable, ChangelogCompactTask>> serializer =
                new EitherSerializer<>(
                        new CommittableTypeInfo().createSerializer(new ExecutionConfig()),
                        new ChangelogTaskTypeInfo().createSerializer(new ExecutionConfig()));
        OneInputStreamOperatorTestHarness<Committable, Either<Committable, ChangelogCompactTask>>
                harness = new OneInputStreamOperatorTestHarness<>(operator, 1, 1, 0);
        // The same serializer is registered both as the network input and as the output serializer.
        harness.getStreamConfig().setupNetworkInputs(Preconditions.checkNotNull(serializer));
        harness.getStreamConfig().serializeAllConfigs();
        harness.setup(serializer);
        return harness;
    }
}

