/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.twitter;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.common.DelimitedStreamReader;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.twitter.TwitterFireHoseConfig;
import org.apache.pulsar.io.twitter.data.TweetData;
import org.apache.pulsar.io.twitter.data.TwitterRecord;
import org.apache.pulsar.io.twitter.endpoint.SampleStatusesEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pulsar push source that connects to the Twitter streaming API through the
 * Hosebird client, deserializes every received line into a {@link TweetData}
 * and hands it to the Pulsar runtime via {@code consume}.
 *
 * <p>Lifecycle: {@link #open} starts a dedicated runner thread that connects the
 * Hosebird client and then parks until {@link #close} raises the stop signal, at
 * which point the runner stops the client and exits.
 */
@Connector(name="twitter", type=IOType.SOURCE, help="A simple connector moving tweets from Twitter FireHose to Pulsar", configClass=TwitterFireHoseConfig.class)
public class TwitterFireHose
extends PushSource<TweetData> {
    private static final Logger LOG = LoggerFactory.getLogger(TwitterFireHose.class);

    /**
     * Stop signal of the currently running runner thread, or {@code null} when the
     * source has not been opened. Volatile because {@link #open} and {@link #close}
     * are not guaranteed to be invoked from the same thread.
     */
    private volatile StopSignal stopSignal;

    /** JSON mapper that tolerates fields not modelled by {@link TweetData}. */
    private final ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    /**
     * Loads and validates the connector configuration, then starts the runner thread.
     *
     * @param config        raw connector configuration
     * @param sourceContext context used to resolve secrets while loading the configuration
     * @throws IOException declared by the original signature; propagated to the caller
     */
    public void open(Map<String, Object> config, SourceContext sourceContext) throws IOException {
        TwitterFireHoseConfig hoseConfig = IOConfigUtils.loadWithSecrets(config, TwitterFireHoseConfig.class, sourceContext);
        hoseConfig.validate();
        // A fresh signal per open() so a previous close() can never leak into this run.
        StopSignal signal = new StopSignal();
        this.stopSignal = signal;
        this.startThread(hoseConfig, signal);
    }

    /**
     * Requests the runner thread to stop the Hosebird client and terminate.
     * Safe to call even if {@link #open} never completed.
     */
    public void close() throws Exception {
        this.stopThread();
    }

    /**
     * Builds the Hosebird client and starts the thread that keeps it connected
     * until {@code signal} is raised.
     *
     * @param config validated connector configuration
     * @param signal stop signal the runner thread waits on
     */
    private void startThread(TwitterFireHoseConfig config, StopSignal signal) {
        HosebirdMessageProcessor processor = new HosebirdMessageProcessor(){
            private DelimitedStreamReader reader;

            public void setup(InputStream input) {
                this.reader = new DelimitedStreamReader(input, Constants.DEFAULT_CHARSET, config.getClientBufferSize());
            }

            public boolean process() throws IOException, InterruptedException {
                String tweetStr = this.reader.readLine();
                // A null or blank line cannot be parsed as a tweet; skip it quietly
                // instead of logging a deserialization stack trace for it.
                // NOTE(review): presumably these are the streaming API's keep-alive
                // newlines - confirm against the endpoint's behaviour.
                if (tweetStr == null || tweetStr.trim().isEmpty()) {
                    return true;
                }
                try {
                    TweetData tweet = TwitterFireHose.this.mapper.readValue(tweetStr, TweetData.class);
                    TwitterFireHose.this.consume(new TwitterRecord(tweet, config.getGuestimateTweetTime()));
                }
                catch (Exception e) {
                    // Best effort: one bad message must not terminate the stream.
                    LOG.error("Exception thrown", e);
                }
                return true;
            }
        };
        BasicClient client = new ClientBuilder()
                .name(config.getClientName())
                .hosts(config.getClientHosts())
                .endpoint(this.getEndpoint(config))
                .authentication(this.getAuthentication(config))
                .processor(processor)
                .build();
        Thread runnerThread = new Thread(() -> {
            LOG.info("Started the Twitter FireHose Runner Thread");
            client.connect();
            LOG.info("Twitter Streaming API connection established successfully");
            boolean interrupted = false;
            try {
                // Returns immediately if close() already raised the signal, and is
                // immune to spurious wakeups because it re-checks its flag in a loop.
                signal.await();
            }
            catch (InterruptedException e) {
                interrupted = true;
                LOG.info("Got a exception in waitObject");
            }
            try {
                LOG.debug("Closing Twitter Streaming API connection");
                client.stop();
                LOG.info("Twitter Streaming API connection closed");
                LOG.info("Twitter FireHose Runner Thread ending");
            }
            finally {
                // Restore the interrupt status only after the client has been stopped,
                // so the stop call itself is not cut short by a pending interrupt.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        runnerThread.setName("TwitterFireHoseRunner");
        runnerThread.start();
    }

    /**
     * Raises the stop signal of the current runner thread, if there is one.
     */
    private void stopThread() {
        LOG.info("Source closed");
        StopSignal signal = this.stopSignal;
        if (signal != null) {
            signal.raise();
        }
    }

    /**
     * Creates the OAuth1 credentials from the consumer key/secret and token/secret
     * held in the configuration.
     */
    private Authentication getAuthentication(TwitterFireHoseConfig config) {
        return new OAuth1(config.getConsumerKey(), config.getConsumerSecret(), config.getToken(), config.getTokenSecret());
    }

    /**
     * Selects the streaming endpoint: the sample endpoint when neither followings
     * nor track terms are configured, otherwise a filter endpoint restricted to
     * whichever of the two lists is non-empty.
     */
    private StreamingEndpoint getEndpoint(TwitterFireHoseConfig config) {
        List<Long> followings = config.getFollowings();
        List<String> terms = config.getTrackTerms();
        if (CollectionUtils.isEmpty(followings) && CollectionUtils.isEmpty(terms)) {
            return new SampleStatusesEndpoint().createEndpoint();
        }
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
        if (CollectionUtils.isNotEmpty(followings)) {
            hosebirdEndpoint.followings(followings);
        }
        if (CollectionUtils.isNotEmpty(terms)) {
            hosebirdEndpoint.trackTerms(terms);
        }
        return hosebirdEndpoint;
    }

    /**
     * One-shot latch built on the intrinsic lock. Unlike a bare
     * {@code wait()}/{@code notify()} pair it remembers that it was raised, so a
     * raise that happens before the waiter arrives is not lost.
     */
    private static final class StopSignal {
        /** Guarded by this object's monitor. */
        private boolean raised;

        /** Marks the signal as raised and wakes every waiting thread. */
        synchronized void raise() {
            this.raised = true;
            this.notifyAll();
        }

        /**
         * Blocks until the signal has been raised.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        synchronized void await() throws InterruptedException {
            while (!this.raised) {
                this.wait();
            }
        }
    }
}

