/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.iceberg.flink.sink.shuffle.AggregatedStatistics;
import org.apache.iceberg.flink.sink.shuffle.AggregatedStatisticsTracker;
import org.apache.iceberg.flink.sink.shuffle.DataStatistics;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsEvent;
import org.apache.iceberg.flink.sink.shuffle.MapDataStatistics;
import org.apache.iceberg.flink.sink.shuffle.MapDataStatisticsSerializer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

public class TestAggregatedStatisticsTracker {
    private static final int NUM_SUBTASKS = 2;
    private final RowType rowType = RowType.of((LogicalType[])new LogicalType[]{new VarCharType()});
    private final BinaryRowData binaryRowDataA = new RowDataSerializer(this.rowType).toBinaryRow((RowData)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"a")}));
    private final BinaryRowData binaryRowDataB = new RowDataSerializer(this.rowType).toBinaryRow((RowData)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"b")}));
    private final TypeSerializer<RowData> rowSerializer = new RowDataSerializer(this.rowType);
    private final TypeSerializer<DataStatistics<MapDataStatistics, Map<RowData, Long>>> statisticsSerializer = MapDataStatisticsSerializer.fromKeySerializer(this.rowSerializer);
    private AggregatedStatisticsTracker<MapDataStatistics, Map<RowData, Long>> aggregatedStatisticsTracker;

    @Before
    public void before() throws Exception {
        this.aggregatedStatisticsTracker = new AggregatedStatisticsTracker("testOperator", this.statisticsSerializer, 2);
    }

    @Test
    public void receiveNewerDataStatisticEvent() {
        MapDataStatistics checkpoint1Subtask0DataStatistic = new MapDataStatistics();
        checkpoint1Subtask0DataStatistic.add((RowData)this.binaryRowDataA);
        DataStatisticsEvent checkpoint1Subtask0DataStatisticEvent = DataStatisticsEvent.create((long)1L, (DataStatistics)checkpoint1Subtask0DataStatistic, this.statisticsSerializer);
        Assertions.assertThat((Object)this.aggregatedStatisticsTracker.updateAndCheckCompletion(0, checkpoint1Subtask0DataStatisticEvent)).isNull();
        Assertions.assertThat((long)this.aggregatedStatisticsTracker.inProgressStatistics().checkpointId()).isEqualTo(1L);
        MapDataStatistics checkpoint2Subtask0DataStatistic = new MapDataStatistics();
        checkpoint2Subtask0DataStatistic.add((RowData)this.binaryRowDataA);
        DataStatisticsEvent checkpoint2Subtask0DataStatisticEvent = DataStatisticsEvent.create((long)2L, (DataStatistics)checkpoint2Subtask0DataStatistic, this.statisticsSerializer);
        Assertions.assertThat((Object)this.aggregatedStatisticsTracker.updateAndCheckCompletion(0, checkpoint2Subtask0DataStatisticEvent)).isNull();
        Assertions.assertThat((long)this.aggregatedStatisticsTracker.inProgressStatistics().checkpointId()).isEqualTo(2L);
    }

    @Test
    public void receiveOlderDataStatisticEventTest() {
        MapDataStatistics checkpoint2Subtask0DataStatistic = new MapDataStatistics();
        checkpoint2Subtask0DataStatistic.add((RowData)this.binaryRowDataA);
        checkpoint2Subtask0DataStatistic.add((RowData)this.binaryRowDataB);
        checkpoint2Subtask0DataStatistic.add((RowData)this.binaryRowDataB);
        DataStatisticsEvent checkpoint3Subtask0DataStatisticEvent = DataStatisticsEvent.create((long)2L, (DataStatistics)checkpoint2Subtask0DataStatistic, this.statisticsSerializer);
        Assertions.assertThat((Object)this.aggregatedStatisticsTracker.updateAndCheckCompletion(0, checkpoint3Subtask0DataStatisticEvent)).isNull();
        MapDataStatistics checkpoint1Subtask1DataStatistic = new MapDataStatistics();
        checkpoint1Subtask1DataStatistic.add((RowData)this.binaryRowDataB);
        DataStatisticsEvent checkpoint1Subtask1DataStatisticEvent = DataStatisticsEvent.create((long)1L, (DataStatistics)checkpoint1Subtask1DataStatistic, this.statisticsSerializer);
        Assertions.assertThat((Object)this.aggregatedStatisticsTracker.updateAndCheckCompletion(1, checkpoint1Subtask1DataStatisticEvent)).isNull();
        Assertions.assertThat((long)this.aggregatedStatisticsTracker.inProgressStatistics().checkpointId()).isEqualTo(2L);
    }

    @Test
    public void receiveCompletedDataStatisticEvent() {
        MapDataStatistics checkpoint1Subtask0DataStatistic = new MapDataStatistics();
        checkpoint1Subtask0DataStatistic.add((RowData)this.binaryRowDataA);
        checkpoint1Subtask0DataStatistic.add((RowData)this.binaryRowDataB);
        checkpoint1Subtask0DataStatistic.add((RowData)this.binaryRowDataB);
        DataStatisticsEvent checkpoint1Subtask0DataStatisticEvent = DataStatisticsEvent.create((long)1L, (DataStatistics)checkpoint1Subtask0DataStatistic, this.statisticsSerializer);
        Assertions.assertThat((Object)this.aggregatedStatisticsTracker.updateAndCheckCompletion(0, checkpoint1Subtask0DataStatisticEvent)).isNull();
        MapDataStatistics checkpoint1Subtask1DataStatistic = new MapDataStatistics();
        checkpoint1Subtask1DataStatistic.add((RowData)this.binaryRowDataA);
        checkpoint1Subtask1DataStatistic.add((RowData)this.binaryRowDataA);
        checkpoint1Subtask1DataStatistic.add((RowData)this.binaryRowDataB);
        DataStatisticsEvent checkpoint1Subtask1DataStatisticEvent = DataStatisticsEvent.create((long)1L, (DataStatistics)checkpoint1Subtask1DataStatistic, this.statisticsSerializer);
        AggregatedStatistics completedStatistics = this.aggregatedStatisticsTracker.updateAndCheckCompletion(1, checkpoint1Subtask1DataStatisticEvent);
        Assertions.assertThat((Object)completedStatistics).isNotNull();
        Assertions.assertThat((long)completedStatistics.checkpointId()).isEqualTo(1L);
        MapDataStatistics globalDataStatistics = (MapDataStatistics)completedStatistics.dataStatistics();
        Assertions.assertThat((long)((Long)globalDataStatistics.statistics().get(this.binaryRowDataA))).isEqualTo((Long)checkpoint1Subtask0DataStatistic.statistics().get(this.binaryRowDataA) + (Long)checkpoint1Subtask1DataStatistic.statistics().get(this.binaryRowDataA));
        Assertions.assertThat((long)((Long)globalDataStatistics.statistics().get(this.binaryRowDataB))).isEqualTo((Long)checkpoint1Subtask0DataStatistic.statistics().get(this.binaryRowDataB) + (Long)checkpoint1Subtask1DataStatistic.statistics().get(this.binaryRowDataB));
        Assertions.assertThat((long)this.aggregatedStatisticsTracker.inProgressStatistics().checkpointId()).isEqualTo(completedStatistics.checkpointId() + 1L);
        MapDataStatistics checkpoint2Subtask0DataStatistic = new MapDataStatistics();
        checkpoint2Subtask0DataStatistic.add((RowData)this.binaryRowDataA);
        DataStatisticsEvent checkpoint2Subtask0DataStatisticEvent = DataStatisticsEvent.create((long)2L, (DataStatistics)checkpoint2Subtask0DataStatistic, this.statisticsSerializer);
        Assertions.assertThat((Object)this.aggregatedStatisticsTracker.updateAndCheckCompletion(0, checkpoint2Subtask0DataStatisticEvent)).isNull();
        Assertions.assertThat((long)completedStatistics.checkpointId()).isEqualTo(1L);
        MapDataStatistics checkpoint2Subtask1DataStatistic = new MapDataStatistics();
        checkpoint2Subtask1DataStatistic.add((RowData)this.binaryRowDataB);
        DataStatisticsEvent checkpoint2Subtask1DataStatisticEvent = DataStatisticsEvent.create((long)2L, (DataStatistics)checkpoint2Subtask1DataStatistic, this.statisticsSerializer);
        completedStatistics = this.aggregatedStatisticsTracker.updateAndCheckCompletion(1, checkpoint2Subtask1DataStatisticEvent);
        Assertions.assertThat((Object)completedStatistics).isNotNull();
        Assertions.assertThat((long)completedStatistics.checkpointId()).isEqualTo(2L);
        Assertions.assertThat((long)this.aggregatedStatisticsTracker.inProgressStatistics().checkpointId()).isEqualTo(completedStatistics.checkpointId() + 1L);
    }
}

