/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence.remote.upgrade;

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.CacheTopologyInfo;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.configuration.ClassAllowList;
import org.infinispan.commons.util.ProcessorInfo;
import org.infinispan.commons.util.Util;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.manager.ClusterExecutor;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.remote.RemoteStore;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfiguration;
import org.infinispan.persistence.remote.logging.Log;
import org.infinispan.persistence.remote.upgrade.HotRodMigratorHelper;
import org.infinispan.persistence.remote.upgrade.MigrationMarshaller;
import org.infinispan.persistence.remote.upgrade.MigrationTask;
import org.infinispan.remoting.transport.Address;
import org.infinispan.upgrade.TargetMigrator;
import org.infinispan.util.function.TriConsumer;
import org.infinispan.util.logging.LogFactory;

/**
 * {@link TargetMigrator} registered under the name {@code "hotrod"}.
 *
 * <p>Pulls the content of a source cluster, reached through the single {@link RemoteStore}
 * configured on the target cache, into that target cache. Two strategies are used:
 * <ul>
 *   <li>when the store's protocol version does not support iteration, the set of keys is read
 *       from a well-known marker entry in the source cache and every key is loaded through the
 *       target cache by a local thread pool;</li>
 *   <li>otherwise {@link MigrationTask}s are run, either directly on this node (source topology
 *       with a single server) or on the members of the target cluster, one segment partition
 *       per member.</li>
 * </ul>
 */
public class HotRodTargetMigrator
implements TargetMigrator {
    private static final Log log = (Log)LogFactory.getLog(HotRodTargetMigrator.class, Log.class);

    /** Key of the source-cache entry that holds the marshalled set of known keys. */
    private static final String KNOWN_KEYS_MARKER = "___MigrationManager_HotRod_KnownKeys___";

    /** Read batch size used by {@link #synchronizeData(Cache)}. */
    private static final int DEFAULT_READ_BATCH = 10000;

    /** A debug progress line is emitted every this many keys in the known-keys path. */
    private static final int PROGRESS_LOG_INTERVAL = 100;

    /**
     * @return the fixed migrator name, {@code "hotrod"}
     */
    public String getName() {
        return "hotrod";
    }

    /**
     * Synchronizes data using a read batch of 10000 and as many threads as
     * {@link ProcessorInfo#availableProcessors()} reports.
     *
     * @param cache the target cache to populate
     * @return the number of entries reported as migrated
     * @throws CacheException if the migration cannot be performed
     */
    public long synchronizeData(Cache<Object, Object> cache) throws CacheException {
        return this.synchronizeData(cache, DEFAULT_READ_BATCH, ProcessorInfo.availableProcessors());
    }

    /**
     * Synchronizes data from the source cluster into {@code cache}.
     *
     * @param cache     the target cache; it must have exactly one {@link RemoteStore}
     * @param readBatch batch size handed to each {@link MigrationTask}
     * @param threads   size of the local thread pool (known-keys path) and thread count handed
     *                  to each {@link MigrationTask}
     * @return the number of entries reported as migrated
     * @throws CacheException if the cache does not have exactly one remote store, if the
     *                        known-keys data is missing or unreadable, or if a migration task fails
     */
    public long synchronizeData(Cache<Object, Object> cache, int readBatch, int threads) throws CacheException {
        ComponentRegistry cr = cache.getAdvancedCache().getComponentRegistry();
        ClassAllowList allowList = cache.getCacheManager().getClassAllowList();
        PersistenceManager loaderManager = (PersistenceManager)cr.getComponent(PersistenceManager.class);
        Set<?> stores = loaderManager.getStores(RemoteStore.class);
        String cacheName = cache.getName();
        if (stores.size() != 1) {
            throw log.couldNotMigrateData(cacheName);
        }
        MigrationMarshaller marshaller = new MigrationMarshaller(allowList);
        RemoteStore store = (RemoteStore)stores.iterator().next();
        RemoteCache<Object, Object> remoteSourceCache = store.getRemoteCache();

        if (!HotRodMigratorHelper.supportsIteration(store.getConfiguration().protocol())) {
            return this.migrateKnownKeys(cache, store, remoteSourceCache, marshaller, threads);
        }

        // The cluster executor is obtained before the topology lookup, as in the original flow,
        // so that any failure from either call surfaces in the same order as before.
        ClusterExecutor clusterExecutor = cache.getCacheManager().executor().timeout(Long.MAX_VALUE, TimeUnit.NANOSECONDS).singleNodeSubmission();
        CacheTopologyInfo sourceCacheTopologyInfo = remoteSourceCache.getCacheTopologyInfo();
        if (sourceCacheTopologyInfo.getSegmentsPerServer().size() == 1) {
            return this.migrateFromSingleServer(cache.getCacheManager(), cacheName, readBatch, threads);
        }
        return this.migrateBySegments(cache, clusterExecutor, sourceCacheTopologyInfo.getNumSegments(), readBatch, threads);
    }

    /**
     * Migrates by reading the marshalled key set stored under the known-keys marker in the
     * source cache and calling {@code cache.get(key)} for every key on a fixed thread pool.
     * A failure on an individual key is logged and does not abort the migration.
     *
     * @return the number of keys whose {@code get} completed without an exception
     * @throws CacheException if the marker entry is absent, the store is not configured with
     *                        Hot Rod wrapping, or the marker/key set cannot be (un)marshalled
     */
    private long migrateKnownKeys(Cache<Object, Object> cache, RemoteStore store, RemoteCache<Object, Object> remoteSourceCache, MigrationMarshaller marshaller, int threads) {
        String cacheName = cache.getName();
        byte[] knownKeys;
        try {
            knownKeys = marshaller.objectToByteBuffer(KNOWN_KEYS_MARKER);
        }
        catch (Exception e) {
            throw new CacheException((Throwable)e);
        }
        if (!remoteSourceCache.containsKey((Object)knownKeys)) {
            throw log.missingMigrationData(cacheName);
        }
        RemoteStoreConfiguration storeConfig = store.getConfiguration();
        if (!storeConfig.hotRodWrapping()) {
            throw log.remoteStoreNoHotRodWrapping(cacheName);
        }
        Set<?> keys;
        try {
            keys = (Set<?>)marshaller.objectFromByteBuffer((byte[])remoteSourceCache.get((Object)knownKeys));
        }
        catch (Exception e) {
            throw new CacheException((Throwable)e);
        }

        ExecutorService es = Executors.newFixedThreadPool(threads);
        AtomicInteger count = new AtomicInteger(0);
        boolean allSubmitted = false;
        try {
            for (Object okey : keys) {
                byte[] key = (byte[])okey;
                es.submit(() -> {
                    try {
                        cache.get((Object)key);
                        int i = count.getAndIncrement();
                        if (log.isDebugEnabled() && i % PROGRESS_LOG_INTERVAL == 0) {
                            log.debugf(">>    Moved %s keys\n", i);
                        }
                    }
                    catch (Exception e) {
                        log.keyMigrationFailed(Util.toStr((Object)key), e);
                    }
                });
            }
            allSubmitted = true;
        }
        finally {
            // If submission stopped part-way (e.g. a null key set or an element that is not a
            // byte[]), stop the pool here; otherwise its worker threads would never be released.
            if (!allSubmitted) {
                es.shutdownNow();
            }
        }
        // NOTE(review): the helper is presumed to shut the pool down and wait for queued work;
        // confirm against HotRodMigratorHelper.
        HotRodMigratorHelper.awaitTermination(es);
        return count.longValue();
    }

    /**
     * Splits the source segments into one partition per member of the target cache's write
     * consistent hash and submits a {@link MigrationTask} for each partition to a distinct
     * member, then waits for all of them.
     *
     * @param sourceSegments number of segments reported by the source cache topology
     * @return the sum of the values reported by the tasks
     */
    private long migrateBySegments(Cache<Object, Object> cache, ClusterExecutor clusterExecutor, int sourceSegments, int readBatch, int threads) {
        String cacheName = cache.getName();
        List<Address> targetServers = cache.getAdvancedCache().getDistributionManager().getWriteConsistentHash().getMembers();
        List<List<Integer>> partitions = HotRodMigratorHelper.split(HotRodMigratorHelper.range(sourceSegments), targetServers.size());
        Iterator<Address> iterator = targetServers.iterator();
        AtomicInteger count = new AtomicInteger();
        TriConsumer<Address, Integer, Throwable> consumer = (a, value, t) -> {
            if (t != null) {
                throw new CacheException(t);
            }
            count.addAndGet(value);
        };
        CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions.size()];
        int offset = 0;
        for (List<Integer> partition : partitions) {
            Set<Integer> segmentSet = new HashSet<>(partition);
            // Typed as Function so the plain-Function overload of submitConsumer is selected.
            Function<EmbeddedCacheManager, Integer> task = new MigrationTask(cacheName, segmentSet, readBatch, threads);
            futures[offset++] = clusterExecutor.filterTargets(Collections.singleton(iterator.next())).submitConsumer(task, consumer);
        }
        CompletableFuture.allOf(futures).join();
        return count.get();
    }

    /**
     * Runs a single {@link MigrationTask} with no segment filter directly against the given
     * cache manager.
     *
     * @return the value returned by the task
     * @throws CacheException wrapping any exception thrown by the task
     */
    private long migrateFromSingleServer(EmbeddedCacheManager embeddedCacheManager, String cacheName, int readBatch, int threads) {
        MigrationTask migrationTask = new MigrationTask(cacheName, null, readBatch, threads);
        try {
            return migrationTask.apply(embeddedCacheManager).intValue();
        }
        catch (Exception e) {
            throw new CacheException((Throwable)e);
        }
    }

    /**
     * Disables the {@link RemoteStore} on the given cache through its {@link PersistenceManager}.
     *
     * @param cache the target cache whose remote store is to be disabled
     * @throws CacheException declared by the interface contract
     */
    public void disconnectSource(Cache<Object, Object> cache) throws CacheException {
        ComponentRegistry cr = cache.getAdvancedCache().getComponentRegistry();
        PersistenceManager loaderManager = (PersistenceManager)cr.getComponent(PersistenceManager.class);
        loaderManager.disableStore(RemoteStore.class.getName());
    }
}

