/*
 * Decompiled with CFR 0.152.
 */
package org.jsmart.zerocode.core.kafka.helper;

import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.jsmart.zerocode.core.di.provider.GsonSerDeProvider;
import org.jsmart.zerocode.core.di.provider.ObjectMapperProvider;
import org.jsmart.zerocode.core.kafka.KafkaConstants;
import org.jsmart.zerocode.core.kafka.common.KafkaCommonUtils;
import org.jsmart.zerocode.core.kafka.consume.ConsumerLocalConfigs;
import org.jsmart.zerocode.core.kafka.consume.ConsumerLocalConfigsWrap;
import org.jsmart.zerocode.core.kafka.consume.SeekTimestamp;
import org.jsmart.zerocode.core.kafka.receive.ConsumerCommonConfigs;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerJsonRecord;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerJsonRecords;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerRawRecords;
import org.jsmart.zerocode.core.utils.SmartUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerHelper {
    public static final String CONSUMER = "CONSUMER";
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerHelper.class);
    private static final Gson gson = new GsonSerDeProvider().get();
    private static final ObjectMapper objectMapper = new ObjectMapperProvider().get();
    public static Map<String, Consumer> consumerCacheByTopicMap = new HashMap<String, Consumer>();

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public static Consumer createConsumer(String bootStrapServers, String consumerPropertyFile, String topic, Boolean consumerToBeCached) {
        Consumer sameConsumer = KafkaConsumerHelper.getCachedConsumer(topic, consumerToBeCached);
        if (sameConsumer != null) {
            return sameConsumer;
        }
        try (InputStream propsIs = Resources.getResource((String)consumerPropertyFile).openStream();){
            Properties properties = new Properties();
            properties.load(propsIs);
            properties.put("bootstrap.servers", bootStrapServers);
            KafkaCommonUtils.resolveValuePlaceHolders(properties);
            KafkaConsumer consumer = new KafkaConsumer(properties);
            if (consumerToBeCached.booleanValue()) {
                consumerCacheByTopicMap.forEach((xTopic, xConsumer) -> {
                    if (!xTopic.equals(topic)) {
                        xConsumer.close();
                    }
                });
                consumerCacheByTopicMap.entrySet().removeIf(xTopic -> !xTopic.equals(topic));
                consumerCacheByTopicMap.put(topic, (Consumer)consumer);
            }
            KafkaConsumer kafkaConsumer = consumer;
            return kafkaConsumer;
        }
        catch (IOException e) {
            throw new RuntimeException("Exception while reading kafka properties and creating a consumer- " + e);
        }
    }

    public static ConsumerRecords initialPollWaitingForConsumerGroupJoin(Consumer consumer, ConsumerLocalConfigs effectiveLocalConfigs) {
        for (int run = 0; run < 50; ++run) {
            if (!consumer.assignment().isEmpty()) {
                LOGGER.debug("==> WaitingForConsumerGroupJoin - Partition now assigned. No records yet consumed");
                return new ConsumerRecords(new HashMap());
            }
            LOGGER.debug("==> WaitingForConsumerGroupJoin - Partition not assigned. Polling once");
            ConsumerRecords records = consumer.poll(Duration.of(KafkaConsumerHelper.getPollTime(effectiveLocalConfigs), ChronoUnit.MILLIS));
            LOGGER.debug("==> WaitingForConsumerGroupJoin - polled records length={}", (Object)records.count());
            if (records.isEmpty()) continue;
            return records;
        }
        throw new RuntimeException("\n********* Kafka Consumer unable to join in time - try increasing consumer polling time setting *********\n");
    }

    public static void validateLocalConfigs(ConsumerLocalConfigs localConfigs) {
        if (localConfigs != null) {
            Boolean localCommitSync = localConfigs.getCommitSync();
            Boolean localCommitAsync = localConfigs.getCommitAsync();
            KafkaConsumerHelper.validateCommitFlags(localCommitSync, localCommitAsync);
            KafkaConsumerHelper.validateSeekConfig(localConfigs);
            KafkaConsumerHelper.validateSeekToTimestamp(localConfigs);
        }
    }

    private static void validateSeekToTimestamp(ConsumerLocalConfigs localConfigs) {
        String seekToTimestamp = localConfigs.getSeekEpoch();
        if (StringUtils.isEmpty((CharSequence)seekToTimestamp)) {
            if (StringUtils.isNumeric((CharSequence)seekToTimestamp) && (Long.parseLong(seekToTimestamp) > System.currentTimeMillis() || Long.parseLong(seekToTimestamp) < 0L)) {
                throw new RuntimeException("\n------> 'seekEpoch' is not a valid epoch/Unix timestamp");
            }
            if (!StringUtils.isEmpty((CharSequence)localConfigs.getSeek()) && Objects.nonNull(localConfigs.getSeekTimestamp())) {
                throw new RuntimeException("Only one of 'seek', 'seekEpoch' and 'seekTimestamp' should be provided, but not both. Please fix and rerun");
            }
        }
        if (Objects.nonNull(localConfigs.getSeekTimestamp())) {
            SimpleDateFormat dateFormat = new SimpleDateFormat(localConfigs.getSeekTimestamp().getFormat());
            try {
                Date date = dateFormat.parse(localConfigs.getSeekTimestamp().getTimestamp());
                long epochMillis = date.toInstant().toEpochMilli();
                if (epochMillis > System.currentTimeMillis() || epochMillis < 0L) {
                    throw new RuntimeException("\n------> 'seekTimestamp' is not a valid epoch/Unix timestamp " + epochMillis);
                }
            }
            catch (ParseException e) {
                throw new RuntimeException("Timestamp and format provided in 'seekTimestamp' cannot be parsed ", e);
            }
        }
    }

    public static void validateCommonConfigs(ConsumerCommonConfigs consumerCommonConfigs) {
        KafkaConsumerHelper.validateCommitFlags(consumerCommonConfigs.getCommitSync(), consumerCommonConfigs.getCommitAsync());
    }

    public static ConsumerLocalConfigs deriveEffectiveConfigs(ConsumerLocalConfigs consumerLocalTestConfigs, ConsumerCommonConfigs consumerCommonConfigs) {
        KafkaConsumerHelper.validateCommonConfigs(consumerCommonConfigs);
        KafkaConsumerHelper.validateLocalConfigs(consumerLocalTestConfigs);
        return KafkaConsumerHelper.createEffective(consumerCommonConfigs, consumerLocalTestConfigs);
    }

    public static ConsumerLocalConfigs createEffective(ConsumerCommonConfigs consumerCommon, ConsumerLocalConfigs consumerLocal) {
        Boolean effectiveCommitAsync;
        Boolean effectiveCommitSync;
        if (consumerLocal == null) {
            return new ConsumerLocalConfigs(consumerCommon.getRecordType(), consumerCommon.getProtoClassType(), consumerCommon.getFileDumpTo(), consumerCommon.getCommitAsync(), consumerCommon.getCommitSync(), consumerCommon.getShowRecordsConsumed(), consumerCommon.getMaxNoOfRetryPollsOrTimeouts(), consumerCommon.getPollingTime(), consumerCommon.getCacheByTopic(), consumerCommon.getFilterByJsonPath(), null, null, null);
        }
        String effectiveRecordType = Optional.ofNullable(consumerLocal.getRecordType()).orElse(consumerCommon.getRecordType());
        String effectiveProtobufMessageClassType = Optional.ofNullable(consumerLocal.getProtoClassType()).orElse(consumerCommon.getProtoClassType());
        String effectiveFileDumpTo = Optional.ofNullable(consumerLocal.getFileDumpTo()).orElse(consumerCommon.getFileDumpTo());
        Boolean effectiveShowRecordsConsumed = Optional.ofNullable(consumerLocal.getShowRecordsConsumed()).orElse(consumerCommon.getShowRecordsConsumed());
        Integer effectiveMaxNoOfRetryPollsOrTimeouts = Optional.ofNullable(consumerLocal.getMaxNoOfRetryPollsOrTimeouts()).orElse(consumerCommon.getMaxNoOfRetryPollsOrTimeouts());
        Long effectivePollingTime = Optional.ofNullable(consumerLocal.getPollingTime()).orElse(consumerCommon.getPollingTime());
        String filterByJsonPath = Optional.ofNullable(consumerLocal.getFilterByJsonPath()).orElse(consumerCommon.getFilterByJsonPath());
        Boolean effectiveConsumerCacheByTopic = Optional.ofNullable(consumerLocal.getCacheByTopic()).orElse(consumerCommon.getCacheByTopic());
        Boolean localCommitSync = consumerLocal.getCommitSync();
        Boolean localCommitAsync = consumerLocal.getCommitAsync();
        if (localCommitSync == null && localCommitAsync == null) {
            effectiveCommitSync = consumerCommon.getCommitSync();
            effectiveCommitAsync = consumerCommon.getCommitAsync();
        } else {
            effectiveCommitSync = localCommitSync;
            effectiveCommitAsync = localCommitAsync;
        }
        return new ConsumerLocalConfigs(effectiveRecordType, effectiveProtobufMessageClassType, effectiveFileDumpTo, effectiveCommitAsync, effectiveCommitSync, effectiveShowRecordsConsumed, effectiveMaxNoOfRetryPollsOrTimeouts, effectivePollingTime, effectiveConsumerCacheByTopic, filterByJsonPath, consumerLocal.getSeek(), consumerLocal.getSeekEpoch(), consumerLocal.getSeekTimestamp());
    }

    public static ConsumerLocalConfigs readConsumerLocalTestProperties(String requestJsonWithConfigWrapped) {
        try {
            ConsumerLocalConfigsWrap consumerLocalConfigsWrap = (ConsumerLocalConfigsWrap)new ObjectMapperProvider().get().readValue(requestJsonWithConfigWrapped, ConsumerLocalConfigsWrap.class);
            return consumerLocalConfigsWrap.getConsumerLocalConfigs();
        }
        catch (IOException exx) {
            throw new RuntimeException(exx);
        }
    }

    public static Integer getMaxTimeOuts(ConsumerLocalConfigs effectiveLocalTestProps) {
        return Optional.ofNullable(effectiveLocalTestProps.getMaxNoOfRetryPollsOrTimeouts()).orElse(KafkaConstants.MAX_NO_OF_RETRY_POLLS_OR_TIME_OUTS);
    }

    public static Long getPollTime(ConsumerLocalConfigs effectiveLocal) {
        return Optional.ofNullable(effectiveLocal.getPollingTime()).orElse(KafkaConstants.DEFAULT_POLLING_TIME_MILLI_SEC);
    }

    public static void readRaw(List<ConsumerRecord> rawRecords, Iterator recordIterator) {
        while (recordIterator.hasNext()) {
            ConsumerRecord thisRecord = (ConsumerRecord)recordIterator.next();
            LOGGER.debug("\nRecord Key - {} , Record value - {}, Record partition - {}, Record offset - {}", new Object[]{thisRecord.key(), thisRecord.value(), thisRecord.partition(), thisRecord.offset()});
            rawRecords.add(thisRecord);
        }
    }

    public static void readJson(List<ConsumerJsonRecord> jsonRecords, Iterator recordIterator, ConsumerLocalConfigs consumerLocalConfig) throws IOException {
        while (recordIterator.hasNext()) {
            ConsumerRecord thisRecord = (ConsumerRecord)recordIterator.next();
            Object key = thisRecord.key();
            Object valueObj = thisRecord.value();
            Headers headers = thisRecord.headers();
            String keyStr = thisRecord.key() != null ? thisRecord.key().toString() : "";
            String valueStr = consumerLocalConfig != null && "PROTO".equalsIgnoreCase(consumerLocalConfig.getRecordType()) ? KafkaConsumerHelper.convertProtobufToJson(thisRecord, consumerLocalConfig) : valueObj.toString();
            LOGGER.debug("\nRecord Key - {} , Record value - {}, Record partition - {}, Record offset - {}, Headers - {}", new Object[]{key, valueStr, thisRecord.partition(), thisRecord.offset(), headers});
            if (!KafkaConsumerHelper.isKeyParseableAsJson(keyStr)) {
                LOGGER.info(">>>Converting the key to JSON format for to able to read it");
                keyStr = objectMapper.writeValueAsString((Object)keyStr);
            }
            JsonNode keyNode = objectMapper.readTree(keyStr);
            JsonNode valueNode = objectMapper.readTree(valueStr);
            HashMap<String, String> headersMap = null;
            if (headers != null) {
                headersMap = new HashMap<String, String>();
                for (Header header : headers) {
                    headersMap.put(header.key(), new String(header.value()));
                }
            }
            ConsumerJsonRecord jsonRecord = new ConsumerJsonRecord(keyNode, valueNode, headersMap);
            jsonRecords.add(jsonRecord);
        }
    }

    public static boolean isKeyParseableAsJson(String consumedKey) {
        try {
            objectMapper.readTree(consumedKey);
        }
        catch (JacksonException e) {
            LOGGER.info(">>>The key was not in a parsable JSON format:{}", (Object)consumedKey);
            return false;
        }
        LOGGER.info(">>> The consumed key was fine and parseable:{}", (Object)consumedKey);
        return true;
    }

    private static String convertProtobufToJson(ConsumerRecord thisRecord, ConsumerLocalConfigs consumerLocalConfig) {
        if (StringUtils.isEmpty((CharSequence)consumerLocalConfig.getProtoClassType())) {
            throw new IllegalArgumentException("[protoClassType] is required consumer config for recordType PROTO.");
        }
        MessageOrBuilder builderOrMessage = KafkaConsumerHelper.createMessageOrBuilder(consumerLocalConfig.getProtoClassType(), (byte[])thisRecord.value());
        try {
            return JsonFormat.printer().includingDefaultValueFields().preservingProtoFieldNames().print(builderOrMessage);
        }
        catch (InvalidProtocolBufferException e) {
            throw new IllegalArgumentException(e);
        }
    }

    private static MessageOrBuilder createMessageOrBuilder(String messageClass, byte[] value) {
        try {
            Class<?> msgClass = Class.forName(messageClass);
            Method method = msgClass.getMethod("parseFrom", byte[].class);
            return (MessageOrBuilder)method.invoke(null, new Object[]{value});
        }
        catch (ClassNotFoundException | IllegalAccessException | IllegalArgumentException | NoSuchMethodException | SecurityException | InvocationTargetException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static String prepareResult(ConsumerLocalConfigs testConfigs, List<ConsumerJsonRecord> jsonRecords, List<ConsumerRecord> rawRecords) throws JsonProcessingException {
        int size;
        String result = testConfigs != null && testConfigs.getShowRecordsConsumed() == false ? SmartUtils.prettyPrintJson(gson.toJson((Object)new ConsumerRawRecords((size = jsonRecords.size()) == 0 ? rawRecords.size() : size))) : (testConfigs != null && "RAW".equals(testConfigs.getRecordType()) ? SmartUtils.prettyPrintJson(gson.toJson((Object)new ConsumerRawRecords(rawRecords))) : (testConfigs != null && ("JSON".equals(testConfigs.getRecordType()) || "PROTO".equalsIgnoreCase(testConfigs.getRecordType()) || "AVRO".equalsIgnoreCase(testConfigs.getRecordType())) ? SmartUtils.prettyPrintJson(objectMapper.writeValueAsString((Object)new ConsumerJsonRecords(jsonRecords))) : "{\"error\" : \"recordType Undecided, Please chose recordType as JSON or RAW\"}"));
        if (testConfigs != null && testConfigs.getFilterByJsonPath() != null) {
            String filteredResult = JsonPath.read((String)result, (String)testConfigs.getFilterByJsonPath(), (Predicate[])new Predicate[0]).toString();
            List filteredRecords = (List)objectMapper.readValue(filteredResult, List.class);
            result = SmartUtils.prettyPrintJson(objectMapper.writeValueAsString((Object)new ConsumerJsonRecords(filteredRecords)));
        }
        return result;
    }

    public static void handleCommitSyncAsync(Consumer<Long, String> consumer, ConsumerCommonConfigs consumerCommonConfigs, ConsumerLocalConfigs consumeLocalTestProps) {
        Boolean effectiveCommitAsync;
        Boolean effectiveCommitSync;
        if (consumeLocalTestProps == null) {
            LOGGER.warn("[No local test configs]-Kafka client neither did `commitAsync()` nor `commitSync()`");
            return;
        }
        Boolean localCommitSync = consumeLocalTestProps.getCommitSync();
        Boolean localCommitAsync = consumeLocalTestProps.getCommitAsync();
        if (localCommitSync == null && localCommitAsync == null) {
            effectiveCommitSync = consumerCommonConfigs.getCommitSync();
            effectiveCommitAsync = consumerCommonConfigs.getCommitAsync();
        } else {
            effectiveCommitSync = localCommitSync;
            effectiveCommitAsync = localCommitAsync;
        }
        if (effectiveCommitSync != null && effectiveCommitSync.booleanValue()) {
            consumer.commitSync();
        } else if (effectiveCommitAsync != null && effectiveCommitAsync.booleanValue()) {
            consumer.commitAsync();
        } else {
            LOGGER.warn("Kafka client neither configured for `commitAsync()` nor `commitSync()`");
        }
    }

    public static void handleSeek(ConsumerLocalConfigs effectiveLocal, Consumer consumer, String topicName) {
        if (!StringUtils.isEmpty((CharSequence)effectiveLocal.getSeek())) {
            KafkaConsumerHelper.handleSeekByOffset(effectiveLocal, consumer);
        } else if (!StringUtils.isEmpty((CharSequence)effectiveLocal.getSeekEpoch())) {
            KafkaConsumerHelper.handleSeekByEpoch(Long.parseLong(effectiveLocal.getSeekEpoch()), consumer, topicName);
        } else if (Objects.nonNull(effectiveLocal.getSeekTimestamp())) {
            KafkaConsumerHelper.handleSeekByTimestamp(effectiveLocal.getSeekTimestamp(), consumer, topicName);
        }
    }

    private static void handleSeekByTimestamp(SeekTimestamp seekTimestamp, Consumer consumer, String topicName) {
        if (Objects.nonNull(seekTimestamp)) {
            SimpleDateFormat dateFormat = new SimpleDateFormat(seekTimestamp.getFormat());
            Date date = null;
            try {
                date = dateFormat.parse(seekTimestamp.getTimestamp());
            }
            catch (ParseException e) {
                throw new RuntimeException("Could not parse timestamp", e);
            }
            KafkaConsumerHelper.handleSeekByEpoch(date.toInstant().toEpochMilli(), consumer, topicName);
        }
    }

    private static void handleSeekByEpoch(Long epoch, Consumer consumer, String topicName) {
        if (Objects.nonNull(epoch)) {
            List noSeekPartitions;
            List partitionInfos = consumer.partitionsFor(topicName);
            List topicPartitions = partitionInfos.stream().map(info -> new TopicPartition(info.topic(), info.partition())).collect(Collectors.toList());
            Map topicPartitionTimestampMap = topicPartitions.stream().collect(Collectors.toMap(Function.identity(), ignore -> epoch));
            Map topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(topicPartitionTimestampMap);
            if (consumer.assignment().isEmpty()) {
                consumer.assign(topicPartitionOffsetAndTimestampMap.keySet());
            }
            if (!(noSeekPartitions = topicPartitionOffsetAndTimestampMap.entrySet().stream().filter(tp -> tp.getValue() == null).map(Map.Entry::getKey).collect(Collectors.toList())).isEmpty()) {
                consumer.seekToEnd(noSeekPartitions);
                Map partitionLatestOffsetsToCommit = noSeekPartitions.stream().collect(Collectors.toMap(Function.identity(), tp -> new OffsetAndMetadata(consumer.position(tp) + 1L)));
                LOGGER.debug("==> Committing the following : " + partitionLatestOffsetsToCommit);
                consumer.commitSync(partitionLatestOffsetsToCommit);
            }
            for (Map.Entry topicOffsetEntry : topicPartitionOffsetAndTimestampMap.entrySet()) {
                if (!Objects.nonNull(topicOffsetEntry.getValue())) continue;
                if (consumer.position((TopicPartition)topicOffsetEntry.getKey()) < ((OffsetAndTimestamp)topicOffsetEntry.getValue()).offset()) {
                    consumer.seek((TopicPartition)topicOffsetEntry.getKey(), ((OffsetAndTimestamp)topicOffsetEntry.getValue()).offset());
                }
                LOGGER.debug("==> Seeking to " + topicOffsetEntry);
            }
        }
    }

    private static void handleSeekByOffset(ConsumerLocalConfigs effectiveLocal, Consumer consumer) {
        String[] seekParts = effectiveLocal.getSeekTopicPartitionOffset();
        String topic = seekParts[0];
        int partition = Integer.parseInt(seekParts[1]);
        long offset = Long.parseLong(seekParts[2]);
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        HashSet<TopicPartition> topicPartitions = new HashSet<TopicPartition>();
        topicPartitions.add(topicPartition);
        consumer.assign(topicPartitions);
        if (offset <= -1L) {
            consumer.seekToEnd(topicPartitions);
            consumer.seek(topicPartition, consumer.position(topicPartition) + offset);
        } else {
            consumer.seek(topicPartition, offset);
        }
    }

    private static void validateCommitFlags(Boolean commitSync, Boolean commitAsync) {
        if (commitSync != null && commitAsync != null && commitSync.booleanValue() && commitAsync.booleanValue()) {
            throw new RuntimeException("\n********* Both commitSync and commitAsync can not be true *********\n");
        }
    }

    private static void validateSeekConfig(ConsumerLocalConfigs localConfigs) {
        String[] split;
        String seek = localConfigs.getSeek();
        if (!(StringUtils.isEmpty((CharSequence)seek) || (split = seek.split(",")) != null && split.length >= 3)) {
            throw new RuntimeException("\n------> 'seek' should contain 'topic,partition,offset' e.g. 'topic1,0,2' ");
        }
    }

    private static Consumer getCachedConsumer(String topic, Boolean consumerToBeCached) {
        if (consumerToBeCached.booleanValue()) {
            return consumerCacheByTopicMap.get(topic);
        }
        return null;
    }
}

