/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.distribution;

import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.CacheSet;
import org.infinispan.CacheStream;
import org.infinispan.commands.LocalFlagAffectedCommand;
import org.infinispan.commands.read.AbstractCloseableIteratorCollection;
import org.infinispan.commands.read.EntrySetCommand;
import org.infinispan.commands.read.KeySetCommand;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.CloseableIteratorMapper;
import org.infinispan.commons.util.CloseableSpliterator;
import org.infinispan.commons.util.Closeables;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.ForwardingCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.base.CommandInterceptor;
import org.infinispan.stream.StreamMarshalling;
import org.infinispan.stream.impl.ClusterStreamManager;
import org.infinispan.stream.impl.DistributedCacheStream;
import org.infinispan.stream.impl.RemovableCloseableIterator;
import org.infinispan.stream.impl.RemovableIterator;
import org.infinispan.stream.impl.tx.TxClusterStreamManager;
import org.infinispan.stream.impl.tx.TxDistributedCacheStream;

public class DistributionBulkInterceptor<K, V>
extends CommandInterceptor {
    private Cache<K, V> cache;

    /**
     * Stores the cache that the key-set and entry-set visitors of this interceptor
     * hand to the backing sets they create.
     *
     * @param cache the cache this interceptor works against
     */
    @Inject
    public void inject(Cache<K, V> cache) {
        this.cache = cache;
    }

    /**
     * Handles an entry-set command.
     *
     * <p>The entry set produced by the superclass is returned untouched when the command
     * carries {@link Flag#CACHE_MODE_LOCAL}. Otherwise it is wrapped in a
     * {@link BackingEntrySet}, or in a {@link TxBackingEntrySet} when the context reports
     * that it is in transaction scope.
     *
     * @param ctx     the invocation context; cast to {@link LocalTxInvocationContext} when in
     *                transaction scope
     * @param command the entry-set command being visited
     * @return the superclass entry set, or a backing set wrapped around it
     * @throws Throwable whatever the superclass call throws
     */
    @Override
    public CacheSet<CacheEntry<K, V>> visitEntrySetCommand(InvocationContext ctx, EntrySetCommand command) throws Throwable {
        // The superclass result is not statically typed as a CacheSet, hence the cast.
        @SuppressWarnings("unchecked")
        CacheSet<CacheEntry<K, V>> entrySet = (CacheSet<CacheEntry<K, V>>) super.visitEntrySetCommand(ctx, command);
        if (command.hasFlag(Flag.CACHE_MODE_LOCAL)) {
            return entrySet;
        }
        if (ctx.isInTxScope()) {
            return new TxBackingEntrySet<>(getCacheWithFlags(this.cache, command), entrySet, command, (LocalTxInvocationContext) ctx);
        }
        return new BackingEntrySet<>(getCacheWithFlags(this.cache, command), entrySet, command);
    }

    /**
     * Sets the timeout of {@code stream} to the value configured under
     * {@code clustering().sync().replTimeout()} of the given cache, passed on as milliseconds.
     *
     * @param stream the stream to configure
     * @param cache  the cache whose configuration supplies the timeout
     * @param <C>    element type of the stream
     * @return the result of {@link CacheStream#timeout(long, TimeUnit)}
     */
    private static <C> CacheStream<C> applyTimeOut(CacheStream<C> stream, Cache<?, ?> cache) {
        long replTimeoutMillis = cache.getCacheConfiguration().clustering().sync().replTimeout();
        return stream.timeout(replTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Handles a key-set command.
     *
     * <p>When the command carries {@link Flag#CACHE_MODE_LOCAL} the superclass result is
     * returned. Otherwise the superclass is not consulted: a {@link BackingKeySet} (or a
     * {@link TxBackingKeySet} when the context is in transaction scope) is built over the
     * {@code CACHE_MODE_LOCAL} entry set of the injected cache.
     *
     * @param ctx     the invocation context; cast to {@link LocalTxInvocationContext} when in
     *                transaction scope
     * @param command the key-set command being visited
     * @return the key set to hand back to the caller
     * @throws Throwable whatever the superclass call throws
     */
    @Override
    public CacheSet<K> visitKeySetCommand(InvocationContext ctx, KeySetCommand command) throws Throwable {
        if (command.hasFlag(Flag.CACHE_MODE_LOCAL)) {
            return (CacheSet) super.visitKeySetCommand(ctx, command);
        }
        boolean transactional = ctx.isInTxScope();
        Cache<K, V> flaggedCache = this.getCacheWithFlags(this.cache, command);
        CacheSet<CacheEntry<K, V>> localEntries = this.cache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).cacheEntrySet();
        if (transactional) {
            return new TxBackingKeySet<K, V>(flaggedCache, localEntries, command, (LocalTxInvocationContext) ctx);
        }
        return new BackingKeySet<K, V>(flaggedCache, localEntries, command);
    }

    /**
     * Key set used when the key-set command runs in transaction scope. It replaces the
     * streams of {@link BackingKeySet} with {@link TxDistributedCacheStream} instances that
     * are given the transaction's invocation context.
     */
    private static class TxBackingKeySet<K, V>
    extends BackingKeySet<K, V> {
        private final LocalTxInvocationContext ctx;

        /**
         * @param cache    cache the set operates on
         * @param entrySet entry set whose stream feeds the distributed stream
         * @param command  command whose flags are consulted when building streams
         * @param ctx      transactional invocation context handed to the stream classes
         */
        public TxBackingKeySet(Cache<K, V> cache, CacheSet<CacheEntry<K, V>> entrySet, LocalFlagAffectedCommand command, LocalTxInvocationContext ctx) {
            super(cache, entrySet, command);
            this.ctx = ctx;
        }

        /** Returns a transactional key stream built with {@code false} as the second constructor argument. */
        @Override
        public CacheStream<K> stream() {
            return this.newTxStream(false);
        }

        /** Returns a transactional key stream built with {@code true} as the second constructor argument. */
        @Override
        public CacheStream<K> parallelStream() {
            return this.newTxStream(true);
        }

        /**
         * Builds the {@link TxDistributedCacheStream} shared by {@link #stream()} and
         * {@link #parallelStream()}; the two differ only in the flag passed here.
         *
         * @param parallel value forwarded as the stream's second constructor argument
         */
        private CacheStream<K> newTxStream(boolean parallel) {
            AdvancedCache<?, ?> advancedCache = this.cache.getAdvancedCache();
            DistributionManager dm = advancedCache.getDistributionManager();
            ComponentRegistry registry = advancedCache.getComponentRegistry();
            ClusterStreamManager realManager = registry.getComponent(ClusterStreamManager.class);
            // Wrap the registered manager together with the transaction context and the current consistent hash.
            TxClusterStreamManager txManager = new TxClusterStreamManager(realManager, this.ctx, dm.getConsistentHash());
            // NOTE(review): unlike the entry-set streams, no replication timeout is applied here - confirm this is intended.
            return new TxDistributedCacheStream(
                    this.cache.getCacheManager().getAddress(),
                    parallel,
                    dm,
                    () -> this.entrySet.stream(),
                    txManager,
                    !this.command.hasFlag(Flag.SKIP_CACHE_LOAD),
                    this.cache.getCacheConfiguration().clustering().stateTransfer().chunkSize(),
                    registry.getComponent(Executor.class, "org.infinispan.executors.async"),
                    registry,
                    StreamMarshalling.entryToKeyFunction(),
                    this.ctx);
        }
    }

    /**
     * Key set whose iteration and streams are served by a {@link DistributedCacheStream}
     * fed from the supplied entry set, while {@code contains} and {@code remove} are
     * forwarded to the cache.
     */
    protected static class BackingKeySet<K, V>
    extends AbstractCloseableIteratorCollection<K, K, V>
    implements CacheSet<K> {
        /** Characteristics reported by {@link #spliterator()} (numerically 4353). */
        private static final int SPLITERATOR_CHARACTERISTICS =
                Spliterator.CONCURRENT | Spliterator.DISTINCT | Spliterator.NONNULL;

        protected final CacheSet<CacheEntry<K, V>> entrySet;
        protected final LocalFlagAffectedCommand command;

        /**
         * @param cache    cache the set operates on
         * @param entrySet entry set whose stream feeds the distributed stream
         * @param command  command whose flags are consulted when building streams
         */
        public BackingKeySet(Cache<K, V> cache, CacheSet<CacheEntry<K, V>> entrySet, LocalFlagAffectedCommand command) {
            super(cache);
            this.entrySet = entrySet;
            this.command = command;
        }

        /**
         * Iterates over {@link #stream()}, wrapped in a {@link RemovableCloseableIterator}
         * that is given the cache and the identity function.
         */
        @Override
        public CloseableIterator<K> iterator() {
            // NOTE(review): presumably the wrapper lets Iterator.remove() delete the key from the cache - confirm.
            return new RemovableCloseableIterator(Closeables.iterator(this.stream()), this.cache, Function.identity());
        }

        /** Delegates to {@code cache.containsKey}. */
        @Override
        public boolean contains(Object o) {
            return this.cache.containsKey(o);
        }

        /** Removes the key from the cache; reports {@code true} when the cache returned a non-null previous value. */
        @Override
        public boolean remove(Object o) {
            return this.cache.remove(o) != null;
        }

        /**
         * Builds a spliterator over {@link #iterator()} with size {@link Long#MAX_VALUE} and
         * the CONCURRENT, DISTINCT and NONNULL characteristics.
         */
        @Override
        public CloseableSpliterator<K> spliterator() {
            return Closeables.spliterator(this.iterator(), Long.MAX_VALUE, SPLITERATOR_CHARACTERISTICS);
        }

        /**
         * Builds a {@link DistributedCacheStream} with {@code false} as its second constructor
         * argument and {@link StreamMarshalling#entryToKeyFunction()} as its function.
         */
        @Override
        public CacheStream<K> stream() {
            AdvancedCache advancedCache = this.cache.getAdvancedCache();
            ComponentRegistry registry = advancedCache.getComponentRegistry();
            // NOTE(review): unlike the entry-set streams, no replication timeout is applied here - confirm this is intended.
            return new DistributedCacheStream<K>(this.cache.getCacheManager().getAddress(), false, advancedCache.getDistributionManager(), () -> this.entrySet.stream(), registry.getComponent(ClusterStreamManager.class), !this.command.hasFlag(Flag.SKIP_CACHE_LOAD), this.cache.getCacheConfiguration().clustering().stateTransfer().chunkSize(), registry.getComponent(Executor.class, "org.infinispan.executors.async"), registry, StreamMarshalling.entryToKeyFunction()){

                @Override
                public Iterator<K> iterator() {
                    // Exactly one recorded intermediate operation: wrap the iterator in a RemovableIterator.
                    // NOTE(review): presumably that single operation is the entry-to-key function passed above - confirm.
                    if (this.intermediateOperations.size() == 1) {
                        return new RemovableIterator(super.iterator(), cache, Function.identity());
                    }
                    return super.iterator();
                }
            };
        }

        /**
         * Same construction as {@link #stream()} but with {@code true} as the second
         * constructor argument and without the iterator override.
         */
        @Override
        public CacheStream<K> parallelStream() {
            AdvancedCache advancedCache = this.cache.getAdvancedCache();
            ComponentRegistry registry = advancedCache.getComponentRegistry();
            return new DistributedCacheStream(this.cache.getCacheManager().getAddress(), true, advancedCache.getDistributionManager(), () -> this.entrySet.stream(), registry.getComponent(ClusterStreamManager.class), !this.command.hasFlag(Flag.SKIP_CACHE_LOAD), this.cache.getCacheConfiguration().clustering().stateTransfer().chunkSize(), registry.getComponent(Executor.class, "org.infinispan.executors.async"), registry, StreamMarshalling.entryToKeyFunction());
        }
    }

    /**
     * Cache entry decorator whose {@link #setValue(Object)} also writes the new value
     * into the cache under the entry's key.
     */
    private static class EntryWrapper<K, V>
    extends ForwardingCacheEntry<K, V> {
        private final Cache<K, V> backingCache;
        private final CacheEntry<K, V> wrapped;

        /**
         * @param cache cache that receives the value written by {@code setValue}
         * @param entry entry every other call is forwarded to
         */
        public EntryWrapper(Cache<K, V> cache, CacheEntry<K, V> entry) {
            this.backingCache = cache;
            this.wrapped = entry;
        }

        /** Returns the wrapped entry. */
        @Override
        protected CacheEntry<K, V> delegate() {
            return this.wrapped;
        }

        /**
         * Puts {@code value} into the cache under the wrapped entry's key, then delegates
         * to the superclass and returns its result.
         */
        @Override
        public V setValue(V value) {
            K key = this.wrapped.getKey();
            this.backingCache.put(key, value);
            return super.setValue(value);
        }
    }

    /**
     * Entry set used when the entry-set command runs in transaction scope. It replaces the
     * streams of {@link BackingEntrySet} with {@link TxDistributedCacheStream} instances that
     * are given the transaction's invocation context.
     */
    protected static class TxBackingEntrySet<K, V>
    extends BackingEntrySet<K, V> {
        private final LocalTxInvocationContext ctx;

        /**
         * @param cache    cache the set operates on
         * @param entrySet entry set whose stream feeds the distributed stream
         * @param command  command whose flags are consulted when building streams
         * @param ctx      transactional invocation context handed to the stream classes
         */
        private TxBackingEntrySet(Cache<K, V> cache, CacheSet<CacheEntry<K, V>> entrySet, LocalFlagAffectedCommand command, LocalTxInvocationContext ctx) {
            super(cache, entrySet, command);
            this.ctx = ctx;
        }

        /** Returns a transactional entry stream built with {@code false} as the second constructor argument. */
        @Override
        public CacheStream<CacheEntry<K, V>> stream() {
            return this.newTxStream(false);
        }

        /** Returns a transactional entry stream built with {@code true} as the second constructor argument. */
        @Override
        public CacheStream<CacheEntry<K, V>> parallelStream() {
            return this.newTxStream(true);
        }

        /**
         * Builds the {@link TxDistributedCacheStream} shared by {@link #stream()} and
         * {@link #parallelStream()} and applies the replication timeout to it; the two
         * callers differ only in the flag passed here.
         *
         * @param parallel value forwarded as the stream's second constructor argument
         */
        private CacheStream<CacheEntry<K, V>> newTxStream(boolean parallel) {
            AdvancedCache<?, ?> advancedCache = this.cache.getAdvancedCache();
            DistributionManager dm = advancedCache.getDistributionManager();
            ComponentRegistry registry = advancedCache.getComponentRegistry();
            ClusterStreamManager realManager = registry.getComponent(ClusterStreamManager.class);
            // Wrap the registered manager together with the transaction context and the current consistent hash.
            TxClusterStreamManager txManager = new TxClusterStreamManager(realManager, this.ctx, dm.getConsistentHash());
            TxDistributedCacheStream cacheStream = new TxDistributedCacheStream(
                    this.cache.getCacheManager().getAddress(),
                    parallel,
                    dm,
                    () -> this.entrySet.stream(),
                    txManager,
                    !this.command.hasFlag(Flag.SKIP_CACHE_LOAD),
                    this.cache.getCacheConfiguration().clustering().stateTransfer().chunkSize(),
                    registry.getComponent(Executor.class, "org.infinispan.executors.async"),
                    registry,
                    this.ctx);
            return DistributionBulkInterceptor.applyTimeOut(cacheStream, this.cache);
        }
    }

    /**
     * Entry set whose iteration and streams are served by a {@link DistributedCacheStream}
     * fed from the supplied entry set, while {@code contains}, {@code remove} and
     * {@code add} are forwarded to the cache.
     */
    protected static class BackingEntrySet<K, V>
    extends AbstractCloseableIteratorCollection<CacheEntry<K, V>, K, V>
    implements CacheSet<CacheEntry<K, V>> {
        protected final CacheSet<CacheEntry<K, V>> entrySet;
        protected final LocalFlagAffectedCommand command;

        /**
         * @param cache    cache the set operates on
         * @param entrySet entry set whose stream feeds the distributed stream
         * @param command  command whose flags are consulted when building streams
         */
        private BackingEntrySet(Cache<K, V> cache, CacheSet<CacheEntry<K, V>> entrySet, LocalFlagAffectedCommand command) {
            super(cache);
            this.entrySet = entrySet;
            this.command = command;
        }

        /**
         * Iterates over {@link #stream()}. The stream iterator is wrapped in a
         * {@link RemovableCloseableIterator} keyed by {@link CacheEntry#getKey()}, and every
         * entry handed out is wrapped in an {@link EntryWrapper} so that {@code setValue}
         * also writes to the cache.
         */
        @Override
        public CloseableIterator<CacheEntry<K, V>> iterator() {
            CloseableIterator<CacheEntry<K, V>> streamIterator = Closeables.iterator(this.stream());
            CloseableIterator<CacheEntry<K, V>> removable =
                    new RemovableCloseableIterator<K, CacheEntry<K, V>>(streamIterator, this.cache, CacheEntry::getKey);
            return new CloseableIteratorMapper<CacheEntry<K, V>, CacheEntry<K, V>>(
                    removable, entry -> new EntryWrapper<>(this.cache, entry));
        }

        /** Builds a spliterator directly from {@link #stream()}. */
        @Override
        public CloseableSpliterator<CacheEntry<K, V>> spliterator() {
            return Closeables.spliterator(this.stream());
        }

        /**
         * Reports whether {@code o} is a {@link Map.Entry} whose key is mapped in the cache
         * to a value equal to the entry's value.
         */
        @Override
        public boolean contains(Object o) {
            Map.Entry<K, V> entry = this.toEntry(o);
            if (entry == null) {
                return false;
            }
            Object value = this.cache.get(entry.getKey());
            return value != null && value.equals(entry.getValue());
        }

        /**
         * Removes the mapping via {@code cache.remove(key, value)} when {@code o} is a
         * {@link Map.Entry}; returns {@code false} for any other object.
         */
        @Override
        public boolean remove(Object o) {
            Map.Entry<K, V> entry = this.toEntry(o);
            if (entry == null) {
                return false;
            }
            return this.cache.remove(entry.getKey(), entry.getValue());
        }

        /**
         * Puts the entry's key and value into the cache.
         *
         * @return {@code true} when the put returned no previous value or one that does not
         *         equal the new value
         */
        @Override
        public boolean add(CacheEntry<K, V> internalCacheEntry) {
            V previous = this.cache.put(internalCacheEntry.getKey(), internalCacheEntry.getValue());
            return previous == null || !previous.equals(internalCacheEntry.getValue());
        }

        /** Returns {@code obj} viewed as a map entry, or {@code null} when it is not a {@link Map.Entry}. */
        @SuppressWarnings("unchecked") // key/value types cannot be checked at run time; callers only read them as Objects
        private Map.Entry<K, V> toEntry(Object obj) {
            if (obj instanceof Map.Entry) {
                return (Map.Entry<K, V>) obj;
            }
            return null;
        }

        /**
         * Builds a {@link DistributedCacheStream} with {@code false} as its second constructor
         * argument and applies the replication timeout to it.
         */
        @Override
        public CacheStream<CacheEntry<K, V>> stream() {
            AdvancedCache<?, ?> advancedCache = this.cache.getAdvancedCache();
            ComponentRegistry registry = advancedCache.getComponentRegistry();
            CacheStream<CacheEntry<K, V>> cacheStream = new DistributedCacheStream<CacheEntry<K, V>>(this.cache.getCacheManager().getAddress(), false, advancedCache.getDistributionManager(), () -> this.entrySet.stream(), registry.getComponent(ClusterStreamManager.class), !this.command.hasFlag(Flag.SKIP_CACHE_LOAD), this.cache.getCacheConfiguration().clustering().stateTransfer().chunkSize(), registry.getComponent(Executor.class, "org.infinispan.executors.async"), registry){

                @Override
                public Iterator<CacheEntry<K, V>> iterator() {
                    // With no intermediate operations recorded the elements are still cache entries,
                    // so the iterator is wrapped in a RemovableIterator keyed by the entry's key.
                    if (this.intermediateOperations.isEmpty()) {
                        return new RemovableIterator<K, CacheEntry<K, V>>(super.iterator(), cache, CacheEntry::getKey);
                    }
                    return super.iterator();
                }
            };
            return DistributionBulkInterceptor.applyTimeOut(cacheStream, this.cache);
        }

        /**
         * Same construction as {@link #stream()} but with {@code true} as the second
         * constructor argument and without the iterator override; the replication timeout
         * is applied as well.
         */
        @Override
        public CacheStream<CacheEntry<K, V>> parallelStream() {
            AdvancedCache<?, ?> advancedCache = this.cache.getAdvancedCache();
            ComponentRegistry registry = advancedCache.getComponentRegistry();
            CacheStream<CacheEntry<K, V>> cacheStream = new DistributedCacheStream<CacheEntry<K, V>>(this.cache.getCacheManager().getAddress(), true, advancedCache.getDistributionManager(), () -> this.entrySet.stream(), registry.getComponent(ClusterStreamManager.class), !this.command.hasFlag(Flag.SKIP_CACHE_LOAD), this.cache.getCacheConfiguration().clustering().stateTransfer().chunkSize(), registry.getComponent(Executor.class, "org.infinispan.executors.async"), registry);
            return DistributionBulkInterceptor.applyTimeOut(cacheStream, this.cache);
        }
    }
}

