/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Consumer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.util.OutputTag;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.LocalMergeOperator;
import org.apache.paimon.mergetree.localmerge.HashMapLocalMerger;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class LocalMergeOperatorTest {
    private LocalMergeOperator operator;

    LocalMergeOperatorTest() {
    }

    @Test
    public void testHashNormal() throws Exception {
        this.prepareHashOperator();
        ArrayList<String> result = new ArrayList<String>();
        this.setOutput(result);
        this.processElement("a", 1);
        this.processElement("b", 1);
        this.processElement("a", 2);
        this.processElement(RowKind.DELETE, "b", 2);
        this.operator.prepareSnapshotPreBarrier(0L);
        Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new String[]{"+I:a->2", "-D:b->2"});
        result.clear();
        this.processElement("c", 1);
        this.processElement("d", 1);
        this.operator.prepareSnapshotPreBarrier(0L);
        Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new String[]{"+I:c->1", "+I:d->1"});
        result.clear();
        HashMap<String, String> expected = new HashMap<String, String>();
        Random rnd = new Random();
        int records = 10000;
        for (int i = 0; i < records; ++i) {
            String key = rnd.nextInt(records) + "";
            expected.put(key, "+I:" + key + "->" + i);
            this.processElement(key, i);
        }
        this.operator.prepareSnapshotPreBarrier(0L);
        Assertions.assertThat(result).containsExactlyInAnyOrderElementsOf(expected.values());
        result.clear();
    }

    @Test
    public void testUserDefineSequence() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(CoreOptions.SEQUENCE_FIELD.key(), "f1");
        this.prepareHashOperator(options);
        ArrayList<String> result = new ArrayList<String>();
        this.setOutput(result);
        this.processElement("a", 2);
        this.processElement("b", 1);
        this.processElement("a", 1);
        this.operator.prepareSnapshotPreBarrier(0L);
        Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new String[]{"+I:a->2", "+I:b->1"});
        result.clear();
    }

    @Test
    public void testIgnoreUpdateBefore() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(CoreOptions.IGNORE_UPDATE_BEFORE.key(), "true");
        this.prepareHashOperator(options);
        ArrayList<String> result = new ArrayList<String>();
        this.setOutput(result);
        this.processElement("a", 1);
        this.processElement(RowKind.UPDATE_BEFORE, "a", 1);
        this.operator.prepareSnapshotPreBarrier(0L);
        Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new String[]{"+I:a->1"});
        result.clear();
        this.processElement("a", 1);
        this.processElement(RowKind.DELETE, "a", 1);
        this.operator.prepareSnapshotPreBarrier(1L);
        Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new String[]{"-D:a->1"});
        result.clear();
    }

    @Test
    public void testIgnoreDelete() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(CoreOptions.IGNORE_DELETE.key(), "true");
        this.prepareHashOperator(options);
        ArrayList<String> result = new ArrayList<String>();
        this.setOutput(result);
        this.processElement("a", 1);
        this.processElement(RowKind.UPDATE_BEFORE, "a", 1);
        this.operator.prepareSnapshotPreBarrier(0L);
        Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new String[]{"+I:a->1"});
        result.clear();
        this.processElement("a", 1);
        this.processElement(RowKind.DELETE, "a", 1);
        this.operator.prepareSnapshotPreBarrier(1L);
        Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new String[]{"+I:a->1"});
        result.clear();
    }

    @Test
    public void testHashSpill() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(CoreOptions.LOCAL_MERGE_BUFFER_SIZE.key(), "2 m");
        this.prepareHashOperator(options);
        ArrayList<String> result = new ArrayList<String>();
        this.setOutput(result);
        HashMap<String, String> expected = new HashMap<String, String>();
        for (int i = 0; i < 30000; ++i) {
            String key = i + "";
            expected.put(key, "+I:" + key + "->" + i);
            this.processElement(key, i);
        }
        this.operator.prepareSnapshotPreBarrier(0L);
        Assertions.assertThat(result).containsExactlyInAnyOrderElementsOf(expected.values());
        result.clear();
    }

    private void prepareHashOperator() throws Exception {
        this.prepareHashOperator(new HashMap<String, String>());
    }

    private void prepareHashOperator(Map<String, String> options) throws Exception {
        if (!options.containsKey(CoreOptions.LOCAL_MERGE_BUFFER_SIZE.key())) {
            options.put(CoreOptions.LOCAL_MERGE_BUFFER_SIZE.key(), "10 m");
        }
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()});
        TableSchema schema = new TableSchema(0L, rowType.getFields(), rowType.getFieldCount(), Collections.emptyList(), Collections.singletonList("f0"), options, null);
        this.operator = (LocalMergeOperator)new LocalMergeOperator.Factory(schema).createStreamOperator(new StreamOperatorParameters((StreamTask)new SourceOperatorStreamTask((Environment)new DummyEnvironment()), (StreamConfig)new MockStreamConfig(new Configuration(), 1), (Output)new MockOutput(new ArrayList()), null, null, null));
        this.operator.open();
        Assertions.assertThat((Object)this.operator.merger()).isInstanceOf(HashMapLocalMerger.class);
    }

    private void setOutput(List<String> result) {
        this.operator.setOutput((Output)new TestOutput(row -> result.add(row.getRowKind().shortString() + ":" + row.getString(0) + "->" + row.getInt(1))));
    }

    private void processElement(String key, int value) throws Exception {
        this.processElement(RowKind.INSERT, key, value);
    }

    private void processElement(RowKind rowKind, String key, int value) throws Exception {
        this.operator.processElement(new StreamRecord((Object)GenericRow.ofKind((RowKind)rowKind, (Object[])new Object[]{BinaryString.fromString((String)key), value, value, value, value})));
    }

    private static class TestOutput
    implements Output<StreamRecord<InternalRow>> {
        private final Consumer<InternalRow> consumer;

        private TestOutput(Consumer<InternalRow> consumer) {
            this.consumer = consumer;
        }

        public void emitWatermark(Watermark mark) {
        }

        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
        }

        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        }

        public void emitLatencyMarker(LatencyMarker latencyMarker) {
        }

        public void emitRecordAttributes(RecordAttributes recordAttributes) {
        }

        public void emitWatermark(WatermarkEvent watermarkEvent) {
        }

        public void collect(StreamRecord<InternalRow> record) {
            this.consumer.accept((InternalRow)record.getValue());
        }

        public void close() {
        }
    }
}

