/*
 * Decompiled with CFR 0.152.
 */
package nablarch.fw.handler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import nablarch.core.ThreadContext;
import nablarch.core.log.Logger;
import nablarch.core.log.LoggerManager;
import nablarch.core.log.app.CommitLogger;
import nablarch.core.log.app.FailureLogUtil;
import nablarch.fw.ExecutionContext;
import nablarch.fw.Result;
import nablarch.fw.handler.ExecutionHandler;
import nablarch.fw.handler.ExecutionHandlerCallback;
import nablarch.fw.results.InternalError;

/**
 * Execution handler that runs the remaining handler queue on {@code concurrentNumber}
 * worker threads and gathers each worker's {@link Result} into one
 * {@link Result.MultiStatus}.
 *
 * <p>The thread pool, completion service and future list live in instance fields and are
 * (re)initialised at the start of every {@link #handle(Object, ExecutionContext)} call.
 * Nothing in this class synchronises access to those fields.
 */
public class MultiThreadExecutionHandler
implements ExecutionHandler<Object, Result.MultiStatus, MultiThreadExecutionHandler> {
    /** Number of worker tasks submitted per {@code handle} call; also the pool size. */
    private int concurrentNumber = 1;
    /** Maximum time, in seconds, to wait for the pool to terminate after {@code shutdownNow()}. */
    private long terminationTimeout = 600L;
    /** Optional commit logger; initialised before and terminated after the workers run. */
    private CommitLogger commitLogger;
    /** Pool executing the worker tasks; recreated once the previous pool has terminated. */
    private ThreadPoolExecutor taskExecutor = null;
    /** Completion service used to receive worker results in completion order. */
    private CompletionService<Result> taskStatus = null;
    /** Futures of every submitted worker, used to report per-thread status on termination. */
    private List<Future<Result>> taskTracker = null;
    /** Helper that drives listener callbacks and data-reader preparation. */
    private final ExecutionHandler.Support<Object, Result.MultiStatus> support = new ExecutionHandler.Support<Object, Result.MultiStatus>();
    private static final Logger LOGGER = LoggerManager.get(MultiThreadExecutionHandler.class);

    /**
     * Runs the subsequent handlers on {@code concurrentNumber} threads and waits for all of them.
     *
     * <p>Sequence: publish the concurrency to {@link ThreadContext}, invoke the pre-execution
     * callbacks, prepare the data reader, initialise the commit logger (if set), submit the
     * workers, then collect one result per worker. On both the success and the failure path
     * the pool is terminated via {@link #terminate(ExecutionContext)}; on failure the
     * error-in-execution callbacks are invoked before the error is rethrown. The commit logger
     * is terminated and the post-execution callbacks are invoked in all cases.
     *
     * @param data    input object handed on to every worker
     * @param context execution context; each worker receives its own copy
     * @return the collected results of all workers
     * @throws RuntimeException if a worker failed with a runtime or checked exception, if the
     *                          wait was interrupted, or if this handler itself failed
     * @throws Error            if a worker (or this handler) failed with an {@link Error}
     */
    public Result.MultiStatus handle(Object data, ExecutionContext context) {
        ThreadContext.setConcurrentNumber(this.concurrentNumber);
        List<ExecutionHandlerCallback> listeners = this.support.prepareListeners(data, context);
        this.support.callPreExecution(listeners, data, context);
        this.support.prepareDataReader(data, context);
        if (this.commitLogger != null) {
            this.commitLogger.initialize();
            context.setSessionScopedVar("nablarch_.commit-logger", this.commitLogger);
        }
        this.initializeThreadPool();
        for (int i = 0; i < this.concurrentNumber; ++i) {
            this.taskTracker.add(this.taskStatus.submit(this.createTaskFor(data, context)));
        }
        Result.MultiStatus results = new Result.MultiStatus();
        // Remembers the primary failure so that a secondary failure during
        // post-processing does not mask it.
        Throwable error = null;
        try {
            // take() yields futures in completion order; get() rethrows a worker
            // failure as ExecutionException as soon as that worker finishes.
            for (int i = 0; i < this.concurrentNumber; ++i) {
                results.addResults(new Result[]{this.taskStatus.take().get()});
            }
            this.terminate(context);
        }
        catch (ExecutionException e) {
            error = e;
            try {
                this.terminate(context);
            }
            catch (Throwable t) {
                LOGGER.logWarn("an error occurred while terminating (or waiting to end) subthreads.", t, new Object[0]);
            }
            this.support.callErrorInExecution(listeners, e, context);
            // Unwrap the worker's failure and rethrow it with its original type where possible.
            Throwable cause = e.getCause();
            if (cause instanceof Result.Error) {
                Result.Error err = (Result.Error)cause;
                results.addResults(new Result[]{err});
                throw err;
            }
            if (cause instanceof RuntimeException) {
                results.addResults(new Result[]{new InternalError(cause)});
                throw (RuntimeException)cause;
            }
            if (cause instanceof Error) {
                results.addResults(new Result[]{new InternalError(cause)});
                throw (Error)cause;
            }
            throw new RuntimeException("application thread ended abnormally.", cause);
        }
        catch (InterruptedException e) {
            error = e;
            try {
                this.terminate(context);
            }
            catch (Throwable t) {
                LOGGER.logWarn("an error occurred while terminating (or waiting to end) subthreads.", t, new Object[0]);
            }
            finally {
                // Restore the interrupt status even when terminate() itself failed.
                Thread.currentThread().interrupt();
            }
            this.support.callErrorInExecution(listeners, e, context);
            throw new RuntimeException("execution was canceled.", e);
        }
        catch (RuntimeException e) {
            error = e;
            try {
                this.terminate(context);
            }
            catch (Throwable t) {
                LOGGER.logWarn("an error occurred while terminating (or waiting to end)  subthreads.", t, new Object[0]);
            }
            this.support.callErrorInExecution(listeners, e, context);
            throw e;
        }
        catch (Error e) {
            error = e;
            try {
                this.terminate(context);
            }
            catch (Throwable t) {
                LOGGER.logWarn("an error occurred while terminating (or waiting to end) subthreads.", t, new Object[0]);
            }
            this.support.callErrorInExecution(listeners, e, context);
            throw e;
        }
        finally {
            try {
                if (this.commitLogger != null) {
                    this.commitLogger.terminate();
                }
                this.support.callPostExecution(listeners, results, context);
            }
            catch (RuntimeException e) {
                if (error == null) {
                    throw e;
                }
                // The primary failure takes precedence; record this one instead of dropping it.
                LOGGER.logWarn("an error occurred in post-execution processing; suppressed in favor of the preceding error.", e, new Object[0]);
            }
            catch (Error e) {
                if (error == null) {
                    throw e;
                }
                // The primary failure takes precedence; record this one instead of dropping it.
                LOGGER.logWarn("an error occurred in post-execution processing; suppressed in favor of the preceding error.", e, new Object[0]);
            }
        }
        return results;
    }

    /**
     * Closes the data reader and shuts the worker pool down, waiting up to
     * {@code terminationTimeout} seconds for running tasks to finish.
     *
     * <p>Does nothing beyond closing the reader when the pool is already shut down. When the
     * pool terminates in time the per-thread status is reported; otherwise a warning is logged
     * and the calling thread's interrupt flag is set.
     *
     * @param context execution context whose reader is closed
     */
    private void terminate(ExecutionContext context) {
        context.closeReader();
        if (this.taskExecutor.isShutdown()) {
            return;
        }
        this.taskExecutor.shutdownNow();
        boolean terminatedInTime = false;
        try {
            terminatedInTime = this.taskExecutor.awaitTermination(this.terminationTimeout, TimeUnit.SECONDS);
        }
        catch (InterruptedException ie) {
            LOGGER.logWarn("termination was cancelled.", ie, new Object[0]);
            Thread.currentThread().interrupt();
        }
        if (terminatedInTime) {
            this.reportThreadStatus(context);
        } else {
            // The timeout is passed to awaitTermination in seconds, so report it in seconds.
            LOGGER.logWarn("some running tasks could not stop in time. terminationTimeout: " + this.terminationTimeout + " sec.", new Object[0]);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Logs the outcome of every tracked worker.
     *
     * <p>Workers that completed normally are listed in a single info-level report; a worker
     * whose future throws is reported individually through {@link FailureLogUtil}.
     *
     * @param context execution context used to look up the data being processed when a
     *                worker failed
     */
    private void reportThreadStatus(ExecutionContext context) {
        StringBuilder report = new StringBuilder(Logger.LS);
        for (Future<Result> future : this.taskTracker) {
            try {
                Result result = future.get();
                report.append("Thread Status: normal end.").append(Logger.LS)
                      .append("Thread Result:").append(String.valueOf(result)).append(Logger.LS);
            }
            catch (ExecutionException e) {
                FailureLogUtil.logWarn(e, context.getDataProcessedWhenThrown(e.getCause()), null, new Object[0]);
            }
            catch (InterruptedException e) {
                FailureLogUtil.logWarn(e, context.getDataProcessedWhenThrown(e), null, new Object[0]);
            }
        }
        LOGGER.logInfo(report.toString(), new Object[0]);
    }

    /**
     * Prepares the pool, completion service and future list for a new run.
     *
     * <p>A new fixed-size pool is created when none exists or the previous one has terminated;
     * otherwise the existing pool is purged and reused.
     */
    private void initializeThreadPool() {
        if (this.taskExecutor == null || this.taskExecutor.isTerminated()) {
            this.taskExecutor = new ThreadPoolExecutor(this.concurrentNumber, this.concurrentNumber, Long.MAX_VALUE, TimeUnit.NANOSECONDS, new LinkedBlockingQueue<Runnable>());
        } else {
            this.taskExecutor.purge();
        }
        this.taskStatus = new ExecutorCompletionService<Result>(this.taskExecutor);
        this.taskTracker = new ArrayList<Future<Result>>();
    }

    /**
     * Creates one worker task that runs the subsequent handlers on a copy of the context.
     *
     * <p>When the worker fails with a {@link RuntimeException} or {@link Error}, the data last
     * read by the worker's context is registered on the parent context against that throwable
     * before it is rethrown.
     *
     * @param data    input object passed to the next handler
     * @param context parent execution context; copied once for the returned task
     * @return the task to submit to the pool
     */
    private Callable<Result> createTaskFor(final Object data, final ExecutionContext context) {
        final ExecutionContext clonedContext = context.copy();
        return new Callable<Result>(){

            @Override
            public Result call() throws Exception {
                try {
                    return (Result)clonedContext.handleNext(data);
                }
                catch (RuntimeException e) {
                    this.setDataOnException(e, context, clonedContext);
                    throw e;
                }
                catch (Error e) {
                    this.setDataOnException(e, context, clonedContext);
                    throw e;
                }
            }

            // NOTE(review): invoked from worker threads against the shared parent context;
            // presumably putDataOnException is safe for concurrent use - confirm.
            private void setDataOnException(Throwable e, ExecutionContext parent, ExecutionContext child) {
                parent.putDataOnException(e, child.getLastReadData());
            }
        };
    }

    /**
     * Sets the number of worker threads.
     *
     * @param concurrentNumber number of workers; must be at least 1
     * @return this handler
     * @throws IllegalArgumentException if {@code concurrentNumber} is less than 1
     */
    public MultiThreadExecutionHandler setConcurrentNumber(int concurrentNumber) {
        if (concurrentNumber < 1) {
            throw new IllegalArgumentException("concurrentNumber must be greater than or equal to 1.");
        }
        this.concurrentNumber = concurrentNumber;
        return this;
    }

    /**
     * Sets how long to wait for worker threads to stop during termination.
     *
     * @param terminationTimeout timeout in seconds
     * @return this handler
     */
    public MultiThreadExecutionHandler setTerminationTimeout(int terminationTimeout) {
        this.terminationTimeout = terminationTimeout;
        return this;
    }

    /**
     * Sets the commit logger to initialise before, and terminate after, the workers run.
     *
     * @param commitLogger commit logger, or {@code null} to disable commit logging
     */
    public void setCommitLogger(CommitLogger commitLogger) {
        this.commitLogger = commitLogger;
    }
}

