/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence.remote.upgrade;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.io.UnsignedNumeric;
import org.infinispan.commons.marshall.AbstractExternalizer;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.Util;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.container.versioning.NumericVersion;
import org.infinispan.context.Flag;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.threads.DefaultThreadFactory;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.metadata.EmbeddedMetadata;
import org.infinispan.metadata.Metadata;
import org.infinispan.metadata.impl.InternalMetadataImpl;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.remote.RemoteStore;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfiguration;
import org.infinispan.persistence.remote.logging.Log;
import org.infinispan.persistence.remote.upgrade.HotRodMigratorHelper;
import org.infinispan.persistence.remote.upgrade.MigrationMarshaller;
import org.infinispan.persistence.remote.upgrade.RemovedFilter;
import org.infinispan.util.logging.LogFactory;

/**
 * Function run against an {@link EmbeddedCacheManager} that copies entries from the
 * remote cache behind a {@link RemoteStore} into the local cache named {@code cacheName}.
 *
 * <p>Entries are read from the remote cache for the configured {@code segments} in batches of
 * {@code readBatch} and written to the local cache by a fixed pool of {@code threads} worker
 * threads. Keys for which a removal event is observed while the task is running are recorded
 * in {@link #deletedKeys} and are not written.
 */
public class MigrationTask
implements Function<EmbeddedCacheManager, Integer> {
    private static final Log log = (Log)LogFactory.getLog(MigrationTask.class, Log.class);
    /** Prefix of the worker thread name pattern. */
    private static final String THREAD_NAME = "RollingUpgrade-MigrationTask";
    /** Key whose marshalled form is skipped when iterating over the source cache. */
    private static final String IGNORED_KEY = "___MigrationManager_HotRod_KnownKeys___";

    private final String cacheName;
    private final Set<Integer> segments;
    private final int readBatch;
    private final int threads;
    /** Keys reported as removed by {@link RemoveListener}; shared with the worker threads. */
    private final Set<WrappedByteArray> deletedKeys = ConcurrentHashMap.newKeySet();

    /**
     * @param cacheName name of the local cache to migrate entries into
     * @param segments  segments passed to the remote cache iteration
     * @param readBatch batch size passed to the remote cache iteration
     * @param threads   size of the fixed thread pool performing the writes
     */
    public MigrationTask(String cacheName, Set<Integer> segments, int readBatch, int threads) {
        this.cacheName = cacheName;
        this.segments = segments;
        this.readBatch = readBatch;
        this.threads = threads;
    }

    /**
     * Migrates the entries of the first {@link RemoteStore} configured on the cache, if any.
     *
     * @param embeddedCacheManager cache manager used to look up the target cache
     * @return the number of source entries handed to the workers and processed by them,
     *         including entries that were skipped because their key had been removed
     * @throws CacheException if the ignored key cannot be marshalled
     */
    @Override
    public Integer apply(EmbeddedCacheManager embeddedCacheManager) {
        AtomicInteger counter = new AtomicInteger(0);
        ThreadFactory threadFactory = new DefaultThreadFactory(null, 1, THREAD_NAME + "-%t", null, null);
        ExecutorService executorService = Executors.newFixedThreadPool(this.threads, threadFactory);
        try {
            Cache cache = embeddedCacheManager.getCache(this.cacheName);
            RemoveListener listener = null;
            try {
                ComponentRegistry cr = cache.getAdvancedCache().getComponentRegistry();
                PersistenceManager loaderManager = (PersistenceManager)cr.getComponent(PersistenceManager.class);
                Set stores = loaderManager.getStores(RemoteStore.class);
                MigrationMarshaller marshaller = new MigrationMarshaller(cache.getCacheManager().getClassWhiteList());
                // Register the listener before reading so removals that happen during the
                // migration are recorded in deletedKeys.
                listener = new RemoveListener();
                cache.addFilteredListener((Object)listener, new RemovedFilter(), null, Util.asSet((Object[])new Class[]{CacheEntryRemoved.class}));
                byte[] ignoredKey = MigrationTask.marshalIgnoredKey(marshaller);
                // Only the first remote store is migrated.
                Iterator storeIterator = stores.iterator();
                if (storeIterator.hasNext()) {
                    RemoteStore store = (RemoteStore)storeIterator.next();
                    RemoteCache<Object, Object> storeCache = store.getRemoteCache();
                    RemoteStoreConfiguration storeConfig = store.getConfiguration();
                    if (!storeConfig.hotRodWrapping()) {
                        throw log.remoteStoreNoHotRodWrapping(cache.getName());
                    }
                    this.migrateEntriesWithMetadata(storeCache, counter, ignoredKey, executorService, (Cache<Object, Object>)cache);
                    // NOTE(review): presumably blocks until the submitted writes complete - confirm in HotRodMigratorHelper.
                    HotRodMigratorHelper.awaitTermination(executorService);
                }
            } finally {
                if (listener != null) {
                    cache.removeListener((Object)listener);
                }
            }
        } finally {
            // Always release the pool, even when the cache lookup or listener removal fails.
            executorService.shutdown();
        }
        return counter.get();
    }

    /**
     * Marshals {@link #IGNORED_KEY} so it can be compared against raw source keys.
     *
     * @throws CacheException wrapping any failure reported by the marshaller
     */
    private static byte[] marshalIgnoredKey(MigrationMarshaller marshaller) {
        try {
            return marshaller.objectToByteBuffer(IGNORED_KEY);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new CacheException((Throwable)e);
        }
    }

    /**
     * Iterates over the source cache and submits one write task per entry to the executor.
     * Iteration stops early if the calling thread is interrupted.
     *
     * @param sourceCache     remote cache to read entries (with metadata) from
     * @param counter         incremented once per processed entry by the worker tasks
     * @param ignoredKey      marshalled key that must not be migrated
     * @param executorService pool that runs the writes
     * @param cache           local cache receiving the entries
     */
    private void migrateEntriesWithMetadata(RemoteCache<Object, Object> sourceCache, AtomicInteger counter, byte[] ignoredKey, ExecutorService executorService, Cache<Object, Object> cache) {
        try (CloseableIterator iterator = sourceCache.retrieveEntriesWithMetadata(this.segments, this.readBatch);){
            while (iterator.hasNext() && !Thread.currentThread().isInterrupted()) {
                Map.Entry entry = (Map.Entry)iterator.next();
                Object key = entry.getKey();
                if (Arrays.equals((byte[])key, ignoredKey)) {
                    continue;
                }
                MetadataValue metadataValue = (MetadataValue)entry.getValue();
                long lifespan = metadataValue.getLifespan();
                long maxIdle = metadataValue.getMaxIdle();
                Metadata metadata = new EmbeddedMetadata.Builder()
                        .version((EntryVersion)new NumericVersion(metadataValue.getVersion()))
                        .lifespan(lifespan, TimeUnit.SECONDS)
                        .maxIdle(maxIdle, TimeUnit.SECONDS)
                        .build();
                executorService.submit(() -> {
                    // Skip keys removed from the target cache since the migration started.
                    if (!this.deletedKeys.contains(new WrappedByteArray((byte[])key))) {
                        InternalMetadataImpl internalMetadata = new InternalMetadataImpl(metadata, metadataValue.getCreated(), metadataValue.getLastUsed());
                        cache.getAdvancedCache()
                                .withFlags(new Flag[]{Flag.SKIP_CACHE_LOAD, Flag.ROLLING_UPGRADE})
                                .putIfAbsent(key, metadataValue.getValue(), (Metadata)internalMetadata);
                    }
                    // The counter tracks processed entries, whether written or skipped.
                    int currentCount = counter.incrementAndGet();
                    if (log.isDebugEnabled() && currentCount % 100 == 0) {
                        log.debugf(">>    Migrated %s entries\n", currentCount);
                    }
                });
            }
        }
    }

    /**
     * Externalizer for {@link MigrationTask}. The wire format is: cache name (object),
     * read batch and thread count (unsigned ints), then the segments as a length-prefixed
     * {@link BitSet} byte array.
     */
    public static class Externalizer
    extends AbstractExternalizer<MigrationTask> {
        @Override
        public Set<Class<? extends MigrationTask>> getTypeClasses() {
            return Collections.singleton(MigrationTask.class);
        }

        @Override
        public void writeObject(ObjectOutput output, MigrationTask task) throws IOException {
            output.writeObject(task.cacheName);
            UnsignedNumeric.writeUnsignedInt((ObjectOutput)output, (int)task.readBatch);
            UnsignedNumeric.writeUnsignedInt((ObjectOutput)output, (int)task.threads);
            BitSet bs = new BitSet();
            for (Integer segment : task.segments) {
                bs.set(segment);
            }
            // Serialize the bit set once and reuse the array for both length and payload.
            byte[] bytes = bs.toByteArray();
            UnsignedNumeric.writeUnsignedInt((ObjectOutput)output, (int)bytes.length);
            output.write(bytes);
        }

        @Override
        public MigrationTask readObject(ObjectInput input) throws IOException, ClassNotFoundException {
            String cacheName = (String)input.readObject();
            int readBatch = UnsignedNumeric.readUnsignedInt((ObjectInput)input);
            int threads = UnsignedNumeric.readUnsignedInt((ObjectInput)input);
            int segmentsSize = UnsignedNumeric.readUnsignedInt((ObjectInput)input);
            byte[] bytes = new byte[segmentsSize];
            // read(byte[]) may return fewer bytes than requested; readFully fills the
            // array or throws EOFException, so the segment bitmap is never truncated.
            input.readFully(bytes);
            BitSet bitSet = BitSet.valueOf(bytes);
            Set<Integer> segments = bitSet.stream().boxed().collect(Collectors.toSet());
            return new MigrationTask(cacheName, segments, readBatch, threads);
        }
    }

    /**
     * Clustered listener that records the key of every removal event it receives so the
     * migration workers can avoid re-inserting that key.
     */
    @Listener(clustered=true)
    private class RemoveListener {
        private RemoveListener() {
        }

        @CacheEntryRemoved
        public void entryRemoved(CacheEntryRemovedEvent event) {
            MigrationTask.this.deletedKeys.add(new WrappedByteArray((byte[])event.getKey()));
        }
    }
}

