/*
 * Decompiled with CFR 0.152.
 */
package com.taotao.cloud.canal.model;

import com.alibaba.otter.canal.client.CanalConnector;
import com.taotao.cloud.canal.abstracts.AbstractCanalClient;
import com.taotao.cloud.canal.annotation.CanalEventListener;
import com.taotao.cloud.canal.annotation.ListenPoint;
import com.taotao.cloud.canal.model.ListenerPoint;
import com.taotao.cloud.canal.properties.CanalProperties;
import com.taotao.cloud.common.utils.context.ContextUtils;
import com.taotao.cloud.common.utils.log.LogUtils;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.core.annotation.AnnotatedElementUtils;

/**
 * Canal client that runs one message transponder per connector on a dedicated
 * pool of daemon threads.
 *
 * <p>On construction the client collects two kinds of listeners through
 * {@link ContextUtils}: beans of type
 * {@code com.taotao.cloud.canal.interfaces.CanalEventListener}, and methods
 * carrying a {@link ListenPoint} annotation on beans annotated with
 * {@link CanalEventListener}. Both collections are passed to every transponder
 * created in {@code process}.
 */
public class SimpleCanalClient extends AbstractCanalClient {
    /** Number of threads kept in the pool even when idle. */
    private static final int CORE_POOL_SIZE = 5;
    /** Upper bound on concurrently running transponder threads. */
    private static final int MAX_POOL_SIZE = 20;
    /** Idle time after which threads above the core size are reclaimed. */
    private static final long KEEP_ALIVE_SECONDS = 120L;
    /** Prefix prepended to the default thread name of every pool thread. */
    private static final String THREAD_NAME_PREFIX = "taotao-cloud-canal-thread-";

    /** Pool on which transponders are executed. */
    private final ThreadPoolExecutor executor;
    /** Listeners discovered as beans of the listener interface type. */
    protected final List<com.taotao.cloud.canal.interfaces.CanalEventListener> listeners = new ArrayList<com.taotao.cloud.canal.interfaces.CanalEventListener>();
    /** Listener methods discovered through the {@link ListenPoint} annotation. */
    private final List<ListenerPoint> annoListeners = new ArrayList<ListenerPoint>();

    /**
     * Creates the client, its thread pool, and populates the listener collections.
     *
     * @param canalProperties configuration forwarded to the superclass constructor
     */
    public SimpleCanalClient(CanalProperties canalProperties) {
        super(canalProperties);
        // SynchronousQueue performs a direct hand-off: tasks are never queued, so
        // each submitted transponder needs its own thread (up to MAX_POOL_SIZE).
        this.executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new CanalThreadFactory());
        this.initListeners();
    }

    /**
     * Submits a transponder for the given connector to the thread pool.
     *
     * <p>The transponder is obtained from the inherited {@code factory} and
     * receives both listener collections. The {@code Future} returned by
     * {@code submit} is not retained.
     *
     * @param connector the canal connector to read from
     * @param config    the instance name and its configuration
     */
    @Override
    protected void process(CanalConnector connector, Map.Entry<String, CanalProperties.Instance> config) {
        this.executor.submit(this.factory.newTransponder(connector, config, this.listeners, this.annoListeners));
    }

    /**
     * Stops the client via the superclass and then initiates an orderly shutdown
     * of the thread pool (no new tasks are accepted; running tasks are not
     * interrupted by this call).
     */
    @Override
    public void stop() {
        super.stop();
        this.executor.shutdown();
    }

    /**
     * Discovers interface-based and annotation-based listeners, logging the start
     * and end of the discovery and a warning when nothing was found.
     */
    private void initListeners() {
        String threadName = Thread.currentThread().getName();
        LogUtils.info("{}: \u76d1\u542c\u5668\u6b63\u5728\u521d\u59cb\u5316....", new Object[]{threadName});
        this.collectInterfaceListeners();
        this.collectAnnotatedListeners();
        LogUtils.info("{}: \u76d1\u542c\u5668\u521d\u59cb\u5316\u5b8c\u6210.", new Object[]{threadName});
        if (LogUtils.isWarnEnabled() && this.listeners.isEmpty() && this.annoListeners.isEmpty()) {
            LogUtils.warn("{}: \u9879\u76ee\u4e2d\u6ca1\u6709\u4efb\u4f55\u76d1\u542c\u7684\u76ee\u6807! ", new Object[]{threadName});
        }
    }

    /** Adds every bean of the listener interface type to {@link #listeners}. */
    private void collectInterfaceListeners() {
        List<?> beans = ContextUtils.getBeansOfTypeWithList(com.taotao.cloud.canal.interfaces.CanalEventListener.class);
        if (beans == null) {
            return;
        }
        for (Object bean : beans) {
            // Checked cast instead of an unchecked raw addAll.
            this.listeners.add((com.taotao.cloud.canal.interfaces.CanalEventListener) bean);
        }
    }

    /**
     * Registers a {@link ListenerPoint} for every declared method that carries a
     * (possibly meta-annotated) {@link ListenPoint} on beans annotated with
     * {@link CanalEventListener}.
     */
    private void collectAnnotatedListeners() {
        Map<?, ?> annotatedBeans = ContextUtils.getBeansWithAnnotation(CanalEventListener.class);
        if (annotatedBeans == null) {
            return;
        }
        for (Object target : annotatedBeans.values()) {
            for (Method method : target.getClass().getDeclaredMethods()) {
                ListenPoint listenPoint = AnnotatedElementUtils.findMergedAnnotation(method, ListenPoint.class);
                if (listenPoint != null) {
                    this.annoListeners.add(new ListenerPoint(target, method, listenPoint));
                }
            }
        }
    }

    /**
     * Thread factory that decorates threads from
     * {@link Executors#defaultThreadFactory()} with a canal-specific name prefix
     * and marks them as daemon threads.
     */
    private static final class CanalThreadFactory implements ThreadFactory {
        private final ThreadFactory delegate = Executors.defaultThreadFactory();

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = this.delegate.newThread(r);
            thread.setName(THREAD_NAME_PREFIX + thread.getName());
            thread.setDaemon(true);
            return thread;
        }
    }
}

