/*
 * Decompiled with CFR 0.152.
 */
package com.blacklocus.jres;

import com.blacklocus.jres.Jres;
import com.blacklocus.jres.request.JresBulkable;
import com.blacklocus.jres.request.JresJsonRequest;
import com.blacklocus.jres.request.bulk.JresBulk;
import com.blacklocus.jres.response.bulk.JresBulkItemResult;
import com.blacklocus.jres.response.bulk.JresBulkReply;
import com.blacklocus.jres.util.DaemonThreadFactory;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JresBulkRequestor
implements Runnable,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(JresBulkRequestor.class);
    public static final int POLLING_LENIENCY_SECONDS = 5;
    public static final int CLOSING_LENIENCY_SECONDS = 60;
    private final int batchSize;
    private final int sleepIntervalMs;
    private final String targetIndex;
    private final String targetType;
    private final Jres jres;
    private final Integer numThreads;
    private final ExecutorService executorService;
    private final List<Future<?>> indexerWorkerFutures = new ArrayList();
    private final BlockingQueue<FuturedDocument> q;

    public JresBulkRequestor(int batchSize, int sleepIntervalMs, int numThreads, Jres jres) {
        this(batchSize, sleepIntervalMs, numThreads, null, null, jres);
    }

    public JresBulkRequestor(int batchSize, int sleepIntervalMs, int numThreads, @Nullable String targetIndex, Jres jres) {
        this(batchSize, sleepIntervalMs, numThreads, targetIndex, null, jres);
    }

    public JresBulkRequestor(int batchSize, int sleepIntervalMs, int numThreads, @Nullable String targetIndex, @Nullable String targetType, Jres jres) {
        this.batchSize = batchSize;
        this.sleepIntervalMs = sleepIntervalMs;
        this.numThreads = numThreads;
        this.targetIndex = targetIndex;
        this.targetType = targetType;
        this.jres = jres;
        this.executorService = new ThreadPoolExecutor(2, 2, 1L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new DaemonThreadFactory());
        this.q = new SynchronousQueue<FuturedDocument>(true);
    }

    public JresBulkRequestor start() {
        for (int i = 0; i < this.numThreads; ++i) {
            this.indexerWorkerFutures.add(this.executorService.submit(this));
        }
        return this;
    }

    public Future<?> put(JresBulkable bulkable) {
        FuturedDocument futuredDocument = new FuturedDocument(bulkable);
        try {
            this.q.put(futuredDocument);
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return futuredDocument.future;
    }

    public Future<?> offer(JresBulkable bulkable, long timeout, TimeUnit unit) {
        boolean accepted;
        FuturedDocument futuredDocument = new FuturedDocument(bulkable);
        try {
            accepted = this.q.offer(futuredDocument, timeout, unit);
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return accepted ? futuredDocument.future : null;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                this.indexNextBatch();
            }
            catch (InterruptedException e) {
                LOG.error("Worker interrupted ungracefully. Shutting worker down.", (Object)3);
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                String msg = "error in indexer worker, last batch may have been lost";
                LOG.error(msg, (Throwable)e);
            }
        }
        LOG.debug("Worker interrupted. Shutting worker down.");
    }

    private void indexNextBatch() throws InterruptedException {
        FuturedDocument futuredDocument;
        int numDocs;
        ArrayList<JresBulkable> bulk = new ArrayList<JresBulkable>(this.batchSize);
        ArrayList<SettableFuture<JresBulkItemResult>> futures = new ArrayList<SettableFuture<JresBulkItemResult>>(this.batchSize);
        for (numDocs = 0; numDocs < this.batchSize && null != (futuredDocument = this.poll()); ++numDocs) {
            futures.add(futuredDocument.future);
            bulk.add(futuredDocument.bulkable);
        }
        if (bulk.size() > 0) {
            assert (numDocs > 0);
            LOG.info("Submitting bulk index of " + numDocs + " products.");
            if (LOG.isDebugEnabled()) {
                LOG.debug(((Object)bulk).toString());
            }
            JresBulkReply bulkReply = (JresBulkReply)this.jres.quest((JresJsonRequest)new JresBulk(this.targetIndex, this.targetType, bulk));
            ArrayList results = Lists.newArrayList((Iterable)bulkReply.getResults());
            assert (futures.size() == results.size());
            for (int i = 0; i < results.size(); ++i) {
                SettableFuture future = (SettableFuture)futures.get(i);
                JresBulkItemResult result = (JresBulkItemResult)results.get(i);
                if (result.getResult().hasError()) {
                    future.setException((Throwable)new RuntimeException(result.getResult().getError()));
                    continue;
                }
                future.set((Object)result);
            }
        } else {
            Thread.sleep(this.sleepIntervalMs);
        }
    }

    private FuturedDocument poll() {
        try {
            return this.q.poll(5L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            LOG.debug("Poll interrupted. Absorbing this exception. Main loop in this thread should be checking interrupted status and stop the worker in a moment.", (Throwable)e);
            Thread.currentThread().interrupt();
            return null;
        }
    }

    @Override
    public void close() throws IOException {
        LOG.info("Shutting down BulkIndexers");
        while (!this.q.isEmpty()) {
            LOG.info("Waiting for work q to empty.");
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                LOG.warn("Interrupted while shutting down. You interrupted the interrupter! Ignoring.", (Throwable)e);
            }
        }
        try {
            for (Future<?> indexerWorkerFuture : this.indexerWorkerFutures) {
                indexerWorkerFuture.cancel(true);
            }
            this.executorService.shutdown();
            this.executorService.awaitTermination(60L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            LOG.warn("Failed to stop worker threads. Pool may fail to shutdown.", (Throwable)e);
        }
        LOG.info("IndexerWorkers shut down.");
    }

    private static class FuturedDocument {
        final SettableFuture<JresBulkItemResult> future = SettableFuture.create();
        final JresBulkable bulkable;

        FuturedDocument(JresBulkable bulkable) {
            this.bulkable = bulkable;
        }
    }
}

