/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.segmentstore.server.host.stat;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.segmentstore.contracts.Attributes;
import io.pravega.segmentstore.contracts.StreamSegmentStore;
import io.pravega.segmentstore.server.host.stat.AutoScaleProcessor;
import io.pravega.segmentstore.server.host.stat.SegmentAggregates;
import io.pravega.segmentstore.server.host.stat.SegmentStatsRecorder;
import io.pravega.shared.MetricsNames;
import io.pravega.shared.MetricsTags;
import io.pravega.shared.metrics.DynamicLogger;
import io.pravega.shared.metrics.MetricsProvider;
import io.pravega.shared.metrics.OpStatsLogger;
import io.pravega.shared.metrics.StatsLogger;
import io.pravega.shared.segment.ScaleType;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SegmentStatsRecorderImpl
implements SegmentStatsRecorder {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(SegmentStatsRecorderImpl.class);
    private static final Duration DEFAULT_REPORTING_DURATION = Duration.ofMinutes(2L);
    private static final Duration DEFAULT_EXPIRY_DURATION = Duration.ofMinutes(20L);
    private static final Duration CACHE_CLEANUP_INTERVAL = Duration.ofMinutes(2L);
    private static final int INITIAL_CAPACITY = 1000;
    private static final int MAX_CACHE_SIZE = 100000;
    private static final Duration TIMEOUT = Duration.ofMinutes(1L);
    private static final StatsLogger STATS_LOGGER = MetricsProvider.createStatsLogger((String)"segmentstore");
    private final OpStatsLogger createStreamSegment = STATS_LOGGER.createStats("pravega.segmentstore.segment.create_latency_ms", new String[0]);
    private final OpStatsLogger readStreamSegment = STATS_LOGGER.createStats("pravega.segmentstore.segment.read_latency_ms", new String[0]);
    private final OpStatsLogger writeStreamSegment = STATS_LOGGER.createStats("pravega.segmentstore.segment.write_latency_ms", new String[0]);
    private final DynamicLogger dynamicLogger = MetricsProvider.getDynamicLogger();
    private final Set<String> pendingCacheLoads;
    private final Cache<String, SegmentAggregates> cache;
    private final Duration reportingDuration;
    private final AutoScaleProcessor reporter;
    private final StreamSegmentStore store;
    private final ScheduledFuture<?> cacheCleanup;
    private final ScheduledExecutorService executor;

    SegmentStatsRecorderImpl(AutoScaleProcessor reporter, StreamSegmentStore store, ScheduledExecutorService executor) {
        this(reporter, store, DEFAULT_REPORTING_DURATION, DEFAULT_EXPIRY_DURATION, executor);
    }

    @VisibleForTesting
    SegmentStatsRecorderImpl(@NonNull AutoScaleProcessor reporter, @NonNull StreamSegmentStore store, @NonNull Duration reportingDuration, @NonNull Duration expiryDuration, @NonNull ScheduledExecutorService executor) {
        if (reporter == null) {
            throw new NullPointerException("reporter is marked @NonNull but is null");
        }
        if (store == null) {
            throw new NullPointerException("store is marked @NonNull but is null");
        }
        if (reportingDuration == null) {
            throw new NullPointerException("reportingDuration is marked @NonNull but is null");
        }
        if (expiryDuration == null) {
            throw new NullPointerException("expiryDuration is marked @NonNull but is null");
        }
        if (executor == null) {
            throw new NullPointerException("executor is marked @NonNull but is null");
        }
        this.executor = executor;
        this.pendingCacheLoads = Collections.synchronizedSet(new HashSet());
        this.cache = CacheBuilder.newBuilder().initialCapacity(1000).maximumSize(100000L).expireAfterAccess(expiryDuration.toMillis(), TimeUnit.MILLISECONDS).build();
        this.cacheCleanup = executor.scheduleAtFixedRate(() -> this.cache.cleanUp(), CACHE_CLEANUP_INTERVAL.toMillis(), 2L, TimeUnit.MINUTES);
        this.reportingDuration = reportingDuration;
        this.store = store;
        this.reporter = reporter;
    }

    @Override
    public void close() {
        this.cacheCleanup.cancel(true);
        this.createStreamSegment.close();
        this.readStreamSegment.close();
        this.writeStreamSegment.close();
    }

    private SegmentAggregates getSegmentAggregate(String streamSegmentName) {
        SegmentAggregates aggregates = (SegmentAggregates)this.cache.getIfPresent((Object)streamSegmentName);
        if (aggregates == null && !StreamSegmentNameUtils.isTransactionSegment((String)streamSegmentName)) {
            this.loadAsynchronously(streamSegmentName);
        }
        return aggregates;
    }

    @VisibleForTesting
    protected CompletableFuture<Void> loadAsynchronously(String streamSegmentName) {
        if (!this.pendingCacheLoads.contains(streamSegmentName)) {
            this.pendingCacheLoads.add(streamSegmentName);
            if (this.store != null) {
                return this.store.getStreamSegmentInfo(streamSegmentName, TIMEOUT).thenAcceptAsync(prop -> {
                    if (prop != null && prop.getAttributes().containsKey(Attributes.SCALE_POLICY_TYPE) && prop.getAttributes().containsKey(Attributes.SCALE_POLICY_RATE)) {
                        byte type = ((Long)prop.getAttributes().get(Attributes.SCALE_POLICY_TYPE)).byteValue();
                        int rate = ((Long)prop.getAttributes().get(Attributes.SCALE_POLICY_RATE)).intValue();
                        this.cache.put((Object)streamSegmentName, (Object)SegmentAggregates.forPolicy(ScaleType.fromValue((byte)type), rate));
                    }
                    this.pendingCacheLoads.remove(streamSegmentName);
                }, (Executor)this.executor);
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void createSegment(String streamSegmentName, byte type, int targetRate, Duration elapsed) {
        this.getCreateStreamSegment().reportSuccessEvent(elapsed);
        SegmentAggregates sa = SegmentAggregates.forPolicy(ScaleType.fromValue((byte)type), targetRate);
        this.cache.put((Object)streamSegmentName, (Object)sa);
        if (sa.isScalingEnabled()) {
            this.reporter.notifyCreated(streamSegmentName);
        }
    }

    @Override
    public void deleteSegment(String streamSegmentName) {
        if (!StreamSegmentNameUtils.isTransactionSegment((String)streamSegmentName)) {
            this.getDynamicLogger().freezeCounter("pravega.segmentstore.segment.write_bytes", MetricsTags.segmentTags((String)streamSegmentName));
            this.getDynamicLogger().freezeCounter("pravega.segmentstore.segment.write_events", MetricsTags.segmentTags((String)streamSegmentName));
            this.getDynamicLogger().freezeCounter("pravega.segmentstore.segment.read_bytes", MetricsTags.segmentTags((String)streamSegmentName));
        }
    }

    @Override
    public void sealSegment(String streamSegmentName) {
        if (!StreamSegmentNameUtils.isTransactionSegment((String)streamSegmentName)) {
            this.getDynamicLogger().freezeCounter("pravega.segmentstore.segment.write_bytes", MetricsTags.segmentTags((String)streamSegmentName));
            this.getDynamicLogger().freezeCounter("pravega.segmentstore.segment.write_events", MetricsTags.segmentTags((String)streamSegmentName));
        }
        if (this.getSegmentAggregate(streamSegmentName) != null) {
            this.cache.invalidate((Object)streamSegmentName);
            this.reporter.notifySealed(streamSegmentName);
        }
    }

    @Override
    public void policyUpdate(String streamSegmentName, byte type, int targetRate) {
        SegmentAggregates aggregates = this.getSegmentAggregate(streamSegmentName);
        if (aggregates != null) {
            if (aggregates.getScaleType().getValue() != type) {
                this.cache.put((Object)streamSegmentName, (Object)SegmentAggregates.forPolicy(ScaleType.fromValue((byte)type), targetRate));
            } else {
                aggregates.setTargetRate(targetRate);
            }
        }
    }

    @Override
    public void recordAppend(String streamSegmentName, long dataLength, int numOfEvents, Duration elapsed) {
        this.getWriteStreamSegment().reportSuccessEvent(elapsed);
        DynamicLogger dl = this.getDynamicLogger();
        dl.incCounterValue(MetricsNames.globalMetricName((String)"pravega.segmentstore.segment.write_bytes"), dataLength, new String[0]);
        dl.incCounterValue(MetricsNames.globalMetricName((String)"pravega.segmentstore.segment.write_events"), (long)numOfEvents, new String[0]);
        if (!StreamSegmentNameUtils.isTransactionSegment((String)streamSegmentName)) {
            dl.incCounterValue("pravega.segmentstore.segment.write_bytes", dataLength, MetricsTags.segmentTags((String)streamSegmentName));
            dl.incCounterValue("pravega.segmentstore.segment.write_events", (long)numOfEvents, MetricsTags.segmentTags((String)streamSegmentName));
            try {
                SegmentAggregates aggregates = this.getSegmentAggregate(streamSegmentName);
                if (aggregates != null && aggregates.update(dataLength, numOfEvents)) {
                    this.report(streamSegmentName, aggregates);
                }
            }
            catch (Exception e) {
                log.warn("Record statistic for {} for data: {} and events:{} threw exception", new Object[]{streamSegmentName, dataLength, numOfEvents, e});
            }
        }
    }

    @Override
    public void merge(String streamSegmentName, long dataLength, int numOfEvents, long txnCreationTime) {
        this.getDynamicLogger().incCounterValue("pravega.segmentstore.segment.write_bytes", dataLength, MetricsTags.segmentTags((String)streamSegmentName));
        this.getDynamicLogger().incCounterValue("pravega.segmentstore.segment.write_events", (long)numOfEvents, MetricsTags.segmentTags((String)streamSegmentName));
        SegmentAggregates aggregates = this.getSegmentAggregate(streamSegmentName);
        if (aggregates != null && aggregates.updateTx(dataLength, numOfEvents, txnCreationTime)) {
            this.report(streamSegmentName, aggregates);
        }
    }

    @Override
    public void readComplete(Duration elapsed) {
        this.getReadStreamSegment().reportSuccessEvent(elapsed);
    }

    @Override
    public void read(String segment, int length) {
        this.getDynamicLogger().incCounterValue(MetricsNames.globalMetricName((String)"pravega.segmentstore.segment.read_bytes"), (long)length, new String[0]);
        this.getDynamicLogger().incCounterValue("pravega.segmentstore.segment.read_bytes", (long)length, MetricsTags.segmentTags((String)segment));
    }

    private void report(String streamSegmentName, SegmentAggregates aggregates) {
        if (aggregates.reportIfNeeded(this.reportingDuration)) {
            this.executor.execute(() -> {
                try {
                    this.reporter.report(streamSegmentName, aggregates.getTargetRate(), aggregates.getStartTime(), aggregates.getTwoMinuteRate(), aggregates.getFiveMinuteRate(), aggregates.getTenMinuteRate(), aggregates.getTwentyMinuteRate());
                }
                catch (Exception ex) {
                    log.error("Unable to report Segment Aggregates for '{}'.", (Object)streamSegmentName, (Object)ex);
                }
            });
        }
    }

    @VisibleForTesting
    SegmentAggregates getIfPresent(String streamSegmentName) {
        return (SegmentAggregates)this.cache.getIfPresent((Object)streamSegmentName);
    }

    @SuppressFBWarnings(justification="generated code")
    protected OpStatsLogger getCreateStreamSegment() {
        return this.createStreamSegment;
    }

    @SuppressFBWarnings(justification="generated code")
    protected OpStatsLogger getReadStreamSegment() {
        return this.readStreamSegment;
    }

    @SuppressFBWarnings(justification="generated code")
    protected OpStatsLogger getWriteStreamSegment() {
        return this.writeStreamSegment;
    }

    @SuppressFBWarnings(justification="generated code")
    protected DynamicLogger getDynamicLogger() {
        return this.dynamicLogger;
    }
}

