/*
 * Decompiled with CFR 0.152.
 */
package at.molindo.esi4j.chain.impl;

import at.molindo.esi4j.action.BulkResponseWrapper;
import at.molindo.esi4j.chain.Esi4JEntityResolver;
import at.molindo.esi4j.chain.Esi4JEntityTask;
import at.molindo.esi4j.chain.impl.QueuedTaskProcessor;
import at.molindo.esi4j.chain.impl.SerializableEsi4JOperation;
import at.molindo.esi4j.core.Esi4JOperation;
import at.molindo.utils.collections.ArrayUtils;
import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueuedTaskExecutor {
    private static final Logger log = LoggerFactory.getLogger(QueuedTaskExecutor.class);
    private static final AtomicInteger EXECUTOR_NUMBER = new AtomicInteger(1);
    private final int _executorNumber = EXECUTOR_NUMBER.getAndIncrement();
    private final AtomicInteger _threadNumber = new AtomicInteger(1);
    private final QueuedTaskProcessor _queuedTaskProcessor;
    private final Esi4JEntityResolver _entityResolver;
    private final ThreadPoolExecutor _executorService;
    private final ReentrantReadWriteLock _executionOrderLock = new ReentrantReadWriteLock(true);
    private final int _poolSize;

    public QueuedTaskExecutor(QueuedTaskProcessor queuedTaskProcessor, Esi4JEntityResolver entityResolver) {
        if (queuedTaskProcessor == null) {
            throw new NullPointerException("queuedTaskProcessor");
        }
        this._queuedTaskProcessor = queuedTaskProcessor;
        this._entityResolver = entityResolver;
        this._poolSize = (Runtime.getRuntime().availableProcessors() + 1) / 2;
        this._executorService = this.newExecutorService();
    }

    private ThreadPoolExecutor newExecutorService() {
        log.info("creating new QueuedTaskExecutor with " + this._poolSize + " threads");
        ThreadFactory factory = new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                return new ExecutorThread(r);
            }
        };
        RejectedExecutionHandler handler = new RejectedExecutionHandler(){

            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                log.warn("executor rejected execution of bulk index task");
            }
        };
        return new ThreadPoolExecutor(this._poolSize, this._poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), factory, handler);
    }

    public void execute(Esi4JEntityTask[] tasks) {
        if (!ArrayUtils.empty((Object[])tasks)) {
            if (this._entityResolver != null) {
                for (int i = 0; i < tasks.length; ++i) {
                    tasks[i].replaceEntity(this._entityResolver);
                }
            }
            this._executorService.execute(new BulkIndexRunnable(tasks));
        }
    }

    public <T> T submit(SerializableEsi4JOperation<T> operation) {
        try {
            Object value = this._executorService.submit(new OperationCallable(operation)).get();
            if (log.isDebugEnabled()) {
                log.debug("finished submitted operation");
            }
            return value;
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public void close() {
        this._executorService.shutdown();
        try {
            this._executorService.awaitTermination(60L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            log.warn("waiting for termination of executor service interrupted", (Throwable)e);
        }
    }

    public QueuedTaskProcessor getTaskProcessor() {
        return this._queuedTaskProcessor;
    }

    public Esi4JEntityResolver getEntityResolver() {
        return this._entityResolver;
    }

    private static final class BulkIndexRunnable
    implements Runnable,
    Serializable {
        private static final long serialVersionUID = 1L;
        private final Esi4JEntityTask[] _tasks;

        public BulkIndexRunnable(Esi4JEntityTask[] tasks) {
            this._tasks = tasks;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            QueuedTaskExecutor executor = ((ExecutorThread)Thread.currentThread()).getQueuedTaskExecutor();
            executor._executionOrderLock.readLock().lock();
            try {
                executor.getTaskProcessor().onBeforeBulkIndex();
                try {
                    this.index(executor);
                }
                finally {
                    executor.getTaskProcessor().onAfterBulkIndex();
                }
            }
            finally {
                executor._executionOrderLock.readLock().unlock();
            }
        }

        private void index(QueuedTaskExecutor executor) {
            Esi4JEntityResolver entityResolver = executor.getEntityResolver();
            if (entityResolver != null) {
                for (int i = 0; i < this._tasks.length; ++i) {
                    this._tasks[i].resolveEntity(entityResolver);
                }
            }
            BulkResponseWrapper response = (BulkResponseWrapper)executor.getTaskProcessor().getIndex().executeBulk(new Esi4JOperation<ListenableActionFuture<BulkResponse>>(){

                @Override
                public ListenableActionFuture<BulkResponse> execute(Client client, String indexName, Esi4JOperation.OperationContext helper) {
                    BulkRequestBuilder bulk = client.prepareBulk();
                    for (int i = 0; i < BulkIndexRunnable.this._tasks.length; ++i) {
                        BulkIndexRunnable.this._tasks[i].addToBulk(bulk, indexName, helper);
                    }
                    ListenableActionFuture response = bulk.execute();
                    return response;
                }
            }).actionGet();
            int failed = 0;
            for (BulkItemResponse item : response.getBulkResponse()) {
                if (!item.isFailed()) continue;
                ++failed;
            }
            if (failed > 0) {
                log.warn("failed to index " + failed + " items. index might be out of sync");
            }
            if (log.isDebugEnabled()) {
                int indexed = response.getBulkResponse().getItems().length - failed;
                log.debug("finished bulk indexing " + indexed + " items");
            }
        }
    }

    private static final class OperationCallable<T>
    implements Callable<T>,
    Serializable {
        private static final long serialVersionUID = 1L;
        private final SerializableEsi4JOperation<T> _operation;

        private OperationCallable(SerializableEsi4JOperation<T> operation) {
            if (operation == null) {
                throw new NullPointerException("operation");
            }
            this._operation = operation;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public T call() throws Exception {
            if (this._operation == null) {
                return null;
            }
            QueuedTaskExecutor executor = ((ExecutorThread)Thread.currentThread()).getQueuedTaskExecutor();
            executor._executionOrderLock.writeLock().lock();
            try {
                T t = executor.getTaskProcessor().getIndex().execute(this._operation);
                return t;
            }
            finally {
                executor._executionOrderLock.writeLock().unlock();
            }
        }
    }

    private final class ExecutorThread
    extends Thread {
        public ExecutorThread(Runnable r) {
            super(r, QueuedTaskProcessor.class.getSimpleName() + "-" + QueuedTaskExecutor.this._executorNumber + "-" + QueuedTaskExecutor.this._threadNumber.getAndIncrement());
            this.setDaemon(true);
        }

        public QueuedTaskExecutor getQueuedTaskExecutor() {
            return QueuedTaskExecutor.this;
        }
    }
}

