/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.extensions.reporter;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
import org.apache.pulsar.broker.loadbalance.extensions.models.TopKBundles;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.LoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.service.PulsarStats;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reports this broker's top-bundles load data to a {@link LoadDataStore} and removes
 * ("tombstones") that entry in reaction to service-unit state changes.
 *
 * <p>The data is keyed in the store by {@code lookupServiceAddress}. It is refreshed from the
 * broker's bundle stats only when {@link PulsarStats#getUpdatedAt()} has advanced since the
 * previous refresh.
 */
public class TopBundleLoadDataReporter implements LoadDataReporter<TopBundlesLoadData>, StateChangeListener {
    private static final Logger log = LoggerFactory.getLogger(TopBundleLoadDataReporter.class);

    /** Default minimum interval, in milliseconds, between two tombstone attempts. */
    private static final long TOMBSTONE_DELAY_IN_MILLIS = 10000L;

    private final PulsarService pulsar;
    private final String lookupServiceAddress;
    private final LoadDataStore<TopBundlesLoadData> bundleLoadDataStore;
    private final TopKBundles topKBundles;

    /** Value of {@code PulsarStats.getUpdatedAt()} seen at the last refresh; guarded by the stats monitor. */
    private long lastBundleStatsUpdatedAt;

    /** Wall-clock time (ms) of the most recent tombstone attempt that has not been rolled back. */
    private volatile long lastTombstonedAt;

    // Not final and no setter is visible in this class -- presumably overridden by tests; confirm.
    private long tombstoneDelayInMillis;

    /**
     * Creates a reporter bound to the given broker and load data store.
     *
     * @param pulsar the broker service used to read stats and configuration
     * @param lookupServiceAddress the key under which this broker's data is pushed and removed
     * @param bundleLoadDataStore the store receiving the top-bundles load data
     */
    public TopBundleLoadDataReporter(PulsarService pulsar, String lookupServiceAddress,
                                     LoadDataStore<TopBundlesLoadData> bundleLoadDataStore) {
        this.pulsar = pulsar;
        this.lookupServiceAddress = lookupServiceAddress;
        this.bundleLoadDataStore = bundleLoadDataStore;
        this.lastBundleStatsUpdatedAt = 0L;
        this.topKBundles = new TopKBundles(pulsar);
        // Use the named constant rather than repeating the literal so the two cannot drift apart.
        this.tombstoneDelayInMillis = TOMBSTONE_DELAY_IN_MILLIS;
    }

    /**
     * Refreshes the top-K bundles from the broker's bundle stats if the stats have been updated
     * since the previous refresh.
     *
     * @return the refreshed load data, or {@code null} when the broker stats have not been
     *         updated since the last call
     */
    @Override
    public TopBundlesLoadData generateLoadData() {
        PulsarStats pulsarStats = this.pulsar.getBrokerService().getPulsarStats();
        TopBundlesLoadData result = null;
        // The stats object itself is the monitor; the timestamp check and the refresh happen under it.
        synchronized (pulsarStats) {
            long pulsarStatsUpdatedAt = pulsarStats.getUpdatedAt();
            if (pulsarStatsUpdatedAt > this.lastBundleStatsUpdatedAt) {
                Map<String, NamespaceBundleStats> bundleStats = this.pulsar.getBrokerService().getBundleStats();
                int topk = this.pulsar.getConfiguration().getLoadBalancerMaxNumberOfBundlesInBundleLoadReport();
                this.topKBundles.update(bundleStats, topk);
                this.lastBundleStatsUpdatedAt = pulsarStatsUpdatedAt;
                result = this.topKBundles.getLoadData();
            }
        }
        return result;
    }

    /**
     * Pushes the current top-bundles load data to the store when fresh data was generated or
     * when {@code force} is set.
     *
     * <p>With {@code force} set and no fresh data, whatever {@code topKBundles} currently holds
     * is pushed. A failed push is logged and mapped to a normally completed future.
     *
     * @param force push even if no new load data was generated
     * @return a future that completes when the push finishes, or an already completed future
     *         when there is nothing to report
     */
    @Override
    public CompletableFuture<Void> reportAsync(boolean force) {
        TopBundlesLoadData topBundlesLoadData = this.generateLoadData();
        if (topBundlesLoadData == null && !force) {
            return CompletableFuture.completedFuture(null);
        }
        if (ExtensibleLoadManagerImpl.debug(this.pulsar.getConfiguration(), log)) {
            log.info("Reporting TopBundlesLoadData:{}", this.topKBundles.getLoadData());
        }
        return this.bundleLoadDataStore
                .pushAsync(this.lookupServiceAddress, this.topKBundles.getLoadData())
                .exceptionally(e -> {
                    log.error("Failed to report top-bundles load data.", e);
                    return null;
                });
    }

    /**
     * Removes this broker's entry from the load data store, at most once per
     * {@code tombstoneDelayInMillis}.
     *
     * <p>The timestamp is advanced before the asynchronous removal starts and is restored to its
     * previous value if the removal fails, so a later call can retry without waiting for the
     * full delay. The check and the update of the timestamp are not performed atomically.
     */
    @VisibleForTesting
    protected void tombstone() {
        long now = System.currentTimeMillis();
        if (now - this.lastTombstonedAt < this.tombstoneDelayInMillis) {
            return;
        }
        // Remember the previous value so a failed removal can roll the timestamp back.
        long lastSuccessfulTombstonedAt = this.lastTombstonedAt;
        this.lastTombstonedAt = now;
        this.bundleLoadDataStore.removeAsync(this.lookupServiceAddress).whenComplete((__, e) -> {
            if (e != null) {
                log.error("Failed to clean broker load data.", e);
                this.lastTombstonedAt = lastSuccessfulTombstonedAt;
                return;
            }
            if (ExtensibleLoadManagerImpl.debug(this.pulsar.getConfiguration(), log)) {
                log.info("Cleaned broker load data.");
            }
        });
    }

    /**
     * Tombstones this broker's load data when a service unit it is involved in changes state.
     *
     * <p>Events carrying a non-null throwable are ignored. A tombstone is requested when the
     * state is {@code Releasing} or {@code Splitting} and this broker is the source broker, or
     * when the state is {@code Owned} and this broker is the destination broker. Presumably the
     * previously reported data is stale after such a change -- confirm against the load manager
     * design.
     *
     * @param serviceUnit the affected service unit (not used by this listener)
     * @param data the state data of the service unit
     * @param t the error associated with the event, or {@code null}
     */
    @Override
    public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
        if (t != null) {
            return;
        }
        ServiceUnitState state = ServiceUnitStateData.state(data);
        switch (state) {
            case Releasing:
            case Splitting:
                if (StringUtils.equals(data.sourceBroker(), this.lookupServiceAddress)) {
                    this.tombstone();
                }
                break;
            case Owned:
                if (StringUtils.equals(data.dstBroker(), this.lookupServiceAddress)) {
                    this.tombstone();
                }
                break;
            default:
                // Other states do not affect the reported load data.
                break;
        }
    }
}

