/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.flink.sink.shuffle.DataStatistics;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsCoordinator;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsEvent;
import org.apache.iceberg.flink.sink.shuffle.MapDataStatistics;
import org.apache.iceberg.flink.sink.shuffle.MapDataStatisticsSerializer;
import org.apache.iceberg.flink.sink.shuffle.SortKeySerializer;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

public class TestDataStatisticsCoordinator {
    private static final String OPERATOR_NAME = "TestCoordinator";
    private static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L, 5678L);
    private static final int NUM_SUBTASKS = 2;
    private final Schema schema = new Schema(new Types.NestedField[]{Types.NestedField.optional((int)1, (String)"str", (Type)Types.StringType.get())});
    private final SortOrder sortOrder = ((SortOrder.Builder)SortOrder.builderFor((Schema)this.schema).asc("str")).build();
    private final SortKey sortKey = new SortKey(this.schema, this.sortOrder);
    private final MapDataStatisticsSerializer statisticsSerializer = MapDataStatisticsSerializer.fromSortKeySerializer((TypeSerializer)new SortKeySerializer(this.schema, this.sortOrder));
    private EventReceivingTasks receivingTasks;
    private DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>> dataStatisticsCoordinator;

    @Before
    public void before() throws Exception {
        this.receivingTasks = EventReceivingTasks.createForRunningTasks();
        this.dataStatisticsCoordinator = new DataStatisticsCoordinator(OPERATOR_NAME, (OperatorCoordinator.Context)new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, 2), (TypeSerializer)this.statisticsSerializer);
    }

    private void tasksReady() throws Exception {
        this.dataStatisticsCoordinator.start();
        TestDataStatisticsCoordinator.setAllTasksReady(2, this.dataStatisticsCoordinator, this.receivingTasks);
    }

    @Test
    public void testThrowExceptionWhenNotStarted() {
        String failureMessage = "The coordinator of TestCoordinator has not started yet.";
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.dataStatisticsCoordinator.handleEventFromOperator(0, 0, (OperatorEvent)DataStatisticsEvent.create((long)0L, (DataStatistics)new MapDataStatistics(), (TypeSerializer)this.statisticsSerializer))).isInstanceOf(IllegalStateException.class)).hasMessage(failureMessage);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.dataStatisticsCoordinator.executionAttemptFailed(0, 0, null)).isInstanceOf(IllegalStateException.class)).hasMessage(failureMessage);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.dataStatisticsCoordinator.checkpointCoordinator(0L, null)).isInstanceOf(IllegalStateException.class)).hasMessage(failureMessage);
    }

    @Test
    public void testDataStatisticsEventHandling() throws Exception {
        this.tasksReady();
        SortKey key = this.sortKey.copy();
        MapDataStatistics checkpoint1Subtask0DataStatistic = new MapDataStatistics();
        key.set(0, (Object)"a");
        checkpoint1Subtask0DataStatistic.add(key);
        key.set(0, (Object)"b");
        checkpoint1Subtask0DataStatistic.add(key);
        key.set(0, (Object)"b");
        checkpoint1Subtask0DataStatistic.add(key);
        key.set(0, (Object)"c");
        checkpoint1Subtask0DataStatistic.add(key);
        key.set(0, (Object)"c");
        checkpoint1Subtask0DataStatistic.add(key);
        key.set(0, (Object)"c");
        checkpoint1Subtask0DataStatistic.add(key);
        DataStatisticsEvent checkpoint1Subtask0DataStatisticEvent = DataStatisticsEvent.create((long)1L, (DataStatistics)checkpoint1Subtask0DataStatistic, (TypeSerializer)this.statisticsSerializer);
        MapDataStatistics checkpoint1Subtask1DataStatistic = new MapDataStatistics();
        key.set(0, (Object)"a");
        checkpoint1Subtask1DataStatistic.add(key);
        key.set(0, (Object)"b");
        checkpoint1Subtask1DataStatistic.add(key);
        key.set(0, (Object)"c");
        checkpoint1Subtask1DataStatistic.add(key);
        key.set(0, (Object)"c");
        checkpoint1Subtask1DataStatistic.add(key);
        DataStatisticsEvent checkpoint1Subtask1DataStatisticEvent = DataStatisticsEvent.create((long)1L, (DataStatistics)checkpoint1Subtask1DataStatistic, (TypeSerializer)this.statisticsSerializer);
        this.dataStatisticsCoordinator.handleEventFromOperator(0, 0, (OperatorEvent)checkpoint1Subtask0DataStatisticEvent);
        this.dataStatisticsCoordinator.handleEventFromOperator(1, 0, (OperatorEvent)checkpoint1Subtask1DataStatisticEvent);
        TestDataStatisticsCoordinator.waitForCoordinatorToProcessActions(this.dataStatisticsCoordinator);
        SortKey keyA = this.sortKey.copy();
        keyA.set(0, (Object)"a");
        SortKey keyB = this.sortKey.copy();
        keyB.set(0, (Object)"b");
        SortKey keyC = this.sortKey.copy();
        keyC.set(0, (Object)"c");
        MapDataStatistics globalDataStatistics = (MapDataStatistics)this.dataStatisticsCoordinator.completedStatistics().dataStatistics();
        Assertions.assertThat((Map)globalDataStatistics.statistics()).containsExactlyInAnyOrderEntriesOf((Map)ImmutableMap.of((Object)keyA, (Object)2L, (Object)keyB, (Object)3L, (Object)keyC, (Object)5L));
    }

    static void setAllTasksReady(int subtasks, DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>> dataStatisticsCoordinator, EventReceivingTasks receivingTasks) {
        for (int i = 0; i < subtasks; ++i) {
            dataStatisticsCoordinator.executionAttemptReady(i, 0, receivingTasks.createGatewayForSubtask(i, 0));
        }
    }

    static void waitForCoordinatorToProcessActions(DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>> coordinator) {
        CompletableFuture future = new CompletableFuture();
        coordinator.callInCoordinatorThread(() -> {
            future.complete(null);
            return null;
        }, "Coordinator fails to process action");
        try {
            future.get();
        }
        catch (InterruptedException e) {
            throw new AssertionError((Object)"test interrupted");
        }
        catch (ExecutionException e) {
            ExceptionUtils.rethrow((Throwable)ExceptionUtils.stripExecutionException((Throwable)e));
        }
    }
}

