/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.salesforce.plugin.source.streaming;

import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.cometd.common.JSONContext;
import org.cometd.common.JacksonJSONContextClient;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SalesforcePushTopicListener {
    private static final Logger LOG = LoggerFactory.getLogger(SalesforcePushTopicListener.class);
    private static final String DEFAULT_PUSH_ENDPOINT = "/cometd/53.0";
    private static final long CONNECTION_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(110L);
    private static final long HANDSHAKE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(110L);
    private static final int HANDSHAKE_CHECK_INTERVAL_MS = 1000;
    private final BlockingQueue<String> messagesQueue = new LinkedBlockingQueue<String>();
    private final AuthenticatorCredentials credentials;
    private final String topic;
    private JSONContext.Client jsonContext;

    public SalesforcePushTopicListener(AuthenticatorCredentials credentials, String topic) {
        this.credentials = credentials;
        this.topic = topic;
    }

    public void start() {
        try {
            BayeuxClient bayeuxClient = this.getClient(this.credentials);
            this.waitForHandshake(bayeuxClient, HANDSHAKE_TIMEOUT_MS, 1000L);
            LOG.debug("Client handshake done");
            bayeuxClient.getChannel("/topic/" + this.topic).subscribe((channel, message) -> this.messagesQueue.add(this.jsonContext.getGenerator().generate((Object)message.getDataAsMap())));
        }
        catch (Exception e) {
            throw new RuntimeException("Could not start client", e);
        }
    }

    public String getMessage(long timeout, TimeUnit unit) throws InterruptedException {
        return this.messagesQueue.poll(timeout, unit);
    }

    private BayeuxClient getClient(AuthenticatorCredentials credentials) throws Exception {
        final OAuthInfo oAuthInfo = Authenticator.getOAuthInfo(credentials);
        SslContextFactory sslContextFactory = new SslContextFactory();
        HttpClient httpClient = new HttpClient(sslContextFactory);
        httpClient.setConnectTimeout(CONNECTION_TIMEOUT_MS);
        httpClient.start();
        this.jsonContext = new JacksonJSONContextClient();
        HashMap<String, JSONContext.Client> transportOptions = new HashMap<String, JSONContext.Client>();
        transportOptions.put("jsonContext", this.jsonContext);
        LongPollingTransport transport = new LongPollingTransport(transportOptions, httpClient){

            protected void customize(Request exchange) {
                super.customize(exchange);
                exchange.header("Authorization", "OAuth " + oAuthInfo.getAccessToken());
            }
        };
        BayeuxClient client = new BayeuxClient(oAuthInfo.getInstanceURL() + DEFAULT_PUSH_ENDPOINT, (ClientTransport)transport, new ClientTransport[0]);
        client.handshake();
        return client;
    }

    private void waitForHandshake(BayeuxClient client, long timeoutInMilliseconds, long intervalInMilliseconds) {
        try {
            Awaitility.await().atMost(timeoutInMilliseconds, TimeUnit.MILLISECONDS).pollInterval(intervalInMilliseconds, TimeUnit.MILLISECONDS).until(() -> client.isHandshook());
        }
        catch (ConditionTimeoutException e) {
            throw new IllegalStateException("Client could not handshake with Salesforce server", e);
        }
    }
}

