/*
 * Decompiled with CFR 0.152.
 */
package com.hortonworks.registries.cache.view.io.loader;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.hortonworks.registries.cache.Cache;
import com.hortonworks.registries.cache.view.datastore.DataStoreReader;
import com.hortonworks.registries.cache.view.io.loader.CacheLoader;
import com.hortonworks.registries.cache.view.io.loader.CacheLoaderCallback;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CacheLoader} that reads entries from the data store on an executor thread and reports
 * the outcome through a {@link CacheLoaderCallback} instead of blocking the caller.
 *
 * <p>Each {@link #loadAll(Collection, CacheLoaderCallback)} call submits one read task to the
 * executor. When that task completes, a completion callback (scheduled on the same executor)
 * copies the entries read into the cache and then notifies the caller's callback.
 *
 * @param <K> type of the cache keys
 * @param <V> type of the cache values
 */
public class CacheLoaderAsync<K, V> extends CacheLoader<K, V> {
    /** Size of the fixed thread pool used when the caller does not supply an executor. */
    private static final int DEFAULT_NUM_THREADS = 5;
    private static final Logger LOG = LoggerFactory.getLogger(CacheLoaderAsync.class);

    /** Runs both the data store reads and the completion callbacks. */
    private final ListeningExecutorService executorService;

    /**
     * Creates a loader backed by a new fixed thread pool of {@value #DEFAULT_NUM_THREADS} threads.
     *
     * <p>NOTE: this class never shuts that pool down. Use
     * {@link #CacheLoaderAsync(Cache, DataStoreReader, ExecutorService)} with an executor you
     * manage yourself if its lifecycle must be controlled.
     *
     * @param cache cache that loaded entries are put into
     * @param dataStoreReader reader used to fetch entries from the data store
     */
    public CacheLoaderAsync(Cache<K, V> cache, DataStoreReader<K, V> dataStoreReader) {
        this(cache, dataStoreReader, Executors.newFixedThreadPool(DEFAULT_NUM_THREADS));
    }

    /**
     * Creates a loader that runs its work on the given executor.
     *
     * @param cache cache that loaded entries are put into
     * @param dataStoreReader reader used to fetch entries from the data store
     * @param executorService executor that runs the reads and the completion callbacks; it is
     *     wrapped with {@link MoreExecutors#listeningDecorator(ExecutorService)}
     */
    public CacheLoaderAsync(Cache<K, V> cache, DataStoreReader<K, V> dataStoreReader, ExecutorService executorService) {
        super(cache, dataStoreReader);
        this.executorService = MoreExecutors.listeningDecorator(executorService);
    }

    /**
     * Submits a data store read for {@code keys} and returns without waiting for it.
     *
     * <p>Any exception thrown while submitting the task or registering the completion callback
     * is passed to {@code handleException} rather than propagated to the caller.
     *
     * @param keys keys to read from the data store
     * @param callback notified once the entries were loaded into the cache, or on failure
     */
    @Override
    public void loadAll(Collection<? extends K> keys, CacheLoaderCallback<K, V> callback) {
        try {
            ListenableFuture<Map<K, V>> pendingRead = this.executorService.submit(new DataStoreCallable(keys));
            Futures.addCallback(pendingRead, new CacheLoaderAsyncFutureCallback(keys, callback), this.executorService);
        } catch (Exception e) {
            this.handleException(keys, callback, e, LOG);
        }
    }

    /**
     * Completion handler for a data store read: puts the entries read into the cache and
     * forwards the outcome to the caller's {@link CacheLoaderCallback}.
     */
    public class CacheLoaderAsyncFutureCallback implements FutureCallback<Map<K, V>> {
        private final Collection<? extends K> keys;
        private final CacheLoaderCallback<K, V> callback;

        /**
         * @param keys keys that were requested; used for logging only
         * @param callback caller's callback to notify of the outcome
         */
        public CacheLoaderAsyncFutureCallback(Collection<? extends K> keys, CacheLoaderCallback<K, V> callback) {
            this.keys = keys;
            this.callback = callback;
        }

        /**
         * Copies every entry with a non-null key into the cache and reports the loaded entries.
         *
         * <p>A {@code null} result is treated as an empty one. If the cache rejects the entries,
         * the caller's callback receives the failure instead of being left without any
         * notification.
         *
         * @param read entries returned by the data store, possibly {@code null}
         */
        @Override
        public void onSuccess(Map<K, V> read) {
            LOG.debug("Raw result of call to data store for keys [{}] returned [{}]", this.keys, read);
            final Map<K, V> loaded = new HashMap<K, V>();
            if (read != null) {
                for (Map.Entry<K, V> entry : read.entrySet()) {
                    if (entry.getKey() == null) {
                        // The guard is on the key, so the message names the key (it used to say "value").
                        LOG.trace("Not loading into cache entry with null key [{}]", entry);
                        continue;
                    }
                    loaded.put(entry.getKey(), entry.getValue());
                }
            }
            try {
                CacheLoaderAsync.this.cache.putAll(loaded);
            } catch (RuntimeException e) {
                // Without this the exception would escape on an executor thread and the
                // caller's callback would never be invoked at all.
                LOG.error("Failed to put entries loaded for keys [{}] into cache", this.keys, e);
                this.callback.onCacheLoadingFailure(e);
                return;
            }
            LOG.debug("Loaded cache [{}]", loaded);
            this.callback.onCacheLoaded(loaded);
        }

        /**
         * Forwards a failed data store read to the caller's callback.
         *
         * @param t cause of the failure
         */
        @Override
        public void onFailure(Throwable t) {
            this.callback.onCacheLoadingFailure(t);
        }
    }

    /** Task that reads the requested keys from the data store. */
    private class DataStoreCallable implements Callable<Map<K, V>> {
        private final Collection<? extends K> keys;

        /**
         * @param keys keys to read from the data store
         */
        public DataStoreCallable(Collection<? extends K> keys) {
            this.keys = keys;
        }

        /**
         * Reads all keys through the enclosing loader's data store reader.
         *
         * @return whatever the reader returned for the keys
         * @throws Exception declared by {@link Callable#call()}
         */
        @Override
        public Map<K, V> call() throws Exception {
            final Map<K, V> result = CacheLoaderAsync.this.dataStoreReader.readAll(this.keys);
            LOG.debug("Call to data store for keys [{}] returned [{}]", this.keys, result);
            return result;
        }
    }
}

