/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.func.Function;
import ratpack.stream.TransformablePublisher;

public class PeriodicPublisher<T>
implements TransformablePublisher<T> {
    private final ScheduledExecutorService executorService;
    private final Function<? super Integer, ? extends T> producer;
    private final Duration duration;

    public PeriodicPublisher(ScheduledExecutorService executorService, Function<? super Integer, ? extends T> producer, Duration duration) {
        this.executorService = executorService;
        this.producer = producer;
        this.duration = duration;
    }

    public void subscribe(Subscriber<? super T> s) {
        s.onSubscribe((Subscription)new PeriodicSubscription(s));
    }

    private class PeriodicSubscription
    implements Subscription {
        private final AtomicInteger counter;
        private final ScheduledFuture<?> future;
        private Subscriber<? super T> s;

        public PeriodicSubscription(Subscriber<? super T> subscription) {
            this.s = subscription;
            this.counter = new AtomicInteger(0);
            this.future = PeriodicPublisher.this.executorService.scheduleWithFixedDelay(this::run, 0L, PeriodicPublisher.this.duration.toNanos(), TimeUnit.NANOSECONDS);
        }

        private void run() {
            Object value;
            int i = this.counter.getAndIncrement();
            try {
                value = PeriodicPublisher.this.producer.apply(i);
            }
            catch (Exception e) {
                this.s.onError((Throwable)e);
                this.cancel();
                return;
            }
            if (value == null) {
                this.s.onComplete();
                this.cancel();
            } else {
                this.s.onNext(value);
            }
        }

        public void request(long n) {
        }

        public void cancel() {
            this.future.cancel(false);
            this.s = null;
        }
    }
}

