/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.integration.vertx;

import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ConnectorService;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.integration.vertx.ActiveMQVertxLogger;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.eventbus.ReplyException;
import org.vertx.java.core.eventbus.impl.PingMessage;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.platform.PlatformLocator;
import org.vertx.java.platform.PlatformManager;
import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;

public class IncomingVertxEventHandler
implements ConnectorService {
    private final String connectorName;
    private final String queueName;
    private final int port;
    private final String host;
    private final int quorumSize;
    private final String haGroup;
    private final String vertxAddress;
    private EventBus eventBus;
    private PlatformManager platformManager;
    private EventHandler handler;
    private final StorageManager storageManager;
    private final PostOffice postOffice;
    private boolean isStarted = false;

    public IncomingVertxEventHandler(String connectorName, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledThreadPool) {
        this.connectorName = connectorName;
        this.queueName = ConfigurationHelper.getStringProperty((String)"queue", null, configuration);
        this.port = ConfigurationHelper.getIntProperty((String)"port", (int)0, configuration);
        this.host = ConfigurationHelper.getStringProperty((String)"host", (String)"localhost", configuration);
        this.quorumSize = ConfigurationHelper.getIntProperty((String)"quorum-size", (int)-1, configuration);
        this.haGroup = ConfigurationHelper.getStringProperty((String)"ha-group", (String)"activemq", configuration);
        this.vertxAddress = ConfigurationHelper.getStringProperty((String)"vertx-address", (String)"org.apache.activemq", configuration);
        this.storageManager = storageManager;
        this.postOffice = postOffice;
    }

    public void start() throws Exception {
        if (this.isStarted) {
            return;
        }
        System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName());
        this.platformManager = this.quorumSize != -1 ? PlatformLocator.factory.createPlatformManager(this.port, this.host, this.quorumSize, this.haGroup) : PlatformLocator.factory.createPlatformManager(this.port, this.host);
        this.eventBus = this.platformManager.vertx().eventBus();
        Binding b = this.postOffice.getBinding(new SimpleString(this.queueName));
        if (b == null) {
            throw new Exception(this.connectorName + ": queue " + this.queueName + " not found");
        }
        this.handler = new EventHandler();
        this.eventBus.registerHandler(this.vertxAddress, (Handler)this.handler);
        this.isStarted = true;
        ActiveMQVertxLogger.LOGGER.debug(this.connectorName + ": started");
    }

    public void stop() throws Exception {
        if (!this.isStarted) {
            return;
        }
        this.eventBus.unregisterHandler(this.vertxAddress, (Handler)this.handler);
        this.platformManager.stop();
        System.clearProperty("vertx.clusterManagerFactory");
        this.isStarted = false;
        ActiveMQVertxLogger.LOGGER.debug(this.connectorName + ": stopped");
    }

    public boolean isStarted() {
        return this.isStarted;
    }

    public String getName() {
        return this.connectorName;
    }

    public String toString() {
        return "[IncomingVertxEventHandler(" + this.connectorName + "), queueName: " + this.queueName + " host: " + this.host + " port: " + this.port + " vertxAddress: " + this.vertxAddress + "]";
    }

    private class EventHandler
    implements Handler<Message<?>> {
        private EventHandler() {
        }

        public void handle(Message<?> message) {
            ServerMessageImpl msg = new ServerMessageImpl(IncomingVertxEventHandler.this.storageManager.generateID(), 50);
            msg.setAddress(new SimpleString(IncomingVertxEventHandler.this.queueName));
            msg.setDurable(true);
            msg.encodeMessageIDToBuffer();
            String replyAddress = message.replyAddress();
            if (replyAddress != null) {
                msg.putStringProperty("VertxMessageReplyAddress", replyAddress);
            }
            int type = this.getMessageType(message);
            msg.putIntProperty("VertxMessageType", type);
            this.manualEncodeVertxMessageBody(msg.getBodyBuffer(), message.body(), type);
            try {
                IncomingVertxEventHandler.this.postOffice.route((ServerMessage)msg, null, false);
            }
            catch (Exception e) {
                ActiveMQVertxLogger.LOGGER.error("failed to route msg " + msg, e);
            }
        }

        private void manualEncodeVertxMessageBody(ActiveMQBuffer bodyBuffer, Object body, int type) {
            switch (type) {
                case 2: {
                    bodyBuffer.writeBoolean(((Boolean)body).booleanValue());
                    break;
                }
                case 1: {
                    Buffer buff = (Buffer)body;
                    int len = buff.length();
                    bodyBuffer.writeInt(len);
                    bodyBuffer.writeBytes(((Buffer)body).getBytes());
                    break;
                }
                case 3: {
                    byte[] bytes = (byte[])body;
                    bodyBuffer.writeInt(bytes.length);
                    bodyBuffer.writeBytes(bytes);
                    break;
                }
                case 4: {
                    bodyBuffer.writeByte(((Byte)body).byteValue());
                    break;
                }
                case 5: {
                    bodyBuffer.writeChar(((Character)body).charValue());
                    break;
                }
                case 6: {
                    bodyBuffer.writeDouble(((Double)body).doubleValue());
                    break;
                }
                case 7: {
                    bodyBuffer.writeFloat(((Float)body).floatValue());
                    break;
                }
                case 8: {
                    bodyBuffer.writeInt(((Integer)body).intValue());
                    break;
                }
                case 9: {
                    bodyBuffer.writeLong(((Long)body).longValue());
                    break;
                }
                case 10: {
                    bodyBuffer.writeShort(((Short)body).shortValue());
                    break;
                }
                case 0: 
                case 11: {
                    bodyBuffer.writeString((String)body);
                    break;
                }
                case 12: {
                    bodyBuffer.writeString(((JsonObject)body).encode());
                    break;
                }
                case 13: {
                    bodyBuffer.writeString(((JsonArray)body).encode());
                    break;
                }
                case 100: {
                    ReplyException except = (ReplyException)body;
                    bodyBuffer.writeInt(except.failureType().toInt());
                    bodyBuffer.writeInt(except.failureCode());
                    bodyBuffer.writeString(except.getMessage());
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Invalid body type: " + type);
                }
            }
        }

        private int getMessageType(Message<?> message) {
            Object body = message.body();
            if (message instanceof PingMessage) {
                return 0;
            }
            if (body instanceof Buffer) {
                return 1;
            }
            if (body instanceof Boolean) {
                return 2;
            }
            if (body instanceof byte[]) {
                return 3;
            }
            if (body instanceof Byte) {
                return 4;
            }
            if (body instanceof Character) {
                return 5;
            }
            if (body instanceof Double) {
                return 6;
            }
            if (body instanceof Float) {
                return 7;
            }
            if (body instanceof Integer) {
                return 8;
            }
            if (body instanceof Long) {
                return 9;
            }
            if (body instanceof Short) {
                return 10;
            }
            if (body instanceof String) {
                return 11;
            }
            if (body instanceof JsonArray) {
                return 13;
            }
            if (body instanceof JsonObject) {
                return 12;
            }
            if (body instanceof ReplyException) {
                return 100;
            }
            throw new IllegalArgumentException("Type not supported: " + message);
        }
    }
}

