/*
 * Decompiled with CFR 0.152.
 */
package org.openl.rules.ruleservice.kafka.publish;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rits.cloning.Cloner;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.openl.rules.project.model.RulesDeploy;
import org.openl.rules.ruleservice.core.OpenLService;
import org.openl.rules.ruleservice.core.RuleServiceDeployException;
import org.openl.rules.ruleservice.core.RuleServiceInstantiationException;
import org.openl.rules.ruleservice.core.RuleServiceUndeployException;
import org.openl.rules.ruleservice.core.ServiceDescription;
import org.openl.rules.ruleservice.kafka.RequestMessage;
import org.openl.rules.ruleservice.kafka.conf.BaseKafkaConfig;
import org.openl.rules.ruleservice.kafka.conf.ClientIDGenerator;
import org.openl.rules.ruleservice.kafka.conf.KafkaDeploy;
import org.openl.rules.ruleservice.kafka.conf.KafkaDeployUtils;
import org.openl.rules.ruleservice.kafka.conf.KafkaMethodConfig;
import org.openl.rules.ruleservice.kafka.conf.KafkaServiceConfig;
import org.openl.rules.ruleservice.kafka.conf.YamlObjectMapperBuilder;
import org.openl.rules.ruleservice.kafka.databinding.KafkaConfigHolder;
import org.openl.rules.ruleservice.kafka.publish.KafkaHelpers;
import org.openl.rules.ruleservice.kafka.publish.KafkaService;
import org.openl.rules.ruleservice.kafka.publish.KafkaServiceConfigurationException;
import org.openl.rules.ruleservice.kafka.publish.KafkaServiceException;
import org.openl.rules.ruleservice.publish.RuleServicePublisher;
import org.openl.rules.ruleservice.publish.jaxrs.storelogdata.JacksonObjectSerializer;
import org.openl.rules.ruleservice.storelogdata.ObjectSerializer;
import org.openl.rules.ruleservice.storelogdata.StoreLogDataManager;
import org.openl.rules.serialization.JacksonObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ResourceLoaderAware;
import org.springframework.core.env.Environment;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;

public class KafkaRuleServicePublisher
implements RuleServicePublisher,
ResourceLoaderAware {
    /** Logger used for deployment lifecycle messages and configuration warnings. */
    private final Logger log = LoggerFactory.getLogger(KafkaRuleServicePublisher.class);

    /** Property naming the {@link ClientIDGenerator} implementation used when no client id is configured. */
    private static final String CLIENT_ID_GENERATOR = "client.id.generator";
    /** Property holding the Kafka bootstrap servers. */
    private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    /** Property holding the Kafka consumer group id. */
    private static final String GROUP_ID = "group.id";
    /** Property holding the Kafka client id. */
    private static final String CLIENT_ID = "client.id";
    /** Property names stripped by {@link #cleanupConfigs(Properties)} before the properties reach a Kafka client. */
    private static final String[] CLEAN_UP_PROPERTIES = new String[]{"jackson.defaultTypingMode", "rootClassNamesBinding", CLIENT_ID_GENERATOR};

    /** Deployed services mapped to the Kafka services, producers and consumers created for them. */
    private final Map<OpenLService, Triple<Collection<KafkaService>, Collection<KafkaProducer<?, ?>>, Collection<KafkaConsumer<?, ?>>>> runningServices = new HashMap<>();
    /** Loader used to locate the default and immutable deployment YAML files. */
    private ResourceLoader resourceLoader;
    /** Cached content of the default deployment YAML; loaded on first use. */
    private KafkaDeploy defaultKafkaDeploy;
    /** Cached content of the immutable deployment YAML; loaded on first use. */
    private KafkaDeploy immutableKafkaDeploy;
    /** Bootstrap servers applied when the merged properties do not define any. */
    private String defaultBootstrapServers;
    /** Consumer group id applied when the merged consumer properties do not define one. */
    private String defaultGroupId;
    /** Deep-clones configuration objects so merging never mutates the originals. */
    private final Cloner cloner = new Cloner();
    @Autowired
    private StoreLogDataManager storeLogDataManager;
    @Autowired
    private Environment env;
    @Autowired
    @Qualifier(value="serviceDescriptionInProcess")
    private ObjectFactory<ServiceDescription> serviceDescriptionObjectFactory;
    @Autowired
    @Qualifier(value="kafkaServiceProducerJacksonObjectMapperFactoryBean")
    private ObjectFactory<JacksonObjectMapperFactory> kafkaServiceProducerJacksonObjectMapperFactoryBeanObjectFactory;
    @Autowired
    @Qualifier(value="kafkaServiceConsumerJacksonObjectMapperFactoryBean")
    private ObjectFactory<JacksonObjectMapperFactory> kafkaServiceConsumerJacksonObjectMapperFactoryBeanObjectFactory;

    /**
     * Replaces the store-log-data manager handed to every Kafka service created by this publisher.
     *
     * @param storeLogDataManager manager to use from now on
     */
    public void setStoreLogDataManager(StoreLogDataManager storeLogDataManager) {
        this.storeLogDataManager = storeLogDataManager;
    }

    /**
     * @return the store-log-data manager currently held by this publisher
     */
    public StoreLogDataManager getStoreLogDataManager() {
        return storeLogDataManager;
    }

    /**
     * @return the factory that supplies the {@link JacksonObjectMapperFactory} used for producer-side mappers
     */
    public ObjectFactory<JacksonObjectMapperFactory> getKafkaServiceProducerJacksonObjectMapperFactoryBeanObjectFactory() {
        return kafkaServiceProducerJacksonObjectMapperFactoryBeanObjectFactory;
    }

    /**
     * Replaces the factory that supplies the {@link JacksonObjectMapperFactory} used for producer-side mappers.
     *
     * @param kafkaServiceProducerJacksonObjectMapperFactoryBeanObjectFactory factory to use from now on
     */
    public void setKafkaServiceProducerJacksonObjectMapperFactoryBeanObjectFactory(ObjectFactory<JacksonObjectMapperFactory> kafkaServiceProducerJacksonObjectMapperFactoryBeanObjectFactory) {
        this.kafkaServiceProducerJacksonObjectMapperFactoryBeanObjectFactory = kafkaServiceProducerJacksonObjectMapperFactoryBeanObjectFactory;
    }

    /**
     * @return the factory that supplies the {@link JacksonObjectMapperFactory} used for consumer-side mappers
     */
    public ObjectFactory<JacksonObjectMapperFactory> getKafkaServiceConsumerJacksonObjectMapperFactoryBeanObjectFactory() {
        return kafkaServiceConsumerJacksonObjectMapperFactoryBeanObjectFactory;
    }

    /**
     * Replaces the factory that supplies the {@link JacksonObjectMapperFactory} used for consumer-side mappers.
     *
     * @param kafkaServiceConsumerJacksonObjectMapperFactoryBeanObjectFactory factory to use from now on
     */
    public void setKafkaServiceConsumerJacksonObjectMapperFactoryBeanObjectFactory(ObjectFactory<JacksonObjectMapperFactory> kafkaServiceConsumerJacksonObjectMapperFactoryBeanObjectFactory) {
        this.kafkaServiceConsumerJacksonObjectMapperFactoryBeanObjectFactory = kafkaServiceConsumerJacksonObjectMapperFactoryBeanObjectFactory;
    }

    /**
     * @return the factory queried during {@link #deploy(OpenLService)} for the current {@link ServiceDescription}
     */
    public ObjectFactory<ServiceDescription> getServiceDescriptionObjectFactory() {
        return serviceDescriptionObjectFactory;
    }

    /**
     * Replaces the factory queried during {@link #deploy(OpenLService)} for the current {@link ServiceDescription}.
     *
     * @param serviceDescriptionObjectFactory factory to use from now on
     */
    public void setServiceDescriptionObjectFactory(ObjectFactory<ServiceDescription> serviceDescriptionObjectFactory) {
        this.serviceDescriptionObjectFactory = serviceDescriptionObjectFactory;
    }

    /**
     * Returns the default Kafka deployment settings, reading them from the classpath on first use.
     * <p>
     * {@code default-kafka-deploy.yaml} is tried first and {@code default-kafka-deploy.yml} second;
     * the parsed result is cached in {@code defaultKafkaDeploy}.
     *
     * @return the cached or freshly parsed default settings
     * @throws FileNotFoundException if neither file exists
     * @throws IOException if the file cannot be read or parsed
     */
    private KafkaDeploy getDefaultKafkaDeploy() throws IOException {
        if (defaultKafkaDeploy != null) {
            return defaultKafkaDeploy;
        }
        Resource yamlResource = resourceLoader.getResource("classpath:default-kafka-deploy.yaml");
        if (!yamlResource.exists()) {
            // Fall back to the short extension before giving up.
            yamlResource = resourceLoader.getResource("classpath:default-kafka-deploy.yml");
            if (!yamlResource.exists()) {
                throw new FileNotFoundException("File 'default-kafka-deploy.yaml' or 'default-kafka-deploy.yml' is not found.");
            }
        }
        ObjectMapper yamlMapper = YamlObjectMapperBuilder.newInstance();
        defaultKafkaDeploy = yamlMapper.readValue(yamlResource.getInputStream(), KafkaDeploy.class);
        return defaultKafkaDeploy;
    }

    /**
     * Returns the immutable Kafka deployment settings, reading them from the classpath on first use.
     * <p>
     * {@code immutable-kafka-deploy.yaml} is tried first and {@code immutable-kafka-deploy.yml} second;
     * the parsed result is cached in {@code immutableKafkaDeploy}.
     *
     * @return the cached or freshly parsed immutable settings
     * @throws FileNotFoundException if neither file exists
     * @throws IOException if the file cannot be read or parsed
     */
    private KafkaDeploy getImmutableKafkaDeploy() throws IOException {
        if (immutableKafkaDeploy != null) {
            return immutableKafkaDeploy;
        }
        Resource yamlResource = resourceLoader.getResource("classpath:immutable-kafka-deploy.yaml");
        if (!yamlResource.exists()) {
            // Fall back to the short extension before giving up.
            yamlResource = resourceLoader.getResource("classpath:immutable-kafka-deploy.yml");
            if (!yamlResource.exists()) {
                throw new FileNotFoundException("File 'immutable-kafka-deploy.yaml' or 'immutable-kafka-deploy.yml' is not found.");
            }
        }
        ObjectMapper yamlMapper = YamlObjectMapperBuilder.newInstance();
        immutableKafkaDeploy = yamlMapper.readValue(yamlResource.getInputStream(), KafkaDeploy.class);
        return immutableKafkaDeploy;
    }

    /**
     * Ensures {@code bootstrap.servers} is present in the given properties.
     * <p>
     * When the property is missing, the publisher's default bootstrap servers are applied. When it is
     * already present, the value is left untouched and a warning reports the override.
     *
     * @param service service being deployed; its deploy path appears in the warning
     * @param kafkaConfig configuration the properties belong to; a {@link KafkaMethodConfig} adds the method name to the warning
     * @param logPrefix leading word(s) of the warning identifying the client kind
     * @param configs properties to inspect and possibly extend
     */
    private void setBootstrapServers(OpenLService service, BaseKafkaConfig kafkaConfig, String logPrefix, Properties configs) {
        if (!configs.containsKey(BOOTSTRAP_SERVERS)) {
            configs.setProperty(BOOTSTRAP_SERVERS, getDefaultBootstrapServers());
            return;
        }
        if (kafkaConfig instanceof KafkaMethodConfig) {
            log.warn("{} '{}' property is overridden in service '{}' for method '{}'.",
                logPrefix,
                BOOTSTRAP_SERVERS,
                service.getDeployPath(),
                ((KafkaMethodConfig) kafkaConfig).getMethodName());
            return;
        }
        log.warn("{} '{}' property is overridden in service '{}'.", logPrefix, BOOTSTRAP_SERVERS, service.getDeployPath());
    }

    /**
     * Builds the effective producer properties for one service-level or method-level configuration.
     * <p>
     * Layers are applied in this order, each overriding the previous one: default deployment file,
     * the service's Kafka deployment, the given configuration, immutable deployment file. Afterwards
     * a client id may be generated and {@code bootstrap.servers} is defaulted if absent.
     *
     * @param service service being deployed
     * @param baseKafkaDeploy service-level or method-level configuration
     * @param kafkaDeploy Kafka deployment descriptor of the service
     * @return a new {@link Properties} instance holding the merged settings
     * @throws IOException if the default or immutable deployment file cannot be loaded
     */
    private Properties getProducerConfigs(OpenLService service, BaseKafkaConfig baseKafkaDeploy, KafkaDeploy kafkaDeploy) throws IOException {
        Properties merged = new Properties();

        Map<?, ?> defaultLayer = getDefaultKafkaDeploy().getProducerConfigs();
        if (defaultLayer != null) {
            merged.putAll(defaultLayer);
        }
        Map<?, ?> deployLayer = kafkaDeploy.getProducerConfigs();
        if (deployLayer != null) {
            merged.putAll(deployLayer);
        }
        Map<?, ?> configLayer = baseKafkaDeploy.getProducerConfigs();
        if (configLayer != null) {
            merged.putAll(configLayer);
        }
        // Applied last so nothing above can override it.
        Map<?, ?> immutableLayer = getImmutableKafkaDeploy().getProducerConfigs();
        if (immutableLayer != null) {
            merged.putAll(immutableLayer);
        }

        useClientIdGeneratorIfPropertyIsNotSet(merged, service, baseKafkaDeploy);
        setBootstrapServers(service, baseKafkaDeploy, "Producer", merged);
        return merged;
    }

    /**
     * Builds the effective DLT producer properties for one service-level or method-level configuration.
     * <p>
     * Layers are applied in this order, each overriding the previous one: default deployment file,
     * the service's Kafka deployment, the given configuration, immutable deployment file. Afterwards
     * a client id may be generated and {@code bootstrap.servers} is defaulted if absent.
     *
     * @param service service being deployed
     * @param kafkaConfig service-level or method-level configuration
     * @param kafkaDeploy Kafka deployment descriptor of the service
     * @return a new {@link Properties} instance holding the merged settings
     * @throws IOException if the default or immutable deployment file cannot be loaded
     */
    private Properties getDltProducerConfigs(OpenLService service, BaseKafkaConfig kafkaConfig, KafkaDeploy kafkaDeploy) throws IOException {
        Properties merged = new Properties();

        Map<?, ?> defaultLayer = getDefaultKafkaDeploy().getDltProducerConfigs();
        if (defaultLayer != null) {
            merged.putAll(defaultLayer);
        }
        Map<?, ?> deployLayer = kafkaDeploy.getDltProducerConfigs();
        if (deployLayer != null) {
            merged.putAll(deployLayer);
        }
        Map<?, ?> configLayer = kafkaConfig.getDltProducerConfigs();
        if (configLayer != null) {
            merged.putAll(configLayer);
        }
        // Applied last so nothing above can override it.
        Map<?, ?> immutableLayer = getImmutableKafkaDeploy().getDltProducerConfigs();
        if (immutableLayer != null) {
            merged.putAll(immutableLayer);
        }

        useClientIdGeneratorIfPropertyIsNotSet(merged, service, kafkaConfig);
        setBootstrapServers(service, kafkaConfig, "DLT producer", merged);
        return merged;
    }

    /**
     * Builds the effective consumer properties for one service-level or method-level configuration.
     * <p>
     * Layers are applied in this order, each overriding the previous one: default deployment file,
     * the service's Kafka deployment, the given configuration, immutable deployment file. Afterwards
     * a client id may be generated, {@code bootstrap.servers} is defaulted if absent, and
     * {@code group.id} falls back to the publisher's default group id when still missing.
     *
     * @param service service being deployed
     * @param kafkaConfig service-level or method-level configuration
     * @param kafkaDeploy Kafka deployment descriptor of the service
     * @return a new {@link Properties} instance holding the merged settings
     * @throws IOException if the default or immutable deployment file cannot be loaded
     */
    private Properties getConsumerConfigs(OpenLService service, BaseKafkaConfig kafkaConfig, KafkaDeploy kafkaDeploy) throws IOException {
        Properties merged = new Properties();

        Map<?, ?> defaultLayer = getDefaultKafkaDeploy().getConsumerConfigs();
        if (defaultLayer != null) {
            merged.putAll(defaultLayer);
        }
        Map<?, ?> deployLayer = kafkaDeploy.getConsumerConfigs();
        if (deployLayer != null) {
            merged.putAll(deployLayer);
        }
        Map<?, ?> configLayer = kafkaConfig.getConsumerConfigs();
        if (configLayer != null) {
            merged.putAll(configLayer);
        }
        // Applied last so nothing above can override it.
        Map<?, ?> immutableLayer = getImmutableKafkaDeploy().getConsumerConfigs();
        if (immutableLayer != null) {
            merged.putAll(immutableLayer);
        }

        useClientIdGeneratorIfPropertyIsNotSet(merged, service, kafkaConfig);
        setBootstrapServers(service, kafkaConfig, "Consumer", merged);
        if (!merged.containsKey(GROUP_ID)) {
            merged.setProperty(GROUP_ID, getDefaultGroupId());
        }
        return merged;
    }

    /**
     * Fills in {@code client.id} from a configured generator when no explicit client id is present.
     * <p>
     * Nothing happens if {@code client.id} is already set or {@code client.id.generator} is absent.
     * Otherwise the generator class is loaded through the thread context class loader, instantiated
     * via its no-argument constructor, and its result stored under {@code client.id}. Any failure is
     * logged and otherwise ignored, leaving the properties without a client id.
     *
     * @param configs properties to inspect and possibly extend
     * @param service service passed to the generator
     * @param kafkaConfig configuration passed to the generator
     */
    private void useClientIdGeneratorIfPropertyIsNotSet(Properties configs, OpenLService service, BaseKafkaConfig kafkaConfig) {
        if (configs.getProperty(CLIENT_ID) != null) {
            return;
        }
        String generatorClassName = configs.getProperty(CLIENT_ID_GENERATOR);
        if (generatorClassName == null) {
            return;
        }
        try {
            ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
            Class<?> generatorClass = contextLoader.loadClass(generatorClassName);
            ClientIDGenerator generator = (ClientIDGenerator) generatorClass.getDeclaredConstructor().newInstance();
            configs.put(CLIENT_ID, generator.generate(service, kafkaConfig));
        } catch (Exception e) {
            log.error("Failed to generate 'client.id' property for kafka consumer/producer.", e);
        }
    }

    /**
     * Tells whether a required configuration value counts as missing.
     *
     * @param value field value to check
     * @return {@code true} for {@code null} and for strings that are empty after trimming
     */
    private boolean requiredFieldIsMissed(Object value) {
        if (!(value instanceof String)) {
            return value == null;
        }
        return ((String) value).trim().isEmpty();
    }

    /**
     * Verifies that every required configuration field of the given object has a value.
     * <p>
     * Only fields returned by {@link Class#getFields()} are inspected. A field is required when it
     * carries {@link JsonProperty} with {@code required = true}; it is missing when
     * {@link #requiredFieldIsMissed(Object)} says so.
     *
     * @param config configuration object to check
     * @throws IllegalAccessException if a field value cannot be read reflectively
     * @throws KafkaServiceConfigurationException if at least one required field is missing; the
     *         message lists the JSON property names and, for a {@link KafkaMethodConfig}, the method name
     */
    private void validate(Object config) throws IllegalAccessException, KafkaServiceConfigurationException {
        List<String> missingProperties = new ArrayList<>();
        for (Field candidate : config.getClass().getFields()) {
            JsonProperty annotation = candidate.getAnnotation(JsonProperty.class);
            boolean required = annotation != null && annotation.required();
            if (required && requiredFieldIsMissed(candidate.get(config))) {
                missingProperties.add(annotation.value());
            }
        }
        if (missingProperties.isEmpty()) {
            return;
        }
        String missingList = "[" + String.join(",", missingProperties) + "]";
        if (config instanceof KafkaMethodConfig) {
            String methodName = ((KafkaMethodConfig) config).getMethodName();
            throw new KafkaServiceConfigurationException(String.format("Missed mandatory configs %s in method '%s' configuration.", missingList, methodName));
        }
        throw new KafkaServiceConfigurationException(String.format("Missed mandatory config %s in the service configuration.", missingList));
    }

    /**
     * Produces a deep copy of a configuration whose producer, consumer and DLT producer properties
     * are replaced by the fully merged ones.
     * <p>
     * The merged properties are computed from the original {@code kafkaConfig}, not from the copy,
     * and the original is left unmodified.
     *
     * @param service service being deployed
     * @param kafkaConfig service-level or method-level configuration to copy
     * @param kafkaDeploy Kafka deployment descriptor of the service
     * @param <T> concrete configuration type
     * @return the deep copy carrying merged properties
     * @throws IOException if the default or immutable deployment file cannot be loaded
     */
    private <T extends BaseKafkaConfig> T makeMergedKafkaConfig(OpenLService service, T kafkaConfig, KafkaDeploy kafkaDeploy) throws IOException {
        T mergedCopy = cloner.deepClone(kafkaConfig);
        mergedCopy.setProducerConfigs(getProducerConfigs(service, kafkaConfig, kafkaDeploy));
        mergedCopy.setConsumerConfigs(getConsumerConfigs(service, kafkaConfig, kafkaDeploy));
        mergedCopy.setDltProducerConfigs(getDltProducerConfigs(service, kafkaConfig, kafkaDeploy));
        return mergedCopy;
    }

    /**
     * Copies the given properties and drops every key listed in {@code CLEAN_UP_PROPERTIES}.
     *
     * @param config properties to copy; not modified
     * @return a new {@link Properties} instance without the listed keys
     */
    protected Properties cleanupConfigs(Properties config) {
        Properties cleaned = new Properties();
        cleaned.putAll(config);
        for (int i = 0; i < CLEAN_UP_PROPERTIES.length; i++) {
            cleaned.remove(CLEAN_UP_PROPERTIES[i]);
        }
        return cleaned;
    }

    /**
     * Instantiates the configured value deserializer class, if any.
     * <p>
     * The class is loaded through the service class loader. A public constructor taking
     * {@code (OpenLService, ObjectMapper, Method)} is preferred; if the class has none, its
     * no-argument constructor is used. The fallback goes through
     * {@link Class#getDeclaredConstructor(Class[])} rather than the deprecated
     * {@link Class#newInstance()}, so a constructor failure surfaces as
     * {@link InvocationTargetException} instead of an undeclared checked exception.
     *
     * @param service service whose class loader resolves {@code className}; also passed to the constructor
     * @param objectMapper mapper passed to the three-argument constructor
     * @param method method passed to the three-argument constructor; may be {@code null}
     * @param className fully qualified class name, or {@code null} when nothing is configured
     * @param <T> type the caller expects; the cast is unchecked
     * @return the new instance, or {@code null} if {@code className} is {@code null}
     * @throws RuleServiceInstantiationException declared for obtaining the service class loader
     * @throws ClassNotFoundException if the class cannot be loaded
     * @throws IllegalAccessException if the chosen constructor is not accessible
     * @throws InstantiationException if the class is abstract
     * @throws InvocationTargetException if the chosen constructor throws
     * @throws NoSuchMethodException if neither constructor exists
     */
    @SuppressWarnings("unchecked") // the caller states the expected type; the class name comes from configuration
    private <T> T getConfiguredValueDeserializer(OpenLService service, ObjectMapper objectMapper, Method method, String className) throws RuleServiceInstantiationException, ClassNotFoundException, IllegalAccessException, InstantiationException, InvocationTargetException, NoSuchMethodException {
        if (className == null) {
            return null;
        }
        Class<?> clazz = service.getClassLoader().loadClass(className);
        Constructor<?> constructor;
        try {
            constructor = clazz.getConstructor(OpenLService.class, ObjectMapper.class, Method.class);
        }
        catch (NoSuchMethodException e) {
            // No service-aware constructor: fall back to the no-argument one.
            return (T)clazz.getDeclaredConstructor().newInstance();
        }
        return (T)constructor.newInstance(service, objectMapper, method);
    }

    /**
     * Instantiates the configured value serializer class, if any.
     * <p>
     * The class is loaded through the service class loader. A public constructor taking
     * {@code (OpenLService, ObjectMapper)} is preferred; if the class has none, its no-argument
     * constructor is used. The fallback goes through {@link Class#getDeclaredConstructor(Class[])}
     * rather than the deprecated {@link Class#newInstance()}, so a constructor failure surfaces as
     * {@link InvocationTargetException} instead of an undeclared checked exception.
     *
     * @param service service whose class loader resolves {@code className}; also passed to the constructor
     * @param objectMapper mapper passed to the two-argument constructor
     * @param className fully qualified class name, or {@code null} when nothing is configured
     * @param <T> type the caller expects; the cast is unchecked
     * @return the new instance, or {@code null} if {@code className} is {@code null}
     * @throws RuleServiceInstantiationException declared for obtaining the service class loader
     * @throws ClassNotFoundException if the class cannot be loaded
     * @throws IllegalAccessException if the chosen constructor is not accessible
     * @throws InstantiationException if the class is abstract
     * @throws InvocationTargetException if the chosen constructor throws
     * @throws NoSuchMethodException if neither constructor exists
     */
    @SuppressWarnings("unchecked") // the caller states the expected type; the class name comes from configuration
    private <T> T getConfiguredValueSerializer(OpenLService service, ObjectMapper objectMapper, String className) throws RuleServiceInstantiationException, ClassNotFoundException, IllegalAccessException, InstantiationException, InvocationTargetException, NoSuchMethodException {
        if (className == null) {
            return null;
        }
        Class<?> clazz = service.getClassLoader().loadClass(className);
        Constructor<?> constructor;
        try {
            constructor = clazz.getConstructor(OpenLService.class, ObjectMapper.class);
        }
        catch (NoSuchMethodException e) {
            // No service-aware constructor: fall back to the no-argument one.
            return (T)clazz.getDeclaredConstructor().newInstance();
        }
        return (T)constructor.newInstance(service, objectMapper);
    }

    /**
     * Instantiates the configured key serializer or deserializer class, if any.
     * <p>
     * The class is loaded through the service class loader and created via its no-argument
     * constructor. Instantiation goes through {@link Class#getDeclaredConstructor(Class[])} rather
     * than the deprecated {@link Class#newInstance()}, so a constructor failure surfaces as
     * {@link InvocationTargetException} instead of an undeclared checked exception.
     *
     * @param service service whose class loader resolves {@code className}
     * @param className fully qualified class name, or {@code null} when nothing is configured
     * @param <T> type the caller expects; the cast is unchecked
     * @return the new instance, or {@code null} if {@code className} is {@code null}
     * @throws RuleServiceInstantiationException declared for obtaining the service class loader
     * @throws ClassNotFoundException if the class cannot be loaded
     * @throws IllegalAccessException if the constructor is not accessible
     * @throws InstantiationException if the class is abstract
     * @throws InvocationTargetException if the constructor throws
     * @throws NoSuchMethodException if the class has no no-argument constructor
     */
    @SuppressWarnings("unchecked") // the caller states the expected type; the class name comes from configuration
    private <T> T getConfiguredKeySerializerOrDeserializer(OpenLService service, String className) throws RuleServiceInstantiationException, ClassNotFoundException, IllegalAccessException, InstantiationException, InvocationTargetException, NoSuchMethodException {
        if (className == null) {
            return null;
        }
        Class<?> clazz = service.getClassLoader().loadClass(className);
        return (T)clazz.getDeclaredConstructor().newInstance();
    }

    /**
     * Creates the response producer for a service.
     * <p>
     * Key and value serializers are instantiated from the {@code key.serializer} and
     * {@code value.serializer} properties; either may end up {@code null} when the property is not
     * set, in which case {@code null} is passed on to the {@link KafkaProducer} constructor.
     *
     * @param service service whose class loader resolves the serializer classes
     * @param objectMapper mapper offered to the value serializer
     * @param configs producer properties
     * @return the new producer
     * @throws KafkaServiceConfigurationException if a serializer cannot be instantiated
     */
    private KafkaProducer<String, Object> buildProducer(OpenLService service, ObjectMapper objectMapper, Properties configs) throws KafkaServiceConfigurationException {
        Serializer<String> keySerializer;
        Serializer<Object> valueSerializer;
        try {
            keySerializer = this.getConfiguredKeySerializerOrDeserializer(service, configs.getProperty("key.serializer"));
            valueSerializer = this.getConfiguredValueSerializer(service, objectMapper, configs.getProperty("value.serializer"));
        }
        catch (Exception e) {
            // A producer uses serializers; the previous message was misspelled and said "deserializer".
            throw new KafkaServiceConfigurationException("Failed to construct key/value serializer for producer.", e);
        }
        return new KafkaProducer<>(configs, keySerializer, valueSerializer);
    }

    /**
     * Creates the DLT producer purely from the given properties.
     *
     * @param configs DLT producer properties
     * @return the new producer
     */
    private KafkaProducer<String, byte[]> buildDltProducer(Properties configs) {
        return new KafkaProducer<>(configs);
    }

    /**
     * Creates the request consumer for a service or for one of its methods.
     * <p>
     * Key and value deserializers are instantiated from the {@code key.deserializer} and
     * {@code value.deserializer} properties; either may end up {@code null} when the property is
     * not set, in which case {@code null} is passed on to the {@link KafkaConsumer} constructor.
     *
     * @param service service whose class loader resolves the deserializer classes
     * @param objectMapper mapper offered to the value deserializer
     * @param method method offered to the value deserializer; may be {@code null}
     * @param configs consumer properties
     * @return the new consumer
     * @throws KafkaServiceConfigurationException if a deserializer cannot be instantiated
     */
    private KafkaConsumer<String, RequestMessage> buildConsumer(OpenLService service, ObjectMapper objectMapper, Method method, Properties configs) throws KafkaServiceConfigurationException {
        Deserializer<String> keyDeserializer;
        Deserializer<RequestMessage> valueDeserializer;
        try {
            keyDeserializer = this.getConfiguredKeySerializerOrDeserializer(service, configs.getProperty("key.deserializer"));
            valueDeserializer = this.getConfiguredValueDeserializer(service, objectMapper, method, configs.getProperty("value.deserializer"));
        }
        catch (Exception e) {
            throw new KafkaServiceConfigurationException("Failed to construct key/value deserializer for consumer.", e);
        }
        return new KafkaConsumer<>(configs, keyDeserializer, valueDeserializer);
    }

    /**
     * Creates, registers and starts one {@link KafkaService} together with its Kafka clients.
     * <p>
     * A consumer is always created. The response producer and the DLT producer are each taken from
     * {@code context} when the unmerged {@code config} defines no properties of that kind and the
     * context already holds one; otherwise a new client is built and, if it is shareable, stored in
     * the context. Every client created here is added to the matching collection so the caller can
     * close it later.
     *
     * @param service service being deployed
     * @param kafkaServices receives the created Kafka service
     * @param kafkaConsumers receives the created consumer
     * @param kafkaProducers receives every producer created by this call
     * @param context holder of producers shared across the deployment
     * @param mergedKafkaConfig configuration with fully merged properties
     * @param config original, unmerged configuration; decides whether shared producers may be used
     * @param method method served by the Kafka service, or {@code null} for the service-level one
     * @param rulesDeploy rules deployment descriptor passed to the Kafka service
     * @param <T> concrete configuration type
     * @throws KafkaServiceException if a mapper, a client or the Kafka service cannot be created
     */
    private <T extends BaseKafkaConfig> void createKafkaService(OpenLService service, Collection<KafkaService> kafkaServices, Collection<KafkaConsumer<?, ?>> kafkaConsumers, Collection<KafkaProducer<?, ?>> kafkaProducers, ServiceDeployContext context, T mergedKafkaConfig, T config, Method method, RulesDeploy rulesDeploy) throws KafkaServiceException {
        // Consumer: never shared.
        ObjectMapper consumerMapper = createConsumerJacksonObjectMapper(mergedKafkaConfig);
        KafkaConsumer<String, RequestMessage> consumer = buildConsumer(service, consumerMapper, method, cleanupConfigs(mergedKafkaConfig.getConsumerConfigs()));
        kafkaConsumers.add(consumer);

        // Response producer: shared unless this configuration overrides producer properties.
        boolean producerShareable = config.getProducerConfigs() == null || config.getProducerConfigs().isEmpty();
        KafkaProducer<String, Object> producer = null;
        ObjectSerializer objectSerializer = null;
        if (producerShareable) {
            producer = context.getProducer();
            objectSerializer = context.getObjectSerializer();
        }
        if (producer == null) {
            ObjectMapper producerMapper = createProducerJacksonObjectMapper(mergedKafkaConfig);
            objectSerializer = new JacksonObjectSerializer(producerMapper);
            producer = buildProducer(service, producerMapper, cleanupConfigs(mergedKafkaConfig.getProducerConfigs()));
            if (producerShareable) {
                context.setProducerAndObjectSerializer(producer, objectSerializer);
            }
            kafkaProducers.add(producer);
        }

        // DLT producer: shared unless this configuration overrides DLT producer properties.
        boolean dltProducerShareable = config.getDltProducerConfigs() == null || config.getDltProducerConfigs().isEmpty();
        KafkaProducer<String, byte[]> dltProducer = null;
        if (dltProducerShareable) {
            dltProducer = context.getDltProducer();
        }
        if (dltProducer == null) {
            dltProducer = buildDltProducer(cleanupConfigs(mergedKafkaConfig.getDltProducerConfigs()));
            if (dltProducerShareable) {
                context.setDltProducer(dltProducer);
            }
            kafkaProducers.add(dltProducer);
        }

        String requestIdHeaderKey = org.openl.util.StringUtils.trimToNull(env.getProperty("log.request-id.header"));
        KafkaService kafkaService = KafkaService.createService(service,
            requestIdHeaderKey,
            mergedKafkaConfig.getInTopic(),
            mergedKafkaConfig.getOutTopic(),
            mergedKafkaConfig.getDltTopic(),
            consumer,
            producer,
            dltProducer,
            objectSerializer,
            getStoreLogDataManager(),
            getStoreLogDataManager().isEnabled(),
            rulesDeploy);
        // Registered before start so a failing start still leaves it visible to the caller's cleanup.
        kafkaServices.add(kafkaService);
        kafkaService.start();
    }

    /**
     * Deploys a service by creating and starting a Kafka service for its service-level configuration
     * (if any) and for each configured method.
     * <p>
     * While deploying, the thread context class loader is switched to the service class loader and
     * restored afterwards. All configurations are validated and merged, and all methods resolved,
     * before any Kafka client is created. If creating or starting any Kafka service fails, everything
     * created so far is stopped and closed. On success the created objects are recorded in
     * {@code runningServices}.
     *
     * @param service service to deploy; must not be {@code null}
     * @throws NullPointerException if {@code service} is {@code null}
     * @throws RuleServiceDeployException wrapping any failure, including the case where the
     *         configuration yields no Kafka service at all
     */
    public void deploy(OpenLService service) throws RuleServiceDeployException {
        Objects.requireNonNull(service, "service cannot be null");
        ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(service.getClassLoader());
            ServiceDescription serviceDescription = getServiceDescriptionObjectFactory().getObject();
            KafkaDeploy kafkaDeploy = KafkaDeployUtils.getKafkaDeploy(serviceDescription);
            List<KafkaMethodConfig> methodConfigs = kafkaDeploy.getMethodConfigs();
            if (methodConfigs == null) {
                methodConfigs = Collections.emptyList();
            }
            Collection<KafkaService> kafkaServices = new HashSet<>();
            Collection<KafkaProducer<?, ?>> kafkaProducers = new HashSet<>();
            Collection<KafkaConsumer<?, ?>> kafkaConsumers = new HashSet<>();

            KafkaServiceConfig serviceConfig = kafkaDeploy.getServiceConfig();
            if (serviceConfig != null) {
                validate(serviceConfig);
            }
            validateConfiguration(service, methodConfigs);

            // Merge, validate and resolve every method up front so nothing is started on bad input.
            Map<KafkaMethodConfig, KafkaMethodConfig> mergedByOriginal = new HashMap<>();
            Map<KafkaMethodConfig, Method> methodByOriginal = new HashMap<>();
            for (KafkaMethodConfig original : methodConfigs) {
                KafkaMethodConfig merged = makeMergedKafkaConfig(service, original, kafkaDeploy);
                validate(merged);
                mergedByOriginal.put(original, merged);
                Method resolved = KafkaHelpers.findMethodInService(service, merged.getMethodName(), merged.getMethodParameters());
                methodByOriginal.put(original, resolved);
            }

            try {
                ServiceDeployContext sharedProducers = new ServiceDeployContext();
                if (serviceConfig != null) {
                    KafkaServiceConfig mergedServiceConfig = makeMergedKafkaConfig(service, serviceConfig, kafkaDeploy);
                    createKafkaService(service, kafkaServices, kafkaConsumers, kafkaProducers, sharedProducers, mergedServiceConfig, serviceConfig, null, serviceDescription.getRulesDeploy());
                }
                for (KafkaMethodConfig original : methodConfigs) {
                    Method resolved = methodByOriginal.get(original);
                    KafkaMethodConfig merged = mergedByOriginal.get(original);
                    createKafkaService(service, kafkaServices, kafkaConsumers, kafkaProducers, sharedProducers, merged, original, resolved, serviceDescription.getRulesDeploy());
                }
            }
            catch (Exception e) {
                // Roll back whatever was already created before reporting the failure.
                stopAndClose(Triple.of(kafkaServices, kafkaProducers, kafkaConsumers));
                throw e;
            }

            if (kafkaServices.isEmpty()) {
                throw new KafkaServiceConfigurationException(String.format("Failed to deploy service '%s'. Kafka method configs are not found in the configuration.", service.getDeployPath()));
            }
            runningServices.put(service, Triple.of(kafkaServices, kafkaProducers, kafkaConsumers));
            log.info("Service '{}' has been successfully deployed.", service.getDeployPath());
        }
        catch (Exception t) {
            throw new RuleServiceDeployException(String.format("Failed to deploy service '%s'.", service.getDeployPath()), t);
        }
        finally {
            Thread.currentThread().setContextClassLoader(previousClassLoader);
        }
    }

    /**
     * Builds the {@link ObjectMapper} used on the consumer side for the given configuration.
     * <p>
     * The configuration is published through {@link KafkaConfigHolder} while the mapper factory is
     * obtained and invoked, and removed from the holder afterwards regardless of the outcome.
     *
     * @param config configuration exposed to the mapper factory
     * @return the new mapper
     * @throws KafkaServiceException if the factory fails to create the mapper
     */
    private ObjectMapper createConsumerJacksonObjectMapper(BaseKafkaConfig config) throws KafkaServiceException {
        try {
            KafkaConfigHolder.getInstance().setKafkaConfig(config);
            JacksonObjectMapperFactory mapperFactory = getKafkaServiceConsumerJacksonObjectMapperFactoryBeanObjectFactory().getObject();
            try {
                return mapperFactory.createJacksonObjectMapper();
            }
            catch (Exception cause) {
                throw new KafkaServiceException("Failed to build 'ObjectMapper' for kafka consumer.", cause);
            }
        }
        finally {
            KafkaConfigHolder.getInstance().remove();
        }
    }

    /**
     * Builds the {@link ObjectMapper} used on the producer side for the given configuration.
     * <p>
     * The configuration is published through {@link KafkaConfigHolder} while the mapper factory is
     * obtained and invoked, and removed from the holder afterwards regardless of the outcome.
     *
     * @param config configuration exposed to the mapper factory
     * @return the new mapper
     * @throws KafkaServiceException if the factory fails to create the mapper
     */
    private ObjectMapper createProducerJacksonObjectMapper(BaseKafkaConfig config) throws KafkaServiceException {
        try {
            KafkaConfigHolder.getInstance().setKafkaConfig(config);
            JacksonObjectMapperFactory jacksonObjectMapperFactory = this.getKafkaServiceProducerJacksonObjectMapperFactoryBeanObjectFactory().getObject();
            try {
                return jacksonObjectMapperFactory.createJacksonObjectMapper();
            }
            catch (Exception e) {
                // The message used to say "consumer", a copy-paste slip that misdirected diagnosis.
                throw new KafkaServiceException("Failed to build 'ObjectMapper' for kafka producer.", e);
            }
        }
        finally {
            KafkaConfigHolder.getInstance().remove();
        }
    }

    /**
     * Stops all Kafka services and then closes all producers and all consumers of one deployment.
     * <p>
     * Each element is handled independently: a failure is logged and processing continues with the
     * next element.
     *
     * @param t services (left), producers (middle) and consumers (right) to shut down
     * @return {@code true} if every stop and close call completed without an exception
     */
    private boolean stopAndClose(Triple<Collection<KafkaService>, Collection<KafkaProducer<?, ?>>, Collection<KafkaConsumer<?, ?>>> t) {
        boolean allSucceeded = true;
        for (KafkaService runningService : t.getLeft()) {
            try {
                runningService.stop();
            }
            catch (Exception stopFailure) {
                allSucceeded = false;
                log.error("Failed to stop kafka service.", stopFailure);
            }
        }
        for (KafkaProducer<?, ?> openProducer : t.getMiddle()) {
            try {
                openProducer.close();
            }
            catch (Exception closeFailure) {
                allSucceeded = false;
                log.error("Failed to close kafka producer.", closeFailure);
            }
        }
        for (KafkaConsumer<?, ?> openConsumer : t.getRight()) {
            try {
                openConsumer.close();
            }
            catch (Exception closeFailure) {
                allSucceeded = false;
                log.error("Failed to close kafka consumer.", closeFailure);
            }
        }
        return allSucceeded;
    }

    /**
     * Warns when several method configurations consume the same input topic with the same group id.
     * <p>
     * Configurations without an input topic or a method name are skipped. The group id is read from
     * the method's own (unmerged) consumer properties and is treated as {@code null} when those
     * properties are absent. One warning is logged per topic/group pair, at the moment the second
     * method for that pair is seen; it lists the methods collected up to that point.
     *
     * @param service service being deployed; its deploy path appears in the warning
     * @param kafkaMethodConfigs method configurations to inspect
     */
    private void validateConfiguration(OpenLService service, List<KafkaMethodConfig> kafkaMethodConfigs) {
        Map<Pair<String, String>, List<String>> methodsByTopicAndGroup = new HashMap<>();
        for (KafkaMethodConfig kmc : kafkaMethodConfigs) {
            if (kmc.getInTopic() == null || kmc.getMethodName() == null) {
                continue;
            }
            // Consumer properties are optional on a method configuration; guard against null.
            Properties consumerConfigs = kmc.getConsumerConfigs();
            String groupId = consumerConfigs == null ? null : consumerConfigs.getProperty(GROUP_ID);
            Pair<String, String> topicAndGroup = Pair.of(kmc.getInTopic(), groupId);
            List<String> methodNames = methodsByTopicAndGroup.computeIfAbsent(topicAndGroup, key -> new ArrayList<>());
            methodNames.add(kmc.getMethodName());
            // Exactly two entries means this is the first duplicate for the pair: warn once.
            if (methodNames.size() == 2 && this.log.isWarnEnabled()) {
                this.log.warn("Service '{}' uses the input topic name '{}' and group id '{}' for different methods {}.",
                    service.getDeployPath(),
                    topicAndGroup.getLeft(),
                    topicAndGroup.getRight(),
                    "[" + String.join(",", methodNames) + "]");
            }
        }
    }

    /**
     * Undeploys a service by stopping its Kafka services, closing its Kafka clients and removing it
     * from {@code runningServices}.
     * <p>
     * The service is removed even when some stop or close calls failed; the outcome is only
     * reflected in the log message.
     *
     * @param service service to undeploy; must not be {@code null}
     * @throws NullPointerException if {@code service} is {@code null}
     * @throws RuleServiceUndeployException if the service is not running or the shutdown throws
     */
    public void undeploy(OpenLService service) throws RuleServiceUndeployException {
        Objects.requireNonNull(service, "service cannot be null");
        Triple<Collection<KafkaService>, Collection<KafkaProducer<?, ?>>, Collection<KafkaConsumer<?, ?>>> running = runningServices.get(service);
        if (running == null) {
            throw new RuleServiceUndeployException(String.format("There is no running service with name '%s'", service.getDeployPath()));
        }
        try {
            boolean stoppedCleanly = stopAndClose(running);
            log.info(stoppedCleanly
                    ? "Service '{}' has been undeployed successfully."
                    : "Service '{}' has been undeployed with errors.",
                service.getDeployPath());
            runningServices.remove(service);
        }
        catch (Exception failure) {
            throw new RuleServiceUndeployException(String.format("Failed to undeploy service '%s'.", service.getDeployPath()), failure);
        }
    }

    /**
     * Looks up a running service by its deploy path.
     *
     * @param deployPath deploy path to search for; must not be {@code null}
     * @return the running service whose deploy path equals {@code deployPath}, or {@code null} if none matches
     * @throws NullPointerException if {@code deployPath} is {@code null}
     */
    public OpenLService getServiceByDeploy(String deployPath) {
        // Message corrected from "deployPath cannot null." to match the wording used for 'service'.
        Objects.requireNonNull(deployPath, "deployPath cannot be null");
        for (OpenLService service : this.runningServices.keySet()) {
            if (deployPath.equals(service.getDeployPath())) {
                return service;
            }
        }
        return null;
    }

    /**
     * Returns the URL of a deployed service.
     *
     * @param service service to look up; not used
     * @return always {@code null}
     */
    public String getUrl(OpenLService service) {
        return null;
    }

    /**
     * Stores the loader later used to locate the default and immutable deployment YAML files.
     *
     * @param resourceLoader loader to use from now on
     */
    @Override
    public void setResourceLoader(ResourceLoader resourceLoader) {
        this.resourceLoader = resourceLoader;
    }

    /**
     * @return the group id applied to consumer properties that do not define {@code group.id}
     */
    public String getDefaultGroupId() {
        return defaultGroupId;
    }

    /**
     * Sets the group id applied to consumer properties that do not define {@code group.id}.
     *
     * @param defaultGroupId fallback group id
     */
    public void setDefaultGroupId(String defaultGroupId) {
        this.defaultGroupId = defaultGroupId;
    }

    /**
     * @return the bootstrap servers applied to properties that do not define {@code bootstrap.servers}
     */
    public String getDefaultBootstrapServers() {
        return defaultBootstrapServers;
    }

    /**
     * Sets the bootstrap servers applied to properties that do not define {@code bootstrap.servers}.
     *
     * @param defaultBootstrapServers fallback bootstrap servers
     */
    public void setDefaultBootstrapServers(String defaultBootstrapServers) {
        this.defaultBootstrapServers = defaultBootstrapServers;
    }

    /**
     * @return the fixed publisher name {@code "KAFKA"}
     */
    public String name() {
        return "KAFKA";
    }

    /**
     * Holder for the producers that may be shared between the Kafka services created during a
     * single {@code deploy} call. All slots start out {@code null}; setters reject {@code null}.
     */
    private static final class ServiceDeployContext {
        /** Shared response producer, or {@code null} until one is registered. */
        private KafkaProducer<String, Object> producer;
        /** Shared DLT producer, or {@code null} until one is registered. */
        private KafkaProducer<String, byte[]> dltProducer;
        /** Serializer registered together with {@link #producer}. */
        private ObjectSerializer objectSerializer;

        private ServiceDeployContext() {
            // Instantiated only by the enclosing publisher.
        }

        /**
         * @return the shared DLT producer, or {@code null} if none was registered
         */
        public KafkaProducer<String, byte[]> getDltProducer() {
            return dltProducer;
        }

        /**
         * Registers the shared DLT producer.
         *
         * @param dltProducer producer to share; must not be {@code null}
         */
        public void setDltProducer(KafkaProducer<String, byte[]> dltProducer) {
            this.dltProducer = Objects.requireNonNull(dltProducer);
        }

        /**
         * @return the shared response producer, or {@code null} if none was registered
         */
        public KafkaProducer<String, Object> getProducer() {
            return producer;
        }

        /**
         * Registers the shared response producer together with its serializer.
         *
         * @param producer producer to share; must not be {@code null}
         * @param objectSerializer serializer paired with the producer; must not be {@code null}
         */
        public void setProducerAndObjectSerializer(KafkaProducer<String, Object> producer, ObjectSerializer objectSerializer) {
            this.producer = Objects.requireNonNull(producer);
            this.objectSerializer = Objects.requireNonNull(objectSerializer);
        }

        /**
         * @return the serializer registered with the shared producer, or {@code null} if none was registered
         */
        public ObjectSerializer getObjectSerializer() {
            return objectSerializer;
        }
    }
}

