/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.avro;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.avro.Protocol;
import org.apache.avro.Schema;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.Server;
import org.apache.avro.specific.SpecificData;
import org.apache.camel.Exchange;
import org.apache.camel.component.avro.AvroComponentException;
import org.apache.camel.component.avro.AvroConfiguration;
import org.apache.camel.component.avro.AvroConsumer;
import org.apache.camel.component.avro.AvroEndpoint;
import org.apache.camel.component.avro.AvroReflectResponder;
import org.apache.camel.component.avro.AvroSpecificResponder;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.lang.StringUtils;
import org.mortbay.log.Log;

public class AvroListener {
    private ConcurrentMap<String, AvroConsumer> consumerRegistry = new ConcurrentHashMap<String, AvroConsumer>();
    private AvroConsumer defaultConsumer;
    private final Server server;

    public AvroListener(AvroEndpoint endpoint) throws Exception {
        this.server = this.initAndStartServer(endpoint.getConfiguration());
    }

    private Server initAndStartServer(AvroConfiguration configuration) throws Exception {
        HttpServer server;
        Object responder = configuration.isReflectionProtocol() ? new AvroReflectResponder(configuration.getProtocol(), this) : new AvroSpecificResponder(configuration.getProtocol(), this);
        if ("http".equalsIgnoreCase(configuration.getTransport())) {
            server = new HttpServer((Responder)responder, configuration.getPort());
        } else if ("netty".equalsIgnoreCase(configuration.getTransport())) {
            server = new NettyServer((Responder)responder, new InetSocketAddress(configuration.getHost(), configuration.getPort()));
        } else {
            throw new IllegalArgumentException("Unknown transport " + configuration.getTransport());
        }
        server.start();
        return server;
    }

    public void register(String messageName, AvroConsumer consumer) throws AvroComponentException {
        if (messageName == null) {
            if (this.defaultConsumer != null) {
                throw new AvroComponentException("Default consumer already registered for uri: " + consumer.getEndpoint().getEndpointUri());
            }
            this.defaultConsumer = consumer;
        } else if (this.consumerRegistry.putIfAbsent(messageName, consumer) != null) {
            throw new AvroComponentException("Consumer already registered for message: " + messageName + " and uri: " + consumer.getEndpoint().getEndpointUri());
        }
    }

    public boolean unregister(String messageName) {
        if (!StringUtils.isEmpty((String)messageName)) {
            if (this.consumerRegistry.remove(messageName) == null) {
                Log.warn((String)("Consumer with message name " + messageName + " was already unregistered."));
            }
        } else {
            this.defaultConsumer = null;
        }
        if (this.defaultConsumer == null && this.consumerRegistry.isEmpty()) {
            if (this.server != null) {
                this.server.close();
            }
            return true;
        }
        return false;
    }

    public Object respond(Protocol.Message message, Object request, SpecificData data) throws Exception {
        AvroConsumer consumer = this.defaultConsumer;
        if (this.consumerRegistry.containsKey(message.getName())) {
            consumer = (AvroConsumer)((Object)this.consumerRegistry.get(message.getName()));
        }
        if (consumer == null) {
            throw new AvroComponentException("No consumer defined for message: " + message.getName());
        }
        Object params = AvroListener.extractParams(message, request, consumer.getEndpoint().getConfiguration().isSingleParameter(), data);
        return AvroListener.processExchange(consumer, message, params);
    }

    private static Object extractParams(Protocol.Message message, Object request, boolean singleParameter, SpecificData dataResolver) {
        if (singleParameter) {
            Schema.Field field = (Schema.Field)message.getRequest().getFields().get(0);
            return dataResolver.getField(request, field.name(), field.pos());
        }
        int i = 0;
        Object[] params = new Object[message.getRequest().getFields().size()];
        for (Schema.Field param : message.getRequest().getFields()) {
            params[i] = dataResolver.getField(request, param.name(), param.pos());
            ++i;
        }
        return params;
    }

    private static Object processExchange(AvroConsumer consumer, Protocol.Message message, Object params) throws Exception {
        Exchange exchange = consumer.getEndpoint().createExchange(message, params);
        try {
            consumer.getProcessor().process(exchange);
        }
        catch (Throwable e) {
            consumer.getExceptionHandler().handleException(e);
        }
        Object response = ExchangeHelper.isOutCapable((Exchange)exchange) ? exchange.getOut().getBody() : null;
        boolean failed = exchange.isFailed();
        if (failed) {
            if (exchange.getException() != null) {
                throw exchange.getException();
            }
            throw new AvroComponentException("Camel processing error.");
        }
        return response;
    }
}

