/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.event;

import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
import org.infinispan.client.hotrod.annotation.ClientCacheFailover;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryCustomEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
import org.infinispan.client.hotrod.event.ClientCacheFailoverEvent;
import org.infinispan.client.hotrod.event.ClientEvent;
import org.infinispan.client.hotrod.event.ClientEvents;
import org.infinispan.client.hotrod.event.SecurityActions;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.operations.AddClientListenerOperation;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.transport.Transport;
import org.infinispan.client.hotrod.impl.transport.TransportFactory;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.util.Util;

public class ClientListenerNotifier {
    /** Logger shared by the notifier and its inner {@code EventDispatcher}. */
    private static final Log log = LogFactory.getLog(ClientListenerNotifier.class, Log.class);
    /** Trace flag, read from the logger once when this class is initialized. */
    private static final boolean trace = log.isTraceEnabled();
    /**
     * For each supported listener annotation, the event types checked against the
     * single parameter of an annotated method. Populated by the static initializer.
     */
    private static final Map<Class<? extends Annotation>, Class<?>[]> allowedListeners = new HashMap<Class<? extends Annotation>, Class<?>[]>(4);
    /** Registered dispatchers, keyed by the listener id wrapped in a {@code WrappedByteArray}. */
    private final ConcurrentMap<WrappedByteArray, EventDispatcher> clientListeners = new ConcurrentHashMap<WrappedByteArray, EventDispatcher>();
    /** Executor the dispatchers are submitted to; shut down by {@link #stop()}. */
    private final ExecutorService executor;
    /** Codec used by the dispatchers to read events from their transport. */
    private final Codec codec;
    /** Marshaller handed to the codec when reading events. */
    private final Marshaller marshaller;
    /** Used to register a listener operation as disconnected when a failover attempt fails. */
    private final TransportFactory transportFactory;
    /** The private {@code failoverClientListener(WrappedByteArray)} overload exposed as a {@link Consumer}. */
    private final Consumer<WrappedByteArray> failoverClientListener = this::failoverClientListener;

    /**
     * Creates a notifier from its collaborators; no work is started here.
     *
     * @param executor executor that will run the per-listener event dispatchers
     * @param codec codec used to read events from a transport
     * @param marshaller marshaller passed to the codec when reading events
     * @param transportFactory factory notified when a listener cannot be failed over
     */
    protected ClientListenerNotifier(ExecutorService executor, Codec codec, Marshaller marshaller, TransportFactory transportFactory) {
        this.transportFactory = transportFactory;
        this.marshaller = marshaller;
        this.codec = codec;
        this.executor = executor;
    }

    /**
     * Builds a notifier backed by a cached thread pool whose threads come from
     * {@link #getRestoreThreadNameThreadFactory()}.
     *
     * @param codec codec used to read events from a transport
     * @param marshaller marshaller passed to the codec when reading events
     * @param transportFactory factory notified when a listener cannot be failed over
     * @return a new notifier
     */
    public static ClientListenerNotifier create(Codec codec, Marshaller marshaller, TransportFactory transportFactory) {
        ThreadFactory threadFactory = ClientListenerNotifier.getRestoreThreadNameThreadFactory();
        return new ClientListenerNotifier(Executors.newCachedThreadPool(threadFactory), codec, marshaller, transportFactory);
    }

    /**
     * Returns a factory whose threads remember their own name before running the
     * runnable handed to the factory and put that name back once it returns or throws.
     *
     * @return the name-restoring thread factory
     */
    private static ThreadFactory getRestoreThreadNameThreadFactory() {
        return task -> {
            Runnable nameRestoring = () -> {
                final Thread worker = Thread.currentThread();
                final String nameBeforeTask = worker.getName();
                try {
                    task.run();
                } finally {
                    // Undo any renaming done while the task was running.
                    worker.setName(nameBeforeTask);
                }
            };
            return new Thread(nameRestoring);
        };
    }

    /**
     * @return the marshaller this notifier was constructed with
     */
    public Marshaller getMarshaller() {
        return this.marshaller;
    }

    /**
     * Registers the listener carried by {@code op}: its annotated callback methods are
     * discovered, wrapped in a dispatcher and stored under the operation's listener id.
     * The dispatcher is not started here; see {@link #startClientListener(byte[])}.
     *
     * @param op the add-listener operation holding the listener, its id and cache name
     */
    public void addClientListener(AddClientListenerOperation op) {
        final Map<Class<? extends Annotation>, List<ClientListenerInvocation>> callbacksByAnnotation = this.findMethods(op.listener);
        final EventDispatcher dispatcher = new EventDispatcher(op, callbacksByAnnotation, op.getCacheName());
        final WrappedByteArray key = new WrappedByteArray(op.listenerId);
        this.clientListeners.put(key, dispatcher);
        if (!trace) {
            return;
        }
        log.tracef("Add client listener with id %s, for listener %s and invocable methods %s", (Object)Util.printArray(op.listenerId), op.listener, (Object)callbacksByAnnotation);
    }

    /**
     * Fails over every registered listener whose transport points at one of the
     * given servers.
     *
     * <p>The matching ids are collected first and failed over afterwards, so the
     * listener map is not modified while it is being scanned.
     *
     * @param failedServers addresses of the servers considered failed
     */
    public void failoverClientListeners(Set<SocketAddress> failedServers) {
        List<WrappedByteArray> failoverListenerIds = new ArrayList<>();
        for (Map.Entry<WrappedByteArray, EventDispatcher> entry : this.clientListeners.entrySet()) {
            SocketAddress listenerServer = entry.getValue().transport.getRemoteSocketAddress();
            if (failedServers.contains(listenerServer)) {
                failoverListenerIds.add(entry.getKey());
            }
        }
        if (trace && failoverListenerIds.isEmpty()) {
            log.tracef("No event listeners registered in failed servers: %s", (Object)failedServers);
        }
        failoverListenerIds.forEach(this.failoverClientListener);
    }

    /**
     * Fails over a single listener; wraps the raw id and delegates to the
     * {@code WrappedByteArray} overload.
     *
     * @param listenerId raw id of the listener to fail over
     */
    public void failoverClientListener(byte[] listenerId) {
        this.failoverClientListener(new WrappedByteArray(listenerId));
    }

    /**
     * Fails over one listener: unregisters it, fires its failover callbacks and
     * re-executes the original add-listener operation.
     *
     * <p>If no dispatcher is registered under {@code listenerId} (for instance because
     * another caller already failed it over or removed it), the call is a no-op instead
     * of failing with a {@link NullPointerException}.
     *
     * @param listenerId wrapped id of the listener to fail over
     */
    private void failoverClientListener(WrappedByteArray listenerId) {
        EventDispatcher dispatcher = this.clientListeners.get(listenerId);
        if (dispatcher == null) {
            if (trace) {
                log.tracef("Client listener with id %s is not registered, skipping failover", (Object)Util.printArray(listenerId.getBytes()));
            }
            return;
        }
        this.removeClientListener(listenerId);
        // Let the listener know about the failover before it is registered again.
        this.invokeFailoverEvent(dispatcher);
        // Re-run the add-listener operation.
        dispatcher.op.execute();
        if (trace) {
            SocketAddress failedServerAddress = dispatcher.transport.getRemoteSocketAddress();
            log.tracef("Fallback listener id %s from a failed server %s to %s", (Object)Util.printArray(listenerId.getBytes()), (Object)failedServerAddress, (Object)dispatcher.op.getDedicatedTransport().getRemoteSocketAddress());
        }
    }

    /**
     * Invokes every callback the dispatcher holds for {@code ClientCacheFailover},
     * giving each one its own failover event. Does nothing when there are none.
     *
     * @param dispatcher dispatcher whose failover callbacks should run
     */
    private void invokeFailoverEvent(EventDispatcher dispatcher) {
        final List<ClientListenerInvocation> failoverCallbacks = dispatcher.invocables.get(ClientCacheFailover.class);
        if (failoverCallbacks == null) {
            return;
        }
        failoverCallbacks.forEach(handler -> handler.invoke(ClientEvents.mkCachefailoverEvent()));
    }

    /**
     * Submits the dispatcher registered under {@code listenerId} to the executor so
     * that it starts reading events.
     *
     * @param listenerId raw id of a previously added listener
     */
    public void startClientListener(byte[] listenerId) {
        final WrappedByteArray key = new WrappedByteArray(listenerId);
        final EventDispatcher registered = this.clientListeners.get(key);
        this.executor.submit(registered);
    }

    /**
     * Removes a listener; wraps the raw id and delegates to the
     * {@code WrappedByteArray} overload.
     *
     * @param listenerId raw id of the listener to remove
     */
    public void removeClientListener(byte[] listenerId) {
        this.removeClientListener(new WrappedByteArray(listenerId));
    }

    /**
     * Unregisters the dispatcher stored under {@code listenerId} and releases its
     * transport.
     *
     * <p>The map removal is the single point that decides who owns the release, so an
     * id that is unknown or was already removed by another caller is ignored rather
     * than dereferenced.
     *
     * @param listenerId wrapped id of the listener to remove
     */
    private void removeClientListener(WrappedByteArray listenerId) {
        EventDispatcher dispatcher = this.clientListeners.remove(listenerId);
        if (dispatcher == null) {
            if (trace) {
                log.tracef("No client listener registered with id %s, nothing to remove", (Object)Util.printArray(listenerId.getBytes()));
            }
            return;
        }
        dispatcher.transport.release();
        if (trace) {
            log.tracef("Remove client listener with id %s", (Object)Util.printArray(listenerId.getBytes()));
        }
    }

    /**
     * Looks up the id under which a listener object was registered.
     *
     * @param listener listener instance to search for, compared with {@code equals}
     * @return the id of the first matching registration, or {@code null} if none matches
     */
    public byte[] findListenerId(Object listener) {
        byte[] matchingId = null;
        for (EventDispatcher candidate : this.clientListeners.values()) {
            if (candidate.op.listener.equals(listener)) {
                matchingId = candidate.op.listenerId;
                break;
            }
        }
        return matchingId;
    }

    /**
     * Tells whether a listener is registered and its dispatcher has not been flagged
     * as stopped.
     *
     * @param listenerId raw listener id
     * @return {@code true} if a dispatcher exists for the id and its {@code stopped} flag is unset
     */
    public boolean isListenerConnected(byte[] listenerId) {
        final EventDispatcher registered = this.clientListeners.get(new WrappedByteArray(listenerId));
        if (registered == null) {
            return false;
        }
        return !registered.stopped;
    }

    /**
     * Returns the transport held by the dispatcher of a listener.
     *
     * @param listenerId raw listener id
     * @return the dispatcher's transport, or {@code null} if the id is not registered
     */
    public Transport findTransport(byte[] listenerId) {
        final WrappedByteArray key = new WrappedByteArray(listenerId);
        final EventDispatcher registered = this.clientListeners.get(key);
        return registered == null ? null : registered.transport;
    }

    /**
     * Scans the public methods of a listener for the supported client-listener
     * annotations and groups the resulting invocations by annotation type.
     *
     * <p>Each annotated method is validated with {@code testListenerMethodValidity}
     * and made accessible before it is recorded.
     *
     * @param listener listener object to inspect
     * @return map from annotation type to the invocations of methods carrying it;
     *         annotations that no method uses have no entry
     */
    public Map<Class<? extends Annotation>, List<ClientListenerInvocation>> findMethods(Object listener) {
        final Map<Class<? extends Annotation>, List<ClientListenerInvocation>> byAnnotation = new HashMap<>(4, 0.99f);
        for (Method candidate : listener.getClass().getMethods()) {
            for (Map.Entry<Class<? extends Annotation>, Class<?>[]> allowed : allowedListeners.entrySet()) {
                final Class<? extends Annotation> annotation = allowed.getKey();
                if (candidate.isAnnotationPresent(annotation)) {
                    this.testListenerMethodValidity(candidate, allowed.getValue(), annotation.getName());
                    SecurityActions.setAccessible(candidate);
                    final ClientListenerInvocation invocation = new ClientListenerInvocation(listener, candidate);
                    byAnnotation.computeIfAbsent(annotation, unused -> new ArrayList<>()).add(invocation);
                }
            }
        }
        return byAnnotation;
    }

    /**
     * Checks that an annotated listener method has a usable signature: exactly one
     * parameter whose type can hold at least one of the allowed event types, and a
     * {@code void} return type.
     *
     * @param m annotated method to check
     * @param allowedParameters event types permitted for the annotation
     * @param annotationName annotation name used in the error raised on failure
     */
    private void testListenerMethodValidity(Method m, Class<?>[] allowedParameters, String annotationName) {
        final Class<?>[] declared = m.getParameterTypes();
        boolean accepted = false;
        if (declared.length == 1) {
            for (Class<?> eventType : allowedParameters) {
                if (declared[0].isAssignableFrom(eventType)) {
                    accepted = true;
                    break;
                }
            }
        }
        if (!accepted) {
            throw log.incorrectClientListener(annotationName, Arrays.asList(allowedParameters));
        }
        if (!Void.TYPE.equals(m.getReturnType())) {
            throw log.incorrectClientListener(annotationName);
        }
    }

    /**
     * Collects the listener objects registered for a cache.
     *
     * @param cacheName cache name bytes, compared with {@link Arrays#equals(byte[], byte[])}
     * @return a new set with the listeners whose operation targets that cache; possibly empty
     */
    public Set<Object> getListeners(byte[] cacheName) {
        final Set<Object> matching = new HashSet<>(this.clientListeners.size());
        this.clientListeners.values().forEach(registered -> {
            if (Arrays.equals(registered.op.cacheName, cacheName)) {
                matching.add(registered.op.listener);
            }
        });
        return matching;
    }

    /**
     * Removes every registered listener, then shuts the executor down and waits up
     * to five seconds for it to terminate. If the wait is interrupted, the
     * interrupt flag is set again on the calling thread.
     */
    public void stop() {
        this.clientListeners.keySet().forEach(registeredId -> {
            if (trace) {
                log.tracef("Remote cache manager stopping, remove client listener id %s", (Object)Util.printArray(registeredId.getBytes()));
            }
            this.removeClientListener(registeredId);
        });
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(5L, TimeUnit.SECONDS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Delivers an event directly to the dispatcher registered under {@code listenerId}.
     *
     * @param listenerId raw id of a registered listener
     * @param clientEvent event to hand to the listener's callbacks
     */
    public void invokeEvent(byte[] listenerId, ClientEvent clientEvent) {
        final WrappedByteArray key = new WrappedByteArray(listenerId);
        this.clientListeners.get(key).invokeClientEvent(clientEvent);
    }

    static {
        allowedListeners.put(ClientCacheEntryCreated.class, new Class[]{ClientCacheEntryCreatedEvent.class, ClientCacheEntryCustomEvent.class});
        allowedListeners.put(ClientCacheEntryModified.class, new Class[]{ClientCacheEntryModifiedEvent.class, ClientCacheEntryCustomEvent.class});
        allowedListeners.put(ClientCacheEntryRemoved.class, new Class[]{ClientCacheEntryRemovedEvent.class, ClientCacheEntryCustomEvent.class});
        allowedListeners.put(ClientCacheEntryExpired.class, new Class[]{ClientCacheEntryExpiredEvent.class, ClientCacheEntryCustomEvent.class});
        allowedListeners.put(ClientCacheFailover.class, new Class[]{ClientCacheFailoverEvent.class});
    }

    /**
     * Pairs a listener object with one of its callback methods so that the method
     * can be invoked reflectively with a client event.
     */
    private static final class ClientListenerInvocation {
        private static final Log log = LogFactory.getLog(ClientListenerInvocation.class, Log.class);
        final Object listener;
        final Method method;

        private ClientListenerInvocation(Object listener, Method method) {
            this.method = method;
            this.listener = listener;
        }

        /**
         * Calls the callback method on the listener with the given event. Any
         * exception raised by the reflective call is wrapped and rethrown through
         * the logger's {@code exceptionInvokingListener}.
         *
         * @param event event passed as the method's single argument
         */
        public void invoke(ClientEvent event) {
            try {
                method.invoke(listener, event);
            } catch (Exception failure) {
                final String failureType = failure.getClass().getName();
                throw log.exceptionInvokingListener(failureType, method, listener, failure);
            }
        }
    }

    /**
     * Reads events for one client listener from its dedicated transport and hands
     * them to the listener's annotated callback methods.
     *
     * <p>An instance is created per add-listener operation and is meant to be run on
     * the notifier's executor. The {@code stopped} flag is raised whenever
     * {@link #run()} gives up reading because of an error it handles.
     */
    private final class EventDispatcher
    implements Runnable {
        /** Callback invocations grouped by the annotation that selected them. */
        final Map<Class<? extends Annotation>, List<ClientListenerInvocation>> invocables;
        /** The add-listener operation this dispatcher was created for. */
        final AddClientListenerOperation op;
        /** Dedicated transport of {@link #op}, captured at construction time. */
        final Transport transport;
        /** Cache name used only to build the reader thread's name. */
        final String cacheName;
        /** Set to {@code true} when the read loop exits after a handled failure. */
        volatile boolean stopped = false;

        private EventDispatcher(AddClientListenerOperation op, Map<Class<? extends Annotation>, List<ClientListenerInvocation>> invocables, String cacheName) {
            this.op = op;
            this.transport = op.getDedicatedTransport();
            this.invocables = invocables;
            this.cacheName = cacheName;
        }

        /**
         * Event loop: names the current thread, then reads and dispatches events
         * until the thread is interrupted or a failure ends the loop.
         *
         * <p>NOTE(review): when the loop ends because the thread was interrupted,
         * {@code stopped} is left {@code false} - confirm that this is intended.
         */
        @Override
        public void run() {
            Thread.currentThread().setName(this.getThreadName());
            while (!Thread.currentThread().isInterrupted()) {
                // Stays null while reading; non-null means the failure happened in a callback.
                ClientEvent clientEvent = null;
                try {
                    clientEvent = ClientListenerNotifier.this.codec.readEvent(this.transport, this.op.listenerId, ClientListenerNotifier.this.marshaller);
                    this.invokeClientEvent(clientEvent);
                    clientEvent = null;
                }
                catch (TransportException e) {
                    Throwable cause = e.getCause();
                    if (cause instanceof ClosedChannelException || cause instanceof SocketException && !this.transport.isValid()) {
                        log.debug("Channel closed, exiting event reader thread");
                        this.stopped = true;
                        return;
                    }
                    if (cause instanceof SocketTimeoutException) {
                        log.debug("Timed out reading event, retry");
                        continue;
                    }
                    if (clientEvent != null) {
                        log.unexpectedErrorConsumingEvent(clientEvent, e);
                        continue;
                    }
                    if (this.isConnectionReset(cause)) {
                        this.tryFailoverClientListener();
                        this.stopped = true;
                        return;
                    }
                    log.unrecoverableErrorReadingEvent(e, this.transport.getRemoteSocketAddress());
                    this.stopped = true;
                    return;
                }
                catch (CancelledKeyException e) {
                    log.debug("Key cancelled, most likely channel closed, exiting event reader thread");
                    this.stopped = true;
                    return;
                }
                catch (Throwable t) {
                    if (clientEvent != null) {
                        log.unexpectedErrorConsumingEvent(clientEvent, t);
                    } else {
                        log.unableToReadEventFromServer(t, this.transport.getRemoteSocketAddress());
                    }
                    if (this.transport.isValid()) continue;
                    this.stopped = true;
                    return;
                }
            }
        }

        /**
         * Tells whether a failure cause is an {@link IOException} reporting
         * "Connection reset by peer". A cause without a message is treated as not
         * matching, so it cannot raise a {@link NullPointerException} inside the
         * catch block of {@link #run()}.
         */
        private boolean isConnectionReset(Throwable cause) {
            if (!(cause instanceof IOException)) {
                return false;
            }
            String message = cause.getMessage();
            return message != null && message.contains("Connection reset by peer");
        }

        /**
         * Attempts to fail this listener over. If that fails with a
         * {@link TransportException}, the operation is handed to the transport
         * factory as a disconnected listener instead.
         */
        private void tryFailoverClientListener() {
            try {
                log.debug("Connection reset by peer, so failover client listener");
                ClientListenerNotifier.this.failoverClientListener(this.op.listenerId);
            }
            catch (TransportException e) {
                log.debug("Unable to failover client listener, so ignore connection reset");
                try {
                    ClientListenerNotifier.this.transportFactory.addDisconnectedListener(this.op);
                }
                catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Builds the reader thread name from the cache name (when not empty) and a
         * hex rendering of the listener id.
         */
        String getThreadName() {
            String listenerId = Util.toHexString(this.op.listenerId, 8);
            return this.cacheName.isEmpty() ? "Client-Listener-" + listenerId : "Client-Listener-" + this.cacheName + "-" + listenerId;
        }

        /**
         * Routes an event to the callbacks registered for the annotation matching
         * its type. Event types not listed in the switch are ignored.
         *
         * @param clientEvent event to dispatch
         */
        void invokeClientEvent(ClientEvent clientEvent) {
            if (trace) {
                log.tracef("Event %s received for listener with id=%s", (Object)clientEvent, (Object)Util.printArray(this.op.listenerId));
            }
            switch (clientEvent.getType()) {
                case CLIENT_CACHE_ENTRY_CREATED: {
                    this.invokeCallbacks(clientEvent, ClientCacheEntryCreated.class);
                    break;
                }
                case CLIENT_CACHE_ENTRY_MODIFIED: {
                    this.invokeCallbacks(clientEvent, ClientCacheEntryModified.class);
                    break;
                }
                case CLIENT_CACHE_ENTRY_REMOVED: {
                    this.invokeCallbacks(clientEvent, ClientCacheEntryRemoved.class);
                    break;
                }
                case CLIENT_CACHE_ENTRY_EXPIRED: {
                    this.invokeCallbacks(clientEvent, ClientCacheEntryExpired.class);
                }
            }
        }

        /**
         * Invokes, in list order, every callback registered under the given
         * annotation type; does nothing when there are none.
         */
        private void invokeCallbacks(ClientEvent event, Class<? extends Annotation> type) {
            List<ClientListenerInvocation> callbacks = this.invocables.get(type);
            if (callbacks != null) {
                for (ClientListenerInvocation callback : callbacks) {
                    callback.invoke(event);
                }
            }
        }
    }
}

