/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.flink.sink.shuffle.DataStatistics;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsEvent;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOperator;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOrRecord;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOrRecordSerializer;
import org.apache.iceberg.flink.sink.shuffle.MapDataStatistics;
import org.apache.iceberg.flink.sink.shuffle.MapDataStatisticsSerializer;
import org.apache.iceberg.flink.sink.shuffle.SortKeySerializer;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * JUnit 4 tests for {@link DataStatisticsOperator} using {@link MapDataStatistics}.
 *
 * <p>The tests exercise three things: counting of sort keys for processed rows, forwarding of
 * input rows to the operator output, and restoring the global statistics from a snapshot into a
 * freshly created operator.
 *
 * <p>The table schema has two optional columns, {@code id} (string) and {@code number} (int); the
 * sort order is ascending on {@code id}, so position 0 of every {@link SortKey} holds the id.
 */
public class TestDataStatisticsOperator {
    private final Schema schema = new Schema(
            Types.NestedField.optional(1, "id", Types.StringType.get()),
            Types.NestedField.optional(2, "number", Types.IntegerType.get()));
    private final SortOrder sortOrder = SortOrder.builderFor(schema).asc("id").build();
    /** Template key; tests always work on {@link SortKey#copy() copies} of it. */
    private final SortKey sortKey = new SortKey(schema, sortOrder);
    private final RowType rowType = RowType.of(new VarCharType(), new IntType());
    private final TypeSerializer<RowData> rowSerializer = new RowDataSerializer(rowType);
    private final TypeSerializer<DataStatistics<MapDataStatistics, Map<SortKey, Long>>> statisticsSerializer =
            MapDataStatisticsSerializer.fromSortKeySerializer(new SortKeySerializer(schema, sortOrder));
    /** Operator under test; re-created and set up before each test by {@link #before()}. */
    private DataStatisticsOperator<MapDataStatistics, Map<SortKey, Long>> operator;

    /** Builds the mock stream environment handed to the task that owns the operator. */
    private Environment getTestingEnvironment() {
        return new StreamMockEnvironment(
                new Configuration(),
                new Configuration(),
                new ExecutionConfig(),
                1L,
                new MockInputSplitProvider(),
                1,
                new TestTaskStateManager());
    }

    /** Creates a fresh operator and wires it to a mock task, mock config and mock output. */
    @Before
    public void before() throws Exception {
        this.operator = createOperator();
        Environment env = getTestingEnvironment();
        this.operator.setup(
                new OneInputStreamTask<String, String>(env),
                new MockStreamConfig(new Configuration(), 1),
                new MockOutput<>(Lists.newArrayList()));
    }

    /** Creates an operator named {@code "testOperator"} backed by a mock event gateway. */
    private DataStatisticsOperator<MapDataStatistics, Map<SortKey, Long>> createOperator() {
        MockOperatorEventGateway mockGateway = new MockOperatorEventGateway();
        return new DataStatisticsOperator<>(
                "testOperator", schema, sortOrder, mockGateway, statisticsSerializer);
    }

    /** Closes the operator created in {@link #before()}. */
    @After
    public void clean() throws Exception {
        this.operator.close();
    }

    /**
     * Feeds three rows (ids {@code a}, {@code a}, {@code b}) directly into the operator and checks
     * that the local statistics are a {@link MapDataStatistics} holding {@code a -> 2, b -> 1}.
     */
    @Test
    public void testProcessElement() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<SortKey, Long>>>
                testHarness = createHarness(this.operator)) {
            StateInitializationContext stateContext = getStateContext();
            operator.initializeState(stateContext);

            operator.processElement(record("a", 5));
            operator.processElement(record("a", 3));
            operator.processElement(record("b", 1));

            Assertions.assertThat(operator.localDataStatistics()).isInstanceOf(MapDataStatistics.class);

            Map<SortKey, Long> expectedMap = ImmutableMap.of(keyOf("a"), 2L, keyOf("b"), 1L);

            MapDataStatistics mapDataStatistics = (MapDataStatistics) operator.localDataStatistics();
            Map<SortKey, Long> statsMap = mapDataStatistics.statistics();
            Assertions.assertThat(statsMap).hasSize(2);
            Assertions.assertThat(statsMap).containsExactlyInAnyOrderEntriesOf(expectedMap);

            testHarness.endInput();
        }
    }

    /**
     * Sends three rows through the harness and checks that the record-bearing outputs are exactly
     * those three rows, in any order.
     */
    @Test
    public void testOperatorOutput() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<SortKey, Long>>>
                testHarness = createHarness(this.operator)) {
            testHarness.processElement(record("a", 2));
            testHarness.processElement(record("b", 3));
            testHarness.processElement(record("b", 1));

            // Only outputs that carry a record are of interest here; anything else is filtered out.
            List<RowData> recordsOutput = testHarness.extractOutputValues().stream()
                    .filter(DataStatisticsOrRecord::hasRecord)
                    .map(DataStatisticsOrRecord::record)
                    .collect(Collectors.toList());

            Assertions.assertThat(recordsOutput)
                    .containsExactlyInAnyOrderElementsOf(
                            ImmutableList.of(row("a", 2), row("b", 3), row("b", 1)));
        }
    }

    /**
     * Delivers global statistics ({@code a -> 2, b -> 1, c -> 1}) to the operator through a
     * {@link DataStatisticsEvent}, snapshots the harness, then initializes a second operator from
     * that snapshot and checks that it reports the same global statistics.
     */
    @Test
    public void testRestoreState() throws Exception {
        OperatorSubtaskState snapshot;
        try (OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<SortKey, Long>>>
                testHarness1 = createHarness(this.operator)) {
            MapDataStatistics mapDataStatistics = new MapDataStatistics();

            // A single key instance is deliberately re-pointed between add() calls, exactly as
            // before. NOTE(review): this relies on add() not retaining the caller's mutable key.
            SortKey key = sortKey.copy();
            key.set(0, "a");
            mapDataStatistics.add(key);
            key.set(0, "a");
            mapDataStatistics.add(key);
            key.set(0, "b");
            mapDataStatistics.add(key);
            key.set(0, "c");
            mapDataStatistics.add(key);

            Map<SortKey, Long> expectedMap = expectedAbcCounts();

            DataStatisticsEvent<MapDataStatistics, Map<SortKey, Long>> event =
                    DataStatisticsEvent.create(0L, mapDataStatistics, statisticsSerializer);
            operator.handleOperatorEvent(event);

            Assertions.assertThat(operator.globalDataStatistics()).isInstanceOf(MapDataStatistics.class);
            Assertions.assertThat(operator.globalDataStatistics().statistics())
                    .containsExactlyInAnyOrderEntriesOf(expectedMap);

            snapshot = testHarness1.snapshot(1L, 0L);
        }

        // Initialize a brand-new operator from the snapshot and verify its global statistics.
        DataStatisticsOperator<MapDataStatistics, Map<SortKey, Long>> restoredOperator = createOperator();
        try (OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<SortKey, Long>>>
                testHarness2 = new OneInputStreamOperatorTestHarness<>(restoredOperator, 2, 2, 1)) {
            testHarness2.setup();
            testHarness2.initializeState(snapshot);

            Assertions.assertThat(restoredOperator.globalDataStatistics()).isInstanceOf(MapDataStatistics.class);

            // Copy the restored statistics into a plain HashMap before comparing.
            Map<SortKey, Long> restoredStatistics = Maps.newHashMap();
            restoredStatistics.putAll(restoredOperator.globalDataStatistics().statistics());

            Assertions.assertThat(restoredStatistics).containsExactlyInAnyOrderEntriesOf(expectedAbcCounts());
        }
    }

    /**
     * Builds a state initialization context that only provides an operator state store, backed by a
     * {@link HashMapStateBackend}; all other constructor arguments are {@code null}.
     */
    private StateInitializationContext getStateContext() throws Exception {
        // The environment and registry are intentionally left open: the returned context is used
        // by the caller after this method returns.
        MockEnvironment env = new MockEnvironmentBuilder().build();
        HashMapStateBackend stateBackend = new HashMapStateBackend();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        OperatorStateBackend operatorStateStore = stateBackend.createOperatorStateBackend(
                env, "test-operator", Collections.emptyList(), cancelStreamRegistry);
        return new StateInitializationContextImpl(null, operatorStateStore, null, null, null);
    }

    /**
     * Wraps the given operator in a test harness constructed with arguments {@code (1, 1, 0)}, sets
     * it up with a {@link DataStatisticsOrRecordSerializer} as output serializer, and opens it.
     *
     * @param dataStatisticsOperator operator to wrap
     * @return an opened harness; the caller is responsible for closing it
     */
    private OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<SortKey, Long>>> createHarness(DataStatisticsOperator<MapDataStatistics, Map<SortKey, Long>> dataStatisticsOperator) throws Exception {
        OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<SortKey, Long>>> harness =
                new OneInputStreamOperatorTestHarness<>(dataStatisticsOperator, 1, 1, 0);
        harness.setup(new DataStatisticsOrRecordSerializer<>(statisticsSerializer, rowSerializer));
        harness.open();
        return harness;
    }

    /** Returns a fresh copy of the template sort key with position 0 set to {@code id}. */
    private SortKey keyOf(String id) {
        SortKey key = sortKey.copy();
        key.set(0, id);
        return key;
    }

    /** Returns the expected statistics {@code a -> 2, b -> 1, c -> 1}, built from fresh keys. */
    private Map<SortKey, Long> expectedAbcCounts() {
        return ImmutableMap.of(keyOf("a"), 2L, keyOf("b"), 1L, keyOf("c"), 1L);
    }

    /** Builds a two-field row matching the test schema: ({@code id}, {@code number}). */
    private static GenericRowData row(String id, int number) {
        return GenericRowData.of(StringData.fromString(id), number);
    }

    /** Wraps {@link #row(String, int)} in a {@link StreamRecord} ready to feed to the operator. */
    private static StreamRecord<RowData> record(String id, int number) {
        return new StreamRecord<>(row(id, number));
    }
}

