/*
 * Decompiled with CFR 0.152.
 */
package ca.uhn.fhir.jpa.dao.expunge;

import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.collect.Lists;
import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Splits a list of items into fixed-size partitions and passes each partition to a consumer.
 *
 * <p>When the input produces exactly one partition it is processed on the calling thread.
 * When it produces several, the partitions are submitted to a fixed-size thread pool that is
 * created for the call and shut down once every partition has completed or failed.
 *
 * <p>If a {@link HapiTransactionService} was supplied, every partition is executed through
 * {@code withRequest(requestDetails).execute(...)} on that service.
 */
public class PartitionRunner {
    private static final Logger ourLog = LoggerFactory.getLogger(PartitionRunner.class);

    /** Capacity of the work queue that backs the executor built by {@link #buildExecutor(int)}. */
    private static final int MAX_POOL_SIZE = 1000;

    private final String myProcessName;
    private final String myThreadPrefix;
    private final int myBatchSize;
    private final int myThreadCount;
    private final HapiTransactionService myTransactionService;
    private final RequestDetails myRequestDetails;

    /**
     * Creates a runner that invokes the partition consumer directly, without going through a
     * transaction service.
     *
     * @param theProcessName  human-readable name of the work being done; used in log messages
     * @param theThreadPrefix prefix for the names of the worker threads
     * @param theBatchSize    maximum number of entries handed to the consumer in one partition
     * @param theThreadCount  upper bound on the number of worker threads
     */
    public PartitionRunner(String theProcessName, String theThreadPrefix, int theBatchSize, int theThreadCount) {
        this(theProcessName, theThreadPrefix, theBatchSize, theThreadCount, null, null);
    }

    /**
     * Creates a runner that optionally executes every partition through a transaction service.
     *
     * @param theProcessName        human-readable name of the work being done; used in log messages
     * @param theThreadPrefix       prefix for the names of the worker threads
     * @param theBatchSize          maximum number of entries handed to the consumer in one partition
     * @param theThreadCount        upper bound on the number of worker threads
     * @param theTransactionService if non-null, each partition is run via
     *                              {@code withRequest(theRequestDetails).execute(...)}
     * @param theRequestDetails     request details passed to the transaction service; may be null
     */
    public PartitionRunner(
            String theProcessName,
            String theThreadPrefix,
            int theBatchSize,
            int theThreadCount,
            @Nullable HapiTransactionService theTransactionService,
            @Nullable RequestDetails theRequestDetails) {
        this.myProcessName = theProcessName;
        this.myThreadPrefix = theThreadPrefix;
        this.myBatchSize = theBatchSize;
        this.myThreadCount = theThreadCount;
        this.myTransactionService = theTransactionService;
        this.myRequestDetails = theRequestDetails;
    }

    /**
     * Partitions {@code theResourceIds} into chunks of at most the configured batch size and
     * passes each chunk to {@code partitionConsumer}.
     *
     * <p>An empty input is a no-op. A single partition runs on the calling thread; multiple
     * partitions run on a thread pool and this method blocks until all of them have finished.
     * If the calling thread is interrupted while waiting, the interruption is logged, the
     * interrupt flag is restored and the method returns without waiting for the remaining
     * partitions; partitions that were already submitted are not cancelled.
     *
     * @param theResourceIds    the items to process
     * @param partitionConsumer callback invoked once per non-empty partition
     * @param <T>               the item type
     * @throws PreconditionFailedException if the consumer throws it while running on the
     *                                     calling thread (single-partition case)
     * @throws InternalErrorException      if a partition fails with any other exception
     */
    public <T> void runInPartitionedThreads(List<T> theResourceIds, Consumer<List<T>> partitionConsumer) {
        List<Callable<Void>> runnableTasks = this.buildCallableTasks(theResourceIds, partitionConsumer);
        if (runnableTasks.isEmpty()) {
            return;
        }

        if (this.myTransactionService != null) {
            // Wrap every task so that it executes through the transaction service. The explicit
            // Callable cast gives the inner lambda a target type.
            runnableTasks = runnableTasks.stream()
                    .map(t -> (Callable<Void>)
                            () -> this.myTransactionService.withRequest(this.myRequestDetails).execute(t))
                    .toList();
        }

        if (runnableTasks.size() == 1) {
            // A single partition does not justify a thread pool; run it inline.
            try {
                runnableTasks.get(0).call();
                return;
            } catch (PreconditionFailedException e) {
                throw e;
            } catch (Exception e) {
                ourLog.error("Error while " + this.myProcessName, e);
                throw new InternalErrorException(Msg.code(1084) + e, e);
            }
        }

        // NOTE(review): unlike the inline path above, a PreconditionFailedException thrown by a
        // pooled task arrives wrapped in an ExecutionException and is reported as an
        // InternalErrorException - confirm this difference is intended.
        ExecutorService executorService = this.buildExecutor(runnableTasks.size());
        try {
            List<Future<Void>> futures = new ArrayList<>(runnableTasks.size());
            for (Callable<Void> task : runnableTasks) {
                futures.add(executorService.submit(task));
            }
            // Wait for every partition to finish; the first failure aborts the wait.
            for (Future<Void> future : futures) {
                future.get();
            }
        } catch (InterruptedException e) {
            ourLog.error("Interrupted while " + this.myProcessName, e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            ourLog.error("Error while " + this.myProcessName, e);
            throw new InternalErrorException(Msg.code(1085) + e, e);
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * Builds one task per non-empty partition of {@code theResourceIds}. Each task logs the
     * partition size and then passes the partition to {@code partitionConsumer}.
     *
     * @param theResourceIds    the items to partition
     * @param partitionConsumer callback each task will invoke with its partition
     * @param <T>               the item type
     * @return the tasks, in partition order; empty when the input is empty
     */
    private <T> List<Callable<Void>> buildCallableTasks(List<T> theResourceIds, Consumer<List<T>> partitionConsumer) {
        List<Callable<Void>> retval = new ArrayList<>();

        if (theResourceIds.size() > this.myBatchSize) {
            ourLog.info(
                    "Splitting batch job of {} entries into chunks of {}",
                    theResourceIds.size(),
                    this.myBatchSize);
        } else {
            ourLog.info("Creating batch job of {} entries", theResourceIds.size());
        }

        List<List<T>> partitions = Lists.partition(theResourceIds, this.myBatchSize);
        for (List<T> nextPartition : partitions) {
            if (nextPartition.isEmpty()) {
                continue;
            }
            Callable<Void> callableTask = () -> {
                ourLog.info(this.myProcessName + " {} resources", nextPartition.size());
                partitionConsumer.accept(nextPartition);
                return null;
            };
            retval.add(callableTask);
        }
        return retval;
    }

    /**
     * Creates a fixed-size thread pool with a bounded work queue for running the partition tasks.
     *
     * <p>The pool has {@code min(numberOfTasks, configured thread count)} threads. When the
     * queue is full, the submitting thread blocks until a slot is free instead of having the
     * task rejected.
     *
     * @param numberOfTasks the number of tasks about to be submitted
     * @return a new executor; the caller is responsible for shutting it down
     */
    private ExecutorService buildExecutor(int numberOfTasks) {
        int threadCount = Math.min(numberOfTasks, this.myThreadCount);
        assert threadCount > 0;

        ourLog.info(this.myProcessName + " with {} threads", threadCount);
        LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(MAX_POOL_SIZE);
        ThreadFactory threadFactory = new BasicThreadFactory.Builder()
                .namingPattern(this.myThreadPrefix + "-%d")
                .daemon(false)
                .priority(Thread.NORM_PRIORITY)
                .build();
        RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
            ourLog.info(
                    "Note: " + this.myThreadPrefix
                            + " executor queue is full ({} elements), waiting for a slot to become available!",
                    executorQueue.size());
            StopWatch sw = new StopWatch();
            try {
                // Block the submitting thread until the queue has room.
                executorQueue.put(theRunnable);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can observe the interruption.
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException(
                        Msg.code(1086) + "Task " + theRunnable.toString() + " rejected from " + e, e);
            }
            ourLog.info("Slot become available after {}ms", sw.getMillis());
        };
        return new ThreadPoolExecutor(
                threadCount,
                threadCount,
                0L,
                TimeUnit.MILLISECONDS,
                executorQueue,
                threadFactory,
                rejectedExecutionHandler);
    }
}

