/*
 * Decompiled with CFR 0.152.
 */
package io.joynr.messaging.routing;

import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.joynr.exceptions.JoynrDelayMessageException;
import io.joynr.exceptions.JoynrIllegalStateException;
import io.joynr.exceptions.JoynrMessageNotSentException;
import io.joynr.exceptions.JoynrRuntimeException;
import io.joynr.exceptions.JoynrShutdownException;
import io.joynr.messaging.FailureAction;
import io.joynr.messaging.IMessagingMulticastSubscriber;
import io.joynr.messaging.IMessagingSkeleton;
import io.joynr.messaging.IMessagingStub;
import io.joynr.messaging.MessagingSkeletonFactory;
import io.joynr.messaging.SuccessAction;
import io.joynr.messaging.routing.AddressManager;
import io.joynr.messaging.routing.DelayableImmutableMessage;
import io.joynr.messaging.routing.MessageProcessedListener;
import io.joynr.messaging.routing.MessageRouter;
import io.joynr.messaging.routing.MessagingStubFactory;
import io.joynr.messaging.routing.MulticastReceiverRegistry;
import io.joynr.messaging.routing.RoutingTable;
import io.joynr.runtime.ShutdownListener;
import io.joynr.runtime.ShutdownNotifier;
import java.text.DateFormat;
import java.text.MessageFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.inject.Singleton;
import joynr.ImmutableMessage;
import joynr.system.RoutingTypes.Address;
import joynr.system.RoutingTypes.RoutingTypesUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractMessageRouter
implements MessageRouter,
ShutdownListener {
    /** Logger used by the router, its message workers and its success/failure callbacks. */
    private Logger logger = LoggerFactory.getLogger(AbstractMessageRouter.class);
    /** Table consulted for next-hop lookups; purged periodically by the cleanup task. */
    private final RoutingTable routingTable;
    /**
     * Formats absolute TTL timestamps for log output. The time zone is set to UTC in the constructor.
     * <p>
     * The pattern uses {@code yyyy} (calendar year) and {@code SSS} (milliseconds). The former
     * {@code YYYY} selects the week year, which differs from the calendar year around New Year,
     * and the former {@code sss} printed the seconds a second time instead of the milliseconds.
     * <p>
     * {@link SimpleDateFormat} is not thread-safe; code sharing this instance across threads
     * should synchronize on it while formatting.
     */
    private static final DateFormat DateFormatter = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss:SSS z");
    /** Executor running the message workers, the routing table cleanup and tasks passed to {@code schedule}. */
    private ScheduledExecutorService scheduler;
    /** Base interval (ms) from which the retry delay is derived. */
    private long sendMsgRetryIntervalMs;
    /** Added to a message's TTL to compute the expiry of routing entries registered for global senders. */
    private long routingTableGracePeriodMs;
    /** Initial delay and period (ms) of the routing table purge task. */
    private long routingTableCleanupIntervalMs;
    /** Maximum number of retries per message; values of -1 or below disable the limit. */
    @Inject(optional=true)
    @Named(value="joynr.messaging.routingmaxretrycount")
    private long maxRetryCount = -1L;
    /** Upper bound (ms) for the backoff delay; only applied when it is at least the retry interval. */
    @Inject(optional=true)
    @Named(value="joynr.messaging.maxDelayWithExponentialBackoffMs")
    private long maxDelayMs = -1L;
    /** Creates the stubs used to transmit a message to an address. */
    private MessagingStubFactory messagingStubFactory;
    /** Looks up the skeleton for a provider address when (un)registering multicast subscriptions. */
    private final MessagingSkeletonFactory messagingSkeletonFactory;
    /** Resolves the set of destination addresses for a message. */
    private AddressManager addressManager;
    /** Registry of multicast receivers, updated by add/removeMulticastReceiver. */
    protected final MulticastReceiverRegistry multicastReceiverRegistry;
    /** Queue of messages awaiting (possibly delayed) processing by the message workers. */
    private DelayQueue<DelayableImmutableMessage> messageQueue;
    /** Listeners notified once a message is processed or dropped; guarded by synchronizing on the list. */
    private List<MessageProcessedListener> messageProcessedListeners;
    /** Futures of the scheduled message workers, cancelled on shutdown. */
    private List<ScheduledFuture<?>> workerFutures;

    /**
     * Stores the injected collaborators, registers this router for shutdown notification and
     * starts the message workers and the periodic routing table cleanup.
     *
     * @param routingTable table of participant ID to address mappings
     * @param scheduler executor that runs the workers and the cleanup task
     * @param sendMsgRetryIntervalMs base interval in milliseconds for retry delays
     * @param maxParallelSends number of message workers to start
     * @param routingTableGracePeriodMs milliseconds added to a message TTL for routing entry expiry
     * @param routingTableCleanupIntervalMs initial delay and period of the cleanup task in milliseconds
     * @param messagingStubFactory factory for the stubs used to transmit messages
     * @param messagingSkeletonFactory factory used to look up skeletons for multicast subscriptions
     * @param addressManager resolves destination addresses for messages
     * @param multicastReceiverRegistry registry of multicast receivers
     * @param messageQueue queue from which the workers take messages
     * @param shutdownNotifier notifier this router registers itself with
     */
    @Inject
    @Singleton
    public AbstractMessageRouter(RoutingTable routingTable,
                                 @Named("io.joynr.messaging.scheduledthreadpool") ScheduledExecutorService scheduler,
                                 @Named("joynr.messaging.sendmsgretryintervalms") long sendMsgRetryIntervalMs,
                                 @Named("joynr.messaging.maximumparallelsends") int maxParallelSends,
                                 @Named("joynr.messaging.routingtablegraceperiodms") long routingTableGracePeriodMs,
                                 @Named("joynr.messaging.routingtablecleanupintervalms") long routingTableCleanupIntervalMs,
                                 MessagingStubFactory messagingStubFactory,
                                 MessagingSkeletonFactory messagingSkeletonFactory,
                                 AddressManager addressManager,
                                 MulticastReceiverRegistry multicastReceiverRegistry,
                                 DelayQueue<DelayableImmutableMessage> messageQueue,
                                 ShutdownNotifier shutdownNotifier) {
        // Collaborators.
        this.routingTable = routingTable;
        this.scheduler = scheduler;
        this.messagingStubFactory = messagingStubFactory;
        this.messagingSkeletonFactory = messagingSkeletonFactory;
        this.addressManager = addressManager;
        this.multicastReceiverRegistry = multicastReceiverRegistry;
        this.messageQueue = messageQueue;
        this.messageProcessedListeners = new ArrayList<MessageProcessedListener>();

        // Timing configuration.
        this.sendMsgRetryIntervalMs = sendMsgRetryIntervalMs;
        this.routingTableGracePeriodMs = routingTableGracePeriodMs;
        this.routingTableCleanupIntervalMs = routingTableCleanupIntervalMs;

        // TTL timestamps are logged in UTC.
        DateFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));

        shutdownNotifier.registerForShutdown(this);
        startMessageWorkerThreads(maxParallelSends);
        startRoutingTableCleanupThread();
    }

    /**
     * Schedules the given number of {@link MessageWorker} tasks for immediate execution and
     * remembers their futures so that {@code shutdown()} can cancel them.
     *
     * @param numberOfWorkThreads number of workers to schedule
     */
    private void startMessageWorkerThreads(int numberOfWorkThreads) {
        this.workerFutures = new ArrayList<>(numberOfWorkThreads);
        for (int i = 0; i < numberOfWorkThreads; ++i) {
            ScheduledFuture<?> messageWorkerFuture = this.scheduler.schedule(new MessageWorker(i), 0L, TimeUnit.MILLISECONDS);
            if (messageWorkerFuture == null) {
                // Without a future the worker cannot be cancelled later; keep going with the others.
                this.logger.warn("scheduling messageWorker-" + i + " returned a null future. Cancel at shutdown not possible");
                continue;
            }
            this.workerFutures.add(messageWorkerFuture);
        }
    }

    /**
     * Schedules a recurring task that purges the routing table. The cleanup interval is used
     * both as the initial delay and as the delay between runs.
     */
    private void startRoutingTableCleanupThread() {
        this.scheduler.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                try {
                    AbstractMessageRouter.this.routingTable.purge();
                }
                catch (RuntimeException e) {
                    // An exception escaping a periodic task suppresses all subsequent executions,
                    // so a single failed purge must not be allowed to propagate.
                    AbstractMessageRouter.this.logger.error("Purging the routing table failed", e);
                }
            }
        }, this.routingTableCleanupIntervalMs, this.routingTableCleanupIntervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Adds a listener to the list notified when a message has been processed.
     * The listener list is modified while holding its monitor.
     *
     * @param messageProcessedListener listener to add
     */
    @Override
    public void registerMessageProcessedListener(MessageProcessedListener messageProcessedListener) {
        final List<MessageProcessedListener> listeners = messageProcessedListeners;
        synchronized (listeners) {
            listeners.add(messageProcessedListener);
        }
    }

    /**
     * Removes a listener from the list notified when a message has been processed.
     * The listener list is modified while holding its monitor.
     *
     * @param messageProcessedListener listener to remove
     */
    @Override
    public void unregisterMessageProcessedListener(MessageProcessedListener messageProcessedListener) {
        final List<MessageProcessedListener> listeners = messageProcessedListeners;
        synchronized (listeners) {
            listeners.remove(messageProcessedListener);
        }
    }

    /**
     * Removes the routing table entry of the given participant.
     *
     * @param participantId participant whose entry is removed
     */
    @Override
    public void removeNextHop(String participantId) {
        routingTable.remove(participantId);
    }

    /**
     * Reports whether the routing table has an entry for the given participant.
     *
     * @param participantId participant to look up
     * @return the result of {@code routingTable.containsKey(participantId)}
     */
    @Override
    public boolean resolveNextHop(String participantId) {
        final boolean known = routingTable.containsKey(participantId);
        return known;
    }

    /**
     * Registers a subscriber in the multicast receiver registry and, if the provider's
     * messaging skeleton supports it, registers the multicast subscription there as well.
     *
     * @param multicastId multicast to subscribe to
     * @param subscriberParticipantId participant receiving the multicast
     * @param providerParticipantId participant whose address selects the messaging skeleton
     */
    @Override
    public void addMulticastReceiver(final String multicastId, String subscriberParticipantId, String providerParticipantId) {
        Object[] logArguments = new Object[]{subscriberParticipantId, multicastId, providerParticipantId};
        logger.trace("Adding multicast receiver {} for multicast {} on provider {}", logArguments);
        multicastReceiverRegistry.registerMulticastReceiver(multicastId, subscriberParticipantId);

        SubscriptionOperation registerSubscription = new SubscriptionOperation(){

            @Override
            public void perform(IMessagingMulticastSubscriber subscriber) {
                subscriber.registerMulticastSubscription(multicastId);
            }
        };
        performSubscriptionOperation(multicastId, providerParticipantId, registerSubscription);
    }

    /**
     * Unregisters a subscriber from the multicast receiver registry and, if the provider's
     * messaging skeleton supports it, unregisters the multicast subscription there as well.
     *
     * @param multicastId multicast to unsubscribe from
     * @param subscriberParticipantId participant that no longer receives the multicast
     * @param providerParticipantId participant whose address selects the messaging skeleton
     */
    @Override
    public void removeMulticastReceiver(final String multicastId, String subscriberParticipantId, String providerParticipantId) {
        multicastReceiverRegistry.unregisterMulticastReceiver(multicastId, subscriberParticipantId);

        SubscriptionOperation unregisterSubscription = new SubscriptionOperation(){

            @Override
            public void perform(IMessagingMulticastSubscriber subscriber) {
                subscriber.unregisterMulticastSubscription(multicastId);
            }
        };
        performSubscriptionOperation(multicastId, providerParticipantId, unregisterSubscription);
    }

    /**
     * Looks up the messaging skeleton for the provider's routing table address and applies the
     * operation to it when the skeleton is an {@link IMessagingMulticastSubscriber}; otherwise
     * only a trace message is logged.
     *
     * @param multicastId multicast concerned (not used by this method itself)
     * @param providerParticipantId participant whose address is looked up in the routing table
     * @param operation operation to run against the skeleton
     */
    private void performSubscriptionOperation(String multicastId, String providerParticipantId, SubscriptionOperation operation) {
        Address providerAddress = routingTable.get(providerParticipantId);
        IMessagingSkeleton skeleton = messagingSkeletonFactory.getSkeleton(providerAddress);

        // instanceof is false for null, so this also covers a missing skeleton.
        if (!(skeleton instanceof IMessagingMulticastSubscriber)) {
            logger.trace("No messaging skeleton found for address {}, not performing multicast subscription.", providerAddress);
            return;
        }
        operation.perform((IMessagingMulticastSubscriber) skeleton);
    }

    /**
     * Adds a routing table entry for the participant with expiry date {@code Long.MAX_VALUE}
     * and the sticky flag set to {@code false}.
     *
     * @param participantId participant the entry belongs to
     * @param address address stored for the participant
     * @param isGloballyVisible visibility flag passed through to the routing table
     */
    @Override
    public void addNextHop(String participantId, Address address, boolean isGloballyVisible) {
        final long expiryDateMs = Long.MAX_VALUE;
        final boolean isSticky = false;
        routingTable.put(participantId, address, isGloballyVisible, expiryDateMs, isSticky);
    }

    /**
     * Entry point for routing a message: validates its TTL, registers a routing entry for the
     * sender where applicable, and enqueues the message without delay and with zero retries.
     *
     * @param message message to route
     */
    @Override
    public void route(ImmutableMessage message) {
        final long initialDelayMs = 0L;
        final int initialRetriesCount = 0;

        checkExpiry(message);
        registerGlobalRoutingEntryIfRequired(message);
        routeInternal(message, initialDelayMs, initialRetriesCount);
    }

    /**
     * Schedules a runnable on the router's executor after the given delay.
     *
     * @param runnable task to run
     * @param messageId ID of the related message, used only in the exception text
     * @param delay delay before execution
     * @param timeUnit unit of {@code delay}
     * @throws JoynrShutdownException if the executor has already been shut down
     */
    protected void schedule(Runnable runnable, String messageId, long delay, TimeUnit timeUnit) {
        if (!scheduler.isShutdown()) {
            scheduler.schedule(runnable, delay, timeUnit);
            return;
        }
        throw new JoynrShutdownException("MessageScheduler is shutting down already. Unable to send message [messageId: " + messageId + "].");
    }

    /**
     * Resolves the destination addresses for a message by delegating to the address manager.
     *
     * @param message message to resolve addresses for
     * @return the addresses returned by the address manager
     */
    protected Set<Address> getAddresses(ImmutableMessage message) {
        final Set<Address> resolved = addressManager.getAddresses(message);
        return resolved;
    }

    /**
     * Adds a routing table entry for the sender of a message that was received from global,
     * has one of the types {@code rq}, {@code arq}, {@code brq} or {@code mrq}, and carries a
     * non-empty reply-to address. The entry expires at the message TTL plus the routing table
     * grace period, or at {@code Long.MAX_VALUE} if that sum overflows.
     *
     * @param message message whose sender may need a routing entry
     */
    private void registerGlobalRoutingEntryIfRequired(ImmutableMessage message) {
        if (!message.isReceivedFromGlobal()) {
            return;
        }

        String type = message.getType();
        boolean typeQualifies = type.equals("rq") || type.equals("arq") || type.equals("brq") || type.equals("mrq");
        if (!typeQualifies) {
            return;
        }

        String replyTo = message.getReplyTo();
        if (replyTo == null || replyTo.isEmpty()) {
            return;
        }

        Address replyToAddress = RoutingTypesUtil.fromAddressString(replyTo);
        long expiryDateMs;
        try {
            expiryDateMs = Math.addExact(message.getTtlMs(), routingTableGracePeriodMs);
        }
        catch (ArithmeticException overflow) {
            // TTL plus grace period does not fit into a long: treat the entry as never expiring.
            expiryDateMs = Long.MAX_VALUE;
        }

        final boolean isGloballyVisible = true;
        final boolean isSticky = false;
        routingTable.put(message.getSender(), replyToAddress, isGloballyVisible, expiryDateMs, isSticky);
    }

    /**
     * Puts the message on the delay queue, unless a retry limit is configured and exceeded,
     * in which case the message is dropped and the message-processed listeners are notified.
     *
     * @param message message to enqueue
     * @param delayMs delay in milliseconds before the message becomes available to the workers
     * @param retriesCount number of retries already performed for this message
     */
    private void routeInternal(ImmutableMessage message, long delayMs, int retriesCount) {
        logger.trace("Scheduling {} with delay {} and retries {}", new Object[]{message, delayMs, retriesCount});
        DelayableImmutableMessage queueEntry = new DelayableImmutableMessage(message, delayMs, retriesCount);

        // A maxRetryCount of -1 or below means that no retry limit applies.
        final boolean retryLimitEnabled = maxRetryCount > -1L;
        if (retryLimitEnabled && retriesCount > maxRetryCount) {
            logger.error("Max-retry-count (" + maxRetryCount + ") reached. Dropping message " + message);
            callMessageProcessedListeners(message.getId());
            return;
        }
        if (retryLimitEnabled && retriesCount > 0) {
            logger.debug("Retry {}/{} sending message {}", new Object[]{retriesCount, maxRetryCount, message});
        }
        messageQueue.put(queueEntry);
    }

    /**
     * Verifies that the message carries an absolute TTL that lies in the future. On failure the
     * message-processed listeners are notified before the exception is thrown.
     *
     * @param message message to validate
     * @throws JoynrRuntimeException if the TTL is relative
     * @throws JoynrMessageNotSentException if the TTL timestamp is not after the current time
     */
    private void checkExpiry(ImmutableMessage message) {
        if (!message.isTtlAbsolute()) {
            this.callMessageProcessedListeners(message.getId());
            throw new JoynrRuntimeException("Relative ttl not supported");
        }
        long currentTimeMillis = System.currentTimeMillis();
        long ttlExpirationDateMs = message.getTtlMs();
        if (ttlExpirationDateMs <= currentTimeMillis) {
            // The timestamps are passed as strings: MessageFormat would otherwise render numeric
            // arguments with locale-specific grouping separators, which makes epoch millisecond
            // values hard to read and to compare in logs.
            String errorMessage = MessageFormat.format("ttl must be greater than 0 / ttl timestamp must be in the future: now: {0} abs_ttl: {1} msg_id: {2}", Long.toString(currentTimeMillis), Long.toString(ttlExpirationDateMs), message.getId());
            this.logger.error(errorMessage);
            this.callMessageProcessedListeners(message.getId());
            throw new JoynrMessageNotSentException(errorMessage);
        }
    }

    private FailureAction createFailureAction(final ImmutableMessage message, final int retriesCount) {
        FailureAction failureAction = new FailureAction(){
            final String messageId;
            private boolean failureActionExecutedOnce;
            {
                this.messageId = message.getId();
                this.failureActionExecutedOnce = false;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void execute(Throwable error) {
                4 var2_2 = this;
                synchronized (var2_2) {
                    if (this.failureActionExecutedOnce) {
                        AbstractMessageRouter.this.logger.trace("Failure action for message with id {} already executed once. Ignoring further call.", (Object)this.messageId);
                        return;
                    }
                    this.failureActionExecutedOnce = true;
                }
                if (error instanceof JoynrShutdownException) {
                    AbstractMessageRouter.this.logger.warn("{}", (Object)error.getMessage());
                    return;
                }
                if (error instanceof JoynrMessageNotSentException) {
                    AbstractMessageRouter.this.logger.error(" ERROR SENDING:  aborting send of messageId: {}. Error: {}", new Object[]{this.messageId, error.getMessage()});
                    AbstractMessageRouter.this.callMessageProcessedListeners(this.messageId);
                    return;
                }
                AbstractMessageRouter.this.logger.warn("PROBLEM SENDING, will retry. messageId: {}. Error: {} Message: {}", new Object[]{this.messageId, error.getClass().getName(), error.getMessage()});
                long delayMs = error instanceof JoynrDelayMessageException ? ((JoynrDelayMessageException)error).getDelayMs() : AbstractMessageRouter.this.createDelayWithExponentialBackoff(AbstractMessageRouter.this.sendMsgRetryIntervalMs, retriesCount);
                AbstractMessageRouter.this.logger.error("Rescheduling messageId: {} with delay " + delayMs + " ms, TTL is: {} ms", (Object)this.messageId, (Object)DateFormatter.format(message.getTtlMs()));
                try {
                    AbstractMessageRouter.this.routeInternal(message, delayMs, retriesCount + 1);
                }
                catch (Exception e) {
                    AbstractMessageRouter.this.logger.warn("Rescheduling of message failed (messageId {})", (Object)this.messageId);
                    AbstractMessageRouter.this.callMessageProcessedListeners(this.messageId);
                }
            }
        };
        return failureAction;
    }

    /**
     * Notifies every registered listener that the message with the given ID has been processed.
     * The listener list's monitor is held for the whole notification round.
     *
     * @param messageId ID of the processed message
     */
    private void callMessageProcessedListeners(String messageId) {
        final List<MessageProcessedListener> listeners = messageProcessedListeners;
        synchronized (listeners) {
            for (MessageProcessedListener listener : listeners) {
                listener.messageProcessed(messageId);
            }
        }
    }

    /**
     * Creates a success callback that notifies the message-processed listeners once it has
     * been executed {@code numberOfCalls} times.
     * <p>
     * The countdown is performed under the callback's monitor so that executions arriving from
     * different threads cannot lose a decrement, mirroring the guard used by the failure
     * callback. The listeners are invoked outside the monitor.
     *
     * @param messageId ID of the message reported to the listeners
     * @param numberOfCalls number of executions after which the listeners are notified
     * @return the success callback
     */
    private SuccessAction createMessageProcessedAction(final String messageId, final int numberOfCalls) {
        SuccessAction successAction = new SuccessAction(){
            private int callCount;
            {
                this.callCount = numberOfCalls;
            }

            @Override
            public void execute() {
                boolean allCallsReceived;
                synchronized (this) {
                    --this.callCount;
                    allCallsReceived = this.callCount == 0;
                }
                if (allCallsReceived) {
                    AbstractMessageRouter.this.callMessageProcessedListeners(messageId);
                }
            }
        };
        return successAction;
    }

    /**
     * Cancels every message worker future recorded at startup, interrupting workers that are
     * currently running.
     */
    @Override
    public void shutdown() {
        final boolean mayInterruptIfRunning = true;
        for (ScheduledFuture<?> future : workerFutures) {
            future.cancel(mayInterruptIfRunning);
        }
    }

    /**
     * Computes a randomized retry delay that grows exponentially with the number of retries:
     * {@code interval + random[0,1) * 2^retries * interval}.
     * <p>
     * The previous implementation used {@code 2 ^ retries}, which in Java is bitwise XOR rather
     * than exponentiation, so the delay did not grow exponentially. The sum saturates at
     * {@code Long.MAX_VALUE} instead of overflowing. When {@code maxDelayMs} is at least the
     * retry interval, the result is capped at {@code maxDelayMs}.
     *
     * @param sendMsgRetryIntervalMs base retry interval in milliseconds (assumed non-negative)
     * @param retries number of retries already performed
     * @return the delay in milliseconds
     */
    private long createDelayWithExponentialBackoff(long sendMsgRetryIntervalMs, int retries) {
        this.logger.trace("TRIES: " + retries);
        double randomizedBackoffMs = Math.pow(2.0, retries) * (double)sendMsgRetryIntervalMs * Math.random();
        long millis;
        try {
            // The double-to-long cast saturates at Long.MAX_VALUE; addExact detects the overflow.
            millis = Math.addExact(sendMsgRetryIntervalMs, (long)randomizedBackoffMs);
        }
        catch (ArithmeticException e) {
            millis = Long.MAX_VALUE;
        }
        if (this.maxDelayMs >= sendMsgRetryIntervalMs && millis > this.maxDelayMs) {
            millis = this.maxDelayMs;
            this.logger.trace("set MILLIS to " + millis + " since maxDelayMs is " + this.maxDelayMs);
        }
        this.logger.trace("MILLIS: " + millis);
        return millis;
    }

    /**
     * Throws if no destination address was found for the message; returns normally otherwise.
     *
     * @param foundAddresses addresses resolved for the message
     * @param message message being routed
     * @throws JoynrMessageNotSentException if no address was found and the message has type
     *         {@code "m"} or is a reply
     * @throws JoynrIllegalStateException if no address was found for any other message
     */
    private void checkFoundAddresses(Set<Address> foundAddresses, ImmutableMessage message) {
        if (!foundAddresses.isEmpty()) {
            return;
        }

        final boolean hasTypeM = "m".equals(message.getType());
        if (hasTypeM) {
            throw new JoynrMessageNotSentException("Failed to send Request: No address for given message: " + message);
        }
        if (message.isReply()) {
            throw new JoynrMessageNotSentException("Failed to send Reply: No address found for given message: " + message);
        }
        throw new JoynrIllegalStateException("Unable to find address for recipient with participant ID " + message.getRecipient());
    }

    /**
     * Worker that repeatedly takes messages from the delay queue and transmits each one to all
     * of its resolved addresses. It runs until its thread is interrupted.
     */
    class MessageWorker
    implements Runnable {
        /** Index of this worker, used in its thread name. */
        private final int number;

        /**
         * @param number index of this worker
         */
        public MessageWorker(int number) {
            this.number = number;
        }

        /**
         * Processing loop: takes the next message, checks its expiry, resolves its addresses
         * and hands it to a messaging stub per address. Errors other than interruption are
         * passed to a failure action, which decides between abort and retry.
         */
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                Thread.currentThread().setName("joynrMessageWorker-" + this.number);
                ImmutableMessage message = null;
                int retriesCount = 0;
                try {
                    DelayableImmutableMessage delayableMessage = (DelayableImmutableMessage)AbstractMessageRouter.this.messageQueue.take();
                    retriesCount = delayableMessage.getRetriesCount();
                    message = delayableMessage.getMessage();
                    AbstractMessageRouter.this.logger.trace("Starting processing of message {}", (Object)message);
                    AbstractMessageRouter.this.checkExpiry(message);
                    Set<Address> addresses = AbstractMessageRouter.this.getAddresses(message);
                    // Throws when no address was found, so the set is non-empty from here on.
                    AbstractMessageRouter.this.checkFoundAddresses(addresses, message);
                    SuccessAction messageProcessedAction = AbstractMessageRouter.this.createMessageProcessedAction(message.getId(), addresses.size());
                    FailureAction failureAction = AbstractMessageRouter.this.createFailureAction(message, retriesCount);
                    for (Address address : addresses) {
                        AbstractMessageRouter.this.logger.trace(">>>>> SEND  {} to address {}", (Object)message, (Object)address);
                        IMessagingStub messagingStub = AbstractMessageRouter.this.messagingStubFactory.create(address);
                        messagingStub.transmit(message, messageProcessedAction, failureAction);
                    }
                }
                catch (InterruptedException e) {
                    AbstractMessageRouter.this.logger.trace("Message Worker interrupted. Stopping.");
                    Thread.currentThread().interrupt();
                    return;
                }
                catch (Exception error) {
                    AbstractMessageRouter.this.logger.error("error in scheduled message router thread: {}", (Object)error.getMessage());
                    if (message == null) {
                        // The failure happened before a message was obtained. There is nothing
                        // to abort or reschedule, and building a failure action for a null
                        // message would throw and terminate this worker.
                        continue;
                    }
                    FailureAction failureAction = AbstractMessageRouter.this.createFailureAction(message, retriesCount);
                    failureAction.execute(error);
                }
            }
        }
    }

    private static interface SubscriptionOperation {
        public void perform(IMessagingMulticastSubscriber var1);
    }
}

