/*
 * Decompiled with CFR 0.152.
 */
package com.twitter.hbc.httpclient;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.twitter.hbc.RateTracker;
import com.twitter.hbc.ReconnectionManager;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpConstants;
import com.twitter.hbc.core.StatsReporter;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import com.twitter.hbc.core.event.ConnectionEvent;
import com.twitter.hbc.core.event.Event;
import com.twitter.hbc.core.event.EventType;
import com.twitter.hbc.core.event.HttpResponseEvent;
import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
import com.twitter.hbc.httpclient.Connection;
import com.twitter.hbc.httpclient.RestartableHttpClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.http.StatusLine;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpUriRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable that drives one streaming client: it repeatedly picks a host, builds and signs a
 * request for the configured endpoint, opens a {@link Connection}, and feeds the response to the
 * message processor until an exit event has been recorded.
 *
 * <p>Lifecycle events are offered to the optional events queue; if the queue is {@code null}
 * nothing is published, and if the queue rejects an event a dropped-event counter is bumped.
 *
 * <p>State shared with other threads ({@code exitEvent}, {@code reconnect},
 * {@code connectionEstablished}, {@code isRunning}) is held in {@code java.util.concurrent}
 * primitives so that {@link #stop(int)}, {@link #reconnect()} and the wait methods can be called
 * while {@link #run()} is executing on another thread.
 */
class ClientBase implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ClientBase.class);
    private final String name;
    private final HttpClient client;
    private final StreamingEndpoint endpoint;
    private final Hosts hosts;
    private final Authentication auth;
    private final HosebirdMessageProcessor processor;
    private final ReconnectionManager reconnectionManager;
    /** Null while the client is running; set once the client has decided to stop. */
    private final AtomicReference<Event> exitEvent;
    /** Released (counted down) when {@link #run()} has finished its cleanup. */
    private final CountDownLatch isRunning;
    private final RateTracker rateTracker;
    /** Optional sink for lifecycle events; may be null. */
    private final BlockingQueue<Event> eventsQueue;
    private final StatsReporter statsReporter;
    /** Becomes true the first time a 200 response is received; never reset in this class. */
    private final AtomicBoolean connectionEstablished;
    /** Set by {@link #reconnect()} and consumed by the processing loop. */
    private final AtomicBoolean reconnect;

    /**
     * Creates a client that does not publish lifecycle events (no events queue).
     */
    ClientBase(String name, HttpClient client, Hosts hosts, StreamingEndpoint endpoint, Authentication auth, HosebirdMessageProcessor processor, ReconnectionManager manager, RateTracker rateTracker) {
        this(name, client, hosts, endpoint, auth, processor, manager, rateTracker, null);
    }

    /**
     * Creates a client.
     *
     * @param eventsQueue queue that receives lifecycle events, or {@code null} to disable event
     *                    publication; every other argument must be non-null
     * @throws NullPointerException if any argument other than {@code eventsQueue} is null
     */
    ClientBase(String name, HttpClient client, Hosts hosts, StreamingEndpoint endpoint, Authentication auth, HosebirdMessageProcessor processor, ReconnectionManager manager, RateTracker rateTracker, @Nullable BlockingQueue<Event> eventsQueue) {
        this.client = Preconditions.checkNotNull(client);
        this.name = Preconditions.checkNotNull(name);
        this.endpoint = Preconditions.checkNotNull(endpoint);
        this.hosts = Preconditions.checkNotNull(hosts);
        this.auth = Preconditions.checkNotNull(auth);
        this.processor = Preconditions.checkNotNull(processor);
        this.reconnectionManager = Preconditions.checkNotNull(manager);
        this.rateTracker = Preconditions.checkNotNull(rateTracker);
        this.eventsQueue = eventsQueue;
        this.exitEvent = new AtomicReference<Event>();
        this.isRunning = new CountDownLatch(1);
        this.statsReporter = new StatsReporter();
        this.connectionEstablished = new AtomicBoolean(false);
        this.reconnect = new AtomicBoolean(false);
    }

    /**
     * Main connect/process/reconnect loop. Runs until an exit event is set (by an error, by
     * running out of hosts, or by {@link #stop(int)}), then releases resources and opens the
     * {@code isRunning} latch so that waiters are unblocked.
     *
     * <p>Any {@link Throwable} escaping the loop is logged and recorded as a
     * {@code STOPPED_BY_ERROR} exit event.
     */
    @Override
    public void run() {
        try {
            if (this.client instanceof RestartableHttpClient) {
                ((RestartableHttpClient)this.client).setup();
            }
            this.rateTracker.start();
            while (!this.isDone()) {
                String host = this.hosts.nextHost();
                if (host == null) {
                    this.setExitStatus(new Event(EventType.STOPPED_BY_ERROR, "No hosts available"));
                    break;
                }
                // Only update the backfill count when the tracker has a usable rate.
                double rate = this.rateTracker.getCurrentRateSeconds();
                if (!Double.isNaN(rate)) {
                    this.endpoint.setBackfillCount(this.reconnectionManager.estimateBackfill(rate));
                }
                HttpUriRequest request = HttpConstants.constructRequest(host, this.endpoint, this.auth);
                if (request == null) {
                    // NOTE(review): no backoff is applied on this path in this class, so a host that
                    // can never produce a request is retried immediately - confirm this is intended.
                    this.addEvent(new Event(EventType.CONNECTION_ERROR, String.format("Error creating request: %s, %s, %s", this.endpoint.getHttpMethod(), host, this.endpoint.getURI())));
                    continue;
                }
                String postContent = null;
                if (this.endpoint.getHttpMethod().equalsIgnoreCase("POST")) {
                    postContent = this.endpoint.getPostParamString();
                }
                this.auth.signRequest(request, postContent);
                Connection conn = new Connection(this.client, this.processor);
                try {
                    StatusLine status = this.establishConnection(conn, request);
                    if (this.handleConnectionResult(status)) {
                        this.rateTracker.resume();
                        this.processConnectionData(conn);
                        this.rateTracker.pause();
                    }
                }
                finally {
                    // Close the connection even if the rate tracker or processing step throws.
                    logger.info("{} Done processing, preparing to close connection", this.name);
                    conn.close();
                }
            }
        }
        catch (Throwable e) {
            logger.warn(this.name + " Uncaught exception", e);
            // Event only accepts an Exception, so wrap Errors and other Throwables.
            Exception laundered = e instanceof Exception ? (Exception)e : new RuntimeException(e);
            this.setExitStatus(new Event(EventType.STOPPED_BY_ERROR, laundered));
        }
        finally {
            this.releaseResources();
        }
    }

    /**
     * Stops the rate tracker, shuts down the HTTP connection manager and opens the
     * {@code isRunning} latch. The steps are nested in try/finally so that a failure in an
     * earlier step cannot prevent the later ones; in particular the latch is always released,
     * otherwise {@link #waitForFinish()} could block forever.
     */
    private void releaseResources() {
        try {
            this.rateTracker.stop();
        }
        finally {
            try {
                logger.info("{} Shutting down httpclient connection manager", this.name);
                this.client.getConnectionManager().shutdown();
            }
            finally {
                this.isRunning.countDown();
            }
        }
    }

    /**
     * Attempts to open the connection for the given request, publishing a
     * {@code CONNECTION_ATTEMPT} event first.
     *
     * <ul>
     *   <li>{@link UnknownHostException}: logged and published as {@code CONNECTION_ERROR}.</li>
     *   <li>Other {@link IOException}: logged, published as {@code CONNECTION_ERROR}, then
     *       {@code reconnectionManager.handleLinearBackoff()} is invoked.</li>
     *   <li>Any other exception: logged and recorded as a {@code STOPPED_BY_ERROR} exit
     *       event.</li>
     * </ul>
     *
     * @return the status line of the response, or {@code null} if connecting threw
     */
    @Nullable
    @VisibleForTesting
    StatusLine establishConnection(Connection conn, HttpUriRequest request) {
        logger.info("{} Establishing a connection", this.name);
        StatusLine status = null;
        try {
            this.addEvent(new ConnectionEvent(EventType.CONNECTION_ATTEMPT, request));
            status = conn.connect(request);
        }
        catch (UnknownHostException e) {
            logger.warn("{} Unknown host - {}", this.name, request.getURI().getHost());
            this.addEvent(new Event(EventType.CONNECTION_ERROR, e));
        }
        catch (IOException e) {
            logger.warn("{} IOException caught when establishing connection to {}", this.name, request.getURI());
            this.addEvent(new Event(EventType.CONNECTION_ERROR, e));
            this.reconnectionManager.handleLinearBackoff();
        }
        catch (Exception e) {
            logger.error(String.format("%s Unknown exception while establishing connection to %s", this.name, request.getURI()), e);
            this.setExitStatus(new Event(EventType.STOPPED_BY_ERROR, e));
        }
        return status;
    }

    /**
     * Interprets the result of a connection attempt, updating stats, publishing events and
     * deciding whether to keep going.
     *
     * <ul>
     *   <li>{@code null} status: publishes {@code CONNECTION_ERROR}.</li>
     *   <li>200: marks the connection as established, publishes {@code CONNECTED} and resets the
     *       reconnection counts.</li>
     *   <li>Code in {@code HttpConstants.FATAL_CODES}: sets a {@code STOPPED_BY_ERROR} exit
     *       event.</li>
     *   <li>Other 4xx: delegates to exponential backoff if
     *       {@code reconnectionManager.shouldReconnectOn400s()}, otherwise sets a
     *       {@code STOPPED_BY_ERROR} exit event.</li>
     *   <li>5xx and above: delegates to exponential backoff.</li>
     *   <li>Anything else: sets a {@code STOPPED_BY_ERROR} exit event with the reason
     *       phrase.</li>
     * </ul>
     *
     * @param statusLine status returned by {@link #establishConnection}, possibly {@code null}
     * @return {@code true} only for a 200 response, i.e. when the stream should be processed
     */
    @VisibleForTesting
    boolean handleConnectionResult(@Nullable StatusLine statusLine) {
        this.statsReporter.incrNumConnects();
        if (statusLine == null) {
            logger.warn("{} failed to establish connection properly", this.name);
            this.addEvent(new Event(EventType.CONNECTION_ERROR, "Failed to establish connection properly"));
            return false;
        }
        int statusCode = statusLine.getStatusCode();
        if (statusCode == 200) {
            logger.debug("{} Connection successfully established", this.name);
            this.statsReporter.incrNum200s();
            this.connectionEstablished.set(true);
            this.addEvent(new HttpResponseEvent(EventType.CONNECTED, statusLine));
            this.reconnectionManager.resetCounts();
            return true;
        }
        logger.warn(this.name + " Error connecting w/ status code - {}, reason - {}", statusCode, statusLine.getReasonPhrase());
        this.statsReporter.incrNumConnectionFailures();
        this.addEvent(new HttpResponseEvent(EventType.HTTP_ERROR, statusLine));
        if (HttpConstants.FATAL_CODES.contains(statusCode)) {
            this.setExitStatus(new Event(EventType.STOPPED_BY_ERROR, "Fatal error code: " + statusCode));
        } else if (statusCode < 500 && statusCode >= 400) {
            this.statsReporter.incrNum400s();
            if (this.reconnectionManager.shouldReconnectOn400s()) {
                logger.debug("{} Reconnecting on {}", this.name, statusCode);
                this.reconnectionManager.handleExponentialBackoff();
            } else {
                logger.debug("{} Reconnecting retries exhausted for {}", this.name, statusCode);
                this.setExitStatus(new Event(EventType.STOPPED_BY_ERROR, "Retries exhausted"));
            }
        } else if (statusCode >= 500) {
            this.statsReporter.incrNum500s();
            this.reconnectionManager.handleExponentialBackoff();
        } else {
            this.setExitStatus(new Event(EventType.STOPPED_BY_ERROR, statusLine.getReasonPhrase()));
        }
        return false;
    }

    /**
     * Reads from an established connection until the client is done, a reconnect has been
     * requested, or processing throws.
     *
     * <p>Runtime and I/O failures are published as {@code DISCONNECTED} (the caller's loop then
     * reconnects); interruption and any other exception set a {@code STOPPED_BY_ERROR} exit
     * event. On interruption the thread's interrupt flag is restored so that the code owning the
     * thread can observe it.
     */
    private void processConnectionData(Connection conn) {
        logger.info("{} Processing connection data", this.name);
        try {
            this.addEvent(new Event(EventType.PROCESSING, "Processing messages"));
            // getAndSet(false) consumes a pending reconnect request exactly once.
            while (!this.isDone() && !this.reconnect.getAndSet(false)) {
                if (conn.processResponse()) {
                    this.statsReporter.incrNumMessages();
                } else {
                    this.statsReporter.incrNumMessagesDropped();
                }
                this.rateTracker.eventObserved();
            }
        }
        catch (RuntimeException e) {
            logger.warn(this.name + " Unknown error processing connection: ", e);
            this.statsReporter.incrNumDisconnects();
            this.addEvent(new Event(EventType.DISCONNECTED, e));
        }
        catch (IOException ex) {
            logger.info("{} Disconnected during processing - will reconnect", this.name);
            this.statsReporter.incrNumDisconnects();
            this.addEvent(new Event(EventType.DISCONNECTED, ex));
        }
        catch (InterruptedException interrupt) {
            logger.info("{} Thread interrupted during processing, exiting", this.name);
            this.statsReporter.incrNumDisconnects();
            this.setExitStatus(new Event(EventType.STOPPED_BY_ERROR, interrupt));
            // Catching InterruptedException clears the flag; restore it for the thread's owner.
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            logger.warn(this.name + " Unexpected exception during processing", e);
            this.statsReporter.incrNumDisconnects();
            this.setExitStatus(new Event(EventType.STOPPED_BY_ERROR, e));
        }
    }

    /**
     * Publishes the event and records it as the exit event, which makes {@link #isDone()} return
     * true. A later call replaces an earlier exit event.
     */
    private void setExitStatus(Event event) {
        logger.info("{} exit event - {}", this.name, event.getMessage());
        this.addEvent(event);
        this.exitEvent.set(event);
    }

    /**
     * Offers the event to the events queue, if one was supplied. Uses the non-blocking
     * {@code offer}; a rejected event is counted as dropped rather than retried.
     */
    private void addEvent(Event event) {
        if (this.eventsQueue != null && !this.eventsQueue.offer(event)) {
            this.statsReporter.incrNumClientEventsDropped();
        }
    }

    /**
     * Requests that the current connection be dropped and re-established. Has no effect unless a
     * connection has been successfully established at least once.
     */
    public void reconnect() {
        if (this.connectionEstablished.get()) {
            this.reconnect.set(true);
        }
    }

    /**
     * Signals the client to stop (if it has not already finished) and waits up to
     * {@code waitMillis} for {@link #run()} to complete. The rate tracker is shut down
     * regardless of whether the wait succeeds.
     *
     * @param waitMillis maximum time to wait for the client thread, in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void stop(int waitMillis) throws InterruptedException {
        try {
            if (!this.isDone()) {
                this.setExitStatus(new Event(EventType.STOPPED_BY_USER, String.format("Stopped by user: waiting for %d ms", waitMillis)));
            }
            if (!this.waitForFinish(waitMillis)) {
                logger.warn("{} Client thread failed to finish in {} millis", this.name, waitMillis);
            }
        }
        finally {
            this.rateTracker.shutdown();
        }
    }

    /**
     * Same as {@link #stop(int)} but does not propagate {@link InterruptedException}: the
     * interruption is logged and the calling thread's interrupt flag is restored.
     *
     * @param millis maximum time to wait for the client thread, in milliseconds
     */
    public void shutdown(int millis) {
        try {
            this.stop(millis);
        }
        catch (InterruptedException e) {
            logger.warn("Client failed to shutdown due to interruption", e);
            // Do not swallow the interruption; let the caller's thread see it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return {@code true} once an exit event has been recorded
     */
    public boolean isDone() {
        return this.exitEvent.get() != null;
    }

    /**
     * @return the recorded exit event
     * @throws IllegalStateException if no exit event has been recorded yet
     */
    public Event getExitEvent() {
        if (!this.isDone()) {
            throw new IllegalStateException(this.name + " Still running");
        }
        return this.exitEvent.get();
    }

    /**
     * Waits for {@link #run()} to finish its cleanup.
     *
     * @param millis maximum time to wait, in milliseconds
     * @return {@code true} if the client finished within the timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean waitForFinish(int millis) throws InterruptedException {
        return this.isRunning.await(millis, TimeUnit.MILLISECONDS);
    }

    /**
     * Waits, without a timeout, for {@link #run()} to finish its cleanup.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForFinish() throws InterruptedException {
        this.isRunning.await();
    }

    @Override
    public String toString() {
        return String.format("%s, endpoint: %s", this.getName(), this.endpoint.getURI());
    }

    public String getName() {
        return this.name;
    }

    public StreamingEndpoint getEndpoint() {
        return this.endpoint;
    }

    public StatsReporter.StatsTracker getStatsTracker() {
        return this.statsReporter.getStatsTracker();
    }
}

