/*
 * Decompiled with CFR 0.152.
 */
package ca.uhn.fhir.jpa.subscription.triggering;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IDao;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.api.svc.ISearchSvc;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import ca.uhn.fhir.util.ParametersUtil;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.UrlUtil;
import ca.uhn.fhir.util.ValidateUtil;
import com.google.common.collect.Lists;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.hl7.fhir.dstu2.model.IdType;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

public class SubscriptionTriggeringSvcImpl
implements ISubscriptionTriggeringSvc,
IHasScheduledJobs {
    private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringSvcImpl.class);
    private static final int DEFAULT_MAX_SUBMIT = 10000;
    private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<SubscriptionTriggeringJobDetails>();
    @Autowired
    private FhirContext myFhirContext;
    @Autowired
    private DaoRegistry myDaoRegistry;
    @Autowired
    private SubscriptionSettings mySubscriptionSettings;
    @Autowired
    private ISearchCoordinatorSvc<? extends IResourcePersistentId<?>> mySearchCoordinatorSvc;
    @Autowired
    private MatchUrlService myMatchUrlService;
    @Autowired
    private IResourceModifiedConsumer myResourceModifiedConsumer;
    @Autowired
    private HapiTransactionService myTransactionService;
    private int myMaxSubmitPerPass = 10000;
    private ExecutorService myExecutorService;
    @Autowired
    private ISearchSvc mySearchService;
    @Autowired
    IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
    @Autowired
    private SearchBuilderFactory mySearchBuilderFactory;
    @Autowired
    private SubscriptionCanonicalizer mySubscriptionCanonicalizer;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public IBaseParameters triggerSubscription(@Nullable List<IPrimitiveType<String>> theResourceIds, @Nullable List<IPrimitiveType<String>> theSearchUrls, @Nullable IIdType theSubscriptionId, RequestDetails theRequestDetails) {
        Object next2;
        IFhirResourceDao subscriptionDao;
        IBaseResource subscription;
        if (this.mySubscriptionSettings.getSupportedSubscriptionTypes().isEmpty()) {
            throw new PreconditionFailedException(Msg.code((int)22) + "Subscription processing not active on this server");
        }
        RequestPartitionId requestPartitionId = theSubscriptionId != null ? (this.mySubscriptionCanonicalizer.canonicalize(subscription = (subscriptionDao = this.myDaoRegistry.getSubscriptionDao()).read(theSubscriptionId, theRequestDetails)).isCrossPartitionEnabled() ? RequestPartitionId.allPartitions() : this.myRequestPartitionHelperSvc.determineGenericPartitionForRequest(theRequestDetails)) : this.myRequestPartitionHelperSvc.determineGenericPartitionForRequest(theRequestDetails);
        List resourceIds = (List)ObjectUtils.defaultIfNull(theResourceIds, Collections.emptyList());
        List searchUrls = (List)ObjectUtils.defaultIfNull(theSearchUrls, Collections.emptyList());
        if (resourceIds.isEmpty() && searchUrls.isEmpty()) {
            throw new InvalidRequestException(Msg.code((int)23) + "No resource IDs or search URLs specified for triggering");
        }
        for (Object next2 : resourceIds) {
            IdType resourceId = new IdType((String)next2.getValue());
            ValidateUtil.isTrueOrThrowInvalidRequest((boolean)resourceId.hasResourceType(), (String)"resourceId parameter must have resource type", (Object[])new Object[0]);
            ValidateUtil.isTrueOrThrowInvalidRequest((boolean)resourceId.hasIdPart(), (String)"resourceId parameter must have resource ID part", (Object[])new Object[0]);
        }
        for (Object next2 : searchUrls) {
            if (((String)next2.getValue()).contains("?")) continue;
            throw new InvalidRequestException(Msg.code((int)24) + "Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
        }
        SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails();
        jobDetails.setJobId(UUID.randomUUID().toString());
        jobDetails.setRequestPartitionId(requestPartitionId == null ? RequestPartitionId.allPartitions() : requestPartitionId);
        jobDetails.setRemainingResourceIds(resourceIds.stream().map(IPrimitiveType::getValue).collect(Collectors.toList()));
        jobDetails.setRemainingSearchUrls(searchUrls.stream().map(IPrimitiveType::getValue).collect(Collectors.toList()));
        if (theSubscriptionId != null) {
            jobDetails.setSubscriptionId(theSubscriptionId.getIdPart());
        }
        next2 = this.myActiveJobs;
        synchronized (next2) {
            this.myActiveJobs.add(jobDetails);
            ourLog.info("Subscription triggering requested for {} resource and {} search - Gave job ID: {} and have {} jobs", new Object[]{resourceIds.size(), searchUrls.size(), jobDetails.getJobId(), this.myActiveJobs.size()});
        }
        IBaseParameters retVal = ParametersUtil.newInstance((FhirContext)this.myFhirContext);
        IPrimitiveType value = (IPrimitiveType)this.myFhirContext.getElementDefinition("string").newInstance();
        value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId);
        ParametersUtil.addParameterToParameters((FhirContext)this.myFhirContext, (IBaseParameters)retVal, (String)"information", (Object)value);
        return retVal;
    }

    public IBaseParameters triggerSubscription(@Nullable List<IPrimitiveType<String>> theResourceIds, @Nullable List<IPrimitiveType<String>> theSearchUrls, @Nullable IIdType theSubscriptionId) {
        return this.triggerSubscription(theResourceIds, theSearchUrls, theSubscriptionId, (RequestDetails)SystemRequestDetails.newSystemRequestAllPartitions());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void runDeliveryPass() {
        List<SubscriptionTriggeringJobDetails> list = this.myActiveJobs;
        synchronized (list) {
            if (this.myActiveJobs.isEmpty()) {
                return;
            }
            String activeJobIds = this.myActiveJobs.stream().map(SubscriptionTriggeringJobDetails::getJobId).collect(Collectors.joining(", "));
            ourLog.info("Starting pass: currently have {} active job IDs: {}", (Object)this.myActiveJobs.size(), (Object)activeJobIds);
            SubscriptionTriggeringJobDetails activeJob = this.myActiveJobs.get(0);
            this.runJob(activeJob);
            if (activeJob.getRemainingResourceIds().isEmpty() && activeJob.getRemainingSearchUrls().isEmpty() && this.jobHasCompleted(activeJob)) {
                this.myActiveJobs.remove(0);
                Object remainingJobsMsg = "";
                if (this.myActiveJobs.size() > 0) {
                    remainingJobsMsg = "(" + this.myActiveJobs.size() + " jobs remaining)";
                }
                ourLog.info("Subscription triggering job {} is complete{}", (Object)activeJob.getJobId(), remainingJobsMsg);
            }
        }
    }

    private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
        StopWatch sw = new StopWatch();
        ourLog.info("Starting pass of subscription triggering job {}", (Object)theJobDetails.getJobId());
        AtomicInteger totalSubmitted = new AtomicInteger(0);
        ArrayList futures = new ArrayList();
        while (!theJobDetails.getRemainingResourceIds().isEmpty() && totalSubmitted.get() < this.myMaxSubmitPerPass) {
            totalSubmitted.incrementAndGet();
            String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
            this.submitResource(theJobDetails.getSubscriptionId(), theJobDetails.getRequestPartitionId(), nextResourceId);
        }
        if (this.validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
            return;
        }
        IBundleProvider search = null;
        if (this.isInitialStep(theJobDetails) && CollectionUtils.isNotEmpty(theJobDetails.getRemainingSearchUrls()) && totalSubmitted.get() < this.myMaxSubmitPerPass) {
            String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0);
            RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType((FhirContext)this.myFhirContext, (String)nextSearchUrl);
            String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf(63));
            SearchParameterMap params = this.myMatchUrlService.translateMatchUrl(queryPart, resourceDef, new MatchUrlService.Flag[0]);
            String resourceType = resourceDef.getName();
            IFhirResourceDao callingDao = this.myDaoRegistry.getResourceDao(resourceType);
            ourLog.info("Triggering job[{}] is starting a search for {}", (Object)theJobDetails.getJobId(), (Object)nextSearchUrl);
            search = this.mySearchCoordinatorSvc.registerSearch(callingDao, params, resourceType, new CacheControlDirective(), null, theJobDetails.getRequestPartitionId());
            if (Objects.isNull(search.getUuid())) {
                theJobDetails.setCurrentSearchUrl(nextSearchUrl);
                theJobDetails.setCurrentOffset(params.getOffset());
            } else {
                theJobDetails.setCurrentSearchUuid(search.getUuid());
            }
            theJobDetails.setCurrentSearchResourceType(resourceType);
            theJobDetails.setCurrentSearchCount(params.getCount());
            theJobDetails.setCurrentSearchLastUploadedIndex(-1);
        }
        if (StringUtils.isNotBlank((CharSequence)theJobDetails.getCurrentSearchUrl()) && totalSubmitted.get() < this.myMaxSubmitPerPass) {
            this.processSynchronous(theJobDetails, totalSubmitted, futures, search);
        }
        if (StringUtils.isNotBlank((CharSequence)theJobDetails.getCurrentSearchUuid()) && totalSubmitted.get() < this.myMaxSubmitPerPass) {
            this.processAsynchronous(theJobDetails, totalSubmitted, futures);
        }
        ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", new Object[]{theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput((long)totalSubmitted.get(), TimeUnit.SECONDS)});
    }

    private void processAsynchronous(SubscriptionTriggeringJobDetails theJobDetails, AtomicInteger totalSubmitted, List<Future<?>> futures) {
        List allResourceIds;
        int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
        IFhirResourceDao resourceDao = this.myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
        int maxQuerySize = this.myMaxSubmitPerPass - totalSubmitted.get();
        int toIndex = theJobDetails.getCurrentSearchCount() != null ? Math.min(fromIndex + maxQuerySize, theJobDetails.getCurrentSearchCount()) : fromIndex + maxQuerySize;
        ourLog.info("Triggering job[{}] search {} requesting resources {} - {} from partition {}", new Object[]{theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, theJobDetails.getRequestPartitionId()});
        RequestPartitionId requestPartitionId = theJobDetails.getRequestPartitionId();
        try {
            allResourceIds = this.mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null, requestPartitionId);
        }
        catch (ResourceGoneException e) {
            ourLog.trace("Search has expired, submission is done.");
            allResourceIds = new ArrayList();
        }
        ourLog.info("Triggering job[{}] delivering {} resources", (Object)theJobDetails.getJobId(), (Object)allResourceIds.size());
        AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());
        List partitions = Lists.partition(allResourceIds, (int)100);
        for (List resourceIds : partitions) {
            Runnable job = () -> {
                String resourceType = this.myFhirContext.getResourceType(theJobDetails.getCurrentSearchResourceType());
                RuntimeResourceDefinition resourceDef = this.myFhirContext.getResourceDefinition(theJobDetails.getCurrentSearchResourceType());
                ISearchBuilder searchBuilder = this.mySearchBuilderFactory.newSearchBuilder((IDao)resourceDao, resourceType, resourceDef.getImplementingClass());
                ArrayList listToPopulate = new ArrayList();
                this.myTransactionService.withRequest(null).execute(() -> searchBuilder.loadResourcesByPid((Collection)resourceIds, Collections.emptyList(), listToPopulate, false, (RequestDetails)new SystemRequestDetails()));
                for (IBaseResource nextResource : listToPopulate) {
                    this.submitResource(theJobDetails.getSubscriptionId(), theJobDetails.getRequestPartitionId(), nextResource);
                    totalSubmitted.incrementAndGet();
                    highestIndexSubmitted.incrementAndGet();
                }
            };
            Future<?> future = this.myExecutorService.submit(job);
            futures.add(future);
        }
        if (!this.validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
            theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get());
            if (allResourceIds.isEmpty() || theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount()) {
                ourLog.info("Triggering job[{}] search {} has completed ", (Object)theJobDetails.getJobId(), (Object)theJobDetails.getCurrentSearchUuid());
                theJobDetails.setCurrentSearchResourceType(null);
                theJobDetails.setCurrentSearchUuid(null);
                theJobDetails.setCurrentSearchLastUploadedIndex(-1);
                theJobDetails.setCurrentSearchCount(null);
            }
        }
    }

    private void processSynchronous(SubscriptionTriggeringJobDetails theJobDetails, AtomicInteger totalSubmitted, List<Future<?>> futures, IBundleProvider search) {
        List allCurrentResources;
        int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
        String searchUrl = theJobDetails.getCurrentSearchUrl();
        ourLog.info("Triggered job [{}] - Starting synchronous processing at offset {} and index {} on partition {}", new Object[]{theJobDetails.getJobId(), theJobDetails.getCurrentOffset(), fromIndex, theJobDetails.getRequestPartitionId()});
        int submittableCount = this.myMaxSubmitPerPass - totalSubmitted.get();
        int toIndex = fromIndex + submittableCount;
        if (Objects.nonNull(search) && !search.isEmpty()) {
            if (search.getCurrentPageSize() != null) {
                toIndex = search.getCurrentPageSize();
            }
            ourLog.info("Triggered job[{}] will process up to {} resources from partition {}", new Object[]{theJobDetails.getJobId(), toIndex, theJobDetails.getRequestPartitionId()});
            allCurrentResources = search.getResources(0, toIndex);
        } else {
            if (theJobDetails.getCurrentSearchCount() != null) {
                toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
            }
            RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType((FhirContext)this.myFhirContext, (String)searchUrl);
            String queryPart = searchUrl.substring(searchUrl.indexOf(63));
            SearchParameterMap params = this.myMatchUrlService.translateMatchUrl(queryPart, resourceDef, new MatchUrlService.Flag[0]);
            int offset = theJobDetails.getCurrentOffset() + fromIndex;
            params.setOffset(Integer.valueOf(offset));
            params.setCount(Integer.valueOf(toIndex));
            ourLog.info("Triggered job[{}] requesting {} resources from offset {}", new Object[]{theJobDetails.getJobId(), toIndex, offset});
            search = this.mySearchService.executeQuery(resourceDef.getName(), params, theJobDetails.getRequestPartitionId());
            allCurrentResources = search.getResources(0, submittableCount);
        }
        ourLog.info("Triggered job[{}] delivering {} resources", (Object)theJobDetails.getJobId(), (Object)allCurrentResources.size());
        AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());
        for (IBaseResource nextResource : allCurrentResources) {
            Future<?> future = this.myExecutorService.submit(() -> this.submitResource(theJobDetails.getSubscriptionId(), theJobDetails.getRequestPartitionId(), nextResource));
            futures.add(future);
            totalSubmitted.incrementAndGet();
            highestIndexSubmitted.incrementAndGet();
        }
        if (!this.validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
            theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get());
            ourLog.info("Triggered job[{}] lastUploadedIndex is {}", (Object)theJobDetails.getJobId(), (Object)theJobDetails.getCurrentSearchLastUploadedIndex());
            if (allCurrentResources.isEmpty() || Objects.nonNull(theJobDetails.getCurrentSearchCount()) && toIndex > theJobDetails.getCurrentSearchCount()) {
                ourLog.info("Triggered job[{}] for search URL {} has completed ", (Object)theJobDetails.getJobId(), (Object)theJobDetails.getCurrentSearchUrl());
                theJobDetails.setCurrentSearchResourceType(null);
                theJobDetails.clearCurrentSearchUrl();
                theJobDetails.setCurrentSearchLastUploadedIndex(-1);
                theJobDetails.setCurrentSearchCount(null);
            }
        }
    }

    private boolean isInitialStep(SubscriptionTriggeringJobDetails theJobDetails) {
        return StringUtils.isBlank((CharSequence)theJobDetails.myCurrentSearchUuid) && StringUtils.isBlank((CharSequence)theJobDetails.myCurrentSearchUrl);
    }

    private boolean jobHasCompleted(SubscriptionTriggeringJobDetails theJobDetails) {
        return this.isInitialStep(theJobDetails);
    }

    private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Future<?>> theFutures) {
        for (Future<?> nextFuture : theFutures) {
            try {
                nextFuture.get();
            }
            catch (Exception e) {
                ourLog.error("Failure triggering resource", (Throwable)e);
                return true;
            }
        }
        theFutures.clear();
        return false;
    }

    private void submitResource(String theSubscriptionId, RequestPartitionId theRequestPartitionId, String theResourceIdToTrigger) {
        org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
        IFhirResourceDao dao = this.myDaoRegistry.getResourceDao(resourceId.getResourceType());
        IBaseResource resourceToTrigger = dao.read((IIdType)resourceId, (RequestDetails)SystemRequestDetails.forAllPartitions());
        this.submitResource(theSubscriptionId, theRequestPartitionId, resourceToTrigger);
    }

    private void submitResource(String theSubscriptionId, RequestPartitionId theRequestPartitionId, IBaseResource theResourceToTrigger) {
        ourLog.info("Submitting resource {} to subscription {}", (Object)theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), (Object)theSubscriptionId);
        ResourceModifiedMessage msg = new ResourceModifiedMessage(this.myFhirContext, theResourceToTrigger, BaseResourceMessage.OperationTypeEnum.MANUALLY_TRIGGERED, theRequestPartitionId);
        msg.setSubscriptionId(theSubscriptionId);
        int i = 0;
        while (true) {
            try {
                this.myResourceModifiedConsumer.submitResourceModified(msg);
            }
            catch (Exception e) {
                if (i >= 3) {
                    throw new InternalErrorException(Msg.code((int)25) + e);
                }
                ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", (Object)e.toString());
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                ++i;
                continue;
            }
            break;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancelAll() {
        List<SubscriptionTriggeringJobDetails> list = this.myActiveJobs;
        synchronized (list) {
            this.myActiveJobs.clear();
        }
    }

    public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) {
        Integer maxSubmitPerPass = theMaxSubmitPerPass;
        if (maxSubmitPerPass == null) {
            maxSubmitPerPass = 10000;
        }
        Validate.isTrue((maxSubmitPerPass > 0 ? 1 : 0) != 0, (String)"theMaxSubmitPerPass must be > 0", (Object[])new Object[0]);
        this.myMaxSubmitPerPass = maxSubmitPerPass;
    }

    @PostConstruct
    public void start() {
        this.createExecutorService();
    }

    private void createExecutorService() {
        final LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<Runnable>(1000);
        BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("SubscriptionTriggering-%d").daemon(false).priority(5).build();
        RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler(){

            @Override
            public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
                ourLog.info("Note: Subscription triggering queue is full ({} elements), waiting for a slot to become available!", (Object)executorQueue.size());
                StopWatch sw = new StopWatch();
                try {
                    executorQueue.put(theRunnable);
                }
                catch (InterruptedException theE) {
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException(Msg.code((int)26) + "Task " + theRunnable.toString() + " rejected from " + theE.toString());
                }
                ourLog.info("Slot become available after {}ms", (Object)sw.getMillis());
            }
        };
        this.myExecutorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, executorQueue, (ThreadFactory)threadFactory, rejectedExecutionHandler);
    }

    public void scheduleJobs(ISchedulerService theSchedulerService) {
        ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
        jobDetail.setId(this.getClass().getName());
        jobDetail.setJobClass(Job.class);
        theSchedulerService.scheduleLocalJob(5000L, jobDetail);
    }

    public int getActiveJobCount() {
        return this.myActiveJobs.size();
    }

    private static class SubscriptionTriggeringJobDetails {
        private String myJobId;
        private String mySubscriptionId;
        private List<String> myRemainingResourceIds;
        private List<String> myRemainingSearchUrls;
        private String myCurrentSearchUuid;
        private String myCurrentSearchUrl;
        private Integer myCurrentSearchCount;
        private String myCurrentSearchResourceType;
        private int myCurrentSearchLastUploadedIndex;
        private int myCurrentOffset;
        private RequestPartitionId myRequestPartitionId;

        private SubscriptionTriggeringJobDetails() {
        }

        Integer getCurrentSearchCount() {
            return this.myCurrentSearchCount;
        }

        void setCurrentSearchCount(Integer theCurrentSearchCount) {
            this.myCurrentSearchCount = theCurrentSearchCount;
        }

        String getCurrentSearchResourceType() {
            return this.myCurrentSearchResourceType;
        }

        void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
            this.myCurrentSearchResourceType = theCurrentSearchResourceType;
        }

        String getJobId() {
            return this.myJobId;
        }

        void setJobId(String theJobId) {
            this.myJobId = theJobId;
        }

        String getSubscriptionId() {
            return this.mySubscriptionId;
        }

        void setSubscriptionId(String theSubscriptionId) {
            this.mySubscriptionId = theSubscriptionId;
        }

        List<String> getRemainingResourceIds() {
            return this.myRemainingResourceIds;
        }

        void setRemainingResourceIds(List<String> theRemainingResourceIds) {
            this.myRemainingResourceIds = theRemainingResourceIds;
        }

        List<String> getRemainingSearchUrls() {
            return this.myRemainingSearchUrls;
        }

        void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
            this.myRemainingSearchUrls = theRemainingSearchUrls;
        }

        String getCurrentSearchUuid() {
            return this.myCurrentSearchUuid;
        }

        void setCurrentSearchUuid(String theCurrentSearchUuid) {
            this.myCurrentSearchUuid = theCurrentSearchUuid;
        }

        public String getCurrentSearchUrl() {
            return this.myCurrentSearchUrl;
        }

        public void setCurrentSearchUrl(String theCurrentSearchUrl) {
            this.myCurrentSearchUrl = theCurrentSearchUrl;
        }

        int getCurrentSearchLastUploadedIndex() {
            return this.myCurrentSearchLastUploadedIndex;
        }

        void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
            this.myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
        }

        public void clearCurrentSearchUrl() {
            this.myCurrentSearchUrl = null;
        }

        public int getCurrentOffset() {
            return this.myCurrentOffset;
        }

        public void setCurrentOffset(Integer theCurrentOffset) {
            this.myCurrentOffset = (Integer)ObjectUtils.defaultIfNull((Object)theCurrentOffset, (Object)0);
        }

        public void setRequestPartitionId(RequestPartitionId theRequestPartitionId) {
            this.myRequestPartitionId = theRequestPartitionId;
        }

        public RequestPartitionId getRequestPartitionId() {
            return this.myRequestPartitionId;
        }
    }

    public static class Job
    implements HapiJob {
        @Autowired
        private ISubscriptionTriggeringSvc myTarget;

        public void execute(JobExecutionContext theContext) {
            this.myTarget.runDeliveryPass();
        }
    }
}

