/*
 * Decompiled with CFR 0.152.
 */
package picard.vcf.processor;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.FluentIterable;
import htsjdk.samtools.util.CloseableIterator;
import htsjdk.samtools.util.Log;
import htsjdk.variant.variantcontext.VariantContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import picard.util.AtomicIterator;
import picard.util.Iterators;
import picard.vcf.processor.VariantIteratorProducer;
import picard.vcf.processor.VariantProcessor;

public interface VariantAccumulatorExecutor<ACCUMULATOR extends VariantProcessor.Accumulator<RESULT>, RESULT> {
    /**
     * Begins the executor's work. In the implementation visible in this file
     * ({@code MultiThreadedChunkBased}) this submits the worker tasks to a thread
     * pool and returns without waiting for them to finish.
     */
    public void start();

    /**
     * Waits for the work begun by {@link #start()} to finish.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void awaitCompletion() throws InterruptedException;

    /**
     * Returns the accumulators managed by this executor.
     *
     * @return the accumulators associated with this executor
     */
    public Collection<ACCUMULATOR> accumulators();

    public static class MultiThreadedChunkBased<A extends VariantProcessor.Accumulator<R>, R>
    implements VariantAccumulatorExecutor<A, R> {
        private static final Log LOG = Log.getInstance(MultiThreadedChunkBased.class);
        /** Shared source of variant iterators; every worker pulls from it until it reports no more elements. */
        final AtomicIterator<CloseableIterator<VariantContext>> vcIterators;
        /** Fixed-size pool, created in the constructor, on which the workers run. */
        final ExecutorService executor;
        /** One accumulator per worker, added by {@link #start()}; wrapped so that adds are synchronized. */
        final Collection<A> accumulators = Collections.synchronizedCollection(new ArrayList<A>());
        /** Set to {@code true} by {@link #start()}; read without locking by {@link #awaitCompletion()}, hence volatile. */
        volatile boolean started = false;
        /** Size of the thread pool and number of workers submitted by {@link #start()}. */
        final int numThreads;
        /** Throwables caught by workers; a non-empty list makes {@link #awaitCompletion()} throw. */
        private final List<Throwable> childrenErrors = Collections.synchronizedList(new ArrayList<Throwable>());
        /** Builds the accumulator handed to each worker. */
        final VariantProcessor.AccumulatorGenerator<A, R> accumulatorGenerator;

        /**
         * Creates an executor that has not been started yet.
         *
         * @param n                       number of pool threads, and of workers later submitted by {@code start()};
         *                                {@link Executors#newFixedThreadPool(int)} rejects values {@code <= 0}
         *                                with an {@link IllegalArgumentException}
         * @param variantIteratorProducer supplies the variant iterators that the workers will consume
         * @param accumulatorGenerator    builds one accumulator per worker
         */
        public MultiThreadedChunkBased(int n, VariantIteratorProducer variantIteratorProducer, VariantProcessor.AccumulatorGenerator<A, R> accumulatorGenerator) {
            this.numThreads = n;
            this.accumulatorGenerator = accumulatorGenerator;
            // The pool is created before the producer is asked for its iterators, as in the original ordering.
            this.executor = Executors.newFixedThreadPool(n);
            this.vcIterators = Iterators.atomicIteratorOf(variantIteratorProducer.iterators());
        }

        /**
         * Marks the executor as started, builds one accumulator per thread, submits a
         * {@code Worker} for each, and then shuts the pool down so that it terminates
         * once the submitted workers have finished.
         *
         * NOTE(review): a second call would submit to an already shut-down pool, which
         * the default JDK rejection policy answers with a RejectedExecutionException.
         */
        @Override
        public synchronized void start() {
            this.started = true;
            for (int launched = 0; launched < this.numThreads; launched++) {
                final A accumulator = this.accumulatorGenerator.build();
                this.accumulators.add(accumulator);
                final Worker worker = new Worker(accumulator);
                this.executor.submit(worker);
            }
            // No further tasks are accepted; the ones already submitted still run.
            this.executor.shutdown();
        }

        /**
         * Returns a read-only view of the accumulators created so far by {@code start()}.
         *
         * @return an unmodifiable view backed by the internal synchronized collection
         */
        @Override
        public synchronized Collection<A> accumulators() {
            final Collection<A> readOnlyView = Collections.unmodifiableCollection(this.accumulators);
            return readOnlyView;
        }

        /**
         * Blocks until the thread pool has terminated, then reports any failures
         * that the workers recorded.
         *
         * @throws IllegalStateException if {@code start()} has not been called yet
         * @throws InterruptedException  if the calling thread is interrupted while waiting
         * @throws MultiException        if at least one worker recorded an error
         */
        @Override
        public void awaitCompletion() throws InterruptedException {
            if (this.started) {
                // Effectively an unbounded wait for the pool to drain.
                this.executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
                if (this.childrenErrors.isEmpty()) {
                    return;
                }
                throw new MultiException(this.childrenErrors);
            }
            throw new IllegalStateException("This method can be called only after the executor has been started.");
        }

        class Worker
        implements Runnable {
            final VariantProcessor.Accumulator processor;

            Worker(VariantProcessor.Accumulator accumulator) {
                this.processor = accumulator;
            }

            @Override
            public void run() {
                try {
                    Optional<CloseableIterator<VariantContext>> optional;
                    while ((optional = MultiThreadedChunkBased.this.vcIterators.next()).isPresent()) {
                        CloseableIterator closeableIterator = (CloseableIterator)optional.get();
                        while (closeableIterator.hasNext()) {
                            this.processor.accumulate((VariantContext)closeableIterator.next());
                        }
                        closeableIterator.close();
                        if (MultiThreadedChunkBased.this.childrenErrors.isEmpty()) continue;
                        LOG.error(new Object[]{Thread.currentThread() + " aborting: observed error in another child thread."});
                        break;
                    }
                }
                catch (Throwable throwable) {
                    try {
                        MultiThreadedChunkBased.this.childrenErrors.add(throwable);
                        LOG.error(throwable, new Object[]{"Unexpected exception encountered in child thread."});
                    }
                    catch (Throwable throwable2) {
                        LOG.debug(new Object[]{String.format("Thread %s is finishing.", Thread.currentThread())});
                        throw throwable2;
                    }
                    LOG.debug(new Object[]{String.format("Thread %s is finishing.", Thread.currentThread())});
                }
                LOG.debug(new Object[]{String.format("Thread %s is finishing.", Thread.currentThread())});
            }
        }

        static class MultiException
        extends RuntimeException {
            final List<Throwable> childrenExceptions;

            public MultiException(List<Throwable> list) {
                this.childrenExceptions = list;
            }

            @Override
            public String getMessage() {
                return "Children threads encountered exceptions:\n" + Joiner.on((String)"\n\t").join((Iterable)FluentIterable.from(this.childrenExceptions).transform((Function)new Function<Throwable, String>(){

                    public String apply(Throwable throwable) {
                        return throwable.getMessage();
                    }
                }));
            }
        }
    }
}

