/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.hotrod;

import io.netty.channel.Channel;
import java.lang.annotation.Annotation;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.dataconversion.Transcoder;
import org.infinispan.commons.dataconversion.TranscoderMarshallerAdapter;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.container.versioning.NumericVersion;
import org.infinispan.encoding.DataConversion;
import org.infinispan.factories.threads.DefaultThreadFactory;
import org.infinispan.marshall.core.EncoderRegistry;
import org.infinispan.metadata.Metadata;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilter;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverter;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory;
import org.infinispan.server.hotrod.ClientEventType;
import org.infinispan.server.hotrod.Events;
import org.infinispan.server.hotrod.HotRodHeader;
import org.infinispan.server.hotrod.HotRodOperation;
import org.infinispan.server.hotrod.KeyValueVersionConverterFactory;
import org.infinispan.server.hotrod.Response;
import org.infinispan.server.hotrod.VersionedDecoder;
import org.infinispan.server.hotrod.logging.Log;
import org.infinispan.util.KeyValuePair;

class ClientListenerRegistry {
    private static final Log log = (Log) LogFactory.getLog(ClientListenerRegistry.class, Log.class);
    // Sampled once at class initialisation; the flag is not re-read afterwards.
    private static final boolean isTrace = log.isTraceEnabled();

    /** Registry that receives the transcoder built around a custom event marshaller. */
    private final EncoderRegistry encoderRegistry;
    /** Event senders, keyed by the listener id wrapped in a {@link WrappedByteArray}. */
    private final ConcurrentMap<WrappedByteArray, Object> eventSenders = new ConcurrentHashMap<>();
    /** Named filter factories available to client listeners. */
    private final ConcurrentMap<String, CacheEventFilterFactory> cacheEventFilterFactories = CollectionFactory.makeConcurrentMap(4, 0.9f, 16);
    /** Named converter factories available to client listeners. */
    private final ConcurrentMap<String, CacheEventConverterFactory> cacheEventConverterFactories = CollectionFactory.makeConcurrentMap(4, 0.9f, 16);
    /** Named combined filter/converter factories available to client listeners. */
    private final ConcurrentMap<String, CacheEventFilterConverterFactory> cacheEventFilterConverterFactories = CollectionFactory.makeConcurrentMap(4, 0.9f, 16);
    /**
     * Executor used to register listeners that request the current cache state.
     * NOTE(review): with a core size of 0 and an unbounded work queue, ThreadPoolExecutor prefers queuing over
     * starting new threads, so the maximum of 10 presumably never takes effect - confirm this is intended.
     */
    private final ExecutorService addListenerExecutor = new ThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new DefaultThreadFactory(null, 1, "add-listener-thread-%t", null, null));

    /**
     * Creates a registry bound to the given encoder registry.
     *
     * @param encoderRegistry registry queried and updated by {@link #setEventMarshaller(Optional)}
     */
    ClientListenerRegistry(EncoderRegistry encoderRegistry) {
        this.encoderRegistry = encoderRegistry;
    }

    /**
     * Makes a custom event marshaller usable as a transcoder.
     * <p>
     * Does nothing when the optional is empty. Otherwise the marshaller is wrapped in a
     * {@link TranscoderMarshallerAdapter}; the adapter is registered only if the encoder registry does not already
     * support converting {@code application/x-java-object} to the marshaller's media type, in which case the
     * skip is logged instead.
     *
     * @param eventMarshaller the marshaller to register, if any
     */
    void setEventMarshaller(Optional<Marshaller> eventMarshaller) {
        if (!eventMarshaller.isPresent()) {
            return;
        }
        Marshaller marshaller = eventMarshaller.get();
        TranscoderMarshallerAdapter adapter = new TranscoderMarshallerAdapter(marshaller);
        boolean alreadySupported = encoderRegistry.isConversionSupported(MediaType.APPLICATION_OBJECT, marshaller.mediaType());
        if (!alreadySupported) {
            encoderRegistry.registerTranscoder(adapter);
            return;
        }
        log.skippingMarshallerWrapping(marshaller.mediaType().toString());
    }

    /**
     * Registers a filter factory under {@code name}, replacing any factory already stored under that name.
     *
     * @param name    lookup name of the factory
     * @param factory the factory; must not also implement {@link CacheEventConverterFactory}
     * @throws RuntimeException the exception built by {@code log.illegalFilterConverterEventFactory(name)} when the
     *                          factory also implements {@link CacheEventConverterFactory}
     */
    void addCacheEventFilterFactory(String name, CacheEventFilterFactory factory) {
        boolean alsoConverter = factory instanceof CacheEventConverterFactory;
        if (alsoConverter) {
            throw log.illegalFilterConverterEventFactory(name);
        }
        cacheEventFilterFactories.put(name, factory);
    }

    /**
     * Unregisters the filter factory stored under {@code name}, if there is one.
     *
     * @param name lookup name of the factory to drop
     */
    void removeCacheEventFilterFactory(String name) {
        cacheEventFilterFactories.remove(name);
    }

    /**
     * Registers a converter factory under {@code name}, replacing any factory already stored under that name.
     *
     * @param name    lookup name of the factory
     * @param factory the factory; must not also implement {@link CacheEventFilterFactory}
     * @throws RuntimeException the exception built by {@code log.illegalFilterConverterEventFactory(name)} when the
     *                          factory also implements {@link CacheEventFilterFactory}
     */
    void addCacheEventConverterFactory(String name, CacheEventConverterFactory factory) {
        boolean alsoFilter = factory instanceof CacheEventFilterFactory;
        if (alsoFilter) {
            throw log.illegalFilterConverterEventFactory(name);
        }
        cacheEventConverterFactories.put(name, factory);
    }

    /**
     * Unregisters the converter factory stored under {@code name}, if there is one.
     *
     * @param name lookup name of the factory to drop
     */
    void removeCacheEventConverterFactory(String name) {
        cacheEventConverterFactories.remove(name);
    }

    /**
     * Registers a combined filter/converter factory under {@code name}, replacing any factory already stored
     * under that name.
     *
     * @param name    lookup name of the factory
     * @param factory the factory to register
     */
    void addCacheEventFilterConverterFactory(String name, CacheEventFilterConverterFactory factory) {
        cacheEventFilterConverterFactories.put(name, factory);
    }

    /**
     * Unregisters the combined filter/converter factory stored under {@code name}, if there is one.
     *
     * @param name lookup name of the factory to drop
     */
    void removeCacheEventFilterConverterFactory(String name) {
        cacheEventFilterConverterFactories.remove(name);
    }

    /**
     * Registers a client listener on {@code cache} and writes the outcome of the registration to {@code ch}.
     * <p>
     * Filter and converter are resolved from the named factories first:
     * <ul>
     *   <li>both factories given with the same name: a single filter-converter serves as both, using the filter
     *       parameters unless they are empty, in which case the converter parameters are used;</li>
     *   <li>otherwise each given factory is resolved on its own (filter first) and a missing one yields
     *       {@code null}.</li>
     * </ul>
     * The event sender is stored in {@code eventSenders} before the cache listener is added. When
     * {@code includeState} is set the listener is added on {@code addListenerExecutor} and the success or error
     * response is written once that task completes; otherwise the listener is added on the calling thread and a
     * success response is written straight away (a failure then propagates to the caller).
     *
     * @param decoder           builds the success/error responses
     * @param ch                channel the response and subsequent events are written to
     * @param h                 request header (supplies version, message id and value media type)
     * @param listenerId        client-supplied listener id
     * @param cache             cache to listen on
     * @param includeState      whether a state-including sender is created and registered asynchronously
     * @param filterFactory     filter factory name and its binary parameters, or {@code null}
     * @param converterFactory  converter factory name and its binary parameters, or {@code null}
     * @param useRawData        whether factory parameters are passed through unconverted and the listener is added
     *                          via {@code addStorageFormatFilteredListener}
     * @param listenerInterests bit mask of event types of interest, {@code 0} meaning all
     */
    void addClientListener(VersionedDecoder decoder, Channel ch, HotRodHeader h, byte[] listenerId, AdvancedCache<byte[], byte[]> cache, boolean includeState, KeyValuePair<String, List<byte[]>> filterFactory, KeyValuePair<String, List<byte[]>> converterFactory, boolean useRawData, int listenerInterests) {
        ClientEventType eventType = ClientEventType.createType(converterFactory != null, useRawData, h.version);
        List<byte[]> binaryFilterParams = filterFactory != null ? filterFactory.getValue() : Collections.<byte[]>emptyList();
        List<byte[]> binaryConverterParams = converterFactory != null ? converterFactory.getValue() : Collections.<byte[]>emptyList();

        // Assigned exactly once on every path so both can be captured by the lambda below.
        final CacheEventFilter<byte[], byte[]> filter;
        final CacheEventConverter<byte[], byte[], byte[]> converter;
        if (filterFactory != null && converterFactory != null && filterFactory.getKey().equals(converterFactory.getKey())) {
            List<byte[]> sharedParams = binaryFilterParams.isEmpty() ? binaryConverterParams : binaryFilterParams;
            CacheEventFilterConverter<byte[], byte[], byte[]> filterConverter = getFilterConverter(cache.getValueDataConversion(), h.getValueMediaType(), filterFactory.getKey(), useRawData, sharedParams);
            filter = filterConverter;
            converter = filterConverter;
        } else {
            // The filter is resolved before the converter, so a missing filter factory is reported first.
            filter = filterFactory != null
                    ? getFilter(cache.getValueDataConversion(), h.getValueMediaType(), filterFactory.getKey(), useRawData, binaryFilterParams)
                    : null;
            converter = converterFactory != null
                    ? getConverter(cache.getValueDataConversion(), h.getValueMediaType(), converterFactory.getKey(), useRawData, binaryConverterParams)
                    : null;
        }

        Object clientEventSender = getClientEventSender(includeState, ch, h.version, cache, listenerId, eventType, h.messageId);
        eventSenders.put(new WrappedByteArray(listenerId), clientEventSender);

        if (!includeState) {
            addCacheListener(cache, clientEventSender, filter, converter, listenerInterests, useRawData);
            ch.writeAndFlush(decoder.createSuccessResponse(h, null));
            return;
        }

        CompletableFuture<Void> registration = CompletableFuture.runAsync(() -> addCacheListener(cache, clientEventSender, filter, converter, listenerInterests, useRawData), addListenerExecutor);
        registration.whenComplete((ignored, cause) -> {
            Response resp;
            if (cause == null) {
                resp = decoder.createSuccessResponse(h, null);
            } else if (cause instanceof CompletionException) {
                // Report the underlying failure rather than the CompletionException wrapper.
                resp = decoder.createErrorResponse(h, cause.getCause());
            } else {
                resp = decoder.createErrorResponse(h, cause);
            }
            ch.writeAndFlush(resp);
        });
    }

    /**
     * Adds {@code clientEventSender} as a filtered listener on {@code cache}, restricted to the event annotations
     * selected by {@code listenerInterests}.
     * <p>
     * A mask of {@code 0} selects created, modified, removed and expired events. Otherwise bit {@code 1} selects
     * created, {@code 2} modified, {@code 4} removed and {@code 8} expired; a non-zero mask with none of those bits
     * results in an empty annotation set being passed on.
     *
     * @param cache             cache to register on
     * @param clientEventSender listener instance
     * @param filter            event filter, may be {@code null}
     * @param converter         event converter, may be {@code null}
     * @param listenerInterests bit mask of event types, {@code 0} meaning all
     * @param useRawData        {@code true} to register through {@code addStorageFormatFilteredListener},
     *                          {@code false} to register through {@code addFilteredListener}
     */
    private void addCacheListener(AdvancedCache<byte[], byte[]> cache, Object clientEventSender, CacheEventFilter<byte[], byte[]> filter, CacheEventConverter<byte[], byte[], byte[]> converter, int listenerInterests, boolean useRawData) {
        Set<Class<? extends Annotation>> filterAnnotations = new HashSet<>();
        if (listenerInterests == 0) {
            filterAnnotations.add(CacheEntryCreated.class);
            filterAnnotations.add(CacheEntryModified.class);
            filterAnnotations.add(CacheEntryRemoved.class);
            filterAnnotations.add(CacheEntryExpired.class);
        } else {
            if ((listenerInterests & 1) == 1) {
                filterAnnotations.add(CacheEntryCreated.class);
            }
            if ((listenerInterests & 2) == 2) {
                filterAnnotations.add(CacheEntryModified.class);
            }
            if ((listenerInterests & 4) == 4) {
                filterAnnotations.add(CacheEntryRemoved.class);
            }
            if ((listenerInterests & 8) == 8) {
                filterAnnotations.add(CacheEntryExpired.class);
            }
        }
        if (useRawData) {
            cache.addStorageFormatFilteredListener(clientEventSender, filter, converter, filterAnnotations);
        } else {
            cache.addFilteredListener(clientEventSender, filter, converter, filterAnnotations);
        }
    }

    /**
     * Builds an event filter from the filter factory registered under {@code name}.
     *
     * @param valueDataConversion conversion used to decode the binary parameters
     * @param requestMedia        media type the binary parameters are encoded in
     * @param name                name of the registered filter factory
     * @param useRawData          when {@code true} the binary parameters are handed to the factory unconverted
     * @param binaryParams        factory parameters as received from the client
     * @return the filter produced by the factory
     * @throws RuntimeException the exception built by {@code log.missingCacheEventFactory} when no factory is
     *                          registered under {@code name}
     */
    private CacheEventFilter<byte[], byte[]> getFilter(DataConversion valueDataConversion, MediaType requestMedia, String name, boolean useRawData, List<byte[]> binaryParams) {
        CacheEventFilterFactory factory = findFactory(name, cacheEventFilterFactories, "key/value filter");
        List<?> params = unmarshallParams(valueDataConversion, requestMedia, binaryParams, useRawData);
        return factory.getFilter(params.toArray());
    }

    /**
     * Builds an event converter from the converter factory identified by {@code name}.
     *
     * @param valueDataConversion conversion used to decode the binary parameters
     * @param requestMedia        media type the binary parameters are encoded in
     * @param name                name of the converter factory (resolved through {@code findConverterFactory})
     * @param useRawData          when {@code true} the binary parameters are handed to the factory unconverted
     * @param binaryParams        factory parameters as received from the client
     * @return the converter produced by the factory
     * @throws RuntimeException the exception built by {@code log.missingCacheEventFactory} when no factory can be
     *                          found for {@code name}
     */
    private CacheEventConverter<byte[], byte[], byte[]> getConverter(DataConversion valueDataConversion, MediaType requestMedia, String name, boolean useRawData, List<byte[]> binaryParams) {
        CacheEventConverterFactory factory = findConverterFactory(name, cacheEventConverterFactories);
        List<?> params = unmarshallParams(valueDataConversion, requestMedia, binaryParams, useRawData);
        return factory.getConverter(params.toArray());
    }

    /**
     * Builds a combined filter/converter from the factory registered under {@code name}.
     *
     * @param valueDataConversion conversion used to decode the binary parameters
     * @param requestMedia        media type the binary parameters are encoded in
     * @param name                name of the registered filter-converter factory
     * @param useRawData          when {@code true} the binary parameters are handed to the factory unconverted
     * @param binaryParams        factory parameters as received from the client
     * @return the filter-converter produced by the factory
     */
    private CacheEventFilterConverter<byte[], byte[], byte[]> getFilterConverter(DataConversion valueDataConversion, MediaType requestMedia, String name, boolean useRawData, List<byte[]> binaryParams) {
        CacheEventFilterConverterFactory filterConverterFactory = findFactory(name, cacheEventFilterConverterFactories, "converter");
        Object[] factoryArgs = unmarshallParams(valueDataConversion, requestMedia, binaryParams, useRawData).toArray();
        return filterConverterFactory.getFilterConverter(factoryArgs);
    }

    /**
     * Resolves a converter factory by name. The reserved name {@code ___eager-key-value-version-converter} maps to
     * {@link KeyValueVersionConverterFactory#SINGLETON}; any other name is looked up in {@code factories}.
     *
     * @param name      factory name
     * @param factories registered converter factories
     * @return the matching factory
     */
    private CacheEventConverterFactory findConverterFactory(String name, ConcurrentMap<String, CacheEventConverterFactory> factories) {
        boolean builtIn = name.equals("___eager-key-value-version-converter");
        return builtIn ? KeyValueVersionConverterFactory.SINGLETON : findFactory(name, factories, "converter");
    }

    /**
     * Looks up a factory by name.
     *
     * @param name        factory name
     * @param factories   map to search
     * @param factoryType label passed to the error message when nothing is found
     * @return the registered factory
     * @throws RuntimeException the exception built by {@code log.missingCacheEventFactory(factoryType, name)} when
     *                          the map has no entry for {@code name}
     */
    private <T> T findFactory(String name, ConcurrentMap<String, T> factories, String factoryType) {
        T found = factories.get(name);
        if (found != null) {
            return found;
        }
        throw log.missingCacheEventFactory(factoryType, name);
    }

    /**
     * Prepares factory parameters. Raw-data listeners get the binary parameters back unchanged; otherwise every
     * parameter is converted from {@code requestMedia} to {@code application/x-java-object}.
     *
     * @param valueDataConversion conversion used for each parameter
     * @param requestMedia        media type the parameters are encoded in
     * @param binaryParams        parameters as received from the client
     * @param useRawData          whether to skip the conversion
     * @return the same list when {@code useRawData} is set, otherwise a new list of converted values
     */
    private List<?> unmarshallParams(DataConversion valueDataConversion, MediaType requestMedia, List<byte[]> binaryParams, boolean useRawData) {
        if (!useRawData) {
            return binaryParams.stream()
                    .map(rawParam -> valueDataConversion.convert(rawParam, requestMedia, MediaType.APPLICATION_OBJECT))
                    .collect(Collectors.toList());
        }
        return binaryParams;
    }

    /**
     * Removes the client listener registered under {@code listenerId}.
     * <p>
     * The sender is taken out of {@code eventSenders} (not merely looked up) so that it does not stay referenced
     * by this registry after removal, and so that a second removal of the same id reports {@code false}.
     *
     * @param listenerId client-supplied listener id
     * @param cache      cache the listener is detached from
     * @return {@code true} if a sender was registered under the id and has been removed, {@code false} otherwise
     */
    boolean removeClientListener(byte[] listenerId, Cache cache) {
        Object sender = eventSenders.remove(new WrappedByteArray(listenerId));
        if (sender == null) {
            return false;
        }
        cache.removeListener(sender);
        return true;
    }

    /**
     * Releases the registry's state: forgets all event senders, clears all three factory maps (filter, converter
     * and filter-converter) and initiates shutdown of the add-listener executor.
     */
    public void stop() {
        eventSenders.clear();
        cacheEventFilterFactories.clear();
        cacheEventConverterFactories.clear();
        cacheEventFilterConverterFactories.clear();
        addListenerExecutor.shutdown();
    }

    /**
     * Schedules, on the channel's event loop, a pass over all registered senders that asks every sender bound to
     * {@code channel} to write its queued events.
     *
     * @param channel the channel whose senders should attempt to write
     */
    void findAndWriteEvents(Channel channel) {
        channel.eventLoop().execute(() -> {
            for (Object candidate : eventSenders.values()) {
                if (!(candidate instanceof BaseClientEventSender)) {
                    continue;
                }
                BaseClientEventSender sender = (BaseClientEventSender) candidate;
                if (sender.hasChannel(channel)) {
                    sender.writeEventsIfPossible();
                }
            }
        });
    }

    /**
     * Creates the event sender for a new client listener: a {@code StatefulClientEventSender} (which also receives
     * the request's message id) when {@code includeState} is set, a {@code StatelessClientEventSender} otherwise.
     *
     * @return the newly created sender
     */
    private Object getClientEventSender(boolean includeState, Channel ch, byte version, Cache cache, byte[] listenerId, ClientEventType eventType, long messageId) {
        return includeState
                ? new StatefulClientEventSender(cache, ch, listenerId, version, eventType, messageId)
                : new StatelessClientEventSender(cache, ch, listenerId, version, eventType);
    }

    /**
     * Base cache listener that turns cache entry events into Hot Rod remote events, buffers them in a bounded
     * queue (capacity 100) and writes them to the client's channel while the channel is writable.
     */
    private abstract class BaseClientEventSender {
        protected final Channel ch;
        protected final byte[] listenerId;
        protected final byte version;
        protected final ClientEventType targetEventType;
        protected final Cache cache;
        BlockingQueue<Object> eventQueue = new LinkedBlockingQueue<Object>(100);
        // Cached method reference so a new Runnable is not created for every submitted event.
        private final Runnable writeEventsIfPossible = this::writeEventsIfPossible;

        BaseClientEventSender(Cache cache, Channel ch, byte[] listenerId, byte version, ClientEventType targetEventType) {
            this.cache = cache;
            this.ch = ch;
            this.listenerId = listenerId;
            this.version = version;
            this.targetEventType = targetEventType;
        }

        /**
         * @return {@code true} if this sender writes to exactly the given channel instance
         */
        boolean hasChannel(Channel channel) {
            return ch == channel;
        }

        /**
         * Drains queued events to the channel for as long as the channel stays writable, then flushes once if
         * anything was written.
         */
        void writeEventsIfPossible() {
            boolean written = false;
            Object event;
            // Stop on a null poll() result as well, so a queue emptied in the meantime never leads to write(null).
            while (ch.isWritable() && (event = eventQueue.poll()) != null) {
                if (isTrace) {
                    log.tracef("Write event: %s to channel %s", event, ch);
                }
                ch.write(event);
                written = true;
            }
            if (written) {
                ch.flush();
            }
        }

        /**
         * Listener callback for created, modified, removed and expired entries. Events accepted by
         * {@link #isSendEvent(CacheEntryEvent)} are queued for the client together with the entry's numeric
         * version, or {@code 0} when the event carries no metadata or no version.
         */
        @CacheEntryCreated
        @CacheEntryModified
        @CacheEntryRemoved
        @CacheEntryExpired
        public void onCacheEvent(CacheEntryEvent<byte[], byte[]> event) {
            if (!isSendEvent(event)) {
                return;
            }
            Metadata metadata = event.getMetadata();
            // NOTE(review): assumes any non-null version is a NumericVersion - confirm no other version type reaches here.
            long dataVersion = metadata != null && metadata.version() != null ? ((NumericVersion) metadata.version()).getVersion() : 0L;
            sendEvent(event.getKey(), event.getValue(), dataVersion, event);
        }

        /**
         * Decides whether an event is forwarded to the client.
         * <p>
         * If the channel is no longer open, this sender detaches itself from the cache and from the registry's
         * sender map and rejects the event. Otherwise created/modified events are accepted once they are not
         * "pre" events, removed events additionally need a non-null old value, and expired events are always
         * accepted.
         *
         * @throws RuntimeException the exception built by {@code log.unexpectedEvent} for any other event type
         */
        boolean isSendEvent(CacheEntryEvent<?, ?> event) {
            if (isChannelDisconnected()) {
                log.debug("Channel disconnected, remove event sender listener");
                event.getCache().removeListener(this);
                // Nothing else removes the entry for a closed channel; drop it here so it is not retained forever.
                // The two-argument remove leaves a newer sender registered under the same id untouched.
                eventSenders.remove(new WrappedByteArray(listenerId), this);
                return false;
            }
            switch (event.getType()) {
                case CACHE_ENTRY_CREATED:
                case CACHE_ENTRY_MODIFIED:
                    return !event.isPre();
                case CACHE_ENTRY_REMOVED:
                    CacheEntryRemovedEvent<?, ?> removedEvent = (CacheEntryRemovedEvent<?, ?>) event;
                    return !event.isPre() && removedEvent.getOldValue() != null;
                case CACHE_ENTRY_EXPIRED:
                    return true;
                default:
                    throw log.unexpectedEvent(event);
            }
        }

        /**
         * @return {@code true} when the channel is no longer open
         */
        boolean isChannelDisconnected() {
            return !ch.isOpen();
        }

        /**
         * Converts the cache event into a remote event and queues it, blocking while the queue is full. If the
         * channel was writable before queuing, a write attempt is submitted to the channel's event loop;
         * otherwise the event simply stays queued.
         *
         * @throws CacheException if the thread is interrupted while waiting for queue space; the thread's
         *                        interrupt status is restored before throwing
         */
        void sendEvent(byte[] key, byte[] value, long dataVersion, CacheEntryEvent event) {
            Object remoteEvent = createRemoteEvent(key, value, dataVersion, event);
            if (isTrace) {
                log.tracef("Queue event %s, before queuing event queue size is %d", remoteEvent, eventQueue.size());
            }
            boolean waitingForFlush = !ch.isWritable();
            try {
                eventQueue.put(remoteEvent);
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new CacheException(e);
            }
            if (!waitingForFlush) {
                ch.eventLoop().submit(writeEventsIfPossible);
            }
        }

        /**
         * Builds the remote event matching {@code targetEventType}: for {@code PLAIN} a key-with-version event
         * (created/modified) or a key event (removed/expired); for {@code CUSTOM_PLAIN} and {@code CUSTOM_RAW} a
         * custom event carrying {@code value}.
         */
        private Object createRemoteEvent(byte[] key, byte[] value, long dataVersion, CacheEntryEvent event) {
            KeyValuePair<HotRodOperation, Boolean> responseType;
            switch (targetEventType) {
                case PLAIN:
                    switch (event.getType()) {
                        case CACHE_ENTRY_CREATED:
                        case CACHE_ENTRY_MODIFIED:
                            responseType = getEventResponseType(event);
                            return new Events.KeyWithVersionEvent(version, getEventId(event), responseType.getKey(), listenerId, responseType.getValue(), key, dataVersion);
                        case CACHE_ENTRY_REMOVED:
                        case CACHE_ENTRY_EXPIRED:
                            responseType = getEventResponseType(event);
                            return new Events.KeyEvent(version, getEventId(event), responseType.getKey(), listenerId, responseType.getValue(), key);
                        default:
                            throw log.unexpectedEvent(event);
                    }
                case CUSTOM_PLAIN:
                    responseType = getEventResponseType(event);
                    return new Events.CustomEvent(version, getEventId(event), responseType.getKey(), listenerId, responseType.getValue(), value);
                case CUSTOM_RAW:
                    responseType = getEventResponseType(event);
                    return new Events.CustomRawEvent(version, getEventId(event), responseType.getKey(), listenerId, responseType.getValue(), value);
                default:
                    throw new IllegalArgumentException("Event type not supported: " + targetEventType);
            }
        }

        /**
         * @return the id placed in the remote event; always {@code 0} in this base implementation
         */
        protected long getEventId(CacheEntryEvent event) {
            return 0L;
        }

        /**
         * Maps a cache event to the Hot Rod operation announced to the client plus the "command retried" flag.
         * Expired events always report {@code false} for the flag.
         */
        private KeyValuePair<HotRodOperation, Boolean> getEventResponseType(CacheEntryEvent event) {
            switch (event.getType()) {
                case CACHE_ENTRY_CREATED:
                    return new KeyValuePair<>(HotRodOperation.CACHE_ENTRY_CREATED_EVENT, ((CacheEntryCreatedEvent<?, ?>) event).isCommandRetried());
                case CACHE_ENTRY_MODIFIED:
                    return new KeyValuePair<>(HotRodOperation.CACHE_ENTRY_MODIFIED_EVENT, ((CacheEntryModifiedEvent<?, ?>) event).isCommandRetried());
                case CACHE_ENTRY_REMOVED:
                    return new KeyValuePair<>(HotRodOperation.CACHE_ENTRY_REMOVED_EVENT, ((CacheEntryRemovedEvent<?, ?>) event).isCommandRetried());
                case CACHE_ENTRY_EXPIRED:
                    return new KeyValuePair<>(HotRodOperation.CACHE_ENTRY_EXPIRED_EVENT, false);
                default:
                    throw log.unexpectedEvent(event);
            }
        }
    }

    /**
     * Clustered event sender that adds nothing to the base sender's behaviour; created by
     * {@code getClientEventSender} when the client did not ask for the current state.
     */
    @Listener(clustered = true)
    private class StatelessClientEventSender extends BaseClientEventSender {
        StatelessClientEventSender(Cache cache, Channel ch, byte[] listenerId, byte version, ClientEventType targetEventType) {
            super(cache, ch, listenerId, version, targetEventType);
        }
    }

    /**
     * Clustered event sender registered with {@code includeCurrentState}. Events that report themselves as
     * current-state events are tagged with the message id of the add-listener request; all others use {@code 0}.
     */
    @Listener(clustered = true, includeCurrentState = true)
    private class StatefulClientEventSender extends BaseClientEventSender {
        private final long messageId;

        StatefulClientEventSender(Cache cache, Channel ch, byte[] listenerId, byte version, ClientEventType targetEventType, long messageId) {
            super(cache, ch, listenerId, version, targetEventType);
            this.messageId = messageId;
        }

        @Override
        protected long getEventId(CacheEntryEvent event) {
            if (event.isCurrentState()) {
                return messageId;
            }
            return 0L;
        }
    }
}

