/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.discovery.consul.watch;

import io.micronaut.core.annotation.Internal;
import io.micronaut.discovery.consul.client.v1.KeyValue;
import io.micronaut.discovery.consul.client.v1.blockingqueries.BlockedQueriesConsulClient;
import io.micronaut.discovery.consul.client.v1.blockingqueries.BlockingQueriesConfiguration;
import io.micronaut.discovery.consul.watch.PropertiesChangeHandler;
import io.micronaut.discovery.consul.watch.Watcher;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.exceptions.ReadTimeoutException;
import java.time.Duration;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

/*
 * Uses 'sealed' constructs - enablewith --sealed true
 */
@Internal
abstract class AbstractWatcher<V>
implements Watcher {
    protected static final Integer NO_INDEX = null;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractWatcher.class);
    protected final BlockedQueriesConsulClient consulClient;
    protected final Map<String, V> kvHolder = new ConcurrentHashMap<String, V>();
    private final List<String> kvPaths;
    private final BlockingQueriesConfiguration blockingQueriesConfiguration;
    private final PropertiesChangeHandler propertiesChangeHandler;
    private final Map<String, Disposable> listeners = new ConcurrentHashMap<String, Disposable>();
    private final Base64.Decoder base64Decoder = Base64.getDecoder();
    private volatile boolean started = false;
    private volatile boolean isInit = false;

    AbstractWatcher(List<String> kvPaths, BlockedQueriesConsulClient consulClient, BlockingQueriesConfiguration blockingQueriesConfiguration, PropertiesChangeHandler propertiesChangeHandler) {
        this.kvPaths = kvPaths;
        this.consulClient = consulClient;
        this.blockingQueriesConfiguration = blockingQueriesConfiguration;
        this.propertiesChangeHandler = propertiesChangeHandler;
    }

    @Override
    public void start() {
        if (this.started) {
            throw new IllegalStateException("Watcher is already started");
        }
        try {
            LOG.debug("Starting KVs watcher");
            this.started = true;
            this.kvPaths.parallelStream().forEach(this::watchKvPath);
        }
        catch (Exception e) {
            LOG.error("Error watching configurations: {}", (Object)e.getMessage(), (Object)e);
            this.stop();
        }
    }

    @Override
    public boolean isWatching() {
        return this.started && this.isInit;
    }

    @Override
    public void stop() {
        if (!this.started) {
            LOG.warn("You tried to stop an unstarted Watcher");
            return;
        }
        LOG.debug("Stopping KVs watchers");
        this.listeners.forEach((key, value) -> {
            try {
                LOG.debug("Stopping watch for kvPath={}", key);
                value.dispose();
            }
            catch (Exception e) {
                LOG.error("Error stopping configurations watcher for kvPath={}", key, (Object)e);
            }
        });
        this.listeners.clear();
        this.kvHolder.clear();
        this.started = false;
        this.isInit = false;
    }

    private void watchKvPath(String kvPath) {
        if (!this.started) {
            LOG.warn("Watcher is not started");
            return;
        }
        Disposable disposable = Mono.delay((Duration)this.blockingQueriesConfiguration.getDelayDuration()).then(this.watchValue(kvPath)).subscribe(next -> this.onNext(kvPath, next), throwable -> this.onError(kvPath, (Throwable)throwable));
        this.listeners.put(kvPath, disposable);
    }

    protected abstract Mono<V> watchValue(String var1);

    private void onNext(String kvPath, V next) {
        V previous = this.kvHolder.put(kvPath, next);
        if (previous == null) {
            this.handleInit(kvPath);
        } else if (this.areEqual(previous, next)) {
            this.handleNoChange(kvPath);
        } else {
            this.handleChange(kvPath, next, previous);
        }
        this.watchKvPath(kvPath);
    }

    protected abstract boolean areEqual(V var1, V var2);

    protected abstract Map<String, Object> readValue(V var1);

    private void onError(String kvPath, Throwable throwable) {
        HttpClientResponseException e;
        if (throwable instanceof HttpClientResponseException && (e = (HttpClientResponseException)throwable).getStatus() == HttpStatus.NOT_FOUND) {
            LOG.trace("No KV found with kvPath={}", (Object)kvPath);
            this.listeners.remove(kvPath);
        } else if (throwable instanceof ReadTimeoutException) {
            LOG.warn("Timeout for kvPath={}", (Object)kvPath);
            this.watchKvPath(kvPath);
        } else {
            LOG.error("Watching kvPath={} failed", (Object)kvPath, (Object)throwable);
            this.listeners.remove(kvPath);
        }
    }

    private void handleInit(String kvPath) {
        LOG.debug("Init watcher for kvPath={}", (Object)kvPath);
        this.isInit = true;
    }

    private void handleNoChange(String kvPath) {
        LOG.debug("Nothing changed for kvPath={}", (Object)kvPath);
    }

    private void handleChange(String kvPath, V next, V previous) {
        LOG.debug("Changes detected for kvPath={}", (Object)kvPath);
        Map<String, Object> previousValue = this.readValue(previous);
        Map<String, Object> nextValue = this.readValue(next);
        this.propertiesChangeHandler.handleChanges(kvPath, previousValue, nextValue);
    }

    protected final byte[] decodeValue(KeyValue keyValue) {
        return this.base64Decoder.decode(keyValue.getValue());
    }
}

