/*
 * Decompiled with CFR 0.152.
 */
package com.icthh.xm.commons.scheduler.adapter;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.icthh.xm.commons.config.client.api.RefreshableConfiguration;
import com.icthh.xm.commons.config.client.config.XmConfigProperties;
import com.icthh.xm.commons.config.client.repository.TenantListRepository;
import com.icthh.xm.commons.config.domain.TenantState;
import com.icthh.xm.commons.logging.trace.SleuthWrapper;
import com.icthh.xm.commons.logging.util.MdcUtils;
import com.icthh.xm.commons.scheduler.domain.ScheduledEvent;
import com.icthh.xm.commons.scheduler.service.SchedulerEventService;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.health.CompositeHealthIndicator;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.HealthIndicatorRegistry;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBindingProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.binding.SubscribableChannelBindingTargetFactory;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.annotation.Import;
import org.springframework.context.event.EventListener;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
@EnableBinding
@EnableIntegration
@Import(value={KafkaBinderConfiguration.class})
@ConditionalOnProperty(value={"application.scheduler-enabled"})
public class SchedulerChannelManager
implements RefreshableConfiguration {
    private static final Logger log = LoggerFactory.getLogger(SchedulerChannelManager.class);
    private static final String PREFIX = "scheduler_";
    private static final String KAFKA = "kafka";
    private static final String DELIMITER = "_";
    private static final String GENERALGROUP = "GENERALGROUP";
    private static final String TOPIC = "_topic";
    private static final String QUEUE = "_queue";
    private static final String SCHEDULER_APP_DEFAULT = "scheduler";
    private static final String WRAP_TOKEN = "\"";
    private final BindingServiceProperties bindingServiceProperties;
    private final SubscribableChannelBindingTargetFactory bindingTargetFactory;
    private final BindingService bindingService;
    private final KafkaExtendedBindingProperties kafkaExtendedBindingProperties = new KafkaExtendedBindingProperties();
    private final Map<String, SubscribableChannel> channels = new ConcurrentHashMap<String, SubscribableChannel>();
    private final SchedulerEventService schedulerEventService;
    private final SleuthWrapper sleuthWrapper;
    private CompositeHealthIndicator bindersHealthIndicator;
    private KafkaBinderHealthIndicator kafkaBinderHealthIndicator;
    private final ObjectMapper objectMapper;
    @Value(value="${spring.application.name}")
    String appName;
    @Value(value="${application.scheduler-config.scheduler-app-name:scheduler}")
    private String schedulerAppName = "scheduler";
    @Value(value="${application.scheduler-config.task-back-off-initial-interval:1000}")
    private int backOffInitialInterval;
    @Value(value="${application.scheduler-config.task-back-off-max-interval:60000}")
    private int backOffMaxInterval;
    @Value(value="${application.kafka-metadata-max-age:60000}")
    private int kafkaMetadataMaxAge;
    private final Set<String> includedTenants;
    private Set<String> tenantToStart;

    @Autowired
    public SchedulerChannelManager(BindingServiceProperties bindingServiceProperties, SubscribableChannelBindingTargetFactory bindingTargetFactory, BindingService bindingService, KafkaMessageChannelBinder kafkaMessageChannelBinder, ObjectMapper objectMapper, SchedulerEventService schedulerEventService, CompositeHealthIndicator bindersHealthIndicator, KafkaBinderHealthIndicator kafkaBinderHealthIndicator, XmConfigProperties xmConfigProperties, SleuthWrapper sleuthWrapper) {
        this.bindingServiceProperties = bindingServiceProperties;
        this.bindingTargetFactory = bindingTargetFactory;
        this.bindingService = bindingService;
        this.sleuthWrapper = sleuthWrapper;
        this.schedulerEventService = schedulerEventService;
        this.objectMapper = objectMapper;
        this.bindersHealthIndicator = bindersHealthIndicator;
        this.kafkaBinderHealthIndicator = kafkaBinderHealthIndicator;
        this.includedTenants = xmConfigProperties.getIncludeTenantLowercase();
        kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
    }

    void createChannels(String tenantName) {
        try {
            String tenant = StringUtils.lowerCase((String)tenantName);
            String tenantKey = StringUtils.upperCase((String)tenantName);
            String id = UUID.randomUUID().toString();
            this.createHandler(PREFIX + tenant + QUEUE, GENERALGROUP, tenantKey, KafkaConsumerProperties.StartOffset.earliest);
            this.createHandler(PREFIX + tenant + TOPIC, id, tenantKey, KafkaConsumerProperties.StartOffset.latest);
            this.createHandler(PREFIX + tenant + DELIMITER + this.appName + QUEUE, this.appName, tenantKey, KafkaConsumerProperties.StartOffset.earliest);
            this.createHandler(PREFIX + tenant + DELIMITER + this.appName + TOPIC, id, tenantKey, KafkaConsumerProperties.StartOffset.latest);
        }
        catch (Exception e) {
            log.error("Error create scheduler channels for tenant " + tenantName, (Throwable)e);
            throw e;
        }
    }

    synchronized void createHandler(String chanelName, String consumerGroup, String tenantName, KafkaConsumerProperties.StartOffset startOffset) {
        if (!this.channels.containsKey(chanelName)) {
            log.info("Create binding to {}. Consumer group {}", (Object)chanelName, (Object)consumerGroup);
            KafkaBindingProperties props = new KafkaBindingProperties();
            props.getConsumer().setAutoCommitOffset(false);
            props.getConsumer().setStartOffset(startOffset);
            props.getConsumer().getConfiguration().put("metadata.max.age.ms", String.valueOf(this.kafkaMetadataMaxAge));
            this.kafkaExtendedBindingProperties.setBindings(Collections.singletonMap(chanelName, props));
            ConsumerProperties consumerProperties = new ConsumerProperties();
            consumerProperties.setMaxAttempts(Integer.MAX_VALUE);
            consumerProperties.setHeaderMode(HeaderMode.none);
            consumerProperties.setBackOffInitialInterval(this.backOffInitialInterval);
            consumerProperties.setBackOffMaxInterval(this.backOffMaxInterval);
            BindingProperties bindingProperties = new BindingProperties();
            bindingProperties.setConsumer(consumerProperties);
            bindingProperties.setDestination(chanelName);
            bindingProperties.setGroup(consumerGroup);
            this.bindingServiceProperties.getBindings().put(chanelName, bindingProperties);
            SubscribableChannel channel = this.bindingTargetFactory.createInput(chanelName);
            this.bindingService.bindConsumer((Object)channel, chanelName);
            HealthIndicatorRegistry registry = this.bindersHealthIndicator.getRegistry();
            if (registry.get(KAFKA) == null) {
                this.bindersHealthIndicator.getRegistry().register(KAFKA, (HealthIndicator)this.kafkaBinderHealthIndicator);
            }
            this.channels.put(chanelName, channel);
            channel.subscribe(message -> this.sleuthWrapper.runWithSleuth(message, () -> this.processMessage(tenantName, message)));
        }
    }

    private void processMessage(String tenantName, Message<?> message) {
        try {
            MdcUtils.putRid((String)(MdcUtils.generateRid() + ":" + tenantName));
            StopWatch stopWatch = StopWatch.createStarted();
            String payloadString = (String)message.getPayload();
            payloadString = SchedulerChannelManager.unwrap(payloadString);
            log.debug("start processing message for tenant: [{}], raw body in base64 = {}", (Object)tenantName, (Object)payloadString);
            String eventBody = new String(Base64.getDecoder().decode(payloadString), StandardCharsets.UTF_8);
            log.info("start processing message for tenant: [{}], body = {}", (Object)tenantName, (Object)eventBody);
            this.schedulerEventService.processSchedulerEvent(this.mapToEvent(eventBody), tenantName);
            Optional.ofNullable((Acknowledgment)message.getHeaders().get((Object)"kafka_acknowledgment", Acknowledgment.class)).ifPresent(Acknowledgment::acknowledge);
            log.info("stop processing message for tenant: [{}], time = {}", (Object)tenantName, (Object)stopWatch.getTime());
        }
        catch (Exception e) {
            log.error("error processing event for tenant [{}]", (Object)tenantName, (Object)e);
            throw e;
        }
        finally {
            MdcUtils.clear();
        }
    }

    private ScheduledEvent mapToEvent(String eventBody) {
        return (ScheduledEvent)this.objectMapper.readValue(eventBody, ScheduledEvent.class);
    }

    void parseConfig(String key, String config) {
        log.info("Tenants list was updated, start to parse config");
        if (!"/config/tenants/tenants-list.json".equals(key)) {
            throw new IllegalArgumentException("Wrong config key to update " + key);
        }
        if (StringUtils.isEmpty((CharSequence)config)) {
            throw new IllegalArgumentException("Config file has empty content: " + key);
        }
        Set tenantKeys = TenantListRepository.parseTenantStates((String)config, (ObjectMapper)this.objectMapper).getOrDefault(this.schedulerAppName, new HashSet());
        if (tenantKeys.isEmpty()) {
            log.warn("No one tenant configured to use scheduler. Add tenant state to ms-config/tenant-list.json to section $.scheduler");
        }
        if (!this.includedTenants.isEmpty()) {
            log.warn("Tenant list was overridden by property 'xm-config.include-tenants' to: {}", this.includedTenants);
        }
        this.tenantToStart = tenantKeys.stream().filter(TenantListRepository.isIncluded(this.includedTenants).and(TenantListRepository.isSuspended().negate())).map(TenantState::getName).collect(Collectors.toSet());
        log.info("scheduler will be turned on for tenants: {}", this.tenantToStart);
    }

    @Async
    @EventListener(value={ApplicationReadyEvent.class})
    public void startChannels() {
        if (this.tenantToStart == null) {
            throw new IllegalStateException("Scheduler channel manager was not initialized. Call onInit() first!");
        }
        log.info("Start channels for tenants: {}", this.tenantToStart);
        this.tenantToStart.forEach(this::createChannels);
    }

    public void onRefresh(String key, String config) {
        this.parseConfig(key, config);
        this.startChannels();
    }

    public boolean isListeningConfiguration(String updatedKey) {
        return "/config/tenants/tenants-list.json".equals(updatedKey);
    }

    public void onInit(String key, String config) {
        this.parseConfig(key, config);
    }

    private static String unwrap(String str) {
        if (StringUtils.isEmpty((CharSequence)str) || StringUtils.isEmpty((CharSequence)WRAP_TOKEN)) {
            return str;
        }
        if (StringUtils.startsWith((CharSequence)str, (CharSequence)WRAP_TOKEN) && StringUtils.endsWith((CharSequence)str, (CharSequence)WRAP_TOKEN)) {
            int startIndex = str.indexOf(WRAP_TOKEN);
            int endIndex = str.lastIndexOf(WRAP_TOKEN);
            int wrapLength = WRAP_TOKEN.length();
            if (startIndex != -1 && endIndex != -1) {
                return str.substring(startIndex + wrapLength, endIndex);
            }
        }
        return str;
    }
}

