/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.func.Function;

public class PeriodicPublisher<T>
implements Publisher<T> {
    private final ScheduledExecutorService executorService;
    private final Function<Integer, T> producer;
    private final Duration duration;

    public PeriodicPublisher(ScheduledExecutorService executorService, Function<Integer, T> producer, Duration duration) {
        this.executorService = executorService;
        this.producer = producer;
        this.duration = duration;
    }

    public void subscribe(final Subscriber<? super T> s) {
        s.onSubscribe(new Subscription(){
            private final AtomicInteger counter = new AtomicInteger(0);
            private final ScheduledFuture<?> future = PeriodicPublisher.access$100(PeriodicPublisher.this).scheduleWithFixedDelay(() -> {
                Object value;
                int i = this.counter.getAndIncrement();
                try {
                    value = PeriodicPublisher.this.producer.apply(i);
                }
                catch (Exception e) {
                    this.cancel();
                    s.onError((Throwable)e);
                    return;
                }
                if (value == null) {
                    s.onComplete();
                    this.cancel();
                } else {
                    s.onNext(value);
                }
            }, 0L, PeriodicPublisher.access$000(PeriodicPublisher.this).toNanos(), TimeUnit.NANOSECONDS);

            public void request(long n) {
            }

            public void cancel() {
                this.future.cancel(false);
            }
        });
    }

    static /* synthetic */ Duration access$000(PeriodicPublisher x0) {
        return x0.duration;
    }

    static /* synthetic */ ScheduledExecutorService access$100(PeriodicPublisher x0) {
        return x0.executorService;
    }
}

