/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.yarn.integration;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.springframework.yarn.am.AppmasterService;
import org.springframework.yarn.am.GenericRpcMessage;
import org.springframework.yarn.am.RpcMessage;
import org.springframework.yarn.integration.support.IntegrationObjectSupport;
import org.springframework.yarn.integration.support.PortExposingTcpSocketSupport;

public abstract class IntegrationAppmasterService<T>
extends IntegrationObjectSupport
implements AppmasterService {
    private static final Log log = LogFactory.getLog(IntegrationAppmasterService.class);
    private PortExposingTcpSocketSupport socketSupport;
    private SubscribableChannel messageChannel;
    private final MessagingTemplate messagingTemplate = new MessagingTemplate();
    private EventDrivenConsumer consumer;

    protected void doStart() {
        this.consumer = new EventDrivenConsumer(this.messageChannel, (MessageHandler)new ReplyProducingHandler());
        this.consumer.start();
    }

    protected void doStop() {
        if (this.consumer != null) {
            this.consumer.stop();
        }
    }

    public int getPort() {
        return this.socketSupport != null ? this.socketSupport.getServerSocketPort() : -1;
    }

    public String getHost() {
        return this.socketSupport != null ? this.socketSupport.getServerSocketAddress() : null;
    }

    public boolean hasPort() {
        return true;
    }

    public abstract RpcMessage<T> handleMessageInternal(RpcMessage<T> var1);

    public void setMessageChannel(SubscribableChannel messageChannel) {
        Assert.notNull((Object)messageChannel, (String)"messageChannel must not be null");
        this.messageChannel = messageChannel;
    }

    public void setSocketSupport(PortExposingTcpSocketSupport socketSupport) {
        Assert.notNull((Object)socketSupport, (String)"socketSupport must not be null");
        this.socketSupport = socketSupport;
        if (log.isDebugEnabled()) {
            log.debug((Object)("Setting socket support: " + socketSupport));
        }
    }

    private void sendMessage(Message<?> message, Object channel) {
        if (channel instanceof MessageChannel) {
            this.messagingTemplate.send((Object)((MessageChannel)channel), message);
        } else if (channel instanceof String) {
            this.messagingTemplate.send((String)channel, message);
        } else {
            throw new MessageDeliveryException(message, "a non-null reply channel value of type MessageChannel or String is required");
        }
    }

    private class ReplyProducingHandler
    implements MessageHandler {
        private ReplyProducingHandler() {
        }

        public void handleMessage(Message<?> message) throws MessagingException {
            GenericRpcMessage incoming = new GenericRpcMessage(message.getPayload());
            RpcMessage outgoing = IntegrationAppmasterService.this.handleMessageInternal(incoming);
            Message reply = MessageBuilder.withPayload((Object)outgoing.getBody()).build();
            IntegrationAppmasterService.this.sendMessage(reply, message.getHeaders().getReplyChannel());
        }
    }
}

