/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.discovery.consul.recipes.watch;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.HttpUrl;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.discovery.consul.recipes.watch.BackoffRunner;
import pl.allegro.tech.discovery.consul.recipes.watch.Canceller;
import pl.allegro.tech.discovery.consul.recipes.watch.ConsulWatcherStats;
import pl.allegro.tech.discovery.consul.recipes.watch.ReconnectCallback;
import pl.allegro.tech.discovery.consul.recipes.watch.WatchResult;

class ConsulLongPollCallback
implements Callback {
    private static final Logger logger = LoggerFactory.getLogger(ConsulLongPollCallback.class);
    private final ExecutorService workerPool;
    private final HttpUrl endpoint;
    private final Consumer<WatchResult<String>> consumer;
    private final Consumer<Exception> failureConsumer;
    private final ReconnectCallback reconnect;
    private final BackoffRunner backoffRunner;
    private AtomicReference<byte[]> lastValue = new AtomicReference<byte[]>(new byte[]{0});
    private final AtomicLong currentIndex = new AtomicLong(0L);
    private final AtomicInteger retryCount = new AtomicInteger(0);
    private final ConsulWatcherStats stats;
    private final Canceller callbackCanceller;

    ConsulLongPollCallback(ExecutorService workerPool, BackoffRunner backoffRunner, HttpUrl endpoint, Consumer<WatchResult<String>> consumer, Consumer<Exception> failureConsumer, ReconnectCallback reconnect, ConsulWatcherStats stats, Canceller callbackCanceller) {
        this.workerPool = workerPool;
        this.backoffRunner = backoffRunner;
        this.endpoint = endpoint;
        this.consumer = consumer;
        this.failureConsumer = failureConsumer;
        this.reconnect = reconnect;
        this.stats = stats;
        this.callbackCanceller = callbackCanceller;
    }

    public void onResponse(Call call, Response response) {
        if (this.isCancelled()) {
            if (response.body() != null) {
                response.close();
            }
            return;
        }
        if (response.isSuccessful()) {
            this.onSuccessfulResponse(call, response);
        } else {
            this.onNonOkHttpResponse(response);
        }
    }

    public void onFailure(Call call, IOException exception) {
        if (this.isCancelled()) {
            return;
        }
        this.failureConsumer.accept(exception);
        this.reconnectAfterFailureAndRun(backoff -> logger.error("Long poll failed on endpoint {}, retrying with {}ms backoff", new Object[]{this.endpoint, backoff, exception}));
    }

    boolean isCancelled() {
        return this.callbackCanceller.isCancelled();
    }

    private void onSuccessfulResponse(Call call, Response response) {
        this.stats.eventReceived();
        try (ResponseBody body = response.body();){
            long index = this.updateIndex(response);
            if (index >= 0L) {
                logger.trace("Long poll returned with a result on endpoint {} with index {}", (Object)call.request().url(), (Object)index);
                byte[] content = body.bytes();
                if (this.contentChanged(content)) {
                    this.handleContentChanged(call, index, content);
                } else {
                    this.handleContentUnchanged(index);
                }
            } else {
                this.handleContentUnchanged(index);
            }
            this.reconnectAfterSuccessfulResponse();
        }
        catch (IOException exception) {
            this.handleSucessfulResponseProcessingException(exception);
        }
    }

    private void reconnectAfterFailureAndRun(Consumer<Long> backoffConsumer) {
        this.stats.failed();
        try {
            long backoff = this.reconnectWithBackoff();
            backoffConsumer.accept(backoff);
        }
        catch (RejectedExecutionException e) {
            logger.warn("Can't reconnect. Executor probably closed.", (Throwable)e);
        }
    }

    private void onNonOkHttpResponse(Response response) {
        this.reconnectAfterFailureAndRun(backoff -> {
            try (ResponseBody body = response.body();){
                this.logNonOkHttpResponseWithBody(response, (long)backoff, body.string());
            }
            catch (IOException e) {
                this.logNonOkHttpResponseWithException(response, (long)backoff, e);
            }
        });
    }

    private void reconnectAfterSuccessfulResponse() {
        this.reconnect.reconnect(this.endpoint, this.currentIndex.get(), this);
        this.retryCount.set(0);
    }

    private void handleSucessfulResponseProcessingException(IOException exception) {
        this.reconnectAfterFailureAndRun(backoff -> logger.error("Failed to submit work after reading from {}, retrying with {}ms backoff", new Object[]{this.endpoint, backoff, exception}));
    }

    private void handleContentChanged(Call call, long index, byte[] content) {
        this.stats.callbackCalled();
        if (logger.isTraceEnabled()) {
            logger.trace("Dispatching work on endpoint {} index {} to worker, text: {}", new Object[]{call.request().url(), index, new String(content, StandardCharsets.UTF_8)});
        }
        this.workerPool.submit(() -> this.consumer.accept(new WatchResult<String>(index, new String(content, StandardCharsets.UTF_8))));
    }

    private void handleContentUnchanged(long index) {
        this.stats.contentNotChanged();
        logger.trace("Discarding event on endpoint {} index {} as content did not change", (Object)this.endpoint, (Object)index);
    }

    private void logNonOkHttpResponseWithException(Response response, long backoff, IOException e) {
        logger.error("Long poll on endpoint {} returned non-ok response. Code: [{}], Failed to read body. Retrying with {}ms backoff", new Object[]{this.endpoint, response.code(), backoff, e});
    }

    private void logNonOkHttpResponseWithBody(Response response, long backoff, String bodyString) {
        logger.error("Long poll on endpoint {} returned non-ok response. Code: [{}], Body: [{}]. Retrying with {}ms backoff", new Object[]{this.endpoint, response.code(), bodyString, backoff});
    }

    private boolean contentChanged(byte[] newContent) {
        byte[] oldContent = this.lastValue.getAndSet(newContent);
        return !Arrays.equals(oldContent, newContent);
    }

    private long updateIndex(Response response) {
        String indexString = response.header("X-Consul-Index");
        if (indexString == null) {
            logger.error("There was no X-Consul-Index header in response for {} endpoint, retrying", (Object)this.endpoint);
            return -1L;
        }
        long index = Long.valueOf(indexString);
        if (this.currentIndex.get() == index) {
            this.stats.indexNotChanged();
            logger.trace("Discarding event on endpoint {} index {} as index did not change", (Object)this.endpoint, (Object)index);
            return -1L;
        }
        this.currentIndex.set(index);
        return index;
    }

    long reconnectWithBackoff() {
        return this.backoffRunner.runWithBackoff(this.retryCount.getAndIncrement(), () -> this.reconnect.reconnect(this.endpoint, this.currentIndex.get(), this));
    }
}

