/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.gemfire.listener;

import com.gemstone.gemfire.cache.RegionService;
import com.gemstone.gemfire.cache.client.Pool;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.query.CqAttributes;
import com.gemstone.gemfire.cache.query.CqAttributesFactory;
import com.gemstone.gemfire.cache.query.CqEvent;
import com.gemstone.gemfire.cache.query.CqListener;
import com.gemstone.gemfire.cache.query.CqQuery;
import com.gemstone.gemfire.cache.query.QueryException;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.gemfire.GemfireQueryException;
import org.springframework.data.gemfire.listener.ContinuousQueryDefinition;
import org.springframework.data.gemfire.listener.ContinuousQueryListener;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ErrorHandler;
import org.springframework.util.StringUtils;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Container that registers GemFire continuous queries (CQs) described by
 * {@link ContinuousQueryDefinition}s and forwards the resulting {@link CqEvent}s to the
 * associated {@link ContinuousQueryListener}s on a task executor.
 *
 * <p>Life cycle, as implemented below:
 * <ul>
 * <li>{@link #afterPropertiesSet()} resolves the executor and the {@link QueryService},
 * creates one {@link CqQuery} per configured definition and then calls {@link #start()};</li>
 * <li>{@link #start()} executes every registered query, {@link #stop()} stops them;</li>
 * <li>{@link #destroy()} stops and closes all queries.</li>
 * </ul>
 *
 * <p>The {@code running} and {@code initialized} flags are volatile, but the start/stop
 * transitions themselves are not synchronized.
 */
public class ContinuousQueryListenerContainer
        implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle {

    protected final Log logger = LogFactory.getLog(this.getClass());

    /** Thread name prefix used by the default executor when no bean name is set. */
    public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(ContinuousQueryListenerContainer.class) + "-";

    // Only assigned in afterPropertiesSet(); not otherwise read within this class.
    private Executor subscriptionExecutor;
    // Executor on which listener callbacks are run (see dispatchEvent).
    private Executor taskExecutor;
    private String beanName;
    private ErrorHandler errorHandler;
    private volatile boolean running = false;
    private volatile boolean initialized = false;
    // True when the task executor was created by this container rather than injected.
    private volatile boolean manageExecutor = false;
    // Query definitions configured through setQueryListeners(Set).
    private final Set<ContinuousQueryDefinition> defs = new LinkedHashSet<ContinuousQueryDefinition>();
    // Queries created from the definitions (and from addListener).
    private final Set<CqQuery> queries = new ConcurrentHashSet();
    private QueryService queryService;
    private String poolName;

    /**
     * Completes the configuration: creates a default task executor if none was supplied,
     * looks up the query service from the named pool (when a pool name is set), creates
     * the queries for all configured definitions, marks the container active and starts it.
     *
     * @throws IllegalArgumentException if a pool name is set but no such pool is found
     * @throws GemfireQueryException if a query cannot be created or executed
     */
    public void afterPropertiesSet() {
        if (this.taskExecutor == null) {
            this.manageExecutor = true;
            this.taskExecutor = this.createDefaultTaskExecutor();
        }
        if (this.subscriptionExecutor == null) {
            this.subscriptionExecutor = this.taskExecutor;
        }
        if (StringUtils.hasText(this.poolName)) {
            Pool pool = PoolManager.find(this.poolName);
            Assert.notNull(pool, "No pool named [" + this.poolName + "] found");
            // A configured pool takes precedence over any previously set query service.
            this.queryService = pool.getQueryService();
        }
        this.initMapping(this.defs);
        this.initialized = true;
        this.start();
    }

    /**
     * Creates the executor used when none has been injected.
     *
     * @return a {@link SimpleAsyncTaskExecutor} whose thread name prefix is the bean name
     *         followed by "-", or {@link #DEFAULT_THREAD_NAME_PREFIX} if no bean name is set
     */
    protected TaskExecutor createDefaultTaskExecutor() {
        String threadNamePrefix = DEFAULT_THREAD_NAME_PREFIX;
        if (this.beanName != null) {
            threadNamePrefix = this.beanName + "-";
        }
        return new SimpleAsyncTaskExecutor(threadNamePrefix);
    }

    /**
     * Marks the container inactive, stops and closes all queries and, if the task executor
     * was created by this container and implements {@link DisposableBean}, destroys it.
     *
     * @throws Exception if destroying the internally managed executor fails
     */
    public void destroy() throws Exception {
        this.initialized = false;
        this.stop();
        this.closeQueries();
        if (this.manageExecutor && this.taskExecutor instanceof DisposableBean) {
            ((DisposableBean) this.taskExecutor).destroy();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Stopped internally-managed task executor");
            }
        }
    }

    /** @return always {@code true} */
    public boolean isAutoStartup() {
        return true;
    }

    /**
     * Stops the container and then runs the given callback.
     *
     * @param callback invoked once {@link #stop()} has returned or thrown
     */
    public void stop(Runnable callback) {
        try {
            this.stop();
        }
        finally {
            // The callback must run even if stopping fails, otherwise the caller waiting
            // on it would never be notified.
            callback.run();
        }
    }

    /** @return always {@link Integer#MAX_VALUE} */
    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    /** @return whether the container is currently started */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Executes all registered queries unless the container is already running.
     *
     * <p>If executing a query fails, the queries that were already started are stopped
     * again, the container is left in the stopped state and the failure is rethrown.
     *
     * @throws GemfireQueryException if a query cannot be executed
     */
    public void start() {
        if (this.running) {
            return;
        }
        // Flag is raised before executing so that addListener() calls made while starting
        // also execute their query.
        this.running = true;
        try {
            this.doStart();
        }
        catch (RuntimeException ex) {
            // Do not report a half-started container as running.
            this.running = false;
            this.stopStartedQueries();
            throw ex;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Started ContinuousQueryListenerContainer");
        }
    }

    /**
     * Stops all registered queries if the container is running; otherwise does nothing.
     * Failures to stop individual queries are logged and do not abort the operation.
     */
    public void stop() {
        if (!this.running) {
            return;
        }
        this.running = false;
        this.doStop();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Stopped ContinuousQueryListenerContainer");
        }
    }

    /** Executes every registered query; the first failure aborts the loop. */
    private void doStart() {
        for (CqQuery cq : this.queries) {
            this.executeQuery(cq);
        }
    }

    /** Stops every registered query, logging (not propagating) failures. */
    private void doStop() {
        for (CqQuery cq : this.queries) {
            this.stopQuery(cq);
        }
    }

    /** Stops only the queries that report themselves as running; used to undo a failed start. */
    private void stopStartedQueries() {
        for (CqQuery cq : this.queries) {
            if (cq.isRunning()) {
                this.stopQuery(cq);
            }
        }
    }

    /** Stops a single query, logging a warning instead of propagating any failure. */
    private void stopQuery(CqQuery cq) {
        try {
            cq.stop();
        }
        catch (RuntimeException ex) {
            this.logger.warn("Cannot stop query", ex);
        }
        catch (QueryException ex) {
            this.logger.warn("Cannot stop query", ex);
        }
    }

    /** Closes every query that is not already closed and empties the query set. */
    private void closeQueries() {
        for (CqQuery cq : this.queries) {
            try {
                if (!cq.isClosed()) {
                    cq.close();
                }
            }
            catch (QueryException ex) {
                this.logger.warn("Cannot close query", ex);
            }
            catch (RuntimeException ex) {
                this.logger.warn("Cannot close query", ex);
            }
        }
        this.queries.clear();
    }

    /**
     * Invokes the listener with the event, routing anything it throws to
     * {@link #handleListenerException(Throwable)}.
     *
     * @param listener the listener to notify
     * @param event the continuous query event to pass on
     */
    protected void executeListener(ContinuousQueryListener listener, CqEvent event) {
        try {
            listener.onEvent(event);
        }
        catch (Throwable ex) {
            this.handleListenerException(ex);
        }
    }

    /**
     * @return {@code true} between a completed {@link #afterPropertiesSet()} and the
     *         beginning of {@link #destroy()}
     */
    public final boolean isActive() {
        return this.initialized;
    }

    /**
     * Handles a failure raised by a listener: delegated to the error handler while the
     * container is active, merely logged at debug level otherwise.
     *
     * @param ex the failure thrown by the listener
     */
    protected void handleListenerException(Throwable ex) {
        if (this.isActive()) {
            this.invokeErrorHandler(ex);
        } else {
            this.logger.debug("Listener exception after container shutdown", ex);
        }
    }

    /**
     * Passes the failure to the configured {@link ErrorHandler}, or logs it as a warning
     * when no handler has been set.
     *
     * @param ex the failure to report
     */
    protected void invokeErrorHandler(Throwable ex) {
        if (this.errorHandler != null) {
            this.errorHandler.handleError(ex);
        } else if (this.logger.isWarnEnabled()) {
            this.logger.warn("Execution of continuous query listener failed, and no ErrorHandler has been set.", ex);
        }
    }

    /** @param name the bean name, used as thread name prefix for the default executor */
    public void setBeanName(String name) {
        this.beanName = name;
    }

    /**
     * @param taskExecutor the executor on which listeners are invoked; when left unset a
     *        default one is created in {@link #afterPropertiesSet()}
     */
    public void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /** @param errorHandler the handler notified of listener failures while the container is active */
    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    /** @param cache the cache whose query service is used to create the queries; must not be null */
    public void setCache(RegionService cache) {
        this.queryService = cache.getQueryService();
    }

    /** @param service the query service used to create the queries */
    public void setQueryService(QueryService service) {
        this.queryService = service;
    }

    /**
     * @param poolName name of the pool whose query service should be used; resolved in
     *        {@link #afterPropertiesSet()} and, when set, overrides any other query service
     */
    public void setPoolName(String poolName) {
        this.poolName = poolName;
    }

    /**
     * Replaces the configured query definitions. The definitions are turned into queries
     * by {@link #afterPropertiesSet()}; this method itself does not create any query.
     *
     * @param queries the new definitions; {@code null} is treated as an empty set
     */
    public void setQueryListeners(Set<ContinuousQueryDefinition> queries) {
        this.defs.clear();
        if (queries != null) {
            this.defs.addAll(queries);
        }
    }

    /**
     * Creates a query for the given definition and executes it right away if the
     * container is running.
     *
     * @param cqQuery the definition to register
     * @throws GemfireQueryException if the query cannot be created or executed
     */
    public void addListener(ContinuousQueryDefinition cqQuery) {
        this.doAddListener(cqQuery);
    }

    /**
     * Rebuilds the query set from the given definitions: stops the container if running,
     * closes the existing queries, creates the new ones and restarts if already initialized.
     */
    private void initMapping(Set<ContinuousQueryDefinition> queryDefinitions) {
        if (this.isRunning()) {
            this.stop();
        }
        this.closeQueries();
        for (ContinuousQueryDefinition def : queryDefinitions) {
            this.addCQuery(def);
        }
        if (this.initialized) {
            this.start();
        }
    }

    /** Creates the query for a definition and executes it if the container is running. */
    private void doAddListener(ContinuousQueryDefinition def) {
        CqQuery cq = this.addCQuery(def);
        if (this.isRunning()) {
            this.executeQuery(cq);
        }
    }

    /**
     * Creates a (named or anonymous) query for the definition, wires its listener through
     * an {@link EventDispatcherAdapter} and records the query.
     *
     * @throws GemfireQueryException if no query service is available or creation fails
     */
    private CqQuery addCQuery(ContinuousQueryDefinition def) {
        try {
            if (this.queryService == null) {
                // Raised inside the try so callers still see a GemfireQueryException,
                // but with an explanatory cause instead of a bare NullPointerException.
                throw new IllegalStateException(
                        "No QueryService available; set a cache, a query service or a pool name before registering queries");
            }
            CqAttributesFactory caf = new CqAttributesFactory();
            caf.addCqListener(new EventDispatcherAdapter(def.getListener()));
            CqAttributes attr = caf.create();
            CqQuery cq;
            if (StringUtils.hasText(def.getName())) {
                cq = this.queryService.newCq(def.getName(), def.getQuery(), attr, def.isDurable());
            } else {
                cq = this.queryService.newCq(def.getQuery(), attr, def.isDurable());
            }
            this.queries.add(cq);
            return cq;
        }
        catch (RuntimeException ex) {
            throw new GemfireQueryException("Cannot create query ", ex);
        }
        catch (QueryException ex) {
            throw new GemfireQueryException("Cannot create query ", ex);
        }
    }

    /**
     * Executes the query, translating failures.
     *
     * @throws GemfireQueryException if execution fails
     */
    private void executeQuery(CqQuery cq) {
        try {
            cq.execute();
        }
        catch (QueryException ex) {
            throw new GemfireQueryException("Cannot execute query", ex);
        }
        catch (RuntimeException ex) {
            throw new GemfireQueryException("Cannot execute query", ex);
        }
    }

    /** Hands the listener invocation for the event over to the task executor. */
    private void dispatchEvent(final ContinuousQueryListener listener, final CqEvent event) {
        this.taskExecutor.execute(new Runnable() {

            public void run() {
                ContinuousQueryListenerContainer.this.executeListener(listener, event);
            }
        });
    }

    /**
     * GemFire {@link CqListener} that forwards both regular and error events to the
     * wrapped {@link ContinuousQueryListener} through the container's task executor.
     */
    private class EventDispatcherAdapter
            implements CqListener {
        private final ContinuousQueryListener delegate;

        EventDispatcherAdapter(ContinuousQueryListener delegate) {
            this.delegate = delegate;
        }

        public void onError(CqEvent event) {
            // Error events are delivered to the delegate the same way as regular events.
            ContinuousQueryListenerContainer.this.dispatchEvent(this.delegate, event);
        }

        public void onEvent(CqEvent event) {
            ContinuousQueryListenerContainer.this.dispatchEvent(this.delegate, event);
        }

        public void close() {
            // Nothing to release.
        }
    }
}

