/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.segmentstore.server.host.stat;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalListeners;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.hash.RandomFactory;
import io.pravega.common.tracing.TagLogger;
import io.pravega.common.util.Retry;
import io.pravega.segmentstore.server.host.stat.AutoScalerConfig;
import io.pravega.shared.controller.event.AutoScaleEvent;
import io.pravega.shared.controller.event.ControllerEvent;
import io.pravega.shared.controller.event.ControllerEventSerializer;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import lombok.NonNull;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AutoScaleProcessor
implements AutoCloseable {
    // Logger for this class; several call sites pass a request id as the first argument.
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(AutoScaleProcessor.class));
    // Shared serializer handed to the client factory when the request-stream writer is created.
    private static final EventSerializer SERIALIZER = new EventSerializer();
    // Minimum elapsed times since a segment's start time, in milliseconds, that report() requires
    // before it acts on the two/five/ten/twenty-minute rates respectively.
    private static final long TWO_MINUTES = Duration.ofMinutes(2L).toMillis();
    private static final long FIVE_MINUTES = Duration.ofMinutes(5L).toMillis();
    private static final long TEN_MINUTES = Duration.ofMinutes(10L).toMillis();
    private static final long TWENTY_MINUTES = Duration.ofMinutes(20L).toMillis();
    // Sizing values intended for the segment cache: maximum number of entries and initial capacity.
    private static final int MAX_CACHE_SIZE = 1000000;
    private static final int INITIAL_CAPACITY = 1000;
    // Factory used to create the request-stream writer; null when a writer is injected directly.
    private final EventStreamClientFactory clientFactory;
    // Per-segment state keyed by segment name. The pair's left value is read as the time of the last
    // scale-up request and the right value as the time of the last scale-down request (millis).
    private final Cache<String, Pair<Long, Long>> cache;
    // Writer for the internal request stream; holds null until one is set and again after close().
    private final AtomicReference<EventStreamWriter<AutoScaleEvent>> writer;
    // Auto-scaler settings (request stream name, mute/cooldown durations, cache timings, TLS options).
    private final AutoScalerConfig configuration;
    // Generates the request ids attached to scale events and to their log lines.
    private final Supplier<Long> requestIdGenerator = RandomFactory.create()::nextLong;
    // Handle of the periodic cache.cleanUp() task so that close() can cancel it.
    private final ScheduledFuture<?> cacheCleanup;

    /**
     * Creates a processor that builds its own client factory from the given configuration.
     *
     * <p>Both arguments are validated before the client factory is created, so a null argument
     * fails fast with a descriptive message and no factory is created only to be abandoned.
     *
     * @param configuration auto-scaler settings; must not be null
     * @param executor executor used for cache maintenance and for bootstrapping the writer; must not be null
     * @throws NullPointerException if either argument is null
     */
    AutoScaleProcessor(@NonNull AutoScalerConfig configuration, @NonNull ScheduledExecutorService executor) {
        // Arguments are evaluated left to right, so validation runs before createFactory(...).
        this(AutoScaleProcessor.requireArguments(configuration, executor), AutoScaleProcessor.createFactory(configuration), executor);
    }

    /**
     * Validates both constructor arguments and returns the configuration, which lets the checks
     * run inside an explicit {@code this(...)} invocation.
     */
    private static AutoScalerConfig requireArguments(AutoScalerConfig configuration, ScheduledExecutorService executor) {
        if (configuration == null) {
            throw new NullPointerException("configuration is marked @NonNull but is null");
        }
        if (executor == null) {
            throw new NullPointerException("executor is marked @NonNull but is null");
        }
        return configuration;
    }

    /**
     * Creates a processor around an already-built writer. No client factory is used, so no
     * writer bootstrap is scheduled.
     *
     * @param writer writer for scale events; must not be null
     * @param configuration auto-scaler settings; must not be null
     * @param executor executor used for cache maintenance; must not be null
     * @throws NullPointerException if any argument is null
     */
    @VisibleForTesting
    AutoScaleProcessor(@NonNull EventStreamWriter<AutoScaleEvent> writer, @NonNull AutoScalerConfig configuration, @NonNull ScheduledExecutorService executor) {
        this(configuration, null, executor);
        // configuration and executor were already null-checked by the delegated constructor,
        // so only the writer still needs validating here.
        Preconditions.checkNotNull(writer, "writer is marked @NonNull but is null");
        this.writer.set(writer);
    }

    @VisibleForTesting
    AutoScaleProcessor(@NonNull AutoScalerConfig configuration, EventStreamClientFactory clientFactory, @NonNull ScheduledExecutorService executor) {
        if (configuration == null) {
            throw new NullPointerException("configuration is marked @NonNull but is null");
        }
        if (executor == null) {
            throw new NullPointerException("executor is marked @NonNull but is null");
        }
        this.configuration = configuration;
        this.writer = new AtomicReference();
        this.clientFactory = clientFactory;
        this.cache = CacheBuilder.newBuilder().initialCapacity(1000).maximumSize(1000000L).expireAfterAccess(configuration.getCacheExpiry().getSeconds(), TimeUnit.SECONDS).removalListener(RemovalListeners.asynchronous(notification -> {
            if (notification.getCause().equals((Object)RemovalCause.EXPIRED)) {
                this.triggerScaleDown((String)notification.getKey(), true);
            }
        }, (Executor)executor)).build();
        this.cacheCleanup = executor.scheduleAtFixedRate(() -> this.cache.cleanUp(), 0L, configuration.getCacheCleanup().getSeconds(), TimeUnit.SECONDS);
        if (clientFactory != null) {
            this.bootstrapRequestWriters(clientFactory, executor);
        }
    }

    /**
     * Releases the writer, the client factory (if one was supplied) and the cache clean-up task.
     *
     * <p>The client factory is null when the processor was built around an injected writer, so it
     * is closed only when present. Each step runs even if an earlier one throws, so the scheduled
     * clean-up task is always cancelled.
     */
    @Override
    public void close() {
        try {
            // Clear the reference first so callers stop seeing a writer that is being closed.
            EventStreamWriter<AutoScaleEvent> w = this.writer.getAndSet(null);
            if (w != null) {
                w.close();
            }
        } finally {
            try {
                if (this.clientFactory != null) {
                    this.clientFactory.close();
                }
            } finally {
                this.cacheCleanup.cancel(true);
            }
        }
    }

    /**
     * Schedules creation of the request-stream writer to start after ten seconds. The creation
     * runs under an indefinite exponential-backoff retry; failures go to
     * {@link #handleBootstrapException(Throwable)}.
     *
     * @param clientFactory factory used to create the writer
     * @param executor executor that runs both the delayed task and the retried attempts
     */
    private void bootstrapRequestWriters(EventStreamClientFactory clientFactory, ScheduledExecutorService executor) {
        Runnable createWriter = () -> this.bootstrapOnce(clientFactory);
        executor.schedule(() -> {
            // NOTE(review): the retry arguments presumably mean initial delay (ms), multiplier and
            // maximum delay (ms) - confirm against Retry.indefinitelyWithExpBackoff.
            return Retry.indefinitelyWithExpBackoff(100L, 10, 10000L, this::handleBootstrapException)
                    .runInExecutor(createWriter, executor);
        }, 10L, TimeUnit.SECONDS);
    }

    /**
     * Logs a warning for a failed attempt to create the request-stream writer.
     *
     * @param e the failure reported by the retry loop
     */
    private void handleBootstrapException(Throwable e) {
        Object summary = LoggerHelpers.exceptionSummary((Logger) log, e);
        log.warn("Unable to create writer for requeststream: {}.", summary);
    }

    /**
     * Makes one attempt to create the writer for the internal request stream and publishes it in
     * {@code writer}, after which {@link #isInitialized()} reports true.
     *
     * @param clientFactory factory used to create the writer
     */
    private void bootstrapOnce(EventStreamClientFactory clientFactory) {
        String requestStream = this.configuration.getInternalRequestStream();
        EventWriterConfig writerConfig = EventWriterConfig.builder().build();
        EventStreamWriter<AutoScaleEvent> created = clientFactory.createEventWriter(requestStream, SERIALIZER, writerConfig);
        this.writer.set(created);
        log.info("AutoScale Processor Initialized. RequestStream={}", requestStream);
    }

    /**
     * Builds a client factory for the {@code _system} scope, using the client configuration
     * derived from the auto-scaler settings.
     *
     * @param configuration auto-scaler settings
     * @return a new client factory
     */
    private static EventStreamClientFactory createFactory(AutoScalerConfig configuration) {
        ClientConfig clientConfig = AutoScaleProcessor.prepareClientConfig(configuration);
        return EventStreamClientFactory.withScope("_system", clientConfig);
    }

    /**
     * Translates the auto-scaler settings into a client configuration.
     *
     * <p>With TLS enabled, the trust store and host-name validation are copied over. If the
     * controller URI does not itself use a TLS scheme, TLS to the segment store is switched on
     * explicitly.
     *
     * @param configuration auto-scaler settings
     * @return the client configuration to use for the request-stream writer
     */
    @VisibleForTesting
    static ClientConfig prepareClientConfig(AutoScalerConfig configuration) {
        URI controllerUri = configuration.getControllerUri();
        ClientConfig.ClientConfigBuilder builder = ClientConfig.builder().controllerURI(controllerUri);
        if (!configuration.isTlsEnabled()) {
            return builder.build();
        }

        log.debug("Auto scale TLS is enabled");
        builder.trustStore(configuration.getTlsCertFile()).validateHostName(configuration.isValidateHostName());

        boolean controllerUsesTls = AutoScaleProcessor.hasTlsEnabled(controllerUri);
        if (!controllerUsesTls) {
            log.debug("Auto scale Controller URI [{}] has a non-TLS scheme, although auto scale TLS is enabled", controllerUri);
            builder.enableTlsToSegmentStore(true);
        }
        return builder.build();
    }

    /**
     * Tells whether the controller URI uses one of the TLS schemes recognised here
     * ({@code tls} or {@code pravegas}). The comparison is case-sensitive, and a URI without a
     * scheme is treated as non-TLS.
     *
     * @param controllerURI controller URI to inspect; must not be null
     * @return true if the scheme is exactly {@code tls} or {@code pravegas}
     * @throws NullPointerException if {@code controllerURI} is null
     */
    @VisibleForTesting
    static boolean hasTlsEnabled(@NonNull URI controllerURI) {
        Preconditions.checkNotNull(controllerURI, "controllerURI is marked @NonNull but is null");
        String scheme = controllerURI.getScheme();
        // Constant-first equals() also covers a missing (null) scheme.
        return "tls".equals(scheme) || "pravegas".equals(scheme);
    }

    /**
     * Reports whether a request-stream writer is currently available.
     *
     * @return true if the writer reference holds a writer
     */
    private boolean isInitialized() {
        EventStreamWriter<AutoScaleEvent> current = this.writer.get();
        return current != null;
    }

    /**
     * Posts a scale-up request for the segment unless the processor is not initialized or the
     * last scale-up request for that segment lies within the configured mute duration.
     *
     * <p>Once the event is written successfully, both timestamps of the segment's cache entry are
     * set to the request time, which mutes further scale-ups and scale-downs alike.
     *
     * @param streamSegmentName scoped segment name
     * @param numOfSplits number of splits to request
     */
    private void triggerScaleUp(String streamSegmentName, int numOfSplits) {
        if (!this.isInitialized()) {
            return;
        }
        Pair<Long, Long> pair = this.cache.getIfPresent(streamSegmentName);
        // The left element holds the time of the last scale-up request; absent means "never".
        long lastRequestTs = (pair != null && pair.getKey() != null) ? pair.getKey() : 0L;

        long timestamp = System.currentTimeMillis();
        long requestId = this.requestIdGenerator.get();
        if (timestamp - lastRequestTs <= this.configuration.getMuteDuration().toMillis()) {
            return;
        }

        log.info(requestId, "sending request for scale up for {}", streamSegmentName);
        Segment segment = Segment.fromScopedName(streamSegmentName);
        // Direction code 0 - presumably "scale up"; confirm against AutoScaleEvent.
        AutoScaleEvent event = new AutoScaleEvent(segment.getScope(), segment.getStreamName(), segment.getSegmentId(), (byte) 0, timestamp, numOfSplits, false, requestId);
        this.writeRequest(event, () -> this.cache.put(streamSegmentName, new ImmutablePair<>(timestamp, timestamp)));
    }

    /**
     * Posts a scale-down request for the segment unless the processor is not initialized or the
     * last scale-down request for that segment lies within the configured mute duration.
     *
     * <p>After a successful non-silent request the cache entry is replaced with a scale-up
     * timestamp of 0 and a scale-down timestamp of the request time, so only scale-downs are
     * muted. Silent requests leave the cache untouched.
     *
     * @param streamSegmentName scoped segment name
     * @param silent flag forwarded on the event; also suppresses the cache update
     */
    private void triggerScaleDown(String streamSegmentName, boolean silent) {
        if (!this.isInitialized()) {
            return;
        }
        Pair<Long, Long> pair = this.cache.getIfPresent(streamSegmentName);
        // The right element holds the time of the last scale-down request; absent means "never".
        long lastRequestTs = (pair != null && pair.getValue() != null) ? pair.getValue() : 0L;

        long timestamp = System.currentTimeMillis();
        long requestId = this.requestIdGenerator.get();
        if (timestamp - lastRequestTs <= this.configuration.getMuteDuration().toMillis()) {
            return;
        }

        log.info(requestId, "sending request for scale down for {}", streamSegmentName);
        Segment segment = Segment.fromScopedName(streamSegmentName);
        // Direction code 1 - presumably "scale down"; confirm against AutoScaleEvent.
        AutoScaleEvent event = new AutoScaleEvent(segment.getScope(), segment.getStreamName(), segment.getSegmentId(), (byte) 1, timestamp, 0, silent, requestId);
        this.writeRequest(event, () -> {
            if (!silent) {
                this.cache.put(streamSegmentName, new ImmutablePair<>(0L, timestamp));
            }
        });
    }

    /**
     * Writes a scale event to the request stream and runs {@code successCallback} once the write
     * completes without error. A failed write is logged and the callback is skipped.
     *
     * @param event event to post; its key is used as the routing key
     * @param successCallback action to run after a successful write
     */
    private void writeRequest(AutoScaleEvent event, Runnable successCallback) {
        EventStreamWriter<AutoScaleEvent> writer = this.writer.get();
        if (writer == null) {
            // The writer can be absent if it was never bootstrapped or was cleared by close().
            log.warn(event.getRequestId(), "Writer not bootstrapped; unable to post Scale Event {}.", event);
            return;
        }
        writer.writeEvent(event.getKey(), event).whenComplete((r, e) -> {
            if (e != null) {
                log.error(event.getRequestId(), "Unable to post Scale Event to RequestStream '{}'.", this.configuration.getInternalRequestStream(), e);
                return;
            }
            log.debug(event.getRequestId(), "Scale Event posted successfully: {}.", event);
            successCallback.run();
        });
    }

    /**
     * Evaluates the reported traffic rates for a segment and triggers a scale-up and/or a
     * scale-down request when the thresholds are met.
     *
     * <p>Nothing happens until the processor is initialized and the time since {@code startTime}
     * exceeds the configured cooldown. Scale-up is requested when the two-, five- or ten-minute
     * rate exceeds 5x, 2x or 1x the target rate and at least that much time has passed since
     * {@code startTime}. Scale-down is requested when the two-, five- and ten-minute rates are
     * all below the target, the twenty-minute rate is below half the target, and more than twenty
     * minutes have passed.
     *
     * <p>NOTE(review): a {@code targetRate} of 0 makes the split computation divide by zero,
     * which yields a very large split count - confirm that callers never pass it.
     *
     * @param streamSegmentName scoped segment name
     * @param targetRate target rate configured for the segment
     * @param startTime reference time of the segment in epoch milliseconds
     * @param twoMinuteRate rate over the last two minutes
     * @param fiveMinuteRate rate over the last five minutes
     * @param tenMinuteRate rate over the last ten minutes
     * @param twentyMinuteRate rate over the last twenty minutes
     */
    void report(String streamSegmentName, long targetRate, long startTime, double twoMinuteRate, double fiveMinuteRate, double tenMinuteRate, double twentyMinuteRate) {
        log.info("received traffic for {} with twoMinute rate = {} and targetRate = {}", streamSegmentName, twoMinuteRate, targetRate);
        if (!this.isInitialized()) {
            return;
        }
        long elapsed = System.currentTimeMillis() - startTime;
        if (elapsed <= this.configuration.getCooldownDuration().toMillis()) {
            return;
        }
        log.debug("cool down period elapsed for {}", streamSegmentName);

        double target = (double) targetRate;
        boolean twoMinuteBreach = twoMinuteRate > 5.0 * target && elapsed > TWO_MINUTES;
        boolean fiveMinuteBreach = fiveMinuteRate > 2.0 * target && elapsed > FIVE_MINUTES;
        boolean tenMinuteBreach = tenMinuteRate > target && elapsed > TEN_MINUTES;
        if (twoMinuteBreach || fiveMinuteBreach || tenMinuteBreach) {
            double peakRate = Double.max(Double.max(twoMinuteRate, fiveMinuteRate), tenMinuteRate);
            // Always request at least two splits.
            int numOfSplits = Math.max(2, (int) (peakRate / target));
            log.debug("triggering scale up for {} with number of splits {}", streamSegmentName, numOfSplits);
            this.triggerScaleUp(streamSegmentName, numOfSplits);
        }

        boolean shortTermBelowTarget = twoMinuteRate < target && fiveMinuteRate < target && tenMinuteRate < target;
        boolean longTermBelowHalf = twentyMinuteRate < target / 2.0;
        if (shortTermBelowTarget && longTermBelowHalf && elapsed > TWENTY_MINUTES) {
            log.debug("triggering scale down for {}", streamSegmentName);
            this.triggerScaleDown(streamSegmentName, false);
        }
    }

    /**
     * Registers a newly created segment in the cache with both request timestamps set to the
     * current time. The clock is read once so the two values are always equal.
     *
     * @param segmentStreamName scoped segment name
     */
    void notifyCreated(String segmentStreamName) {
        long now = System.currentTimeMillis();
        this.cache.put(segmentStreamName, new ImmutablePair<>(now, now));
    }

    /**
     * Drops the cache entry of a sealed segment.
     *
     * @param segmentStreamName scoped segment name
     */
    void notifySealed(String segmentStreamName) {
        this.cache.invalidate(segmentStreamName);
    }

    /**
     * Test hook that stores a cache entry directly.
     *
     * @param streamSegmentName scoped segment name used as the cache key
     * @param lrImmutablePair pair of (last scale-up request time, last scale-down request time)
     */
    @VisibleForTesting
    void put(String streamSegmentName, ImmutablePair<Long, Long> lrImmutablePair) {
        this.cache.put(streamSegmentName, lrImmutablePair);
    }

    /**
     * Test hook that reads a cache entry directly.
     *
     * @param streamSegmentName scoped segment name used as the cache key
     * @return the cached pair, or null if the segment has no entry
     */
    @VisibleForTesting
    Pair<Long, Long> get(String streamSegmentName) {
        Pair<Long, Long> entry = this.cache.getIfPresent(streamSegmentName);
        return entry;
    }

    private static class EventSerializer
    implements Serializer<AutoScaleEvent> {
        private final ControllerEventSerializer baseSerializer = new ControllerEventSerializer();

        private EventSerializer() {
        }

        public ByteBuffer serialize(AutoScaleEvent value) {
            return this.baseSerializer.toByteBuffer((ControllerEvent)value);
        }

        public AutoScaleEvent deserialize(ByteBuffer serializedValue) {
            return (AutoScaleEvent)this.baseSerializer.fromByteBuffer(serializedValue);
        }
    }
}

