/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.google.cloud.pubsub.client;

import com.google.common.util.concurrent.MoreExecutors;
import com.spotify.google.cloud.pubsub.client.Acker;
import com.spotify.google.cloud.pubsub.client.Backoff;
import com.spotify.google.cloud.pubsub.client.Message;
import com.spotify.google.cloud.pubsub.client.Pubsub;
import com.spotify.google.cloud.pubsub.client.ReceivedMessage;
import com.spotify.google.cloud.pubsub.client.RequestFailedException;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pulls messages from one Pub/Sub subscription and dispatches each of them to a
 * {@link MessageHandler}.
 *
 * <p>Pulling starts in the constructor and is re-triggered by a single-threaded scheduler every
 * {@link #pullIntervalMillis()} milliseconds. New pull requests are only issued while fewer than
 * {@link #concurrency()} requests are in flight and fewer than {@link #maxOutstandingMessages()}
 * messages are being handled. When a handler's future completes normally, its value is passed to
 * an internal {@link Acker}.
 *
 * <p>Instances are created through {@link #builder()} and must be {@linkplain #close() closed}
 * to stop the scheduler and the acker.
 */
public class Puller
implements Closeable {
    /** Maximum number of log statements emitted per {@link #MAX_LOG_DURATION}. */
    private static final int MAX_LOG_RATE = 3;
    /** Window over which {@link #MAX_LOG_RATE} is applied. */
    private static final Duration MAX_LOG_DURATION = Duration.millis(2000L);
    private static final Logger logger = LoggerFactory.getLogger(Puller.class);
    /** Rate limited view of {@link #logger}; used for everything logged by this class. */
    private static final Logger LOG = RateLimitedLog.withRateLimit(logger).maxRate(MAX_LOG_RATE).every(MAX_LOG_DURATION).build();
    /** Single-threaded scheduler that periodically re-triggers pulling. */
    private final ScheduledExecutorService scheduler = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    private final Acker acker;
    private final Pubsub pubsub;
    private final String project;
    private final String subscription;
    private final MessageHandler handler;
    private final int concurrency;
    private final int batchSize;
    private final int maxOutstandingMessages;
    private final int maxAckQueueSize;
    private final long pullIntervalMillis;
    private final Backoff backoff;
    /** Number of pull requests issued whose completion callback has not run yet. */
    private final AtomicInteger outstandingRequests = new AtomicInteger();
    /** Number of received messages whose handler future has not completed yet. */
    private final AtomicInteger outstandingMessages = new AtomicInteger();

    /**
     * Creates the puller, issues the first pull requests and schedules periodic pulling.
     *
     * @param builder the configuration; {@code pubsub}, {@code project}, {@code subscription} and
     *     {@code handler} must be set
     * @throws NullPointerException if any of the required builder values is {@code null}
     */
    public Puller(Builder builder) {
        this.pubsub = Objects.requireNonNull(builder.pubsub, "pubsub");
        this.project = Objects.requireNonNull(builder.project, "project");
        this.subscription = Objects.requireNonNull(builder.subscription, "subscription");
        this.handler = Objects.requireNonNull(builder.handler, "handler");
        this.concurrency = builder.concurrency;
        this.batchSize = builder.batchSize;
        this.maxOutstandingMessages = builder.maxOutstandingMessages;
        this.maxAckQueueSize = builder.maxAckQueueSize;
        this.pullIntervalMillis = builder.pullIntervalMillis;
        this.backoff = Backoff.builder().initialInterval(builder.pullIntervalMillis).maxBackoffMultiplier(builder.maxBackoffMultiplier).build();
        this.acker = Acker.builder().pubsub(this.pubsub).project(this.project).subscription(this.subscription).batchSize(this.batchSize).concurrency(this.concurrency).queueSize(this.maxAckQueueSize).build();
        // Start pulling right away; a failure here propagates to the caller of the constructor.
        this.pull();
        // Periodic pulls go through the guarded variant: an exception escaping a task passed to
        // scheduleWithFixedDelay would otherwise suppress every subsequent execution.
        this.scheduler.scheduleWithFixedDelay(this::scheduledPull, this.pullIntervalMillis, this.pullIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the scheduler, waits up to 30 seconds for it to terminate and then closes the acker.
     * If the wait is interrupted, the interrupt flag is restored and the acker is still closed.
     *
     * @throws IOException declared by {@link Closeable#close()}
     */
    @Override
    public void close() throws IOException {
        this.scheduler.shutdownNow();
        try {
            this.scheduler.awaitTermination(30L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.acker.close();
    }

    /** Returns the queue size the internal acker was configured with. */
    public int maxAckQueueSize() {
        return this.maxAckQueueSize;
    }

    /** Returns the number of in-flight messages at which no further pull requests are issued. */
    public int maxOutstandingMessages() {
        return this.maxOutstandingMessages;
    }

    /** Returns the current number of received messages whose handling has not completed. */
    public int outstandingMessages() {
        return this.outstandingMessages.get();
    }

    /** Returns the maximum number of concurrent pull requests. */
    public int concurrency() {
        return this.concurrency;
    }

    /** Returns the current number of pull requests that have not completed. */
    public int outstandingRequests() {
        return this.outstandingRequests.get();
    }

    /** Returns the batch size passed to each pull request and to the acker. */
    public int batchSize() {
        return this.batchSize;
    }

    /** Returns the subscription messages are pulled from. */
    public String subscription() {
        return this.subscription;
    }

    /** Returns the project the subscription belongs to. */
    public String project() {
        return this.project;
    }

    /** Returns the delay, in milliseconds, between scheduled pull attempts. */
    public long pullIntervalMillis() {
        return this.pullIntervalMillis;
    }

    /** Issues pull requests until either the request or the message limit is reached. */
    private void pull() {
        while (this.outstandingRequests.get() < this.concurrency && this.outstandingMessages.get() < this.maxOutstandingMessages) {
            this.pullBatch();
        }
    }

    /** Runs {@link #pull()} for the scheduler, logging instead of propagating failures. */
    private void scheduledPull() {
        try {
            this.pull();
        }
        catch (RuntimeException e) {
            LOG.error("Scheduled pull failed", (Throwable)e);
        }
    }

    /**
     * Issues one pull request and, on completion, dispatches every received message to the
     * handler. A failed pull is logged and followed by a backoff; a successful one resets it.
     */
    private void pullBatch() {
        this.outstandingRequests.incrementAndGet();
        try {
            this.pubsub.pull(this.project, this.subscription, true, this.batchSize).whenComplete((messages, ex) -> {
                this.outstandingRequests.decrementAndGet();
                if (ex != null) {
                    if (ex instanceof RequestFailedException && ((RequestFailedException)ex).statusCode() == 429) {
                        LOG.debug("Going too fast, backing off");
                    } else {
                        LOG.error("Pull failed", ex);
                    }
                    this.backoff.sleep();
                    return;
                }
                this.backoff.reset();
                // Count the whole batch up front; each message is subtracted again once its
                // handling finishes (or fails to start).
                this.outstandingMessages.addAndGet(messages.size());
                for (ReceivedMessage message : messages) {
                    CompletionStage<String> handlerFuture;
                    try {
                        handlerFuture = this.handler.handleMessage(this, this.subscription, message.message(), message.ackId());
                    }
                    catch (Exception e) {
                        this.outstandingMessages.decrementAndGet();
                        LOG.error("Message handler threw exception", (Throwable)e);
                        continue;
                    }
                    if (handlerFuture == null) {
                        this.outstandingMessages.decrementAndGet();
                        LOG.error("Message handler returned null");
                        continue;
                    }
                    handlerFuture.whenComplete((ignore, throwable) -> this.outstandingMessages.decrementAndGet());
                    // Only a normally completed handler future leads to an acknowledgement.
                    handlerFuture.thenAccept(this.acker::acknowledge).exceptionally(throwable -> {
                        // A dependent stage sees the source failure wrapped in a
                        // CompletionException, so unwrap before testing for cancellation.
                        if (!(Puller.unwrapCompletion(throwable) instanceof CancellationException)) {
                            LOG.error("Acking pubsub threw exception", throwable);
                        }
                        return null;
                    });
                }
            });
        }
        catch (RuntimeException e) {
            // The request never got a completion callback attached, so nothing else will
            // release the slot taken above.
            this.outstandingRequests.decrementAndGet();
            throw e;
        }
    }

    /**
     * Strips {@link CompletionException} wrappers from a failure.
     *
     * @param throwable the failure reported by a completion stage
     * @return the innermost cause that is not a {@link CompletionException}, or the last
     *     {@link CompletionException} if it has no cause
     */
    private static Throwable unwrapCompletion(Throwable throwable) {
        Throwable cause = throwable;
        while (cause instanceof CompletionException && cause.getCause() != null) {
            cause = cause.getCause();
        }
        return cause;
    }

    /** Returns a new builder with default settings. */
    public static Builder builder() {
        return new Builder();
    }

    /** Mutable configuration for a {@link Puller}. Every setter returns this builder. */
    public static class Builder {
        private Pubsub pubsub;
        private String project;
        private String subscription;
        private MessageHandler handler;
        private int concurrency = 64;
        private int batchSize = 1000;
        private int maxOutstandingMessages = 64000;
        // Evaluated once when the builder is created, i.e. from the default batch size;
        // calling batchSize(int) afterwards does not change this value.
        private int maxAckQueueSize = 10 * this.batchSize;
        private long pullIntervalMillis = 1000L;
        private int maxBackoffMultiplier = 0;

        /** Sets the Pub/Sub client used for pulling and acking. Required. */
        public Builder pubsub(Pubsub pubsub) {
            this.pubsub = pubsub;
            return this;
        }

        /** Sets the project of the subscription. Required. */
        public Builder project(String project) {
            this.project = project;
            return this;
        }

        /** Sets the subscription to pull from. Required. */
        public Builder subscription(String subscription) {
            this.subscription = subscription;
            return this;
        }

        /** Sets the handler invoked for every received message. Required. */
        public Builder messageHandler(MessageHandler messageHandler) {
            this.handler = messageHandler;
            return this;
        }

        /** Sets the maximum number of concurrent pull requests. Default is 64. */
        public Builder concurrency(int concurrency) {
            this.concurrency = concurrency;
            return this;
        }

        /** Sets the batch size used for pull requests and for the acker. Default is 1000. */
        public Builder batchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        /** Sets the in-flight message count at which pulling pauses. Default is 64000. */
        public Builder maxOutstandingMessages(int maxOutstandingMessages) {
            this.maxOutstandingMessages = maxOutstandingMessages;
            return this;
        }

        /** Sets the acker's queue size. Default is 10000. */
        public Builder maxAckQueueSize(int maxAckQueueSize) {
            this.maxAckQueueSize = maxAckQueueSize;
            return this;
        }

        /**
         * Sets the delay between scheduled pulls, in milliseconds; also used as the initial
         * backoff interval. Default is 1000.
         */
        public Builder pullIntervalMillis(long pullIntervalMillis) {
            this.pullIntervalMillis = pullIntervalMillis;
            return this;
        }

        /** Sets the maximum backoff multiplier handed to the {@link Backoff}. Default is 0. */
        public Builder maxBackoffMultiplier(int maxBackoffMultiplier) {
            this.maxBackoffMultiplier = maxBackoffMultiplier;
            return this;
        }

        /**
         * Creates the puller, which starts pulling immediately.
         *
         * @throws NullPointerException if a required value has not been set
         */
        public Puller build() {
            return new Puller(this);
        }
    }

    /** Callback that processes a single received message. */
    @FunctionalInterface
    public interface MessageHandler {
        /**
         * Handles one message.
         *
         * @param puller the puller that received the message
         * @param subscription the subscription the message was pulled from
         * @param message the message payload
         * @param ackId the ack id of the received message
         * @return a stage that completes when handling is done; on normal completion its value
         *     is passed to the acker (presumably the ack id to acknowledge), while exceptional
         *     completion results in no acknowledgement. Must not be {@code null}.
         */
        CompletionStage<String> handleMessage(Puller puller, String subscription, Message message, String ackId);
    }
}

