/*
 * Decompiled with CFR 0.152.
 */
package org.rzo.netty.ahessian.rpc.server;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.util.Timer;
import org.rzo.netty.ahessian.Constants;
import org.rzo.netty.ahessian.rpc.message.FlushRequestMessage;
import org.rzo.netty.ahessian.rpc.message.HessianRPCCallMessage;
import org.rzo.netty.ahessian.rpc.message.HessianRPCReplyMessage;
import org.rzo.netty.ahessian.rpc.server.HessianSkeleton;
import org.rzo.netty.ahessian.utils.MyReentrantLock;
import org.rzo.netty.ahessian.utils.TimedBlockingPriorityQueue;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
@ChannelPipelineCoverage(value="one")
public class HessianRPCServiceHandler
extends SimpleChannelUpstreamHandler
implements Constants {
    private Map<String, HessianSkeleton> _services = new HashMap<String, HessianSkeleton>();
    private TimedBlockingPriorityQueue<HessianRPCReplyMessage> _pendingReplies;
    private TimedBlockingPriorityQueue<HessianRPCCallMessage> _pendingCalls;
    private LinkedBlockingQueue<HessianRPCReplyMessage> _repliesRetry;
    private Executor _executor;
    private boolean _stop = false;
    final AtomicLong _openCounter = new AtomicLong(0L);
    final Lock _lock = new MyReentrantLock();
    final Condition _channelOpen = this._lock.newCondition();
    boolean _inverseServer = false;

    public HessianRPCServiceHandler(Executor executor) {
        this(executor, null, null, false);
    }

    public HessianRPCServiceHandler(Executor executor, boolean inverseServer) {
        this(executor, null, null, inverseServer);
    }

    public HessianRPCServiceHandler(Executor executor, Map<String, Object> options, Timer timer) {
        this(executor, options, timer, false);
    }

    public HessianRPCServiceHandler(Executor executor, Map<String, Object> options, Timer timer, boolean inverseServer) {
        this._inverseServer = inverseServer;
        this._executor = executor;
        this._pendingReplies = options == null || timer == null ? new TimedBlockingPriorityQueue("HessianRPCServiceHandler-PendingReplies") : new TimedBlockingPriorityQueue(options, null, "HessianRPCServiceHandler-PendingReplies");
        this._pendingCalls = options == null || timer == null ? new TimedBlockingPriorityQueue("HessianRPCServiceHandler-PendingCalls") : new TimedBlockingPriorityQueue(options, null, "HessianRPCServiceHandler-PendingCalls");
        this._repliesRetry = new LinkedBlockingQueue();
        this._executor.execute(new Runnable(){

            public void run() {
                Thread.currentThread().setName("HessianRPCServiceHandler-Call-Rx");
                HessianRPCCallMessage message = null;
                while (!HessianRPCServiceHandler.this._stop) {
                    try {
                        message = (HessianRPCCallMessage)HessianRPCServiceHandler.this._pendingCalls.take();
                        HessianSkeleton service = HessianRPCServiceHandler.this.getService(message);
                        service.messageReceived(message);
                    }
                    catch (Exception ex) {
                        Constants.ahessianLogger.warn("", (Throwable)ex);
                    }
                }
            }
        });
    }

    protected void sendMessage(HessianRPCReplyMessage message) {
        Channel ch = message.getChannel();
        if (ch != null) {
            ch.write((Object)message);
            if (this._inverseServer) {
                ch.write((Object)new FlushRequestMessage());
            }
        } else {
            ahessianLogger.warn("message channel null -> ignored: #" + message.getCallId());
        }
    }

    public void addService(String name, HessianSkeleton service) {
        this._services.put(name, service);
    }

    public void removeService(String name) {
        this._services.remove(name);
    }

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        HessianRPCCallMessage message = (HessianRPCCallMessage)e.getMessage();
        Integer group = (Integer)message.getHeaders().get(Constants.GROUP_HEADER_KEY);
        this._pendingCalls.put(message, group);
    }

    private HessianSkeleton getService(HessianRPCCallMessage message) {
        String id = (String)message.getHeaders().get(SERVICE_ID_HEADER_KEY);
        if (id == null) {
            id = "default";
        }
        return this._services.get(id);
    }

    public void writeResult(HessianRPCReplyMessage message) {
        this.sendMessage(message);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        ahessianLogger.warn(ctx.getChannel() + " connected");
        if (this._openCounter.incrementAndGet() == 1L) {
            this._lock.lock();
            try {
                this._channelOpen.signal();
            }
            catch (Exception exception) {
            }
            finally {
                this._lock.unlock();
            }
        }
        super.channelOpen(ctx, e);
    }

    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        ahessianLogger.warn(ctx.getChannel() + " disconnected");
        this._openCounter.decrementAndGet();
        super.channelClosed(ctx, e);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        ahessianLogger.warn(ctx.getChannel() + " exception " + e.getCause());
    }

    public void stop() {
        this._stop = true;
    }
}

