/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.salesforce.internal.streaming;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.camel.CamelException;
import org.apache.camel.component.salesforce.SalesforceComponent;
import org.apache.camel.component.salesforce.SalesforceEndpoint;
import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
import org.apache.camel.component.salesforce.SalesforceHttpClient;
import org.apache.camel.component.salesforce.StreamingApiConsumer;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
import org.apache.camel.component.salesforce.internal.streaming.ReplayExtension;
import org.apache.camel.support.service.ServiceSupport;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSession;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.http.jetty.JettyHttpClientTransport;
import org.cometd.client.transport.ClientTransport;
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Owns the CometD {@link BayeuxClient} used by a {@link SalesforceComponent} for streaming:
 * it creates the client on start, listens on the {@code /meta/handshake} and
 * {@code /meta/connect} channels to retry handshakes and refresh subscriptions, and
 * subscribes/unsubscribes {@link StreamingApiConsumer}s to their channels.
 */
public class SubscriptionHelper
extends ServiceSupport {
    // Single replay extension instance; createClient() registers it on every client it builds.
    static final ReplayExtension REPLAY_EXTENSION = new ReplayExtension();
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class);
    // Seconds; connect() and doHandshake() wait 110 seconds for the CONNECTED state.
    private static final int CONNECT_TIMEOUT = 110;
    // Keys looked up in Bayeux messages by getFailure() and getFailureReason().
    private static final String FAILURE_FIELD = "failure";
    private static final String EXCEPTION_FIELD = "exception";
    private static final String SFDC_FIELD = "sfdc";
    private static final String FAILURE_REASON_FIELD = "failureReason";
    // Milliseconds; doHandshake() sleeps 5000 ms between checks for the client being disconnected.
    private static final int DISCONNECT_INTERVAL = 5000;
    // A failure reason starting with this prefix is treated as temporary (see isTemporaryError()).
    private static final String SERVER_TOO_BUSY_ERROR = "503::";
    // Error text that triggers a fresh login attempt in the handshake/connect listeners.
    private static final String AUTHENTICATION_INVALID = "401::Authentication invalid";
    // Regex matched against a subscribe error to detect an invalid replay id and fall back.
    private static final String INVALID_REPLAY_ID_PATTERN = "400::The replayId \\{.*} you provided was invalid.*";
    // Created in doStart() and set to null in doStop(); package-private (NOTE(review): presumably for tests).
    BayeuxClient client;
    private final SalesforceComponent component;
    // Obtained from the component in doStart().
    private SalesforceSession session;
    // Milliseconds; doHandshake() and doStop() wait 60000 ms for the DISCONNECTED state before aborting the client.
    private final long timeout = 60000L;
    // Consumers with a successful subscription, mapped to the listener receiving their messages.
    private final Map<StreamingApiConsumer, ClientSessionChannel.MessageListener> listenerMap;
    // Backoff bounds read from the component configuration in the constructor.
    private final long maxBackoff;
    private final long backoffIncrement;
    // Lazily created by initMessageListeners() and removed from their channels in doStop().
    private ClientSessionChannel.MessageListener handshakeListener;
    private ClientSessionChannel.MessageListener connectListener;
    // Most recent failure details recorded by the meta-channel listeners; connect() reports them on timeout.
    private volatile String handshakeError;
    private volatile Exception handshakeException;
    private volatile String connectError;
    private volatile Exception connectException;
    // Set after a successful handshake while subscriptions exist; cleared once they have been re-subscribed.
    private volatile boolean reconnecting;
    // Current backoff in milliseconds, advanced by both handshake and subscribe retries.
    private final AtomicLong handshakeBackoff;
    // Guard used by handshake() so that only one handshake attempt runs at a time.
    private final AtomicBoolean handshaking = new AtomicBoolean();

    /**
     * Creates a helper for the given component and captures the backoff settings
     * from the component configuration.
     *
     * @param component the Salesforce component this helper serves
     */
    public SubscriptionHelper(SalesforceComponent component) {
        this.component = component;

        // Backoff bounds are fixed for the lifetime of the helper.
        this.backoffIncrement = component.getConfig().getBackoffIncrement();
        this.maxBackoff = component.getConfig().getMaxBackoff();

        this.handshakeBackoff = new AtomicLong();
        this.listenerMap = new ConcurrentHashMap<>();
    }

    /**
     * Starts the helper: takes the session from the component, builds the Bayeux client,
     * registers the meta-channel listeners and performs the initial handshake.
     *
     * @throws Exception if lazy login is configured or the initial connect fails
     */
    protected void doStart() throws Exception {
        session = component.getSession();

        final boolean lazyLogin = component.getLoginConfig().isLazyLogin();
        if (lazyLogin) {
            throw new CamelException("Lazy login is not supported by salesforce consumers.");
        }

        client = createClient(component, session);
        initMessageListeners();
        connect();
    }

    /**
     * Lazily creates the {@code /meta/handshake} and {@code /meta/connect} listeners and
     * registers them on the current client. Each listener hands the message to the HTTP
     * client's worker pool so the handling (which may sleep or log in) does not run on the
     * thread delivering the message.
     */
    private void initMessageListeners() {
        if (this.handshakeListener == null) {
            this.handshakeListener = new ClientSessionChannel.MessageListener() {

                public void onMessage(ClientSessionChannel channel, final Message message) {
                    SubscriptionHelper.this.component.getHttpClient().getWorkerPool()
                            .execute(() -> SubscriptionHelper.this.handleHandshakeMessage(message));
                }
            };
        }
        this.client.getChannel("/meta/handshake").addListener(this.handshakeListener);

        if (this.connectListener == null) {
            this.connectListener = new ClientSessionChannel.MessageListener() {

                public void onMessage(ClientSessionChannel channel, final Message message) {
                    SubscriptionHelper.this.component.getHttpClient().getWorkerPool()
                            .execute(() -> SubscriptionHelper.this.handleConnectMessage(message));
                }
            };
        }
        this.client.getChannel("/meta/connect").addListener(this.connectListener);
    }

    /**
     * Handles a {@code /meta/handshake} reply: on failure records the error, re-logs in when
     * the failure reason says authentication is invalid, and retries the handshake; on
     * success flags that existing subscriptions must be refreshed.
     */
    private void handleHandshakeMessage(Message message) {
        LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", message);

        if (message.isSuccessful()) {
            if (!this.listenerMap.isEmpty()) {
                this.reconnecting = true;
            }
            return;
        }

        LOG.warn("Handshake failure: {}", message);
        final String error = (String) message.get("error");
        this.handshakeError = error;
        this.handshakeException = getFailure(message);

        // The failure reason is optional, so compare constant-first to avoid a NullPointerException.
        if (error != null && error.startsWith("403::")
                && AUTHENTICATION_INVALID.equals(getFailureReason(message))) {
            LOG.debug("attempting login due to handshake error: 403 -> 401::Authentication invalid");
            this.session.attemptLoginUntilSuccessful(this.backoffIncrement, this.maxBackoff);
        }

        LOG.debug("Handshake failed, so try again.");
        handshake();
    }

    /**
     * Handles a {@code /meta/connect} reply: on failure records the error, re-logs in on an
     * authentication error and re-handshakes unless the server advised a plain retry; on
     * success after a re-handshake re-subscribes every previously subscribed consumer.
     */
    private void handleConnectMessage(Message message) {
        LOG.debug("[CHANNEL:META_CONNECT]: {}", message);

        if (!message.isSuccessful()) {
            LOG.warn("Connect failure: {}", message);
            final String error = (String) message.get("error");
            this.connectError = error;
            this.connectException = getFailure(message);

            if (AUTHENTICATION_INVALID.equals(error)) {
                LOG.debug("connectError: {}", error);
                LOG.debug("Attempting login...");
                this.session.attemptLoginUntilSuccessful(this.backoffIncrement, this.maxBackoff);
            }

            // Advice may lack a "reconnect" entry; treat that as "not retry" instead of failing with an NPE.
            final Map<String, Object> advice = message.getAdvice();
            if (advice != null && !"retry".equals(advice.get("reconnect"))) {
                LOG.debug("Advice != retry, so handshaking");
                handshake();
            }
        } else if (this.reconnecting) {
            LOG.debug("Refreshing subscriptions to {} channels on reconnect", this.listenerMap.size());

            // Snapshot first: subscribe() re-populates listenerMap as subscriptions succeed.
            final Map<StreamingApiConsumer, ClientSessionChannel.MessageListener> previous
                    = new HashMap<>(this.listenerMap);
            this.listenerMap.clear();
            for (StreamingApiConsumer consumer : previous.keySet()) {
                subscribe(consumer.getTopicName(), consumer);
            }
            this.reconnecting = false;
        }
    }

    /**
     * Requests a handshake and blocks until the client reports CONNECTED or the connect
     * timeout elapses.
     *
     * @throws CamelException if the client did not connect in time; the message carries the
     *                        most specific handshake/connect failure recorded by the listeners
     */
    private void connect() throws CamelException {
        client.handshake();

        final long waitMs = TimeUnit.SECONDS.toMillis(CONNECT_TIMEOUT);
        final boolean connected = client.waitFor(waitMs, BayeuxClient.State.CONNECTED);
        if (connected) {
            return;
        }

        // Report the most specific failure available, handshake problems first.
        if (handshakeException != null) {
            throw new CamelException(
                    String.format("Exception during HANDSHAKE: %s", handshakeException.getMessage()),
                    handshakeException);
        }
        if (handshakeError != null) {
            throw new CamelException(String.format("Error during HANDSHAKE: %s", handshakeError));
        }
        if (connectException != null) {
            throw new CamelException(
                    String.format("Exception during CONNECT: %s", connectException.getMessage()),
                    connectException);
        }
        if (connectError != null) {
            throw new CamelException(String.format("Error during CONNECT: %s", connectError));
        }
        throw new CamelException(String.format("Handshake request timeout after %s seconds", CONNECT_TIMEOUT));
    }

    /**
     * Runs {@link #doHandshake()} unless another handshake is already in progress; the
     * in-progress flag is always cleared when the attempt finishes.
     */
    private void handshake() {
        LOG.debug("Begin handshake if not already in progress.");

        final boolean acquired = handshaking.compareAndSet(false, true);
        if (acquired) {
            LOG.debug("Continuing with handshake.");
            try {
                doHandshake();
            } finally {
                handshaking.set(false);
            }
        }
    }

    /**
     * Re-establishes the Bayeux session after an unexpected disconnect.
     * <p>
     * Waits for the client to report it is disconnected, pauses for the current backoff,
     * then disconnects and handshakes again. If the handshake fails and the next backoff
     * would exceed the maximum, every subscribed consumer is notified of the failure.
     * The attempt is abandoned when the helper is stopping, the client has already been
     * discarded, or the thread is interrupted (the interrupt flag is restored).
     */
    private void doHandshake() {
        if (isStoppingOrStopped()) {
            return;
        }
        LOG.info("Handshaking after unexpected disconnect from Salesforce...");

        LOG.debug("Waiting to disconnect...");
        while (true) {
            // doStop() sets the field to null, so re-read it on every pass instead of dereferencing blindly.
            final BayeuxClient waiting = this.client;
            if (waiting == null) {
                return;
            }
            if (waiting.isDisconnected()) {
                break;
            }
            try {
                Thread.sleep(DISCONNECT_INTERVAL);
            } catch (InterruptedException e) {
                LOG.error("Aborting handshake on interrupt!");
                // Preserve the interrupt for whoever owns this worker thread.
                Thread.currentThread().interrupt();
                return;
            }
            if (isStoppingOrStopped()) {
                return;
            }
        }

        final long backoff = this.handshakeBackoff.getAndAdd(this.backoffIncrement);
        if (backoff > this.maxBackoff) {
            LOG.error("Handshake aborted after exceeding {} msecs backoff", this.maxBackoff);
            return;
        }

        LOG.debug("Pausing for {} msecs before handshake attempt", backoff);
        try {
            Thread.sleep(backoff);
        } catch (InterruptedException e) {
            LOG.error("Aborting handshake on interrupt!");
            Thread.currentThread().interrupt();
            return;
        }

        Exception lastError = new SalesforceException("Unknown error", null);
        try {
            final BayeuxClient reconnecting = this.client;
            if (reconnecting != null) {
                reconnecting.disconnect();
                final boolean disconnected = reconnecting.waitFor(this.timeout, BayeuxClient.State.DISCONNECTED);
                if (!disconnected) {
                    LOG.warn("Could not disconnect client connected to: {} after: {} msec.",
                            getEndpointUrl(this.component), this.timeout);
                    reconnecting.abort();
                }

                reconnecting.handshake();
                final long waitMs = TimeUnit.SECONDS.toMillis(CONNECT_TIMEOUT);
                reconnecting.waitFor(waitMs, BayeuxClient.State.CONNECTED);
            }
        } catch (Exception e) {
            LOG.error("Error handshaking: {}", e.getMessage(), e);
            lastError = e;
        }

        final BayeuxClient current = this.client;
        if (current != null && current.isHandshook()) {
            LOG.debug("Successful handshake!");
            this.handshakeBackoff.set(this.backoffIncrement);
            return;
        }

        LOG.error("Failed to handshake after pausing for {} msecs", backoff);
        // Only give up (and tell the consumers) once the next attempt would exceed the maximum backoff.
        if (backoff + this.backoffIncrement > this.maxBackoff) {
            final String abortMsg = "Aborting handshake attempt due to: " + lastError.getMessage();
            final SalesforceException ex = new SalesforceException(abortMsg, lastError);
            for (StreamingApiConsumer consumer : this.listenerMap.keySet()) {
                consumer.handleException(abortMsg, ex);
            }
        }
    }

    /**
     * Extracts the failure carried by a Bayeux message.
     * <p>
     * Looks, in order, at the message's own {@code exception} field, at the {@code exception}
     * entry of its {@code failure} map, and finally at the Salesforce failure reason, which is
     * wrapped in a {@link SalesforceException}.
     *
     * @param message the failed message
     * @return the failure, or {@code null} when the message carries none
     */
    private static Exception getFailure(Message message) {
        final Object direct = message.get(EXCEPTION_FIELD);
        if (direct != null) {
            return toException(direct);
        }

        final Object failure = message.get(FAILURE_FIELD);
        if (failure != null) {
            // A failure entry is expected to be a map; anything else carries no usable exception.
            return failure instanceof Map ? toException(((Map<?, ?>) failure).get(EXCEPTION_FIELD)) : null;
        }

        final String failureReason = getFailureReason(message);
        return failureReason != null ? new SalesforceException(failureReason, null) : null;
    }

    /**
     * Converts a failure payload to an {@link Exception} without risking a ClassCastException:
     * exceptions are returned as-is, other throwables and values are wrapped.
     */
    private static Exception toException(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Exception) {
            return (Exception) value;
        }
        if (value instanceof Throwable) {
            final Throwable cause = (Throwable) value;
            return new SalesforceException(cause.getMessage(), cause);
        }
        return new SalesforceException(String.valueOf(value), null);
    }

    /**
     * Removes the listener from the named channel and releases the channel.
     * Does nothing when the client has not been created or was already discarded.
     *
     * @param name     channel name
     * @param listener listener to remove from that channel
     */
    private void closeChannel(String name, ClientSessionChannel.MessageListener listener) {
        if (client != null) {
            final ClientSessionChannel target = client.getChannel(name);
            target.removeListener(listener);
            target.release();
        }
    }

    /**
     * Stops the helper: detaches the meta-channel and subscription listeners, disconnects
     * the client (aborting it if it does not disconnect in time), discards it and logs the
     * session out.
     *
     * @throws Exception if logging out of the session fails
     */
    protected void doStop() throws Exception {
        closeChannel("/meta/connect", connectListener);
        closeChannel("/meta/handshake", handshakeListener);

        listenerMap.forEach(
                (consumer, listener) -> closeChannel(getChannelName(consumer.getTopicName()), listener));

        if (client != null) {
            client.disconnect();
            final boolean disconnected = client.waitFor(60000L, BayeuxClient.State.DISCONNECTED);
            if (!disconnected) {
                LOG.warn("Could not disconnect client connected to: {} after: {} msec.",
                        getEndpointUrl(component), 60000L);
                client.abort();
            }
            client = null;

            if (session != null) {
                session.logout();
            }
            LOG.debug("Stopped the helper and destroyed the client");
        }
    }

    /**
     * Builds a {@link BayeuxClient} for the component's streaming endpoint.
     * <p>
     * The transport adds an {@code Authorization: OAuth <token>} header to every request,
     * logging in on demand when the session has no access token, and the shared replay
     * extension is registered on the client.
     *
     * @param component the component supplying configuration, HTTP client and endpoint URL
     * @param session   the session supplying (and refreshing) the access token
     * @return a new, not yet connected client
     * @throws SalesforceException if the eager login fails
     */
    static BayeuxClient createClient(SalesforceComponent component, final SalesforceSession session) throws SalesforceException {
        final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();

        Map<String, Object> options = new HashMap<>();
        options.put("maxNetworkDelay", httpClient.getTimeout());
        // Explicitly configured transport properties replace the defaults entirely.
        if (component.getLongPollingTransportProperties() != null) {
            options = component.getLongPollingTransportProperties();
        }

        if (session.getAccessToken() == null && !component.getLoginConfig().isLazyLogin()) {
            session.login(null);
        }

        final JettyHttpClientTransport transport = new JettyHttpClientTransport(options, httpClient) {

            protected void customize(Request request) {
                super.customize(request);

                String accessToken = session.getAccessToken();
                if (accessToken == null) {
                    try {
                        accessToken = session.login(null);
                    } catch (SalesforceException e) {
                        throw new RuntimeException(e);
                    }
                }

                // Fail fast rather than send a literal "OAuth null" header.
                final String token = Objects.requireNonNull(accessToken, "accessToken");
                request.headers(h -> h.add(HttpHeader.AUTHORIZATION, "OAuth " + token));
            }
        };

        final BayeuxClient client = new BayeuxClient(getEndpointUrl(component), transport);
        client.addExtension(REPLAY_EXTENSION);
        return client;
    }

    /**
     * Subscribes the consumer to the topic's channel, setting up the replay id first.
     *
     * @param topicName topic (or channel) name to subscribe to
     * @param consumer  consumer that receives the channel's messages
     */
    public void subscribe(String topicName, StreamingApiConsumer consumer) {
        final boolean skipReplayId = false;
        subscribe(topicName, consumer, skipReplayId);
    }

    /**
     * Subscribes the consumer to the topic's channel and watches {@code /meta/subscribe}
     * for the outcome.
     * <p>
     * On success the consumer is recorded and the backoff is reset. A temporary (server too
     * busy) failure is retried after the current backoff, an invalid replay id is retried
     * with the endpoint's fall-back replay id, and any other failure is reported to the
     * consumer.
     *
     * @param topicName    topic (or channel) name to subscribe to
     * @param consumer     consumer that receives the channel's messages
     * @param skipReplayId when {@code true} the replay id for the channel is left as it is
     */
    public void subscribe(final String topicName, final StreamingApiConsumer consumer, boolean skipReplayId) {
        final String channelName = getChannelName(topicName);

        if (!this.reconnecting && !skipReplayId) {
            setupReplay((SalesforceEndpoint) consumer.getEndpoint());
        }

        LOG.info("Subscribing to channel {}...", channelName);
        final ClientSessionChannel.MessageListener listener = new ClientSessionChannel.MessageListener() {

            public void onMessage(ClientSessionChannel channel, Message message) {
                LOG.debug("Received Message: {}", message);
                consumer.processMessage(channel, message);
            }
        };

        final ClientSessionChannel.MessageListener subscriptionListener = new ClientSessionChannel.MessageListener() {

            public void onMessage(ClientSessionChannel channel, Message message) {
                LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", message);

                // Replies without a subscription field, or for other channels, are not ours to handle.
                final Object subscription = message.get("subscription");
                if (subscription == null || !channelName.equals(subscription.toString())) {
                    return;
                }
                final String subscribedChannelName = subscription.toString();

                if (message.isSuccessful()) {
                    LOG.info("Subscribed to channel {}", subscribedChannelName);
                    SubscriptionHelper.this.listenerMap.put(consumer, listener);
                    SubscriptionHelper.this.handshakeBackoff.set(0L);
                } else {
                    String error = (String) message.get("error");
                    if (error == null) {
                        error = "Missing error message";
                    }
                    final Exception failure = getFailure(message);
                    final String msg = String.format("Error subscribing to %s: %s", topicName,
                            failure != null ? failure.getMessage() : error);

                    boolean abort = true;
                    if (isTemporaryError(message)) {
                        LOG.warn(msg);
                        final long backoff = SubscriptionHelper.this.handshakeBackoff
                                .getAndAdd(SubscriptionHelper.this.backoffIncrement);
                        if (backoff > SubscriptionHelper.this.maxBackoff) {
                            LOG.error("Subscribe aborted after exceeding {} msecs backoff",
                                    SubscriptionHelper.this.maxBackoff);
                        } else {
                            abort = false;
                            try {
                                LOG.debug("Pausing for {} msecs before subscribe attempt", backoff);
                                Thread.sleep(backoff);
                                SubscriptionHelper.this.component.getHttpClient().getWorkerPool()
                                        .execute(() -> SubscriptionHelper.this.subscribe(topicName, consumer));
                            } catch (InterruptedException e) {
                                LOG.warn("Aborting subscribe on interrupt!", e);
                                // Keep the interrupt visible to the thread's owner.
                                Thread.currentThread().interrupt();
                            }
                        }
                    } else if (error.matches(INVALID_REPLAY_ID_PATTERN)) {
                        abort = false;
                        final Long fallBackReplayId = ((SalesforceEndpoint) consumer.getEndpoint())
                                .getConfiguration().getFallBackReplayId();
                        LOG.warn(error);
                        LOG.warn("Falling back to replayId {} for channel {}", fallBackReplayId, channelName);
                        REPLAY_EXTENSION.addChannelReplayId(channelName, fallBackReplayId);
                        // Skip replay setup so the fall-back id just registered is not overwritten.
                        SubscriptionHelper.this.subscribe(topicName, consumer, true);
                    }

                    if (abort && SubscriptionHelper.this.client != null) {
                        consumer.handleException(msg, new SalesforceException(msg, failure));
                    }
                }

                // This listener is single-use: detach it once the reply for our channel has been handled.
                if (SubscriptionHelper.this.client != null) {
                    SubscriptionHelper.this.client.getChannel("/meta/subscribe").removeListener(this);
                } else {
                    LOG.warn("Trying to handle a subscription message but the client is already destroyed");
                }
            }
        };

        this.client.getChannel("/meta/subscribe").addListener(subscriptionListener);
        final ClientSessionChannel clientChannel = this.client.getChannel(channelName);
        clientChannel.subscribe(listener);
    }

    /**
     * Tells whether the message's failure reason marks a temporary, server-too-busy error.
     *
     * @param message the failed message
     * @return {@code true} if a failure reason is present and starts with the 503 prefix
     */
    private static boolean isTemporaryError(Message message) {
        final String reason = getFailureReason(message);
        if (reason == null) {
            return false;
        }
        return reason.startsWith(SERVER_TOO_BUSY_ERROR);
    }

    /**
     * Reads the failure reason from the {@code sfdc} entry of the message's ext map.
     *
     * @param message the message to inspect
     * @return the failure reason, or {@code null} if the ext map, the {@code sfdc} entry or
     *         the reason itself is absent
     */
    private static String getFailureReason(Message message) {
        final Map<String, Object> ext = message.getExt();
        if (ext == null) {
            return null;
        }

        final Map<?, ?> sfdcFields = (Map<?, ?>) ext.get(SFDC_FIELD);
        if (sfdcFields == null) {
            return null;
        }
        return (String) sfdcFields.get(FAILURE_REASON_FIELD);
    }

    /**
     * Registers the replay id to use for the endpoint's channel with the replay extension,
     * when one can be determined for it.
     *
     * @param endpoint endpoint whose topic is about to be subscribed
     */
    void setupReplay(SalesforceEndpoint endpoint) {
        final String topicName = endpoint.getTopicName();

        determineReplayIdFor(endpoint, topicName).ifPresent(replayIdValue -> {
            final String channelName = getChannelName(topicName);
            LOG.info("Set Replay extension to replay from `{}` for channel `{}`", replayIdValue, channelName);
            REPLAY_EXTENSION.addChannelReplayId(channelName, replayIdValue);
        });
    }

    /**
     * Picks the replay id for a topic. The first non-null value wins, in this order:
     * the endpoint's replay id, the endpoint's initial replay id for the topic (looked up by
     * topic name, then by channel name), the component's initial replay id for the topic,
     * the endpoint's default replay id, the component's default replay id.
     *
     * @param endpoint  endpoint being subscribed
     * @param topicName topic name as configured
     * @return the chosen replay id, or empty when none is configured
     */
    static Optional<Long> determineReplayIdFor(SalesforceEndpoint endpoint, String topicName) {
        final String channelName = getChannelName(topicName);

        final Long explicitReplayId = endpoint.getReplayId();
        final SalesforceComponent component = endpoint.getComponent();

        final SalesforceEndpointConfig endpointConfig = endpoint.getConfiguration();
        final Map<String, Long> endpointReplayIds = endpointConfig.getInitialReplayIdMap();
        final Long endpointTopicReplayId
                = endpointReplayIds.getOrDefault(topicName, endpointReplayIds.get(channelName));
        final Long endpointDefault = endpointConfig.getDefaultReplayId();

        final SalesforceEndpointConfig componentConfig = component.getConfig();
        final Map<String, Long> componentReplayIds = componentConfig.getInitialReplayIdMap();
        final Long componentTopicReplayId
                = componentReplayIds.getOrDefault(topicName, componentReplayIds.get(channelName));
        final Long componentDefault = componentConfig.getDefaultReplayId();

        // Candidates listed from highest to lowest precedence.
        final Long[] byPrecedence = {
            explicitReplayId, endpointTopicReplayId, componentTopicReplayId, endpointDefault, componentDefault
        };
        for (Long candidate : byPrecedence) {
            if (candidate != null) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /**
     * Converts a topic name into a Bayeux channel name.
     * <p>
     * A name that already contains a channel prefix (a {@code '/'} after its first
     * character, e.g. {@code event/Foo} or {@code /event/Foo}) is used as-is with a leading
     * slash ensured. A bare name ({@code Foo} or {@code /Foo}) is placed under
     * {@code /topic/}.
     *
     * @param topicName topic or channel name; must not be {@code null} or empty
     * @return the channel name, always starting with {@code '/'}
     * @throws IllegalArgumentException if {@code topicName} is {@code null} or empty
     */
    static String getChannelName(String topicName) {
        if (topicName == null || topicName.isEmpty()) {
            throw new IllegalArgumentException("topicName must not be null or empty");
        }

        final boolean hasLeadingSlash = topicName.charAt(0) == '/';
        final boolean hasChannelPrefix = topicName.indexOf('/', 1) > 0;

        if (hasChannelPrefix) {
            return hasLeadingSlash ? topicName : "/" + topicName;
        }
        // A bare name with a leading slash already supplies the separator after "/topic".
        return hasLeadingSlash ? "/topic" + topicName : "/topic/" + topicName;
    }

    /**
     * Unsubscribes the consumer from the topic's channel, if it is currently subscribed.
     * <p>
     * The consumer is always removed from the subscription map. When the client has already
     * been discarded by {@code doStop()} there is no channel left to unsubscribe from, so the
     * call only logs and returns.
     *
     * @param topicName topic (or channel) name the consumer was subscribed to
     * @param consumer  consumer to unsubscribe
     */
    public void unsubscribe(String topicName, StreamingApiConsumer consumer) {
        final String channelName = getChannelName(topicName);

        final ClientSessionChannel.MessageListener listener = this.listenerMap.remove(consumer);
        if (listener == null) {
            return;
        }

        // Read the field once: doStop() may null it concurrently.
        final BayeuxClient current = this.client;
        if (current == null) {
            LOG.debug("Skipping unsubscribe from channel {}: client is already destroyed", channelName);
            return;
        }

        LOG.debug("Unsubscribing from channel {}...", channelName);
        final ClientSessionChannel clientChannel = current.getChannel(channelName);
        clientChannel.unsubscribe(listener);
    }

    /**
     * Builds the CometD endpoint URL for the component's instance and API version.
     * For API version 36.0 with replay options configured, the {@code /cometd/replay/}
     * path is used; otherwise the plain {@code /cometd/} path.
     *
     * @param component component supplying the session and configuration
     * @return the streaming endpoint URL
     */
    static String getEndpointUrl(SalesforceComponent component) {
        final boolean isVersion36 = Double.parseDouble(component.getConfig().getApiVersion()) == 36.0;
        if (isVersion36) {
            final boolean hasDefaultReplayId = component.getConfig().getDefaultReplayId() != null;
            if (hasDefaultReplayId || !component.getConfig().getInitialReplayIdMap().isEmpty()) {
                return component.getSession().getInstanceUrl() + "/cometd/replay/"
                       + component.getConfig().getApiVersion();
            }
        }
        return component.getSession().getInstanceUrl() + "/cometd/" + component.getConfig().getApiVersion();
    }
}

