/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.nsq;

import com.sproutsocial.nsq.Client;
import com.sproutsocial.nsq.Subscriber;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.nsq.NSQSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name="nsq", type=IOType.SOURCE, help="A Simple connector moving messages from an NSQ topic to a Pulsar Topic", configClass=NSQSourceConfig.class)
public class NSQSource
extends PushSource<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(NSQSource.class);
    private static final Logger LOG = LoggerFactory.getLogger(NSQSource.class);
    private Subscriber subscriber;
    private Object waitObject;

    public void open(Map<String, Object> config, SourceContext sourceContext) throws IOException {
        NSQSourceConfig nsqSourceConfig = (NSQSourceConfig)IOConfigUtils.loadWithSecrets(config, NSQSourceConfig.class, (SourceContext)sourceContext);
        nsqSourceConfig.validate();
        this.waitObject = new Object();
        this.startThread(nsqSourceConfig);
    }

    public void close() throws Exception {
        this.stopThread();
    }

    private void startThread(NSQSourceConfig config) {
        String[] lookupds = new String[config.getLookupds().size()];
        config.getLookupds().toArray(lookupds);
        this.subscriber = new Subscriber(lookupds);
        Thread runnerThread = new Thread(() -> {
            this.subscriber.subscribe(config.getTopic(), config.getChannel(), data -> this.consume(new NSQRecord(data)));
            LOG.info("NSQ Consumer started for topic {} with channel {}", (Object)config.getTopic(), (Object)config.getChannel());
            try {
                Object object = this.waitObject;
                synchronized (object) {
                    this.waitObject.wait();
                }
            }
            catch (Exception e) {
                LOG.info("Got an exception in waitObject");
            }
            LOG.debug("Closing the NSQ connection");
            this.subscriber.stop();
            Client.getDefaultClient().stop();
            LOG.info("NSQ subscriber stopped");
            LOG.info("NSQ Runner Thread ending");
        });
        runnerThread.setName("NSQSubscriberRunner");
        runnerThread.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void stopThread() {
        LOG.info("Source closed");
        Object object = this.waitObject;
        synchronized (object) {
            this.waitObject.notify();
        }
    }

    private static class NSQRecord
    implements Record<byte[]> {
        private final byte[] value;

        public NSQRecord(byte[] value) {
            this.value = value;
        }

        public byte[] getValue() {
            return this.value;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof NSQRecord)) {
                return false;
            }
            NSQRecord other = (NSQRecord)o;
            if (!other.canEqual(this)) {
                return false;
            }
            return Arrays.equals(this.getValue(), other.getValue());
        }

        protected boolean canEqual(Object other) {
            return other instanceof NSQRecord;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            result = result * 59 + Arrays.hashCode(this.getValue());
            return result;
        }

        public String toString() {
            return "NSQSource.NSQRecord(value=" + Arrays.toString(this.getValue()) + ")";
        }
    }
}

