/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.salesforce.internal.streaming;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.camel.CamelException;
import org.apache.camel.component.salesforce.SalesforceComponent;
import org.apache.camel.component.salesforce.SalesforceConsumer;
import org.apache.camel.component.salesforce.SalesforceEndpoint;
import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
import org.apache.camel.component.salesforce.SalesforceHttpClient;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
import org.apache.camel.component.salesforce.internal.streaming.CometDReplayExtension;
import org.apache.camel.support.service.ServiceSupport;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSession;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriptionHelper
extends ServiceSupport {
    static final CometDReplayExtension REPLAY_EXTENSION = new CometDReplayExtension();
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class);
    private static final int CONNECT_TIMEOUT = 110;
    private static final int CHANNEL_TIMEOUT = 40;
    private static final String FAILURE_FIELD = "failure";
    private static final String EXCEPTION_FIELD = "exception";
    private static final String SFDC_FIELD = "sfdc";
    private static final String FAILURE_REASON_FIELD = "failureReason";
    private static final int DISCONNECT_INTERVAL = 5000;
    private static final String SERVER_TOO_BUSY_ERROR = "503::";
    BayeuxClient client;
    private final SalesforceComponent component;
    private final SalesforceSession session;
    private final long timeout = 60000L;
    private final Map<SalesforceConsumer, ClientSessionChannel.MessageListener> listenerMap;
    private final long maxBackoff;
    private final long backoffIncrement;
    private ClientSessionChannel.MessageListener handshakeListener;
    private ClientSessionChannel.MessageListener connectListener;
    private ClientSessionChannel.MessageListener disconnectListener;
    private volatile String handshakeError;
    private volatile Exception handshakeException;
    private volatile String connectError;
    private volatile Exception connectException;
    private volatile boolean reconnecting;
    private final AtomicLong restartBackoff;

    public SubscriptionHelper(SalesforceComponent component) throws SalesforceException {
        this.component = component;
        this.session = component.getSession();
        this.listenerMap = new ConcurrentHashMap<SalesforceConsumer, ClientSessionChannel.MessageListener>();
        this.restartBackoff = new AtomicLong(0L);
        this.backoffIncrement = component.getConfig().getBackoffIncrement();
        this.maxBackoff = component.getConfig().getMaxBackoff();
    }

    protected void doStart() throws Exception {
        this.client = SubscriptionHelper.createClient(this.component);
        this.handshakeError = null;
        this.handshakeException = null;
        this.connectError = null;
        this.connectException = null;
        if (this.handshakeListener == null) {
            this.handshakeListener = new ClientSessionChannel.MessageListener(){

                public void onMessage(ClientSessionChannel channel, Message message) {
                    LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", (Object)message);
                    if (!message.isSuccessful()) {
                        LOG.warn("Handshake failure: {}", (Object)message);
                        SubscriptionHelper.this.handshakeError = (String)message.get((Object)"error");
                        SubscriptionHelper.this.handshakeException = SubscriptionHelper.getFailure(message);
                        if (SubscriptionHelper.this.handshakeError != null) {
                            if (SubscriptionHelper.this.handshakeError.startsWith("401::")) {
                                try {
                                    LOG.info("Refreshing OAuth token...");
                                    SubscriptionHelper.this.session.login(SubscriptionHelper.this.session.getAccessToken());
                                    LOG.info("Refreshed OAuth token for re-handshake");
                                }
                                catch (SalesforceException e) {
                                    LOG.warn("Error renewing OAuth token on 401 error: " + e.getMessage(), (Throwable)((Object)e));
                                }
                            }
                            if (SubscriptionHelper.this.handshakeError.startsWith("403::")) {
                                try {
                                    LOG.info("Cleaning session (logout) from SalesforceSession before restarting client");
                                    SubscriptionHelper.this.session.logout();
                                }
                                catch (SalesforceException e) {
                                    LOG.warn("Error while cleaning session: " + e.getMessage(), (Throwable)((Object)e));
                                }
                            }
                        }
                        SubscriptionHelper.this.restartClient();
                    } else if (!SubscriptionHelper.this.listenerMap.isEmpty()) {
                        SubscriptionHelper.this.reconnecting = true;
                    }
                }
            };
        }
        this.client.getChannel("/meta/handshake").addListener((ClientSessionChannel.ClientSessionChannelListener)this.handshakeListener);
        if (this.connectListener == null) {
            this.connectListener = new ClientSessionChannel.MessageListener(){

                public void onMessage(ClientSessionChannel channel, Message message) {
                    LOG.debug("[CHANNEL:META_CONNECT]: {}", (Object)message);
                    if (!message.isSuccessful()) {
                        LOG.warn("Connect failure: {}", (Object)message);
                        SubscriptionHelper.this.connectError = (String)message.get((Object)"error");
                        SubscriptionHelper.this.connectException = SubscriptionHelper.getFailure(message);
                    } else if (SubscriptionHelper.this.reconnecting) {
                        SubscriptionHelper.this.reconnecting = false;
                        LOG.debug("Refreshing subscriptions to {} channels on reconnect", (Object)SubscriptionHelper.this.listenerMap.size());
                        HashMap map = new HashMap();
                        map.putAll(SubscriptionHelper.this.listenerMap);
                        SubscriptionHelper.this.listenerMap.clear();
                        for (Map.Entry entry : map.entrySet()) {
                            SalesforceConsumer consumer = (SalesforceConsumer)((Object)entry.getKey());
                            String topicName = consumer.getTopicName();
                            SubscriptionHelper.this.subscribe(topicName, consumer);
                        }
                    }
                }
            };
        }
        this.client.getChannel("/meta/connect").addListener((ClientSessionChannel.ClientSessionChannelListener)this.connectListener);
        if (this.disconnectListener == null) {
            this.disconnectListener = new ClientSessionChannel.MessageListener(){

                public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
                    SubscriptionHelper.this.restartClient();
                }
            };
        }
        this.client.getChannel("/meta/disconnect").addListener((ClientSessionChannel.ClientSessionChannelListener)this.disconnectListener);
        this.client.handshake();
        long waitMs = TimeUnit.MILLISECONDS.convert(110L, TimeUnit.SECONDS);
        if (!this.client.waitFor(waitMs, BayeuxClient.State.CONNECTED, new BayeuxClient.State[0])) {
            if (this.handshakeException != null) {
                throw new CamelException(String.format("Exception during HANDSHAKE: %s", this.handshakeException.getMessage()), (Throwable)this.handshakeException);
            }
            if (this.handshakeError != null) {
                throw new CamelException(String.format("Error during HANDSHAKE: %s", this.handshakeError));
            }
            if (this.connectException != null) {
                throw new CamelException(String.format("Exception during CONNECT: %s", this.connectException.getMessage()), (Throwable)this.connectException);
            }
            if (this.connectError != null) {
                throw new CamelException(String.format("Error during CONNECT: %s", this.connectError));
            }
            throw new CamelException(String.format("Handshake request timeout after %s seconds", 110));
        }
    }

    private void restartClient() {
        SalesforceHttpClient httpClient = this.component.getConfig().getHttpClient();
        httpClient.getExecutor().execute(new Runnable(){

            @Override
            public void run() {
                LOG.info("Restarting on unexpected disconnect from Salesforce...");
                boolean abort = false;
                LOG.debug("Waiting to disconnect...");
                while (!SubscriptionHelper.this.client.isDisconnected()) {
                    try {
                        Thread.sleep(5000L);
                    }
                    catch (InterruptedException e) {
                        LOG.error("Aborting restart on interrupt!");
                        abort = true;
                    }
                }
                if (!abort) {
                    long backoff = SubscriptionHelper.this.restartBackoff.getAndAdd(SubscriptionHelper.this.backoffIncrement);
                    if (backoff > SubscriptionHelper.this.maxBackoff) {
                        LOG.error("Restart aborted after exceeding {} msecs backoff", (Object)SubscriptionHelper.this.maxBackoff);
                        abort = true;
                    } else {
                        LOG.debug("Pausing for {} msecs before restart attempt", (Object)backoff);
                        try {
                            Thread.sleep(backoff);
                        }
                        catch (InterruptedException e) {
                            LOG.error("Aborting restart on interrupt!");
                            abort = true;
                        }
                    }
                    if (!abort) {
                        Object lastError = new SalesforceException("Unknown error", null);
                        try {
                            SubscriptionHelper.this.doStop();
                            SubscriptionHelper.this.doStart();
                        }
                        catch (Exception e) {
                            LOG.error("Error restarting: " + e.getMessage(), (Throwable)e);
                            lastError = e;
                        }
                        if (SubscriptionHelper.this.client != null && SubscriptionHelper.this.client.isHandshook()) {
                            LOG.info("Successfully restarted!");
                            SubscriptionHelper.this.restartBackoff.set(SubscriptionHelper.this.client.getBackoffIncrement());
                        } else {
                            LOG.error("Failed to restart after pausing for {} msecs", (Object)backoff);
                            if (backoff + SubscriptionHelper.this.backoffIncrement > SubscriptionHelper.this.maxBackoff) {
                                String abortMsg = "Aborting restart attempt due to: " + ((Throwable)lastError).getMessage();
                                SalesforceException ex = new SalesforceException(abortMsg, (Throwable)lastError);
                                for (SalesforceConsumer consumer : SubscriptionHelper.this.listenerMap.keySet()) {
                                    consumer.handleException(abortMsg, (Throwable)((Object)ex));
                                }
                            }
                        }
                    }
                }
            }
        });
    }

    private static Exception getFailure(Message message) {
        Object exception = null;
        if (message.get((Object)EXCEPTION_FIELD) != null) {
            exception = (Exception)message.get((Object)EXCEPTION_FIELD);
        } else if (message.get((Object)FAILURE_FIELD) != null) {
            exception = (Exception)((Map)message.get((Object)FAILURE_FIELD)).get(EXCEPTION_FIELD);
        } else {
            String failureReason = SubscriptionHelper.getFailureReason(message);
            if (failureReason != null) {
                exception = new SalesforceException(failureReason, null);
            }
        }
        return exception;
    }

    protected void doStop() throws Exception {
        this.client.getChannel("/meta/disconnect").removeListener((ClientSessionChannel.ClientSessionChannelListener)this.disconnectListener);
        this.client.getChannel("/meta/connect").removeListener((ClientSessionChannel.ClientSessionChannelListener)this.connectListener);
        this.client.getChannel("/meta/handshake").removeListener((ClientSessionChannel.ClientSessionChannelListener)this.handshakeListener);
        this.client.disconnect();
        boolean disconnected = this.client.waitFor(60000L, BayeuxClient.State.DISCONNECTED, new BayeuxClient.State[0]);
        if (!disconnected) {
            LOG.warn("Could not disconnect client connected to: {} after: {} msec.", (Object)SubscriptionHelper.getEndpointUrl(this.component), (Object)60000L);
            this.client.abort();
        }
        this.client = null;
    }

    static BayeuxClient createClient(SalesforceComponent component) throws SalesforceException {
        SalesforceSession session;
        SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
        Map<String, Object> options = new HashMap<String, Long>();
        options.put("maxNetworkDelay", httpClient.getTimeout());
        if (component.getLongPollingTransportProperties() != null) {
            options = component.getLongPollingTransportProperties();
        }
        if ((session = component.getSession()).getAccessToken() == null) {
            session.login(null);
        }
        LongPollingTransport transport = new LongPollingTransport(options, httpClient){

            protected void customize(Request request) {
                super.customize(request);
                request.getHeaders().put(HttpHeader.AUTHORIZATION, "OAuth " + session.getAccessToken());
            }
        };
        BayeuxClient client = new BayeuxClient(SubscriptionHelper.getEndpointUrl(component), (ClientTransport)transport, new ClientTransport[0]);
        client.addExtension((ClientSession.Extension)REPLAY_EXTENSION);
        return client;
    }

    public void subscribe(final String topicName, final SalesforceConsumer consumer) {
        final String channelName = SubscriptionHelper.getChannelName(topicName);
        this.setupReplay((SalesforceEndpoint)consumer.getEndpoint());
        LOG.info("Subscribing to channel {}...", (Object)channelName);
        final ClientSessionChannel.MessageListener listener = new ClientSessionChannel.MessageListener(){

            public void onMessage(ClientSessionChannel channel, Message message) {
                LOG.debug("Received Message: {}", (Object)message);
                consumer.processMessage(channel, message);
            }
        };
        ClientSessionChannel clientChannel = this.client.getChannel(channelName);
        ClientSessionChannel.MessageListener subscriptionListener = new ClientSessionChannel.MessageListener(){

            public void onMessage(ClientSessionChannel channel, Message message) {
                LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", (Object)message);
                String subscribedChannelName = message.get((Object)"subscription").toString();
                if (channelName.equals(subscribedChannelName)) {
                    if (!message.isSuccessful()) {
                        String error = (String)message.get((Object)"error");
                        if (error == null) {
                            error = "Missing error message";
                        }
                        Exception failure = SubscriptionHelper.getFailure(message);
                        String msg = String.format("Error subscribing to %s: %s", topicName, failure != null ? failure.getMessage() : error);
                        boolean abort = true;
                        if (SubscriptionHelper.isTemporaryError(message)) {
                            LOG.warn(msg);
                            long backoff = SubscriptionHelper.this.restartBackoff.getAndAdd(SubscriptionHelper.this.backoffIncrement);
                            if (backoff > SubscriptionHelper.this.maxBackoff) {
                                LOG.error("Subscribe aborted after exceeding {} msecs backoff", (Object)SubscriptionHelper.this.maxBackoff);
                            } else {
                                abort = false;
                                try {
                                    LOG.debug("Pausing for {} msecs before subscribe attempt", (Object)backoff);
                                    Thread.sleep(backoff);
                                    SalesforceHttpClient httpClient = SubscriptionHelper.this.component.getConfig().getHttpClient();
                                    httpClient.getExecutor().execute(new Runnable(){

                                        @Override
                                        public void run() {
                                            SubscriptionHelper.this.subscribe(topicName, consumer);
                                        }
                                    });
                                }
                                catch (InterruptedException e) {
                                    LOG.warn("Aborting subscribe on interrupt!", (Throwable)e);
                                }
                            }
                        }
                        if (abort) {
                            consumer.handleException(msg, (Throwable)((Object)new SalesforceException(msg, failure)));
                        }
                    } else {
                        LOG.info("Subscribed to channel {}", (Object)subscribedChannelName);
                        SubscriptionHelper.this.listenerMap.put(consumer, listener);
                        SubscriptionHelper.this.restartBackoff.set(0L);
                    }
                    SubscriptionHelper.this.client.getChannel("/meta/subscribe").removeListener((ClientSessionChannel.ClientSessionChannelListener)this);
                }
            }
        };
        this.client.getChannel("/meta/subscribe").addListener((ClientSessionChannel.ClientSessionChannelListener)subscriptionListener);
        clientChannel.subscribe(listener);
    }

    private static boolean isTemporaryError(Message message) {
        String failureReason = SubscriptionHelper.getFailureReason(message);
        return failureReason != null && failureReason.startsWith(SERVER_TOO_BUSY_ERROR);
    }

    private static String getFailureReason(Message message) {
        Map sfdcFields;
        String failureReason = null;
        if (message.getExt() != null && (sfdcFields = (Map)message.getExt().get(SFDC_FIELD)) != null) {
            failureReason = (String)sfdcFields.get(FAILURE_REASON_FIELD);
        }
        return failureReason;
    }

    void setupReplay(SalesforceEndpoint endpoint) {
        String topicName = endpoint.getTopicName();
        Optional<Long> replayId = SubscriptionHelper.determineReplayIdFor(endpoint, topicName);
        if (replayId.isPresent()) {
            String channelName = SubscriptionHelper.getChannelName(topicName);
            Long replayIdValue = replayId.get();
            LOG.info("Set Replay extension to replay from `{}` for channel `{}`", (Object)replayIdValue, (Object)channelName);
            REPLAY_EXTENSION.addChannelReplayId(channelName, replayIdValue);
        }
    }

    static Optional<Long> determineReplayIdFor(SalesforceEndpoint endpoint, String topicName) {
        String channelName = SubscriptionHelper.getChannelName(topicName);
        Long replayId = endpoint.getReplayId();
        SalesforceComponent component = endpoint.getComponent();
        SalesforceEndpointConfig endpointConfiguration = endpoint.getConfiguration();
        Map<String, Long> endpointInitialReplayIdMap = endpointConfiguration.getInitialReplayIdMap();
        Long endpointReplayId = endpointInitialReplayIdMap.getOrDefault(topicName, endpointInitialReplayIdMap.get(channelName));
        Long endpointDefaultReplayId = endpointConfiguration.getDefaultReplayId();
        SalesforceEndpointConfig componentConfiguration = component.getConfig();
        Map<String, Long> componentInitialReplayIdMap = componentConfiguration.getInitialReplayIdMap();
        Long componentReplayId = componentInitialReplayIdMap.getOrDefault(topicName, componentInitialReplayIdMap.get(channelName));
        Long componentDefaultReplayId = componentConfiguration.getDefaultReplayId();
        return Stream.of(replayId, endpointReplayId, componentReplayId, endpointDefaultReplayId, componentDefaultReplayId).filter(Objects::nonNull).findFirst();
    }

    static String getChannelName(String topicName) {
        StringBuilder channelName = new StringBuilder();
        if (topicName.charAt(0) != '/') {
            channelName.append('/');
        }
        if (topicName.indexOf(47, 1) > 0) {
            channelName.append(topicName);
        } else {
            channelName.append("topic/");
            channelName.append(topicName);
        }
        int typeIdx = channelName.indexOf("/", 1);
        if ("event".equals(channelName.substring(1, typeIdx)) && !topicName.endsWith("__e")) {
            channelName.append("__e");
        }
        return channelName.toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unsubscribe(String topicName, SalesforceConsumer consumer) throws CamelException {
        block6: {
            final String channelName = SubscriptionHelper.getChannelName(topicName);
            final CountDownLatch latch = new CountDownLatch(1);
            final String[] unsubscribeError = new String[]{null};
            final Exception[] unsubscribeFailure = new Exception[]{null};
            ClientSessionChannel.MessageListener unsubscribeListener = new ClientSessionChannel.MessageListener(){

                public void onMessage(ClientSessionChannel channel, Message message) {
                    String unsubscribedChannelName;
                    LOG.debug("[CHANNEL:META_UNSUBSCRIBE]: {}", (Object)message);
                    Object subscription = message.get((Object)"subscription");
                    if (subscription != null && channelName.equals(unsubscribedChannelName = subscription.toString())) {
                        if (!message.isSuccessful()) {
                            unsubscribeError[0] = (String)message.get((Object)"error");
                            unsubscribeFailure[0] = SubscriptionHelper.getFailure(message);
                        } else {
                            LOG.info("Unsubscribed from channel {}", (Object)unsubscribedChannelName);
                        }
                        latch.countDown();
                    }
                }
            };
            this.client.getChannel("/meta/unsubscribe").addListener((ClientSessionChannel.ClientSessionChannelListener)unsubscribeListener);
            try {
                ClientSessionChannel.MessageListener listener = this.listenerMap.remove((Object)consumer);
                if (listener == null) break block6;
                LOG.info("Unsubscribing from channel {}...", (Object)channelName);
                ClientSessionChannel clientChannel = this.client.getChannel(channelName);
                clientChannel.unsubscribe(listener);
                try {
                    if (!latch.await(40L, TimeUnit.SECONDS)) {
                        String message = unsubscribeFailure[0] != null ? String.format("Error unsubscribing from topic %s: %s", topicName, unsubscribeFailure[0].getMessage()) : (unsubscribeError[0] != null ? String.format("Error unsubscribing from topic %s: %s", topicName, unsubscribeError[0]) : String.format("Timeout error unsubscribing from topic %s after %s seconds", topicName, 40));
                        throw new CamelException(message, (Throwable)unsubscribeFailure[0]);
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            finally {
                this.client.getChannel("/meta/unsubscribe").removeListener((ClientSessionChannel.ClientSessionChannelListener)unsubscribeListener);
            }
        }
    }

    static String getEndpointUrl(SalesforceComponent component) {
        if (Double.valueOf(component.getConfig().getApiVersion()) == 36.0) {
            boolean replayOptionsPresent;
            boolean bl = replayOptionsPresent = component.getConfig().getDefaultReplayId() != null || !component.getConfig().getInitialReplayIdMap().isEmpty();
            if (replayOptionsPresent) {
                return component.getSession().getInstanceUrl() + "/cometd/replay/" + component.getConfig().getApiVersion();
            }
        }
        return component.getSession().getInstanceUrl() + "/cometd/" + component.getConfig().getApiVersion();
    }
}

