/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.datasketches.sampling.ReservoirItemsSketch;
import org.apache.datasketches.sampling.ReservoirItemsUnion;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderComparators;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.flink.sink.shuffle.AggregatedStatistics;
import org.apache.iceberg.flink.sink.shuffle.DataStatistics;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsSerializer;
import org.apache.iceberg.flink.sink.shuffle.SketchUtil;
import org.apache.iceberg.flink.sink.shuffle.SortKeySerializer;
import org.apache.iceberg.flink.sink.shuffle.StatisticsEvent;
import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
import org.apache.iceberg.flink.sink.shuffle.StatisticsUtil;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collects the data statistics reported by individual subtasks and aggregates them per
 * checkpoint.
 *
 * <p>One {@link Aggregation} is kept for every checkpoint id that has received at least one
 * event. Once an aggregation has received statistics from {@code parallelism} distinct subtasks
 * it is turned into an {@link AggregatedStatistics}, remembered as the latest completed result,
 * and the aggregations for that checkpoint and all earlier ones are dropped.
 *
 * <p>This class does no synchronization of its own.
 */
class AggregatedStatisticsTracker {
    private static final Logger LOG = LoggerFactory.getLogger(AggregatedStatisticsTracker.class);
    private final String operatorName;
    private final int parallelism;
    private final TypeSerializer<DataStatistics> statisticsSerializer;
    private final int downstreamParallelism;
    private final StatisticsType statisticsType;
    private final int switchToSketchThreshold;
    private final Comparator<StructLike> comparator;
    // In-flight aggregations, sorted by checkpoint id so that everything up to a completed
    // checkpoint can be discarded with a single headMap call.
    private final NavigableMap<Long, Aggregation> aggregationsPerCheckpoint;
    // Latest completed result; starts as the (possibly null) restored statistics.
    private AggregatedStatistics completedStatistics;

    /**
     * Creates a tracker.
     *
     * @param operatorName name used in log messages
     * @param parallelism number of distinct subtasks that must report before a checkpoint's
     *     aggregation is considered complete
     * @param schema schema used to build the sort key serializer and the comparator
     * @param sortOrder sort order used to build the sort key serializer and the comparator
     * @param downstreamParallelism passed to {@link SketchUtil} for reservoir sizing and range
     *     bound computation
     * @param statisticsType configured statistics type
     * @param switchToSketchThreshold map size above which an {@code Auto}-configured aggregation
     *     is converted from map to sketch
     * @param restoredStatistics initial value of the latest completed statistics, may be null
     */
    AggregatedStatisticsTracker(String operatorName, int parallelism, Schema schema, SortOrder sortOrder, int downstreamParallelism, StatisticsType statisticsType, int switchToSketchThreshold, @Nullable AggregatedStatistics restoredStatistics) {
        this.operatorName = operatorName;
        this.parallelism = parallelism;
        this.statisticsSerializer = new DataStatisticsSerializer(new SortKeySerializer(schema, sortOrder));
        this.downstreamParallelism = downstreamParallelism;
        this.statisticsType = statisticsType;
        this.switchToSketchThreshold = switchToSketchThreshold;
        this.completedStatistics = restoredStatistics;
        this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
        this.aggregationsPerCheckpoint = Maps.newTreeMap();
    }

    /**
     * Merges the statistics carried by {@code event} into the aggregation of its checkpoint.
     *
     * <p>Events whose checkpoint id is lower than the latest completed checkpoint are ignored, and
     * a second event from the same subtask for the same checkpoint is ignored as a duplicate.
     *
     * @param subtask index of the subtask that sent the event
     * @param event statistics event to merge
     * @return the aggregated statistics if this event completed its checkpoint's aggregation,
     *     otherwise {@code null}
     */
    AggregatedStatistics updateAndCheckCompletion(int subtask, StatisticsEvent event) {
        long checkpointId = event.checkpointId();
        LOG.debug("Handling statistics event from subtask {} of operator {} for checkpoint {}", subtask, this.operatorName, checkpointId);
        if (this.completedStatistics != null && this.completedStatistics.checkpointId() > checkpointId) {
            LOG.info(
                "Ignore stale statistics event from operator {} subtask {} for older checkpoint {}. "
                    + "Was expecting data statistics from checkpoint higher than {}",
                this.operatorName, subtask, checkpointId, this.completedStatistics.checkpointId());
            return null;
        }

        // A new aggregation starts with the type derived from the configured type and the latest
        // completed statistics (see StatisticsUtil.collectType).
        Aggregation aggregation = this.aggregationsPerCheckpoint.computeIfAbsent(
            checkpointId,
            ignored -> new Aggregation(
                this.parallelism,
                this.downstreamParallelism,
                this.switchToSketchThreshold,
                this.comparator,
                this.statisticsType,
                StatisticsUtil.collectType(this.statisticsType, this.completedStatistics)));

        DataStatistics dataStatistics = StatisticsUtil.deserializeDataStatistics(event.statisticsBytes(), this.statisticsSerializer);
        if (!aggregation.merge(subtask, dataStatistics)) {
            LOG.debug("Ignore duplicate data statistics from operator {} subtask {} for checkpoint {}.", this.operatorName, subtask, checkpointId);
        }

        if (aggregation.isComplete()) {
            this.completedStatistics = aggregation.completedStatistics(checkpointId);
            // Drop this checkpoint and every older, still incomplete one.
            this.aggregationsPerCheckpoint.headMap(checkpointId, true).clear();
            return this.completedStatistics;
        }
        return null;
    }

    /** Returns the live map of in-flight aggregations keyed by checkpoint id. */
    @VisibleForTesting
    NavigableMap<Long, Aggregation> aggregationsPerCheckpoint() {
        return this.aggregationsPerCheckpoint;
    }

    /**
     * Aggregation state for a single checkpoint.
     *
     * <p>Statistics are held either as a key-count map or as a reservoir sketch union, depending
     * on {@code currentType}; the field that is not in use is {@code null}. A map aggregation can
     * be converted to a sketch while merging, never the other way round.
     */
    static class Aggregation {
        private static final Logger LOG = LoggerFactory.getLogger(Aggregation.class);
        // Subtasks whose statistics have already been merged.
        private final Set<Integer> subtaskSet = Sets.newHashSet();
        private final int parallelism;
        private final int downstreamParallelism;
        private final int switchToSketchThreshold;
        private final Comparator<StructLike> comparator;
        private final StatisticsType configuredType;
        private StatisticsType currentType;
        private Map<SortKey, Long> mapStatistics;
        private ReservoirItemsUnion<SortKey> sketchStatistics;

        /**
         * Creates an empty aggregation.
         *
         * @param parallelism number of distinct subtasks required for completion
         * @param downstreamParallelism passed to {@link SketchUtil} for reservoir sizing and range
         *     bound computation
         * @param switchToSketchThreshold map size above which an {@code Auto}-configured
         *     aggregation switches to a sketch
         * @param comparator comparator handed to {@link SketchUtil#rangeBounds} on completion
         * @param configuredType configured statistics type
         * @param currentType type to start with; {@code Map} starts with an empty map, any other
         *     value starts with an empty sketch union
         */
        Aggregation(int parallelism, int downstreamParallelism, int switchToSketchThreshold, Comparator<StructLike> comparator, StatisticsType configuredType, StatisticsType currentType) {
            this.parallelism = parallelism;
            this.downstreamParallelism = downstreamParallelism;
            this.switchToSketchThreshold = switchToSketchThreshold;
            this.comparator = comparator;
            this.configuredType = configuredType;
            this.currentType = currentType;
            if (currentType == StatisticsType.Map) {
                this.mapStatistics = Maps.newHashMap();
                this.sketchStatistics = null;
            } else {
                this.mapStatistics = null;
                this.sketchStatistics = ReservoirItemsUnion.newInstance(SketchUtil.determineCoordinatorReservoirSize(downstreamParallelism));
            }
        }

        /** Returns the live set of subtasks merged so far. */
        @VisibleForTesting
        Set<Integer> subtaskSet() {
            return this.subtaskSet;
        }

        /** Returns the type the aggregation currently uses. */
        @VisibleForTesting
        StatisticsType currentType() {
            return this.currentType;
        }

        /** Returns the key-count map, or {@code null} when the aggregation uses a sketch. */
        @VisibleForTesting
        Map<SortKey, Long> mapStatistics() {
            return this.mapStatistics;
        }

        /** Returns the sketch union, or {@code null} when the aggregation uses a map. */
        @VisibleForTesting
        ReservoirItemsUnion<SortKey> sketchStatistics() {
            return this.sketchStatistics;
        }

        /** Returns true once {@code parallelism} distinct subtasks have been merged. */
        private boolean isComplete() {
            return this.subtaskSet.size() == this.parallelism;
        }

        /**
         * Merges the statistics of one subtask unless that subtask has already been merged.
         *
         * @return {@code false} if the subtask was a duplicate and nothing was merged
         */
        private boolean merge(int subtask, DataStatistics taskStatistics) {
            // Set.add reports whether the subtask was new, so no separate contains() is needed.
            if (!this.subtaskSet.add(subtask)) {
                return false;
            }
            this.merge(taskStatistics);
            return true;
        }

        /**
         * Merges task statistics into the current state, converting between map and sketch
         * representation where the two sides differ.
         */
        private void merge(DataStatistics taskStatistics) {
            if (taskStatistics.type() == StatisticsType.Map) {
                // The result of Map-typed statistics is handled as a key-count map.
                @SuppressWarnings("unchecked")
                Map<SortKey, Long> taskMapStats = (Map<SortKey, Long>) taskStatistics.result();
                if (this.currentType == StatisticsType.Map) {
                    taskMapStats.forEach((key, count) -> this.mapStatistics.merge(key, count, Long::sum));
                    if (this.configuredType == StatisticsType.Auto && this.mapStatistics.size() > this.switchToSketchThreshold) {
                        this.convertCoordinatorToSketch();
                    }
                } else {
                    // Aggregation already uses a sketch: turn the task map into a sketch first,
                    // then union the whole sketch (not a single item) into the aggregation.
                    ReservoirItemsSketch<SortKey> taskSketch = ReservoirItemsSketch.newInstance(SketchUtil.determineOperatorReservoirSize(this.parallelism, this.downstreamParallelism));
                    SketchUtil.convertMapToSketch(taskMapStats, taskSketch::update);
                    this.sketchStatistics.update(taskSketch);
                }
            } else {
                // Any non-Map statistics are handled as a reservoir sketch.
                @SuppressWarnings("unchecked")
                ReservoirItemsSketch<SortKey> taskSketch = (ReservoirItemsSketch<SortKey>) taskStatistics.result();
                if (this.currentType == StatisticsType.Map) {
                    this.convertCoordinatorToSketch();
                }
                this.sketchStatistics.update(taskSketch);
            }
        }

        /**
         * Replaces the key-count map with a new sketch union populated from it through
         * {@link SketchUtil#convertMapToSketch} and switches the current type to {@code Sketch}.
         */
        private void convertCoordinatorToSketch() {
            this.sketchStatistics = ReservoirItemsUnion.newInstance(SketchUtil.determineCoordinatorReservoirSize(this.downstreamParallelism));
            SketchUtil.convertMapToSketch(this.mapStatistics, this.sketchStatistics::update);
            this.currentType = StatisticsType.Sketch;
            this.mapStatistics = null;
        }

        /**
         * Builds the aggregated result for {@code checkpointId}: key frequencies when the
         * aggregation is a map, otherwise range bounds computed from the sketch.
         */
        private AggregatedStatistics completedStatistics(long checkpointId) {
            if (this.currentType == StatisticsType.Map) {
                LOG.info("Completed map statistics aggregation with {} keys", this.mapStatistics.size());
                return AggregatedStatistics.fromKeyFrequency(checkpointId, this.mapStatistics);
            }
            ReservoirItemsSketch<SortKey> sketch = this.sketchStatistics.getResult();
            LOG.info(
                "Completed sketch statistics aggregation: reservoir size = {}, number of items seen = {}, number of samples = {}",
                sketch.getK(), sketch.getN(), sketch.getNumSamples());
            return AggregatedStatistics.fromRangeBounds(checkpointId, SketchUtil.rangeBounds(this.downstreamParallelism, this.comparator, sketch));
        }
    }
}

