/*
 * Decompiled with CFR 0.152.
 */
package com.taotao.cloud.canal.abstracts;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.taotao.cloud.canal.interfaces.CanalClient;
import com.taotao.cloud.canal.interfaces.TransponderFactory;
import com.taotao.cloud.canal.properties.CanalProperties;
import com.taotao.cloud.canal.transfer.DefaultMessageTransponder;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang.StringUtils;

/**
 * Base class for Canal clients: reads the configured Canal instances, opens one
 * {@link CanalConnector} per instance and hands each connector to the subclass via
 * {@link #process(CanalConnector, Map.Entry)}.
 *
 * <p>The {@code running} flag is {@code volatile}, so {@link #isRunning()} observes
 * updates made by {@link #start()} and {@link #stop()} from other threads.
 */
public abstract class AbstractCanalClient implements CanalClient {
    /** {@code true} between a successful configuration lookup in {@link #start()} and {@link #stop()}. */
    private volatile boolean running;

    /** Configuration holding the Canal instances to connect to. */
    private final CanalProperties canalProperties;

    /** Factory used by subclasses to create message transponders. */
    protected final TransponderFactory factory;

    /**
     * Creates a client backed by the given configuration, using
     * {@link DefaultMessageTransponder} as the transponder implementation.
     *
     * @param canalProperties Canal configuration; must not be {@code null}
     * @throws NullPointerException if {@code canalProperties} is {@code null}
     */
    protected AbstractCanalClient(CanalProperties canalProperties) {
        // The factory is created locally below, so only the configuration needs a null check.
        Objects.requireNonNull(canalProperties, "canalConfig \u4e0d\u80fd\u4e3a\u7a7a!");
        this.canalProperties = canalProperties;
        this.factory = DefaultMessageTransponder::new;
    }

    /**
     * Connects to every configured Canal instance and passes each connector to
     * {@link #process(CanalConnector, Map.Entry)}.
     *
     * @throws CanalClientException if no instances are configured or a connection cannot be set up
     */
    @Override
    public void start() {
        Map<String, CanalProperties.Instance> instanceMap = this.getConfig();
        // Mark the client as running before handing out connectors so that
        // isRunning() is already true while subclasses process them.
        this.setRunning(true);
        for (Map.Entry<String, CanalProperties.Instance> instanceEntry : instanceMap.entrySet()) {
            this.process(this.processInstanceEntry(instanceEntry), instanceEntry);
        }
    }

    /**
     * Consumes a connected and subscribed connector for one Canal instance.
     *
     * @param connector     connector that has already been connected, subscribed and rolled back
     * @param instanceEntry the instance name (key) and its configuration (value)
     */
    protected abstract void process(CanalConnector connector, Map.Entry<String, CanalProperties.Instance> instanceEntry);

    /**
     * Builds a connector for one configured instance (cluster or single-node),
     * then connects, subscribes and rolls back.
     *
     * @param instanceEntry the instance name (key) and its configuration (value)
     * @return the ready-to-use connector
     * @throws CanalClientException if a zookeeper address is malformed or the connector setup fails
     */
    private CanalConnector processInstanceEntry(Map.Entry<String, CanalProperties.Instance> instanceEntry) {
        CanalProperties.Instance instance = instanceEntry.getValue();
        String destination = instanceEntry.getKey();

        CanalConnector connector;
        // Boolean.TRUE.equals(...) works for both a primitive and a boxed getter
        // and treats a null flag as "cluster mode disabled" instead of throwing.
        if (Boolean.TRUE.equals(instance.getClusterEnabled())) {
            ArrayList<InetSocketAddress> addresses = new ArrayList<>();
            for (String s : instance.getZookeeperAddress()) {
                addresses.add(this.parseZookeeperAddress(s));
            }
            connector = CanalConnectors.newClusterConnector(addresses, destination, instance.getUserName(), instance.getPassword());
        } else {
            connector = CanalConnectors.newSingleConnector(new InetSocketAddress(instance.getHost(), instance.getPort()), destination, instance.getUserName(), instance.getPassword());
        }

        try {
            connector.connect();
            if (!StringUtils.isEmpty(instance.getFilter())) {
                connector.subscribe(instance.getFilter());
            } else {
                connector.subscribe();
            }
            // NOTE(review): presumably resets the consume position to the last
            // acknowledged batch - confirm against the Canal client documentation.
            connector.rollback();
        } catch (RuntimeException e) {
            // Do not leak a half-initialised connection when setup fails.
            try {
                connector.disconnect();
            } catch (RuntimeException disconnectFailure) {
                e.addSuppressed(disconnectFailure);
            }
            throw e;
        }
        return connector;
    }

    /**
     * Parses a single {@code ip:port} zookeeper address.
     *
     * @param address the raw address string
     * @return the parsed socket address
     * @throws CanalClientException if the string is not of the form {@code ip:port}
     *                              or the port is not a valid integer
     */
    private InetSocketAddress parseZookeeperAddress(String address) {
        String[] entry = address.split(":");
        if (entry.length != 2) {
            throw new CanalClientException("zookeeper \u5730\u5740\u683c\u5f0f\u4e0d\u6b63\u786e\uff0c\u5e94\u8be5\u4e3a ip:port....:" + address);
        }
        try {
            return new InetSocketAddress(entry[0], Integer.parseInt(entry[1]));
        } catch (IllegalArgumentException e) {
            // Covers NumberFormatException (non-numeric port) and an out-of-range port.
            throw new CanalClientException("zookeeper \u5730\u5740\u683c\u5f0f\u4e0d\u6b63\u786e\uff0c\u5e94\u8be5\u4e3a ip:port....:" + address, e);
        }
    }

    /**
     * Returns the configured Canal instances keyed by instance name.
     *
     * @return a non-null, non-empty map of instances
     * @throws CanalClientException if no instances are configured
     */
    protected Map<String, CanalProperties.Instance> getConfig() {
        Map<String, CanalProperties.Instance> instanceMap = this.canalProperties.getInstances();
        if (instanceMap == null || instanceMap.isEmpty()) {
            throw new CanalClientException("\u65e0\u6cd5\u89e3\u6790 canal \u7684\u8fde\u63a5\u4fe1\u606f\uff0c\u8bf7\u8054\u7cfb\u5f00\u53d1\u4eba\u5458!");
        }
        return instanceMap;
    }

    /** Marks the client as no longer running. */
    @Override
    public void stop() {
        this.setRunning(false);
    }

    /**
     * @return {@code true} once {@link #start()} has loaded the configuration and
     *         until {@link #stop()} is called
     */
    @Override
    public boolean isRunning() {
        return this.running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }
}

