/*
 * Decompiled with CFR 0.152.
 */
package ca.uhn.fhir.jpa.subscription.channel.impl;

import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.util.StopWatch;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LinkedBlockingChannelFactory
implements IChannelFactory {
    private static final Logger ourLog = LoggerFactory.getLogger(LinkedBlockingChannelFactory.class);
    private Map<String, LinkedBlockingChannel> myChannels = Collections.synchronizedMap(new HashMap());

    @Override
    public IChannelReceiver getOrCreateReceiver(String theChannelName, Class<?> theMessageType, ChannelConsumerSettings theConfig) {
        return this.getOrCreateChannel(theChannelName, theConfig.getConcurrentConsumers());
    }

    @Override
    public IChannelProducer getOrCreateProducer(String theChannelName, Class<?> theMessageType, ChannelConsumerSettings theConfig) {
        return this.getOrCreateChannel(theChannelName, theConfig.getConcurrentConsumers());
    }

    private LinkedBlockingChannel getOrCreateChannel(String theChannelName, int theConcurrentConsumers) {
        return this.myChannels.computeIfAbsent(theChannelName, t -> {
            String threadNamingPattern = theChannelName + "-%d";
            BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern(threadNamingPattern).daemon(false).priority(5).build();
            LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(1000);
            RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
                ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", (Object)queue.size());
                StopWatch sw = new StopWatch();
                try {
                    queue.put(theRunnable);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException("Task " + theRunnable.toString() + " rejected from " + e.toString());
                }
                ourLog.info("Slot become available after {}ms", (Object)sw.getMillis());
            };
            ThreadPoolExecutor executor = new ThreadPoolExecutor(1, theConcurrentConsumers, 0L, TimeUnit.MILLISECONDS, queue, (ThreadFactory)threadFactory, rejectedExecutionHandler);
            return new LinkedBlockingChannel(executor, queue);
        });
    }

    @PreDestroy
    public void stop() {
        this.myChannels.clear();
    }
}

