/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Set;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.iceberg.flink.sink.shuffle.AggregatedStatistics;
import org.apache.iceberg.flink.sink.shuffle.DataStatistics;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsEvent;
import org.apache.iceberg.flink.sink.shuffle.DataStatisticsUtil;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AggregatedStatisticsTracker<D extends DataStatistics<D, S>, S> {
    private static final Logger LOG = LoggerFactory.getLogger(AggregatedStatisticsTracker.class);
    private static final double ACCEPT_PARTIAL_AGGR_THRESHOLD = 90.0;
    private final String operatorName;
    private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
    private final int parallelism;
    private final Set<Integer> inProgressSubtaskSet;
    private volatile AggregatedStatistics<D, S> inProgressStatistics;

    AggregatedStatisticsTracker(String operatorName, TypeSerializer<DataStatistics<D, S>> statisticsSerializer, int parallelism) {
        this.operatorName = operatorName;
        this.statisticsSerializer = statisticsSerializer;
        this.parallelism = parallelism;
        this.inProgressSubtaskSet = Sets.newHashSet();
    }

    AggregatedStatistics<D, S> updateAndCheckCompletion(int subtask, DataStatisticsEvent<D, S> event) {
        long checkpointId = event.checkpointId();
        if (this.inProgressStatistics != null && this.inProgressStatistics.checkpointId() > checkpointId) {
            LOG.info("Expect data statistics for operator {} checkpoint {}, but receive event from older checkpoint {}. Ignore it.", new Object[]{this.operatorName, this.inProgressStatistics.checkpointId(), checkpointId});
            return null;
        }
        AggregatedStatistics<D, S> completedStatistics = null;
        if (this.inProgressStatistics != null && this.inProgressStatistics.checkpointId() < checkpointId) {
            if ((double)this.inProgressSubtaskSet.size() / (double)this.parallelism * 100.0 >= 90.0) {
                completedStatistics = this.inProgressStatistics;
                LOG.info("Received data statistics from {} subtasks out of total {} for operator {} at checkpoint {}. Complete data statistics aggregation at checkpoint {} as it is more than the threshold of {} percentage", new Object[]{this.inProgressSubtaskSet.size(), this.parallelism, this.operatorName, checkpointId, this.inProgressStatistics.checkpointId(), 90.0});
            } else {
                LOG.info("Received data statistics from {} subtasks out of total {} for operator {} at checkpoint {}. Aborting the incomplete aggregation for checkpoint {}", new Object[]{this.inProgressSubtaskSet.size(), this.parallelism, this.operatorName, checkpointId, this.inProgressStatistics.checkpointId()});
            }
            this.inProgressStatistics = null;
            this.inProgressSubtaskSet.clear();
        }
        if (this.inProgressStatistics == null) {
            LOG.info("Starting a new data statistics for checkpoint {}", (Object)checkpointId);
            this.inProgressStatistics = new AggregatedStatistics<D, S>(checkpointId, this.statisticsSerializer);
            this.inProgressSubtaskSet.clear();
        }
        if (!this.inProgressSubtaskSet.add(subtask)) {
            LOG.debug("Ignore duplicated data statistics from operator {} subtask {} for checkpoint {}.", new Object[]{this.operatorName, subtask, checkpointId});
        } else {
            this.inProgressStatistics.mergeDataStatistic(this.operatorName, event.checkpointId(), DataStatisticsUtil.deserializeDataStatistics(event.statisticsBytes(), this.statisticsSerializer));
        }
        if (this.inProgressSubtaskSet.size() == this.parallelism) {
            completedStatistics = this.inProgressStatistics;
            LOG.info("Received data statistics from all {} operators {} for checkpoint {}. Return last completed aggregator {}.", new Object[]{this.parallelism, this.operatorName, this.inProgressStatistics.checkpointId(), completedStatistics.dataStatistics()});
            this.inProgressStatistics = new AggregatedStatistics<D, S>(checkpointId + 1L, this.statisticsSerializer);
            this.inProgressSubtaskSet.clear();
        }
        return completedStatistics;
    }

    @VisibleForTesting
    AggregatedStatistics<D, S> inProgressStatistics() {
        return this.inProgressStatistics;
    }
}

