/*
 * Decompiled with CFR 0.152.
 */
package org.cojen.tupl.repl;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.LongPredicate;
import java.util.zip.CRC32C;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import org.cojen.tupl.core.Scheduler;
import org.cojen.tupl.io.Utils;
import org.cojen.tupl.repl.Channel;
import org.cojen.tupl.repl.ChannelInputStream;
import org.cojen.tupl.repl.JoinException;
import org.cojen.tupl.repl.Peer;
import org.cojen.tupl.repl.Role;
import org.cojen.tupl.util.Latch;

final class ChannelManager {
    // Magic number written at offset 0 of every connection header (see newConnectHeader).
    static final long MAGIC_NUMBER = 2825672906279293275L;

    // Byte offsets of the fields inside the connection header, and the total header size.
    private static final int GROUP_ID = 8;
    private static final int MEMBER_ID = 16;
    private static final int CONNECTION_TYPE = 24;
    private static final int GROUP_TOKEN_1 = 28;
    private static final int GROUP_TOKEN_2 = 36;
    private static final int INIT_HEADER_SIZE = 44;

    // Connection types carried in the header's connection type field. All values are even:
    // doAccept strips bit 0 before dispatching and derives its checkCRCs flag from that bit.
    static final int TYPE_PLAIN = 0;
    static final int TYPE_JOIN = 2;
    static final int TYPE_CONTROL = 4;
    static final int TYPE_SNAPSHOT = 6;

    // Timing constants, all in milliseconds.
    private static final int CONNECT_TIMEOUT_MILLIS = 5000;
    private static final int MIN_RECONNECT_DELAY_MILLIS = 10;
    private static final int MAX_RECONNECT_DELAY_MILLIS = 1000;
    private static final int INITIAL_READ_TIMEOUT_MILLIS = 5000;
    private static final int WRITE_CHECK_DELAY_MILLIS = 125;

    // Command opcodes. SocketChannel.inputLoop takes the opcode from the low byte of each
    // command header and dispatches on it.
    private static final int OP_NOP = 0;
    private static final int OP_REQUEST_VOTE = 2;
    private static final int OP_REQUEST_VOTE_REPLY = 3;
    private static final int OP_QUERY_TERMS = 4;
    private static final int OP_QUERY_TERMS_REPLY = 5;
    private static final int OP_QUERY_DATA = 6;
    private static final int OP_QUERY_DATA_REPLY = 7;
    private static final int OP_WRITE_DATA = 8;
    private static final int OP_WRITE_DATA_REPLY = 9;
    private static final int OP_SYNC_COMMIT = 10;
    private static final int OP_SYNC_COMMIT_REPLY = 11;
    private static final int OP_COMPACT = 12;
    private static final int OP_SNAPSHOT_SCORE = 14;
    private static final int OP_SNAPSHOT_SCORE_REPLY = 15;
    private static final int OP_UPDATE_ROLE = 16;
    private static final int OP_UPDATE_ROLE_REPLY = 17;
    private static final int OP_GROUP_VERSION = 18;
    private static final int OP_GROUP_VERSION_REPLY = 19;
    private static final int OP_GROUP_FILE = 20;
    private static final int OP_GROUP_FILE_REPLY = 21;
    private static final int OP_LEADER_CHECK = 22;
    private static final int OP_LEADER_CHECK_REPLY = 23;
    private static final int OP_WRITE_AND_PROXY = 24;
    private static final int OP_WRITE_VIA_PROXY = 26;
    private static final int OP_QUERY_DATA_REPLY_MISSING = 29;
    private static final int OP_FORCE_ELECTION = 34;

    // VarHandle for mControlVersion; used for the compare-and-set loops in
    // checkControlVersion and incrementControlVersion.
    private static final VarHandle cControlVersionHandle;

    // Optional; doConnect falls back to a plain java.net.Socket when this is null.
    private final SocketFactory mSocketFactory;
    private final Scheduler mScheduler;
    private final long mGroupToken1;
    private final long mGroupToken2;
    // When true, each SocketChannel allocates a CRC32C for its output, and doAccept sets
    // bit 0 of the connection type in its reply to control connections.
    private final boolean mWriteCRCs;
    // Receives exceptions that have no caller to propagate to (accept loop, reply tasks).
    private final Consumer<Throwable> mUncaughtHandler;

    // Registered peers keyed by address; maintained together with mPeerSet.
    private final Map<SocketAddress, Peer> mPeerMap;
    // Registered peers ordered by member id (see the comparator in the constructor).
    private final TreeSet<Peer> mPeerSet;
    // All client and server channels; a concurrent key set.
    private final Set<SocketChannel> mChannels;
    // The current server-side control channel for each peer.
    private final TreeMap<Peer, ServerChannel> mPeerServerChannels;
    // Accepted sockets whose reply header has not been written yet; stop() closes them.
    private final Set<Socket> mFreshSockets;

    private long mGroupId;
    // Zero until setLocalMemberId is called.
    private long mLocalMemberId;
    // Null before setLocalMemberId and after stop(); isStopped() tests exactly this.
    private ServerSocket mServerSocket;
    // When set, stop() does not close the server socket.
    private boolean mKeepServerSocket;
    // Non-null once start() has succeeded; cleared by stop().
    private Channel mLocalServer;

    // Acceptors for non-control connections, selected by connection type in doAccept.
    private volatile Consumer<Socket> mSocketAcceptor;
    private volatile Consumer<Socket> mJoinAcceptor;
    private volatile Consumer<Socket> mSnapshotRequestAcceptor;

    // While true, doConnect returns null and doAccept closes every accepted socket.
    volatile boolean mPartitioned;
    // Counter updated only through cControlVersionHandle; see checkControlVersion.
    private volatile int mControlVersion;
    // VarHandle for SocketChannel.mWriteState; used by checkWrites.
    static final VarHandle cWriteStateHandle;

    /**
     * Creates a manager with no registered peers and no local member id.
     *
     * @param factory optional socket factory; when null, doConnect creates plain sockets
     * @param scheduler runs the accept loop, connection attempts and timed tasks
     * @param groupToken1 first group token written into and checked against headers
     * @param groupToken2 second group token written into and checked against headers
     * @param groupId initial group id
     * @param writeCRCs when true, channels allocate a CRC32C for their output
     * @param uncaughtHandler receives exceptions which cannot be thrown to a caller
     * @throws IllegalArgumentException if scheduler or uncaughtHandler is null
     */
    ChannelManager(SocketFactory factory, Scheduler scheduler, long groupToken1, long groupToken2, long groupId, boolean writeCRCs, Consumer<Throwable> uncaughtHandler) {
        if (scheduler == null || uncaughtHandler == null) {
            throw new IllegalArgumentException();
        }
        mSocketFactory = factory;
        mScheduler = scheduler;
        mGroupToken1 = groupToken1;
        mGroupToken2 = groupToken2;
        mWriteCRCs = writeCRCs;
        mUncaughtHandler = uncaughtHandler;
        mPeerMap = new HashMap<>();
        // Peers are ordered, and treated as equal, by member id alone. The comparator has to
        // be typed: with a raw Comparator the lambda parameters are Object, which has no
        // mMemberId field.
        Comparator<Peer> byMemberId = (a, b) -> Long.compare(a.mMemberId, b.mMemberId);
        mPeerSet = new TreeSet<>(byMemberId);
        mChannels = ConcurrentHashMap.newKeySet();
        mPeerServerChannels = new TreeMap<>(byMemberId);
        mFreshSockets = new HashSet<>();
        setGroupId(groupId);
    }

    /**
     * Creates a server socket with address reuse enabled and binds it to the given address.
     *
     * @param factory optional; when null a plain ServerSocket is created
     * @param listenAddress address to bind to
     * @return the bound server socket
     * @throws IOException if the socket cannot be created, configured or bound; the socket
     * is closed before the exception propagates
     */
    static ServerSocket newServerSocket(ServerSocketFactory factory, SocketAddress listenAddress) throws IOException {
        ServerSocket serverSocket;
        if (factory != null) {
            serverSocket = factory.createServerSocket();
        } else {
            serverSocket = new ServerSocket();
        }
        try {
            serverSocket.setReuseAddress(true);
            serverSocket.bind(listenAddress);
        } catch (Throwable failure) {
            // Don't leak the half-configured socket.
            Utils.closeQuietly(serverSocket);
            throw failure;
        }
        return serverSocket;
    }

    /**
     * Assigns the local member id and the server socket to accept connections on. May only
     * be called once.
     *
     * @throws IllegalArgumentException if localMemberId is zero or ss is null
     * @throws IllegalStateException if a local member id has already been assigned
     */
    synchronized void setLocalMemberId(long localMemberId, ServerSocket ss) {
        boolean validArguments = localMemberId != 0L && ss != null;
        if (!validArguments) {
            throw new IllegalArgumentException();
        }
        boolean alreadyAssigned = mLocalMemberId != 0L;
        if (alreadyAssigned) {
            throw new IllegalStateException();
        }
        mServerSocket = ss;
        mLocalMemberId = localMemberId;
    }

    /**
     * Requests that stop() leave the server socket open instead of closing it.
     */
    synchronized void keepServerSocket() {
        mKeepServerSocket = true;
    }

    /**
     * @return the first group token supplied to the constructor
     */
    long groupToken1() {
        return mGroupToken1;
    }

    /**
     * @return the second group token supplied to the constructor
     */
    long groupToken2() {
        return mGroupToken2;
    }

    /**
     * @return the current group id
     */
    synchronized long groupId() {
        return mGroupId;
    }

    /**
     * Replaces the group id which is written into outgoing headers and checked against
     * incoming ones.
     */
    synchronized void setGroupId(long groupId) {
        mGroupId = groupId;
    }

    /**
     * @return the local member id, or zero if setLocalMemberId has not been called
     */
    synchronized long localMemberId() {
        return mLocalMemberId;
    }

    /**
     * @return true while a local server is installed, i.e. after a successful start() and
     * before stop()
     */
    synchronized boolean isStarted() {
        Channel installed = mLocalServer;
        return installed != null;
    }

    /**
     * Starts accepting connections and begins the periodic write check.
     *
     * @param localServer receives commands arriving on accepted control channels
     * @return false if already started
     * @throws IllegalArgumentException if localServer is null
     * @throws IllegalStateException if no local member id has been assigned
     */
    synchronized boolean start(Channel localServer) throws IOException {
        if (localServer == null) {
            throw new IllegalArgumentException();
        }
        if (mLocalMemberId == 0L) {
            throw new IllegalStateException();
        }
        boolean alreadyStarted = mLocalServer != null;
        if (alreadyStarted) {
            return false;
        }
        // The accept loop is submitted before the local server is installed; it reads both
        // fields under this same monitor.
        execute(this::acceptLoop);
        mLocalServer = localServer;
        checkWrites();
        return true;
    }

    /**
     * @return the live set of all channels; this is the internal set, not a copy
     */
    Set<? extends Channel> allChannels() {
        return mChannels;
    }

    /**
     * Installs the acceptor which doAccept hands plain (type 0) connections to. Passing
     * null causes such connections to be closed.
     */
    void socketAcceptor(Consumer<Socket> acceptor) {
        mSocketAcceptor = acceptor;
    }

    /**
     * Installs the acceptor which doAccept hands join (type 2) connections to. Passing
     * null causes such connections to be closed.
     */
    void joinAcceptor(Consumer<Socket> acceptor) {
        mJoinAcceptor = acceptor;
    }

    /**
     * Installs the acceptor which doAccept hands snapshot (type 6) connections to. Passing
     * null causes such connections to be closed.
     */
    void snapshotRequestAcceptor(Consumer<Socket> acceptor) {
        mSnapshotRequestAcceptor = acceptor;
    }

    /**
     * @return true if a snapshot request acceptor is currently installed
     */
    boolean hasSnapshotRequestAcceptor() {
        Consumer<Socket> acceptor = mSnapshotRequestAcceptor;
        return acceptor != null;
    }

    /**
     * Polls the control version counter. When the counter is positive, or when its negated
     * value has reached the limit, the counter is reset to zero and true is returned.
     * Otherwise the counter is decremented and false is returned. The update is retried
     * until its compare-and-set succeeds.
     *
     * @param limit threshold compared against the negated counter
     * @return true if the counter was reset to zero by this call
     */
    boolean checkControlVersion(int limit) {
        for (;;) {
            int observed = mControlVersion;
            boolean reset = observed > 0 || -observed >= limit;
            int replacement = reset ? 0 : observed - 1;
            if (cControlVersionHandle.compareAndSet(this, observed, replacement)) {
                return reset;
            }
            // Lost a race with a concurrent update; re-read and try again.
        }
    }

    /**
     * Atomically sets the control version to one more than its current value, first
     * clamping a negative current value up to zero.
     */
    private void incrementControlVersion() {
        int observed;
        int replacement;
        do {
            observed = mControlVersion;
            replacement = Math.max(0, observed) + 1;
        } while (!cControlVersionHandle.compareAndSet(this, observed, replacement));
    }

    /**
     * Stops this manager: closes the server socket (unless keepServerSocket was called),
     * disconnects every channel, closes sockets still awaiting their reply header, and
     * clears the channel bookkeeping.
     */
    synchronized void stop() {
        boolean closeServerSocket = !mKeepServerSocket;
        if (closeServerSocket) {
            Utils.closeQuietly(mServerSocket);
        }
        mServerSocket = null;
        mLocalServer = null;

        for (SocketChannel ch : mChannels) {
            ch.disconnect();
        }
        for (Socket fresh : mFreshSockets) {
            Utils.closeQuietly(fresh);
        }

        mChannels.clear();
        mPeerServerChannels.clear();
        mFreshSockets.clear();
    }

    /**
     * @return true when there is no server socket, which is the case before
     * setLocalMemberId has supplied one and after stop()
     */
    synchronized boolean isStopped() {
        ServerSocket current = mServerSocket;
        return current == null;
    }

    /**
     * Enables or disables the partitioned flag. Enabling it also closes the socket of every
     * existing channel.
     */
    synchronized void partitioned(boolean enable) {
        mPartitioned = enable;
        if (!enable) {
            return;
        }
        for (SocketChannel ch : mChannels) {
            ch.closeSocket();
        }
    }

    /**
     * Hands the task to the scheduler for execution. If the scheduler refuses it, this
     * manager is stopped.
     */
    private void execute(Runnable task) {
        boolean accepted = mScheduler.execute(task);
        if (!accepted) {
            stop();
        }
    }

    /**
     * Hands the task to the scheduler to run after the given delay. If the scheduler
     * refuses it, this manager is stopped.
     */
    private void scheduleMillis(Runnable task, long delayMillis) {
        boolean accepted = mScheduler.scheduleMillis(task, delayMillis);
        if (!accepted) {
            stop();
        }
    }

    /**
     * Registers a peer and creates a client channel to it. The first connection attempt is
     * submitted to the scheduler; this method does not wait for it.
     *
     * @param peer remote member; must have a non-zero member id and a non-null address
     * @param localServer receives commands arriving on the channel
     * @return the new client channel
     * @throws IllegalArgumentException if an argument is invalid or the peer is the local
     * member
     * @throws IllegalStateException if the member id is already registered with a different
     * address, or the address is already registered
     */
    Channel connect(Peer peer, Channel localServer) {
        if (peer.mMemberId == 0L || peer.mAddress == null || localServer == null) {
            throw new IllegalArgumentException();
        }

        final ClientChannel channel;

        synchronized (this) {
            if (mLocalMemberId == peer.mMemberId) {
                throw new IllegalArgumentException("Cannot connect to self");
            }

            // The peer set is ordered by member id, so ceiling() yields the registered
            // peer with the same id when there is one.
            Peer registered = mPeerSet.ceiling(peer);
            boolean sameMember = registered != null && registered.mMemberId == peer.mMemberId;
            if (sameMember && !registered.mAddress.equals(peer.mAddress)) {
                throw new IllegalStateException("Already connected with a different address");
            }

            channel = new ClientChannel(peer, localServer);

            Peer addressOwner = mPeerMap.putIfAbsent(peer.mAddress, peer);
            if (addressOwner != null) {
                throw new IllegalStateException("Duplicate address: " + peer);
            }

            mPeerSet.add(peer);
            mChannels.add(channel);
        }

        execute(channel::connect);
        return channel;
    }

    /**
     * Opens a plain (type 0) connection to a registered peer address.
     *
     * @throws IOException if the address is not a registered peer, or the connection fails
     * or is rejected
     */
    Socket connectPlain(SocketAddress addr) throws IOException {
        int[] type = {0};
        return connectSocket(addr, type);
    }

    /**
     * Opens a snapshot (type 6) connection to a registered peer address.
     *
     * @throws IOException if the address is not a registered peer, or the connection fails
     * or is rejected
     */
    Socket connectSnapshot(SocketAddress addr) throws IOException {
        int[] type = {6};
        return connectSocket(addr, type);
    }

    /**
     * Looks up the peer registered for the address and opens a connection of the requested
     * type to it.
     *
     * @param addr address of a registered peer
     * @param connectionType single-element array holding the requested type; doConnect
     * overwrites the element with the type reported by the remote side
     * @return the connected socket, never null
     * @throws IllegalArgumentException if addr is null
     * @throws ConnectException if the address is not registered, or doConnect returned null
     * @throws IOException if connecting fails
     */
    private Socket connectSocket(SocketAddress addr, int[] connectionType) throws IOException {
        if (addr == null) {
            throw new IllegalArgumentException();
        }

        final Peer target;
        synchronized (this) {
            target = mPeerMap.get(addr);
        }
        if (target == null) {
            throw new ConnectException("Not a group member: " + addr);
        }

        Socket connected = doConnect(target, connectionType);
        if (connected != null) {
            return connected;
        }
        throw new ConnectException("Rejected");
    }

    /**
     * Connects to the peer, sends a connection header and validates the reply header.
     *
     * @param peer remote member to connect to
     * @param conType single-element array; on entry the requested connection type, on a
     * successful return the type reported by the remote side (it may differ in bit 0 only)
     * @return the connected socket, or null if this manager is partitioned, no valid reply
     * header was received, the reply names a different member, or the reply's connection
     * type differs in anything other than bit 0
     * @throws IOException if connecting or writing the header fails, including a
     * JoinException raised while reading the reply; the socket is closed first
     */
    Socket doConnect(Peer peer, int[] conType) throws IOException {
        if (mPartitioned) {
            return null;
        }

        Socket s = mSocketFactory == null ? new Socket() : mSocketFactory.createSocket();

        try {
            s.connect(peer.mAddress, 5000); // CONNECT_TIMEOUT_MILLIS

            long groupId;
            long localMemberId;
            synchronized (this) {
                groupId = mGroupId;
                localMemberId = mLocalMemberId;
            }

            byte[] header = newConnectHeader(groupId, localMemberId, conType[0], mGroupToken1, mGroupToken2);
            s.getOutputStream().write(header);

            header = readHeader(s, false);

            // The reply must carry the member id of the peer we meant to reach (offset 16).
            if (header != null && Utils.decodeLongLE(header, 16) == peer.mMemberId) {
                // Connection type is at offset 24; bit 0 is a flag and may legitimately
                // differ from what was requested.
                int actualType = Utils.decodeIntLE(header, 24);
                if ((actualType | 1) == (conType[0] | 1)) {
                    conType[0] = actualType;
                    return s;
                }
            }
        } catch (Throwable e) {
            // Catch everything, not just IOException, so that an unchecked failure (for
            // example an IllegalArgumentException from Socket.connect) cannot leak the
            // socket. Precise rethrow keeps the declared exception type unchanged.
            Utils.closeQuietly(s);
            throw e;
        }

        Utils.closeQuietly(s);
        return null;
    }

    /**
     * Builds a 44-byte connection header. All fields are encoded little-endian: the magic
     * number at offset 0, group id at 8, member id at 16, connection type at 24 and the two
     * group tokens at 28 and 36.
     *
     * @return a newly allocated header
     */
    static byte[] newConnectHeader(long groupId, long memberId, int conType, long groupToken1, long groupToken2) {
        final byte[] buf = new byte[44];           // INIT_HEADER_SIZE
        Utils.encodeLongLE(buf, 0, 2825672906279293275L); // MAGIC_NUMBER
        Utils.encodeLongLE(buf, 8, groupId);       // GROUP_ID
        Utils.encodeLongLE(buf, 16, memberId);     // MEMBER_ID
        Utils.encodeIntLE(buf, 24, conType);       // CONNECTION_TYPE
        Utils.encodeLongLE(buf, 28, groupToken1);  // GROUP_TOKEN_1
        Utils.encodeLongLE(buf, 36, groupToken2);  // GROUP_TOKEN_2
        return buf;
    }

    /**
     * Disconnects and unregisters every channel whose peer has the given member id.
     *
     * @throws IllegalArgumentException if remoteMemberId is zero
     */
    void disconnect(long remoteMemberId) {
        if (remoteMemberId == 0L) {
            throw new IllegalArgumentException();
        }
        LongPredicate sameMember = candidate -> candidate == remoteMemberId;
        disconnect(sameMember);
    }

    /**
     * Disconnects every channel whose peer member id is accepted by the tester. Each such
     * channel is removed from the channel set (and from the server channel map when
     * applicable) and disconnected, and the matching peer is removed from the peer set and
     * the address map.
     *
     * @param tester selects member ids to disconnect
     */
    synchronized void disconnect(LongPredicate tester) {
        for (Iterator<SocketChannel> channels = mChannels.iterator(); channels.hasNext(); ) {
            SocketChannel ch = channels.next();
            long id = ch.mPeer.mMemberId;
            if (tester.test(id)) {
                channels.remove();
                removeIfServerChannel(ch);
                ch.disconnect();

                // Look up the registered peer by member id alone.
                Peer registered = mPeerSet.ceiling(new Peer(id));
                if (registered != null && registered.mMemberId == id) {
                    mPeerSet.remove(registered);
                    mPeerMap.remove(registered.mAddress);
                }
            }
        }
    }

    /**
     * Removes the channel from the channel set, and from the server channel map if it is a
     * server channel. The channel itself is not disconnected and its peer stays registered.
     */
    synchronized void unregister(SocketChannel channel) {
        mChannels.remove(channel);
        removeIfServerChannel(channel);
    }

    /**
     * Removes the peer's entry from the server channel map, but only if the channel is a
     * ServerChannel and the entry still refers to this exact channel.
     */
    private void removeIfServerChannel(SocketChannel channel) {
        if (!(channel instanceof ServerChannel)) {
            return;
        }
        mPeerServerChannels.remove(channel.mPeer, channel);
    }

    /**
     * Accepts connections on the server socket until it is replaced or closed. Returns
     * immediately if there is no server socket. A failure from doAccept is ignored when the
     * server socket has since changed; otherwise it is reported to the uncaught handler,
     * and the manager is stopped if the socket is closed.
     */
    private void acceptLoop() {
        final ServerSocket serverSocket;
        final Channel localServer;
        synchronized (this) {
            serverSocket = mServerSocket;
            localServer = mLocalServer;
        }

        if (serverSocket == null) {
            return;
        }

        for (;;) {
            try {
                doAccept(serverSocket, localServer);
            } catch (Throwable failure) {
                synchronized (this) {
                    if (serverSocket != mServerSocket) {
                        // Stopped or restarted on another socket; exit quietly.
                        return;
                    }
                }
                mUncaughtHandler.accept(failure);
                if (serverSocket.isClosed()) {
                    stop();
                    return;
                }
                Thread.yield();
            }
        }
    }

    /**
     * Accepts one connection, validates its header and dispatches it by connection type.
     * Control (type 4) connections become a ServerChannel for the identified peer; plain
     * (0), join (2) and snapshot (6) connections are handed to the matching acceptor.
     * The reply header is written by a task run on the scheduler, not by this method.
     *
     * <p>The socket is closed without a reply when: this manager is partitioned, the header
     * is rejected, the connection type is unknown or has no acceptor installed, the manager
     * has been stopped, a non-zero remote member id is not a registered peer, or a control
     * connection does not identify a peer.
     *
     * @param ss server socket to accept from
     * @param localServer passed to any ServerChannel that is created
     * @throws IOException if accepting fails, or a JoinException is raised while reading
     * the header
     */
    private void doAccept(ServerSocket ss, Channel localServer) throws IOException {
        final Socket s = ss.accept();

        if (mPartitioned) {
            Utils.closeQuietly(s);
            return;
        }

        ServerChannel server = null;

        try {
            // readHeader has already closed the socket when it returns null.
            final byte[] header = readHeader(s, true);
            if (header == null) {
                return;
            }

            final long remoteMemberId = Utils.decodeLongLE(header, 16); // MEMBER_ID
            final int rawType = Utils.decodeIntLE(header, 24);          // CONNECTION_TYPE
            // Bit 0 of the type is a flag, not part of the type itself.
            final boolean checkCRCs = (rawType & 1) != 0;
            final int connectionType = rawType & 0xFFFFFFFE;

            Consumer<Socket> acceptor = null;
            switch (connectionType) {
                case 4: // TYPE_CONTROL: handled by a ServerChannel, created below
                    break;
                case 0: // TYPE_PLAIN
                    acceptor = mSocketAcceptor;
                    break;
                case 2: // TYPE_JOIN
                    acceptor = mJoinAcceptor;
                    break;
                case 6: // TYPE_SNAPSHOT
                    acceptor = mSnapshotRequestAcceptor;
                    break;
                default: // unknown type: no acceptor, rejected below
                    break;
            }

            if (connectionType != 4 && acceptor == null) {
                Utils.closeQuietly(s);
                return;
            }

            ServerChannel existing = null;

            synchronized (this) {
                if (mServerSocket == null) {
                    // Stopped while the header was being read.
                    Utils.closeQuietly(s);
                    return;
                }

                Peer peer = null;
                if (remoteMemberId != 0L) {
                    peer = mPeerSet.ceiling(new Peer(remoteMemberId));
                    if (peer == null || peer.mMemberId != remoteMemberId) {
                        Utils.closeQuietly(s);
                        return;
                    }
                }

                if (connectionType == 4) {
                    if (peer == null) {
                        Utils.closeQuietly(s);
                        return;
                    }
                    server = new ServerChannel(peer, localServer);
                    mChannels.add(server);
                    existing = mPeerServerChannels.put(peer, server);
                    // The reply carries the bare type, with bit 0 set when this side writes
                    // CRCs.
                    int replyType = mWriteCRCs ? (connectionType | 1) : connectionType;
                    Utils.encodeIntLE(header, 24, replyType);
                }

                // The header is reused as the reply, now carrying the local member id.
                Utils.encodeLongLE(header, 16, mLocalMemberId);
                mFreshSockets.add(s);
            }

            final ServerChannel fserver = server;
            final Consumer<Socket> facceptor;
            if (acceptor != null) {
                facceptor = acceptor;
            } else {
                facceptor = sock -> fserver.accepted(sock, checkCRCs);
            }

            Runnable replyTask = () -> {
                try {
                    // Lock the manager directly. A lambda body may not declare a local
                    // whose name is already used by an enclosing local.
                    synchronized (this) {
                        mFreshSockets.remove(s);
                    }
                    s.getOutputStream().write(header);
                    facceptor.accept(s);
                    return;
                } catch (IOException ignored) {
                    // The remote side went away before the reply was written; the socket
                    // and channel are closed below.
                } catch (Throwable e) {
                    mUncaughtHandler.accept(e);
                }
                Utils.closeQuietly(s);
                Utils.closeQuietly(fserver);
            };

            if (existing == null) {
                execute(replyTask);
            } else if (!existing.replaced(replyTask)) {
                // The channel being replaced still has a socket; it keeps the reply task to
                // run when it disconnects, and is closed after a delay.
                scheduleMillis(existing::close, 2500L);
            }
        } catch (Throwable e) {
            Utils.closeQuietly(s);
            Utils.closeQuietly(server);
            throw e;
        }
    }

    /**
     * Reads and validates a connection header from the socket, using a 5 second read
     * timeout. On success the timeout is cleared and TCP_NODELAY is enabled.
     *
     * @param s socket to read from; closed if no valid header is obtained
     * @param accepted true when the socket was accepted by the local server socket
     * @return the header, or null if it was invalid or an I/O error occurred
     * @throws JoinException if the static readHeader raised one; the socket is closed first
     */
    byte[] readHeader(Socket s, boolean accepted) throws JoinException {
        byte[] result;
        try {
            s.setSoTimeout(5000); // INITIAL_READ_TIMEOUT_MILLIS
            result = readHeader(s, accepted, groupId(), mGroupToken1, mGroupToken2);
            s.setSoTimeout(0);
            s.setTcpNoDelay(true);
        } catch (IOException failure) {
            Utils.closeQuietly(s);
            if (!(failure instanceof JoinException)) {
                return null;
            }
            throw (JoinException) failure;
        }

        if (result == null) {
            Utils.closeQuietly(s);
        }
        return result;
    }

    /**
     * Reads a 44-byte connection header and validates it. The header is valid when the
     * magic number matches, at least one of its two tokens equals one of the given group
     * tokens, and either its group id equals the given one or its connection type is less
     * than 4.
     *
     * <p>On a token or id mismatch, an accepted socket is sent the header back with the
     * group id and member id fields zeroed; otherwise a JoinException is thrown. The socket
     * is never closed by this method.
     *
     * @param accepted true when the socket was accepted by the local server socket
     * @return the header, or null if the magic number is wrong or an accepted connection
     * failed the token/id check
     * @throws JoinException on a token or id mismatch when accepted is false
     * @throws IOException if reading or writing fails
     */
    static byte[] readHeader(Socket s, boolean accepted, long groupId, long groupToken1, long groupToken2) throws IOException, JoinException {
        InputStream source = s.getInputStream();
        byte[] header = new byte[44]; // INIT_HEADER_SIZE
        Utils.readFully(source, header, 0, header.length);

        long magic = Utils.decodeLongLE(header, 0);
        if (magic != 2825672906279293275L) { // MAGIC_NUMBER
            return null;
        }

        long gt1 = Utils.decodeLongLE(header, 28); // GROUP_TOKEN_1
        long gt2 = Utils.decodeLongLE(header, 36); // GROUP_TOKEN_2

        // Any pairing of a remote token with a local token is a match.
        boolean tokenMatch = gt1 == groupToken1 || gt1 == groupToken2 || gt2 == groupToken1 || gt2 == groupToken2;
        // The group id is only enforced for connection types 4 and above.
        boolean idMatch = Utils.decodeLongLE(header, 8) == groupId || Utils.decodeIntLE(header, 24) < 4;

        if (tokenMatch && idMatch) {
            return header;
        }

        if (!accepted) {
            throw new JoinException("Group token or id doesn't match", s.getRemoteSocketAddress());
        }

        // Reply with zeroed group and member ids.
        Utils.encodeLongLE(header, 8, 0L);
        Utils.encodeLongLE(header, 16, 0L);
        s.getOutputStream().write(header);
        return null;
    }

    /**
     * Periodic check of every channel's write state. Does nothing, and stops rescheduling
     * itself, once the server socket is gone. For each channel:
     * <ul>
     * <li>a state that has reached maxWriteTagCount() is reset to 0, and if that reset wins
     * its compare-and-set the channel's socket is closed;
     * <li>any other positive state is advanced by one;
     * <li>a state below 1 is left alone.
     * </ul>
     * The check then reschedules itself to run again in 125 milliseconds.
     *
     * NOTE(review): the code which sets mWriteState is not visible here; presumably a
     * positive state marks a write in progress, which would make this a stalled-write
     * watchdog. Confirm against the write path.
     */
    private synchronized void checkWrites() {
        if (mServerSocket == null) {
            return;
        }

        for (SocketChannel channel : mChannels) {
            // Read the state of the channel being examined. The state must be read after
            // the channel has been obtained from the iteration, never before.
            int state = channel.mWriteState;
            if (state >= channel.maxWriteTagCount()) {
                if (cWriteStateHandle.compareAndSet(channel, state, 0)) {
                    channel.closeSocket();
                }
            } else if (state >= 1) {
                // A failed compare-and-set means the state changed concurrently; leave it.
                cWriteStateHandle.compareAndSet(channel, state, state + 1);
            }
        }

        scheduleMillis(this::checkWrites, 125L); // WRITE_CHECK_DELAY_MILLIS
    }

    // Resolves the VarHandles used for compare-and-set access to mControlVersion and to
    // SocketChannel.mWriteState. Any lookup failure is rethrown through Utils.rethrow.
    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            cControlVersionHandle = lookup.findVarHandle(ChannelManager.class, "mControlVersion", int.class);
            cWriteStateHandle = lookup.findVarHandle(SocketChannel.class, "mWriteState", int.class);
        } catch (Throwable failure) {
            throw Utils.rethrow(failure);
        }
    }

    abstract class SocketChannel
    extends Latch
    implements Channel,
    Closeable {
        // The remote peer this channel is for.
        final Peer mPeer;
        // Receives incoming commands. Cleared by disconnect(); reconnect() only schedules a
        // new attempt while this is non-null.
        private Channel mLocalServer;
        // Socket and streams of the current connection. Set together by connected() and
        // cleared together by reconnect() and disconnect().
        private volatile Socket mSocket;
        private OutputStream mOut;
        // Non-null only when the manager was created with writeCRCs enabled.
        private final CRC32C mOutCRC;
        private ChannelInputStream mIn;
        // Delay before the next connection attempt, in milliseconds. reconnect() doubles it
        // within the range 10 to 1000; connected() resets it to 0.
        private int mReconnectDelay;
        // Time the current run of connection attempts began, or Long.MAX_VALUE when no
        // attempt is outstanding.
        private volatile long mConnectAttemptStartedAt;
        // Set once a JoinException has been reported, so that it is reported only once
        // until the next successful connection.
        private boolean mJoinFailure;
        // Examined and advanced by ChannelManager.checkWrites through cWriteStateHandle.
        volatile int mWriteState;
        private byte[] mWriteBuffer = new byte[128];
        // Task stored by replaced() and run by disconnect().
        private Runnable mReplacementTask;

        /**
         * Creates an unconnected channel for the given peer.
         *
         * @param peer remote peer
         * @param localServer receives commands arriving on this channel
         */
        SocketChannel(Peer peer, Channel localServer) {
            mPeer = peer;
            mLocalServer = localServer;
            // No connection attempt is outstanding yet.
            mConnectAttemptStartedAt = Long.MAX_VALUE;
            if (ChannelManager.this.mWriteCRCs) {
                mOutCRC = new CRC32C();
            } else {
                mOutCRC = null;
            }
        }

        /**
         * Attempts to open a control connection to the peer. On success the socket is
         * installed by connected(); in every case reconnect() is then called with the input
         * stream observed on entry, which schedules another attempt unless a new stream
         * has been installed in the meantime.
         *
         * <p>A JoinException is reported to the uncaught handler once, and not again until
         * a connection has succeeded. Other I/O failures are not reported.
         */
        void connect() {
            ChannelInputStream in;
            synchronized (this) {
                in = mIn;
                if (mConnectAttemptStartedAt == Long.MAX_VALUE) {
                    mConnectAttemptStartedAt = System.currentTimeMillis();
                }
            }

            // Request a control connection (type 4); bit 0 is set when this side has a CRC
            // for its output.
            int[] conType = {4};
            if (mOutCRC != null) {
                conType[0] |= 1;
            }

            try {
                Socket s = ChannelManager.this.doConnect(mPeer, conType);
                if (s != null) {
                    // doConnect stored the type reported by the remote side. Bit 0 of it
                    // decides whether incoming commands are CRC-checked, the same test
                    // doAccept applies. The type is never negative, so a sign test here
                    // would leave CRC checking permanently off.
                    connected(s, (conType[0] & 1) != 0);
                }
            } catch (JoinException e) {
                boolean report;
                synchronized (this) {
                    report = !mJoinFailure;
                    if (report) {
                        mJoinFailure = true;
                    }
                }
                if (report) {
                    ChannelManager.this.mUncaughtHandler.accept(e);
                }
            } catch (IOException ignored) {
                // Connection failures are expected while the peer is down; reconnect()
                // below schedules the next attempt.
            }

            reconnect(in);
        }

        /**
         * Drops the current connection and schedules a new connection attempt. Does nothing
         * if the given stream is no longer the channel's input stream. The delay starts at
         * 10 ms and doubles on each call up to 1000 ms. No attempt is scheduled once the
         * channel has been disconnected.
         *
         * @param existing the input stream the caller was using; may be null
         */
        void reconnect(InputStream existing) {
            final Channel localServer;
            final Socket oldSocket;
            final int delayMillis;

            synchronized (this) {
                if (mIn != existing) {
                    return;
                }
                localServer = mLocalServer;
                oldSocket = mSocket;
                mSocket = null;
                mOut = null;
                mIn = null;
                delayMillis = Math.max(mReconnectDelay, 10);          // MIN_RECONNECT_DELAY_MILLIS
                mReconnectDelay = Math.min(delayMillis << 1, 1000);   // MAX_RECONNECT_DELAY_MILLIS
            }

            ChannelManager.this.incrementControlVersion();
            Utils.closeQuietly(oldSocket);

            if (localServer == null) {
                return;
            }
            ChannelManager.this.scheduleMillis(this::connect, delayMillis);
        }

        /**
         * Called when another channel takes this one's place. If this channel has no
         * socket, the task is submitted for execution right away. Otherwise it is stored
         * and run when this channel is disconnected.
         *
         * @return true if the task was submitted immediately, false if it was stored
         * @throws IllegalStateException if a replacement task is already stored
         */
        synchronized boolean replaced(Runnable task) {
            if (mReplacementTask != null) {
                throw new IllegalStateException();
            }
            boolean connected = mSocket != null;
            if (connected) {
                mReplacementTask = task;
                return false;
            }
            ChannelManager.this.execute(task);
            return true;
        }

        /**
         * Unregisters this channel from the manager and then disconnects it.
         */
        @Override
        public void close() {
            final SocketChannel self = this;
            ChannelManager.this.unregister(self);
            self.disconnect();
        }

        /**
         * Disconnects this channel for good: clears the local server so that no reconnect
         * is scheduled, drops the socket and streams, submits any stored replacement task,
         * bumps the control version and closes the socket.
         */
        void disconnect() {
            final Socket oldSocket;
            final Runnable pending;

            synchronized (this) {
                mLocalServer = null;
                oldSocket = mSocket;
                mSocket = null;
                mOut = null;
                mIn = null;
                pending = mReplacementTask;
                mReplacementTask = null;
            }

            if (pending != null) {
                ChannelManager.this.execute(pending);
            }
            ChannelManager.this.incrementControlVersion();
            Utils.closeQuietly(oldSocket);
        }

        /**
         * Closes the current socket, if any, without changing any other channel state.
         */
        void closeSocket() {
            Socket current = mSocket;
            Utils.closeQuietly(current);
        }

        /**
         * Installs a socket that was accepted for this channel. Delegates to connected()
         * and ignores its result.
         *
         * @param checkCRCs passed through to the channel's input stream
         */
        void accepted(Socket s, boolean checkCRCs) {
            connected(s, checkCRCs);
        }

        /**
         * Installs the socket as this channel's connection and starts the input loop on the
         * scheduler. Any previous socket is closed. The reconnect delay, the attempt start
         * time and the join failure flag are all reset.
         *
         * @param s newly connected socket; closed if it cannot be installed
         * @param checkCRCs passed to the ChannelInputStream created for the socket
         * @return false if the manager is partitioned or the socket's streams could not be
         * obtained; true once the socket has been installed
         */
        private synchronized boolean connected(Socket s, boolean checkCRCs) {
            if (ChannelManager.this.mPartitioned) {
                Utils.closeQuietly(s);
                return false;
            }

            final OutputStream newOut;
            final ChannelInputStream newIn;
            try {
                newOut = s.getOutputStream();
                newIn = new ChannelInputStream(s.getInputStream(), 128, checkCRCs);
            } catch (Throwable failure) {
                Utils.closeQuietly(s);
                return false;
            }

            ChannelManager.this.incrementControlVersion();
            Utils.closeQuietly(mSocket);

            // The connection fields are swapped while holding the latch exclusively.
            acquireExclusive();
            mSocket = s;
            mOut = newOut;
            mIn = newIn;
            mReconnectDelay = 0;
            mConnectAttemptStartedAt = Long.MAX_VALUE;
            mJoinFailure = false;
            releaseExclusive();

            ChannelManager.this.execute(this::inputLoop);
            notifyAll();
            return true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void inputLoop() {
            ChannelInputStream in;
            Channel localServer;
            SocketChannel socketChannel = this;
            synchronized (socketChannel) {
                localServer = this.mLocalServer;
                in = this.mIn;
            }
            if (localServer == null || in == null) {
                return;
            }
            try {
                while (true) {
                    long header = in.readLongLE();
                    int opAndLength = (int)header;
                    int commandLength = opAndLength >> 8 & 0xFFFFFF;
                    int op = opAndLength & 0xFF;
                    int crc = (int)(header >> 32) ^ opAndLength;
                    in.prepareChecksum(commandLength, crc);
                    switch (op) {
                        case 0: {
                            break;
                        }
                        case 2: {
                            localServer.requestVote(this, in.readLongLE(), in.readLongLE(), in.readLongLE(), in.readLongLE());
                            commandLength -= 32;
                            break;
                        }
                        case 3: {
                            localServer.requestVoteReply(this, in.readLongLE());
                            commandLength -= 8;
                            break;
                        }
                        case 34: {
                            localServer.forceElection(this);
                            break;
                        }
                        case 4: {
                            localServer.queryTerms(this, in.readLongLE(), in.readLongLE());
                            commandLength -= 16;
                            break;
                        }
                        case 5: {
                            localServer.queryTermsReply(this, in.readLongLE(), in.readLongLE(), in.readLongLE());
                            commandLength -= 24;
                            break;
                        }
                        case 6: {
                            localServer.queryData(this, in.readLongLE(), in.readLongLE());
                            commandLength -= 16;
                            break;
                        }
                        case 7: {
                            long currentTerm = in.readLongLE();
                            long prevTerm = in.readLongLE();
                            long term = in.readLongLE();
                            long position = in.readLongLE();
                            in.readFully(commandLength -= 32);
                            localServer.queryDataReply(this, currentTerm, prevTerm, term, position, in.mBuffer, in.mPos, commandLength);
                            in.mPos += commandLength;
                            commandLength = 0;
                            break;
                        }
                        case 29: {
                            long currentTerm = in.readLongLE();
                            long prevTerm = in.readLongLE();
                            long term = in.readLongLE();
                            long position = in.readLongLE();
                            long endPosition = in.readLongLE();
                            localServer.queryDataReplyMissing(this, currentTerm, prevTerm, term, position, endPosition);
                            commandLength -= 40;
                            break;
                        }
                        case 8: {
                            long prevTerm = in.readLongLE();
                            long term = in.readLongLE();
                            long position = in.readLongLE();
                            long highestPosition = in.readLongLE();
                            long commitPosition = in.readLongLE();
                            in.readFully(commandLength -= 40);
                            localServer.writeData(this, prevTerm, term, position, highestPosition, commitPosition, null, in.mBuffer, in.mPos, commandLength);
                            in.mPos += commandLength;
                            commandLength = 0;
                            break;
                        }
                        case 9: {
                            localServer.writeDataReply(this, in.readLongLE(), in.readLongLE());
                            commandLength -= 16;
                            break;
                        }
                        case 24: {
                            long prevTerm = in.readLongLE();
                            long term = in.readLongLE();
                            long position = in.readLongLE();
                            long highestPosition = in.readLongLE();
                            long commitPosition = in.readLongLE();
                            in.readFully(commandLength -= 40);
                            localServer.writeDataAndProxy(this, prevTerm, term, position, highestPosition, commitPosition, null, in.mBuffer, in.mPos, commandLength);
                            in.mPos += commandLength;
                            commandLength = 0;
                            break;
                        }
                        case 26: {
                            long prevTerm = in.readLongLE();
                            long term = in.readLongLE();
                            long position = in.readLongLE();
                            long highestPosition = in.readLongLE();
                            long commitPosition = in.readLongLE();
                            in.readFully(commandLength -= 40);
                            localServer.writeDataViaProxy(this, prevTerm, term, position, highestPosition, commitPosition, null, in.mBuffer, in.mPos, commandLength);
                            in.mPos += commandLength;
                            commandLength = 0;
                            break;
                        }
                        case 10: {
                            localServer.syncCommit(this, in.readLongLE(), in.readLongLE(), in.readLongLE());
                            commandLength -= 24;
                            break;
                        }
                        case 11: {
                            localServer.syncCommitReply(this, in.readLongLE(), in.readLongLE(), in.readLongLE());
                            commandLength -= 24;
                            break;
                        }
                        case 12: {
                            localServer.compact(this, in.readLongLE());
                            commandLength -= 8;
                            break;
                        }
                        case 14: {
                            localServer.snapshotScore(this);
                            break;
                        }
                        case 15: {
                            localServer.snapshotScoreReply(this, in.readIntLE(), Float.intBitsToFloat(in.readIntLE()));
                            commandLength -= 8;
                            break;
                        }
                        case 16: {
                            localServer.updateRole(this, in.readLongLE(), in.readLongLE(), Role.decode(in.readByte()));
                            commandLength -= 17;
                            break;
                        }
                        case 17: {
                            localServer.updateRoleReply(this, in.readLongLE(), in.readLongLE(), in.readByte());
                            commandLength -= 17;
                            break;
                        }
                        case 18: {
                            localServer.groupVersion(this, in.readLongLE());
                            commandLength -= 8;
                            break;
                        }
                        case 19: {
                            localServer.groupVersionReply(this, in.readLongLE());
                            commandLength -= 8;
                            break;
                        }
                        case 20: {
                            localServer.groupFile(this, in.readLongLE());
                            commandLength -= 8;
                            break;
                        }
                        case 21: {
                            localServer.groupFileReply(this, in, null);
                            break;
                        }
                        case 22: {
                            localServer.leaderCheck(this);
                            break;
                        }
                        case 23: {
                            localServer.leaderCheckReply(this, in.readLongLE());
                            commandLength -= 8;
                            break;
                        }
                        default: {
                            localServer.unknown(this, op);
                        }
                    }
                    in.skipFully(commandLength);
                }
            }
            catch (IOException header) {
            }
            catch (Throwable e) {
                ChannelManager.this.mUncaughtHandler.accept(e);
            }
            this.reconnect(in);
        }

        /**
         * Reports whether this channel currently holds an output stream.
         *
         * @return {@code true} when {@code mOut} is non-null
         */
        @Override
        public synchronized boolean isConnected() {
            OutputStream out = mOut;
            return out != null;
        }

        /** Returns the peer object held by this channel. */
        @Override
        public Peer peer() {
            return mPeer;
        }

        /**
         * Blocks on this object's monitor until a socket is present, {@code mLocalServer} is
         * null, or the timeout runs out.
         *
         * @param timeoutMillis maximum wait in milliseconds; zero returns immediately and a
         *        negative value waits without a deadline
         * @return 0 when the timeout was zero or has expired; otherwise the timeout value as of
         *         the most recent recomputation (a negative argument is returned unchanged)
         * @throws InterruptedIOException if the waiting thread is interrupted
         */
        @Override
        public int waitForConnection(int timeoutMillis) throws InterruptedIOException {
            if (timeoutMillis == 0) {
                return 0;
            }
            // Long.MIN_VALUE marks "deadline not yet computed".
            long deadline = Long.MIN_VALUE;
            synchronized (this) {
                while (mSocket == null && mLocalServer != null) {
                    try {
                        if (timeoutMillis < 0) {
                            wait();
                        } else {
                            long now = System.currentTimeMillis();
                            if (deadline == Long.MIN_VALUE) {
                                deadline = now + timeoutMillis;
                            } else {
                                // Woken early: shrink the timeout to what is left.
                                timeoutMillis = (int) (deadline - now);
                            }
                            if (timeoutMillis <= 0) {
                                return 0;
                            }
                            wait(timeoutMillis);
                        }
                    } catch (InterruptedException e) {
                        throw new InterruptedIOException();
                    }
                }
                return timeoutMillis;
            }
        }

        /** Returns the current value of {@code mConnectAttemptStartedAt}. */
        @Override
        public long connectAttemptStartedAt() {
            return mConnectAttemptStartedAt;
        }

        /**
         * No-op: this implementation writes nothing for an unknown operation.
         *
         * @param from not used
         * @param op not used
         */
        @Override
        public void unknown(Channel from, int op) {
        }

        /**
         * Writes opcode 2 followed by the four long arguments.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean requestVote(Channel from, long term, long candidateId, long highestTerm, long highestPosition) {
            return writeCommand(2, term, candidateId, highestTerm, highestPosition);
        }

        /**
         * Writes opcode 3 followed by {@code term}.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean requestVoteReply(Channel from, long term) {
            return writeCommand(3, term);
        }

        /**
         * Writes opcode 34 with no payload.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean forceElection(Channel from) {
            return writeCommand(34);
        }

        /**
         * Writes opcode 4 followed by the start and end positions.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean queryTerms(Channel from, long startPosition, long endPosition) {
            return writeCommand(4, startPosition, endPosition);
        }

        /**
         * Writes opcode 5 followed by {@code prevTerm}, {@code term} and {@code startPosition}.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean queryTermsReply(Channel from, long prevTerm, long term, long startPosition) {
            return writeCommand(5, prevTerm, term, startPosition);
        }

        /**
         * Writes opcode 6 followed by the start and end positions.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean queryData(Channel from, long startPosition, long endPosition) {
            return writeCommand(6, startPosition, endPosition);
        }

        /**
         * Writes opcode 7 with four long fields followed by a slice of {@code data}. A slice too
         * large for one command is sent as several commands, each advancing {@code position}.
         *
         * @param from passed through to the recursive calls, otherwise not used
         * @param data source array
         * @param off index of the first byte to send
         * @param len number of bytes to send
         * @return {@code false} if there is no output stream or any write failed
         */
        @Override
        public boolean queryDataReply(Channel from, long currentTerm, long prevTerm, long term, long position, byte[] data, int off, int len) {
            // The command length field is 24 bits wide and also counts the four 8-byte fields.
            final int maxPayload = 0xFFFFFF - 32;
            while (len > maxPayload) {
                if (!queryDataReply(from, currentTerm, prevTerm, term, position, data, off, maxPayload)) {
                    return false;
                }
                position += maxPayload;
                off += maxPayload;
                len -= maxPayload;
            }
            acquireExclusive();
            try {
                OutputStream out = mOut;
                if (out == null) {
                    return false;
                }
                // 8-byte header + 32 bytes of fixed fields + payload.
                final int total = 40 + len;
                byte[] command = allocWriteBuffer(total);
                prepareCommand(command, 7, 0, total - 8);
                Utils.encodeLongLE(command, 8, currentTerm);
                Utils.encodeLongLE(command, 16, prevTerm);
                Utils.encodeLongLE(command, 24, term);
                Utils.encodeLongLE(command, 32, position);
                System.arraycopy(data, off, command, 40, len);
                return writeCommand(out, command, 0, total);
            } finally {
                releaseExclusive();
            }
        }

        /**
         * Writes opcode 29 followed by the five long arguments.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean queryDataReplyMissing(Channel from, long currentTerm, long prevTerm, long term, long startPosition, long endPosition) {
            return writeCommand(29, currentTerm, prevTerm, term, startPosition, endPosition);
        }

        /**
         * Sends the given data with opcode 8 by delegating to the private {@code writeData}
         * overload.
         *
         * @param from not used
         * @param prefix optional bytes placed before {@code data}; may be null
         * @return {@code false} if there is no output stream or a write failed
         */
        @Override
        public boolean writeData(Channel from, long prevTerm, long term, long position, long highestPos, long commitPos, byte[] prefix, byte[] data, int off, int len) {
            return writeData(8, prevTerm, term, position, highestPos, commitPos, prefix, data, off, len);
        }

        /**
         * Writes a data command with the given opcode: five long fields, then the optional
         * prefix, then a slice of {@code data}. When prefix plus slice do not fit into one
         * command, the prefix is first sent as its own command and the slice is then sent in
         * maximum-sized pieces, advancing {@code position} by the bytes already sent.
         *
         * @param op opcode to place in the command header
         * @param prefix optional bytes placed before {@code data}; may be null
         * @param data source array
         * @param off index of the first byte of {@code data} to send
         * @param len number of bytes of {@code data} to send
         * @return {@code false} if there is no output stream or any write failed
         */
        private boolean writeData(int op, long prevTerm, long term, long position, long highestPosition, long commitPosition, byte[] prefix, byte[] data, int off, int len) {
            // The command length field is 24 bits wide and also counts the five 8-byte fields.
            final int maxPayload = 0xFFFFFF - 40;
            int payloadLen = (prefix == null) ? len : len + prefix.length;
            while (payloadLen > maxPayload) {
                if (prefix == null) {
                    if (!writeData(op, prevTerm, term, position, highestPosition, commitPosition, null, data, off, maxPayload)) {
                        return false;
                    }
                    position += maxPayload;
                    off += maxPayload;
                    len -= maxPayload;
                } else {
                    // Send the prefix by itself, as if it were the data.
                    if (!writeData(op, prevTerm, term, position, highestPosition, commitPosition, null, prefix, 0, prefix.length)) {
                        return false;
                    }
                    position += prefix.length;
                    prefix = null;
                }
                payloadLen = len;
            }
            acquireExclusive();
            try {
                OutputStream out = mOut;
                if (out == null) {
                    return false;
                }
                // 8-byte header + 40 bytes of fixed fields + payload.
                final int total = 48 + payloadLen;
                byte[] command = allocWriteBuffer(total);
                prepareCommand(command, op, 0, total - 8);
                Utils.encodeLongLE(command, 8, prevTerm);
                Utils.encodeLongLE(command, 16, term);
                Utils.encodeLongLE(command, 24, position);
                Utils.encodeLongLE(command, 32, highestPosition);
                Utils.encodeLongLE(command, 40, commitPosition);
                int cursor = 48;
                if (prefix != null) {
                    System.arraycopy(prefix, 0, command, cursor, prefix.length);
                    cursor += prefix.length;
                }
                System.arraycopy(data, off, command, cursor, len);
                return writeCommand(out, command, 0, total);
            } finally {
                releaseExclusive();
            }
        }

        /**
         * Writes opcode 9 followed by {@code term} and {@code highestPosition}.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean writeDataReply(Channel from, long term, long highestPosition) {
            return writeCommand(9, term, highestPosition);
        }

        /**
         * Sends the given data with opcode 24 by delegating to the private {@code writeData}
         * overload.
         *
         * @param from not used
         * @param prefix optional bytes placed before {@code data}; may be null
         * @return {@code false} if there is no output stream or a write failed
         */
        @Override
        public boolean writeDataAndProxy(Channel from, long prevTerm, long term, long position, long highestPos, long commitPos, byte[] prefix, byte[] data, int off, int len) {
            return writeData(24, prevTerm, term, position, highestPos, commitPos, prefix, data, off, len);
        }

        /**
         * Sends the given data with opcode 26 by delegating to the private {@code writeData}
         * overload.
         *
         * @param from not used
         * @param prefix optional bytes placed before {@code data}; may be null
         * @return {@code false} if there is no output stream or a write failed
         */
        @Override
        public boolean writeDataViaProxy(Channel from, long prevTerm, long term, long position, long highestPos, long commitPos, byte[] prefix, byte[] data, int off, int len) {
            return writeData(26, prevTerm, term, position, highestPos, commitPos, prefix, data, off, len);
        }

        /**
         * Writes opcode 10 followed by {@code prevTerm}, {@code term} and {@code position}.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean syncCommit(Channel from, long prevTerm, long term, long position) {
            return writeCommand(10, prevTerm, term, position);
        }

        /**
         * Writes opcode 11 followed by {@code groupVersion}, {@code term} and {@code position}.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean syncCommitReply(Channel from, long groupVersion, long term, long position) {
            return writeCommand(11, groupVersion, term, position);
        }

        /**
         * Writes opcode 12 followed by {@code position}.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean compact(Channel from, long position) {
            return writeCommand(12, position);
        }

        /**
         * Writes opcode 14 with no payload.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean snapshotScore(Channel from) {
            return writeCommand(14);
        }

        /**
         * Writes opcode 15 followed by two 4-byte values: the session count and the raw int bits
         * of {@code weight}.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean snapshotScoreReply(Channel from, int activeSessions, float weight) {
            acquireExclusive();
            try {
                OutputStream out = mOut;
                if (out == null) {
                    return false;
                }
                // 8-byte header + two 4-byte ints.
                final int total = 8 + 4 + 4;
                byte[] command = allocWriteBuffer(total);
                prepareCommand(command, 15, 0, total - 8);
                Utils.encodeIntLE(command, 8, activeSessions);
                Utils.encodeIntLE(command, 12, Float.floatToIntBits(weight));
                return writeCommand(out, command, 0, total);
            } finally {
                releaseExclusive();
            }
        }

        /**
         * Writes opcode 16 followed by {@code groupVersion}, {@code memberId} and the role's
         * {@code mCode} value.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean updateRole(Channel from, long groupVersion, long memberId, Role role) {
            return writeCommand(16, groupVersion, memberId, role.mCode);
        }

        /**
         * Writes opcode 17 followed by {@code groupVersion}, {@code memberId} and the one-byte
         * {@code result}.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean updateRoleReply(Channel from, long groupVersion, long memberId, byte result) {
            return writeCommand(17, groupVersion, memberId, result);
        }

        /**
         * Writes opcode 18 followed by {@code groupVersion}.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean groupVersion(Channel from, long groupVersion) {
            return writeCommand(18, groupVersion);
        }

        /**
         * Writes opcode 19 followed by {@code groupVersion}.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean groupVersionReply(Channel from, long groupVersion) {
            return writeCommand(19, groupVersion);
        }

        /**
         * Writes opcode 20 followed by {@code groupVersion}.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean groupFile(Channel from, long groupVersion) {
            return writeCommand(20, groupVersion);
        }

        /**
         * Writes opcode 21 with an empty payload, then hands the raw output stream to
         * {@code consumer} and flushes it, all while holding the exclusive latch.
         *
         * @param from not used
         * @param unused must be null
         * @param consumer receives the output stream after the header was written; must not be
         *        null
         * @return {@code true} once the consumer ran and the flush succeeded; {@code false} if
         *         there is no output stream, the header write failed, or an {@link IOException}
         *         was caught (in which case the output stream is dropped and the socket closed)
         * @throws IllegalArgumentException if {@code unused} is non-null or {@code consumer} is
         *         null
         */
        @Override
        public boolean groupFileReply(Channel from, InputStream unused, Consumer<OutputStream> consumer) throws IOException {
            if (unused != null || consumer == null) {
                throw new IllegalArgumentException();
            }
            acquireExclusive();
            try {
                OutputStream out = mOut;
                if (out == null) {
                    return false;
                }
                // Header only; the consumer writes whatever follows directly to the stream.
                final int total = 8;
                byte[] command = allocWriteBuffer(total);
                prepareCommand(command, 21, 0, total - 8);
                if (!writeCommand(out, command, 0, total)) {
                    return false;
                }
                try {
                    consumer.accept(out);
                    out.flush();
                    return true;
                } catch (IOException e) {
                    mOut = null;
                    closeSocket();
                    return false;
                }
            } finally {
                releaseExclusive();
            }
        }

        /**
         * Writes opcode 22 with no payload.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean leaderCheck(Channel from) {
            return writeCommand(22);
        }

        /**
         * Writes opcode 23 followed by {@code term}.
         *
         * @param from not used
         * @return {@code false} if there is no output stream or the write failed
         */
        @Override
        public boolean leaderCheckReply(Channel from, long term) {
            return writeCommand(23, term);
        }

        /**
         * Writes a header-only command (no payload) while holding the exclusive latch.
         *
         * @param op opcode for the command header
         * @return {@code false} if there is no output stream or the write failed
         */
        private boolean writeCommand(int op) {
            acquireExclusive();
            try {
                OutputStream out = mOut;
                if (out == null) {
                    return false;
                }
                final int total = 8;
                byte[] command = allocWriteBuffer(total);
                prepareCommand(command, op, 0, total - 8);
                return writeCommand(out, command, 0, total);
            } finally {
                releaseExclusive();
            }
        }

        /**
         * Writes a command carrying one long while holding the exclusive latch.
         *
         * @param op opcode for the command header
         * @param a value encoded at byte 8
         * @return {@code false} if there is no output stream or the write failed
         */
        private boolean writeCommand(int op, long a) {
            acquireExclusive();
            try {
                OutputStream out = mOut;
                if (out == null) {
                    return false;
                }
                // 8-byte header + one long.
                final int total = 8 + 8;
                byte[] command = allocWriteBuffer(total);
                prepareCommand(command, op, 0, total - 8);
                Utils.encodeLongLE(command, 8, a);
                return writeCommand(out, command, 0, total);
            } finally {
                releaseExclusive();
            }
        }

        /**
         * Writes a command carrying two longs while holding the exclusive latch.
         *
         * @param op opcode for the command header
         * @param a value encoded at byte 8
         * @param b value encoded at byte 16
         * @return {@code false} if there is no output stream or the write failed
         */
        private boolean writeCommand(int op, long a, long b) {
            acquireExclusive();
            try {
                OutputStream out = mOut;
                if (out == null) {
                    return false;
                }
                // 8-byte header + two longs.
                final int total = 8 + 2 * 8;
                byte[] command = allocWriteBuffer(total);
                prepareCommand(command, op, 0, total - 8);
                Utils.encodeLongLE(command, 8, a);
                Utils.encodeLongLE(command, 16, b);
                return writeCommand(out, command, 0, total);
            } finally {
                releaseExclusive();
            }
        }

        /**
         * Writes a command carrying two longs and a trailing byte while holding the exclusive
         * latch.
         *
         * @param op opcode for the command header
         * @param a value encoded at byte 8
         * @param b value encoded at byte 16
         * @param c value stored at byte 24
         * @return {@code false} if there is no output stream or the write failed
         */
        private boolean writeCommand(int op, long a, long b, byte c) {
            acquireExclusive();
            try {
                OutputStream out = mOut;
                if (out == null) {
                    return false;
                }
                // 8-byte header + two longs + one byte.
                final int total = 8 + 2 * 8 + 1;
                byte[] command = allocWriteBuffer(total);
                prepareCommand(command, op, 0, total - 8);
                Utils.encodeLongLE(command, 8, a);
                Utils.encodeLongLE(command, 16, b);
                command[24] = c;
                return writeCommand(out, command, 0, total);
            } finally {
                releaseExclusive();
            }
        }

        /**
         * Writes a command carrying three longs while holding the exclusive latch.
         *
         * @param op opcode for the command header
         * @param a value encoded at byte 8
         * @param b value encoded at byte 16
         * @param c value encoded at byte 24
         * @return {@code false} if there is no output stream or the write failed
         */
        private boolean writeCommand(int op, long a, long b, long c) {
            acquireExclusive();
            try {
                OutputStream out = mOut;
                if (out == null) {
                    return false;
                }
                // 8-byte header + three longs.
                final int total = 8 + 3 * 8;
                byte[] command = allocWriteBuffer(total);
                prepareCommand(command, op, 0, total - 8);
                Utils.encodeLongLE(command, 8, a);
                Utils.encodeLongLE(command, 16, b);
                Utils.encodeLongLE(command, 24, c);
                return writeCommand(out, command, 0, total);
            } finally {
                releaseExclusive();
            }
        }

        /**
         * Writes a command carrying four longs while holding the exclusive latch.
         *
         * @param op opcode for the command header
         * @param a value encoded at byte 8
         * @param b value encoded at byte 16
         * @param c value encoded at byte 24
         * @param d value encoded at byte 32
         * @return {@code false} if there is no output stream or the write failed
         */
        private boolean writeCommand(int op, long a, long b, long c, long d) {
            acquireExclusive();
            try {
                OutputStream out = mOut;
                if (out == null) {
                    return false;
                }
                // 8-byte header + four longs.
                final int total = 8 + 4 * 8;
                byte[] command = allocWriteBuffer(total);
                prepareCommand(command, op, 0, total - 8);
                Utils.encodeLongLE(command, 8, a);
                Utils.encodeLongLE(command, 16, b);
                Utils.encodeLongLE(command, 24, c);
                Utils.encodeLongLE(command, 32, d);
                return writeCommand(out, command, 0, total);
            } finally {
                releaseExclusive();
            }
        }

        /**
         * Writes a command carrying five longs while holding the exclusive latch.
         *
         * @param op opcode for the command header
         * @param a value encoded at byte 8
         * @param b value encoded at byte 16
         * @param c value encoded at byte 24
         * @param d value encoded at byte 32
         * @param e value encoded at byte 40
         * @return {@code false} if there is no output stream or the write failed
         */
        private boolean writeCommand(int op, long a, long b, long c, long d, long e) {
            acquireExclusive();
            try {
                OutputStream out = mOut;
                if (out == null) {
                    return false;
                }
                // 8-byte header + five longs.
                final int total = 8 + 5 * 8;
                byte[] command = allocWriteBuffer(total);
                prepareCommand(command, op, 0, total - 8);
                Utils.encodeLongLE(command, 8, a);
                Utils.encodeLongLE(command, 16, b);
                Utils.encodeLongLE(command, 24, c);
                Utils.encodeLongLE(command, 32, d);
                Utils.encodeLongLE(command, 40, e);
                return writeCommand(out, command, 0, total);
            } finally {
                releaseExclusive();
            }
        }

        /**
         * Returns the shared write buffer, replacing it with a larger one first when it cannot
         * hold {@code size} bytes.
         *
         * @param size minimum capacity required
         * @return a buffer of at least {@code size} bytes
         */
        private byte[] allocWriteBuffer(int size) {
            byte[] current = mWriteBuffer;
            return size > current.length ? growWriteBuffer(size) : current;
        }

        /**
         * Replaces the write buffer with a new array sized to the larger of {@code size} and 1.5
         * times the current length. The old contents are not copied.
         *
         * @param size minimum capacity required
         * @return the newly installed buffer
         */
        private byte[] growWriteBuffer(int size) {
            int grown = (int) (mWriteBuffer.length * 1.5);
            byte[] replacement = new byte[Math.max(size, grown)];
            mWriteBuffer = replacement;
            return replacement;
        }

        /**
         * Encodes the first 4-byte word of a command header at {@code offset}: the opcode in the
         * low 8 bits and the payload length in the upper 24 bits.
         *
         * @param command buffer to write into
         * @param op opcode; only the low 8 bits are used
         * @param offset index in {@code command} where the word is written
         * @param length payload length; bits above the low 24 are lost in the shift, so callers
         *        must keep it within 24 bits
         */
        private void prepareCommand(byte[] command, int op, int offset, int length) {
            // Mask instead of casting to byte: a (byte) cast sign-extends when promoted back to
            // int, so an opcode >= 0x80 would OR ones over the length bits.
            Utils.encodeIntLE(command, offset, (length << 8) | (op & 0xFF));
        }

        /**
         * Writes a prepared command to {@code out}. When {@code mOutCRC} is set, a CRC32C of the
         * bytes after the 8-byte header is computed, XORed with the first header word, and
         * stored in header bytes 4..7 before writing. {@code mWriteState} is 1 for the duration
         * of the write and is reset to 0 only when the write returns normally.
         *
         * @param out stream to write to
         * @param command buffer holding the command
         * @param offset index of the first command byte
         * @param length total command length including the 8-byte header
         * @return {@code true} on success; {@code false} if the write threw an
         *         {@link IOException}, in which case {@code mOut} is cleared and the socket closed
         */
        private boolean writeCommand(OutputStream out, byte[] command, int offset, int length) {
            try {
                CRC32C checksum = mOutCRC;
                if (checksum != null) {
                    checksum.reset();
                    checksum.update(command, offset + 8, length - 8);
                    int folded = ((int) checksum.getValue()) ^ Utils.decodeIntLE(command, offset);
                    Utils.encodeIntLE(command, offset + 4, folded);
                }
                mWriteState = 1;
                out.write(command, offset, length);
                mWriteState = 0;
                return true;
            } catch (IOException e) {
                mOut = null;
                closeSocket();
                return false;
            }
        }

        /** Returns the simple class name followed by the peer and socket of this channel. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder(getClass().getSimpleName());
            text.append("{peer=").append(mPeer);
            text.append(", socket=").append(mSocket);
            text.append("}");
            return text.toString();
        }

        /**
         * Subclass-specific limit; the two subclasses in this file return fixed constants.
         * NOTE(review): presumably bounds how many times a pending write may be tagged before
         * the channel is treated as stalled -- the code that consumes it is not visible here,
         * confirm.
         */
        abstract int maxWriteTagCount();
    }

    /**
     * {@code SocketChannel} subclass that adds nothing beyond a fixed write-tag limit of 2.
     */
    final class ClientChannel extends SocketChannel {
        ClientChannel(Peer peer, Channel localServer) {
            super(peer, localServer);
        }

        /** Returns the constant limit for this channel type. */
        @Override
        int maxWriteTagCount() {
            return 2;
        }
    }

    /**
     * {@code SocketChannel} subclass that closes itself instead of reconnecting and uses a fixed
     * write-tag limit of 50.
     */
    final class ServerChannel extends SocketChannel {
        ServerChannel(Peer peer, Channel localServer) {
            super(peer, localServer);
        }

        /**
         * Closes this channel rather than attempting a new connection.
         *
         * @param existing not used
         */
        @Override
        void reconnect(InputStream existing) {
            close();
        }

        /** Returns the constant limit for this channel type. */
        @Override
        int maxWriteTagCount() {
            return 50;
        }
    }
}

