/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.iceberg.flink.sink.shuffle.DataStatistics;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsEvent;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOperator;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOrRecord;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOrRecordSerializer;
import org.apache.iceberg.flink.sink.shuffle.MapDataStatistics;
import org.apache.iceberg.flink.sink.shuffle.MapDataStatisticsSerializer;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestDataStatisticsOperator {
    private final RowType rowType = RowType.of((LogicalType[])new LogicalType[]{new VarCharType()});
    private final TypeSerializer<RowData> rowSerializer = new RowDataSerializer(this.rowType);
    private final GenericRowData genericRowDataA = GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"a")});
    private final GenericRowData genericRowDataB = GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"b")});
    private final BinaryRowData binaryRowDataA = new RowDataSerializer(this.rowType).toBinaryRow((RowData)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"a")}));
    private final BinaryRowData binaryRowDataB = new RowDataSerializer(this.rowType).toBinaryRow((RowData)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"b")}));
    private final BinaryRowData binaryRowDataC = new RowDataSerializer(this.rowType).toBinaryRow((RowData)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"c")}));
    private final TypeSerializer<DataStatistics<MapDataStatistics, Map<RowData, Long>>> statisticsSerializer = MapDataStatisticsSerializer.fromKeySerializer(this.rowSerializer);
    private DataStatisticsOperator<MapDataStatistics, Map<RowData, Long>> operator;

    private Environment getTestingEnvironment() {
        return new StreamMockEnvironment(new Configuration(), new Configuration(), new ExecutionConfig(), 1L, new MockInputSplitProvider(), 1, (TaskStateManager)new TestTaskStateManager());
    }

    @Before
    public void before() throws Exception {
        this.operator = this.createOperator();
        Environment env = this.getTestingEnvironment();
        this.operator.setup((StreamTask)new OneInputStreamTask(env), (StreamConfig)new MockStreamConfig(new Configuration(), 1), (Output)new MockOutput((Collection)Lists.newArrayList()));
    }

    private DataStatisticsOperator<MapDataStatistics, Map<RowData, Long>> createOperator() {
        MockOperatorEventGateway mockGateway = new MockOperatorEventGateway();
        KeySelector<RowData, RowData> keySelector = new KeySelector<RowData, RowData>(){
            private static final long serialVersionUID = 7662520075515707428L;

            public RowData getKey(RowData value) {
                return value;
            }
        };
        return new DataStatisticsOperator("testOperator", (KeySelector)keySelector, (OperatorEventGateway)mockGateway, this.statisticsSerializer);
    }

    @After
    public void clean() throws Exception {
        this.operator.close();
    }

    @Test
    public void testProcessElement() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<RowData, Long>>> testHarness = this.createHarness(this.operator);){
            StateInitializationContext stateContext = this.getStateContext();
            this.operator.initializeState(stateContext);
            this.operator.processElement(new StreamRecord((Object)this.genericRowDataA));
            this.operator.processElement(new StreamRecord((Object)this.genericRowDataA));
            this.operator.processElement(new StreamRecord((Object)this.genericRowDataB));
            Assertions.assertThat((Object)this.operator.localDataStatistics()).isInstanceOf(MapDataStatistics.class);
            MapDataStatistics mapDataStatistics = (MapDataStatistics)this.operator.localDataStatistics();
            Map statsMap = mapDataStatistics.statistics();
            Assertions.assertThat((Map)statsMap).hasSize(2);
            Assertions.assertThat((Map)statsMap).containsExactlyInAnyOrderEntriesOf((Map)ImmutableMap.of((Object)this.genericRowDataA, (Object)2L, (Object)this.genericRowDataB, (Object)1L));
            testHarness.endInput();
        }
    }

    @Test
    public void testOperatorOutput() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<RowData, Long>>> testHarness = this.createHarness(this.operator);){
            testHarness.processElement(new StreamRecord((Object)this.genericRowDataA));
            testHarness.processElement(new StreamRecord((Object)this.genericRowDataB));
            testHarness.processElement(new StreamRecord((Object)this.genericRowDataB));
            List recordsOutput = testHarness.extractOutputValues().stream().filter(DataStatisticsOrRecord::hasRecord).map(DataStatisticsOrRecord::record).collect(Collectors.toList());
            Assertions.assertThat(recordsOutput).containsExactlyInAnyOrderElementsOf((Iterable)ImmutableList.of((Object)this.genericRowDataA, (Object)this.genericRowDataB, (Object)this.genericRowDataB));
        }
    }

    @Test
    public void testRestoreState() throws Exception {
        OperatorSubtaskState snapshot;
        try (OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<RowData, Long>>> testHarness1 = this.createHarness(this.operator);){
            MapDataStatistics mapDataStatistics = new MapDataStatistics();
            mapDataStatistics.add((RowData)this.binaryRowDataA);
            mapDataStatistics.add((RowData)this.binaryRowDataA);
            mapDataStatistics.add((RowData)this.binaryRowDataB);
            mapDataStatistics.add((RowData)this.binaryRowDataC);
            this.operator.handleOperatorEvent((OperatorEvent)DataStatisticsEvent.create((long)0L, (DataStatistics)mapDataStatistics, this.statisticsSerializer));
            Assertions.assertThat((Object)this.operator.globalDataStatistics()).isInstanceOf(MapDataStatistics.class);
            Assertions.assertThat((Map)((Map)this.operator.globalDataStatistics().statistics())).containsExactlyInAnyOrderEntriesOf((Map)ImmutableMap.of((Object)this.binaryRowDataA, (Object)2L, (Object)this.binaryRowDataB, (Object)1L, (Object)this.binaryRowDataC, (Object)1L));
            snapshot = testHarness1.snapshot(1L, 0L);
        }
        DataStatisticsOperator<MapDataStatistics, Map<RowData, Long>> restoredOperator = this.createOperator();
        try (OneInputStreamOperatorTestHarness testHarness2 = new OneInputStreamOperatorTestHarness(restoredOperator, 2, 2, 1);){
            testHarness2.setup();
            testHarness2.initializeState(snapshot);
            Assertions.assertThat((Object)restoredOperator.globalDataStatistics()).isInstanceOf(MapDataStatistics.class);
            Assertions.assertThat((Map)((Map)restoredOperator.globalDataStatistics().statistics())).containsExactlyInAnyOrderEntriesOf((Map)ImmutableMap.of((Object)this.binaryRowDataA, (Object)2L, (Object)this.binaryRowDataB, (Object)1L, (Object)this.binaryRowDataC, (Object)1L));
        }
    }

    private StateInitializationContext getStateContext() throws Exception {
        MockEnvironment env = new MockEnvironmentBuilder().build();
        HashMapStateBackend abstractStateBackend = new HashMapStateBackend();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        OperatorStateBackend operatorStateStore = abstractStateBackend.createOperatorStateBackend((Environment)env, "test-operator", Collections.emptyList(), cancelStreamRegistry);
        return new StateInitializationContextImpl(null, (OperatorStateStore)operatorStateStore, null, null, null);
    }

    private OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<RowData, Long>>> createHarness(DataStatisticsOperator<MapDataStatistics, Map<RowData, Long>> dataStatisticsOperator) throws Exception {
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(dataStatisticsOperator, 1, 1, 0);
        harness.setup((TypeSerializer)new DataStatisticsOrRecordSerializer(MapDataStatisticsSerializer.fromKeySerializer(this.rowSerializer), this.rowSerializer));
        harness.open();
        return harness;
    }
}

