/*
 * Decompiled with CFR 0.152.
 */
package org.openl.rules.ruleservice.kafka.publish;

import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.UndeclaredThrowableException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.openl.rules.calc.SpreadsheetResultBeanPropertyNamingStrategy;
import org.openl.rules.project.model.RulesDeploy;
import org.openl.rules.ruleservice.core.OpenLService;
import org.openl.rules.ruleservice.core.RuleServiceInstantiationException;
import org.openl.rules.ruleservice.core.RuleServiceWrapperException;
import org.openl.rules.ruleservice.kafka.RequestMessage;
import org.openl.rules.ruleservice.kafka.publish.KafkaServiceException;
import org.openl.rules.ruleservice.storelogdata.ObjectSerializer;
import org.openl.rules.ruleservice.storelogdata.StoreLogData;
import org.openl.rules.ruleservice.storelogdata.StoreLogDataException;
import org.openl.rules.ruleservice.storelogdata.StoreLogDataHolder;
import org.openl.rules.ruleservice.storelogdata.StoreLogDataManager;
import org.openl.rules.serialization.ProjectJacksonObjectMapperFactoryBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * Kafka front end for a single {@link OpenLService}.
 *
 * <p>A dedicated thread polls {@link RequestMessage}s from the input topic, dispatches every polled
 * record to a shared worker pool, waits until the whole batch has been handled and then commits the
 * offsets of that batch. For each record the requested service method is invoked and its result is
 * published to the output topic; any failure is forwarded to the dead letter topic (DLT) together
 * with headers describing the original record and the error.
 */
public final class KafkaService implements Runnable {
    /**
     * Worker pool shared by all {@code KafkaService} instances. The work queue is unbounded, so per
     * the {@link ThreadPoolExecutor} contract tasks are queued rather than rejected.
     */
    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    private static final Logger LOG = LoggerFactory.getLogger(KafkaService.class);

    /** Cleared by {@link #stop()} to make the polling loop exit after the current iteration. */
    private volatile boolean running = true;
    private final OpenLService service;
    /** Name of the header carrying the request id; {@code null} disables request id handling. */
    private final String requestIdHeaderKey;
    private final String inTopic;
    private final String outTopic;
    private final String dltTopic;
    /** Offsets of the records processed so far and not yet cleared by a partition revocation. */
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
    private final KafkaProducer<String, Object> producer;
    private final KafkaProducer<String, byte[]> dltProducer;
    private final KafkaConsumer<String, RequestMessage> consumer;
    private Thread loopRunningThread;
    private final ObjectSerializer objectSerializer;
    private final boolean storageEnabled;
    /** Only set (and required to be non-null) when {@link #storageEnabled} is {@code true}. */
    private StoreLogDataManager storeLogDataManager;
    /** {@code null} when the project naming strategy is not a spreadsheet result bean strategy. */
    private final SpreadsheetResultBeanPropertyNamingStrategy sprBeanPropertyNamingStrategy;

    /**
     * Creates a service instance. The service does not consume anything until {@link #start()} is
     * called.
     *
     * @param service the OpenL service whose methods are invoked; must not be {@code null}
     * @param requestIdHeaderKey header name used for the request id, or {@code null} to disable it
     * @param inTopic topic to consume requests from; must not be {@code null}
     * @param outTopic default topic for results; may be blank
     * @param dltTopic default dead letter topic; may be empty
     * @param consumer consumer used for polling; must not be {@code null}
     * @param producer producer used for results; must not be {@code null}
     * @param dltProducer producer used for dead letter messages; must not be {@code null}
     * @param objectSerializer serializer handed over to the store log data; must not be {@code null}
     * @param storeLogDataManager manager used to persist log data; required when
     *        {@code storeLogDataEnabled} is {@code true}
     * @param storeLogDataEnabled whether log data is collected and stored
     * @param rulesDeploy deployment configuration used to resolve the property naming strategy
     * @return the new service
     * @throws KafkaServiceException if the property naming strategy cannot be initialized
     */
    public static KafkaService createService(OpenLService service, String requestIdHeaderKey, String inTopic, String outTopic, String dltTopic, KafkaConsumer<String, RequestMessage> consumer, KafkaProducer<String, Object> producer, KafkaProducer<String, byte[]> dltProducer, ObjectSerializer objectSerializer, StoreLogDataManager storeLogDataManager, boolean storeLogDataEnabled, RulesDeploy rulesDeploy) throws KafkaServiceException {
        return new KafkaService(service, requestIdHeaderKey, inTopic, outTopic, dltTopic, consumer, producer, dltProducer, objectSerializer, storeLogDataManager, storeLogDataEnabled, rulesDeploy);
    }

    private KafkaService(OpenLService service, String requestIdHeaderKey, String inTopic, String outTopic, String dltTopic, KafkaConsumer<String, RequestMessage> consumer, KafkaProducer<String, Object> producer, KafkaProducer<String, byte[]> dltProducer, ObjectSerializer objectSerializer, StoreLogDataManager storeLogDataManager, boolean storageEnabled, RulesDeploy rulesDeploy) throws KafkaServiceException {
        this.service = Objects.requireNonNull(service);
        this.requestIdHeaderKey = requestIdHeaderKey;
        this.inTopic = Objects.requireNonNull(inTopic);
        this.producer = Objects.requireNonNull(producer);
        this.consumer = Objects.requireNonNull(consumer);
        this.dltProducer = Objects.requireNonNull(dltProducer);
        this.objectSerializer = Objects.requireNonNull(objectSerializer);
        if (storageEnabled) {
            this.storeLogDataManager = Objects.requireNonNull(storeLogDataManager);
        }
        this.outTopic = outTopic;
        this.dltTopic = dltTopic;
        this.storageEnabled = storageEnabled;
        try {
            PropertyNamingStrategy propertyNamingStrategy = ProjectJacksonObjectMapperFactoryBean.extractPropertyNamingStrategy(rulesDeploy, service.getClassLoader());
            this.sprBeanPropertyNamingStrategy = propertyNamingStrategy instanceof SpreadsheetResultBeanPropertyNamingStrategy ? (SpreadsheetResultBeanPropertyNamingStrategy) propertyNamingStrategy : null;
        } catch (RuleServiceInstantiationException e) {
            throw new KafkaServiceException("Failed to initialize 'PropertyNamingStrategy' for kafka service.", e);
        }
    }

    /** @return {@code true} if log data is collected and stored for processed messages */
    public boolean isStoreLogDataEnabled() {
        return this.storageEnabled;
    }

    /** @return the manager used to store log data, or {@code null} when storing is disabled */
    public StoreLogDataManager getStoreLogDataManager() {
        return this.storeLogDataManager;
    }

    /** @return the OpenL service served by this instance */
    public OpenLService getService() {
        return this.service;
    }

    /** @return the topic requests are consumed from */
    public String getInTopic() {
        return this.inTopic;
    }

    /**
     * Resolves the topic the result of the given request is sent to.
     *
     * @param record the consumed request
     * @return the value of the {@code kafka_replyTopic} header if present with a non-null value,
     *         otherwise the configured output topic
     */
    public String getOutTopic(ConsumerRecord<?, ?> record) {
        Header header = record.headers().lastHeader("kafka_replyTopic");
        if (header != null && header.value() != null) {
            return new String(header.value(), StandardCharsets.UTF_8);
        }
        return this.outTopic;
    }

    /**
     * Resolves the dead letter topic for the given request.
     *
     * @param record the consumed request
     * @return the value of the {@code kafka_replyDltTopic} header, else the value of the
     *         {@code kafka_replyTopic} header, else the configured dead letter topic
     */
    public String getDltTopic(ConsumerRecord<?, ?> record) {
        Header header = record.headers().lastHeader("kafka_replyDltTopic");
        if (header != null && header.value() != null) {
            return new String(header.value(), StandardCharsets.UTF_8);
        }
        header = record.headers().lastHeader("kafka_replyTopic");
        if (header != null && header.value() != null) {
            return new String(header.value(), StandardCharsets.UTF_8);
        }
        return this.dltTopic;
    }

    /** Subscribes the consumer to the input topic and commits tracked offsets when partitions are revoked. */
    private void subscribeConsumer() {
        this.consumer.subscribe(Collections.singletonList(this.getInTopic()), new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                if (LOG.isInfoEnabled()) {
                    LOG.info("Lost partitions in rebalance. Committing current offsets: {}", KafkaService.this.currentOffsets);
                }
                KafkaService.this.consumer.commitSync(KafkaService.this.currentOffsets);
                KafkaService.this.currentOffsets.clear();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Nothing to do on assignment.
            }
        });
    }

    private void initialize() {
        this.loopRunningThread = new Thread(this);
        this.subscribeConsumer();
    }

    private void runLoop() {
        this.loopRunningThread.start();
    }

    /**
     * Subscribes to the input topic and starts the polling thread.
     *
     * @throws KafkaServiceException if subscribing or starting the thread fails
     */
    public void start() throws KafkaServiceException {
        try {
            this.initialize();
            this.runLoop();
        } catch (Exception e) {
            throw new KafkaServiceException("Failed to start kafka service.", e);
        }
    }

    /** @return the serializer handed over to the store log data of every processed message */
    public ObjectSerializer getObjectSerializer() {
        return this.objectSerializer;
    }

    /**
     * Polling loop: polls a batch, processes every record on the worker pool, waits for the whole
     * batch and commits its offsets. Runs until {@link #stop()} is called. Any exception raised by
     * an iteration (including an interrupt while waiting for the batch) is logged and the loop
     * continues with the next poll.
     */
    @Override
    public void run() {
        while (this.running) {
            try {
                ConsumerRecords<String, RequestMessage> records = this.consumer.poll(Duration.ofMillis(100L));
                if (records.isEmpty()) {
                    continue;
                }
                CountDownLatch countDownLatch = new CountDownLatch(records.count());
                ZonedDateTime incomingTime = ZonedDateTime.now();
                for (ConsumerRecord<String, RequestMessage> consumerRecord : records) {
                    EXECUTOR.submit(() -> this.processRecord(consumerRecord, incomingTime, countDownLatch));
                }
                // Offsets are committed only after every record of the batch has been handled.
                countDownLatch.await();
                for (ConsumerRecord<String, RequestMessage> record : records) {
                    this.currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1L));
                }
                this.commitCurrentOffsets();
            } catch (Exception e) {
                LOG.error("Something wrong.", e);
            }
        }
    }

    /** Commits the tracked offsets; a failure is logged together with its cause and not rethrown. */
    private void commitCurrentOffsets() {
        try {
            this.consumer.commitSync(this.currentOffsets);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Current offsets have been committed: {}", this.currentOffsets);
            }
        } catch (Exception e) {
            // The trailing throwable is logged with its stack trace instead of being dropped.
            LOG.error("Failed to commit current offsets: {}", this.currentOffsets, e);
        }
    }

    /**
     * Handles one consumed record on a worker thread: sets up the request id and log data, invokes
     * the service method and publishes the result, or forwards the failure to the dead letter
     * topic. Always counts the latch down and clears the thread-bound state it has set.
     */
    private void processRecord(ConsumerRecord<String, RequestMessage> consumerRecord, ZonedDateTime incomingTime, CountDownLatch countDownLatch) {
        StoreLogData storeLogData = this.isStoreLogDataEnabled() ? StoreLogDataHolder.get() : null;
        String requestIdHeader = null;
        try {
            if (this.requestIdHeaderKey != null) {
                requestIdHeader = this.resolveRequestId(consumerRecord);
                MDC.put("requestId", requestIdHeader);
            }
            if (storeLogData != null) {
                storeLogData.setServiceClass(this.service.getServiceClass());
                storeLogData.setServiceName(this.service.getName());
                storeLogData.setIncomingMessageTime(incomingTime);
                storeLogData.setPublisherType(RulesDeploy.PublisherType.KAFKA);
                storeLogData.setObjectSerializer(this.getObjectSerializer());
                storeLogData.setConsumerRecord(consumerRecord);
            }
            RequestMessage requestMessage = consumerRecord.value();
            if (storeLogData != null) {
                storeLogData.setServiceMethod(requestMessage.getMethod());
                storeLogData.setParameters(requestMessage.getParameters());
            }
            String outputTopic = this.getOutTopic(consumerRecord);
            if (!StringUtils.isBlank(outputTopic)) {
                this.invokeAndSendResult(consumerRecord, requestMessage, outputTopic, storeLogData, requestIdHeader);
            } else if (storeLogData != null) {
                // Without an output topic the method is not invoked; only the log data is stored.
                storeLogData.setOutcomingMessageTime(ZonedDateTime.now());
                this.getStoreLogDataManager().store(storeLogData);
            }
        } catch (InvocationTargetException | UndeclaredThrowableException e) {
            // Report the exception thrown by the service method rather than the reflective wrapper.
            Throwable cause = e.getCause();
            this.sendError(consumerRecord, storeLogData, cause instanceof Exception ? (Exception) cause : e, requestIdHeader);
        } catch (Exception e) {
            this.sendError(consumerRecord, storeLogData, e, requestIdHeader);
        } finally {
            countDownLatch.countDown();
            if (this.isStoreLogDataEnabled()) {
                StoreLogDataHolder.remove();
            }
            if (requestIdHeader != null) {
                MDC.remove("requestId");
            }
        }
    }

    /**
     * Reads the request id from the configured header of the record.
     *
     * @return the header value, or a random UUID when the header is missing, has no value or is blank
     */
    private String resolveRequestId(ConsumerRecord<?, ?> consumerRecord) {
        String requestId = null;
        Header idHeader = consumerRecord.headers().lastHeader(this.requestIdHeaderKey);
        // A header may be present with a null value; treat it like a missing header.
        if (idHeader != null && idHeader.value() != null) {
            requestId = new String(idHeader.value(), StandardCharsets.UTF_8);
        }
        if (StringUtils.isBlank(requestId)) {
            requestId = UUID.randomUUID().toString();
        }
        return requestId;
    }

    /**
     * Invokes the requested service method and sends its result to {@code outputTopic}. If the send
     * or the subsequent storing of log data fails, the failure is forwarded to the dead letter topic.
     *
     * @throws Exception if the method cannot be resolved or invoked, or the record cannot be built
     */
    private void invokeAndSendResult(ConsumerRecord<String, RequestMessage> consumerRecord, RequestMessage requestMessage, String outputTopic, StoreLogData storeLogData, String requestIdHeader) throws Exception {
        Object result = requestMessage.getMethod().invoke(this.service.getServiceBean(), requestMessage.getParameters());
        Header partitionHeader = consumerRecord.headers().lastHeader("kafka_replyPartition");
        ProducerRecord<String, Object> producerRecord;
        if (partitionHeader == null) {
            producerRecord = new ProducerRecord<>(outputTopic, consumerRecord.key(), result);
        } else {
            Integer partition = Integer.parseInt(new String(partitionHeader.value(), StandardCharsets.UTF_8));
            producerRecord = new ProducerRecord<>(outputTopic, partition, consumerRecord.key(), result);
        }
        if (requestIdHeader != null) {
            producerRecord.headers().add(this.requestIdHeaderKey, requestIdHeader.getBytes(StandardCharsets.UTF_8));
        }
        this.forwardHeadersToOutput(consumerRecord, producerRecord);
        if (storeLogData != null) {
            storeLogData.setOutcomingMessageTime(ZonedDateTime.now());
        }
        this.producer.send(producerRecord, (metadata, exception) -> {
            if (storeLogData != null) {
                storeLogData.setProducerRecord(producerRecord);
            }
            Exception failure = exception;
            if (failure == null && storeLogData != null) {
                try {
                    this.getStoreLogDataManager().store(storeLogData);
                } catch (StoreLogDataException e) {
                    // A storing failure is handled like a send failure.
                    failure = e;
                }
            }
            if (failure != null) {
                try {
                    if (LOG.isErrorEnabled()) {
                        LOG.error("Failed to send a result message for method '{}' in service '{}' to output topic '{}'.", requestMessage.getMethod(), this.getService().getDeployPath(), this.getOutTopic(consumerRecord), failure);
                    }
                } catch (Exception e) {
                    LOG.error("Unexpected error.", e);
                }
                this.sendErrorToDlt(consumerRecord, failure, storeLogData, requestIdHeader);
            }
        });
    }

    /** Logs a processing failure and forwards the record to the dead letter topic. */
    private void sendError(ConsumerRecord<String, RequestMessage> consumerRecord, StoreLogData storeLogData, Exception e, String requestIdHeader) {
        LOG.error("Failed to process a message from input topic '{}'.", this.getInTopic(), e);
        this.sendErrorToDlt(consumerRecord, e, storeLogData, requestIdHeader);
    }

    /** Copies every header of the consumed record to the dead letter record. */
    private void forwardHeadersToDlt(ConsumerRecord<?, ?> originalRecord, ProducerRecord<?, ?> record) {
        for (Header header : originalRecord.headers()) {
            record.headers().add(header);
        }
    }

    /** Copies only the {@code kafka_correlationId} headers of the consumed record to the result record. */
    private void forwardHeadersToOutput(ConsumerRecord<?, ?> originalRecord, ProducerRecord<?, ?> record) {
        for (Header header : originalRecord.headers().headers("kafka_correlationId")) {
            record.headers().add(header);
        }
    }

    /**
     * Adds the headers describing the original record (key, partition, offset, topic) and the
     * failure (both the exception carried by the request message, if any, and {@code e}) to the
     * dead letter record. The original key header is added exactly once, with a {@code null} value
     * when the record has no key.
     */
    private void setDltHeaders(ConsumerRecord<String, RequestMessage> record, Exception e, ProducerRecord<?, ?> dltRecord) {
        dltRecord.headers().add("kafka_dlt-original-message-key", record.key() == null ? null : record.key().getBytes(StandardCharsets.UTF_8));
        dltRecord.headers().add("kafka_dlt-original-partition", String.valueOf(record.partition()).getBytes(StandardCharsets.UTF_8));
        dltRecord.headers().add("kafka_dlt-original-offset", String.valueOf(record.offset()).getBytes(StandardCharsets.UTF_8));
        dltRecord.headers().add("kafka_dlt-original-topic", record.topic().getBytes(StandardCharsets.UTF_8));
        this.setDltHeadersForException(dltRecord, record.value().getException());
        this.setDltHeadersForException(dltRecord, e);
    }

    /** Adds class name, message and stack trace headers for {@code exception}; does nothing for {@code null}. */
    private void setDltHeadersForException(ProducerRecord<?, ?> dltRecord, Exception exception) {
        if (exception == null) {
            return;
        }
        dltRecord.headers().add("kafka_dlt-exception-fqcn", exception.getClass().getName().getBytes(StandardCharsets.UTF_8));
        dltRecord.headers().add("kafka_dlt-exception-message", Optional.ofNullable(RuleServiceWrapperException.create(exception, this.sprBeanPropertyNamingStrategy).getMessage()).map(s -> s.getBytes(StandardCharsets.UTF_8)).orElse(null));
        dltRecord.headers().add("kafka_dlt-exception-stacktrace", ExceptionUtils.getStackTrace(exception).getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Sends the raw payload of a failed request to the dead letter topic, unless no dead letter
     * topic is resolved for the record. Failures while building or sending the dead letter record
     * are logged and not rethrown.
     */
    private void sendErrorToDlt(ConsumerRecord<String, RequestMessage> record, Exception e, StoreLogData storeLogData, String requestIdHeader) {
        String dltTopic = this.getDltTopic(record);
        if (StringUtils.isEmpty(dltTopic)) {
            return;
        }
        try {
            if (requestIdHeader != null) {
                // Added to the consumed record so that it is forwarded with the other headers below.
                record.headers().add(this.requestIdHeaderKey, requestIdHeader.getBytes(StandardCharsets.UTF_8));
            }
            Header partitionHeader = record.headers().lastHeader("kafka_replyDltPartition");
            ProducerRecord<String, byte[]> dltRecord;
            if (partitionHeader == null) {
                dltRecord = new ProducerRecord<>(dltTopic, record.key(), record.value().getRawData());
            } else {
                Integer partition = Integer.parseInt(new String(partitionHeader.value(), StandardCharsets.UTF_8));
                dltRecord = new ProducerRecord<>(dltTopic, partition, record.key(), record.value().getRawData());
            }
            this.forwardHeadersToDlt(record, dltRecord);
            this.setDltHeaders(record, e, dltRecord);
            if (storeLogData != null) {
                storeLogData.setOutcomingMessageTime(ZonedDateTime.now());
            }
            this.dltProducer.send(dltRecord, (metadata, exception) -> {
                if (storeLogData != null) {
                    storeLogData.setDltRecord(dltRecord);
                    storeLogData.fault();
                }
                if (exception != null) {
                    // Guarded because rendering the payload as text is only needed for the log line.
                    if (LOG.isErrorEnabled()) {
                        LOG.error("Failed to send a message to dead letter queue topic '{}'.{}Payload: {}", dltTopic, System.lineSeparator(), record.value().asText(), exception);
                    }
                } else if (storeLogData != null) {
                    // Log data is stored only when the dead letter message was actually sent.
                    try {
                        this.getStoreLogDataManager().store(storeLogData);
                    } catch (StoreLogDataException e1) {
                        LOG.error("Failed on data store operation.", e1);
                    }
                }
            });
        } catch (Exception e1) {
            if (LOG.isErrorEnabled()) {
                LOG.error("Failed to send a message to dead letter queue topic '{}'.{}Payload: {}", dltTopic, System.lineSeparator(), record.value().asText(), e1);
            }
        }
    }

    /**
     * Requests the polling loop to finish and waits for the polling thread to terminate. Does
     * nothing besides clearing the running flag if the service has never been started.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void stop() throws InterruptedException {
        this.running = false;
        Thread thread = this.loopRunningThread;
        if (thread != null) {
            thread.join();
        }
    }
}

