/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.config.etcd;

import io.helidon.common.reactive.Flow;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.ConfigException;
import io.helidon.config.ConfigHelper;
import io.helidon.config.etcd.EtcdConfigSourceBuilder;
import io.helidon.config.etcd.internal.client.EtcdClient;
import io.helidon.config.etcd.internal.client.EtcdClientException;
import io.helidon.config.spi.PollingStrategy;
import java.time.Instant;
import java.util.logging.Level;
import java.util.logging.Logger;

public class EtcdWatchPollingStrategy
implements PollingStrategy {
    private static final Logger LOGGER = Logger.getLogger(EtcdWatchPollingStrategy.class.getName());
    private final EtcdConfigSourceBuilder.EtcdEndpoint endpoint;
    private final EtcdClient etcdClient;
    private final SubmissionPublisher<PollingStrategy.PollingEvent> ticksSubmitter;
    private final Flow.Publisher<PollingStrategy.PollingEvent> ticksPublisher;
    private EtcdWatchSubscriber etcdWatchSubscriber;

    public static EtcdWatchPollingStrategy create(EtcdConfigSourceBuilder.EtcdEndpoint endpoint) {
        return new EtcdWatchPollingStrategy(endpoint);
    }

    public EtcdWatchPollingStrategy(EtcdConfigSourceBuilder.EtcdEndpoint endpoint) {
        this.endpoint = endpoint;
        this.etcdClient = endpoint.api().clientFactory().createClient(endpoint.uri());
        this.ticksSubmitter = new SubmissionPublisher(Runnable::run, 1);
        this.ticksPublisher = ConfigHelper.suspendablePublisher(this.ticksSubmitter, this::subscribePollingStrategy, this::cancelPollingStrategy);
    }

    EtcdClient etcdClient() {
        return this.etcdClient;
    }

    void subscribePollingStrategy() {
        this.etcdWatchSubscriber = new EtcdWatchSubscriber();
        try {
            Flow.Publisher<Long> watchPublisher = this.etcdClient().watch(this.endpoint.key());
            watchPublisher.subscribe((Flow.Subscriber)this.etcdWatchSubscriber);
        }
        catch (EtcdClientException ex) {
            this.ticksSubmitter.closeExceptionally((Throwable)new ConfigException(String.format("Subscription on watching on '%s' key has failed. Watching by '%s' polling strategy will not start.", this.endpoint.key(), this), (Throwable)ex));
        }
    }

    void cancelPollingStrategy() {
        this.etcdWatchSubscriber.cancelSubscription();
        this.etcdWatchSubscriber = null;
    }

    public Flow.Publisher<PollingStrategy.PollingEvent> ticks() {
        return this.ticksPublisher;
    }

    private void fireEvent(Long item) {
        this.ticksSubmitter.offer((Object)EtcdPollingEvent.from(item), (subscriber, pollingEvent) -> {
            LOGGER.log(Level.FINER, String.format("Event %s has not been delivered to %s.", pollingEvent, subscriber));
            return false;
        });
    }

    EtcdConfigSourceBuilder.EtcdEndpoint etcdEndpoint() {
        return this.endpoint;
    }

    private class EtcdWatchSubscriber
    implements Flow.Subscriber<Long> {
        private Flow.Subscription subscription;

        private EtcdWatchSubscriber() {
        }

        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(Long.MAX_VALUE);
        }

        public void onNext(Long item) {
            EtcdWatchPollingStrategy.this.fireEvent(item);
        }

        public void onError(Throwable throwable) {
            EtcdWatchPollingStrategy.this.ticksSubmitter.closeExceptionally((Throwable)new ConfigException(String.format("Watching on '%s' key has failed. Watching by '%s' polling strategy will not continue. %s", EtcdWatchPollingStrategy.this.endpoint.key(), EtcdWatchPollingStrategy.this, throwable.getLocalizedMessage()), throwable));
        }

        public void onComplete() {
            LOGGER.fine(String.format("Watching on '%s' key has completed. Watching by '%s' polling strategy will not continue.", EtcdWatchPollingStrategy.this.endpoint.key(), EtcdWatchPollingStrategy.this));
            EtcdWatchPollingStrategy.this.ticksSubmitter.close();
        }

        private void cancelSubscription() {
            if (this.subscription != null) {
                this.subscription.cancel();
            }
        }
    }

    private static interface EtcdPollingEvent
    extends PollingStrategy.PollingEvent {
        public Long index();

        public static EtcdPollingEvent from(final Long index) {
            final Instant timestamp = Instant.now();
            return new EtcdPollingEvent(){

                @Override
                public Long index() {
                    return index;
                }

                public Instant timestamp() {
                    return timestamp;
                }

                public String toString() {
                    return "EtcdPollingEvent @ " + timestamp + " # " + index;
                }
            };
        }
    }
}

