/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.ForwardingAdmin;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
import org.apache.kafka.connect.mirror.DefaultTopicFilter;
import org.apache.kafka.connect.mirror.MirrorClientConfig;
import org.apache.kafka.connect.mirror.ReplicationPolicy;
import org.apache.kafka.connect.runtime.ConnectorConfig;

public abstract class MirrorConnectorConfig
extends AbstractConfig {
    static final String ENABLED_SUFFIX = ".enabled";
    static final String INTERVAL_SECONDS_SUFFIX = ".interval.seconds";
    static final String ENABLED = "enabled";
    static final String ENABLED_DOC = "Whether to replicate source->target.";
    public static final String SOURCE_CLUSTER_ALIAS = "source.cluster.alias";
    public static final String SOURCE_CLUSTER_ALIAS_DEFAULT = "source";
    private static final String SOURCE_CLUSTER_ALIAS_DOC = "Alias of source cluster";
    public static final String TARGET_CLUSTER_ALIAS = "target.cluster.alias";
    public static final String TARGET_CLUSTER_ALIAS_DEFAULT = "target";
    private static final String TARGET_CLUSTER_ALIAS_DOC = "Alias of target cluster. Used in metrics reporting.";
    public static final String REPLICATION_POLICY_CLASS = "replication.policy.class";
    public static final Class<?> REPLICATION_POLICY_CLASS_DEFAULT = MirrorClientConfig.REPLICATION_POLICY_CLASS_DEFAULT;
    private static final String REPLICATION_POLICY_CLASS_DOC = "Class which defines the remote topic naming convention.";
    public static final String REPLICATION_POLICY_SEPARATOR = "replication.policy.separator";
    private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
    public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT = ".";
    private static final String INTERNAL_TOPIC_SEPARATOR_ENABLED = "replication.policy.internal.topic.separator.enabled";
    private static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC = "Whether to use replication.policy.separator to control the names of topics used for checkpoints and offset syncs. By default, custom separators are used in these topic names; however, if upgrading MirrorMaker 2 from older versions that did not allow for these topic names to be customized, it may be necessary to set this property to 'false' in order to continue using the same names for those topics.";
    public static final Boolean INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT = DefaultReplicationPolicy.INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT;
    public static final String ADMIN_TASK_TIMEOUT_MILLIS = "admin.timeout.ms";
    private static final String ADMIN_TASK_TIMEOUT_MILLIS_DOC = "Timeout for administrative tasks, e.g. detecting new topics.";
    public static final long ADMIN_TASK_TIMEOUT_MILLIS_DEFAULT = 60000L;
    public static final String FORWARDING_ADMIN_CLASS = "forwarding.admin.class";
    public static final Class<?> FORWARDING_ADMIN_CLASS_DEFAULT = MirrorClientConfig.FORWARDING_ADMIN_CLASS_DEFAULT;
    private static final String FORWARDING_ADMIN_CLASS_DOC = "Class which extends ForwardingAdmin to define custom cluster resource management (topics, configs, etc). The class must have a constructor with signature <code>(Map<String, Object> config)</code> that is used to configure a KafkaAdminClient and may also be used to configure clients for external systems if necessary.";
    protected static final String SOURCE_CLUSTER_PREFIX = "source.cluster.";
    protected static final String TARGET_CLUSTER_PREFIX = "target.cluster.";
    protected static final String SOURCE_PREFIX = "source.";
    protected static final String TARGET_PREFIX = "target.";
    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
    protected static final String ADMIN_CLIENT_PREFIX = "admin.";
    public static final String TOPIC_FILTER_CLASS = "topic.filter.class";
    public static final String TOPIC_FILTER_CLASS_DOC = "TopicFilter to use. Selects topics to replicate.";
    public static final Class<?> TOPIC_FILTER_CLASS_DEFAULT = DefaultTopicFilter.class;
    public static final String OFFSET_SYNCS_TOPIC_LOCATION = "offset-syncs.topic.location";
    public static final String OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT = "source";
    public static final String OFFSET_SYNCS_TOPIC_LOCATION_DOC = "The location (source/target) of the offset-syncs topic.";
    public static final String TASK_INDEX = "task.index";
    private final ReplicationPolicy replicationPolicy = (ReplicationPolicy)this.getConfiguredInstance("replication.policy.class", ReplicationPolicy.class);
    protected static final ConfigDef BASE_CONNECTOR_CONFIG_DEF = new ConfigDef(ConnectorConfig.configDef()).define("enabled", ConfigDef.Type.BOOLEAN, (Object)true, ConfigDef.Importance.LOW, "Whether to replicate source->target.").define("source.cluster.alias", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Alias of source cluster").define("target.cluster.alias", ConfigDef.Type.STRING, (Object)"target", ConfigDef.Importance.HIGH, "Alias of target cluster. Used in metrics reporting.").define("admin.timeout.ms", ConfigDef.Type.LONG, (Object)60000L, ConfigDef.Importance.LOW, "Timeout for administrative tasks, e.g. detecting new topics.").define("replication.policy.class", ConfigDef.Type.CLASS, REPLICATION_POLICY_CLASS_DEFAULT, ConfigDef.Importance.LOW, "Class which defines the remote topic naming convention.").define("replication.policy.separator", ConfigDef.Type.STRING, (Object)".", ConfigDef.Importance.LOW, "Separator used in remote topic naming convention.").define("replication.policy.internal.topic.separator.enabled", ConfigDef.Type.BOOLEAN, (Object)INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT, ConfigDef.Importance.LOW, "Whether to use replication.policy.separator to control the names of topics used for checkpoints and offset syncs. By default, custom separators are used in these topic names; however, if upgrading MirrorMaker 2 from older versions that did not allow for these topic names to be customized, it may be necessary to set this property to 'false' in order to continue using the same names for those topics.").define("forwarding.admin.class", ConfigDef.Type.CLASS, FORWARDING_ADMIN_CLASS_DEFAULT, ConfigDef.Importance.LOW, "Class which extends ForwardingAdmin to define custom cluster resource management (topics, configs, etc). The class must have a constructor with signature <code>(Map<String, Object> config)</code> that is used to configure a KafkaAdminClient and may also be used to configure clients for external systems if necessary.").define("metric.reporters", ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, "A list of classes to use as metrics reporters. Implementing the <code>org.apache.kafka.common.metrics.MetricsReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.").define("security.protocol", ConfigDef.Type.STRING, (Object)"PLAINTEXT", (ConfigDef.Validator)ConfigDef.CaseInsensitiveValidString.in((String[])Utils.enumOptions(SecurityProtocol.class)), ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC).define("auto.include.jmx.reporter", ConfigDef.Type.BOOLEAN, (Object)true, ConfigDef.Importance.LOW, "Deprecated. Whether to automatically include JmxReporter even if it's not listed in <code>metric.reporters</code>. This configuration will be removed in Kafka 4.0, users should instead include <code>org.apache.kafka.common.metrics.JmxReporter</code> in <code>metric.reporters</code> in order to enable the JmxReporter.").withClientSslSupport().withClientSaslSupport();

    protected MirrorConnectorConfig(ConfigDef configDef, Map<String, String> props) {
        super(configDef, props, true);
    }

    String connectorName() {
        return this.getString("name");
    }

    boolean enabled() {
        return this.getBoolean(ENABLED);
    }

    Duration adminTimeout() {
        return Duration.ofMillis(this.getLong(ADMIN_TASK_TIMEOUT_MILLIS));
    }

    String sourceClusterAlias() {
        return this.getString(SOURCE_CLUSTER_ALIAS);
    }

    String targetClusterAlias() {
        return this.getString(TARGET_CLUSTER_ALIAS);
    }

    ReplicationPolicy replicationPolicy() {
        return this.replicationPolicy;
    }

    Map<String, Object> sourceProducerConfig(String role) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.putAll(this.originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
        props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
        props.putAll(this.originalsWithPrefix(PRODUCER_CLIENT_PREFIX));
        props.putAll(this.originalsWithPrefix("source.producer."));
        this.addClientId(props, role);
        return props;
    }

    Map<String, Object> sourceConsumerConfig(String role) {
        Map<String, Object> result = MirrorConnectorConfig.sourceConsumerConfig(this.originals());
        this.addClientId(result, role);
        return result;
    }

    static Map<String, Object> sourceConsumerConfig(Map<String, ?> props) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        result.putAll(Utils.entriesWithPrefix(props, (String)SOURCE_CLUSTER_PREFIX));
        result.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
        result.putAll(Utils.entriesWithPrefix(props, (String)CONSUMER_CLIENT_PREFIX));
        result.putAll(Utils.entriesWithPrefix(props, (String)"source.consumer."));
        result.put("enable.auto.commit", "false");
        result.putIfAbsent("auto.offset.reset", "earliest");
        return result;
    }

    Map<String, Object> targetAdminConfig(String role) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.putAll(this.originalsWithPrefix(TARGET_CLUSTER_PREFIX));
        props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
        props.putAll(this.originalsWithPrefix(ADMIN_CLIENT_PREFIX));
        props.putAll(this.originalsWithPrefix("target.admin."));
        this.addClientId(props, role);
        return props;
    }

    Map<String, Object> targetProducerConfig(String role) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.putAll(this.originalsWithPrefix(TARGET_CLUSTER_PREFIX));
        props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
        props.putAll(this.originalsWithPrefix(PRODUCER_CLIENT_PREFIX));
        props.putAll(this.originalsWithPrefix("target.producer."));
        this.addClientId(props, role);
        return props;
    }

    Map<String, Object> targetConsumerConfig(String role) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.putAll(this.originalsWithPrefix(TARGET_CLUSTER_PREFIX));
        props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
        props.putAll(this.originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
        props.putAll(this.originalsWithPrefix("target.consumer."));
        props.put("enable.auto.commit", "false");
        props.putIfAbsent("auto.offset.reset", "earliest");
        this.addClientId(props, role);
        return props;
    }

    Map<String, Object> sourceAdminConfig(String role) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.putAll(this.originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
        props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
        props.putAll(this.originalsWithPrefix(ADMIN_CLIENT_PREFIX));
        props.putAll(this.originalsWithPrefix("source.admin."));
        this.addClientId(props, role);
        return props;
    }

    List<MetricsReporter> metricsReporters() {
        List reporters = CommonClientConfigs.metricsReporters((AbstractConfig)this);
        KafkaMetricsContext metricsContext = new KafkaMetricsContext("kafka.connect.mirror");
        for (MetricsReporter reporter : reporters) {
            reporter.contextChange((MetricsContext)metricsContext);
        }
        return reporters;
    }

    ForwardingAdmin forwardingAdmin(Map<String, Object> config) {
        try {
            return (ForwardingAdmin)Utils.newParameterizedInstance((String)this.getClass(FORWARDING_ADMIN_CLASS).getName(), (Object[])new Object[]{Map.class, config});
        }
        catch (ClassNotFoundException e) {
            throw new KafkaException("Can't create instance of " + this.get(FORWARDING_ADMIN_CLASS), (Throwable)e);
        }
    }

    void addClientId(Map<String, Object> props, String role) {
        String clientId = this.entityLabel() + (role == null ? "" : "|" + role);
        props.compute("client.id", (k, userClientId) -> (userClientId == null ? "" : userClientId + "|") + clientId);
    }

    String entityLabel() {
        return this.sourceClusterAlias() + "->" + this.targetClusterAlias() + "|" + this.connectorName();
    }

    public static void main(String[] args) {
        System.out.println(BASE_CONNECTOR_CONFIG_DEF.toHtml(4, config -> "mirror_connector_" + config));
    }
}

