/*
 * Decompiled with CFR 0.152.
 */
package org.apache.james.mailetcontainer.impl;

import com.github.fge.lambdas.Throwing;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.apache.commons.configuration2.HierarchicalConfiguration;
import org.apache.james.lifecycle.api.Configurable;
import org.apache.james.lifecycle.api.Disposable;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.mailetcontainer.api.MailProcessor;
import org.apache.james.mailetcontainer.api.jmx.MailSpoolerMBean;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.metrics.api.TimeMetric;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.util.concurrent.NamedThreadFactory;
import org.apache.mailet.Mail;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class JamesMailSpooler
implements Disposable,
Configurable,
MailSpoolerMBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailSpooler.class);
    public static final String SPOOL_PROCESSING = "spoolProcessing";
    private MailQueue queue;
    private int numThreads;
    private final AtomicInteger processingActive = new AtomicInteger(0);
    private final MetricFactory metricFactory;
    private MailProcessor mailProcessor;
    private MailQueueFactory<?> queueFactory;
    private reactor.core.Disposable disposable;
    private Scheduler spooler;

    @Inject
    public JamesMailSpooler(MetricFactory metricFactory) {
        this.metricFactory = metricFactory;
    }

    @Inject
    public void setMailQueueFactory(MailQueueFactory<?> queueFactory) {
        this.queueFactory = queueFactory;
    }

    @Inject
    public void setMailProcessor(MailProcessor mailProcessor) {
        this.mailProcessor = mailProcessor;
    }

    public void configure(HierarchicalConfiguration config) {
        this.numThreads = config.getInt("threads", 100);
    }

    @PostConstruct
    public void init() {
        LOGGER.info("init...");
        this.queue = this.queueFactory.createQueue("spool");
        this.spooler = Schedulers.fromExecutor((Executor)Executors.newFixedThreadPool(this.numThreads, (ThreadFactory)NamedThreadFactory.withName((String)"spooler")));
        LOGGER.info("uses {} Thread(s)", (Object)this.numThreads);
        this.run();
    }

    private void run() {
        LOGGER.info("Queue={}", (Object)this.queue);
        this.disposable = Flux.from((Publisher)this.queue.deQueue()).publishOn(this.spooler).flatMap(this::handleOnQueueItem).onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable)).subscribeOn(Schedulers.elastic()).subscribe();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Mono<Void> handleOnQueueItem(MailQueue.MailQueueItem queueItem) {
        TimeMetric timeMetric = this.metricFactory.timer(SPOOL_PROCESSING);
        try {
            this.processingActive.incrementAndGet();
            Mono<Void> mono = this.processMail(queueItem);
            return mono;
        }
        catch (Throwable e) {
            Mono mono = Mono.error((Throwable)e);
            return mono;
        }
        finally {
            this.processingActive.decrementAndGet();
            timeMetric.stopAndPublish();
        }
    }

    private Mono<Void> processMail(MailQueue.MailQueueItem queueItem) {
        Mail mail = queueItem.getMail();
        return Mono.fromRunnable(() -> LOGGER.debug("==== Begin processing mail {} ====", (Object)mail.getName())).subscribeOn(Schedulers.elastic()).then(Mono.fromCallable(() -> this.performProcessMail(mail))).flatMap(any -> this.acknowledgeItem(queueItem, true)).onErrorResume(any -> this.acknowledgeItem(queueItem, false)).then(Mono.fromRunnable(() -> LOGGER.debug("==== End processing mail {} ====", (Object)mail.getName()))).then(Mono.fromRunnable(() -> LifecycleUtil.dispose((Object)mail)));
    }

    private Mono<Void> acknowledgeItem(MailQueue.MailQueueItem queueItem, boolean success) {
        return Mono.fromRunnable((Runnable)Throwing.runnable(() -> queueItem.done(success)).sneakyThrow());
    }

    private boolean performProcessMail(Mail mail) {
        try {
            this.mailProcessor.service(mail);
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException("Thread has been interrupted");
            }
            return true;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @PreDestroy
    public void dispose() {
        LOGGER.info("start dispose() ...");
        this.disposable.dispose();
        this.spooler.dispose();
        LOGGER.info("thread shutdown completed.");
    }

    public int getThreadCount() {
        return this.numThreads;
    }

    public int getCurrentSpoolCount() {
        return this.processingActive.get();
    }
}

