/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.server.bucket;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.impl.StreamImpl;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.TagLogger;
import io.pravega.common.util.BlockingDrainingQueue;
import io.pravega.controller.server.bucket.BucketWork;
import io.pravega.controller.store.stream.BucketStore;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.LoggerFactory;

/**
 * Service that owns a single bucket of streams. It consumes {@link StreamNotification}s to keep
 * track of the streams currently in the bucket and repeatedly hands each known stream to a
 * {@link BucketWork}, re-queuing the stream one execution period after each run completes.
 *
 * Subclasses supply the mechanism that listens for bucket changes through
 * {@link #startBucketChangeListener()} and {@link #stopBucketChangeListener()}.
 */
abstract class BucketService
extends AbstractService {
    // Batch size for draining the notification queue; same value that processNotification() passes to take(...).
    private static final int MAX_NOTIFICATIONS_TO_TAKE = 100;
    // Delay, in milliseconds, before the worker loop retries when work() finds nothing it can run.
    private static final long DELAY_IN_MILLIS = 100L;
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(BucketService.class));
    // Executor used for both background loops and for the delayed future returned by work().
    protected final ScheduledExecutorService executor;
    private final int bucketId;
    private final BucketStore.ServiceType serviceType;
    // Remaining concurrent executions: starts at maxConcurrentExecutions, decremented when work is
    // dispatched and incremented again when that work completes.
    @GuardedBy(value="lock")
    private int availableSlots;
    // Monitor protecting availableSlots, workQueue and knownStreams.
    private final Object lock = new Object();
    // Pending work, ordered by ascending nextExecutionTimeInMillis (see the comparator in the constructor).
    @GuardedBy(value="lock")
    private final PriorityQueue<QueueElement> workQueue;
    // Streams added through StreamAdded notifications and not yet removed by StreamRemoved.
    @GuardedBy(value="lock")
    private final Set<Stream> knownStreams;
    // Incoming notifications published through notify(...) and drained by processNotification().
    private final BlockingDrainingQueue<StreamNotification> notifications;
    // Completed at the end of doStart() (success or failure); doStop() chains its shutdown on it.
    private final CompletableFuture<Void> serviceStartFuture;
    // Handle on the loop that runs processNotification(); cancelled in doStop().
    private final AtomicReference<CompletableFuture<Void>> notificationLoop;
    // Handle on the loop that runs work(); cancelled in doStop().
    private final AtomicReference<CompletableFuture<Void>> workerLoop;
    // Interval added to the current time to compute a stream's next execution time.
    private final Duration executionPeriod;
    // Callback invoked with each stream whose execution time has arrived.
    private final BucketWork bucketWork;

    /**
     * Creates a bucket service. Nothing is scheduled until the service is started.
     *
     * @param serviceType             type of the service, used in log messages
     * @param bucketId                id of the bucket this instance is responsible for
     * @param executor                executor on which the background loops and delays run
     * @param maxConcurrentExecutions initial number of work slots, i.e. how many streams may be
     *                                processed by {@code bucketWork} at the same time
     * @param executionPeriod         delay between a stream being added (or its work completing)
     *                                and its next execution
     * @param bucketWork              work to perform for each stream
     */
    BucketService(BucketStore.ServiceType serviceType, int bucketId, ScheduledExecutorService executor, int maxConcurrentExecutions, Duration executionPeriod, BucketWork bucketWork) {
        this.serviceType = serviceType;
        this.bucketId = bucketId;
        this.executor = executor;
        this.notifications = new BlockingDrainingQueue<>();
        this.serviceStartFuture = new CompletableFuture<>();
        // Both loop handles start as already-completed futures so that doStop() always has a
        // future to cancel and wait on, even if doStart() failed before creating the real loops.
        this.notificationLoop = new AtomicReference<>(CompletableFuture.completedFuture(null));
        this.workerLoop = new AtomicReference<>(CompletableFuture.completedFuture(null));
        this.availableSlots = maxConcurrentExecutions;
        this.knownStreams = new HashSet<>();
        // Earliest execution time first.
        this.workQueue = new PriorityQueue<QueueElement>(
                Comparator.comparingLong((QueueElement element) -> element.nextExecutionTimeInMillis));
        this.executionPeriod = executionPeriod;
        this.bucketWork = bucketWork;
    }

    /**
     * Starts the service asynchronously: starts the bucket change listener, marks the service as
     * started, then launches the notification loop and the worker loop on {@code executor}. Both
     * loops keep iterating for as long as {@code isRunning()} returns true.
     *
     * If any step throws, the failure is logged and reported through {@code notifyFailed} so the
     * service does not stay in the STARTING state forever. {@code serviceStartFuture} is completed
     * in every case so that a later {@link #doStop()} can proceed.
     */
    @Override
    public void doStart() {
        CompletableFuture.runAsync(() -> {
            try {
                this.startBucketChangeListener();
                this.notifyStarted();
                this.notificationLoop.set(Futures.loop(this::isRunning, this::processNotification, this.executor));
                log.info("{}: Notification loop started for bucket {}", this.serviceType, this.bucketId);
                this.workerLoop.set(Futures.loop(this::isRunning, this::work, this.executor));
                log.info("{}: Worker loop started for bucket {}", this.serviceType, this.bucketId);
            } catch (Throwable e) {
                // Without this the exception would vanish inside the unobserved runAsync future.
                log.error("{}: Failed to start service for bucket {}", this.serviceType, this.bucketId, e);
                this.notifyFailed(e);
            } finally {
                log.info("{}: bucket {} service start completed", this.getServiceType(), this.getBucketId());
                this.serviceStartFuture.complete(null);
            }
        });
    }

    /**
     * Starts listening for changes to this bucket. Invoked from {@link #doStart()} before the
     * service is marked as started.
     * NOTE(review): implementations presumably feed changes back through notify(...) - confirm against subclasses.
     */
    abstract void startBucketChangeListener();

    /**
     * Stops listening for changes to this bucket. Invoked from {@link #doStop()} after both
     * background loops have been cancelled.
     */
    abstract void stopBucketChangeListener();

    /**
     * One iteration of the notification loop: takes up to {@code MAX_NOTIFICATIONS_TO_TAKE}
     * pending notifications and dispatches each one according to its type.
     *
     * @return a future that completes once the taken batch has been handled
     */
    private CompletableFuture<Void> processNotification() {
        return this.notifications.take(MAX_NOTIFICATIONS_TO_TAKE).thenAccept(batch -> batch.forEach(notification -> {
            switch (notification.getType()) {
                case StreamAdded:
                    this.handleStreamAdded(notification);
                    break;
                case StreamRemoved:
                    this.handleStreamRemoved(notification);
                    break;
                case ConnectivityError:
                    // Nothing to update locally; the error is only reported.
                    log.warn("{}: StreamNotification for connectivity error", (Object)this.serviceType);
                    break;
                default:
                    // No other notification types are handled.
                    break;
            }
        }));
    }

    /**
     * Handles a {@code StreamRemoved} notification by forgetting the stream.
     *
     * Only the known-streams set is updated here. An entry for the stream that is still in the
     * work queue is left there; {@code work()} drops entries whose stream is no longer known.
     *
     * @param notification notification carrying the scope and name of the removed stream
     */
    private void handleStreamRemoved(StreamNotification notification) {
        final String scope = notification.getScope();
        final String streamName = notification.getStream();
        log.info("{}: Stream {}/{} removed from bucket {}", this.serviceType, scope, streamName, this.bucketId);
        final Stream removed = new StreamImpl(scope, streamName);
        synchronized (this.lock) {
            this.knownStreams.remove(removed);
        }
    }

    /**
     * Handles a {@code StreamAdded} notification. A stream that is not yet known is recorded and
     * scheduled for its first execution one execution period from now; a stream that is already
     * known is ignored.
     *
     * @param notification notification carrying the scope and name of the added stream
     */
    private void handleStreamAdded(StreamNotification notification) {
        final String scope = notification.getScope();
        final String streamName = notification.getStream();
        log.info("{}: New stream {}/{} added to bucket {} ", this.serviceType, scope, streamName, this.bucketId);
        final Stream added = new StreamImpl(scope, streamName);
        final long firstRun = System.currentTimeMillis() + this.executionPeriod.toMillis();
        synchronized (this.lock) {
            // Set.add returns true only when the stream was not already present.
            if (this.knownStreams.add(added)) {
                this.workQueue.add(new QueueElement(added, firstRun));
            }
        }
    }

    /**
     * One iteration of the worker loop. If a slot is free and the head of the work queue is due,
     * the head is removed and, provided its stream is still known, handed to {@code bucketWork}.
     * When the work completes (successfully or not) the slot is released and the stream is queued
     * again for one execution period later.
     *
     * A {@code doWork} call that fails synchronously is treated like failed work: the slot is
     * released and the stream is re-queued, instead of leaking the slot and failing the loop.
     *
     * @return a future that completes when the next iteration should run: immediately if a queue
     *         entry was consumed, otherwise after {@code DELAY_IN_MILLIS}
     */
    private CompletableFuture<Void> work() {
        final long now = System.currentTimeMillis();
        long delayInMillis = 0L;
        QueueElement element;
        synchronized (this.lock) {
            element = this.workQueue.peek();
            if (this.availableSlots > 0 && element != null && element.nextExecutionTimeInMillis <= now) {
                element = this.workQueue.poll();
                if (this.knownStreams.contains(element.getStream())) {
                    // Reserve a slot for the work dispatched below.
                    --this.availableSlots;
                } else {
                    // The stream was removed; drop its entry and let the next iteration run immediately.
                    element = null;
                }
            } else {
                // No slot, empty queue, or head not yet due: retry after a fixed delay.
                delayInMillis = DELAY_IN_MILLIS;
                element = null;
            }
        }
        if (element != null) {
            final Stream stream = element.getStream();
            try {
                this.bucketWork.doWork(stream).handle((r, e) -> {
                    this.releaseSlotAndReschedule(stream);
                    return null;
                });
            } catch (RuntimeException ex) {
                // doWork failed before returning a future, so the handler above was never
                // registered; release the slot here so it is not lost.
                log.warn("{}: Work for stream {} in bucket {} could not be started", this.serviceType, stream, this.bucketId, ex);
                this.releaseSlotAndReschedule(stream);
            }
        }
        return Futures.delayedFuture(Duration.ofMillis(delayInMillis), this.executor);
    }

    /**
     * Returns a work slot and, if the stream is still known, queues it for its next execution.
     *
     * @param stream stream whose work has just finished
     */
    private void releaseSlotAndReschedule(Stream stream) {
        final long nextRun = System.currentTimeMillis() + this.executionPeriod.toMillis();
        synchronized (this.lock) {
            if (this.knownStreams.contains(stream)) {
                this.workQueue.add(new QueueElement(stream, nextRun));
            }
            ++this.availableSlots;
        }
    }

    /**
     * Stops the service. Once startup has finished, both background loops are cancelled, the
     * bucket change listener is stopped, and the service is marked as stopped after both loop
     * futures have completed.
     */
    protected void doStop() {
        log.info("{}: Stop request received for bucket {}", this.serviceType, this.bucketId);
        this.serviceStartFuture.thenRun(() -> {
            final CompletableFuture<Void> notifier = this.notificationLoop.get();
            final CompletableFuture<Void> worker = this.workerLoop.get();
            notifier.cancel(true);
            worker.cancel(true);
            this.stopBucketChangeListener();
            CompletableFuture.allOf(notifier, worker).whenComplete((ignoredResult, ignoredError) -> {
                log.info("{}: Cancellation for all background work for bucket {} issued", this.serviceType, this.bucketId);
                this.notifyStopped();
            });
        });
    }

    /**
     * Queues a notification for the notification loop to process.
     *
     * @param notification the notification to enqueue
     */
    public void notify(StreamNotification notification) {
        this.notifications.add(notification);
    }

    /**
     * Returns a read-only snapshot of the streams currently known to this service. The copy is
     * taken while holding {@code lock}, so it is safe to iterate while the service is running;
     * later changes are not reflected in the returned set.
     *
     * @return unmodifiable copy of the known streams
     */
    @VisibleForTesting
    Set<Stream> getKnownStreams() {
        synchronized (this.lock) {
            return Collections.unmodifiableSet(new HashSet<Stream>(this.knownStreams));
        }
    }

    /**
     * Returns a read-only snapshot of the pending work queue. The copy is taken while holding
     * {@code lock}, so it is safe to iterate while the service is running; later changes are not
     * reflected in the returned collection.
     *
     * @return unmodifiable copy of the queued elements
     */
    @VisibleForTesting
    Collection<QueueElement> getWorkerQueue() {
        synchronized (this.lock) {
            // Copying from a PriorityQueue keeps its comparator and ordering.
            return Collections.unmodifiableCollection(new PriorityQueue<QueueElement>(this.workQueue));
        }
    }

    /**
     * @return the bucket id supplied when this service was constructed
     */
    @SuppressFBWarnings(justification="generated code")
    protected int getBucketId() {
        return bucketId;
    }

    /**
     * @return the service type supplied when this service was constructed
     */
    @SuppressFBWarnings(justification="generated code")
    protected BucketStore.ServiceType getServiceType() {
        return serviceType;
    }

    /**
     * Kinds of notification the bucket service can receive.
     */
    public enum NotificationType {
        /** A stream was added to the bucket; the stream is recorded and scheduled for work. */
        StreamAdded,
        /** A stream was removed from the bucket; the stream is dropped from the known set. */
        StreamRemoved,
        /** A connectivity problem was reported; the service only logs a warning. */
        ConnectivityError
    }

    /**
     * Immutable notification about a stream, identified by scope and stream name, together with
     * the kind of event being reported.
     */
    static class StreamNotification {
        private final String scope;
        private final String stream;
        private final NotificationType type;

        /**
         * @param scope  scope of the stream
         * @param stream name of the stream
         * @param type   kind of event being reported
         */
        @ConstructorProperties(value={"scope", "stream", "type"})
        @SuppressFBWarnings(justification="generated code")
        public StreamNotification(String scope, String stream, NotificationType type) {
            this.scope = scope;
            this.stream = stream;
            this.type = type;
        }

        /** @return scope of the stream */
        @SuppressFBWarnings(justification="generated code")
        public String getScope() {
            return scope;
        }

        /** @return name of the stream */
        @SuppressFBWarnings(justification="generated code")
        public String getStream() {
            return stream;
        }

        /** @return kind of event being reported */
        @SuppressFBWarnings(justification="generated code")
        public NotificationType getType() {
            return type;
        }

        /**
         * Two notifications are equal when scope, stream and type are all equal (null matches
         * only null) and the other object agrees through {@link #canEqual(Object)}.
         */
        @SuppressFBWarnings(justification="generated code")
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof StreamNotification)) {
                return false;
            }
            StreamNotification that = (StreamNotification) o;
            return that.canEqual(this)
                    && nullSafeEquals(this.getScope(), that.getScope())
                    && nullSafeEquals(this.getStream(), that.getStream())
                    && nullSafeEquals(this.getType(), that.getType());
        }

        /** Hook that lets subclasses refuse equality with instances of other types. */
        @SuppressFBWarnings(justification="generated code")
        protected boolean canEqual(Object other) {
            return other instanceof StreamNotification;
        }

        /** Combines the three properties with multiplier 59; a null property contributes 43. */
        @SuppressFBWarnings(justification="generated code")
        public int hashCode() {
            int hash = 1;
            hash = 59 * hash + nullSafeHash(this.getScope());
            hash = 59 * hash + nullSafeHash(this.getStream());
            hash = 59 * hash + nullSafeHash(this.getType());
            return hash;
        }

        @SuppressFBWarnings(justification="generated code")
        public String toString() {
            return new StringBuilder("BucketService.StreamNotification(scope=")
                    .append(this.getScope())
                    .append(", stream=")
                    .append(this.getStream())
                    .append(", type=")
                    .append(this.getType())
                    .append(")")
                    .toString();
        }

        // True when both are null, or when left is non-null and left.equals(right).
        private static boolean nullSafeEquals(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        // Hash of the value, or 43 when the value is null.
        private static int nullSafeHash(Object value) {
            return value == null ? 43 : value.hashCode();
        }
    }

    /**
     * Immutable work-queue entry: a stream paired with the time, in epoch milliseconds, at which
     * work for it is next due.
     */
    static class QueueElement {
        private final Stream stream;
        private final long nextExecutionTimeInMillis;

        /**
         * @param stream                    stream to run work for
         * @param nextExecutionTimeInMillis time, in epoch milliseconds, at which the work is due
         */
        @ConstructorProperties(value={"stream", "nextExecutionTimeInMillis"})
        @SuppressFBWarnings(justification="generated code")
        public QueueElement(Stream stream, long nextExecutionTimeInMillis) {
            this.stream = stream;
            this.nextExecutionTimeInMillis = nextExecutionTimeInMillis;
        }

        /** @return stream to run work for */
        @SuppressFBWarnings(justification="generated code")
        public Stream getStream() {
            return stream;
        }

        /** @return time, in epoch milliseconds, at which the work is due */
        @SuppressFBWarnings(justification="generated code")
        public long getNextExecutionTimeInMillis() {
            return nextExecutionTimeInMillis;
        }

        /**
         * Two elements are equal when their streams are equal (null matches only null), their
         * execution times are equal, and the other object agrees through {@link #canEqual(Object)}.
         */
        @SuppressFBWarnings(justification="generated code")
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof QueueElement)) {
                return false;
            }
            QueueElement that = (QueueElement) o;
            if (!that.canEqual(this)) {
                return false;
            }
            Stream mine = this.getStream();
            Stream theirs = that.getStream();
            boolean sameStream = mine == null ? theirs == null : mine.equals(theirs);
            return sameStream && this.getNextExecutionTimeInMillis() == that.getNextExecutionTimeInMillis();
        }

        /** Hook that lets subclasses refuse equality with instances of other types. */
        @SuppressFBWarnings(justification="generated code")
        protected boolean canEqual(Object other) {
            return other instanceof QueueElement;
        }

        /** Combines stream and execution time with multiplier 59; a null stream contributes 43. */
        @SuppressFBWarnings(justification="generated code")
        public int hashCode() {
            Stream current = this.getStream();
            int hash = 59 + (current == null ? 43 : current.hashCode());
            // Long.hashCode folds the high and low 32 bits together with XOR.
            return 59 * hash + Long.hashCode(this.getNextExecutionTimeInMillis());
        }

        @SuppressFBWarnings(justification="generated code")
        public String toString() {
            return new StringBuilder("BucketService.QueueElement(stream=")
                    .append(this.getStream())
                    .append(", nextExecutionTimeInMillis=")
                    .append(this.getNextExecutionTimeInMillis())
                    .append(")")
                    .toString();
        }
    }
}

