/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.PluginDiscovery;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;

/**
 * Skeleton {@link Herder} implementation shared by concrete herders.
 *
 * <p>It provides three things:
 * <ul>
 *   <li>start/stop of the worker and the status/config backing stores, plus a
 *       background classpath scan for plugins;</li>
 *   <li>the {@link ConnectorStatus.Listener} and {@link TaskStatus.Listener}
 *       callbacks, each of which writes a status record (stamped with this
 *       worker's id and the current {@link #generation()}) to the status
 *       backing store;</li>
 *   <li>connector configuration validation and its conversion into the REST
 *       {@link ConfigInfos} entity.</li>
 * </ul>
 */
public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
    /** Charset used both to render and to decode stack traces in {@link #trace(Throwable)}. */
    private static final String TRACE_CHARSET = "UTF-8";

    private final String workerId;
    protected final Worker worker;
    protected final StatusBackingStore statusBackingStore;
    protected final ConfigBackingStore configBackingStore;
    /** Connector instances created for validation, keyed by the connector type string. */
    private final Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
    /** Thread running the plugin classpath scan; {@code null} until {@link #startServices()} runs. */
    private Thread classPathTraverser;

    /**
     * @param worker             worker started/stopped by this herder and used to create connectors
     * @param workerId           identifier written into every status record produced here
     * @param statusBackingStore store receiving connector and task status records
     * @param configBackingStore store holding connector configs and target states
     */
    public AbstractHerder(Worker worker, String workerId, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore) {
        this.worker = worker;
        this.workerId = workerId;
        this.statusBackingStore = statusBackingStore;
        this.configBackingStore = configBackingStore;
    }

    /**
     * @return the generation number stamped onto every status record written by this herder
     */
    protected abstract int generation();

    /**
     * Starts the worker, then the status store, then the config store, and finally
     * kicks off the background classpath scan.
     */
    protected void startServices() {
        worker.start();
        statusBackingStore.start();
        configBackingStore.start();
        traverseClassPath();
    }

    /**
     * Stops the status store, the config store and the worker (in that order), then
     * waits for the classpath scan thread, if one was started, to finish.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and
     * the thread's interrupt status is restored so callers can still observe it.
     */
    protected void stopServices() {
        statusBackingStore.stop();
        configBackingStore.stop();
        worker.stop();
        if (classPathTraverser == null) {
            return;
        }
        try {
            classPathTraverser.join();
        } catch (InterruptedException e) {
            // Do not swallow the interruption: re-assert the flag for whoever is driving shutdown.
            Thread.currentThread().interrupt();
        }
    }

    /** Records the connector as {@code RUNNING} in the status store. */
    @Override
    public void onStartup(String connector) {
        statusBackingStore.put(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation()));
    }

    /** Records the connector as {@code PAUSED} in the status store. */
    @Override
    public void onPause(String connector) {
        statusBackingStore.put(new ConnectorStatus(connector, AbstractStatus.State.PAUSED, workerId, generation()));
    }

    /** Records the connector as {@code RUNNING} again in the status store. */
    @Override
    public void onResume(String connector) {
        statusBackingStore.put(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation()));
    }

    /** Records the connector as {@code UNASSIGNED}, using the store's {@code putSafe} variant. */
    @Override
    public void onShutdown(String connector) {
        statusBackingStore.putSafe(new ConnectorStatus(connector, AbstractStatus.State.UNASSIGNED, workerId, generation()));
    }

    /**
     * Records the connector as {@code FAILED} together with the rendered stack trace of
     * {@code cause}, using the store's {@code putSafe} variant.
     */
    @Override
    public void onFailure(String connector, Throwable cause) {
        statusBackingStore.putSafe(new ConnectorStatus(connector, AbstractStatus.State.FAILED, trace(cause), workerId, generation()));
    }

    /** Records the task as {@code RUNNING} in the status store. */
    @Override
    public void onStartup(ConnectorTaskId id) {
        statusBackingStore.put(new TaskStatus(id, AbstractStatus.State.RUNNING, workerId, generation()));
    }

    /**
     * Records the task as {@code FAILED} together with the rendered stack trace of
     * {@code cause}, using the store's {@code putSafe} variant.
     */
    @Override
    public void onFailure(ConnectorTaskId id, Throwable cause) {
        statusBackingStore.putSafe(new TaskStatus(id, AbstractStatus.State.FAILED, workerId, generation(), trace(cause)));
    }

    /** Records the task as {@code UNASSIGNED}, using the store's {@code putSafe} variant. */
    @Override
    public void onShutdown(ConnectorTaskId id) {
        statusBackingStore.putSafe(new TaskStatus(id, AbstractStatus.State.UNASSIGNED, workerId, generation()));
    }

    /** Records the task as {@code RUNNING} again in the status store. */
    @Override
    public void onResume(ConnectorTaskId id) {
        statusBackingStore.put(new TaskStatus(id, AbstractStatus.State.RUNNING, workerId, generation()));
    }

    /** Records the task as {@code PAUSED} in the status store. */
    @Override
    public void onPause(ConnectorTaskId id) {
        statusBackingStore.put(new TaskStatus(id, AbstractStatus.State.PAUSED, workerId, generation()));
    }

    /**
     * Marks every task status currently stored for the connector as {@code DESTROYED},
     * then marks the connector itself as {@code DESTROYED}.
     */
    @Override
    public void onDeletion(String connector) {
        for (TaskStatus taskStatus : statusBackingStore.getAll(connector)) {
            ConnectorTaskId taskId = (ConnectorTaskId) taskStatus.id();
            statusBackingStore.put(new TaskStatus(taskId, AbstractStatus.State.DESTROYED, workerId, generation()));
        }
        statusBackingStore.put(new ConnectorStatus(connector, AbstractStatus.State.DESTROYED, workerId, generation()));
    }

    /**
     * Sets the connector's target state to {@link TargetState#PAUSED}.
     *
     * @throws NotFoundException if the config store does not contain the connector
     */
    @Override
    public void pauseConnector(String connector) {
        requireKnownConnector(connector);
        configBackingStore.putTargetState(connector, TargetState.PAUSED);
    }

    /**
     * Sets the connector's target state to {@link TargetState#STARTED}.
     *
     * @throws NotFoundException if the config store does not contain the connector
     */
    @Override
    public void resumeConnector(String connector) {
        requireKnownConnector(connector);
        configBackingStore.putTargetState(connector, TargetState.STARTED);
    }

    /** Throws {@link NotFoundException} unless the config store contains {@code connector}. */
    private void requireKnownConnector(String connector) {
        if (!configBackingStore.contains(connector)) {
            throw new NotFoundException("Unknown connector " + connector);
        }
    }

    /**
     * Builds the REST view of a connector's status and the statuses of its tasks.
     * Task states are returned sorted by their natural ordering.
     *
     * @throws NotFoundException if the status store has no entry for the connector
     */
    @Override
    public ConnectorStateInfo connectorStatus(String connName) {
        ConnectorStatus connector = statusBackingStore.get(connName);
        if (connector == null) {
            throw new NotFoundException("No status found for connector " + connName);
        }
        Collection<TaskStatus> tasks = statusBackingStore.getAll(connName);
        ConnectorStateInfo.ConnectorState connectorState = new ConnectorStateInfo.ConnectorState(
                connector.state().toString(), connector.workerId(), connector.trace());

        List<ConnectorStateInfo.TaskState> taskStates = new ArrayList<>();
        for (TaskStatus taskStatus : tasks) {
            ConnectorTaskId taskId = (ConnectorTaskId) taskStatus.id();
            taskStates.add(new ConnectorStateInfo.TaskState(
                    taskId.task(), taskStatus.state().toString(), taskStatus.workerId(), taskStatus.trace()));
        }
        Collections.sort(taskStates);
        return new ConnectorStateInfo(connName, connectorState, taskStates);
    }

    /**
     * Builds the REST view of a single task's status.
     *
     * @throws NotFoundException if the status store has no entry for the task
     */
    @Override
    public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) {
        TaskStatus status = statusBackingStore.get(id);
        if (status == null) {
            throw new NotFoundException("No status found for task " + id);
        }
        return new ConnectorStateInfo.TaskState(id.task(), status.state().toString(), status.workerId(), status.trace());
    }

    /**
     * Validates the framework-level part of a connector config against {@code configDef}.
     * This implementation ignores {@code connector}; it is available to subclasses that override.
     */
    protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector connector, ConfigDef configDef, Map<String, String> config) {
        return configDef.validateAll(config);
    }

    /**
     * Validates a connector configuration in two passes and merges the outcome:
     * first against the framework config definition (source or sink, depending on the
     * connector's type, enriched with the supplied config), then against the connector's
     * own {@link Connector#validate(Map)} / {@link Connector#config()}.
     *
     * @throws BadRequestException if the config has no {@code connector.class} entry
     */
    @Override
    public ConfigInfos validateConnectorConfig(Map<String, String> connectorConfig) {
        String connType = connectorConfig.get("connector.class");
        if (connType == null) {
            throw new BadRequestException("Connector config " + connectorConfig + " contains no connector type");
        }
        Connector connector = getConnector(connType);
        ConfigDef baseConfigDef = connector instanceof SourceConnector
                ? SourceConnectorConfig.configDef()
                : SinkConnectorConfig.configDef();
        ConfigDef connectorConfigDef = ConnectorConfig.enrich(baseConfigDef, connectorConfig, false);

        // Framework-level results first ...
        Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(connector, connectorConfigDef, connectorConfig);
        List<ConfigValue> configValues = new ArrayList<>(validatedConnectorConfig.values());
        Map<String, ConfigDef.ConfigKey> configKeys = new HashMap<>(connectorConfigDef.configKeys());
        List<String> allGroups = new ArrayList<>(connectorConfigDef.groups());

        // ... then the connector's own; its keys overwrite framework keys of the same name.
        Config config = connector.validate(connectorConfig);
        ConfigDef configDef = connector.config();
        configKeys.putAll(configDef.configKeys());
        allGroups.addAll(configDef.groups());
        configValues.addAll(config.configValues());

        return generateResult(connType, configKeys, configValues, allGroups);
    }

    /**
     * Combines config keys and validated values into a {@link ConfigInfos}.
     *
     * <p>Values whose name has no matching key get the error message
     * {@code "Configuration is not defined: <name>"} added to them (the passed-in
     * {@link ConfigValue} is mutated) and are emitted first, with a {@code null} key.
     * Then one entry is emitted per key, with its value if one was supplied.
     *
     * <p>NOTE(review): the returned error count only sums errors of values that have a
     * matching key; errors on undefined configs are not counted. Confirm whether that
     * is intended before relying on the count alone.
     *
     * @param connType     connector type reported in the result
     * @param configKeys   config definitions keyed by config name
     * @param configValues validated values; if several share a name, the last one wins
     * @param groups       group names passed through to the result
     */
    public static ConfigInfos generateResult(String connType, Map<String, ConfigDef.ConfigKey> configKeys, List<ConfigValue> configValues, List<String> groups) {
        int errorCount = 0;
        List<ConfigInfo> configInfoList = new LinkedList<>();

        Map<String, ConfigValue> configValueMap = new HashMap<>();
        for (ConfigValue configValue : configValues) {
            String configName = configValue.name();
            configValueMap.put(configName, configValue);
            if (!configKeys.containsKey(configName)) {
                configValue.addErrorMessage("Configuration is not defined: " + configName);
                configInfoList.add(new ConfigInfo(null, convertConfigValue(configValue, null)));
            }
        }

        for (Map.Entry<String, ConfigDef.ConfigKey> keyEntry : configKeys.entrySet()) {
            ConfigDef.ConfigKey configKey = keyEntry.getValue();
            ConfigKeyInfo configKeyInfo = convertConfigKey(configKey);
            ConfigValueInfo configValueInfo = null;
            ConfigValue configValue = configValueMap.get(keyEntry.getKey());
            if (configValue != null) {
                configValueInfo = convertConfigValue(configValue, configKey.type);
                errorCount += configValue.errorMessages().size();
            }
            configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo));
        }
        return new ConfigInfos(connType, errorCount, groups, configInfoList);
    }

    /**
     * Converts a config definition into its REST representation. A key is reported as
     * required exactly when its default is {@link ConfigDef#NO_DEFAULT_VALUE}.
     */
    private static ConfigKeyInfo convertConfigKey(ConfigDef.ConfigKey configKey) {
        ConfigDef.Type type = configKey.type;
        boolean required = configKey.defaultValue == ConfigDef.NO_DEFAULT_VALUE;
        String defaultValue;
        if (required) {
            // Relies on the NO_DEFAULT_VALUE sentinel being a String instance — TODO confirm
            // against the ConfigDef version in use.
            defaultValue = (String) configKey.defaultValue;
        } else {
            defaultValue = ConfigDef.convertToString(configKey.defaultValue, type);
        }
        List<String> dependents = configKey.dependents;
        return new ConfigKeyInfo(
                configKey.name,
                type.name(),
                required,
                defaultValue,
                configKey.importance.name(),
                configKey.documentation,
                configKey.group,
                configKey.orderInGroup,
                configKey.width.name(),
                configKey.displayName,
                dependents);
    }

    /**
     * Converts a validated value into its REST representation.
     *
     * @param type declared type of the config, or {@code null} when the config has no definition
     */
    private static ConfigValueInfo convertConfigValue(ConfigValue configValue, ConfigDef.Type type) {
        String value = ConfigDef.convertToString(configValue.value(), type);

        // For LIST configs each recommended value is rendered as a STRING rather than as a LIST.
        ConfigDef.Type recommendedType = type == ConfigDef.Type.LIST ? ConfigDef.Type.STRING : type;
        List<String> recommendedValues = new LinkedList<>();
        for (Object recommended : configValue.recommendedValues()) {
            recommendedValues.add(ConfigDef.convertToString(recommended, recommendedType));
        }
        return new ConfigValueInfo(configValue.name(), value, recommendedValues, configValue.errorMessages(), configValue.visible());
    }

    /**
     * Returns the cached connector instance for {@code connType}, creating it through the
     * worker's connector factory and caching it on first use.
     */
    protected Connector getConnector(String connType) {
        Connector cached = tempConnectors.get(connType);
        if (cached != null) {
            return cached;
        }
        Connector connector = worker.getConnectorFactory().newConnector(connType);
        tempConnectors.put(connType, connector);
        return connector;
    }

    /**
     * Renders the stack trace of {@code t} as a string.
     *
     * <p>The trace is encoded and decoded with the same charset; writing it with the
     * platform default and reading it back as UTF-8 would corrupt non-ASCII characters
     * on JVMs whose default charset is not UTF-8.
     *
     * @return the stack trace text, or {@code null} if the charset is unsupported
     */
    private String trace(Throwable t) {
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        try {
            PrintStream printer = new PrintStream(output, false, TRACE_CHARSET);
            t.printStackTrace(printer);
            printer.flush();
            return output.toString(TRACE_CHARSET);
        } catch (UnsupportedEncodingException e) {
            return null;
        }
    }

    /** Starts a dedicated thread that runs {@link PluginDiscovery#scanClasspathForPlugins()}. */
    private void traverseClassPath() {
        Runnable scan = new Runnable() {
            @Override
            public void run() {
                PluginDiscovery.scanClasspathForPlugins();
            }
        };
        classPathTraverser = new Thread(scan, "CLASSPATH traversal thread.");
        classPathTraverser.start();
    }
}

