/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.func.Function;

/**
 * A {@link Publisher} that repeatedly invokes a producer function on a
 * {@link ScheduledExecutorService} and forwards each produced value to the subscriber.
 *
 * <p>For every subscription the producer is called with an invocation counter that starts
 * at 0 and increases by one per call. A non-null result is passed to {@code onNext}; a
 * {@code null} result ends the stream with {@code onComplete}; an exception thrown by the
 * producer ends the stream with {@code onError}. Invocations are scheduled with
 * {@code scheduleWithFixedDelay}, using an initial delay of 0 and the configured delay
 * between the end of one invocation and the start of the next.
 *
 * <p>NOTE(review): requested demand is not tracked; {@code request(int)} is a no-op and
 * values are emitted at the configured rate regardless of demand.
 *
 * @param <T> the type of emitted items
 */
public class PeriodicPublisher<T> implements Publisher<T> {
    private final ScheduledExecutorService executorService;
    private final Function<Integer, T> producer;
    private final long delay;
    private final TimeUnit timeUnit;

    /**
     * @param executorService the scheduler on which the producer is invoked
     * @param producer maps the zero-based invocation count to the next item, or to
     *                 {@code null} to complete the stream
     * @param delay the delay between the end of one invocation and the start of the next
     * @param timeUnit the unit of {@code delay}
     */
    public PeriodicPublisher(ScheduledExecutorService executorService, Function<Integer, T> producer, long delay, TimeUnit timeUnit) {
        this.executorService = executorService;
        this.producer = producer;
        this.delay = delay;
        this.timeUnit = timeUnit;
    }

    /**
     * Hands a new subscription to {@code s} and then starts periodic emission.
     *
     * <p>Scheduling is started only after {@code onSubscribe} has returned, so the subscriber
     * never receives {@code onNext}, {@code onComplete} or {@code onError} before it has been
     * given its subscription. If the subscriber cancels from within {@code onSubscribe},
     * nothing is scheduled.
     *
     * @param s the subscriber to emit to
     */
    public void subscribe(final Subscriber<T> s) {
        PeriodicSubscription subscription = new PeriodicSubscription(s);
        s.onSubscribe(subscription);
        subscription.start();
    }

    /**
     * The subscription for a single subscriber; it is also the task executed by the scheduler.
     */
    private final class PeriodicSubscription implements Subscription, Runnable {
        private final Subscriber<T> subscriber;
        private final AtomicInteger counter = new AtomicInteger(0);
        private final Object lock = new Object();
        // Both fields below are guarded by "lock".
        private ScheduledFuture<?> future;
        private boolean done;

        PeriodicSubscription(Subscriber<T> subscriber) {
            this.subscriber = subscriber;
        }

        /**
         * Schedules the periodic task unless the subscription has already been cancelled.
         */
        void start() {
            synchronized (lock) {
                if (done) {
                    return;
                }
                // The lock is held while scheduling so that a task starting immediately on
                // another thread cannot observe an unassigned future.
                future = executorService.scheduleWithFixedDelay(this, 0L, delay, timeUnit);
                if (done) {
                    // Terminated re-entrantly while scheduling (task ran on this thread).
                    future.cancel(false);
                }
            }
        }

        @Override
        public void run() {
            if (isDone()) {
                return;
            }
            int i = counter.getAndIncrement();
            T value;
            try {
                value = producer.apply(i);
            }
            catch (Exception e) {
                if (terminate()) {
                    subscriber.onError(e);
                }
                return;
            }
            if (value == null) {
                if (terminate()) {
                    subscriber.onComplete();
                }
            } else if (!isDone()) {
                // Re-checked because cancel() may have been called while the producer ran.
                subscriber.onNext(value);
            }
        }

        /** No-op: demand is not tracked (see the class-level note). */
        public void request(int n) {
        }

        /** Stops further emission; a task run already in progress is not interrupted. */
        public void cancel() {
            terminate();
        }

        private boolean isDone() {
            synchronized (lock) {
                return done;
            }
        }

        /**
         * Marks the subscription as finished and cancels the scheduled task.
         *
         * @return {@code true} if this call performed the transition, so that at most one
         *         terminal signal is ever sent to the subscriber
         */
        private boolean terminate() {
            synchronized (lock) {
                if (done) {
                    return false;
                }
                done = true;
                if (future != null) {
                    future.cancel(false);
                }
                return true;
            }
        }
    }
}

