/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ambari.metrics.sink.relocated.zookeeper.server.quorum;

import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.util.Enumeration;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ambari.metrics.sink.relocated.slf4j.Logger;
import org.apache.ambari.metrics.sink.relocated.slf4j.LoggerFactory;
import org.apache.ambari.metrics.sink.relocated.zookeeper.server.ZooKeeperThread;
import org.apache.ambari.metrics.sink.relocated.zookeeper.server.quorum.QuorumPeer;
import org.apache.ambari.metrics.sink.relocated.zookeeper.server.quorum.flexible.QuorumVerifier;

public class QuorumCnxManager {
    private static final Logger LOG = LoggerFactory.getLogger(QuorumCnxManager.class);
    static final int RECV_CAPACITY = 100;
    static final int SEND_CAPACITY = 1;
    static final int PACKETMAXSIZE = 524288;
    static final int MAX_CONNECTION_ATTEMPTS = 2;
    private long observerCounter = -1L;
    public static final long PROTOCOL_VERSION = -65536L;
    public static final int maxBuffer = 2048;
    private int cnxTO = 5000;
    final QuorumPeer self;
    final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
    public final ArrayBlockingQueue<Message> recvQueue;
    private final Object recvQLock = new Object();
    volatile boolean shutdown = false;
    public final Listener listener;
    private AtomicInteger threadCnt = new AtomicInteger(0);

    public QuorumCnxManager(QuorumPeer self) {
        this.recvQueue = new ArrayBlockingQueue(100);
        this.queueSendMap = new ConcurrentHashMap();
        this.senderWorkerMap = new ConcurrentHashMap();
        this.lastMessageSent = new ConcurrentHashMap();
        String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
        if (cnxToValue != null) {
            this.cnxTO = Integer.parseInt(cnxToValue);
        }
        this.self = self;
        this.listener = new Listener();
        this.listener.setName("QuorumPeerListener");
    }

    public void testInitiateConnection(long sid) throws Exception {
        LOG.debug("Opening channel to server " + sid);
        Socket sock = new Socket();
        this.setSockOpts(sock);
        sock.connect(this.self.getVotingView().get((Object)Long.valueOf((long)sid)).electionAddr, this.cnxTO);
        this.initiateConnection(sock, sid);
    }

    public boolean initiateConnection(Socket sock, Long sid) {
        try {
            BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
            DataOutputStream dout = new DataOutputStream(buf);
            dout.writeLong(-65536L);
            dout.writeLong(this.self.getId());
            String addr = this.self.getElectionAddress().getHostString() + ":" + this.self.getElectionAddress().getPort();
            byte[] addr_bytes = addr.getBytes();
            dout.writeInt(addr_bytes.length);
            dout.write(addr_bytes);
            dout.flush();
        }
        catch (IOException e) {
            LOG.warn("Ignoring exception reading or writing challenge: ", e);
            this.closeSocket(sock);
            return false;
        }
        if (sid <= this.self.getId()) {
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, sid, sw);
            sw.setRecv(rw);
            SendWorker vsw = this.senderWorkerMap.get(sid);
            if (vsw != null) {
                vsw.finish();
            }
            this.senderWorkerMap.put(sid, sw);
            this.queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue(1));
            sw.start();
            rw.start();
            return true;
        }
        LOG.info("Have smaller server identifier, so dropping the connection: (" + sid + ", " + this.self.getId() + ")");
        this.closeSocket(sock);
        return false;
    }

    public void receiveConnection(Socket sock) {
        SendWorker sw;
        Long sid = null;
        Long protocolVersion = null;
        InetSocketAddress electionAddr = null;
        try {
            DataInputStream din = new DataInputStream(sock.getInputStream());
            protocolVersion = din.readLong();
            if (protocolVersion >= 0L) {
                sid = protocolVersion;
            } else {
                try {
                    InitialMessage init = InitialMessage.parse(protocolVersion, din);
                    sid = init.sid;
                    electionAddr = init.electionAddr;
                }
                catch (InitialMessage.InitialMessageException ex) {
                    LOG.error(ex.toString());
                    this.closeSocket(sock);
                    return;
                }
            }
            if (sid == Long.MAX_VALUE) {
                sid = this.observerCounter--;
                LOG.info("Setting arbitrary identifier to observer: {}", (Object)sid);
            }
        }
        catch (IOException e) {
            this.closeSocket(sock);
            LOG.warn("Exception reading or writing challenge: {}", (Object)e.toString());
            return;
        }
        if (sid < this.self.getId()) {
            sw = this.senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }
            LOG.debug("Create new connection to server: {}", (Object)sid);
            this.closeSocket(sock);
            if (electionAddr != null) {
                this.connectOne(sid, electionAddr);
            } else {
                this.connectOne(sid);
            }
        } else {
            sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, sid, sw);
            sw.setRecv(rw);
            SendWorker vsw = this.senderWorkerMap.get(sid);
            if (vsw != null) {
                vsw.finish();
            }
            this.senderWorkerMap.put(sid, sw);
            this.queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue(1));
            sw.start();
            rw.start();
        }
    }

    public void toSend(Long sid, ByteBuffer b) {
        if (this.self.getId() == sid.longValue()) {
            b.position(0);
            this.addToRecvQueue(new Message(b.duplicate(), sid));
        } else {
            ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(1);
            ArrayBlockingQueue<ByteBuffer> oldq = this.queueSendMap.putIfAbsent(sid, bq);
            if (oldq != null) {
                this.addToSendQueue(oldq, b);
            } else {
                this.addToSendQueue(bq, b);
            }
            this.connectOne(sid);
        }
    }

    private synchronized boolean connectOne(long sid, InetSocketAddress electionAddr) {
        if (this.senderWorkerMap.get(sid) != null) {
            LOG.debug("There is a connection already for server " + sid);
            return true;
        }
        Socket sock = null;
        try {
            LOG.debug("Opening channel to server " + sid);
            sock = new Socket();
            this.setSockOpts(sock);
            sock.connect(electionAddr, this.cnxTO);
            LOG.debug("Connected to server " + sid);
            this.initiateConnection(sock, sid);
            return true;
        }
        catch (UnresolvedAddressException e) {
            LOG.warn("Cannot open channel to " + sid + " at election address " + electionAddr, e);
            this.closeSocket(sock);
            throw e;
        }
        catch (IOException e) {
            LOG.warn("Cannot open channel to " + sid + " at election address " + electionAddr, e);
            this.closeSocket(sock);
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    synchronized void connectOne(long sid) {
        if (this.senderWorkerMap.get(sid) != null) {
            LOG.debug("There is a connection already for server " + sid);
            return;
        }
        Object object = this.self.QV_LOCK;
        synchronized (object) {
            boolean knownId = false;
            this.self.recreateSocketAddresses(sid);
            Map<Long, QuorumPeer.QuorumServer> lastCommittedView = this.self.getView();
            QuorumVerifier lastSeenQV = this.self.getLastSeenQuorumVerifier();
            Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
            if (lastCommittedView.containsKey(sid)) {
                knownId = true;
                if (this.connectOne(sid, lastCommittedView.get((Object)Long.valueOf((long)sid)).electionAddr)) {
                    return;
                }
            }
            if (lastSeenQV != null && lastProposedView.containsKey(sid) && (!knownId || lastProposedView.get((Object)Long.valueOf((long)sid)).electionAddr != lastCommittedView.get((Object)Long.valueOf((long)sid)).electionAddr)) {
                knownId = true;
                if (this.connectOne(sid, lastProposedView.get((Object)Long.valueOf((long)sid)).electionAddr)) {
                    return;
                }
            }
            if (!knownId) {
                LOG.warn("Invalid server id: " + sid);
                return;
            }
        }
    }

    public void connectAll() {
        Enumeration<Long> en = this.queueSendMap.keys();
        while (en.hasMoreElements()) {
            long sid = en.nextElement();
            this.connectOne(sid);
        }
    }

    boolean haveDelivered() {
        for (ArrayBlockingQueue<ByteBuffer> queue : this.queueSendMap.values()) {
            LOG.debug("Queue size: " + queue.size());
            if (queue.size() != 0) continue;
            return true;
        }
        return false;
    }

    public void halt() {
        this.shutdown = true;
        LOG.debug("Halting listener");
        this.listener.halt();
        try {
            this.listener.join();
        }
        catch (InterruptedException ex) {
            LOG.warn("Got interrupted before joining the listener", ex);
        }
        this.softHalt();
    }

    public void softHalt() {
        for (SendWorker sw : this.senderWorkerMap.values()) {
            LOG.debug("Halting sender: " + sw);
            sw.finish();
        }
    }

    private void setSockOpts(Socket sock) throws SocketException {
        sock.setTcpNoDelay(true);
        sock.setSoTimeout(this.self.tickTime * this.self.syncLimit);
    }

    private void closeSocket(Socket sock) {
        if (sock == null) {
            return;
        }
        try {
            sock.close();
        }
        catch (IOException ie) {
            LOG.error("Exception while closing", ie);
        }
    }

    public long getThreadCount() {
        return this.threadCnt.get();
    }

    public QuorumPeer getQuorumPeer() {
        return this.self;
    }

    private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buffer) {
        if (queue.remainingCapacity() == 0) {
            try {
                queue.remove();
            }
            catch (NoSuchElementException ne) {
                LOG.debug("Trying to remove from an empty Queue. Ignoring exception " + ne);
            }
        }
        try {
            queue.add(buffer);
        }
        catch (IllegalStateException ie) {
            LOG.error("Unable to insert an element in the queue " + ie);
        }
    }

    private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
        return queue.isEmpty();
    }

    private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue, long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addToRecvQueue(Message msg) {
        Object object = this.recvQLock;
        synchronized (object) {
            if (this.recvQueue.remainingCapacity() == 0) {
                try {
                    this.recvQueue.remove();
                }
                catch (NoSuchElementException ne) {
                    LOG.debug("Trying to remove from an empty recvQueue. Ignoring exception " + ne);
                }
            }
            try {
                this.recvQueue.add(msg);
            }
            catch (IllegalStateException ie) {
                LOG.error("Unable to insert element in the recvQueue " + ie);
            }
        }
    }

    public Message pollRecvQueue(long timeout, TimeUnit unit) throws InterruptedException {
        return this.recvQueue.poll(timeout, unit);
    }

    class RecvWorker
    extends ZooKeeperThread {
        Long sid;
        Socket sock;
        volatile boolean running;
        DataInputStream din;
        final SendWorker sw;

        RecvWorker(Socket sock, Long sid, SendWorker sw) {
            super("RecvWorker:" + sid);
            this.running = true;
            this.sid = sid;
            this.sock = sock;
            this.sw = sw;
            try {
                this.din = new DataInputStream(sock.getInputStream());
                sock.setSoTimeout(0);
            }
            catch (IOException e) {
                LOG.error("Error while accessing socket for " + sid, e);
                QuorumCnxManager.this.closeSocket(sock);
                this.running = false;
            }
        }

        synchronized boolean finish() {
            if (!this.running) {
                return this.running;
            }
            this.running = false;
            this.interrupt();
            QuorumCnxManager.this.threadCnt.decrementAndGet();
            return this.running;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            QuorumCnxManager.this.threadCnt.incrementAndGet();
            try {
                while (this.running && !QuorumCnxManager.this.shutdown && this.sock != null) {
                    int length = this.din.readInt();
                    if (length <= 0 || length > 524288) {
                        throw new IOException("Received packet with invalid packet: " + length);
                    }
                    byte[] msgArray = new byte[length];
                    this.din.readFully(msgArray, 0, length);
                    ByteBuffer message = ByteBuffer.wrap(msgArray);
                    QuorumCnxManager.this.addToRecvQueue(new Message(message.duplicate(), this.sid));
                }
            }
            catch (Exception e) {
                LOG.warn("Connection broken for id " + this.sid + ", my id = " + QuorumCnxManager.this.self.getId() + ", error = ", e);
            }
            finally {
                LOG.warn("Interrupting SendWorker");
                this.sw.finish();
                QuorumCnxManager.this.closeSocket(this.sock);
            }
        }
    }

    class SendWorker
    extends ZooKeeperThread {
        Long sid;
        Socket sock;
        RecvWorker recvWorker;
        volatile boolean running;
        DataOutputStream dout;

        SendWorker(Socket sock, Long sid) {
            super("SendWorker:" + sid);
            this.running = true;
            this.sid = sid;
            this.sock = sock;
            this.recvWorker = null;
            try {
                this.dout = new DataOutputStream(sock.getOutputStream());
            }
            catch (IOException e) {
                LOG.error("Unable to access socket output stream", e);
                QuorumCnxManager.this.closeSocket(sock);
                this.running = false;
            }
            LOG.debug("Address of remote peer: " + this.sid);
        }

        synchronized void setRecv(RecvWorker recvWorker) {
            this.recvWorker = recvWorker;
        }

        synchronized RecvWorker getRecvWorker() {
            return this.recvWorker;
        }

        synchronized boolean finish() {
            LOG.debug("Calling finish for " + this.sid);
            if (!this.running) {
                return this.running;
            }
            this.running = false;
            QuorumCnxManager.this.closeSocket(this.sock);
            this.interrupt();
            if (this.recvWorker != null) {
                this.recvWorker.finish();
            }
            LOG.debug("Removing entry from senderWorkerMap sid=" + this.sid);
            QuorumCnxManager.this.senderWorkerMap.remove(this.sid, this);
            QuorumCnxManager.this.threadCnt.decrementAndGet();
            return this.running;
        }

        synchronized void send(ByteBuffer b) throws IOException {
            byte[] msgBytes = new byte[b.capacity()];
            try {
                b.position(0);
                b.get(msgBytes);
            }
            catch (BufferUnderflowException be) {
                LOG.error("BufferUnderflowException ", be);
                return;
            }
            this.dout.writeInt(b.capacity());
            this.dout.write(b.array());
            this.dout.flush();
        }

        @Override
        public void run() {
            QuorumCnxManager.this.threadCnt.incrementAndGet();
            try {
                ByteBuffer b;
                ArrayBlockingQueue<ByteBuffer> bq = QuorumCnxManager.this.queueSendMap.get(this.sid);
                if ((bq == null || QuorumCnxManager.this.isSendQueueEmpty(bq)) && (b = QuorumCnxManager.this.lastMessageSent.get(this.sid)) != null) {
                    LOG.debug("Attempting to send lastMessage to sid=" + this.sid);
                    this.send(b);
                }
            }
            catch (IOException e) {
                LOG.error("Failed to send last message. Shutting down thread.", e);
                this.finish();
            }
            block6: while (true) {
                try {
                    while (this.running && !QuorumCnxManager.this.shutdown && this.sock != null) {
                        ByteBuffer b = null;
                        try {
                            ArrayBlockingQueue<ByteBuffer> bq = QuorumCnxManager.this.queueSendMap.get(this.sid);
                            if (bq == null) {
                                LOG.error("No queue of incoming messages for server " + this.sid);
                                break block6;
                            }
                            b = QuorumCnxManager.this.pollSendQueue(bq, 1000L, TimeUnit.MILLISECONDS);
                            if (b == null) continue block6;
                            QuorumCnxManager.this.lastMessageSent.put(this.sid, b);
                            this.send(b);
                            continue block6;
                        }
                        catch (InterruptedException e) {
                            LOG.warn("Interrupted while waiting for message on queue", e);
                        }
                    }
                    break;
                }
                catch (Exception e) {
                    LOG.warn("Exception when using channel: for id " + this.sid + " my id = " + QuorumCnxManager.this.self.getId() + " error = " + e);
                    break;
                }
            }
            this.finish();
            LOG.warn("Send worker leaving thread  id " + this.sid + " my id = " + QuorumCnxManager.this.self.getId());
        }
    }

    public class Listener
    extends ZooKeeperThread {
        volatile ServerSocket ss;

        public Listener() {
            super("ListenerThread");
            this.ss = null;
        }

        @Override
        public void run() {
            int numRetries = 0;
            Socket client = null;
            while (!QuorumCnxManager.this.shutdown && numRetries < 3) {
                try {
                    InetSocketAddress addr;
                    this.ss = new ServerSocket();
                    this.ss.setReuseAddress(true);
                    if (QuorumCnxManager.this.self.getQuorumListenOnAllIPs()) {
                        int port = QuorumCnxManager.this.self.getElectionAddress().getPort();
                        addr = new InetSocketAddress(port);
                    } else {
                        QuorumCnxManager.this.self.recreateSocketAddresses(QuorumCnxManager.this.self.getId());
                        addr = QuorumCnxManager.this.self.getElectionAddress();
                    }
                    LOG.info("My election bind port: " + addr.toString());
                    this.setName(addr.toString());
                    this.ss.bind(addr);
                    while (!QuorumCnxManager.this.shutdown) {
                        client = this.ss.accept();
                        QuorumCnxManager.this.setSockOpts(client);
                        LOG.info("Received connection request " + client.getRemoteSocketAddress());
                        QuorumCnxManager.this.receiveConnection(client);
                        numRetries = 0;
                    }
                }
                catch (IOException e) {
                    if (QuorumCnxManager.this.shutdown) break;
                    LOG.error("Exception while listening", e);
                    ++numRetries;
                    try {
                        this.ss.close();
                        Thread.sleep(1000L);
                    }
                    catch (IOException ie) {
                        LOG.error("Error closing server socket", ie);
                    }
                    catch (InterruptedException ie) {
                        LOG.error("Interrupted while sleeping. Ignoring exception", ie);
                    }
                    QuorumCnxManager.this.closeSocket(client);
                }
            }
            LOG.info("Leaving listener");
            if (!QuorumCnxManager.this.shutdown) {
                LOG.error("As I'm leaving the listener thread, I won't be able to participate in leader election any longer: " + QuorumCnxManager.this.self.getElectionAddress());
            } else if (this.ss != null) {
                try {
                    this.ss.close();
                }
                catch (IOException ie) {
                    LOG.debug("Error closing server socket", ie);
                }
            }
        }

        void halt() {
            try {
                LOG.debug("Trying to close listener: " + this.ss);
                if (this.ss != null) {
                    LOG.debug("Closing listener: " + QuorumCnxManager.this.self.getId());
                    this.ss.close();
                }
            }
            catch (IOException e) {
                LOG.warn("Exception when shutting down listener: " + e);
            }
        }
    }

    public static class InitialMessage {
        public Long sid;
        public InetSocketAddress electionAddr;

        InitialMessage(Long sid, InetSocketAddress address) {
            this.sid = sid;
            this.electionAddr = address;
        }

        public static InitialMessage parse(Long protocolVersion, DataInputStream din) throws InitialMessageException, IOException {
            int port;
            if (protocolVersion != -65536L) {
                throw new InitialMessageException("Got unrecognized protocol version %s", protocolVersion);
            }
            Long sid = din.readLong();
            int remaining = din.readInt();
            if (remaining <= 0 || remaining > 2048) {
                throw new InitialMessageException("Unreasonable buffer length: %s", remaining);
            }
            byte[] b = new byte[remaining];
            int num_read = din.read(b);
            if (num_read != remaining) {
                throw new InitialMessageException("Read only %s bytes out of %s sent by server %s", num_read, remaining, sid);
            }
            String addr = new String(b);
            String[] host_port = addr.split(":");
            if (host_port.length != 2) {
                throw new InitialMessageException("Badly formed address: %s", addr);
            }
            try {
                port = Integer.parseInt(host_port[1]);
            }
            catch (NumberFormatException e) {
                throw new InitialMessageException("Bad port number: %s", host_port[1]);
            }
            return new InitialMessage(sid, new InetSocketAddress(host_port[0], port));
        }

        public static class InitialMessageException
        extends Exception {
            InitialMessageException(String message, Object ... args) {
                super(String.format(message, args));
            }
        }
    }

    public static class Message {
        ByteBuffer buffer;
        long sid;

        Message(ByteBuffer buffer, long sid) {
            this.buffer = buffer;
            this.sid = sid;
        }
    }
}

