/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.gcp.pubsub.reactive;

import com.google.api.gax.rpc.DeadlineExceededException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberOperations;
import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage;
import org.springframework.util.Assert;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;

/**
 * Creates {@link Flux} instances that are fed by pulling messages from a Pub/Sub
 * subscription through {@link PubSubSubscriberOperations}.
 *
 * <p>All pulling is performed on workers obtained from the {@link Scheduler} supplied at
 * construction time. Failures raised while pulling are forwarded to the subscriber as an
 * error signal, and the worker is released whenever the sink is disposed (cancellation or
 * termination).
 */
public final class PubSubReactiveFactory {
    private static final Log LOGGER = LogFactory.getLog(PubSubReactiveFactory.class);

    private final PubSubSubscriberOperations subscriberOperations;
    private final Scheduler scheduler;

    /**
     * Instantiates the factory.
     *
     * @param subscriberOperations template used to pull messages; must not be {@code null}
     * @param scheduler scheduler whose workers run the pull tasks; must not be {@code null}
     * @throws IllegalArgumentException if either argument is {@code null}
     */
    public PubSubReactiveFactory(PubSubSubscriberOperations subscriberOperations, Scheduler scheduler) {
        Assert.notNull(subscriberOperations, "subscriberOperations cannot be null.");
        Assert.notNull(scheduler, "scheduler cannot be null.");
        this.subscriberOperations = subscriberOperations;
        this.scheduler = scheduler;
    }

    /**
     * Creates a stream of messages pulled from the given subscription.
     *
     * <p>Each demand signal from the subscriber schedules a task on a worker dedicated to
     * that subscription to the returned {@link Flux}:
     * <ul>
     * <li>a request of {@code Long.MAX_VALUE} schedules a periodic non-blocking pull that
     * runs every {@code pollingPeriodMs} milliseconds, starting immediately;</li>
     * <li>any other request schedules a single task that keeps pulling until the requested
     * number of messages has been pulled or the sink is cancelled.</li>
     * </ul>
     *
     * @param subscriptionName subscription from which to pull messages
     * @param pollingPeriodMs period, in milliseconds, between pulls when demand is unlimited
     * @return a {@link Flux} emitting the pulled messages
     */
    public Flux<AcknowledgeablePubsubMessage> poll(String subscriptionName, long pollingPeriodMs) {
        return Flux.create(sink -> {
            Scheduler.Worker subscriptionWorker = this.scheduler.createWorker();

            sink.onRequest(numRequested -> {
                if (numRequested == Long.MAX_VALUE) {
                    // Unlimited demand: poll periodically without blocking.
                    subscriptionWorker.schedulePeriodically(
                            new NonBlockingUnlimitedDemandPullTask(subscriptionName, sink),
                            0L, pollingPeriodMs, TimeUnit.MILLISECONDS);
                }
                else {
                    // Bounded demand: pull until exactly that many messages were received.
                    subscriptionWorker.schedule(
                            new BlockingLimitedDemandPullTask(subscriptionName, numRequested, sink));
                }
            });

            // onDispose (rather than onCancel) so the worker is also released when the
            // sink terminates with an error signalled by one of the pull tasks.
            sink.onDispose(subscriptionWorker);
        });
    }

    /**
     * Pull task for unlimited demand: performs one non-blocking pull per invocation.
     */
    private class NonBlockingUnlimitedDemandPullTask
    extends PubSubPullTask {
        NonBlockingUnlimitedDemandPullTask(String subscriptionName, FluxSink<AcknowledgeablePubsubMessage> sink) {
            super(subscriptionName, sink);
        }

        @Override
        public void run() {
            try {
                this.pullToSink(Integer.MAX_VALUE, false);
            }
            catch (RuntimeException e) {
                // Surface the failure to the subscriber instead of letting it escape
                // into the scheduler, where the subscriber would never see it.
                this.sink.error(e);
            }
        }
    }

    /**
     * Pull task for bounded demand: keeps pulling until the demand captured at
     * construction time has been met or the sink is cancelled.
     */
    private class BlockingLimitedDemandPullTask
    extends PubSubPullTask {
        private final long initialDemand;

        BlockingLimitedDemandPullTask(String subscriptionName, long initialDemand, FluxSink<AcknowledgeablePubsubMessage> sink) {
            super(subscriptionName, sink);
            this.initialDemand = initialDemand;
        }

        @Override
        public void run() {
            try {
                long demand = this.initialDemand;
                while (demand > 0L && !this.sink.isCancelled()) {
                    try {
                        // A single pull takes an int, so clamp the outstanding demand.
                        int intDemand = (int) Math.min(demand, (long) Integer.MAX_VALUE);
                        demand -= this.pullToSink(intDemand, true);
                    }
                    catch (DeadlineExceededException e) {
                        // Treated as "nothing available yet": loop around and pull again.
                        if (LOGGER.isTraceEnabled()) {
                            LOGGER.trace("Blocking pull timed out due to empty subscription " + this.subscriptionName + "; retrying.");
                        }
                    }
                }
            }
            catch (RuntimeException e) {
                // Any other failure ends the stream with an error signal.
                this.sink.error(e);
            }
        }
    }

    /**
     * Base class for pull tasks; holds the subscription name and the sink to feed.
     */
    private abstract class PubSubPullTask
    implements Runnable {
        protected final String subscriptionName;
        protected final FluxSink<AcknowledgeablePubsubMessage> sink;

        PubSubPullTask(String subscriptionName, FluxSink<AcknowledgeablePubsubMessage> sink) {
            this.subscriptionName = subscriptionName;
            this.sink = sink;
        }

        /**
         * Pulls up to {@code demand} messages and emits them to the sink unless the sink
         * has been cancelled in the meantime.
         *
         * @param demand maximum number of messages to pull
         * @param block whether the pull should block; its negation is passed as the third
         * argument of {@code pull} (presumably a "return immediately" flag; confirm
         * against {@code PubSubSubscriberOperations})
         * @return the number of messages returned by the pull, whether or not they were
         * emitted
         */
        protected int pullToSink(int demand, boolean block) {
            List<AcknowledgeablePubsubMessage> messages =
                    PubSubReactiveFactory.this.subscriberOperations.pull(this.subscriptionName, demand, !block);
            if (!this.sink.isCancelled()) {
                messages.forEach(this.sink::next);
            }
            return messages.size();
        }
    }
}

