/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence.remote.upgrade;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.CacheTopologyInfo;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.Util;
import org.infinispan.distexec.DefaultExecutorService;
import org.infinispan.distexec.DistributedTask;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.remote.RemoteStore;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfiguration;
import org.infinispan.persistence.remote.logging.Log;
import org.infinispan.persistence.remote.upgrade.HotRodMigratorHelper;
import org.infinispan.persistence.remote.upgrade.MigrationMarshaller;
import org.infinispan.persistence.remote.upgrade.MigrationTask;
import org.infinispan.remoting.transport.Address;
import org.infinispan.upgrade.TargetMigrator;
import org.infinispan.util.logging.LogFactory;

/**
 * {@link TargetMigrator} registered under the name {@code "hotrod"}.
 *
 * <p>Pulls data into the target cache from the single {@link RemoteStore} configured on it. Two
 * strategies are used, selected by {@link HotRodMigratorHelper#supportsIteration} on the store's
 * configured protocol version:
 * <ul>
 *   <li>iteration supported: the work is delegated to {@link MigrationTask}, run either locally
 *       (source reports a single server) or distributed over the target cluster members;</li>
 *   <li>iteration not supported: the set of keys is read from a well-known entry in the source
 *       cache and every key is read through the target cache on a local thread pool.</li>
 * </ul>
 */
public class HotRodTargetMigrator
implements TargetMigrator {
    private static final Log log = LogFactory.getLog(HotRodTargetMigrator.class, Log.class);

    /** Key (before marshalling) of the source-cache entry that holds the set of known keys. */
    private static final String KNOWN_KEYS_MARKER = "___MigrationManager_HotRod_KnownKeys___";

    /** Read batch size used by {@link #synchronizeData(Cache)}. */
    private static final int DEFAULT_READ_BATCH = 10000;

    @Override
    public String getName() {
        return "hotrod";
    }

    /**
     * Synchronizes using a read batch of {@value #DEFAULT_READ_BATCH} and one thread per
     * available processor.
     *
     * @param cache the target cache
     * @return the number of entries reported as migrated
     * @throws CacheException if the migration fails
     */
    @Override
    public long synchronizeData(Cache<Object, Object> cache) throws CacheException {
        return this.synchronizeData(cache, DEFAULT_READ_BATCH, Runtime.getRuntime().availableProcessors());
    }

    /**
     * Migrates data from the remote source into {@code cache}.
     *
     * @param cache     the target cache; it must have exactly one {@link RemoteStore}
     * @param readBatch batch size handed to {@link MigrationTask} (unused on the known-keys path)
     * @param threads   number of worker threads (local pool size on the known-keys path, otherwise
     *                  handed to {@link MigrationTask})
     * @return the number of entries reported as migrated
     * @throws CacheException if the cache does not have exactly one remote store, if the source
     *                        lacks the required migration data, or if the migration itself fails
     */
    @Override
    public long synchronizeData(Cache<Object, Object> cache, int readBatch, int threads) throws CacheException {
        ComponentRegistry cr = cache.getAdvancedCache().getComponentRegistry();
        PersistenceManager loaderManager = cr.getComponent(PersistenceManager.class);
        Set<RemoteStore> stores = loaderManager.getStores(RemoteStore.class);
        if (stores.size() != 1) {
            throw log.couldNotMigrateData(cache.getName());
        }
        RemoteStore store = stores.iterator().next();
        RemoteCache<Object, Object> remoteSourceCache = store.getRemoteCache();
        if (HotRodMigratorHelper.supportsIteration(store.getConfiguration().protocolVersion())) {
            return this.migrateBySegments(cache, remoteSourceCache, readBatch, threads);
        }
        return this.migrateKnownKeys(cache, store, remoteSourceCache, threads);
    }

    /**
     * Known-keys strategy: reads the key set stored under the marshalled
     * {@link #KNOWN_KEYS_MARKER} in the source cache and calls {@code cache.get(key)} for each
     * key on a fixed-size thread pool.
     *
     * @return the number of keys whose {@code get} completed without throwing
     */
    private long migrateKnownKeys(Cache<Object, Object> cache, RemoteStore store,
            RemoteCache<Object, Object> remoteSourceCache, int threads) {
        MigrationMarshaller marshaller = new MigrationMarshaller();
        byte[] knownKeys;
        try {
            knownKeys = marshaller.objectToByteBuffer(KNOWN_KEYS_MARKER);
        }
        catch (Exception e) {
            throw new CacheException(e);
        }
        if (!remoteSourceCache.containsKey(knownKeys)) {
            throw log.missingMigrationData(cache.getName());
        }
        RemoteStoreConfiguration storeConfig = store.getConfiguration();
        if (!storeConfig.hotRodWrapping()) {
            throw log.remoteStoreNoHotRodWrapping(cache.getName());
        }
        Set<?> keys;
        try {
            keys = (Set<?>)marshaller.objectFromByteBuffer((byte[])remoteSourceCache.get(knownKeys));
        }
        catch (Exception e) {
            throw new CacheException(e);
        }
        ExecutorService es = Executors.newFixedThreadPool(threads);
        AtomicInteger count = new AtomicInteger(0);
        boolean drained = false;
        try {
            for (Object okey : keys) {
                byte[] key = (byte[])okey;
                es.submit(() -> {
                    try {
                        cache.get(key);
                        // i is the number of keys completed before this one.
                        int i = count.getAndIncrement();
                        if (log.isDebugEnabled() && i % 100 == 0) {
                            log.debugf(">>    Moved %s keys\n", i);
                        }
                    }
                    catch (Exception e) {
                        // Best effort: a failing key is logged and not counted; others continue.
                        log.keyMigrationFailed(Util.toStr(key), e);
                    }
                });
            }
            // NOTE(review): the helper is expected to shut the pool down and wait for the
            // submitted tasks - confirm against HotRodMigratorHelper.
            HotRodMigratorHelper.awaitTermination(es);
            drained = true;
            return count.longValue();
        }
        finally {
            // Only reached with drained == false when submission or waiting threw; without this
            // the pool's worker threads would never be stopped.
            if (!drained) {
                es.shutdownNow();
            }
        }
    }

    /**
     * Iteration strategy: runs {@link MigrationTask}. When the source topology reports a single
     * server the task runs locally; otherwise the source segments are split into one partition
     * per target member and one distributed task is submitted per partition.
     *
     * @return the sum of the counts returned by the task(s)
     */
    private long migrateBySegments(Cache<Object, Object> cache, RemoteCache<Object, Object> remoteSourceCache,
            int readBatch, int threads) {
        DefaultExecutorService executor = new DefaultExecutorService(cache);
        try {
            CacheTopologyInfo sourceCacheTopologyInfo = remoteSourceCache.getCacheTopologyInfo();
            if (sourceCacheTopologyInfo.getSegmentsPerServer().size() == 1) {
                return this.migrateFromSingleServer(cache, readBatch, threads);
            }
            int sourceSegments = sourceCacheTopologyInfo.getNumSegments();
            List<Address> targetServers = cache.getAdvancedCache().getDistributionManager().getWriteConsistentHash().getMembers();
            List<List<Integer>> partitions = HotRodMigratorHelper.split(HotRodMigratorHelper.range(sourceSegments), targetServers.size());
            Iterator<Address> iterator = targetServers.iterator();
            List<CompletableFuture<Integer>> futures = new ArrayList<>(targetServers.size());
            for (List<Integer> partition : partitions) {
                Set<Integer> segmentSet = new HashSet<>(partition);
                DistributedTask<Integer> task = executor.createDistributedTaskBuilder(new MigrationTask(segmentSet, readBatch, threads)).timeout(Long.MAX_VALUE, TimeUnit.NANOSECONDS).build();
                futures.add(executor.submit(iterator.next(), task));
            }
            // Accumulate in a long: each task reports an int, but the total across all nodes
            // can exceed Integer.MAX_VALUE.
            long total = 0L;
            for (CompletableFuture<Integer> future : futures) {
                total += this.awaitMigratedCount(future, cache);
            }
            return total;
        }
        finally {
            executor.shutdownNow();
        }
    }

    /**
     * Blocks until {@code future} completes and returns its value.
     *
     * @throws CacheException if the wait is interrupted (the interrupt flag is restored first) or
     *                        if the task completed exceptionally
     */
    private int awaitMigratedCount(CompletableFuture<Integer> future, Cache<Object, Object> cache) {
        try {
            return future.get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw log.couldNotMigrateData(cache.getName());
        }
        catch (ExecutionException e) {
            throw new CacheException(e);
        }
    }

    /**
     * Runs a single {@link MigrationTask} in the calling thread, with no segment restriction
     * ({@code null} segment set).
     *
     * @return the count returned by the task
     * @throws CacheException wrapping any exception thrown by the task
     */
    private long migrateFromSingleServer(Cache<Object, Object> cache, int readBatch, int threads) {
        MigrationTask migrationTask = new MigrationTask(null, readBatch, threads);
        migrationTask.setEnvironment(cache, (Set<Object>)null);
        try {
            return migrationTask.call().intValue();
        }
        catch (Exception e) {
            throw new CacheException(e);
        }
    }

    /**
     * Disables the {@link RemoteStore} on the given cache through its {@link PersistenceManager}.
     *
     * @param cache the target cache
     * @throws CacheException declared by the interface
     */
    @Override
    public void disconnectSource(Cache<Object, Object> cache) throws CacheException {
        ComponentRegistry cr = cache.getAdvancedCache().getComponentRegistry();
        PersistenceManager loaderManager = cr.getComponent(PersistenceManager.class);
        loaderManager.disableStore(RemoteStore.class.getName());
    }
}

