/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.com;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.neo4j.com.BlockLogBuffer;
import org.neo4j.com.ChunkingChannelBuffer;
import org.neo4j.com.DechunkingChannelBuffer;
import org.neo4j.com.IllegalProtocolVersionException;
import org.neo4j.com.Protocol;
import org.neo4j.com.RequestContext;
import org.neo4j.com.RequestType;
import org.neo4j.com.Response;
import org.neo4j.com.TransactionStream;
import org.neo4j.com.TxChecksumVerifier;
import org.neo4j.com.TxExtractor;
import org.neo4j.com.monitor.RequestMonitor;
import org.neo4j.helpers.Clock;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.HostnamePort;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.helpers.Pair;
import org.neo4j.helpers.Triplet;
import org.neo4j.helpers.collection.IteratorUtil;
import org.neo4j.kernel.impl.nioneo.store.StoreId;
import org.neo4j.kernel.impl.util.StringLogger;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.logging.Logging;
import org.neo4j.kernel.monitoring.ByteCounterMonitor;
import org.neo4j.kernel.monitoring.Monitors;

public abstract class Server<T, R>
extends SimpleChannelHandler
implements ChannelPipelineFactory,
Lifecycle {
    private final ByteCounterMonitor byteCounterMonitor;
    private final RequestMonitor requestMonitor;
    private InetSocketAddress socketAddress;
    private static final String INADDR_ANY = "0.0.0.0";
    private final Clock clock;
    static final byte INTERNAL_PROTOCOL_VERSION = 2;
    public static final int DEFAULT_MAX_NUMBER_OF_CONCURRENT_TRANSACTIONS = 200;
    private ServerBootstrap bootstrap;
    private final T requestTarget;
    private ChannelGroup channelGroup;
    private final Map<Channel, Pair<RequestContext, AtomicLong>> connectedSlaveChannels = new ConcurrentHashMap<Channel, Pair<RequestContext, AtomicLong>>();
    private ExecutorService executor;
    private ExecutorService workerExecutor;
    private ExecutorService targetCallExecutor;
    private final StringLogger msgLog;
    private final Map<Channel, PartialRequest> partialRequests = new ConcurrentHashMap<Channel, PartialRequest>();
    private final Configuration config;
    private final int frameLength;
    private volatile boolean shuttingDown;
    private ExecutorService unfinishedTransactionExecutor;
    private ScheduledExecutorService silentChannelExecutor;
    private final byte applicationProtocolVersion;
    private long oldChannelThresholdMillis;
    private final TxChecksumVerifier txVerifier;
    private int chunkSize;

    public Server(T requestTarget, Configuration config, Logging logging, int frameLength, byte applicationProtocolVersion, TxChecksumVerifier txVerifier, Clock clock, Monitors monitors) {
        this.requestTarget = requestTarget;
        this.config = config;
        this.frameLength = frameLength;
        this.applicationProtocolVersion = applicationProtocolVersion;
        this.msgLog = logging.getMessagesLog(((Object)((Object)this)).getClass());
        this.txVerifier = txVerifier;
        this.clock = clock;
        this.byteCounterMonitor = (ByteCounterMonitor)monitors.newMonitor(ByteCounterMonitor.class, ((Object)((Object)this)).getClass(), new String[0]);
        this.requestMonitor = (RequestMonitor)monitors.newMonitor(RequestMonitor.class, ((Object)((Object)this)).getClass(), new String[0]);
    }

    public void init() throws Throwable {
    }

    public void start() throws Throwable {
        this.oldChannelThresholdMillis = this.config.getOldChannelThreshold();
        this.chunkSize = this.config.getChunkSize();
        Protocol.assertChunkSizeIsWithinFrameSize(this.chunkSize, this.frameLength);
        this.executor = Executors.newCachedThreadPool((ThreadFactory)new NamedThreadFactory("Server receiving"));
        this.workerExecutor = Executors.newCachedThreadPool((ThreadFactory)new NamedThreadFactory("Server receiving"));
        this.targetCallExecutor = Executors.newCachedThreadPool((ThreadFactory)new NamedThreadFactory(((Object)((Object)this)).getClass().getSimpleName() + ":" + this.config.getServerAddress().getPort()));
        this.unfinishedTransactionExecutor = Executors.newScheduledThreadPool(2, (ThreadFactory)new NamedThreadFactory("Unfinished transactions"));
        this.silentChannelExecutor = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new NamedThreadFactory("Silent channel reaper"));
        this.silentChannelExecutor.scheduleWithFixedDelay(this.silentChannelFinisher(), 5L, 5L, TimeUnit.SECONDS);
        this.bootstrap = new ServerBootstrap((ChannelFactory)new NioServerSocketChannelFactory((Executor)this.executor, (Executor)this.workerExecutor, this.config.getMaxConcurrentTransactions()));
        this.bootstrap.setPipelineFactory((ChannelPipelineFactory)this);
        Channel channel = null;
        this.socketAddress = null;
        int[] ports = this.config.getServerAddress().getPorts();
        ChannelException ex = null;
        for (int port = ports[0]; port <= ports[1]; ++port) {
            this.socketAddress = this.config.getServerAddress().getHost() == null || this.config.getServerAddress().getHost().equals(INADDR_ANY) ? new InetSocketAddress(port) : new InetSocketAddress(this.config.getServerAddress().getHost(), port);
            try {
                channel = this.bootstrap.bind((SocketAddress)this.socketAddress);
                ex = null;
                break;
            }
            catch (ChannelException e) {
                ex = e;
                continue;
            }
        }
        if (ex != null) {
            this.msgLog.logMessage("Failed to bind server to " + this.socketAddress, ex);
            this.executor.shutdown();
            this.workerExecutor.shutdown();
            throw new IOException(ex);
        }
        this.channelGroup = new DefaultChannelGroup();
        this.channelGroup.add(channel);
        this.msgLog.logMessage(((Object)((Object)this)).getClass().getSimpleName() + " communication server started and bound to " + this.socketAddress);
    }

    public void stop() throws Throwable {
        this.shuttingDown = true;
        this.targetCallExecutor.shutdown();
        this.targetCallExecutor.awaitTermination(10L, TimeUnit.SECONDS);
        this.unfinishedTransactionExecutor.shutdown();
        this.unfinishedTransactionExecutor.awaitTermination(10L, TimeUnit.SECONDS);
        this.silentChannelExecutor.shutdown();
        this.silentChannelExecutor.awaitTermination(10L, TimeUnit.SECONDS);
        this.channelGroup.close().awaitUninterruptibly();
        this.bootstrap.releaseExternalResources();
    }

    public void shutdown() throws Throwable {
    }

    public InetSocketAddress getSocketAddress() {
        return this.socketAddress;
    }

    private Runnable silentChannelFinisher() {
        return new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                HashMap channels = new HashMap();
                Map map = Server.this.connectedSlaveChannels;
                synchronized (map) {
                    for (Map.Entry channel : Server.this.connectedSlaveChannels.entrySet()) {
                        long age = System.currentTimeMillis() - ((AtomicLong)((Pair)channel.getValue()).other()).get();
                        if (age > Server.this.oldChannelThresholdMillis) {
                            Server.this.msgLog.logMessage("Found a silent channel " + channel + ", " + age);
                            channels.put(channel.getKey(), Boolean.TRUE);
                            continue;
                        }
                        if (age <= Server.this.oldChannelThresholdMillis / 2L) continue;
                        channels.put(channel.getKey(), Boolean.FALSE);
                    }
                }
                for (Map.Entry channel : channels.entrySet()) {
                    if (!((Boolean)channel.getValue()).booleanValue() && ((Channel)channel.getKey()).isOpen() && ((Channel)channel.getKey()).isConnected() && ((Channel)channel.getKey()).isBound()) continue;
                    Server.this.tryToFinishOffChannel((Channel)channel.getKey());
                }
            }
        };
    }

    protected byte getInternalProtocolVersion() {
        return 2;
    }

    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
        Protocol.addLengthFieldPipes(pipeline, this.frameLength);
        pipeline.addLast("serverHandler", (ChannelHandler)this);
        return pipeline;
    }

    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.channelGroup.add((Object)e.getChannel());
    }

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
        try {
            ChannelBuffer message = (ChannelBuffer)event.getMessage();
            this.handleRequest(message, event.getChannel());
        }
        catch (Throwable e) {
            this.msgLog.error("Error handling request", e);
            ChunkingChannelBuffer buffer = this.newChunkingBuffer(event.getChannel());
            buffer.clear(true);
            this.writeFailureResponse(e, buffer);
            ctx.getChannel().close();
            this.tryToFinishOffChannel(ctx.getChannel());
            throw Exceptions.launderedException((Throwable)e);
        }
    }

    public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
        Pair<RequestContext, AtomicLong> slave = this.connectedSlaveChannels.get(ctx.getChannel());
        if (slave != null) {
            ((AtomicLong)slave.other()).set(this.clock.currentTimeMillis());
            super.writeComplete(ctx, e);
        }
    }

    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelClosed(ctx, e);
        if (!ctx.getChannel().isOpen()) {
            this.tryToFinishOffChannel(ctx.getChannel());
        }
        this.channelGroup.remove((Object)e.getChannel());
    }

    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelDisconnected(ctx, e);
        if (!ctx.getChannel().isConnected()) {
            this.tryToFinishOffChannel(ctx.getChannel());
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        this.msgLog.warn("Exception from Netty", e.getCause());
    }

    protected void tryToFinishOffChannel(Channel channel) {
        Pair<RequestContext, AtomicLong> slave = null;
        slave = this.unmapSlave(channel);
        if (slave == null) {
            return;
        }
        this.tryToFinishOffChannel(channel, (RequestContext)slave.first());
    }

    protected void tryToFinishOffChannel(Channel channel, RequestContext slave) {
        block2: {
            try {
                this.finishOffChannel(channel, slave);
                this.unmapSlave(channel);
            }
            catch (Throwable failure) {
                this.submitSilent(this.unfinishedTransactionExecutor, this.newTransactionFinisher(slave));
                if (!this.shouldLogFailureToFinishOffChannel(failure)) break block2;
                this.msgLog.logMessage("Could not finish off dead channel", failure);
            }
        }
    }

    protected boolean shouldLogFailureToFinishOffChannel(Throwable failure) {
        return true;
    }

    private void submitSilent(ExecutorService service, Runnable job) {
        block2: {
            try {
                service.submit(job);
            }
            catch (RejectedExecutionException e) {
                if (this.shuttingDown) break block2;
                throw e;
            }
        }
    }

    private Runnable newTransactionFinisher(final RequestContext slave) {
        return new Runnable(){

            @Override
            public void run() {
                try {
                    Server.this.finishOffChannel(null, slave);
                }
                catch (Throwable e) {
                    this.sleepNicely(200);
                    Server.this.unfinishedTransactionExecutor.submit(this);
                }
            }

            private void sleepNicely(int millis) {
                try {
                    Thread.sleep(millis);
                }
                catch (InterruptedException e) {
                    Thread.interrupted();
                }
            }
        };
    }

    protected void handleRequest(ChannelBuffer buffer, Channel channel) {
        Byte continuation = this.readContinuationHeader(buffer, channel);
        if (continuation == null) {
            return;
        }
        if (continuation == 1) {
            PartialRequest partialRequest = this.partialRequests.get(channel);
            if (partialRequest == null) {
                RequestType<T> type = this.getRequestContext(buffer.readByte());
                RequestContext context = this.readContext(buffer);
                ChannelBuffer targetBuffer = this.mapSlave(channel, context);
                partialRequest = new PartialRequest(type, context, targetBuffer);
                this.partialRequests.put(channel, partialRequest);
            }
            partialRequest.add(buffer);
        } else {
            ChannelBuffer bufferToWriteTo;
            ChannelBuffer bufferToReadFrom;
            RequestContext context;
            RequestType type;
            PartialRequest partialRequest = this.partialRequests.remove(channel);
            if (partialRequest == null) {
                type = this.getRequestContext(buffer.readByte());
                context = this.readContext(buffer);
                ChannelBuffer targetBuffer = this.mapSlave(channel, context);
                bufferToReadFrom = buffer;
                bufferToWriteTo = targetBuffer;
            } else {
                type = partialRequest.type;
                context = partialRequest.context;
                ChannelBuffer targetBuffer = partialRequest.buffer;
                partialRequest.add(buffer);
                bufferToReadFrom = targetBuffer;
                bufferToWriteTo = ChannelBuffers.dynamicBuffer();
            }
            bufferToWriteTo.clear();
            ChunkingChannelBuffer chunkingBuffer = new ChunkingChannelBuffer(bufferToWriteTo, channel, this.chunkSize, this.getInternalProtocolVersion(), this.applicationProtocolVersion);
            this.submitSilent(this.targetCallExecutor, this.targetCaller(type, channel, context, chunkingBuffer, bufferToReadFrom));
        }
    }

    private Byte readContinuationHeader(ChannelBuffer buffer, final Channel channel) {
        byte[] header = new byte[2];
        buffer.readBytes(header);
        try {
            DechunkingChannelBuffer.assertSameProtocolVersion(header, this.getInternalProtocolVersion(), this.applicationProtocolVersion);
        }
        catch (IllegalProtocolVersionException e) {
            this.submitSilent(this.targetCallExecutor, new Runnable(){

                @Override
                public void run() {
                    Server.this.writeFailureResponse(e, Server.this.newChunkingBuffer(channel));
                }
            });
            return null;
        }
        return (byte)(header[0] & 1);
    }

    protected Runnable targetCaller(final RequestType<T> type, final Channel channel, final RequestContext context, final ChunkingChannelBuffer targetBuffer, final ChannelBuffer bufferToReadFrom) {
        return new Runnable(){

            @Override
            public void run() {
                HashMap<String, String> requestContext = new HashMap<String, String>();
                requestContext.put("type", type.toString());
                requestContext.put("remoteClient", channel.getRemoteAddress().toString());
                requestContext.put("slaveContext", context.toString());
                Server.this.requestMonitor.beginRequest(requestContext);
                Response response = null;
                Throwable failure = null;
                try {
                    Server.this.unmapSlave(channel);
                    response = type.getTargetCaller().call(Server.this.requestTarget, context, bufferToReadFrom, targetBuffer);
                    type.getObjectSerializer().write(response.response(), targetBuffer);
                    Server.writeStoreId(response.getStoreId(), targetBuffer);
                    Server.writeTransactionStreams(response.transactions(), targetBuffer, Server.this.byteCounterMonitor);
                    targetBuffer.done();
                    Server.this.responseWritten(type, channel, context);
                }
                catch (Throwable e) {
                    failure = e;
                    targetBuffer.clear(true);
                    Server.this.writeFailureResponse(e, targetBuffer);
                    Server.this.tryToFinishOffChannel(channel, context);
                    throw Exceptions.launderedException((Throwable)e);
                }
                finally {
                    if (response != null) {
                        response.close();
                    }
                    Server.this.requestMonitor.endRequest(failure);
                }
            }
        };
    }

    protected void writeFailureResponse(Throwable exception, ChunkingChannelBuffer buffer) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(exception);
            out.close();
            buffer.writeBytes(bytes.toByteArray());
            buffer.done();
        }
        catch (IOException e) {
            this.msgLog.logMessage("Couldn't send cause of error to client", exception);
        }
    }

    protected void responseWritten(RequestType<T> type, Channel channel, RequestContext context) {
    }

    private static void writeStoreId(StoreId storeId, ChannelBuffer targetBuffer) {
        targetBuffer.writeBytes(storeId.serialize());
    }

    private static void writeTransactionStreams(TransactionStream txStream, ChannelBuffer buffer, ByteCounterMonitor bufferMonitor) {
        if (!txStream.hasNext()) {
            buffer.writeByte(0);
            return;
        }
        String[] datasources = txStream.dataSourceNames();
        assert (datasources.length <= 255) : "too many data sources";
        buffer.writeByte(datasources.length);
        HashMap<String, Integer> datasourceId = new HashMap<String, Integer>();
        for (int i = 0; i < datasources.length; ++i) {
            String datasource = datasources[i];
            Protocol.writeString(buffer, datasource);
            datasourceId.put(datasource, i + 1);
        }
        for (Triplet tx : IteratorUtil.asIterable((Iterator)((Object)txStream))) {
            buffer.writeByte(((Integer)datasourceId.get(tx.first())).intValue());
            buffer.writeLong(((Long)tx.second()).longValue());
            BlockLogBuffer blockBuffer = new BlockLogBuffer(buffer, bufferMonitor);
            ((TxExtractor)tx.third()).extract(blockBuffer);
            blockBuffer.done();
        }
        buffer.writeByte(0);
    }

    protected RequestContext readContext(ChannelBuffer buffer) {
        long sessionId = buffer.readLong();
        int machineId = buffer.readInt();
        int eventIdentifier = buffer.readInt();
        int txsSize = buffer.readByte();
        RequestContext.Tx[] lastAppliedTransactions = new RequestContext.Tx[txsSize];
        RequestContext.Tx neoTx = null;
        for (int i = 0; i < txsSize; ++i) {
            RequestContext.Tx tx;
            String ds = Protocol.readString(buffer);
            lastAppliedTransactions[i] = tx = RequestContext.lastAppliedTx(ds, buffer.readLong());
            if (!ds.equals("nioneodb")) continue;
            neoTx = tx;
        }
        int masterId = buffer.readInt();
        long checksum = buffer.readLong();
        if (neoTx != null) {
            this.txVerifier.assertMatch(neoTx.getTxId(), masterId, checksum);
        }
        return new RequestContext(sessionId, machineId, eventIdentifier, lastAppliedTransactions, masterId, checksum);
    }

    protected abstract RequestType<T> getRequestContext(byte var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected ChannelBuffer mapSlave(Channel channel, RequestContext slave) {
        Map<Channel, Pair<RequestContext, AtomicLong>> map = this.connectedSlaveChannels;
        synchronized (map) {
            if (slave != null && slave.machineId() != RequestContext.EMPTY.machineId()) {
                Pair<RequestContext, AtomicLong> previous = this.connectedSlaveChannels.get(channel);
                if (previous != null) {
                    ((AtomicLong)previous.other()).set(System.currentTimeMillis());
                } else {
                    this.connectedSlaveChannels.put(channel, (Pair<RequestContext, AtomicLong>)Pair.of((Object)slave, (Object)new AtomicLong(System.currentTimeMillis())));
                }
            }
        }
        return ChannelBuffers.dynamicBuffer();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Pair<RequestContext, AtomicLong> unmapSlave(Channel channel) {
        Map<Channel, Pair<RequestContext, AtomicLong>> map = this.connectedSlaveChannels;
        synchronized (map) {
            return this.connectedSlaveChannels.remove(channel);
        }
    }

    protected T getRequestTarget() {
        return this.requestTarget;
    }

    protected abstract void finishOffChannel(Channel var1, RequestContext var2);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Map<Channel, RequestContext> getConnectedSlaveChannels() {
        HashMap<Channel, RequestContext> result = new HashMap<Channel, RequestContext>();
        Map<Channel, Pair<RequestContext, AtomicLong>> map = this.connectedSlaveChannels;
        synchronized (map) {
            for (Map.Entry<Channel, Pair<RequestContext, AtomicLong>> entry : this.connectedSlaveChannels.entrySet()) {
                result.put(entry.getKey(), (RequestContext)entry.getValue().first());
            }
        }
        return result;
    }

    private ChunkingChannelBuffer newChunkingBuffer(Channel channel) {
        return new ChunkingChannelBuffer(ChannelBuffers.dynamicBuffer(), channel, this.chunkSize, this.getInternalProtocolVersion(), this.applicationProtocolVersion);
    }

    private class PartialRequest {
        final RequestContext context;
        final ChannelBuffer buffer;
        final RequestType<T> type;

        public PartialRequest(RequestType<T> type, RequestContext context, ChannelBuffer buffer) {
            this.type = type;
            this.context = context;
            this.buffer = buffer;
        }

        public void add(ChannelBuffer buffer) {
            this.buffer.writeBytes(buffer);
        }
    }

    public static interface Configuration {
        public long getOldChannelThreshold();

        public int getMaxConcurrentTransactions();

        public int getChunkSize();

        public HostnamePort getServerAddress();
    }
}

