/*
 * Decompiled with CFR 0.152.
 */
package com.taotao.cloud.canal.abstracts;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.taotao.cloud.canal.abstracts.AbstractMessageTransponder;
import com.taotao.cloud.canal.annotation.ListenPoint;
import com.taotao.cloud.canal.exception.CanalClientException;
import com.taotao.cloud.canal.interfaces.CanalEventListener;
import com.taotao.cloud.canal.model.CanalMsg;
import com.taotao.cloud.canal.model.ListenerPoint;
import com.taotao.cloud.canal.properties.CanalProperties;
import com.taotao.cloud.common.utils.log.LogUtils;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import org.springframework.util.CollectionUtils;

public abstract class AbstractBasicMessageTransponder
extends AbstractMessageTransponder {
    public AbstractBasicMessageTransponder(CanalConnector connector, Map.Entry<String, CanalProperties.Instance> config, List<CanalEventListener> listeners, List<ListenerPoint> annoListeners) {
        super(connector, config, listeners, annoListeners);
    }

    @Override
    protected void distributeEvent(Message message) {
        List entries = message.getEntries();
        for (CanalEntry.Entry entry : entries) {
            CanalEntry.RowChange rowChange;
            List<CanalEntry.EntryType> ignoreEntryTypes = this.getIgnoreEntryTypes();
            if (ignoreEntryTypes != null && ignoreEntryTypes.stream().anyMatch(t -> entry.getEntryType() == t)) continue;
            try {
                rowChange = CanalEntry.RowChange.parseFrom((ByteString)entry.getStoreValue());
            }
            catch (Exception e) {
                throw new CanalClientException("\u9519\u8bef ##\u8f6c\u6362\u9519\u8bef , \u6570\u636e\u4fe1\u606f:" + entry.toString(), e);
            }
            this.distributeByAnnotation(this.destination, entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowChange);
            this.distributeByImpl(this.destination, entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowChange);
        }
    }

    protected void distributeByAnnotation(String destination, String schemaName, String tableName, CanalEntry.RowChange rowChange) {
        if (!CollectionUtils.isEmpty((Collection)this.annotationListeners)) {
            this.annotationListeners.forEach(point -> point.getInvokeMap().entrySet().stream().filter(this.getAnnotationFilter(destination, schemaName, tableName, rowChange.getEventType())).forEach(entry -> {
                Method method = (Method)entry.getKey();
                method.setAccessible(true);
                try {
                    CanalMsg canalMsg = new CanalMsg();
                    canalMsg.setDestination(destination);
                    canalMsg.setSchemaName(schemaName);
                    canalMsg.setTableName(tableName);
                    Object[] args = this.getInvokeArgs(method, canalMsg, rowChange);
                    method.invoke(point.getTarget(), args);
                }
                catch (Exception e) {
                    LogUtils.error((String)"{}: \u59d4\u6258 canal \u76d1\u542c\u5668\u53d1\u751f\u9519\u8bef! \u9519\u8bef\u7c7b:{}, \u65b9\u6cd5\u540d:{}", (Object[])new Object[]{Thread.currentThread().getName(), point.getTarget().getClass().getName(), method.getName()});
                }
            }));
        }
    }

    protected void distributeByImpl(String destination, String schemaName, String tableName, CanalEntry.RowChange rowChange) {
        if (this.implListeners != null) {
            for (CanalEventListener listener : this.implListeners) {
                listener.onEvent(destination, schemaName, tableName, rowChange);
            }
        }
    }

    protected abstract Predicate<Map.Entry<Method, ListenPoint>> getAnnotationFilter(String var1, String var2, String var3, CanalEntry.EventType var4);

    protected abstract Object[] getInvokeArgs(Method var1, CanalMsg var2, CanalEntry.RowChange var3);

    protected List<CanalEntry.EntryType> getIgnoreEntryTypes() {
        return Collections.emptyList();
    }
}

