/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.util.ExceptionUtils;
import org.apache.iceberg.flink.sink.shuffle.DataStatistics;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsCoordinator;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsEvent;
import org.apache.iceberg.flink.sink.shuffle.MapDataStatistics;
import org.apache.iceberg.flink.sink.shuffle.MapDataStatisticsSerializer;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

public class TestDataStatisticsCoordinator {
    private static final String OPERATOR_NAME = "TestCoordinator";
    private static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L, 5678L);
    private static final int NUM_SUBTASKS = 2;
    private TypeSerializer<DataStatistics<MapDataStatistics, Map<RowData, Long>>> statisticsSerializer;
    private EventReceivingTasks receivingTasks;
    private DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>> dataStatisticsCoordinator;

    @Before
    public void before() throws Exception {
        this.receivingTasks = EventReceivingTasks.createForRunningTasks();
        this.statisticsSerializer = MapDataStatisticsSerializer.fromKeySerializer((TypeSerializer)new RowDataSerializer(RowType.of((LogicalType[])new LogicalType[]{new VarCharType()})));
        this.dataStatisticsCoordinator = new DataStatisticsCoordinator(OPERATOR_NAME, (OperatorCoordinator.Context)new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, 2), this.statisticsSerializer);
    }

    private void tasksReady() throws Exception {
        this.dataStatisticsCoordinator.start();
        TestDataStatisticsCoordinator.setAllTasksReady(2, this.dataStatisticsCoordinator, this.receivingTasks);
    }

    @Test
    public void testThrowExceptionWhenNotStarted() {
        String failureMessage = "The coordinator of TestCoordinator has not started yet.";
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.dataStatisticsCoordinator.handleEventFromOperator(0, 0, (OperatorEvent)DataStatisticsEvent.create((long)0L, (DataStatistics)new MapDataStatistics(), this.statisticsSerializer))).isInstanceOf(IllegalStateException.class)).hasMessage(failureMessage);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.dataStatisticsCoordinator.executionAttemptFailed(0, 0, null)).isInstanceOf(IllegalStateException.class)).hasMessage(failureMessage);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.dataStatisticsCoordinator.checkpointCoordinator(0L, null)).isInstanceOf(IllegalStateException.class)).hasMessage(failureMessage);
    }

    @Test
    public void testDataStatisticsEventHandling() throws Exception {
        this.tasksReady();
        RowType rowType = RowType.of((LogicalType[])new LogicalType[]{new VarCharType()});
        BinaryRowData binaryRowDataA = new RowDataSerializer(rowType).toBinaryRow((RowData)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"a")}));
        BinaryRowData binaryRowDataB = new RowDataSerializer(rowType).toBinaryRow((RowData)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"b")}));
        BinaryRowData binaryRowDataC = new RowDataSerializer(rowType).toBinaryRow((RowData)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"c")}));
        MapDataStatistics checkpoint1Subtask0DataStatistic = new MapDataStatistics();
        checkpoint1Subtask0DataStatistic.add((RowData)binaryRowDataA);
        checkpoint1Subtask0DataStatistic.add((RowData)binaryRowDataB);
        checkpoint1Subtask0DataStatistic.add((RowData)binaryRowDataB);
        checkpoint1Subtask0DataStatistic.add((RowData)binaryRowDataC);
        checkpoint1Subtask0DataStatistic.add((RowData)binaryRowDataC);
        checkpoint1Subtask0DataStatistic.add((RowData)binaryRowDataC);
        DataStatisticsEvent checkpoint1Subtask0DataStatisticEvent = DataStatisticsEvent.create((long)1L, (DataStatistics)checkpoint1Subtask0DataStatistic, this.statisticsSerializer);
        MapDataStatistics checkpoint1Subtask1DataStatistic = new MapDataStatistics();
        checkpoint1Subtask1DataStatistic.add((RowData)binaryRowDataA);
        checkpoint1Subtask1DataStatistic.add((RowData)binaryRowDataB);
        checkpoint1Subtask1DataStatistic.add((RowData)binaryRowDataC);
        checkpoint1Subtask1DataStatistic.add((RowData)binaryRowDataC);
        DataStatisticsEvent checkpoint1Subtask1DataStatisticEvent = DataStatisticsEvent.create((long)1L, (DataStatistics)checkpoint1Subtask1DataStatistic, this.statisticsSerializer);
        this.dataStatisticsCoordinator.handleEventFromOperator(0, 0, (OperatorEvent)checkpoint1Subtask0DataStatisticEvent);
        this.dataStatisticsCoordinator.handleEventFromOperator(1, 0, (OperatorEvent)checkpoint1Subtask1DataStatisticEvent);
        TestDataStatisticsCoordinator.waitForCoordinatorToProcessActions(this.dataStatisticsCoordinator);
        MapDataStatistics globalDataStatistics = (MapDataStatistics)this.dataStatisticsCoordinator.completedStatistics().dataStatistics();
        Assertions.assertThat((Map)globalDataStatistics.statistics()).containsExactlyInAnyOrderEntriesOf((Map)ImmutableMap.of((Object)binaryRowDataA, (Object)((Long)checkpoint1Subtask0DataStatistic.statistics().get(binaryRowDataA) + (Long)checkpoint1Subtask1DataStatistic.statistics().get(binaryRowDataA)), (Object)binaryRowDataB, (Object)((Long)checkpoint1Subtask0DataStatistic.statistics().get(binaryRowDataB) + (Long)checkpoint1Subtask1DataStatistic.statistics().get(binaryRowDataB)), (Object)binaryRowDataC, (Object)((Long)checkpoint1Subtask0DataStatistic.statistics().get(binaryRowDataC) + (Long)checkpoint1Subtask1DataStatistic.statistics().get(binaryRowDataC))));
    }

    static void setAllTasksReady(int subtasks, DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>> dataStatisticsCoordinator, EventReceivingTasks receivingTasks) {
        for (int i = 0; i < subtasks; ++i) {
            dataStatisticsCoordinator.executionAttemptReady(i, 0, receivingTasks.createGatewayForSubtask(i, 0));
        }
    }

    static void waitForCoordinatorToProcessActions(DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>> coordinator) {
        CompletableFuture future = new CompletableFuture();
        coordinator.callInCoordinatorThread(() -> {
            future.complete(null);
            return null;
        }, "Coordinator fails to process action");
        try {
            future.get();
        }
        catch (InterruptedException e) {
            throw new AssertionError((Object)"test interrupted");
        }
        catch (ExecutionException e) {
            ExceptionUtils.rethrow((Throwable)ExceptionUtils.stripExecutionException((Throwable)e));
        }
    }
}

