/*
 * Decompiled with CFR 0.152.
 */
package com.github.brainlag.nsq;

import com.github.brainlag.nsq.NSQCommand;
import com.github.brainlag.nsq.NSQConfig;
import com.github.brainlag.nsq.NSQConsumer;
import com.github.brainlag.nsq.NSQMessage;
import com.github.brainlag.nsq.ServerAddress;
import com.github.brainlag.nsq.callbacks.NSQErrorCallback;
import com.github.brainlag.nsq.exceptions.NSQException;
import com.github.brainlag.nsq.exceptions.NoConnectionsException;
import com.github.brainlag.nsq.frames.ErrorFrame;
import com.github.brainlag.nsq.frames.MessageFrame;
import com.github.brainlag.nsq.frames.NSQFrame;
import com.github.brainlag.nsq.frames.ResponseFrame;
import com.github.brainlag.nsq.netty.NSQClientInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;

public class Connection {
    public static final byte[] MAGIC_PROTOCOL_VERSION = "  V2".getBytes();
    public static final AttributeKey<Connection> STATE = AttributeKey.valueOf((String)"Connection.state");
    private final ServerAddress address;
    private final Channel channel;
    private NSQConsumer consumer = null;
    private NSQErrorCallback errorCallback = null;
    private final LinkedBlockingQueue<NSQCommand> requests = new LinkedBlockingQueue(1);
    private final LinkedBlockingQueue<NSQFrame> responses = new LinkedBlockingQueue(1);
    private static EventLoopGroup defaultGroup = null;
    private final EventLoopGroup eventLoopGroup;
    private final NSQConfig config;
    public static final long HEARTBEAT_MAX_INTERVAL = 60000L;
    private volatile AtomicReference<Long> lastHeartbeatSuccess = new AtomicReference<Long>(System.currentTimeMillis());

    public Connection(ServerAddress serverAddress, NSQConfig config) throws NoConnectionsException {
        this.address = serverAddress;
        this.config = config;
        Bootstrap bootstrap = new Bootstrap();
        this.eventLoopGroup = config.getEventLoopGroup() != null ? config.getEventLoopGroup() : this.getDefaultGroup();
        bootstrap.group(this.eventLoopGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler((ChannelHandler)new NSQClientInitializer());
        ChannelFuture future = bootstrap.connect((SocketAddress)new InetSocketAddress(serverAddress.getHost(), serverAddress.getPort()));
        this.channel = future.awaitUninterruptibly().channel();
        if (!future.isSuccess()) {
            throw new NoConnectionsException("Could not connect to server", future.cause());
        }
        LogManager.getLogger((Object)this).info("Created connection: " + serverAddress.toString());
        this.channel.attr(STATE).set((Object)this);
        ByteBuf buf = Unpooled.buffer();
        buf.writeBytes(MAGIC_PROTOCOL_VERSION);
        this.channel.write((Object)buf);
        this.channel.flush();
        NSQCommand ident = NSQCommand.identify(config.toString().getBytes());
        try {
            NSQFrame response = this.commandAndWait(ident);
            if (response != null) {
                LogManager.getLogger((Object)this).info("Server identification: " + ((ResponseFrame)response).getMessage());
            }
        }
        catch (TimeoutException e) {
            LogManager.getLogger((Object)this).error("Creating connection timed out", (Throwable)e);
            this.close();
        }
    }

    private EventLoopGroup getDefaultGroup() {
        if (defaultGroup == null) {
            defaultGroup = new NioEventLoopGroup();
        }
        return defaultGroup;
    }

    public boolean isConnected() {
        return this.channel.isActive();
    }

    public boolean isRequestInProgress() {
        return this.requests.size() > 0;
    }

    public boolean isHeartbeatStatusOK() {
        return System.currentTimeMillis() - this.lastHeartbeatSuccess.get() <= 60000L;
    }

    public void incoming(NSQFrame frame) {
        if (frame instanceof ResponseFrame) {
            if ("_heartbeat_".equals(((ResponseFrame)frame).getMessage())) {
                this.heartbeat();
                return;
            }
            if (!this.requests.isEmpty()) {
                try {
                    this.responses.offer(frame, 20L, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    LogManager.getLogger((Object)this).error("Thread was interruped, probably shuthing down", (Throwable)e);
                    this.close();
                }
            }
            return;
        }
        if (frame instanceof ErrorFrame) {
            if (this.errorCallback != null) {
                this.errorCallback.error(NSQException.of((ErrorFrame)frame));
            }
            this.responses.add(frame);
            return;
        }
        if (frame instanceof MessageFrame) {
            MessageFrame msg = (MessageFrame)frame;
            NSQMessage message = new NSQMessage();
            message.setAttempts(msg.getAttempts());
            message.setConnection(this);
            message.setId(msg.getMessageId());
            message.setMessage(msg.getMessageBody());
            message.setTimestamp(new Date(TimeUnit.NANOSECONDS.toMillis(msg.getTimestamp())));
            this.consumer.processMessage(message);
            return;
        }
        LogManager.getLogger((Object)this).warn("Unknown frame type: " + frame);
    }

    private void heartbeat() {
        LogManager.getLogger((Object)this).trace("HEARTBEAT!");
        this.command(NSQCommand.nop());
        this.lastHeartbeatSuccess.getAndSet(System.currentTimeMillis());
    }

    public void setErrorCallback(NSQErrorCallback callback) {
        this.errorCallback = callback;
    }

    public void close() {
        LogManager.getLogger((Object)this).info("Closing  connection: " + this);
        this.channel.disconnect();
    }

    public NSQFrame commandAndWait(NSQCommand command) throws TimeoutException {
        try {
            if (!this.requests.offer(command, 15L, TimeUnit.SECONDS)) {
                throw new TimeoutException("command: " + command + " timedout");
            }
            this.responses.clear();
            ChannelFuture fut = this.command(command);
            if (!fut.await(15L, TimeUnit.SECONDS)) {
                throw new TimeoutException("command: " + command + " timedout");
            }
            NSQFrame frame = this.responses.poll(15L, TimeUnit.SECONDS);
            if (frame == null) {
                throw new TimeoutException("command: " + command + " timedout");
            }
            this.requests.poll();
            return frame;
        }
        catch (InterruptedException e) {
            this.close();
            LogManager.getLogger((Object)this).warn("Thread was interruped!", (Throwable)e);
            return null;
        }
    }

    public ChannelFuture command(NSQCommand command) {
        return this.channel.writeAndFlush((Object)command);
    }

    public ServerAddress getServerAddress() {
        return this.address;
    }

    public NSQConfig getConfig() {
        return this.config;
    }

    public void setConsumer(NSQConsumer consumer) {
        this.consumer = consumer;
    }
}

