/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.feature.messaging.kafka.client;

import com.google.common.annotations.VisibleForTesting;
import com.sap.cds.feature.messaging.kafka.utils.KafkaServiceBinding;
import com.sap.cds.feature.messaging.kafka.utils.KafkaUtils;
import com.sap.cds.services.messaging.service.MessagingBrokerQueueListener;
import com.sap.cds.services.messaging.utils.CloudEventUtils;
import com.sap.cds.services.runtime.CdsRuntime;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drives a {@link KafkaConsumer} from a dedicated daemon thread and hands every polled record to
 * the configured {@link MessagingBrokerQueueListener}.
 *
 * <p>After the listener returns (or throws), the offset of the record is committed when the
 * message was acknowledged; otherwise the consumer is rewound to the record so that it is polled
 * again.
 *
 * <p>NOTE(review): {@link #subscribe(String)} and the polling thread both use the same
 * {@code KafkaConsumer} without synchronization. Kafka documents the consumer as not safe for
 * multi-threaded access, so {@code subscribe} is presumably only meant to be called before
 * {@link #start()} - confirm against the callers.
 */
public class KafkaMessagingConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessagingConsumer.class);

    /** Maximum time a single {@code poll} call blocks waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.of(3L, ChronoUnit.SECONDS);

    /** Back-off, in seconds, applied after a record was not acknowledged. */
    private static final int RETRY_DELAY_SECONDS = 3;

    @VisibleForTesting
    protected CdsRuntime runtime;
    private final KafkaConsumer<String, String> consumer;
    private final String name;

    /** {@code true} while the polling loop is requested to run. */
    private final AtomicBoolean isRunning = new AtomicBoolean(false);

    /** Released by the polling thread when it terminates; {@code null} until the first start. */
    private volatile CountDownLatch consumerPaused;
    private MessagingBrokerQueueListener messageConsumer;

    /**
     * Creates the messaging consumer together with its underlying Kafka consumer.
     *
     * @param name    suffix of the polling thread name ({@code kafka-consumer-<name>})
     * @param groupId passed on to {@link #createKafkaConsumer}
     * @param binding passed on to {@link #createKafkaConsumer}
     * @param config  passed on to {@link #createKafkaConsumer}
     * @param runtime the CDS runtime, stored for subclasses and tests
     */
    public KafkaMessagingConsumer(String name, String groupId, KafkaServiceBinding binding, Map<String, Object> config, CdsRuntime runtime) {
        this.runtime = runtime;
        this.name = name;
        this.consumer = this.createKafkaConsumer(binding, groupId, config);
    }

    /**
     * @return the underlying Kafka consumer
     */
    @VisibleForTesting
    public KafkaConsumer<String, String> getConsumer() {
        return this.consumer;
    }

    /**
     * Sets the listener that receives every polled record.
     *
     * @param messageConsumer the listener to dispatch messages to
     */
    public void setMessageConsumer(MessagingBrokerQueueListener messageConsumer) {
        this.messageConsumer = messageConsumer;
    }

    /**
     * Adds a topic to the current subscription of the Kafka consumer.
     *
     * @param topic the topic to subscribe to in addition to the already subscribed ones
     */
    public void subscribe(String topic) {
        // subscribe() replaces the whole subscription, so the existing topics must be passed again.
        HashSet<String> subscriptions = new HashSet<String>(this.consumer.subscription());
        subscriptions.add(topic);
        this.consumer.subscribe(subscriptions);
    }

    /**
     * Starts the polling thread. Does nothing when the consumer is already running.
     */
    public void start() {
        // compareAndSet makes "check and mark as running" one atomic step, so two concurrent
        // callers can never both spawn a polling thread on the same Kafka consumer.
        if (this.isRunning.compareAndSet(false, true)) {
            this.consumerPaused = new CountDownLatch(1);
            this.runConsumer();
        }
    }

    /**
     * Stops the polling thread, waits for it to terminate and closes the Kafka consumer.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *                              polling thread; the Kafka consumer is not closed in that case
     */
    public void close() throws InterruptedException {
        this.pause();
        this.consumer.close();
    }

    /**
     * Requests the polling loop to stop and blocks until the polling thread has terminated.
     * Returns immediately when the consumer was never started.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void pause() throws InterruptedException {
        this.isRunning.set(false);
        CountDownLatch latch = this.consumerPaused;
        if (latch == null) {
            // start() was never called: there is no polling thread to wait for.
            return;
        }
        latch.await();
    }

    /**
     * Spawns the daemon thread that polls the Kafka consumer while {@link #isRunning} is set and
     * dispatches every polled record. Exceptions raised while polling or dispatching are logged
     * and the loop continues.
     */
    private void runConsumer() {
        // Capture the latch of this run so the thread always releases the latch it was started with.
        final CountDownLatch stopped = this.consumerPaused;
        Thread daemon = new Thread(() -> {
            try {
                while (this.isRunning.get()) {
                    try {
                        ConsumerRecords<String, String> records = this.consumer.poll(POLL_TIMEOUT);
                        // NOTE(review): when one record is rewound, the remaining records of the
                        // already fetched batch are still dispatched - confirm this is intended.
                        for (ConsumerRecord<String, String> polled : records) {
                            this.dispatchConsumerRecord(polled, this.consumer);
                        }
                    } catch (Exception ex) {
                        logger.error("Error while consuming record from Kafka.", ex);
                    }
                }
            } finally {
                // Release waiters even if the loop dies with an Error; otherwise close() would
                // block forever.
                stopped.countDown();
            }
        }, "kafka-consumer-" + this.name);
        daemon.setDaemon(true);
        daemon.start();
    }

    /**
     * Sleeps for the given number of seconds; returns early when the thread is interrupted.
     *
     * @param sec the number of seconds to sleep
     */
    @VisibleForTesting
    protected void wait(int sec) {
        try {
            Thread.sleep((long)sec * 1000L);
        } catch (InterruptedException ignored) {
            // Deliberately not restoring the interrupt flag: this runs on the polling thread and
            // KafkaConsumer.poll is documented to throw InterruptException while the flag is set,
            // which would make every following poll fail. Shutdown is signalled via isRunning.
        }
    }

    /**
     * Passes a single record to the message listener and settles its offset afterwards.
     *
     * <p>The offset handling runs even if the listener throws: an acknowledged message gets its
     * offset committed, any other message rewinds the consumer to the record and then waits
     * {@value #RETRY_DELAY_SECONDS} seconds before returning.
     *
     * @param consumerRecord the record to dispatch
     * @param consumer       the consumer used to commit or rewind the offset
     */
    @VisibleForTesting
    void dispatchConsumerRecord(ConsumerRecord<String, String> consumerRecord, KafkaConsumer<String, String> consumer) {
        ReceivedMessage message = new ReceivedMessage(consumerRecord, consumer);
        try {
            this.messageConsumer.receivedMessage(message);
        } finally {
            if (message.isAcknowledged()) {
                message.commitOffset();
            } else {
                message.rollbackOffset();
                this.wait(RETRY_DELAY_SECONDS);
            }
        }
    }

    /**
     * Creates the Kafka consumer from the properties computed by
     * {@code KafkaUtils.createMessageConsumerProperties}.
     *
     * @param binding the Kafka service binding
     * @param groupId the consumer group id
     * @param config  additional configuration
     * @return the new Kafka consumer
     */
    @VisibleForTesting
    KafkaConsumer<String, String> createKafkaConsumer(KafkaServiceBinding binding, String groupId, Map<String, Object> config) {
        return new KafkaConsumer<String, String>(KafkaUtils.createMessageConsumerProperties(binding, groupId, config));
    }

    /**
     * View on a single consumer record that is handed to the message listener. Also carries the
     * state needed to commit or rewind the offset of the record.
     */
    @VisibleForTesting
    static class ReceivedMessage
    implements MessagingBrokerQueueListener.MessageAccess {
        private static final String HEADER_MESSAGE_ID = "x-sap-cap-message-id";
        private static final String HEADER_TENANT_ID = "x-sap-cap-tenant-id";
        private static final String HEADER_EFFECTIVE_TOPIC = "x-sap-cap-effective-topic";
        private static final String HEADER_HEADERS_IN_MESSAGE = "x-sap-cap-headers-in-message";
        private static final String KEY_DATA = "data";
        private static final String KEY_MESSAGE = "message";

        private final KafkaConsumer<String, String> consumer;
        private final String recordMessage;
        private final String topic;
        private final TopicPartition partition;

        /** Offset following the record, i.e. the position that is committed on acknowledgement. */
        private final long offset;
        private final Map<String, String> technicalHeaders = new HashMap<String, String>();
        private final Map<String, String> recordHeaders = new HashMap<String, String>();
        private String message;
        private Map<String, Object> headersMap;
        private Map<String, Object> dataMap;
        private String id;
        private String tenantId;
        private boolean headersInMessage;
        boolean acknowledged;

        /**
         * Captures value, topic, partition and offset of the record and sorts its headers into
         * message id, tenant id, technical headers and plain record headers.
         *
         * @param consumerRecord the polled record
         * @param consumer       the consumer the record was polled from
         */
        ReceivedMessage(ConsumerRecord<String, String> consumerRecord, KafkaConsumer<String, String> consumer) {
            this.consumer = consumer;
            this.recordMessage = consumerRecord.value();
            this.topic = consumerRecord.topic();
            this.partition = new TopicPartition(this.topic, consumerRecord.partition());
            this.offset = consumerRecord.offset() + 1L;
            consumerRecord.headers().forEach(header -> {
                if (Objects.equals(header.key(), HEADER_MESSAGE_ID)) {
                    this.id = KafkaUtils.toString(header.value());
                } else if (Objects.equals(header.key(), HEADER_TENANT_ID)) {
                    this.tenantId = KafkaUtils.toString(header.value());
                } else if (Objects.equals(header.key(), HEADER_EFFECTIVE_TOPIC)) {
                    this.technicalHeaders.put(header.key(), KafkaUtils.toString(header.value()));
                } else if (Objects.equals(header.key(), HEADER_HEADERS_IN_MESSAGE)) {
                    // Only the presence of this header matters; its value is not evaluated.
                    this.headersInMessage = true;
                } else {
                    this.recordHeaders.put(header.key(), KafkaUtils.toString(header.value()));
                }
            });
        }

        /**
         * @return the value of the message id header, or {@code null} if the record had none
         */
        public String getId() {
            return this.id;
        }

        /**
         * @return the value of the tenant id header, or {@code null} if the record had none
         */
        public String getTenant() {
            return this.tenantId;
        }

        /**
         * Returns the message as a string; the result is computed on first access and cached.
         *
         * <p>The raw record value is returned when the headers are already part of the message or
         * when the value cannot be converted to a map. Otherwise the record headers and the parsed
         * value (under the key {@code data}) are combined into one JSON document.
         *
         * @return the message string
         */
        public String getMessage() {
            if (this.message == null) {
                Map<String, Object> messageMap = CloudEventUtils.toMap(this.recordMessage);
                if (this.headersInMessage || messageMap == null) {
                    this.message = this.recordMessage;
                } else {
                    // The envelope holds string headers plus the nested data map, so its values
                    // must be typed as Object.
                    Map<String, Object> structuredCloudEvents = new HashMap<String, Object>(this.recordHeaders);
                    structuredCloudEvents.put(KEY_DATA, messageMap);
                    this.message = CloudEventUtils.toJson(structuredCloudEvents);
                }
            }
            return this.message;
        }

        /**
         * @return the data part of the message, computed on first access
         */
        public Map<String, Object> getDataMap() {
            if (this.dataMap == null) {
                this.populateMaps();
            }
            return this.dataMap;
        }

        /**
         * @return the headers of the message, computed on first access
         */
        public Map<String, Object> getHeadersMap() {
            if (this.headersMap == null) {
                this.populateMaps();
            }
            return this.headersMap;
        }

        /**
         * Computes {@link #headersMap} and {@link #dataMap} from the record headers and value.
         *
         * <ul>
         *   <li>Value not convertible to a map: the data map holds the raw value under the key
         *       {@code message}.</li>
         *   <li>Headers contained in the message and {@code data} is a map: {@code data} becomes
         *       the data map, all remaining entries are added to the headers.</li>
         *   <li>Otherwise the parsed value as a whole is the data map.</li>
         * </ul>
         */
        @SuppressWarnings("unchecked") // the "data" entry is checked to be a Map before the cast
        private void populateMaps() {
            Map<String, Object> headers = new HashMap<String, Object>(this.recordHeaders);
            Map<String, Object> parsed = CloudEventUtils.toMap(this.recordMessage);
            Map<String, Object> data;
            if (parsed == null) {
                data = new HashMap<String, Object>();
                data.put(KEY_MESSAGE, this.recordMessage);
            } else if (this.headersInMessage && parsed.get(KEY_DATA) instanceof Map) {
                data = (Map<String, Object>)parsed.remove(KEY_DATA);
                headers.putAll(parsed);
            } else {
                data = parsed;
            }
            this.headersMap = headers;
            this.dataMap = data;
        }

        /**
         * @return the topic the record was read from
         */
        public String getBrokerTopic() {
            return this.topic;
        }

        /**
         * Marks the message as acknowledged so that its offset is committed after dispatching.
         */
        public void acknowledge() {
            this.acknowledged = true;
        }

        /**
         * @return {@code true} if {@link #acknowledge()} was called
         */
        boolean isAcknowledged() {
            return this.acknowledged;
        }

        /**
         * Synchronously commits the offset following this record for its partition.
         */
        void commitOffset() {
            this.consumer.commitSync(Collections.singletonMap(this.partition, new OffsetAndMetadata(this.offset)));
        }

        /**
         * Rewinds the consumer to the offset of this record so that it is polled again.
         */
        void rollbackOffset() {
            // this.offset points behind the record, hence the record itself is at offset - 1.
            this.consumer.seek(this.partition, this.offset - 1L);
        }

        /**
         * @return the technical headers of the record (currently only the effective topic header)
         */
        public Map<String, String> getTechnicalHeaders() {
            return this.technicalHeaders;
        }
    }
}

