/*
 * Decompiled with CFR 0.152.
 */
package com.abiquo.apiclient.stream;

import com.abiquo.tracing.model.Trace;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Uninterruptibles;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.Param;
import com.ning.http.client.Realm;
import com.ning.http.client.Response;
import com.ning.http.client.SignatureCalculator;
import com.ning.http.client.oauth.ConsumerKey;
import com.ning.http.client.oauth.OAuthSignatureCalculator;
import com.ning.http.client.oauth.RequestToken;
import com.ning.http.client.uri.Uri;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import org.atmosphere.wasync.ClientFactory;
import org.atmosphere.wasync.Event;
import org.atmosphere.wasync.Request;
import org.atmosphere.wasync.Socket;
import org.atmosphere.wasync.impl.AtmosphereClient;
import org.atmosphere.wasync.impl.AtmosphereRequest;
import org.atmosphere.wasync.impl.DefaultOptionsBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamClient
implements AutoCloseable {
    /** Logger for this client; the category is the fixed name {@code abiquo.stream}. */
    private static final Logger LOGGER = LoggerFactory.getLogger("abiquo.stream");

    /** Base URL of the streaming service; {@code /stream?...} is appended to it in {@link #connect()}. */
    private final String mEndpoint;
    /** Basic-auth principal; basic auth is configured in {@link #connect()} only when this is non-null. */
    private final String username;
    /** Basic-auth password, used together with {@link #username}. */
    private final String password;
    /** OAuth consumer key; the OAuth signature calculator is installed in {@link #connect()} only when this is non-null. */
    private final String apiKey;
    /** OAuth consumer secret, paired with {@link #apiKey}. */
    private final String apiSecret;
    /** OAuth request token. */
    private final String token;
    /** OAuth request token secret. */
    private final String tokenSecret;
    /** Base URL of the API; used for the {@code /events} query and handed to the OAuth signature calculator. */
    private final String apiEndpoint;
    /** Optional SSL settings; ignored when {@code null}. */
    private final SSLConfiguration sslConfiguration;
    /** Callbacks invoked, in order, for every processed trace. */
    private final List<Consumer<Trace>> consumers;
    /** Pre-built query-string fragment (possibly empty) appended to both the stream URL and the events URL. */
    public final String filters;
    /**
     * Timestamp of the last processed trace, initialised to the construction time. Written while
     * processing traces and read when replaying events from the API, which can happen on a
     * different thread, hence {@code volatile}.
     */
    public volatile Long lastEventProcessed = System.currentTimeMillis();
    /** Whether a lost connection triggers the reconnection logic in {@code onClose()}. */
    private final boolean reconnect;
    /** Maximum number of connection attempts made by {@code reconnect()}. */
    private final int reconnectAttempts;
    /** Pause, in seconds, after a failed reconnection attempt. */
    private final int pauseBeforeReconnectInSeconds;
    /** Callback run on the reconnection executor before reconnecting. */
    private final Consumer<StreamClient> beforeReconnection;
    /** Callback run when the first message arrives after each {@link #connect()}. */
    private final Consumer<StreamClient> afterReconnection;
    /** Fallback for both reconnection callbacks: replays events from the API. */
    private final Consumer<StreamClient> defaultReconnectCallback = this::getEventsFromApiAndProcessCallback;
    /** Shared JSON mapper that ignores unknown properties when deserialising. */
    public static final ObjectMapper json = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    /**
     * UTC formatter used for the {@code datefrom} query parameter. The constructor assigns a
     * {@link SimpleDateFormat}, which is not thread-safe; callers sharing it must synchronise.
     */
    public final DateFormat dateFormat;
    /**
     * HTTP client created by {@link #connect()} and set to {@code null} by {@code closeConnection()}.
     * It is replaced on the reconnection thread and read by the event-replay callback, hence
     * {@code volatile}.
     */
    public volatile AsyncHttpClient httpClient;
    /** Streaming socket; non-null while a connection is considered established. */
    private Socket socket;
    /** Executor handed to the HTTP client; recreated on every {@link #connect()}. */
    private ExecutorService asyncHttpexecutor;
    /** Set while {@link #close()} is running. */
    private final AtomicBoolean manuallyClosing = new AtomicBoolean(false);
    /** Set once a reconnection has been scheduled; cleared at the start of {@link #connect()}. */
    private final AtomicBoolean reconnecting = new AtomicBoolean(false);
    /**
     * Despite its name, this guards the one-off {@link #afterReconnection} call made when the first
     * message arrives; it is cleared at the start of {@link #connect()}.
     */
    private final AtomicBoolean beforeReconnectExecuted = new AtomicBoolean(false);
    /** Single thread on which reconnection work is executed. */
    private final ExecutorService reconnectionExecutor = Executors.newSingleThreadExecutor();

    /**
     * Creates a client; instances are obtained through {@link Builder#build()}.
     *
     * <p>The callback list is copied so that later changes to the caller's list (for example
     * further {@code addCallback} calls on a reused builder) cannot alter, or break iteration
     * over, the callbacks of an already built client. When a reconnection callback is
     * {@code null} the default event-replay callback is used instead.
     */
    private StreamClient(String mEndpoint, String username, String password, String apiKey, String apiSecret, String token, String tokenSecret, String apiEndpoint, SSLConfiguration sslConfiguration, List<Consumer<Trace>> consumers, Consumer<StreamClient> beforeReconnection, Consumer<StreamClient> afterReconnection, boolean reconnect, int reconnectAttempts, int pauseBeforeReconnectInSeconds, String filters) {
        this.mEndpoint = mEndpoint;
        this.username = username;
        this.password = password;
        this.apiKey = apiKey;
        this.apiSecret = apiSecret;
        this.token = token;
        this.tokenSecret = tokenSecret;
        this.apiEndpoint = apiEndpoint;
        // Defensive, read-only snapshot: the builder keeps (and may keep mutating) its own list.
        this.consumers = Collections.unmodifiableList(new ArrayList<>(consumers));
        this.sslConfiguration = sslConfiguration;
        this.reconnect = reconnect;
        this.filters = filters;
        this.reconnectAttempts = reconnectAttempts;
        this.pauseBeforeReconnectInSeconds = pauseBeforeReconnectInSeconds;
        this.beforeReconnection = beforeReconnection != null ? beforeReconnection : this.defaultReconnectCallback;
        this.afterReconnection = afterReconnection != null ? afterReconnection : this.defaultReconnectCallback;
        // Timestamps are rendered in UTC with a literal trailing 'Z'.
        this.dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        this.dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
    }

    /**
     * Opens the SSE stream and starts dispatching events to the registered callbacks.
     *
     * <p>A new executor and HTTP client are created for every connection. Basic auth is
     * configured when a username is set; the OAuth signature calculator is installed when an
     * API key is set. If any step after the executor creation fails, everything created so far
     * is released again, so the client does not leak threads and is not left in the
     * "already listening" state.
     *
     * @throws IllegalStateException if a socket is already present
     * @throws IOException if the socket cannot be opened
     */
    public void connect() throws IOException {
        if (this.socket != null) {
            throw new IllegalStateException("already listening to events");
        }
        this.beforeReconnectExecuted.set(false);
        this.reconnecting.set(false);
        this.asyncHttpexecutor = Executors.newCachedThreadPool();
        try {
            AsyncHttpClientConfig.Builder config = new AsyncHttpClientConfig.Builder();
            // -1 disables the timeouts: the stream is expected to stay open indefinitely.
            config.setRequestTimeout(-1);
            config.setReadTimeout(-1);
            config.setExecutorService(this.asyncHttpexecutor);
            if (this.sslConfiguration != null) {
                config.setHostnameVerifier(this.sslConfiguration.hostnameVerifier());
                config.setSSLContext(this.sslConfiguration.sslContext());
            }
            if (this.username != null) {
                config.setRealm(new Realm.RealmBuilder().setPrincipal(this.username).setPassword(this.password).setUsePreemptiveAuth(true).setScheme(Realm.AuthScheme.BASIC).build());
            }
            this.httpClient = new AsyncHttpClient(config.build());
            if (this.apiKey != null) {
                this.httpClient.setSignatureCalculator((SignatureCalculator)new AbiquoAPIOAuth(new ConsumerKey(this.apiKey, this.apiSecret), new RequestToken(this.token, this.tokenSecret), this.apiEndpoint));
            }
            LOGGER.debug("Connecting to {} with filters  {} ", (Object)this.mEndpoint, (Object)this.filters);
            AtmosphereClient client = (AtmosphereClient)ClientFactory.getDefault().newClient(AtmosphereClient.class);
            AtmosphereRequest request = ((AtmosphereRequest.AtmosphereRequestBuilder)((AtmosphereRequest.AtmosphereRequestBuilder)client.newRequestBuilder().method(Request.METHOD.GET)).uri(this.mEndpoint + "/stream?Content-Type=application/json" + this.filters)).transport(Request.TRANSPORT.SSE).build();
            this.socket = client.create(((DefaultOptionsBuilder)client.newOptionsBuilder().runtime(this.httpClient)).build());
            this.socket.open((Request)request);
            this.socket.on(Event.MESSAGE, a -> this.onMessage((String)a)).on(Event.CLOSE, a -> this.onClose());
            LOGGER.debug("Connected!");
        } catch (IOException | RuntimeException e) {
            // Release the executor, HTTP client and socket created above before reporting the failure.
            LOGGER.debug("Connection attempt failed, releasing resources", e);
            this.closeConnection();
            throw e;
        }
    }

    /**
     * Handles one raw message received from the stream.
     *
     * <p>On the first message after a connect, the after-reconnection callback is run once
     * (failures are logged, never propagated). The payload is then bound to a {@link Trace};
     * payloads that cannot be bound are logged and dropped.
     *
     * @param rawEvent the JSON payload of the message
     */
    private void onMessage(String rawEvent) {
        boolean firstMessageSinceConnect = !this.beforeReconnectExecuted.get();
        if (firstMessageSinceConnect) {
            try {
                this.afterReconnection.accept(this);
            } catch (Throwable failure) {
                LOGGER.error("Cannot run after reconnection logic", failure);
            }
            // Mark as done even when the callback failed, so it is not retried on every message.
            this.beforeReconnectExecuted.set(true);
        }

        Trace parsed;
        try {
            parsed = json.readValue(rawEvent, Trace.class);
        } catch (IOException bindingFailure) {
            LOGGER.warn("Cannot bind to Trace : " + rawEvent, (Throwable)bindingFailure);
            return;
        }
        this.processTrace(parsed);
    }

    /**
     * Records the trace timestamp as the last processed event and hands the trace to every
     * registered callback.
     *
     * <p>A failing callback does not prevent the remaining callbacks from running; the failure
     * is logged together with a rendering of the trace.
     *
     * @param trace the trace to dispatch
     */
    protected void processTrace(Trace trace) {
        this.lastEventProcessed = trace.getTimestamp();
        for (Consumer<Trace> consumer : this.consumers) {
            try {
                consumer.accept(trace);
            } catch (Throwable e) {
                String traceAsString;
                try {
                    traceAsString = json.writeValueAsString((Object)trace);
                } catch (JsonProcessingException serializationFailure) {
                    // JSON rendering is only for diagnostics; fall back to toString() rather
                    // than logging an empty message.
                    traceAsString = String.valueOf(trace);
                }
                LOGGER.error("Cannot process consumer for message : " + traceAsString, e);
            }
        }
    }

    /**
     * Handles the socket close event.
     *
     * <p>When reconnection is enabled and the close was neither requested through
     * {@link #close()} nor observed while a reconnection is already pending, a reconnection
     * task is queued on the reconnection executor. In every other case an
     * {@link IllegalStateException} is raised.
     */
    private synchronized void onClose() {
        boolean shouldReconnect = this.reconnect
            && !this.manuallyClosing.get()
            && !this.reconnecting.get();
        if (!shouldReconnect) {
            throw new IllegalStateException("Connection closed");
        }

        this.reconnecting.set(true);
        Runnable reconnectionTask = () -> {
            try {
                this.beforeReconnection.accept(this);
            } catch (Throwable failure) {
                // A failing user callback must not prevent the reconnection itself.
                LOGGER.error("Cannot run before reconnection logic", failure);
            }
            LOGGER.warn("Connection lost, going to reconnect");
            this.reconnect();
        };
        this.reconnectionExecutor.execute(reconnectionTask);
    }

    /**
     * Tries to re-establish the connection, up to {@code reconnectAttempts} times.
     *
     * <p>Each attempt first tears down whatever is left of the previous connection and then
     * connects again. After a failed attempt the method pauses for
     * {@code pauseBeforeReconnectInSeconds} before retrying; there is no pause after the final
     * attempt. Retrying stops as soon as the reconnection executor has been shut down, which
     * {@link #close()} does, so a closed client is not silently reopened. When all attempts
     * fail, the last failure is logged and the connection resources are released.
     */
    private void reconnect() {
        Throwable lastE = null;
        for (int retry = 0; retry < this.reconnectAttempts; ++retry) {
            if (this.reconnectionExecutor.isShutdown()) {
                LOGGER.debug("Client closed, giving up reconnection");
                return;
            }
            try {
                LOGGER.warn("Attempting to reconnect number : " + (retry + 1));
                this.closeConnection();
                this.connect();
                return;
            } catch (Throwable e) {
                lastE = e;
                LOGGER.debug("Reconnect attempt number " + (retry + 1) + " failed", e);
                // No point in pausing when there is no further attempt to wait for.
                if (retry + 1 < this.reconnectAttempts) {
                    Uninterruptibles.sleepUninterruptibly((long)this.pauseBeforeReconnectInSeconds, (TimeUnit)TimeUnit.SECONDS);
                }
            }
        }
        LOGGER.warn("Cannot reconnect after " + this.reconnectAttempts + " attempts", lastE);
        this.closeConnection();
    }

    /**
     * Closes the connection and shuts down the reconnection executor.
     *
     * <p>The {@code manuallyClosing} flag is raised for the duration of the call, so that a
     * close event observed meanwhile is not treated as a lost connection, and is always lowered
     * again before returning.
     */
    @Override
    public synchronized void close() throws IOException {
        this.manuallyClosing.set(true);
        try {
            this.closeConnection();
            this.reconnectionExecutor.shutdownNow();
        } finally {
            this.manuallyClosing.set(false);
        }
    }

    /**
     * Releases the resources of the current connection: the HTTP client, its executor and the
     * socket. Every step is best-effort; failures are logged and the teardown continues.
     *
     * <p>Safe to call when no connection was ever established (for instance {@link #close()}
     * without a prior {@link #connect()}): missing resources are simply skipped.
     */
    private synchronized void closeConnection() {
        LOGGER.debug("Disconnecting...");
        if (this.httpClient != null) {
            try {
                this.httpClient.close();
            } catch (Throwable e) {
                LOGGER.debug("Failed to close async-http-client", e);
            }
            this.httpClient = null;
        }
        // The executor only exists once connect() has been called.
        if (this.asyncHttpexecutor != null) {
            this.asyncHttpexecutor.shutdownNow();
            try {
                this.asyncHttpexecutor.awaitTermination(1L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                LOGGER.error("Cannot stop executor service", e);
                // Keep the interruption visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            Uninterruptibles.sleepUninterruptibly((long)1L, (TimeUnit)TimeUnit.SECONDS);
        }
        if (this.socket != null) {
            try {
                if (this.socket.status() != Socket.STATUS.CLOSE) {
                    this.socket.close();
                    // Poll until the socket reports the CLOSE status.
                    while (this.socket.status() != Socket.STATUS.CLOSE) {
                        LOGGER.warn("Waiting socket close");
                        Uninterruptibles.sleepUninterruptibly((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
                    }
                }
            } catch (Throwable e) {
                LOGGER.debug("Failed to close socket", e);
            }
            this.socket = null;
        }
        LOGGER.debug("Disconnected!");
    }

    /**
     * Fetches from the API, in ascending order, the events recorded since the given client's
     * last processed event, and dispatches each of them through {@link #processTrace(Trace)}.
     *
     * <p>This is the default before/after reconnection callback.
     *
     * @param client the client whose {@code lastEventProcessed}, {@code filters} and
     *        {@code httpClient} are used to build and run the request
     * @return the traces returned by the API; may be {@code null} when the response carries no
     *         collection
     * @throws RuntimeException if the request fails, the status code is not 200, or the
     *         response body cannot be bound
     */
    public List<Trace> getEventsFromApiAndProcessCallback(StreamClient client) {
        String dateFrom;
        // The formatter is a SimpleDateFormat, which is not thread-safe, and this method can run
        // on both the message thread and the reconnection thread.
        synchronized (this.dateFormat) {
            dateFrom = this.dateFormat.format(client.lastEventProcessed);
        }
        String apiEvents = String.format("%s/events?datefrom=%s%s&asc=true&limit=0", this.apiEndpoint, dateFrom, client.filters);

        Response response;
        try {
            response = (Response)client.httpClient.prepareGet(apiEvents).execute().get();
        } catch (InterruptedException e) {
            // Preserve the interruption for whoever owns this thread.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Cannot GET : " + apiEvents, e);
        } catch (Exception e) {
            throw new RuntimeException("Cannot GET : " + apiEvents, e);
        }
        if (response.getStatusCode() != 200) {
            throw new RuntimeException("Failed GET (" + response.getStatusCode() + "): " + apiEvents);
        }

        List<Trace> traces;
        try {
            traces = ((Traces)json.readValue(response.getResponseBodyAsStream(), Traces.class)).getCollection();
        } catch (IOException e) {
            // Keep the cause: it carries the position and reason of the binding failure.
            throw new RuntimeException("Cannot serialize events MT", e);
        }
        if (traces != null) {
            for (Trace trace : traces) {
                this.processTrace(trace);
            }
        }
        return traces;
    }

    /**
     * Starts building a client for the given endpoints.
     *
     * @param endpointToM base URL of the streaming service; must not be {@code null}
     * @param endpointToApi base URL of the API; must not be {@code null}
     * @return a new builder with both endpoints set
     */
    public static Builder builder(String endpointToM, String endpointToApi) {
        Builder fresh = new Builder();
        return fresh.endpoints(endpointToM, endpointToApi);
    }

    /**
     * Binding target for the events response: a wrapper around the {@code collection} list of
     * traces. Starts out with an empty list.
     */
    public static class Traces {
        List<Trace> collection = new ArrayList<Trace>();

        /** @return the wrapped list of traces, exactly as last set */
        public List<Trace> getCollection() {
            return collection;
        }

        /** @param collection the list of traces to wrap; stored as-is, without copying */
        public void setCollection(List<Trace> collection) {
            this.collection = collection;
        }
    }

    public static class AbiquoAPIOAuth
    extends OAuthSignatureCalculator {
        private final String endpointApi;

        public AbiquoAPIOAuth(ConsumerKey consumerAuth, RequestToken userAuth, String endpointApi) {
            super(consumerAuth, userAuth);
            this.endpointApi = endpointApi + "/login";
        }

        public String calculateSignature(String method, Uri uri, long oauthTimestamp, String nonce, List<Param> formParams, List<Param> queryParams) {
            return super.calculateSignature(method, Uri.create((String)this.endpointApi), oauthTimestamp, nonce, formParams, Collections.singletonList(new Param("expand", "privileges")));
        }
    }

    /**
     * Supplies the SSL settings applied to the HTTP client when a connection is opened.
     * Both values are handed to the HTTP client configuration as returned.
     */
    public static interface SSLConfiguration {
        /** @return the SSL context to install on the HTTP client configuration */
        public SSLContext sslContext();

        /** @return the hostname verifier to install on the HTTP client configuration */
        public HostnameVerifier hostnameVerifier();
    }

    /**
     * Fluent builder for {@link StreamClient}.
     *
     * <p>Exactly one authentication mechanism (basic auth or OAuth) and at least one callback
     * must be configured before {@link #build()} is called. Filters are optional; several
     * values of the same kind are sent as one comma-separated query parameter.
     */
    public static class Builder {
        private String mEndpoint;
        private String username;
        private String password;
        private SSLConfiguration sslConfiguration;
        private final List<Consumer<Trace>> consumers = new ArrayList<>();
        private boolean reconnect = false;
        private int reconnectAttempts = 10;
        private int pauseBeforeReconnectInSeconds = 5;
        private Consumer<StreamClient> beforeReconnect;
        private Consumer<StreamClient> afterReconnect;
        private String apiKey;
        private String apiSecret;
        private String token;
        private String tokenSecret;
        private String apiEndpoint;
        private final Set<String> severityFilters = new HashSet<>();
        private final Set<String> entityFilters = new HashSet<>();
        private final Set<String> actionFilters = new HashSet<>();
        private final Set<String> userFilters = new HashSet<>();
        private final Set<String> enterpriseFilters = new HashSet<>();

        private Builder() {
        }

        /** Sets the streaming and API endpoints; both are mandatory. */
        private Builder endpoints(String endpointToM, String endpointToApi) {
            this.mEndpoint = Objects.requireNonNull(endpointToM, "endpointToM cannot be null");
            this.apiEndpoint = Objects.requireNonNull(endpointToApi, "endpointToApi cannot be null");
            return this;
        }

        /**
         * Configures basic authentication.
         *
         * @throws IllegalStateException if OAuth has already been configured
         * @throws NullPointerException if any argument is {@code null}
         */
        public Builder basicAuth(String username, String password) {
            if (this.apiKey != null) {
                throw new IllegalStateException("oauth already configured");
            }
            this.username = Objects.requireNonNull(username, "username cannot be null");
            this.password = Objects.requireNonNull(password, "password cannot be null");
            return this;
        }

        /**
         * Configures OAuth authentication.
         *
         * @throws IllegalStateException if basic auth has already been configured
         * @throws NullPointerException if any argument is {@code null}
         */
        public Builder oauth(String apiKey, String apiSecret, String token, String tokenSecret) {
            if (this.username != null) {
                throw new IllegalStateException("basic auth already configured");
            }
            this.apiKey = Objects.requireNonNull(apiKey, "apiKey cannot be null");
            this.apiSecret = Objects.requireNonNull(apiSecret, "apiSecret cannot be null");
            this.token = Objects.requireNonNull(token, "token cannot be null");
            this.tokenSecret = Objects.requireNonNull(tokenSecret, "tokenSecret cannot be null");
            return this;
        }

        /** Sets the optional SSL configuration; {@code null} leaves the defaults in place. */
        public Builder sslConfiguration(SSLConfiguration sslConfiguration) {
            this.sslConfiguration = sslConfiguration;
            return this;
        }

        /** Adds a value to the {@code severity} filter. */
        public Builder addFilterBySeverity(String severity) {
            this.severityFilters.add(Objects.requireNonNull(severity, "severity cannot be null"));
            return this;
        }

        /** Adds a value to the {@code entity} filter. */
        public Builder addFilterByEntity(String entity) {
            this.entityFilters.add(Objects.requireNonNull(entity, "entity cannot be null"));
            return this;
        }

        /** Adds a value to the {@code action} filter. */
        public Builder addFilterByAction(String action) {
            this.actionFilters.add(Objects.requireNonNull(action, "action cannot be null"));
            return this;
        }

        /** Adds a user id to the {@code user} filter. */
        public Builder addFilterByUser(Integer userId) {
            this.userFilters.add(Objects.requireNonNull(userId, "userId cannot be null").toString());
            return this;
        }

        /** Adds an enterprise id to the {@code enterprise} filter. */
        public Builder addFilterByEnterprise(Integer enterpriseId) {
            this.enterpriseFilters.add(Objects.requireNonNull(enterpriseId, "enterpriseId cannot be null").toString());
            return this;
        }

        /** Registers a callback invoked for every trace; at least one is required. */
        public Builder addCallback(Consumer<Trace> callback) {
            this.consumers.add(Objects.requireNonNull(callback, "callback cannot be null"));
            return this;
        }

        /** Enables reconnection when the connection is lost. */
        public Builder reconnect() {
            this.reconnect = true;
            return this;
        }

        /** Sets the maximum number of reconnection attempts (default 10). */
        public Builder reconnectAttempts(int attempts) {
            this.reconnectAttempts = attempts;
            return this;
        }

        /** Sets the pause, in seconds, after a failed reconnection attempt (default 5). */
        public Builder pauseBeforeReconnectInSeconds(int seconds) {
            this.pauseBeforeReconnectInSeconds = seconds;
            return this;
        }

        /** Sets the callback run before reconnecting; {@code null} selects the client default. */
        public Builder beforeReconnect(Consumer<StreamClient> beforeReconnect) {
            this.beforeReconnect = beforeReconnect;
            return this;
        }

        /** Sets the callback run after reconnecting; {@code null} selects the client default. */
        public Builder afterReconnect(Consumer<StreamClient> afterReconnect) {
            this.afterReconnect = afterReconnect;
            return this;
        }

        /**
         * Validates the configuration and creates the client.
         *
         * <p>The client receives its own copy of the callback list, so this builder can keep
         * being used without affecting clients it has already built.
         *
         * @throws IllegalStateException if reconnection is enabled with a non-positive number
         *         of attempts, if no callback was added, or if no authentication was configured
         */
        public StreamClient build() {
            StringBuilder filters = new StringBuilder();
            appendFilter(filters, "severity", this.severityFilters);
            appendFilter(filters, "entity", this.entityFilters);
            appendFilter(filters, "action", this.actionFilters);
            appendFilter(filters, "user", this.userFilters);
            appendFilter(filters, "enterprise", this.enterpriseFilters);

            if (this.reconnect && this.reconnectAttempts <= 0) {
                throw new IllegalStateException(".reconnect() requires a positive 'reconnectAttempts'");
            }
            if (this.consumers.isEmpty()) {
                throw new IllegalStateException("need to add some callback, use .addCallback()");
            }
            if (this.username == null && this.apiKey == null) {
                throw new IllegalStateException("need to add some authorization mechanism, use .basicAuth(...) or .oauth(...)");
            }
            // Snapshot of the callbacks: later addCallback() calls must not reach a built client.
            List<Consumer<Trace>> callbacks = new ArrayList<>(this.consumers);
            return new StreamClient(this.mEndpoint, this.username, this.password, this.apiKey, this.apiSecret, this.token, this.tokenSecret, this.apiEndpoint, this.sslConfiguration, callbacks, this.beforeReconnect, this.afterReconnect, this.reconnect, this.reconnectAttempts, this.pauseBeforeReconnectInSeconds, filters.toString());
        }

        /** Appends {@code &name=v1,v2,...} to the target when there is at least one value. */
        private static void appendFilter(StringBuilder target, String name, Set<String> values) {
            if (!values.isEmpty()) {
                target.append('&').append(name).append('=').append(String.join(",", values));
            }
        }
    }
}

