/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordination;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.InputStream;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;

/**
 * Keeps a local view of a remote server's change requests up to date by
 * repeatedly issuing HTTP GET requests against {@code baseServerURL} +
 * {@code baseRequestPath} and handing the deserialized
 * {@link ChangeRequestsSnapshot} to a {@link Listener}.
 *
 * <p>The first request (and every request after the server asks for a counter
 * reset) is sent with {@code counter=-1} and its result is delivered through
 * {@link Listener#fullSync}. Subsequent requests carry the last received
 * counter and hash, and their results are delivered through
 * {@link Listener#deltaSync}.
 *
 * <p>Response handling and scheduling of the next sync are performed while
 * holding the monitor of the internal lifecycle lock.
 *
 * @param <T> type of the individual change requests carried by a snapshot
 */
public class ChangeRequestHttpSyncer<T> {
    private static final EmittingLogger log = new EmittingLogger(ChangeRequestHttpSyncer.class);

    /** Extra time added on top of the server-side timeout to obtain the HTTP read timeout. */
    public static final long HTTP_TIMEOUT_EXTRA_MS = 5000L;

    /** Upper bound on the delay between two retries after consecutive failures. */
    private static final long MAX_RETRY_BACKOFF = TimeUnit.MINUTES.toMillis(2L);

    private final ObjectMapper smileMapper;
    private final HttpClient httpClient;
    private final ScheduledExecutorService executor;
    private final URL baseServerURL;
    private final String baseRequestPath;
    private final TypeReference<ChangeRequestsSnapshot<T>> responseTypeReferences;

    /** Timeout sent to the server as the {@code timeout} query parameter. */
    private final long serverTimeoutMS;

    /** Read timeout of the HTTP request: {@code serverTimeoutMS + HTTP_TIMEOUT_EXTRA_MS}. */
    private final long serverHttpTimeout;

    /** Once the server has been unstable for longer than this, failures are emitted as alerts. */
    private final Duration maxUnstableDuration;

    /** Threshold used by {@link #needsReset()}. */
    private final Duration maxDelayBetweenSyncRequests;

    /** Threshold used by {@link #awaitInitialization()} and {@link #isSyncedSuccessfully()}. */
    private final Duration maxDurationToWaitForSync;

    private final Listener<T> listener;

    /** Counted down after the first sync whose changes were delivered to the listener. */
    private final CountDownLatch initializationLatch = new CountDownLatch(1);

    private final LifecycleLock startStopLock = new LifecycleLock();
    private final String logIdentity;

    /** Number of failures since the last fully processed sync response. */
    private int consecutiveFailedAttemptCount = 0;

    private final Stopwatch sinceSyncerStart = Stopwatch.createUnstarted();
    private final Stopwatch sinceLastSyncRequest = Stopwatch.createUnstarted();
    private final Stopwatch sinceLastSyncSuccess = Stopwatch.createUnstarted();
    private final Stopwatch sinceUnstable = Stopwatch.createUnstarted();

    /** Last counter received from the server; {@code null} means the next sync is a full sync. */
    @Nullable
    private ChangeRequestHistory.Counter counter = null;

    /**
     * Creates a syncer. No request is sent until {@link #start()} is called.
     *
     * @param smileMapper              mapper used to deserialize the response body
     * @param httpClient               client used to send the sync requests
     * @param executor                 executor on which syncs and response callbacks run
     * @param baseServerURL            base URL of the server to sync from
     * @param baseRequestPath          request path, resolved against {@code baseServerURL}
     * @param responseTypeReferences   type of the expected response snapshot
     * @param serverTimeoutMS          timeout, in millis, passed to the server with each request
     * @param serverUnstabilityTimeout duration, in millis, of continuous failures after which
     *                                 failures are emitted as alerts
     * @param listener                 receiver of the synced changes
     */
    public ChangeRequestHttpSyncer(ObjectMapper smileMapper, HttpClient httpClient, ScheduledExecutorService executor, URL baseServerURL, String baseRequestPath, TypeReference<ChangeRequestsSnapshot<T>> responseTypeReferences, long serverTimeoutMS, long serverUnstabilityTimeout, Listener<T> listener) {
        this.smileMapper = smileMapper;
        this.httpClient = httpClient;
        this.executor = executor;
        this.baseServerURL = baseServerURL;
        this.baseRequestPath = baseRequestPath;
        this.responseTypeReferences = responseTypeReferences;
        this.serverTimeoutMS = serverTimeoutMS;
        this.serverHttpTimeout = serverTimeoutMS + HTTP_TIMEOUT_EXTRA_MS;
        this.listener = listener;
        // The creation timestamp distinguishes successive syncer instances for the same server in logs.
        this.logIdentity = StringUtils.format("%s_%d", baseServerURL, System.currentTimeMillis());
        this.maxDurationToWaitForSync = Duration.millis(3L * this.serverHttpTimeout);
        this.maxDelayBetweenSyncRequests = Duration.millis(3L * this.serverHttpTimeout + MAX_RETRY_BACKOFF);
        this.maxUnstableDuration = Duration.millis(serverUnstabilityTimeout);
    }

    /**
     * Starts the syncer and schedules the first sync request.
     *
     * @throws ISE if the lifecycle lock does not permit starting
     */
    public void start() {
        synchronized (startStopLock) {
            if (!startStopLock.canStart()) {
                throw new ISE("Could not start sync for server[%s].", logIdentity);
            }
            try {
                log.info("Starting sync for server[%s].", logIdentity);
                startStopLock.started();
            }
            finally {
                startStopLock.exitStart();
            }
            sinceSyncerStart.restart();
            addNextSyncToWorkQueue();
        }
    }

    /**
     * Stops the syncer. Nothing is cancelled here; already queued syncs and
     * in-flight callbacks re-check the lifecycle lock themselves and return
     * early when it no longer reports "started".
     *
     * @throws ISE if the lifecycle lock does not permit stopping
     */
    public void stop() {
        synchronized (startStopLock) {
            if (!startStopLock.canStop()) {
                throw new ISE("Could not stop sync for server[%s].", logIdentity);
            }
            try {
                log.info("Stopping sync for server[%s].", logIdentity);
            }
            finally {
                startStopLock.exitStop();
            }
            log.info("Stopped sync for server[%s].", logIdentity);
        }
    }

    /**
     * Blocks until the first sync has been delivered to the listener, waiting
     * at most three times the HTTP timeout.
     *
     * @return {@code true} if the first sync completed within the wait time
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean awaitInitialization() throws InterruptedException {
        return initializationLatch.await(maxDurationToWaitForSync.getMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * @return {@code true} once the first sync has been delivered to the listener
     */
    public boolean isInitialized() {
        return initializationLatch.getCount() == 0L;
    }

    /**
     * @return a snapshot of internal state for debugging, with the keys
     *         {@code millisSinceLastRequest}, {@code millisSinceLastSuccess},
     *         {@code consecutiveFailedAttemptCount} and {@code syncScheduled}
     */
    public Map<String, Object> getDebugInfo() {
        return ImmutableMap.of(
            "millisSinceLastRequest", sinceLastSyncRequest.millisElapsed(),
            "millisSinceLastSuccess", sinceLastSyncSuccess.millisElapsed(),
            "consecutiveFailedAttemptCount", consecutiveFailedAttemptCount,
            "syncScheduled", startStopLock.isStarted()
        );
    }

    /**
     * Reports whether too much time has passed without a sync request being
     * sent. The time is measured from the last request if one has been sent,
     * otherwise from {@link #start()}.
     *
     * @return {@code true} if that time exceeds
     *         {@code 3 * HTTP timeout + MAX_RETRY_BACKOFF}
     */
    public boolean needsReset() {
        final Stopwatch reference = sinceLastSyncRequest.isRunning() ? sinceLastSyncRequest : sinceSyncerStart;
        return reference.hasElapsed(maxDelayBetweenSyncRequests);
    }

    /**
     * @return millis elapsed since the first of the current run of consecutive
     *         failures, or 0 if there are no outstanding failures
     */
    public long getUnstableTimeMillis() {
        return consecutiveFailedAttemptCount <= 0 ? 0L : sinceUnstable.millisElapsed();
    }

    /**
     * @return {@code false} if there are outstanding failures; otherwise the
     *         result of checking that the time since the last successful sync
     *         has not exceeded three times the HTTP timeout
     */
    public boolean isSyncedSuccessfully() {
        if (consecutiveFailedAttemptCount > 0) {
            return false;
        }
        return sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
    }

    /**
     * Sends one sync request and registers a callback that processes the
     * response and schedules the next sync. Any error raised while sending
     * marks the server unstable and still schedules the next sync.
     */
    private void sync() {
        if (!startStopLock.awaitStarted(1L, TimeUnit.MILLISECONDS)) {
            log.info("Skipping sync for server[%s] as syncer has not started yet.", logIdentity);
            return;
        }
        sinceLastSyncRequest.restart();
        try {
            final String req = getRequestString();
            final BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
            log.debug("Sending sync request to server[%s]", logIdentity);
            final ListenableFuture<InputStream> syncRequestFuture = httpClient.go(
                new Request(HttpMethod.GET, new URL(baseServerURL, req))
                    .addHeader("Accept", "application/x-jackson-smile")
                    .addHeader("Content-Type", "application/x-jackson-smile"),
                responseHandler,
                Duration.millis(serverHttpTimeout)
            );
            log.debug("Sent sync request to [%s]", logIdentity);
            Futures.addCallback(syncRequestFuture, new FutureCallback<InputStream>() {

                @Override
                public void onSuccess(InputStream stream) {
                    synchronized (startStopLock) {
                        if (!startStopLock.awaitStarted(1L, TimeUnit.MILLISECONDS)) {
                            log.info("Not handling response for server[%s] as syncer has not started yet.", logIdentity);
                            return;
                        }
                        // Every return inside this try still schedules the next sync via the finally block.
                        try {
                            final int responseCode = responseHandler.getStatus();
                            if (responseCode == 204) {
                                // HTTP 204 No Content: treated as a successful sync with nothing to apply.
                                log.debug("Received NO CONTENT from server[%s]", logIdentity);
                                sinceLastSyncSuccess.restart();
                                return;
                            }
                            if (responseCode != 200) {
                                handleFailure(new ISE("Received sync response [%d]", responseCode));
                                return;
                            }
                            log.debug("Received sync response from server[%s]", logIdentity);
                            final ChangeRequestsSnapshot<T> changes = smileMapper.readValue(stream, responseTypeReferences);
                            log.debug("Finished reading sync response from server[%s]", logIdentity);
                            if (changes.isResetCounter()) {
                                log.info("Server[%s] requested resetCounter for reason[%s].", logIdentity, changes.getResetCause());
                                // Dropping the counter makes the next request a full sync (counter=-1).
                                counter = null;
                                return;
                            }
                            if (counter == null) {
                                listener.fullSync(changes.getRequests());
                            } else {
                                listener.deltaSync(changes.getRequests());
                            }
                            counter = changes.getCounter();
                            if (initializationLatch.getCount() > 0L) {
                                initializationLatch.countDown();
                                log.info("Server[%s] synced successfully for the first time.", logIdentity);
                            }
                            if (consecutiveFailedAttemptCount > 0) {
                                consecutiveFailedAttemptCount = 0;
                                sinceUnstable.reset();
                                log.info("Server[%s] synced successfully.", logIdentity);
                            }
                            sinceLastSyncSuccess.restart();
                        }
                        catch (Exception ex) {
                            markServerUnstableAndAlert(ex, "Processing Response");
                        }
                        finally {
                            addNextSyncToWorkQueue();
                        }
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    synchronized (startStopLock) {
                        if (!startStopLock.awaitStarted(1L, TimeUnit.MILLISECONDS)) {
                            log.info("Not handling sync failure for server[%s] as syncer has not started yet.", logIdentity);
                            return;
                        }
                        try {
                            handleFailure(t);
                        }
                        finally {
                            addNextSyncToWorkQueue();
                        }
                    }
                }

                /** Records a failed request together with the status and description seen by the response handler. */
                private void handleFailure(Throwable t) {
                    final String logMsg = StringUtils.format(
                        "Handling response with code[%d], description[%s]",
                        responseHandler.getStatus(),
                        responseHandler.getDescription()
                    );
                    markServerUnstableAndAlert(t, logMsg);
                }
            }, executor);
        }
        catch (Throwable th) {
            try {
                markServerUnstableAndAlert(th, "Sending Request");
            }
            finally {
                addNextSyncToWorkQueue();
            }
        }
    }

    /**
     * Builds the path and query string of the next sync request:
     * {@code counter=-1} when no counter is held, otherwise the last received
     * counter and hash. The server-side timeout is always included.
     */
    private String getRequestString() {
        if (counter == null) {
            return StringUtils.format("%s?counter=-1&timeout=%s", baseRequestPath, serverTimeoutMS);
        }
        return StringUtils.format(
            "%s?counter=%s&hash=%s&timeout=%s",
            baseRequestPath,
            counter.getCounter(),
            counter.getHash(),
            serverTimeoutMS
        );
    }

    /**
     * Queues the next call to {@link #sync()} on the executor: immediately if
     * there are no outstanding failures, otherwise after a retry backoff
     * capped at {@link #MAX_RETRY_BACKOFF}. Scheduling errors are logged and
     * not rethrown.
     */
    private void addNextSyncToWorkQueue() {
        synchronized (startStopLock) {
            if (!startStopLock.awaitStarted(1L, TimeUnit.MILLISECONDS)) {
                log.info("Not scheduling sync for server[%s]. Instance stopped.", logIdentity);
                return;
            }
            try {
                if (consecutiveFailedAttemptCount > 0) {
                    final long delayMillis = Math.min(
                        MAX_RETRY_BACKOFF,
                        RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount)
                    );
                    log.info("Scheduling next sync for server[%s] in [%d] millis.", logIdentity, delayMillis);
                    executor.schedule(this::sync, delayMillis, TimeUnit.MILLISECONDS);
                } else {
                    executor.execute(this::sync);
                }
            }
            catch (Throwable th) {
                // Exactly one warning is logged: a shut-down executor is reported as such,
                // and the "will be reset automatically" message is kept for other failures.
                if (executor.isShutdown()) {
                    log.warn(th, "Could not schedule sync for server[%s] because executor is stopped.", logIdentity);
                } else {
                    log.warn(th, "Could not schedule sync for server [%s]. This syncer will be reset automatically. If the issue persists, try restarting this Druid service.", logIdentity);
                }
            }
        }
    }

    /**
     * Records one more consecutive failure and reports it. The report is an
     * alert once the server has been unstable for longer than
     * {@code maxUnstableDuration}; before that it is a debug log (with the
     * throwable) when debug logging is enabled, or an info log without stack
     * trace otherwise.
     *
     * @param throwable cause of the failure
     * @param action    description of what was being done when the failure occurred
     */
    private void markServerUnstableAndAlert(Throwable throwable, String action) {
        // The unstable period starts at the first failure after a success.
        if (consecutiveFailedAttemptCount++ == 0) {
            sinceUnstable.restart();
        }
        final long unstableSeconds = getUnstableTimeMillis() / 1000L;
        final String message = StringUtils.format(
            "Sync failed for server[%s] while [%s]. Failed [%d] times in the last [%d] seconds.",
            baseServerURL,
            action,
            consecutiveFailedAttemptCount,
            unstableSeconds
        );
        if (sinceUnstable.hasElapsed(maxUnstableDuration)) {
            final String alertMessage = StringUtils.format(
                "%s. Try restarting the Druid process on server[%s].",
                message,
                baseServerURL
            );
            log.noStackTrace().makeAlert(throwable, alertMessage).emit();
        } else if (log.isDebugEnabled()) {
            log.debug(throwable, message);
        } else {
            log.noStackTrace().info(throwable, message);
        }
    }

    /**
     * @return {@code true} if the executor used by this syncer has been shut down
     */
    @VisibleForTesting
    public boolean isExecutorShutdown() {
        return executor.isShutdown();
    }

    /**
     * Receiver of the change requests obtained from the server.
     *
     * @param <T> type of the individual change requests
     */
    public static interface Listener<T> {
        /**
         * Called with the requests of a response received while no counter was
         * held, i.e. for the first sync and after a counter reset.
         */
        public void fullSync(List<T> changes);

        /**
         * Called with the requests of a response received while a counter from
         * a previous sync was held.
         */
        public void deltaSync(List<T> changes);
    }
}

