/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.flink.sink.shuffle.DataStatistics;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsCoordinator;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsCoordinatorProvider;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsEvent;
import org.apache.iceberg.flink.sink.shuffle.MapDataStatistics;
import org.apache.iceberg.flink.sink.shuffle.MapDataStatisticsSerializer;
import org.apache.iceberg.flink.sink.shuffle.SortKeySerializer;
import org.apache.iceberg.flink.sink.shuffle.TestDataStatisticsCoordinator;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests the coordinator produced by {@link DataStatisticsCoordinatorProvider}.
 *
 * <p>The single test feeds statistics events for two checkpoints into the coordinator, snapshots
 * each checkpoint, resets the coordinator to the first snapshot and verifies that the completed
 * statistics seen after the reset equal those recorded for the first checkpoint.
 */
public class TestDataStatisticsCoordinatorProvider {
    private static final OperatorID OPERATOR_ID = new OperatorID();
    private static final int NUM_SUBTASKS = 1;

    // Single optional string column, sorted ascending; every sort key in this test is built from it.
    private final Schema schema = new Schema(Types.NestedField.optional(1, "str", Types.StringType.get()));
    private final SortOrder sortOrder = SortOrder.builderFor(this.schema).asc("str").build();
    private final SortKey sortKey = new SortKey(this.schema, this.sortOrder);
    private final MapDataStatisticsSerializer statisticsSerializer =
            MapDataStatisticsSerializer.fromSortKeySerializer(new SortKeySerializer(this.schema, this.sortOrder));

    private DataStatisticsCoordinatorProvider<MapDataStatistics, Map<SortKey, Long>> provider;
    private EventReceivingTasks receivingTasks;

    /** Creates a fresh provider and a fresh set of event-receiving tasks before each test. */
    @Before
    public void before() {
        this.provider = new DataStatisticsCoordinatorProvider<>(
                "DataStatisticsCoordinatorProvider", OPERATOR_ID, this.statisticsSerializer);
        this.receivingTasks = EventReceivingTasks.createForRunningTasks();
    }

    /**
     * Sends statistics for checkpoints 1 and 2, snapshots both, then resets to the checkpoint 1
     * snapshot and checks that the recreated coordinator reports the checkpoint 1 statistics.
     *
     * @throws Exception if the coordinator fails to start, checkpoint, reset or close
     */
    @Test
    public void testCheckpointAndReset() throws Exception {
        SortKey keyA = this.sortKey.copy();
        keyA.set(0, "a");
        SortKey keyB = this.sortKey.copy();
        keyB.set(0, "b");
        SortKey keyC = this.sortKey.copy();
        keyC.set(0, "c");
        // NOTE(review): keyD and keyE carry the same value ("c") as keyC -- confirm that distinct
        // values were not intended here.
        SortKey keyD = this.sortKey.copy();
        keyD.set(0, "c");
        SortKey keyE = this.sortKey.copy();
        keyE.set(0, "c");

        try (RecreateOnResetOperatorCoordinator coordinator = (RecreateOnResetOperatorCoordinator)
                this.provider.create(new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS))) {
            // Grab the internal coordinator before start(), matching the original call order.
            DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>> dataStatisticsCoordinator =
                    internalCoordinator(coordinator);
            coordinator.start();
            TestDataStatisticsCoordinator.setAllTasksReady(
                    NUM_SUBTASKS, dataStatisticsCoordinator, this.receivingTasks);

            // Checkpoint 1: subtask 0 reports keys a, b and c.
            MapDataStatistics checkpoint1Subtask0DataStatistic = new MapDataStatistics();
            checkpoint1Subtask0DataStatistic.add(keyA);
            checkpoint1Subtask0DataStatistic.add(keyB);
            checkpoint1Subtask0DataStatistic.add(keyC);
            OperatorEvent checkpoint1Subtask0DataStatisticEvent = DataStatisticsEvent.create(
                    1L, checkpoint1Subtask0DataStatistic, this.statisticsSerializer);
            coordinator.handleEventFromOperator(0, 0, checkpoint1Subtask0DataStatisticEvent);
            TestDataStatisticsCoordinator.waitForCoordinatorToProcessActions(dataStatisticsCoordinator);

            MapDataStatistics checkpoint1GlobalDataStatistics =
                    (MapDataStatistics) dataStatisticsCoordinator.completedStatistics().dataStatistics();
            Assertions.assertThat(checkpoint1GlobalDataStatistics.statistics())
                    .isEqualTo(checkpoint1Subtask0DataStatistic.statistics());
            // Keep the checkpoint 1 snapshot bytes; they are the reset target below.
            byte[] checkpoint1Bytes = this.waitForCheckpoint(1L, dataStatisticsCoordinator);

            // Checkpoint 2: subtask 0 reports keyD once and keyE twice.
            MapDataStatistics checkpoint2Subtask0DataStatistic = new MapDataStatistics();
            checkpoint2Subtask0DataStatistic.add(keyD);
            checkpoint2Subtask0DataStatistic.add(keyE);
            checkpoint2Subtask0DataStatistic.add(keyE);
            OperatorEvent checkpoint2Subtask0DataStatisticEvent = DataStatisticsEvent.create(
                    2L, checkpoint2Subtask0DataStatistic, this.statisticsSerializer);
            coordinator.handleEventFromOperator(0, 0, checkpoint2Subtask0DataStatisticEvent);
            TestDataStatisticsCoordinator.waitForCoordinatorToProcessActions(dataStatisticsCoordinator);

            MapDataStatistics checkpoint2GlobalDataStatistics =
                    (MapDataStatistics) dataStatisticsCoordinator.completedStatistics().dataStatistics();
            Assertions.assertThat(checkpoint2GlobalDataStatistics.statistics())
                    .isEqualTo(checkpoint2Subtask0DataStatistic.statistics());
            this.waitForCheckpoint(2L, dataStatisticsCoordinator);

            // Reset to checkpoint 1: the internal coordinator must be a different instance whose
            // completed statistics equal the ones recorded for checkpoint 1.
            coordinator.resetToCheckpoint(1L, checkpoint1Bytes);
            DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>> restoredDataStatisticsCoordinator =
                    internalCoordinator(coordinator);
            Assertions.assertThat(dataStatisticsCoordinator).isNotEqualTo(restoredDataStatisticsCoordinator);

            MapDataStatistics restoredAggregateDataStatistics =
                    (MapDataStatistics) restoredDataStatisticsCoordinator.completedStatistics().dataStatistics();
            Assertions.assertThat(restoredAggregateDataStatistics.statistics())
                    .isEqualTo(checkpoint1GlobalDataStatistics.statistics());
        }
    }

    /**
     * Returns the coordinator currently wrapped by {@code coordinator}, typed for this test.
     *
     * @param coordinator the recreate-on-reset wrapper created by the provider under test
     * @return the wrapped coordinator viewed as a {@link DataStatisticsCoordinator}
     * @throws Exception if the wrapper fails to hand out its internal coordinator
     */
    // The generic arguments cannot be checked at runtime; the provider field in this test is
    // declared with exactly these type arguments, so the cast is expected to be safe.
    @SuppressWarnings("unchecked")
    private static DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>> internalCoordinator(
            RecreateOnResetOperatorCoordinator coordinator) throws Exception {
        return (DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>>)
                coordinator.getInternalCoordinator();
    }

    /**
     * Requests a checkpoint from {@code coordinator} and blocks until its result future completes.
     *
     * @param checkpointId id passed to the coordinator's checkpoint call
     * @param coordinator coordinator to checkpoint
     * @return the bytes the coordinator completed the future with
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the checkpoint future completes exceptionally
     */
    private byte[] waitForCheckpoint(long checkpointId, DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>> coordinator) throws InterruptedException, ExecutionException {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        coordinator.checkpointCoordinator(checkpointId, future);
        return future.get();
    }
}

