/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.discovery.consul.recipes.leader;

import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.discovery.consul.recipes.ConsulRecipes;
import pl.allegro.tech.discovery.consul.recipes.internal.http.BodyParser;
import pl.allegro.tech.discovery.consul.recipes.internal.http.MediaType;
import pl.allegro.tech.discovery.consul.recipes.internal.thread.ThreadFactoryBuilder;
import pl.allegro.tech.discovery.consul.recipes.json.JsonDeserializer;
import pl.allegro.tech.discovery.consul.recipes.json.JsonSerializer;
import pl.allegro.tech.discovery.consul.recipes.leader.LeadershipObserver;
import pl.allegro.tech.discovery.consul.recipes.session.Session;
import pl.allegro.tech.discovery.consul.recipes.watch.Canceller;
import pl.allegro.tech.discovery.consul.recipes.watch.ConsulWatcher;
import pl.allegro.tech.discovery.consul.recipes.watch.WatchResult;

public class LeaderElector
implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(LeaderElector.class);
    private final String serviceName;
    private final String nodeId;
    private final OkHttpClient httpClient;
    private final HttpUrl baseUrl;
    private final ConsulWatcher consulWatcher;
    private final JsonDeserializer jsonDeserializer;
    private final List<LeadershipObserver> observers = new CopyOnWriteArrayList<LeadershipObserver>();
    private final Session session;
    private final ScheduledExecutorService acquirementPool;
    private final int lockDelaySeconds;
    private final int lockRescueDelaySeconds;
    private final LockAcquirer lockAcquirer;
    private Canceller watchCanceller;
    private volatile boolean isLeader = false;

    private LeaderElector(String serviceName, String nodeId, URI baseUri, OkHttpClient httpClient, Session session, ScheduledExecutorService acquirementPool, ConsulWatcher consulWatcher, JsonDeserializer jsonDeserializer, int lockDelaySeconds, int lockRescueDelaySeconds) {
        this.serviceName = serviceName;
        this.nodeId = nodeId;
        this.baseUrl = HttpUrl.get((URI)baseUri);
        this.httpClient = httpClient;
        this.session = session;
        this.acquirementPool = acquirementPool;
        this.consulWatcher = consulWatcher;
        this.jsonDeserializer = jsonDeserializer;
        this.lockDelaySeconds = lockDelaySeconds;
        this.lockRescueDelaySeconds = lockRescueDelaySeconds;
        this.lockAcquirer = new LockAcquirer(this.lockUrl(serviceName));
    }

    public static Builder forService(String serviceName, OkHttpClient httpClient, JsonSerializer jsonSerializer, JsonDeserializer jsonDeserializer) {
        if (jsonDeserializer == null) {
            throw new IllegalStateException("Configured JsonDeserializer required.");
        }
        if (jsonSerializer == null) {
            throw new IllegalStateException("Configured JsonSerializer required.");
        }
        return new Builder(serviceName, httpClient, jsonSerializer, jsonDeserializer);
    }

    public void start() {
        this.session.start();
        this.lockAcquirer.start();
        this.watchCanceller = this.consulWatcher.watchEndpoint(this.lockEndpoint(this.serviceName), x$0 -> this.lockAcquirer.leaderNodeUpdate(x$0), this::watchException);
    }

    @Override
    public void close() {
        this.session.close();
        this.lockAcquirer.close();
        if (this.watchCanceller != null) {
            this.watchCanceller.cancel();
        }
        this.notALeader();
    }

    public boolean isLeader() {
        return this.isLeader;
    }

    public void registerObserver(LeadershipObserver observer) {
        this.observers.add(observer);
    }

    public void unregisterObserver(LeadershipObserver observer) {
        this.observers.remove(observer);
    }

    private void watchException(Exception exception) {
        try {
            logger.info("Got a leadership watch exception. Clearing leadership status", (Throwable)exception);
            this.notALeader();
        }
        catch (Exception e) {
            logger.warn("Issue notifying about leader node watch exception", (Throwable)e);
        }
    }

    private void becameLeader() {
        boolean wasLeader = this.isLeader;
        this.isLeader = true;
        if (!wasLeader) {
            logger.info("Node({}) became a leader", (Object)this.nodeId);
            this.observers.forEach(LeadershipObserver::leadershipAcquired);
        }
    }

    private void notALeader() {
        boolean wasLeader = this.isLeader;
        this.isLeader = false;
        if (wasLeader) {
            logger.info("Node({}) is no longer a leader.", (Object)this.nodeId);
            this.observers.forEach(LeadershipObserver::leadershipLost);
        }
    }

    private String lockEndpoint(String serviceName) {
        return "/v1/kv/service/" + serviceName + "/leader";
    }

    private HttpUrl lockUrl(String serviceName) {
        return this.baseUrl.newBuilder(this.lockEndpoint(serviceName)).build();
    }

    public static class Builder {
        private final String serviceName;
        private final OkHttpClient httpClient;
        private final JsonSerializer jsonSerializer;
        private final JsonDeserializer jsonDeserializer;
        private ConsulWatcher consulWatcher = null;
        private ScheduledExecutorService lockAcquirementPool = null;
        private Session session = null;
        private URI agentUri = URI.create("http://localhost:8500");
        private String nodeId = UUID.randomUUID().toString();
        private int lockDelaySeconds = 16;
        private int lockRescueDelaySeconds = (int)Duration.ofMinutes(5L).getSeconds();

        private Builder(String serviceName, OkHttpClient httpClient, JsonSerializer jsonSerializer, JsonDeserializer jsonDeserializer) {
            this.serviceName = serviceName;
            this.httpClient = httpClient;
            this.jsonSerializer = jsonSerializer;
            this.jsonDeserializer = jsonDeserializer;
        }

        public LeaderElector build() {
            if (this.consulWatcher == null) {
                ExecutorService workerPool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder("consul-recipes-leader-watch-%d").build());
                this.consulWatcher = ConsulRecipes.consulRecipes().withAgentUri(this.agentUri).withJsonSerializer(this.jsonSerializer).withJsonDeserializer(this.jsonDeserializer).build().consulWatcher(workerPool).build();
            }
            if (this.lockAcquirementPool == null) {
                this.lockAcquirementPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder("consul-recipes-leader-lock-%d").build());
            }
            if (this.session == null) {
                this.session = Session.forService(this.serviceName, this.httpClient, this.jsonSerializer, this.jsonDeserializer).withAgentUri(this.agentUri).build();
            }
            return new LeaderElector(this.serviceName, this.nodeId, this.agentUri, this.httpClient, this.session, this.lockAcquirementPool, this.consulWatcher, this.jsonDeserializer, this.lockDelaySeconds, this.lockRescueDelaySeconds);
        }

        public Builder withAgentUri(URI agentUri) {
            this.agentUri = agentUri;
            return this;
        }

        public Builder withNodeId(String nodeId) {
            this.nodeId = nodeId;
            return this;
        }

        public Builder withConsulWatcher(ConsulWatcher consulWatcher) {
            this.consulWatcher = consulWatcher;
            return this;
        }

        public Builder withLockDelaySeconds(int seconds) {
            this.lockDelaySeconds = seconds;
            return this;
        }

        public Builder withLockRescueDelaySeconds(int seconds) {
            this.lockRescueDelaySeconds = seconds;
            return this;
        }

        public Builder withLockAcquirementPool(ScheduledExecutorService lockAcquirementPool) {
            this.lockAcquirementPool = lockAcquirementPool;
            return this;
        }

        public Builder withSession(Session session) {
            this.session = session;
            return this;
        }
    }

    private static class LockAcquisitionCallback
    implements Callback {
        private static final Logger logger = LoggerFactory.getLogger(LockAcquisitionCallback.class);
        private final Runnable becameLeaderCallback;
        private final ExecutorService lockAcquisitionPool;

        private LockAcquisitionCallback(Runnable becameLeaderCallback, ExecutorService lockAcquisitionPool) {
            this.becameLeaderCallback = becameLeaderCallback;
            this.lockAcquisitionPool = lockAcquisitionPool;
        }

        public void onFailure(Call call, IOException e) {
            logger.error("Failed HTTP call on lock attempt", (Throwable)e);
        }

        public void onResponse(Call call, Response response) throws IOException {
            if (!response.isSuccessful()) {
                logger.warn("Unsuccessful HTTP response on lock attempt. Code: {}; Body: {}", (Object)response.code(), (Object)BodyParser.readBodyOrFallback(response, "(couldn't parse)"));
                return;
            }
            try (ResponseBody body = response.body();){
                String result = body.string();
                if ("true".equals(result.toLowerCase().trim())) {
                    this.lockAcquisitionPool.submit(this.becameLeaderCallback::run);
                }
            }
        }
    }

    private class LockAcquirer
    implements Closeable {
        private final HttpUrl lockUrl;

        private LockAcquirer(HttpUrl lockUrl) {
            this.lockUrl = lockUrl;
        }

        private void start() {
            LeaderElector.this.acquirementPool.scheduleAtFixedRate(this::acquireLock, 0L, LeaderElector.this.lockRescueDelaySeconds, TimeUnit.SECONDS);
        }

        private void leaderNodeUpdate(WatchResult<String> watchResult) {
            try {
                boolean shouldAcquireLock = false;
                String nodeBody = watchResult.getBody();
                List<Map<String, Object>> nodeValue = LeaderElector.this.jsonDeserializer.deserializeMapList(nodeBody);
                if (nodeValue == null || nodeValue.size() < 1) {
                    logger.warn("Empty leader node value");
                    shouldAcquireLock = true;
                } else {
                    Map<String, Object> leaderInfo = nodeValue.get(0);
                    String currentLeaderSession = (String)leaderInfo.get("Session");
                    String currentLeaderValue = (String)leaderInfo.get("Value");
                    String currentLeader = Optional.ofNullable(currentLeaderValue).map(leader -> new String(Base64.getMimeDecoder().decode((String)leader), Charset.forName("UTF-8"))).orElse("");
                    logger.debug("Leader session changed to {}", (Object)currentLeaderSession);
                    if (!LeaderElector.this.nodeId.equals(currentLeader)) {
                        logger.info("This node({}) is not a leader. Current leader is {}.", (Object)LeaderElector.this.nodeId, (Object)currentLeader);
                        LeaderElector.this.notALeader();
                    }
                    if (currentLeaderSession == null || currentLeaderSession.equals("")) {
                        shouldAcquireLock = true;
                    }
                }
                if (shouldAcquireLock) {
                    LeaderElector.this.acquirementPool.schedule(this::acquireLock, (long)LeaderElector.this.lockDelaySeconds, TimeUnit.SECONDS);
                }
            }
            catch (IOException e) {
                logger.error("Couldn't deserialize lock body", (Throwable)e);
            }
            catch (Exception e) {
                logger.error("Unexpected issue on leader node update", (Throwable)e);
            }
        }

        private void acquireLock() {
            try {
                String body = LeaderElector.this.nodeId;
                Request request = new Request.Builder().url(this.lockUrl.newBuilder().addQueryParameter("acquire", LeaderElector.this.session.currentId()).build()).put(RequestBody.create((okhttp3.MediaType)MediaType.JSON_MEDIA_TYPE, (String)body)).build();
                LeaderElector.this.httpClient.newCall(request).enqueue((Callback)new LockAcquisitionCallback(() -> LeaderElector.this.becameLeader(), LeaderElector.this.acquirementPool));
            }
            catch (Exception e) {
                logger.error("Couldn't acquire lock", (Throwable)e);
            }
        }

        @Override
        public void close() {
            LeaderElector.this.acquirementPool.shutdown();
            try {
                LeaderElector.this.acquirementPool.awaitTermination(5L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

