/*
 * Decompiled with CFR 0.152.
 */
package ca.uhn.fhir.jpa.subscription.triggering;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import ca.uhn.fhir.rest.annotation.IdParam;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.util.ParametersUtil;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.UrlUtil;
import ca.uhn.fhir.util.ValidateUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.hl7.fhir.dstu2.model.IdType;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

public class SubscriptionTriggeringSvcImpl
implements ISubscriptionTriggeringSvc {
    private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringSvcImpl.class);
    private static final int DEFAULT_MAX_SUBMIT = 10000;
    private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<SubscriptionTriggeringJobDetails>();
    @Autowired
    private FhirContext myFhirContext;
    @Autowired
    private DaoRegistry myDaoRegistry;
    @Autowired
    private DaoConfig myDaoConfig;
    @Autowired
    private ISearchCoordinatorSvc mySearchCoordinatorSvc;
    @Autowired
    private MatchUrlService myMatchUrlService;
    @Autowired
    private IResourceModifiedConsumer myResourceModifiedConsumer;
    private int myMaxSubmitPerPass = 10000;
    private ExecutorService myExecutorService;
    @Autowired
    private ISchedulerService mySchedulerService;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public IBaseParameters triggerSubscription(List<IPrimitiveType<String>> theResourceIds, List<IPrimitiveType<String>> theSearchUrls, @IdParam IIdType theSubscriptionId) {
        Object next2;
        if (this.myDaoConfig.getSupportedSubscriptionTypes().isEmpty()) {
            throw new PreconditionFailedException("Subscription processing not active on this server");
        }
        if (theSubscriptionId != null) {
            IFhirResourceDao subscriptionDao = this.myDaoRegistry.getSubscriptionDao();
            IIdType subscriptionId = theSubscriptionId;
            if (!subscriptionId.hasResourceType()) {
                subscriptionId = subscriptionId.withResourceType(ResourceTypeEnum.SUBSCRIPTION.getCode());
            }
            subscriptionDao.read(subscriptionId);
        }
        List resourceIds = (List)ObjectUtils.defaultIfNull(theResourceIds, Collections.emptyList());
        List searchUrls = (List)ObjectUtils.defaultIfNull(theSearchUrls, Collections.emptyList());
        if (resourceIds.size() == 0 && searchUrls.size() == 0) {
            throw new InvalidRequestException("No resource IDs or search URLs specified for triggering");
        }
        for (Object next2 : resourceIds) {
            IdType resourceId = new IdType((String)next2.getValue());
            ValidateUtil.isTrueOrThrowInvalidRequest((boolean)resourceId.hasResourceType(), (String)"resourceId parameter must have resource type", (Object[])new Object[0]);
            ValidateUtil.isTrueOrThrowInvalidRequest((boolean)resourceId.hasIdPart(), (String)"resourceId parameter must have resource ID part", (Object[])new Object[0]);
        }
        for (Object next2 : searchUrls) {
            if (((String)next2.getValue()).contains("?")) continue;
            throw new InvalidRequestException("Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
        }
        SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails();
        jobDetails.setJobId(UUID.randomUUID().toString());
        jobDetails.setRemainingResourceIds(resourceIds.stream().map(t -> (String)t.getValue()).collect(Collectors.toList()));
        jobDetails.setRemainingSearchUrls(searchUrls.stream().map(t -> (String)t.getValue()).collect(Collectors.toList()));
        if (theSubscriptionId != null) {
            jobDetails.setSubscriptionId(theSubscriptionId.getIdPart());
        }
        next2 = this.myActiveJobs;
        synchronized (next2) {
            this.myActiveJobs.add(jobDetails);
            ourLog.info("Subscription triggering requested for {} resource and {} search - Gave job ID: {} and have {} jobs", new Object[]{resourceIds.size(), searchUrls.size(), jobDetails.getJobId(), this.myActiveJobs.size()});
        }
        IBaseParameters retVal = ParametersUtil.newInstance((FhirContext)this.myFhirContext);
        IPrimitiveType value = (IPrimitiveType)this.myFhirContext.getElementDefinition("string").newInstance();
        value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId);
        ParametersUtil.addParameterToParameters((FhirContext)this.myFhirContext, (IBaseParameters)retVal, (String)"information", (Object)value);
        return retVal;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void runDeliveryPass() {
        List<SubscriptionTriggeringJobDetails> list = this.myActiveJobs;
        synchronized (list) {
            if (this.myActiveJobs.isEmpty()) {
                return;
            }
            String activeJobIds = this.myActiveJobs.stream().map(SubscriptionTriggeringJobDetails::getJobId).collect(Collectors.joining(", "));
            ourLog.info("Starting pass: currently have {} active job IDs: {}", (Object)this.myActiveJobs.size(), (Object)activeJobIds);
            SubscriptionTriggeringJobDetails activeJob = this.myActiveJobs.get(0);
            this.runJob(activeJob);
            if (activeJob.getRemainingResourceIds().isEmpty() && activeJob.getRemainingSearchUrls().isEmpty() && StringUtils.isBlank((CharSequence)activeJob.myCurrentSearchUuid)) {
                this.myActiveJobs.remove(0);
                String remainingJobsMsg = "";
                if (this.myActiveJobs.size() > 0) {
                    remainingJobsMsg = "(" + this.myActiveJobs.size() + " jobs remaining)";
                }
                ourLog.info("Subscription triggering job {} is complete{}", (Object)activeJob.getJobId(), (Object)remainingJobsMsg);
            }
        }
    }

    private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
        int totalSubmitted;
        StopWatch sw = new StopWatch();
        ourLog.info("Starting pass of subscription triggering job {}", (Object)theJobDetails.getJobId());
        ArrayList<Pair<String, Future<Void>>> futures = new ArrayList<Pair<String, Future<Void>>>();
        for (totalSubmitted = 0; theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < this.myMaxSubmitPerPass; ++totalSubmitted) {
            String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
            Future<Void> future = this.submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
            futures.add((Pair<String, Future<Void>>)Pair.of((Object)nextResourceId, future));
        }
        if (this.validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
            return;
        }
        if (StringUtils.isBlank((CharSequence)theJobDetails.getCurrentSearchUuid()) && theJobDetails.getRemainingSearchUrls().size() > 0 && totalSubmitted < this.myMaxSubmitPerPass) {
            String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0);
            RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType((FhirContext)this.myFhirContext, (String)nextSearchUrl);
            String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf(63));
            String resourceType = resourceDef.getName();
            IFhirResourceDao callingDao = this.myDaoRegistry.getResourceDao(resourceType);
            SearchParameterMap params = this.myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
            ourLog.info("Triggering job[{}] is starting a search for {}", (Object)theJobDetails.getJobId(), (Object)nextSearchUrl);
            IBundleProvider search = this.mySearchCoordinatorSvc.registerSearch(callingDao, params, resourceType, new CacheControlDirective(), null);
            theJobDetails.setCurrentSearchUuid(search.getUuid());
            theJobDetails.setCurrentSearchResourceType(resourceType);
            theJobDetails.setCurrentSearchCount(params.getCount());
            theJobDetails.setCurrentSearchLastUploadedIndex(-1);
        }
        if (StringUtils.isNotBlank((CharSequence)theJobDetails.getCurrentSearchUuid()) && totalSubmitted < this.myMaxSubmitPerPass) {
            int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
            IFhirResourceDao resourceDao = this.myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
            int maxQuerySize = this.myMaxSubmitPerPass - totalSubmitted;
            int toIndex = fromIndex + maxQuerySize;
            if (theJobDetails.getCurrentSearchCount() != null) {
                toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
            }
            ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", new Object[]{theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex});
            List resourceIds = this.mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null);
            ourLog.info("Triggering job[{}] delivering {} resources", (Object)theJobDetails.getJobId(), (Object)resourceIds.size());
            int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex();
            for (ResourcePersistentId next : resourceIds) {
                IBaseResource nextResource = resourceDao.readByPid(next);
                Future<Void> future = this.submitResource(theJobDetails.getSubscriptionId(), nextResource);
                futures.add((Pair<String, Future<Void>>)Pair.of((Object)nextResource.getIdElement().getIdPart(), future));
                ++totalSubmitted;
                ++highestIndexSubmitted;
            }
            if (this.validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
                return;
            }
            theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted);
            if (resourceIds.size() == 0 || theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount()) {
                ourLog.info("Triggering job[{}] search {} has completed ", (Object)theJobDetails.getJobId(), (Object)theJobDetails.getCurrentSearchUuid());
                theJobDetails.setCurrentSearchResourceType(null);
                theJobDetails.setCurrentSearchUuid(null);
                theJobDetails.setCurrentSearchLastUploadedIndex(-1);
                theJobDetails.setCurrentSearchCount(null);
            }
        }
        ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", new Object[]{theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput((long)totalSubmitted, TimeUnit.SECONDS)});
    }

    private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Future<Void>>> theIdToFutures) {
        for (Pair<String, Future<Void>> next : theIdToFutures) {
            String nextDeliveredId = (String)next.getKey();
            try {
                Future nextFuture = (Future)next.getValue();
                nextFuture.get();
                ourLog.info("Finished redelivering {}", (Object)nextDeliveredId);
            }
            catch (Exception e) {
                ourLog.error("Failure triggering resource " + nextDeliveredId, (Throwable)e);
                return true;
            }
        }
        theIdToFutures.clear();
        return false;
    }

    private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
        org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
        IFhirResourceDao dao = this.myDaoRegistry.getResourceDao(resourceId.getResourceType());
        IBaseResource resourceToTrigger = dao.read((IIdType)resourceId);
        return this.submitResource(theSubscriptionId, resourceToTrigger);
    }

    private Future<Void> submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
        ourLog.info("Submitting resource {} to subscription {}", (Object)theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), (Object)theSubscriptionId);
        ResourceModifiedMessage msg = new ResourceModifiedMessage(this.myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE);
        msg.setSubscriptionId(theSubscriptionId);
        return this.myExecutorService.submit(() -> {
            int i = 0;
            while (true) {
                try {
                    this.myResourceModifiedConsumer.submitResourceModified(msg);
                }
                catch (Exception e) {
                    if (i >= 3) {
                        throw new InternalErrorException((Throwable)e);
                    }
                    ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", (Object)e.toString());
                    Thread.sleep(1000L);
                    ++i;
                    continue;
                }
                break;
            }
            return null;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancelAll() {
        List<SubscriptionTriggeringJobDetails> list = this.myActiveJobs;
        synchronized (list) {
            this.myActiveJobs.clear();
        }
    }

    public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) {
        Integer maxSubmitPerPass = theMaxSubmitPerPass;
        if (maxSubmitPerPass == null) {
            maxSubmitPerPass = 10000;
        }
        Validate.isTrue((maxSubmitPerPass > 0 ? 1 : 0) != 0, (String)"theMaxSubmitPerPass must be > 0", (Object[])new Object[0]);
        this.myMaxSubmitPerPass = maxSubmitPerPass;
    }

    @PostConstruct
    public void start() {
        this.createExecutorService();
        this.scheduleJob();
    }

    private void createExecutorService() {
        final LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<Runnable>(1000);
        BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("SubscriptionTriggering-%d").daemon(false).priority(5).build();
        RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler(){

            @Override
            public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
                ourLog.info("Note: Subscription triggering queue is full ({} elements), waiting for a slot to become available!", (Object)executorQueue.size());
                StopWatch sw = new StopWatch();
                try {
                    executorQueue.put(theRunnable);
                }
                catch (InterruptedException theE) {
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException("Task " + theRunnable.toString() + " rejected from " + theE.toString());
                }
                ourLog.info("Slot become available after {}ms", (Object)sw.getMillis());
            }
        };
        this.myExecutorService = new ThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, executorQueue, (ThreadFactory)threadFactory, rejectedExecutionHandler);
    }

    private void scheduleJob() {
        ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
        jobDetail.setId(this.getClass().getName());
        jobDetail.setJobClass(Job.class);
        this.mySchedulerService.scheduleLocalJob(5000L, jobDetail);
    }

    public int getActiveJobCount() {
        return this.myActiveJobs.size();
    }

    private static class SubscriptionTriggeringJobDetails {
        private String myJobId;
        private String mySubscriptionId;
        private List<String> myRemainingResourceIds;
        private List<String> myRemainingSearchUrls;
        private String myCurrentSearchUuid;
        private Integer myCurrentSearchCount;
        private String myCurrentSearchResourceType;
        private int myCurrentSearchLastUploadedIndex;

        private SubscriptionTriggeringJobDetails() {
        }

        Integer getCurrentSearchCount() {
            return this.myCurrentSearchCount;
        }

        void setCurrentSearchCount(Integer theCurrentSearchCount) {
            this.myCurrentSearchCount = theCurrentSearchCount;
        }

        String getCurrentSearchResourceType() {
            return this.myCurrentSearchResourceType;
        }

        void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
            this.myCurrentSearchResourceType = theCurrentSearchResourceType;
        }

        String getJobId() {
            return this.myJobId;
        }

        void setJobId(String theJobId) {
            this.myJobId = theJobId;
        }

        String getSubscriptionId() {
            return this.mySubscriptionId;
        }

        void setSubscriptionId(String theSubscriptionId) {
            this.mySubscriptionId = theSubscriptionId;
        }

        List<String> getRemainingResourceIds() {
            return this.myRemainingResourceIds;
        }

        void setRemainingResourceIds(List<String> theRemainingResourceIds) {
            this.myRemainingResourceIds = theRemainingResourceIds;
        }

        List<String> getRemainingSearchUrls() {
            return this.myRemainingSearchUrls;
        }

        void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
            this.myRemainingSearchUrls = theRemainingSearchUrls;
        }

        String getCurrentSearchUuid() {
            return this.myCurrentSearchUuid;
        }

        void setCurrentSearchUuid(String theCurrentSearchUuid) {
            this.myCurrentSearchUuid = theCurrentSearchUuid;
        }

        int getCurrentSearchLastUploadedIndex() {
            return this.myCurrentSearchLastUploadedIndex;
        }

        void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
            this.myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
        }
    }

    public static class Job
    implements HapiJob {
        @Autowired
        private ISubscriptionTriggeringSvc myTarget;

        public void execute(JobExecutionContext theContext) {
            this.myTarget.runDeliveryPass();
        }
    }
}

