/*
 * Decompiled with CFR 0.152.
 */
package com.salesforce.emp.connector;

import com.salesforce.emp.connector.BayeuxParameters;
import com.salesforce.emp.connector.CannotSubscribe;
import com.salesforce.emp.connector.ReplayExtension;
import com.salesforce.emp.connector.TopicSubscription;
import java.net.ConnectException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSession;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EmpConnector {
    /** Message key read first when a handshake or subscribe reply is unsuccessful. */
    private static final String ERROR = "error";
    /** Message key used as a fallback when {@link #ERROR} is absent from an unsuccessful reply. */
    private static final String FAILURE = "failure";
    /**
     * Replay marker used by {@link #subscribeEarliest}; also the default reported for a
     * subscription that has no recorded replay position.
     * NOTE(review): left non-final only to keep the public API unchanged; treat as a constant.
     */
    public static long REPLAY_FROM_EARLIEST = -2L;
    /**
     * Replay marker used by {@link #subscribeTip}.
     * NOTE(review): left non-final only to keep the public API unchanged; treat as a constant.
     */
    public static long REPLAY_FROM_TIP = -1L;
    /** Name of the HTTP header that carries the bearer token on every transport request. */
    private static final String AUTHORIZATION = "Authorization";
    private static final Logger log = LoggerFactory.getLogger(EmpConnector.class);

    /** Current Bayeux client; {@code null} before the first connect and after {@link #stop()}. */
    private volatile BayeuxClient client;
    private final HttpClient httpClient;
    private final BayeuxParameters parameters;
    /** Replay position per topic, keyed by the topic with its query string removed. */
    private final ConcurrentMap<String, Long> replay = new ConcurrentHashMap<>();
    /** {@code true} between a successful {@link #start()} transition and the matching {@link #stop()}. */
    private final AtomicBoolean running = new AtomicBoolean();
    /** Subscriptions that are (re)subscribed after every successful handshake. */
    private final Set<SubscriptionImpl> subscriptions = new CopyOnWriteArraySet<>();
    /** Channel listeners that are attached to each newly created Bayeux client. */
    private final Set<MessageListenerInfo> listenerInfos = new CopyOnWriteArraySet<>();
    /** Optional token source; when {@code null} the token comes from {@link #parameters}. */
    private Function<Boolean, String> bearerTokenProvider;
    /** Set after a 401/403 reply so the next token request asks the provider to re-authenticate. */
    private final AtomicBoolean reauthenticate = new AtomicBoolean(false);

    /**
     * Creates a connector that owns its HTTP client, built from the SSL context factory
     * supplied by the parameters.
     *
     * @param parameters connection settings (endpoint, SSL, proxies, token, transport options)
     */
    public EmpConnector(BayeuxParameters parameters) {
        // The two-argument constructor stores both values and registers the proxies.
        this(parameters, new HttpClient(parameters.sslContextFactory()));
    }

    /**
     * Creates a connector on top of a caller-supplied HTTP client.
     *
     * @param parameters connection settings (endpoint, proxies, token, transport options)
     * @param httpClient HTTP client used by the long-polling transport; the proxies declared
     *                   by {@code parameters} are appended to its proxy configuration
     */
    public EmpConnector(BayeuxParameters parameters, HttpClient httpClient) {
        this.httpClient = httpClient;
        this.parameters = parameters;
        httpClient.getProxyConfiguration().getProxies().addAll(parameters.proxies());
    }

    /**
     * Starts the connector if it is not already running.
     *
     * <p>On the not-running to running transition this registers the authentication-failure
     * listeners on the meta channels, clears all recorded replay positions and connects.
     *
     * @return the connection future from {@code connect()}, or an already-completed
     *         {@code true} future when the connector was running before the call
     */
    public Future<Boolean> start() {
        if (!this.running.compareAndSet(false, true)) {
            return CompletableFuture.completedFuture(true);
        }
        this.ensureAuthFailureListener("/meta/connect");
        this.ensureAuthFailureListener("/meta/handshake");
        this.replay.clear();
        return this.connect();
    }

    /**
     * Registers an {@link AuthFailureListener} on {@code channel} unless one is already present.
     * Registered listeners survive {@link #stop()}, so registering unconditionally on every
     * start would stack duplicates and run the stop/reconnect sequence once per duplicate.
     */
    private void ensureAuthFailureListener(String channel) {
        for (MessageListenerInfo info : this.listenerInfos) {
            if (channel.equals(info.getChannelName()) && info.getMessageListener() instanceof AuthFailureListener) {
                return;
            }
        }
        this.addListener(channel, new AuthFailureListener());
    }

    /**
     * Stops the connector. Does nothing unless the connector is currently running.
     *
     * <p>Disconnects and drops the Bayeux client when one exists, then stops the HTTP client;
     * a failure to stop the HTTP client is logged and not propagated.
     */
    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            if (this.client != null) {
                log.info("Disconnecting Bayeux Client in EmpConnector");
                this.client.disconnect();
                this.client = null;
            }
            if (this.httpClient == null) {
                return;
            }
            try {
                this.httpClient.stop();
            } catch (Exception stopFailure) {
                log.error("Unable to stop HTTP transport[{}]", this.parameters.endpoint(), stopFailure);
            }
        }
    }

    /**
     * Installs the function that supplies the bearer token for each connection attempt.
     *
     * <p>When a provider is set it is used instead of the token from the connector's
     * parameters. It is called with the current re-authenticate flag, which is set after a
     * 401/403 reply on a meta channel.
     *
     * @param bearerTokenProvider maps the re-authenticate flag to a bearer token; {@code null}
     *                            falls back to the token from the parameters
     */
    public void setBearerTokenProvider(Function<Boolean, String> bearerTokenProvider) {
        this.bearerTokenProvider = bearerTokenProvider;
    }

    /**
     * Subscribes to a topic, replaying events from the given position.
     *
     * <p>A single trailing {@code /} is removed from the topic. The replay position is recorded
     * under the topic with its query string removed, and only one subscription per such key is
     * allowed.
     *
     * @param topic      channel to subscribe to; may carry a query string
     * @param replayFrom replay position to start from
     * @param consumer   receives the data map of every message delivered on the topic
     * @return a future completed with the subscription, or exceptionally when the
     *         subscribe reply is unsuccessful
     * @throws IllegalStateException if the connector is not running or the topic is already subscribed
     */
    public Future<TopicSubscription> subscribe(String topic, long replayFrom, Consumer<Map<String, Object>> consumer) {
        if (!this.running.get()) {
            throw new IllegalStateException(String.format("Connector[%s] has not been started", this.parameters.endpoint()));
        }
        String normalizedTopic = topic.replaceAll("/$", "");
        String replayKey = EmpConnector.topicWithoutQueryString(normalizedTopic);
        if (this.replay.putIfAbsent(replayKey, replayFrom) != null) {
            throw new IllegalStateException(String.format("Already subscribed to %s [%s]", normalizedTopic, this.parameters.endpoint()));
        }
        SubscriptionImpl subscription = new SubscriptionImpl(normalizedTopic, consumer);
        try {
            return subscription.subscribe();
        } catch (RuntimeException e) {
            // Undo the registration so a later retry is not rejected as "Already subscribed"
            // and the half-created subscription is not resubscribed on the next handshake.
            this.subscriptions.remove(subscription);
            this.replay.remove(replayKey);
            throw e;
        }
    }

    /**
     * Subscribes to {@code topic} using {@link #REPLAY_FROM_EARLIEST} as the replay position.
     *
     * @param topic    channel to subscribe to
     * @param consumer receives the data map of every delivered message
     * @return the future returned by {@link #subscribe(String, long, Consumer)}
     */
    public Future<TopicSubscription> subscribeEarliest(String topic, Consumer<Map<String, Object>> consumer) {
        final long replayFrom = REPLAY_FROM_EARLIEST;
        return subscribe(topic, replayFrom, consumer);
    }

    /**
     * Subscribes to {@code topic} using {@link #REPLAY_FROM_TIP} as the replay position.
     *
     * @param topic    channel to subscribe to
     * @param consumer receives the data map of every delivered message
     * @return the future returned by {@link #subscribe(String, long, Consumer)}
     */
    public Future<TopicSubscription> subscribeTip(String topic, Consumer<Map<String, Object>> consumer) {
        final long replayFrom = REPLAY_FROM_TIP;
        return subscribe(topic, replayFrom, consumer);
    }

    /**
     * Records a listener for a channel. Recorded listeners are attached to the Bayeux client
     * when a connection is established; this method does not touch an existing client.
     *
     * @param channel         name of the channel to listen on
     * @param messageListener listener to attach
     * @return this connector, for chaining
     */
    public EmpConnector addListener(String channel, ClientSessionChannel.MessageListener messageListener) {
        MessageListenerInfo registration = new MessageListenerInfo(channel, messageListener);
        this.listenerInfos.add(registration);
        return this;
    }

    /**
     * Reports whether a Bayeux client exists and is connected.
     *
     * @return {@code false} when there is no client, otherwise the client's connected state
     */
    public boolean isConnected() {
        // Read the volatile field once: stop() may null it between a check and a second read.
        BayeuxClient current = this.client;
        return current != null && current.isConnected();
    }

    /**
     * Reports whether the connector has no client or its client is disconnected.
     *
     * @return {@code true} when there is no client, otherwise the client's disconnected state
     */
    public boolean isDisconnected() {
        // Read the volatile field once: stop() may null it between a check and a second read.
        BayeuxClient current = this.client;
        return current == null || current.isDisconnected();
    }

    /**
     * Reports whether a Bayeux client exists and has completed its handshake.
     *
     * @return {@code false} when there is no client, otherwise the client's handshook state
     */
    public boolean isHandshook() {
        // Read the volatile field once: stop() may null it between a check and a second read.
        BayeuxClient current = this.client;
        return current != null && current.isHandshook();
    }

    /**
     * Returns the replay position currently recorded for a topic.
     *
     * <p>The topic is looked up as given first, then in the form used by
     * {@link #subscribe(String, long, Consumer)} (trailing {@code /} and query string removed),
     * so the string that was passed to {@code subscribe} can be reused here.
     *
     * @param topic topic whose replay position is wanted
     * @return the recorded replay position
     * @throws IllegalArgumentException if no replay position is recorded for the topic
     */
    public long getLastReplayId(String topic) {
        Long replayId = this.replay.get(topic);
        if (replayId == null) {
            replayId = this.replay.get(EmpConnector.topicWithoutQueryString(topic.replaceAll("/$", "")));
        }
        if (replayId == null) {
            // Previously this surfaced as a bare NullPointerException from auto-unboxing.
            throw new IllegalArgumentException(String.format("No replay id recorded for %s [%s]", topic, this.parameters.endpoint()));
        }
        return replayId;
    }

    /**
     * Strips the query string from a topic.
     *
     * @param fullTopic topic, optionally followed by {@code ?} and a query string
     * @return everything before the first {@code ?}, or the whole input when it has none;
     *         an input that starts with {@code ?} yields the empty string
     */
    private static String topicWithoutQueryString(String fullTopic) {
        // indexOf/substring instead of split("\\?")[0]: split drops trailing empty strings, so
        // an input made only of '?' characters produced an empty array and an index error.
        int queryStart = fullTopic.indexOf('?');
        return queryStart < 0 ? fullTopic : fullTopic.substring(0, queryStart);
    }

    /**
     * Starts the HTTP client, builds a new Bayeux client over a long-polling transport that
     * sends the bearer token in the {@code Authorization} header, and initiates the handshake.
     *
     * @return a future completed with {@code false} when the HTTP client cannot be started,
     *         with {@code true} after a successful handshake (once the known subscriptions have
     *         been resubscribed), or exceptionally with a {@link ConnectException} when the
     *         handshake reply is unsuccessful
     */
    private Future<Boolean> connect() {
        log.info("EmpConnector connecting");
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        try {
            this.httpClient.start();
        } catch (Exception e) {
            log.error("Unable to start HTTP transport[{}]", this.parameters.endpoint(), e);
            this.running.set(false);
            future.complete(false);
            return future;
        }
        // The token is resolved once per connection and reused for every request of this transport.
        final String bearerToken = this.bearerToken();
        LongPollingTransport httpTransport = new LongPollingTransport(this.parameters.longPollingOptions(), this.httpClient) {
            @Override
            protected void customize(Request request) {
                request.header(AUTHORIZATION, bearerToken);
            }
        };
        // Configure through a local reference so a concurrent stop() that nulls the field
        // cannot cause a NullPointerException half-way through the setup.
        BayeuxClient newClient = new BayeuxClient(this.parameters.endpoint().toExternalForm(), httpTransport);
        this.client = newClient;
        newClient.addExtension(new ReplayExtension(this.replay));
        this.addListeners(newClient);
        newClient.handshake((c, m) -> {
            if (m.isSuccessful()) {
                this.subscriptions.forEach(SubscriptionImpl::subscribe);
                future.complete(true);
                return;
            }
            Object error = m.get(ERROR);
            if (error == null) {
                error = m.get(FAILURE);
            }
            // Clear the running flag BEFORE completing the future: a caller woken by the
            // failure may call start() immediately and must not be told it is still running.
            this.running.set(false);
            future.completeExceptionally(new ConnectException(String.format("Cannot connect [%s] : %s", this.parameters.endpoint(), error)));
        });
        return future;
    }

    /**
     * Attaches every recorded listener to its channel on the given client.
     *
     * @param client client whose channels receive the listeners
     */
    private void addListeners(BayeuxClient client) {
        this.listenerInfos.forEach(info -> {
            ClientSessionChannel target = client.getChannel(info.getChannelName());
            target.addListener(info.getMessageListener());
        });
    }

    /**
     * Resolves the bearer token for a new connection.
     *
     * @return the token from the provider when one is installed (passing it the current
     *         re-authenticate flag and then clearing that flag), otherwise the token from
     *         the parameters
     */
    private String bearerToken() {
        if (this.bearerTokenProvider == null) {
            return this.parameters.bearerToken();
        }
        String token = this.bearerTokenProvider.apply(this.reauthenticate.get());
        this.reauthenticate.compareAndSet(true, false);
        return token;
    }

    /**
     * Connects again, provided the connector is currently marked as not running; otherwise
     * logs an error and leaves the connection untouched.
     */
    private void reconnect() {
        if (!this.running.compareAndSet(false, true)) {
            log.error("The current value of running is not as we expect, this means our reconnection may not happen");
            return;
        }
        this.connect();
    }

    /**
     * Immutable pairing of a channel name with the listener to attach to that channel.
     * Instances use identity equality, so two registrations with the same channel and
     * listener are distinct entries.
     */
    private static final class MessageListenerInfo {
        private final String channelName;
        private final ClientSessionChannel.MessageListener messageListener;

        MessageListenerInfo(String channelName, ClientSessionChannel.MessageListener messageListener) {
            this.channelName = channelName;
            this.messageListener = messageListener;
        }

        /** @return the name of the channel the listener is meant for */
        String getChannelName() {
            return this.channelName;
        }

        /** @return the listener to attach to the channel */
        ClientSessionChannel.MessageListener getMessageListener() {
            return this.messageListener;
        }
    }

    private class AuthFailureListener
    implements ClientSessionChannel.MessageListener {
        private static final String ERROR_401 = "401";
        private static final String ERROR_403 = "403";

        private AuthFailureListener() {
        }

        public void onMessage(ClientSessionChannel channel, Message message) {
            if (!message.isSuccessful() && (this.isError(message, ERROR_401) || this.isError(message, ERROR_403))) {
                EmpConnector.this.reauthenticate.set(true);
                EmpConnector.this.stop();
                EmpConnector.this.reconnect();
            }
        }

        private boolean isError(Message message, String errorCode) {
            String error = (String)message.get((Object)EmpConnector.ERROR);
            String failureReason = this.getFailureReason(message);
            return error != null && error.startsWith(errorCode) || failureReason != null && failureReason.startsWith(errorCode);
        }

        private String getFailureReason(Message message) {
            Map sfdc;
            String failureReason = null;
            Map ext = message.getExt();
            if (ext != null && (sfdc = (Map)ext.get("sfdc")) != null) {
                failureReason = (String)sfdc.get("failureReason");
            }
            return failureReason;
        }
    }

    private class SubscriptionImpl
    implements TopicSubscription {
        private final String topic;
        private final Consumer<Map<String, Object>> consumer;

        private SubscriptionImpl(String topic, Consumer<Map<String, Object>> consumer) {
            this.topic = topic;
            this.consumer = consumer;
            EmpConnector.this.subscriptions.add(this);
        }

        @Override
        public void cancel() {
            EmpConnector.this.replay.remove(EmpConnector.topicWithoutQueryString(this.topic));
            if (EmpConnector.this.running.get() && EmpConnector.this.client != null) {
                EmpConnector.this.client.getChannel(this.topic).unsubscribe();
                EmpConnector.this.subscriptions.remove(this);
            }
        }

        @Override
        public long getReplayFrom() {
            return EmpConnector.this.replay.getOrDefault(EmpConnector.topicWithoutQueryString(this.topic), REPLAY_FROM_EARLIEST);
        }

        @Override
        public String getTopic() {
            return this.topic;
        }

        public String toString() {
            return String.format("Subscription [%s:%s]", this.getTopic(), this.getReplayFrom());
        }

        Future<TopicSubscription> subscribe() {
            Long replayFrom = this.getReplayFrom();
            ClientSessionChannel channel = EmpConnector.this.client.getChannel(this.topic);
            CompletableFuture<TopicSubscription> future = new CompletableFuture<TopicSubscription>();
            channel.subscribe((c, message) -> this.consumer.accept(message.getDataAsMap()), (c, message) -> {
                if (message.isSuccessful()) {
                    future.complete(this);
                } else {
                    Object error = message.get((Object)EmpConnector.ERROR);
                    if (error == null) {
                        error = message.get((Object)EmpConnector.FAILURE);
                    }
                    future.completeExceptionally(new CannotSubscribe(EmpConnector.this.parameters.endpoint(), this.topic, replayFrom, error != null ? error : message));
                }
            });
            return future;
        }
    }
}

