/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.storage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.crypto.spec.SecretKeySpec;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConfigBackingStore
implements ConfigBackingStore {
    private static final Logger log = LoggerFactory.getLogger(KafkaConfigBackingStore.class);
    public static final String TARGET_STATE_PREFIX = "target-state-";
    public static final String CONNECTOR_PREFIX = "connector-";
    public static final String TASK_PREFIX = "task-";
    public static final String COMMIT_TASKS_PREFIX = "commit-";
    public static final String SESSION_KEY_KEY = "session-key";
    public static final Schema CONNECTOR_CONFIGURATION_V0;
    public static final Schema TASK_CONFIGURATION_V0;
    public static final Schema CONNECTOR_TASKS_COMMIT_V0;
    public static final Schema TARGET_STATE_V0;
    public static final Schema SESSION_KEY_V0;
    public static final String RESTART_PREFIX = "restart-connector-";
    public static final boolean ONLY_FAILED_DEFAULT = false;
    public static final boolean INCLUDE_TASKS_DEFAULT = false;
    public static final String ONLY_FAILED_FIELD_NAME = "only-failed";
    public static final String INCLUDE_TASKS_FIELD_NAME = "include-tasks";
    public static final Schema RESTART_REQUEST_V0;
    private static final long READ_TO_END_TIMEOUT_MS = 30000L;
    private final Object lock;
    private final Converter converter;
    private volatile boolean started = false;
    private ConfigBackingStore.UpdateListener updateListener;
    private final String topic;
    private final KafkaBasedLog<String, byte[]> configLog;
    private final Map<String, Integer> connectorTaskCounts = new HashMap<String, Integer>();
    private final Map<String, Map<String, String>> connectorConfigs = new HashMap<String, Map<String, String>>();
    private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<ConnectorTaskId, Map<String, String>>();
    private final Supplier<TopicAdmin> topicAdminSupplier;
    private SharedTopicAdmin ownTopicAdmin;
    private final Set<String> inconsistent = new HashSet<String>();
    private volatile long offset;
    private volatile SessionKey sessionKey;
    private final Map<String, Map<ConnectorTaskId, Map<String, String>>> deferredTaskUpdates = new HashMap<String, Map<ConnectorTaskId, Map<String, String>>>();
    private final Map<String, TargetState> connectorTargetStates = new HashMap<String, TargetState>();
    private final WorkerConfigTransformer configTransformer;

    /**
     * Builds the config topic record key used for a connector's target state.
     *
     * @param connectorName name of the connector
     * @return {@link #TARGET_STATE_PREFIX} followed by the connector name
     */
    public static String TARGET_STATE_KEY(String connectorName) {
        final String key = TARGET_STATE_PREFIX + connectorName;
        return key;
    }

    /**
     * Builds the config topic record key used for a connector's configuration.
     *
     * @param connectorName name of the connector
     * @return {@link #CONNECTOR_PREFIX} followed by the connector name
     */
    public static String CONNECTOR_KEY(String connectorName) {
        final String key = CONNECTOR_PREFIX + connectorName;
        return key;
    }

    /**
     * Builds the config topic record key used for a single task's configuration.
     *
     * @param taskId identifies the connector and the task index
     * @return {@link #TASK_PREFIX}, the connector name, a dash and the task index
     */
    public static String TASK_KEY(ConnectorTaskId taskId) {
        final String connectorAndIndex = taskId.connector() + "-" + taskId.task();
        return TASK_PREFIX + connectorAndIndex;
    }

    /**
     * Builds the config topic record key used for a connector's task-commit record.
     *
     * @param connectorName name of the connector
     * @return {@link #COMMIT_TASKS_PREFIX} followed by the connector name
     */
    public static String COMMIT_TASKS_KEY(String connectorName) {
        final String key = COMMIT_TASKS_PREFIX + connectorName;
        return key;
    }

    /**
     * Builds the config topic record key used for a connector restart request.
     *
     * @param connectorName name of the connector
     * @return {@link #RESTART_PREFIX} followed by the connector name
     */
    public static String RESTART_KEY(String connectorName) {
        final String key = RESTART_PREFIX + connectorName;
        return key;
    }

    /**
     * Creates a store without an externally supplied topic admin; delegates to the
     * four-argument constructor with a {@code null} admin supplier.
     *
     * @param converter converter used to (de)serialize config topic record values
     * @param config worker configuration
     * @param configTransformer transformer handed to every snapshot created by this store
     * @deprecated use the constructor that also accepts a {@code Supplier<TopicAdmin>}
     */
    @Deprecated
    public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) {
        this(converter, config, configTransformer, (Supplier<TopicAdmin>) null);
    }

    public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier) {
        this.lock = new Object();
        this.converter = converter;
        this.offset = -1L;
        this.topicAdminSupplier = adminSupplier;
        this.topic = config.getString("config.storage.topic");
        if (this.topic == null || this.topic.trim().length() == 0) {
            throw new ConfigException("Must specify topic for connector configuration.");
        }
        this.configLog = this.setupAndCreateKafkaBasedLog(this.topic, config);
        this.configTransformer = configTransformer;
    }

    /**
     * Registers the listener that is notified about changes read from the config topic.
     *
     * @param listener listener to notify
     */
    @Override
    public void setUpdateListener(ConfigBackingStore.UpdateListener listener) {
        updateListener = listener;
    }

    /**
     * Starts the underlying config log and verifies that the config topic has at most one
     * partition before marking this store as started.
     *
     * @throws ConfigException if the config topic has more than one partition
     */
    @Override
    public void start() {
        log.info("Starting KafkaConfigBackingStore");
        configLog.start();

        final int partitions = configLog.partitionCount();
        if (partitions > 1) {
            throw new ConfigException(String.format(
                    "Topic '%s' supplied via the '%s' property is required to have a single partition in order to guarantee consistency of connector configurations, but found %d partitions.",
                    topic, "config.storage.topic", partitions));
        }

        started = true;
        log.info("Started KafkaConfigBackingStore");
    }

    /**
     * Stops the underlying config log and, if this store created its own topic admin,
     * closes that admin even when stopping the log fails.
     */
    @Override
    public void stop() {
        log.info("Closing KafkaConfigBackingStore");
        try {
            configLog.stop();
        } finally {
            final SharedTopicAdmin admin = ownTopicAdmin;
            if (admin != null) {
                admin.close();
            }
        }
        log.info("Closed KafkaConfigBackingStore");
    }

    /**
     * Returns a point-in-time copy of the cluster configuration state.
     * All collections are copied while holding the store lock.
     *
     * @return a new {@link ClusterConfigState} built from copies of the current in-memory state
     */
    @Override
    public ClusterConfigState snapshot() {
        synchronized (lock) {
            Map<String, Integer> taskCounts = new HashMap<>(connectorTaskCounts);
            Map<String, Map<String, String>> connectors = new HashMap<>(connectorConfigs);
            Map<String, TargetState> targetStates = new HashMap<>(connectorTargetStates);
            Map<ConnectorTaskId, Map<String, String>> tasks = new HashMap<>(taskConfigs);
            Set<String> inconsistentConnectors = new HashSet<>(inconsistent);
            return new ClusterConfigState(offset, sessionKey, taskCounts, connectors, targetStates, tasks, inconsistentConnectors, configTransformer);
        }
    }

    /**
     * Checks, under the store lock, whether a configuration is currently held for the connector.
     *
     * @param connector name of the connector
     * @return {@code true} if a connector configuration is present
     */
    @Override
    public boolean contains(String connector) {
        synchronized (lock) {
            final boolean known = connectorConfigs.containsKey(connector);
            return known;
        }
    }

    /**
     * Serializes the connector configuration and writes it to the config topic.
     *
     * @param connector name of the connector
     * @param properties the connector's configuration properties
     * @throws ConnectException if the write or the subsequent read to the end of the log fails
     */
    @Override
    public void putConnectorConfig(String connector, Map<String, String> properties) {
        log.debug("Writing connector configuration for connector '{}'", connector);
        Struct configStruct = new Struct(CONNECTOR_CONFIGURATION_V0);
        configStruct.put("properties", properties);
        updateConnectorConfig(connector, converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, configStruct));
    }

    /**
     * Removes a connector by writing null-valued records for both its configuration key and its
     * target state key, then waits until the log has been read to the end.
     *
     * @param connector name of the connector
     * @throws ConnectException if the wait is interrupted, fails or times out
     */
    @Override
    public void removeConnectorConfig(String connector) {
        log.debug("Removing connector configuration for connector '{}'", connector);
        final String configKey = CONNECTOR_KEY(connector);
        final String targetStateKey = TARGET_STATE_KEY(connector);
        try {
            configLog.send(configKey, null);
            configLog.send(targetStateKey, null);
            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("Failed to remove connector configuration from Kafka: ", e);
            throw new ConnectException("Error removing connector configuration from Kafka", e);
        }
    }

    /**
     * Not supported by this store.
     *
     * @param connector name of the connector
     * @throws UnsupportedOperationException always
     */
    @Override
    public void removeTaskConfigs(String connector) {
        final String reason = "Removal of tasks is not currently supported";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Writes an already serialized connector configuration under the connector's key and waits
     * until the log has been read to the end.
     *
     * @param connector name of the connector
     * @param serializedConfig serialized configuration value
     * @throws ConnectException if the wait is interrupted, fails or times out
     */
    private void updateConnectorConfig(String connector, byte[] serializedConfig) {
        final String configKey = CONNECTOR_KEY(connector);
        try {
            configLog.send(configKey, serializedConfig);
            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("Failed to write connector configuration to Kafka: ", e);
            throw new ConnectException("Error writing connector configuration to Kafka", e);
        }
    }

    /**
     * Writes one record per task configuration followed by a commit record carrying the task count.
     * <p>
     * The log is read to the end before anything is written, again after the task records
     * (only when there is at least one task), and once more after the commit record.
     *
     * @param connector name of the connector the tasks belong to
     * @param configs task configurations; the list position becomes the task index
     * @throws ConnectException if any of the waits is interrupted, fails or times out
     */
    @Override
    public void putTaskConfigs(String connector, List<Map<String, String>> configs) {
        try {
            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("Failed to write root configuration to Kafka: ", e);
            throw new ConnectException("Error writing root configuration to Kafka", e);
        }

        final int taskCount = configs.size();
        int taskIndex = 0;
        for (Map<String, String> taskProperties : configs) {
            Struct taskStruct = new Struct(TASK_CONFIGURATION_V0);
            taskStruct.put("properties", taskProperties);
            byte[] serializedTask = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, taskStruct);
            log.debug("Writing configuration for connector '{}' task {}", connector, taskIndex);
            configLog.send(TASK_KEY(new ConnectorTaskId(connector, taskIndex)), serializedTask);
            taskIndex++;
        }

        try {
            // Wait for the task records to be read back before committing them.
            if (taskCount > 0) {
                configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            }
            Struct commitStruct = new Struct(CONNECTOR_TASKS_COMMIT_V0);
            commitStruct.put("tasks", taskCount);
            byte[] serializedCommit = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, commitStruct);
            log.debug("Writing commit for connector '{}' with {} tasks.", connector, taskCount);
            configLog.send(COMMIT_TASKS_KEY(connector), serializedCommit);
            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("Failed to write root configuration to Kafka: ", e);
            throw new ConnectException("Error writing root configuration to Kafka", e);
        }
    }

    /**
     * Waits until the config log has been read to the end, or the timeout expires.
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @throws TimeoutException if the end of the log is not reached in time
     * @throws ConnectException if the wait is interrupted or the read fails
     */
    @Override
    public void refresh(long timeout, TimeUnit unit) throws TimeoutException {
        try {
            configLog.readToEnd().get(timeout, unit);
        } catch (InterruptedException | ExecutionException e) {
            final String message = "Error trying to read to end of config log";
            throw new ConnectException(message, e);
        }
    }

    /**
     * Serializes the target state and sends it to the config topic under the connector's
     * target state key. Unlike the other write methods, this one does not wait for the log
     * to be read to the end.
     *
     * @param connector name of the connector
     * @param state desired target state
     */
    @Override
    public void putTargetState(String connector, TargetState state) {
        Struct stateStruct = new Struct(TARGET_STATE_V0);
        stateStruct.put("state", state.name());
        byte[] payload = converter.fromConnectData(topic, TARGET_STATE_V0, stateStruct);
        log.debug("Writing target state {} for connector {}", state, connector);
        configLog.send(TARGET_STATE_KEY(connector), payload);
    }

    /**
     * Writes a session key to the config topic and waits until the log has been read to the end.
     * The key material is stored Base64-encoded together with its algorithm and creation timestamp.
     *
     * @param sessionKey the session key to distribute
     * @throws ConnectException if the wait is interrupted, fails or times out
     */
    @Override
    public void putSessionKey(SessionKey sessionKey) {
        log.debug("Distributing new session key");
        String encodedKey = Base64.getEncoder().encodeToString(sessionKey.key().getEncoded());

        Struct keyStruct = new Struct(SESSION_KEY_V0);
        keyStruct.put("key", encodedKey);
        keyStruct.put("algorithm", sessionKey.key().getAlgorithm());
        keyStruct.put("creation-timestamp", sessionKey.creationTimestamp());
        byte[] payload = converter.fromConnectData(topic, SESSION_KEY_V0, keyStruct);

        try {
            configLog.send(SESSION_KEY_KEY, payload);
            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("Failed to write session key to Kafka: ", e);
            throw new ConnectException("Error writing session key to Kafka", e);
        }
    }

    /**
     * Writes a restart request to the config topic under the connector's restart key and waits
     * until the log has been read to the end.
     *
     * @param restartRequest the restart request to record
     * @throws ConnectException if the wait is interrupted, fails or times out
     */
    @Override
    public void putRestartRequest(RestartRequest restartRequest) {
        log.debug("Writing {} to Kafka", restartRequest);
        final String restartKey = RESTART_KEY(restartRequest.connectorName());

        Struct requestStruct = new Struct(RESTART_REQUEST_V0);
        requestStruct.put(INCLUDE_TASKS_FIELD_NAME, restartRequest.includeTasks());
        requestStruct.put(ONLY_FAILED_FIELD_NAME, restartRequest.onlyFailed());
        byte[] payload = converter.fromConnectData(topic, requestStruct.schema(), requestStruct);

        try {
            configLog.send(restartKey, payload);
            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("Failed to write {} to Kafka: ", restartRequest, e);
            throw new ConnectException("Error writing " + restartRequest + " to Kafka", e);
        }
    }

    /**
     * Derives producer, consumer and admin client settings from the worker configuration and
     * creates the {@link KafkaBasedLog} that backs this store.
     * <p>
     * All three client configurations start from a copy of the worker's original settings.
     * The producer gets String/byte[] serializers, a delivery timeout of
     * {@code Integer.MAX_VALUE} and idempotence disabled; the consumer gets the matching
     * deserializers. The topic is described as compacted with a single partition and the
     * replication factor from {@code config.storage.replication.factor}.
     * <p>
     * If no topic admin supplier was passed to the constructor, a {@link SharedTopicAdmin} is
     * created here and remembered in {@code ownTopicAdmin} so that {@code stop()} can close it.
     *
     * @param topic name of the config topic
     * @param config worker configuration
     * @return the log used to read and write the config topic
     */
    KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, WorkerConfig config) {
        String clusterId = ConnectUtils.lookupKafkaClusterId(config);
        Map<String, Object> originals = config.originals();

        Map<String, Object> producerProps = new HashMap<>(originals);
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", ByteArraySerializer.class.getName());
        producerProps.put("delivery.timeout.ms", Integer.MAX_VALUE);
        producerProps.put("enable.idempotence", "false");
        ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);

        Map<String, Object> consumerProps = new HashMap<>(originals);
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
        ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);

        Map<String, Object> adminProps = new HashMap<>(originals);
        ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);

        // The local must be typed as the supplier interface: the injected supplier is not
        // necessarily a SharedTopicAdmin, while our own SharedTopicAdmin is usable as a supplier.
        Supplier<TopicAdmin> adminSupplier;
        if (this.topicAdminSupplier != null) {
            adminSupplier = this.topicAdminSupplier;
        } else {
            // We own this admin, so keep a reference in order to close it in stop().
            this.ownTopicAdmin = new SharedTopicAdmin(adminProps);
            adminSupplier = this.ownTopicAdmin;
        }

        Map<String, Object> topicSettings;
        if (config instanceof DistributedConfig) {
            topicSettings = ((DistributedConfig) config).configStorageTopicSettings();
        } else {
            topicSettings = Collections.emptyMap();
        }

        NewTopic topicDescription = TopicAdmin.defineTopic(topic)
                .config(topicSettings)
                .compacted()
                .partitions(1)
                .replicationFactor(config.getShort("config.storage.replication.factor"))
                .build();
        return this.createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
    }

    /**
     * Creates the {@link KafkaBasedLog} for the config topic together with the topic
     * initialization step it is given.
     * <p>
     * The initialization step asks the admin to create the topic; if the topic is not among the
     * newly created ones, it verifies instead that the topic's cleanup policy is compact only.
     *
     * @param topic name of the config topic
     * @param producerProps producer settings
     * @param consumerProps consumer settings
     * @param consumedCallback callback invoked for each consumed record
     * @param topicDescription description used when creating the topic
     * @param adminSupplier supplier of the topic admin
     * @return the configured log
     */
    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps, Map<String, Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> consumedCallback, NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
        Consumer<TopicAdmin> topicInitializer = topicAdmin -> {
            log.debug("Creating admin client to manage Connect internal config topic");
            boolean createdNow = topicAdmin.createTopics(topicDescription).contains(topic);
            if (createdNow) {
                return;
            }
            // The topic was not created by this call, so check its cleanup policy.
            log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, "compact");
            topicAdmin.verifyTopicCleanupPolicyOnlyCompact(topic, "config.storage.topic", "connector configurations");
        };
        return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, topicInitializer);
    }

    /**
     * Converts a restart record read from the config topic into a {@link RestartRequest}.
     * <p>
     * The connector name is the record key with {@link #RESTART_PREFIX} stripped. A field that is
     * missing or not a Boolean is logged and replaced by its default ({@code false}).
     *
     * @param record the consumed record; its key is expected to start with the restart prefix
     * @param value the converted record value
     * @return the restart request, or {@code null} if the value is not a Map
     */
    RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) {
        String connectorName = record.key().substring(RESTART_PREFIX.length());

        Object payload = value.value();
        if (!(payload instanceof Map)) {
            log.error("Ignoring restart request because the value is not a Map but is {}", payload == null ? "null" : payload.getClass());
            return null;
        }
        Map<?, ?> fields = (Map<?, ?>) payload;

        boolean onlyFailed = ONLY_FAILED_DEFAULT;
        Object failed = fields.get(ONLY_FAILED_FIELD_NAME);
        if (failed instanceof Boolean) {
            onlyFailed = (Boolean) failed;
        } else {
            log.warn("Invalid data for restart request '{}' field should be a Boolean but is {}, defaulting to {}", ONLY_FAILED_FIELD_NAME, failed == null ? "null" : failed.getClass(), ONLY_FAILED_DEFAULT);
        }

        boolean includeTasks = INCLUDE_TASKS_DEFAULT;
        Object withTasks = fields.get(INCLUDE_TASKS_FIELD_NAME);
        if (withTasks instanceof Boolean) {
            includeTasks = (Boolean) withTasks;
        } else {
            log.warn("Invalid data for restart request '{}' field should be a Boolean but is {}, defaulting to {}", INCLUDE_TASKS_FIELD_NAME, withTasks == null ? "null" : withTasks.getClass(), INCLUDE_TASKS_DEFAULT);
        }

        return new RestartRequest(connectorName, onlyFailed, includeTasks);
    }

    /**
     * Parses a task config record key into a task id.
     * <p>
     * The key is split on dashes: the first segment is skipped, the last segment is the task
     * index, and the segments in between are re-joined with dashes to form the connector name.
     *
     * @param key record key to parse
     * @return the task id, or {@code null} if the key has fewer than three segments or the last
     *         segment is not an integer
     */
    private ConnectorTaskId parseTaskId(String key) {
        String[] segments = key.split("-");
        final int lastIndex = segments.length - 1;
        if (lastIndex < 2) {
            return null;
        }

        final int taskNumber;
        try {
            taskNumber = Integer.parseInt(segments[lastIndex]);
        } catch (NumberFormatException e) {
            return null;
        }

        // Connector names may themselves contain dashes, so re-join the middle segments.
        String[] nameSegments = Arrays.copyOfRange(segments, 1, lastIndex);
        return new ConnectorTaskId(Utils.join(nameSegments, "-"), taskNumber);
    }

    /**
     * Collects the task indexes present in a map of task configurations.
     *
     * @param connector connector every task id in {@code configs} is asserted to belong to
     * @param configs task configurations keyed by task id; may be {@code null}
     * @return a sorted set of task indexes, empty when {@code configs} is {@code null}
     */
    private Set<Integer> taskIds(String connector, Map<ConnectorTaskId, Map<String, String>> configs) {
        Set<Integer> indexes = new TreeSet<>();
        if (configs != null) {
            for (ConnectorTaskId id : configs.keySet()) {
                assert id.connector().equals(connector);
                indexes.add(id.task());
            }
        }
        return indexes;
    }

    /**
     * Checks whether the set contains every task index from 0 (inclusive) to
     * {@code expectedSize} (exclusive).
     *
     * @param idSet task indexes that are present
     * @param expectedSize number of consecutive indexes, starting at 0, that must be present
     * @return {@code true} if no index in the expected range is missing
     */
    private boolean completeTaskIdSet(Set<Integer> idSet, int expectedSize) {
        if (idSet.size() >= expectedSize) {
            int next = 0;
            // Advance until the first missing index or the end of the expected range.
            while (next < expectedSize && idSet.contains(next)) {
                next++;
            }
            return next >= expectedSize;
        }
        return false;
    }

    /**
     * Converts a deserialized numeric value to an {@code int}.
     * A {@code Long} is narrowed with a plain cast, so values outside the int range are truncated.
     *
     * @param value an {@code Integer} or {@code Long}
     * @return the value as an {@code int}
     * @throws ConnectException if the value is neither an {@code Integer} nor a {@code Long}
     */
    private static int intValue(Object value) {
        final boolean isInteger = value instanceof Integer;
        if (!isInteger && !(value instanceof Long)) {
            throw new ConnectException("Expected integer value to be either Integer or Long");
        }
        return isInteger ? ((Integer) value).intValue() : ((Long) value).intValue();
    }

    static {
        TASK_CONFIGURATION_V0 = CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct().field("properties", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA).build()).build();
        CONNECTOR_TASKS_COMMIT_V0 = SchemaBuilder.struct().field("tasks", Schema.INT32_SCHEMA).build();
        TARGET_STATE_V0 = SchemaBuilder.struct().field("state", Schema.STRING_SCHEMA).build();
        SESSION_KEY_V0 = SchemaBuilder.struct().field("key", Schema.STRING_SCHEMA).field("algorithm", Schema.STRING_SCHEMA).field("creation-timestamp", Schema.INT64_SCHEMA).build();
        RESTART_REQUEST_V0 = SchemaBuilder.struct().field(INCLUDE_TASKS_FIELD_NAME, Schema.BOOLEAN_SCHEMA).field(ONLY_FAILED_FIELD_NAME, Schema.BOOLEAN_SCHEMA).build();
    }

    private class ConsumeCallback
    implements Callback<ConsumerRecord<String, byte[]>> {
        private ConsumeCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
            SchemaAndValue value;
            if (error != null) {
                log.error("Unexpected in consumer callback for KafkaConfigBackingStore: ", error);
                return;
            }
            try {
                value = KafkaConfigBackingStore.this.converter.toConnectData(KafkaConfigBackingStore.this.topic, record.value());
            }
            catch (DataException e) {
                log.error("Failed to convert config data to Kafka Connect format: ", (Throwable)e);
                return;
            }
            KafkaConfigBackingStore.this.offset = record.offset() + 1L;
            if (record.key().startsWith(KafkaConfigBackingStore.TARGET_STATE_PREFIX)) {
                String connectorName = record.key().substring(KafkaConfigBackingStore.TARGET_STATE_PREFIX.length());
                boolean removed = false;
                Object object = KafkaConfigBackingStore.this.lock;
                synchronized (object) {
                    if (value.value() == null) {
                        log.debug("Removed target state for connector {} due to null value in topic.", (Object)connectorName);
                        KafkaConfigBackingStore.this.connectorTargetStates.remove(connectorName);
                        removed = true;
                        if (KafkaConfigBackingStore.this.connectorConfigs.containsKey(connectorName)) {
                            KafkaConfigBackingStore.this.connectorTargetStates.put(connectorName, TargetState.STARTED);
                        }
                    } else {
                        if (!(value.value() instanceof Map)) {
                            log.error("Found target state ({}) in wrong format: {}", (Object)record.key(), value.value().getClass());
                            return;
                        }
                        Object targetState = ((Map)value.value()).get("state");
                        if (!(targetState instanceof String)) {
                            log.error("Invalid data for target state for connector '{}': 'state' field should be a Map but is {}", (Object)connectorName, targetState == null ? null : targetState.getClass());
                            return;
                        }
                        try {
                            TargetState state = TargetState.valueOf((String)targetState);
                            log.debug("Setting target state for connector '{}' to {}", (Object)connectorName, targetState);
                            KafkaConfigBackingStore.this.connectorTargetStates.put(connectorName, state);
                        }
                        catch (IllegalArgumentException e) {
                            log.error("Invalid target state for connector '{}': {}", (Object)connectorName, targetState);
                            return;
                        }
                    }
                }
                if (KafkaConfigBackingStore.this.started && !removed) {
                    KafkaConfigBackingStore.this.updateListener.onConnectorTargetStateChange(connectorName);
                }
            } else if (record.key().startsWith(KafkaConfigBackingStore.CONNECTOR_PREFIX)) {
                String connectorName = record.key().substring(KafkaConfigBackingStore.CONNECTOR_PREFIX.length());
                boolean removed = false;
                Object object = KafkaConfigBackingStore.this.lock;
                synchronized (object) {
                    if (value.value() == null) {
                        log.info("Successfully processed removal of connector '{}'", (Object)connectorName);
                        KafkaConfigBackingStore.this.connectorConfigs.remove(connectorName);
                        KafkaConfigBackingStore.this.connectorTaskCounts.remove(connectorName);
                        KafkaConfigBackingStore.this.taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
                        removed = true;
                    } else {
                        if (!(value.value() instanceof Map)) {
                            log.error("Found configuration for connector '{}' in wrong format: {}", (Object)record.key(), value.value().getClass());
                            return;
                        }
                        Object newConnectorConfig = ((Map)value.value()).get("properties");
                        if (!(newConnectorConfig instanceof Map)) {
                            log.error("Invalid data for config for connector '{}': 'properties' field should be a Map but is {}", (Object)connectorName, newConnectorConfig == null ? null : newConnectorConfig.getClass());
                            return;
                        }
                        log.debug("Updating configuration for connector '{}'", (Object)connectorName);
                        KafkaConfigBackingStore.this.connectorConfigs.put(connectorName, (Map)newConnectorConfig);
                        if (!KafkaConfigBackingStore.this.connectorTargetStates.containsKey(connectorName)) {
                            KafkaConfigBackingStore.this.connectorTargetStates.put(connectorName, TargetState.STARTED);
                        }
                    }
                }
                if (KafkaConfigBackingStore.this.started) {
                    if (removed) {
                        KafkaConfigBackingStore.this.updateListener.onConnectorConfigRemove(connectorName);
                    } else {
                        KafkaConfigBackingStore.this.updateListener.onConnectorConfigUpdate(connectorName);
                    }
                }
            } else {
                if (record.key().startsWith(KafkaConfigBackingStore.TASK_PREFIX)) {
                    Object connectorName = KafkaConfigBackingStore.this.lock;
                    synchronized (connectorName) {
                        ConnectorTaskId taskId2 = KafkaConfigBackingStore.this.parseTaskId(record.key());
                        if (taskId2 == null) {
                            log.error("Ignoring task configuration because {} couldn't be parsed as a task config key", (Object)record.key());
                            return;
                        }
                        if (value.value() == null) {
                            log.error("Ignoring task configuration for task {} because it is unexpectedly null", (Object)taskId2);
                            return;
                        }
                        if (!(value.value() instanceof Map)) {
                            log.error("Ignoring task configuration for task {} because the value is not a Map but is {}", (Object)taskId2, value.value().getClass());
                            return;
                        }
                        Object newTaskConfig = ((Map)value.value()).get("properties");
                        if (!(newTaskConfig instanceof Map)) {
                            log.error("Invalid data for config of task {} 'properties' field should be a Map but is {}", (Object)taskId2, newTaskConfig.getClass());
                            return;
                        }
                        Map deferred = KafkaConfigBackingStore.this.deferredTaskUpdates.computeIfAbsent(taskId2.connector(), k -> new HashMap());
                        log.debug("Storing new config for task {}; this will wait for a commit message before the new config will take effect.", (Object)taskId2);
                        deferred.put(taskId2, (Map)newTaskConfig);
                    }
                }
                if (record.key().startsWith(KafkaConfigBackingStore.COMMIT_TASKS_PREFIX)) {
                    String connectorName = record.key().substring(KafkaConfigBackingStore.COMMIT_TASKS_PREFIX.length());
                    ArrayList<ConnectorTaskId> updatedTasks = new ArrayList<ConnectorTaskId>();
                    Object newTaskConfig = KafkaConfigBackingStore.this.lock;
                    synchronized (newTaskConfig) {
                        if (!(value.value() instanceof Map)) {
                            log.error("Ignoring connector tasks configuration commit for connector '{}' because it is in the wrong format: {}", (Object)connectorName, value.value());
                            return;
                        }
                        Map deferred = (Map)KafkaConfigBackingStore.this.deferredTaskUpdates.get(connectorName);
                        int newTaskCount = KafkaConfigBackingStore.intValue(((Map)value.value()).get("tasks"));
                        Set taskIdSet = KafkaConfigBackingStore.this.taskIds(connectorName, deferred);
                        if (!KafkaConfigBackingStore.this.completeTaskIdSet(taskIdSet, newTaskCount)) {
                            log.debug("We have an incomplete set of task configs for connector '{}' probably due to compaction. So we are not doing anything with the new configuration.", (Object)connectorName);
                            KafkaConfigBackingStore.this.inconsistent.add(connectorName);
                        } else {
                            if (deferred != null) {
                                KafkaConfigBackingStore.this.taskConfigs.putAll(deferred);
                                updatedTasks.addAll(deferred.keySet());
                            }
                            KafkaConfigBackingStore.this.inconsistent.remove(connectorName);
                        }
                        if (deferred != null) {
                            deferred.clear();
                        }
                        KafkaConfigBackingStore.this.connectorTaskCounts.put(connectorName, newTaskCount);
                    }
                    if (KafkaConfigBackingStore.this.started) {
                        KafkaConfigBackingStore.this.updateListener.onTaskConfigUpdate(updatedTasks);
                    }
                } else if (record.key().startsWith(KafkaConfigBackingStore.RESTART_PREFIX)) {
                    RestartRequest request = KafkaConfigBackingStore.this.recordToRestartRequest(record, value);
                    if (request != null && KafkaConfigBackingStore.this.started) {
                        KafkaConfigBackingStore.this.updateListener.onRestartRequest(request);
                    }
                } else if (record.key().equals(KafkaConfigBackingStore.SESSION_KEY_KEY)) {
                    if (value.value() == null) {
                        log.error("Ignoring session key because it is unexpectedly null");
                        return;
                    }
                    if (!(value.value() instanceof Map)) {
                        log.error("Ignoring session key because the value is not a Map but is {}", value.value().getClass());
                        return;
                    }
                    Map valueAsMap = (Map)value.value();
                    Object sessionKey = valueAsMap.get("key");
                    if (!(sessionKey instanceof String)) {
                        log.error("Invalid data for session key 'key' field should be a String but is {}", sessionKey.getClass());
                        return;
                    }
                    byte[] key = Base64.getDecoder().decode((String)sessionKey);
                    Object keyAlgorithm = valueAsMap.get("algorithm");
                    if (!(keyAlgorithm instanceof String)) {
                        log.error("Invalid data for session key 'algorithm' field should be a String but it is {}", keyAlgorithm.getClass());
                        return;
                    }
                    Object creationTimestamp = valueAsMap.get("creation-timestamp");
                    if (!(creationTimestamp instanceof Long)) {
                        log.error("Invalid data for session key 'creation-timestamp' field should be a long but it is {}", creationTimestamp.getClass());
                        return;
                    }
                    KafkaConfigBackingStore.this.sessionKey = new SessionKey(new SecretKeySpec(key, (String)keyAlgorithm), (Long)creationTimestamp);
                    if (KafkaConfigBackingStore.this.started) {
                        KafkaConfigBackingStore.this.updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);
                    }
                } else {
                    log.error("Discarding config update record with invalid key: {}", (Object)record.key());
                }
            }
        }
    }
}

