/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.iceberg.flink.sink.shuffle.DataStatistics;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsCoordinator;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsCoordinatorProvider;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsEvent;
import org.apache.iceberg.flink.sink.shuffle.MapDataStatistics;
import org.apache.iceberg.flink.sink.shuffle.MapDataStatisticsSerializer;
import org.apache.iceberg.flink.sink.shuffle.TestDataStatisticsCoordinator;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

public class TestDataStatisticsCoordinatorProvider {
    private static final OperatorID OPERATOR_ID = new OperatorID();
    private static final int NUM_SUBTASKS = 1;
    private DataStatisticsCoordinatorProvider<MapDataStatistics, Map<RowData, Long>> provider;
    private EventReceivingTasks receivingTasks;
    private TypeSerializer<DataStatistics<MapDataStatistics, Map<RowData, Long>>> statisticsSerializer;

    @Before
    public void before() {
        this.statisticsSerializer = MapDataStatisticsSerializer.fromKeySerializer((TypeSerializer)new RowDataSerializer(RowType.of((LogicalType[])new LogicalType[]{new VarCharType()})));
        this.provider = new DataStatisticsCoordinatorProvider("DataStatisticsCoordinatorProvider", OPERATOR_ID, this.statisticsSerializer);
        this.receivingTasks = EventReceivingTasks.createForRunningTasks();
    }

    @Test
    public void testCheckpointAndReset() throws Exception {
        RowType rowType = RowType.of((LogicalType[])new LogicalType[]{new VarCharType()});
        BinaryRowData binaryRowDataA = new RowDataSerializer(rowType).toBinaryRow((RowData)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"a")}));
        BinaryRowData binaryRowDataB = new RowDataSerializer(rowType).toBinaryRow((RowData)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"b")}));
        BinaryRowData binaryRowDataC = new RowDataSerializer(rowType).toBinaryRow((RowData)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"c")}));
        BinaryRowData binaryRowDataD = new RowDataSerializer(rowType).toBinaryRow((RowData)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"d")}));
        BinaryRowData binaryRowDataE = new RowDataSerializer(rowType).toBinaryRow((RowData)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"e")}));
        RecreateOnResetOperatorCoordinator coordinator = (RecreateOnResetOperatorCoordinator)this.provider.create((OperatorCoordinator.Context)new MockOperatorCoordinatorContext(OPERATOR_ID, 1));
        DataStatisticsCoordinator dataStatisticsCoordinator = (DataStatisticsCoordinator)coordinator.getInternalCoordinator();
        coordinator.start();
        TestDataStatisticsCoordinator.setAllTasksReady(1, (DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>>)dataStatisticsCoordinator, this.receivingTasks);
        MapDataStatistics checkpoint1Subtask0DataStatistic = new MapDataStatistics();
        checkpoint1Subtask0DataStatistic.add((RowData)binaryRowDataA);
        checkpoint1Subtask0DataStatistic.add((RowData)binaryRowDataB);
        checkpoint1Subtask0DataStatistic.add((RowData)binaryRowDataC);
        DataStatisticsEvent checkpoint1Subtask0DataStatisticEvent = DataStatisticsEvent.create((long)1L, (DataStatistics)checkpoint1Subtask0DataStatistic, this.statisticsSerializer);
        coordinator.handleEventFromOperator(0, 0, (OperatorEvent)checkpoint1Subtask0DataStatisticEvent);
        TestDataStatisticsCoordinator.waitForCoordinatorToProcessActions((DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>>)dataStatisticsCoordinator);
        MapDataStatistics checkpoint1GlobalDataStatistics = (MapDataStatistics)dataStatisticsCoordinator.completedStatistics().dataStatistics();
        Assertions.assertThat((Map)checkpoint1GlobalDataStatistics.statistics()).isEqualTo((Object)checkpoint1Subtask0DataStatistic.statistics());
        byte[] checkpoint1Bytes = this.waitForCheckpoint(1L, (DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>>)dataStatisticsCoordinator);
        MapDataStatistics checkpoint2Subtask0DataStatistic = new MapDataStatistics();
        checkpoint2Subtask0DataStatistic.add((RowData)binaryRowDataD);
        checkpoint2Subtask0DataStatistic.add((RowData)binaryRowDataE);
        checkpoint2Subtask0DataStatistic.add((RowData)binaryRowDataE);
        DataStatisticsEvent checkpoint2Subtask0DataStatisticEvent = DataStatisticsEvent.create((long)2L, (DataStatistics)checkpoint2Subtask0DataStatistic, this.statisticsSerializer);
        coordinator.handleEventFromOperator(0, 0, (OperatorEvent)checkpoint2Subtask0DataStatisticEvent);
        TestDataStatisticsCoordinator.waitForCoordinatorToProcessActions((DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>>)dataStatisticsCoordinator);
        MapDataStatistics checkpoint2GlobalDataStatistics = (MapDataStatistics)dataStatisticsCoordinator.completedStatistics().dataStatistics();
        Assertions.assertThat((Map)checkpoint2GlobalDataStatistics.statistics()).isEqualTo((Object)checkpoint2Subtask0DataStatistic.statistics());
        this.waitForCheckpoint(2L, (DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>>)dataStatisticsCoordinator);
        coordinator.resetToCheckpoint(1L, checkpoint1Bytes);
        DataStatisticsCoordinator restoredDataStatisticsCoordinator = (DataStatisticsCoordinator)coordinator.getInternalCoordinator();
        Assertions.assertThat((Object)dataStatisticsCoordinator).isNotEqualTo((Object)restoredDataStatisticsCoordinator);
        MapDataStatistics restoredAggregateDataStatistics = (MapDataStatistics)restoredDataStatisticsCoordinator.completedStatistics().dataStatistics();
        Assertions.assertThat((Map)restoredAggregateDataStatistics.statistics()).isEqualTo((Object)checkpoint1GlobalDataStatistics.statistics());
    }

    private byte[] waitForCheckpoint(long checkpointId, DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>> coordinator) throws InterruptedException, ExecutionException {
        CompletableFuture future = new CompletableFuture();
        coordinator.checkpointCoordinator(checkpointId, future);
        return (byte[])future.get();
    }
}

