/*
 * Decompiled with CFR 0.152.
 */
package io.druid.segment.realtime.firehose;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.ircclouds.irc.api.Callback;
import com.ircclouds.irc.api.IRCApi;
import com.ircclouds.irc.api.IRCApiImpl;
import com.ircclouds.irc.api.IServerParameters;
import com.ircclouds.irc.api.domain.IRCServer;
import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg;
import com.ircclouds.irc.api.listeners.IMessageListener;
import com.ircclouds.irc.api.listeners.VariousMessageListenerAdapter;
import com.ircclouds.irc.api.state.IIRCState;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.logger.Logger;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.joda.time.DateTime;

public class IrcFirehoseFactory
implements FirehoseFactory<InputRowParser<Pair<DateTime, ChannelPrivMsg>>> {
    private static final Logger log = new Logger(IrcFirehoseFactory.class);
    private final String nick;
    private final String host;
    private final List<String> channels;
    private volatile boolean closed = false;

    @JsonCreator
    public IrcFirehoseFactory(@JsonProperty(value="nick") String nick, @JsonProperty(value="host") String host, @JsonProperty(value="channels") List<String> channels) {
        this.nick = nick;
        this.host = host;
        this.channels = channels;
    }

    @JsonProperty
    public String getNick() {
        return this.nick;
    }

    @JsonProperty
    public String getHost() {
        return this.host;
    }

    @JsonProperty
    public List<String> getChannels() {
        return this.channels;
    }

    public Firehose connect(final InputRowParser<Pair<DateTime, ChannelPrivMsg>> firehoseParser, File temporaryDirectory) throws IOException {
        IRCApiImpl irc = new IRCApiImpl(Boolean.valueOf(false));
        final LinkedBlockingQueue queue = new LinkedBlockingQueue();
        irc.addListener((IMessageListener)new VariousMessageListenerAdapter(){

            public void onChannelMessage(ChannelPrivMsg aMsg) {
                try {
                    queue.put(Pair.of((Object)DateTimes.nowUtc(), (Object)aMsg));
                }
                catch (InterruptedException e) {
                    throw new RuntimeException("interrupted adding message to queue", e);
                }
            }
        });
        log.info("connecting to irc server [%s]", new Object[]{this.host});
        irc.connect(new IServerParameters(){

            public String getNickname() {
                return IrcFirehoseFactory.this.nick;
            }

            public List<String> getAlternativeNicknames() {
                return Lists.newArrayList((Object[])new String[]{IrcFirehoseFactory.this.nick + UUID.randomUUID(), IrcFirehoseFactory.this.nick + UUID.randomUUID(), IrcFirehoseFactory.this.nick + UUID.randomUUID()});
            }

            public String getIdent() {
                return "druid";
            }

            public String getRealname() {
                return IrcFirehoseFactory.this.nick;
            }

            public IRCServer getServer() {
                return new IRCServer(IrcFirehoseFactory.this.host, Boolean.valueOf(false));
            }
        }, (Callback)new Callback<IIRCState>((IRCApi)irc){
            final /* synthetic */ IRCApi val$irc;
            {
                this.val$irc = iRCApi;
            }

            public void onSuccess(IIRCState aObject) {
                log.info("irc connection to server [%s] established", new Object[]{IrcFirehoseFactory.this.host});
                for (String chan : IrcFirehoseFactory.this.channels) {
                    log.info("Joining channel %s", new Object[]{chan});
                    this.val$irc.joinChannel(chan);
                }
            }

            public void onFailure(Exception e) {
                log.error((Throwable)e, "Unable to connect to irc server [%s]", new Object[]{IrcFirehoseFactory.this.host});
                throw new RuntimeException("Unable to connect to server", e);
            }
        });
        this.closed = false;
        return new Firehose((IRCApi)irc){
            InputRow nextRow = null;
            Iterator<InputRow> nextIterator = Iterators.emptyIterator();
            final /* synthetic */ IRCApi val$irc;
            {
                this.val$irc = iRCApi;
            }

            public boolean hasMore() {
                block4: while (true) {
                    try {
                        while (true) {
                            if (IrcFirehoseFactory.this.closed) {
                                return false;
                            }
                            if (this.nextIterator.hasNext()) {
                                this.nextRow = this.nextIterator.next();
                                if (this.nextRow == null) continue;
                                return true;
                            }
                            Pair nextMsg = (Pair)queue.poll(100L, TimeUnit.MILLISECONDS);
                            if (nextMsg == null) continue;
                            try {
                                this.nextIterator = firehoseParser.parseBatch((Object)nextMsg).iterator();
                                continue block4;
                            }
                            catch (IllegalArgumentException iae) {
                                log.debug("ignoring invalid message in channel [%s]", new Object[]{((ChannelPrivMsg)nextMsg.rhs).getChannelName()});
                                continue;
                            }
                            break;
                        }
                    }
                    catch (InterruptedException e) {
                        Thread.interrupted();
                        throw new RuntimeException("interrupted retrieving elements from queue", e);
                    }
                }
            }

            @Nullable
            public InputRow nextRow() {
                return this.nextRow;
            }

            public Runnable commit() {
                return new Runnable(){

                    @Override
                    public void run() {
                    }
                };
            }

            public void close() throws IOException {
                try {
                    log.info("disconnecting from irc server [%s]", new Object[]{IrcFirehoseFactory.this.host});
                    this.val$irc.disconnect("");
                }
                finally {
                    IrcFirehoseFactory.this.closed = true;
                }
            }
        };
    }
}

