/*
 * Decompiled with CFR 0.152.
 */
package ca.uhn.fhir.jpa.subscription.match.registry;

import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.cache.IResourceChangeEvent;
import ca.uhn.fhir.jpa.cache.IResourceChangeListener;
import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.model.api.IQueryParameterOr;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;

/**
 * Loads Subscription resources through the Subscription DAO and mirrors them into the
 * {@link SubscriptionRegistry}.
 *
 * <p>The loader registers itself as an {@link IResourceChangeListener} for the
 * {@code Subscription} resource type. Every change notification triggers a full re-sync:
 * matching subscriptions are searched, subscriptions in the "requested" status are handed to the
 * {@link SubscriptionActivatingSubscriber} for activation, each result is registered (unless it
 * already is), and any registry entry whose id was not part of the result is unregistered.
 *
 * <p>Concurrent sync requests are not queued: {@link #syncSubscriptions()} returns immediately
 * when another sync already holds the semaphore.
 */
public class SubscriptionLoader implements IResourceChangeListener {
    private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class);
    /** Maximum number of attempts handed to the {@link Retrier} that wraps a sync pass. */
    private static final int MAX_RETRIES = 60;
    /**
     * Value passed as the last argument when registering the change listener.
     * NOTE(review): presumably a refresh interval in milliseconds - confirm against
     * {@link IResourceChangeListenerRegistry}.
     */
    private static final long REFRESH_INTERVAL = 60000L;
    /** Upper bound on the number of subscriptions loaded synchronously by a single search. */
    private static final int MAX_SUBSCRIPTION_RESULTS = 50000;
    /** Resource type name used for DAO lookups, search parameter lookups and listener registration. */
    private static final String RESOURCE_TYPE_SUBSCRIPTION = "Subscription";

    private final Object mySyncSubscriptionsLock = new Object();
    @Autowired
    private SubscriptionRegistry mySubscriptionRegistry;
    @Autowired
    DaoRegistry myDaoRegistry;
    /** Single permit: at most one caller at a time gets past {@link #syncSubscriptions()}. */
    private Semaphore mySyncSubscriptionsSemaphore = new Semaphore(1);
    @Autowired
    private SubscriptionActivatingSubscriber mySubscriptionActivatingInterceptor;
    @Autowired
    private ISearchParamRegistry mySearchParamRegistry;
    @Autowired
    private IResourceChangeListenerRegistry myResourceChangeListenerRegistry;
    @Autowired
    private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
    /** Search criteria built once in {@link #registerListener()} and reused for every sync. */
    private SearchParameterMap mySearchParameterMap;
    /** Request details built once in {@link #registerListener()} and reused for every sync. */
    private SystemRequestDetails mySystemRequestDetails;
    /**
     * Set by the context event listeners and read by the sync pass. Declared {@code volatile}
     * because the writers and the reader are not guaranteed to run on the same thread.
     */
    private volatile boolean myStopping;

    /**
     * Builds the search criteria and request details used by every sync, registers this loader
     * as a change listener for Subscription resources and forces an initial cache refresh.
     */
    @PostConstruct
    public void registerListener() {
        mySearchParameterMap = getSearchParameterMap();
        mySystemRequestDetails = SystemRequestDetails.forAllPartitions();
        IResourceChangeListenerCache subscriptionCache =
                myResourceChangeListenerRegistry.registerResourceResourceChangeListener(
                        RESOURCE_TYPE_SUBSCRIPTION, mySearchParameterMap, this, REFRESH_INTERVAL);
        subscriptionCache.forceRefresh();
    }

    /** Removes this loader from the change listener registry. */
    @PreDestroy
    public void unregisterListener() {
        myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this);
    }

    /**
     * @return {@code true} when a DAO registry is present and it supports the Subscription
     *         resource type
     */
    private boolean subscriptionsDaoExists() {
        return myDaoRegistry != null && myDaoRegistry.isResourceTypeSupported(RESOURCE_TYPE_SUBSCRIPTION);
    }

    /**
     * Runs one sync pass (with retries) unless there is no Subscription DAO or another sync is
     * already in progress. In both of those cases the method returns without doing anything.
     */
    public void syncSubscriptions() {
        if (!subscriptionsDaoExists()) {
            return;
        }
        // Non-blocking: a sync that is already running makes this request redundant.
        if (!mySyncSubscriptionsSemaphore.tryAcquire()) {
            return;
        }
        try {
            doSyncSubscriptionsWithRetry();
        } finally {
            mySyncSubscriptionsSemaphore.release();
        }
    }

    /**
     * Takes the sync semaphore so that subsequent {@link #syncSubscriptions()} calls become
     * no-ops. The permit is not released by this method.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @VisibleForTesting
    public void acquireSemaphoreForUnitTest() throws InterruptedException {
        mySyncSubscriptionsSemaphore.acquire();
    }

    /**
     * Runs two consecutive sync passes, bypassing the semaphore and the DAO existence check.
     * NOTE(review): the reason a second pass is needed is not visible in this class.
     *
     * @return the sum of the activation counts reported by both passes
     */
    @VisibleForTesting
    public int doSyncSubscriptionsForUnitTest() {
        int first = doSyncSubscriptionsWithRetry();
        int second = doSyncSubscriptionsWithRetry();
        return first + second;
    }

    /**
     * Runs {@link #doSyncSubscriptions()} through a {@link Retrier} configured with
     * {@link #MAX_RETRIES}.
     *
     * @return the number of subscriptions activated by the pass that completed
     */
    synchronized int doSyncSubscriptionsWithRetry() {
        Retrier<Integer> syncSubscriptionRetrier = new Retrier<>(this::doSyncSubscriptions, MAX_RETRIES);
        return syncSubscriptionRetrier.runWithRetry();
    }

    /**
     * Performs a single sync pass: searches for subscriptions and updates the registry from the
     * result. Does nothing once the loader has been told to stop.
     *
     * @return the number of subscriptions activated, or {@code 0} when stopping
     */
    private int doSyncSubscriptions() {
        if (isStopping()) {
            return 0;
        }
        synchronized (mySyncSubscriptionsLock) {
            ourLog.debug("Starting sync subscriptions");
            IBundleProvider subscriptionBundleList =
                    getSubscriptionDao().search(mySearchParameterMap, mySystemRequestDetails);
            Integer subscriptionCount = subscriptionBundleList.size();
            // size() is a boxed Integer; the code below unboxes it, so null is not tolerated.
            assert subscriptionCount != null;
            if (subscriptionCount >= MAX_SUBSCRIPTION_RESULTS) {
                ourLog.error("Currently over {} subscriptions.  Some subscriptions have not been loaded.", MAX_SUBSCRIPTION_RESULTS);
            }
            List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionCount);
            return updateSubscriptionRegistry(resourceList);
        }
    }

    /** Clears the stopping flag when the application context is refreshed. */
    @EventListener(value={ContextRefreshedEvent.class})
    public void start() {
        myStopping = false;
    }

    /** Sets the stopping flag when the application context is closed, disabling further syncs. */
    @EventListener(value={ContextClosedEvent.class})
    public void shutdown() {
        myStopping = true;
    }

    private boolean isStopping() {
        return myStopping;
    }

    private IFhirResourceDao<?> getSubscriptionDao() {
        return myDaoRegistry.getSubscriptionDao();
    }

    /**
     * Builds the search used to load subscriptions. When an active {@code status} search
     * parameter exists for Subscription, results are restricted to the REQUESTED and ACTIVE
     * status codes; otherwise no status filter is applied. The search is loaded synchronously
     * up to {@link #MAX_SUBSCRIPTION_RESULTS} entries.
     */
    @Nonnull
    private SearchParameterMap getSearchParameterMap() {
        SearchParameterMap map = new SearchParameterMap();
        if (mySearchParamRegistry.getActiveSearchParam(RESOURCE_TYPE_SUBSCRIPTION, "status") != null) {
            TokenOrListParam statuses = new TokenOrListParam()
                    .addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
                    .addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode()));
            map.add("status", statuses);
        }
        map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS);
        return map;
    }

    /**
     * Brings the registry in line with the given subscriptions: activates those that are in the
     * requested status, registers each one unless it is already registered, and finally
     * unregisters every registry entry whose id part is not in the list.
     *
     * @param theResourceList the subscriptions that should remain registered
     * @return the number of subscriptions that were activated
     */
    private int updateSubscriptionRegistry(List<IBaseResource> theResourceList) {
        Collection<String> allIds = new HashSet<>();
        int activatedCount = 0;
        int registeredCount = 0;
        for (IBaseResource resource : theResourceList) {
            allIds.add(resource.getIdElement().getIdPart());
            if (activateSubscriptionIfRequested(resource)) {
                ++activatedCount;
            }
            if (mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource)) {
                ++registeredCount;
            }
        }
        mySubscriptionRegistry.unregisterAllSubscriptionsNotInCollection(allIds);
        // Report the activation count; previously the size of the input list was logged here.
        ourLog.debug("Finished sync subscriptions - activated {} and registered {}", activatedCount, registeredCount);
        return activatedCount;
    }

    /**
     * Attempts to activate the subscription when its status equals
     * {@link SubscriptionConstants#REQUESTED_STATUS}.
     *
     * @param theSubscription the subscription resource to inspect
     * @return {@code true} only when the subscription was in the requested status, its channel
     *         type is supported and the activation call reported success
     */
    private boolean activateSubscriptionIfRequested(IBaseResource theSubscription) {
        if (!SubscriptionConstants.REQUESTED_STATUS.equals(mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription))) {
            return false;
        }
        if (!mySubscriptionActivatingInterceptor.isChannelTypeSupported(theSubscription)) {
            ourLog.debug("Could not activate subscription {} because channel type {} is not supported.", theSubscription.getIdElement(), mySubscriptionCanonicalizer.getChannelType(theSubscription));
            return false;
        }
        if (mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(theSubscription)) {
            return true;
        }
        logSubscriptionNotActivatedPlusErrorIfPossible(theSubscription);
        return false;
    }

    /**
     * Logs an error for a subscription that could not be activated, appending the resource's own
     * error text when the resource is an R4, DSTU3 or DSTU2 Subscription and that text is not
     * blank.
     *
     * @param theSubscription the subscription that failed to activate
     */
    private void logSubscriptionNotActivatedPlusErrorIfPossible(IBaseResource theSubscription) {
        String error;
        if (theSubscription instanceof Subscription) {
            error = ((Subscription) theSubscription).getError();
        } else if (theSubscription instanceof org.hl7.fhir.dstu3.model.Subscription) {
            error = ((org.hl7.fhir.dstu3.model.Subscription) theSubscription).getError();
        } else if (theSubscription instanceof org.hl7.fhir.dstu2.model.Subscription) {
            error = ((org.hl7.fhir.dstu2.model.Subscription) theSubscription).getError();
        } else {
            error = "";
        }
        String errorSuffix = StringUtils.isBlank(error) ? "" : "Error: " + error;
        ourLog.error("Subscription {} could not be activated. This will not prevent startup, but it could lead to undesirable outcomes! {}", theSubscription.getIdElement().getIdPart(), errorSuffix);
    }

    /**
     * Initial load: reads each listed subscription through the DAO and updates the registry
     * from the result. Logs a warning and does nothing when no Subscription DAO is available.
     *
     * @param theResourceIds ids of the subscriptions to load
     */
    @Override
    public void handleInit(Collection<IIdType> theResourceIds) {
        if (!subscriptionsDaoExists()) {
            ourLog.warn("Subscriptions are enabled on this server, but there is no Subscription DAO configured.");
            return;
        }
        IFhirResourceDao<?> subscriptionDao = getSubscriptionDao();
        SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions();
        List<IBaseResource> resourceList = theResourceIds.stream()
                .map(n -> subscriptionDao.read(n, systemRequestDetails))
                .collect(Collectors.toList());
        updateSubscriptionRegistry(resourceList);
    }

    /**
     * Reacts to any change notification by running a full sync; the event content is not
     * inspected.
     *
     * @param theResourceChangeEvent the change notification (unused)
     */
    @Override
    public void handleChange(IResourceChangeEvent theResourceChangeEvent) {
        syncSubscriptions();
    }
}

