/*
 * Decompiled with CFR 0.152.
 */
package org.jsmart.zerocode.core.kafka.helper;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.jsmart.zerocode.core.di.provider.GsonSerDeProvider;
import org.jsmart.zerocode.core.di.provider.ObjectMapperProvider;
import org.jsmart.zerocode.core.kafka.KafkaConstants;
import org.jsmart.zerocode.core.kafka.common.KafkaCommonUtils;
import org.jsmart.zerocode.core.kafka.consume.ConsumerLocalConfigs;
import org.jsmart.zerocode.core.kafka.consume.ConsumerLocalConfigsWrap;
import org.jsmart.zerocode.core.kafka.receive.ConsumerCommonConfigs;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerJsonRecord;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerJsonRecords;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerRawRecords;
import org.jsmart.zerocode.core.utils.SmartUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerHelper {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerHelper.class);
    private static final Gson gson = new GsonSerDeProvider().get();
    private static final ObjectMapper objectMapper = new ObjectMapperProvider().get();

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public static Consumer createConsumer(String bootStrapServers, String consumerPropertyFile, String topic) {
        try (InputStream propsIs = Resources.getResource((String)consumerPropertyFile).openStream();){
            Properties properties = new Properties();
            properties.load(propsIs);
            properties.put("bootstrap.servers", bootStrapServers);
            KafkaCommonUtils.resolveValuePlaceHolders(properties);
            KafkaConsumer consumer = new KafkaConsumer(properties);
            consumer.subscribe(Collections.singletonList(topic));
            KafkaConsumer kafkaConsumer = consumer;
            return kafkaConsumer;
        }
        catch (IOException e) {
            throw new RuntimeException("Exception while reading kafka properties and creating a consumer- " + e);
        }
    }

    public static ConsumerRecords initialPollWaitingForConsumerGroupJoin(Consumer consumer, ConsumerLocalConfigs effectiveLocalConfigs) {
        for (int run = 0; run < 50; ++run) {
            if (!consumer.assignment().isEmpty()) {
                return new ConsumerRecords(new HashMap());
            }
            ConsumerRecords records = consumer.poll(Duration.of(KafkaConsumerHelper.getPollTime(effectiveLocalConfigs), ChronoUnit.MILLIS));
            if (records.isEmpty()) continue;
            return records;
        }
        throw new RuntimeException("\n********* Kafka Consumer unable to join in time - try increasing consumer polling time setting *********\n");
    }

    public static void validateLocalConfigs(ConsumerLocalConfigs localConfigs) {
        if (localConfigs != null) {
            Boolean localCommitSync = localConfigs.getCommitSync();
            Boolean localCommitAsync = localConfigs.getCommitAsync();
            KafkaConsumerHelper.validateCommitFlags(localCommitSync, localCommitAsync);
            KafkaConsumerHelper.validateSeekConfig(localConfigs);
        }
    }

    public static void validateCommonConfigs(ConsumerCommonConfigs consumerCommonConfigs) {
        KafkaConsumerHelper.validateCommitFlags(consumerCommonConfigs.getCommitSync(), consumerCommonConfigs.getCommitAsync());
    }

    public static ConsumerLocalConfigs deriveEffectiveConfigs(ConsumerLocalConfigs consumerLocalTestConfigs, ConsumerCommonConfigs consumerCommonConfigs) {
        KafkaConsumerHelper.validateCommonConfigs(consumerCommonConfigs);
        KafkaConsumerHelper.validateLocalConfigs(consumerLocalTestConfigs);
        return KafkaConsumerHelper.createEffective(consumerCommonConfigs, consumerLocalTestConfigs);
    }

    public static ConsumerLocalConfigs createEffective(ConsumerCommonConfigs consumerCommon, ConsumerLocalConfigs consumerLocal) {
        Boolean effectiveCommitAsync;
        Boolean effectiveCommitSync;
        if (consumerLocal == null) {
            return new ConsumerLocalConfigs(consumerCommon.getRecordType(), consumerCommon.getProtoClassType(), consumerCommon.getFileDumpTo(), consumerCommon.getCommitAsync(), consumerCommon.getCommitSync(), consumerCommon.getShowRecordsConsumed(), consumerCommon.getMaxNoOfRetryPollsOrTimeouts(), consumerCommon.getPollingTime(), consumerCommon.getSeek());
        }
        String effectiveRecordType = Optional.ofNullable(consumerLocal.getRecordType()).orElse(consumerCommon.getRecordType());
        String effectiveProtobufMessageClassType = Optional.ofNullable(consumerLocal.getProtoClassType()).orElse(consumerCommon.getProtoClassType());
        String effectiveFileDumpTo = Optional.ofNullable(consumerLocal.getFileDumpTo()).orElse(consumerCommon.getFileDumpTo());
        Boolean effectiveShowRecordsConsumed = Optional.ofNullable(consumerLocal.getShowRecordsConsumed()).orElse(consumerCommon.getShowRecordsConsumed());
        Integer effectiveMaxNoOfRetryPollsOrTimeouts = Optional.ofNullable(consumerLocal.getMaxNoOfRetryPollsOrTimeouts()).orElse(consumerCommon.getMaxNoOfRetryPollsOrTimeouts());
        Long effectivePollingTime = Optional.ofNullable(consumerLocal.getPollingTime()).orElse(consumerCommon.getPollingTime());
        String effectiveSeek = Optional.ofNullable(consumerLocal.getSeek()).orElse(consumerCommon.getSeek());
        Boolean localCommitSync = consumerLocal.getCommitSync();
        Boolean localCommitAsync = consumerLocal.getCommitAsync();
        if (localCommitSync == null && localCommitAsync == null) {
            effectiveCommitSync = consumerCommon.getCommitSync();
            effectiveCommitAsync = consumerCommon.getCommitAsync();
        } else {
            effectiveCommitSync = localCommitSync;
            effectiveCommitAsync = localCommitAsync;
        }
        return new ConsumerLocalConfigs(effectiveRecordType, effectiveProtobufMessageClassType, effectiveFileDumpTo, effectiveCommitAsync, effectiveCommitSync, effectiveShowRecordsConsumed, effectiveMaxNoOfRetryPollsOrTimeouts, effectivePollingTime, effectiveSeek);
    }

    public static ConsumerLocalConfigs readConsumerLocalTestProperties(String requestJsonWithConfigWrapped) {
        try {
            ConsumerLocalConfigsWrap consumerLocalConfigsWrap = (ConsumerLocalConfigsWrap)new ObjectMapperProvider().get().readValue(requestJsonWithConfigWrapped, ConsumerLocalConfigsWrap.class);
            return consumerLocalConfigsWrap.getConsumerLocalConfigs();
        }
        catch (IOException exx) {
            throw new RuntimeException(exx);
        }
    }

    public static Integer getMaxTimeOuts(ConsumerLocalConfigs effectiveLocalTestProps) {
        return Optional.ofNullable(effectiveLocalTestProps.getMaxNoOfRetryPollsOrTimeouts()).orElse(KafkaConstants.MAX_NO_OF_RETRY_POLLS_OR_TIME_OUTS);
    }

    public static Long getPollTime(ConsumerLocalConfigs effectiveLocal) {
        return Optional.ofNullable(effectiveLocal.getPollingTime()).orElse(KafkaConstants.DEFAULT_POLLING_TIME_MILLI_SEC);
    }

    public static void readRaw(List<ConsumerRecord> rawRecords, Iterator recordIterator) {
        while (recordIterator.hasNext()) {
            ConsumerRecord thisRecord = (ConsumerRecord)recordIterator.next();
            LOGGER.info("\nRecord Key - {} , Record value - {}, Record partition - {}, Record offset - {}", new Object[]{thisRecord.key(), thisRecord.value(), thisRecord.partition(), thisRecord.offset()});
            rawRecords.add(thisRecord);
        }
    }

    public static void readJson(List<ConsumerJsonRecord> jsonRecords, Iterator recordIterator, ConsumerLocalConfigs consumerLocalConfig) throws IOException {
        while (recordIterator.hasNext()) {
            ConsumerRecord thisRecord = (ConsumerRecord)recordIterator.next();
            Object key = thisRecord.key();
            Object valueObj = thisRecord.value();
            Headers headers = thisRecord.headers();
            String keyStr = thisRecord.key() != null ? thisRecord.key().toString() : "";
            String valueStr = consumerLocalConfig != null && "PROTO".equalsIgnoreCase(consumerLocalConfig.getRecordType()) ? KafkaConsumerHelper.convertProtobufToJson(thisRecord, consumerLocalConfig) : valueObj.toString();
            LOGGER.info("\nRecord Key - {} , Record value - {}, Record partition - {}, Record offset - {}, Headers - {}", new Object[]{key, valueStr, thisRecord.partition(), thisRecord.offset(), headers});
            JsonNode keyNode = objectMapper.readTree(keyStr);
            JsonNode valueNode = objectMapper.readTree(valueStr);
            HashMap<String, String> headersMap = null;
            if (headers != null) {
                headersMap = new HashMap<String, String>();
                for (Header header : headers) {
                    headersMap.put(header.key(), new String(header.value()));
                }
            }
            ConsumerJsonRecord jsonRecord = new ConsumerJsonRecord(keyNode, valueNode, headersMap);
            jsonRecords.add(jsonRecord);
        }
    }

    private static String convertProtobufToJson(ConsumerRecord thisRecord, ConsumerLocalConfigs consumerLocalConfig) {
        if (StringUtils.isEmpty((CharSequence)consumerLocalConfig.getProtoClassType())) {
            throw new IllegalArgumentException("[protoClassType] is required consumer config for recordType PROTO.");
        }
        MessageOrBuilder builderOrMessage = KafkaConsumerHelper.createMessageOrBuilder(consumerLocalConfig.getProtoClassType(), (byte[])thisRecord.value());
        try {
            return JsonFormat.printer().includingDefaultValueFields().preservingProtoFieldNames().print(builderOrMessage);
        }
        catch (InvalidProtocolBufferException e) {
            throw new IllegalArgumentException(e);
        }
    }

    private static MessageOrBuilder createMessageOrBuilder(String messageClass, byte[] value) {
        try {
            Class<?> msgClass = Class.forName(messageClass);
            Method method = msgClass.getMethod("parseFrom", byte[].class);
            return (MessageOrBuilder)method.invoke(null, new Object[]{value});
        }
        catch (ClassNotFoundException | IllegalAccessException | IllegalArgumentException | NoSuchMethodException | SecurityException | InvocationTargetException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static String prepareResult(ConsumerLocalConfigs testConfigs, List<ConsumerJsonRecord> jsonRecords, List<ConsumerRecord> rawRecords) throws JsonProcessingException {
        int size;
        String result = testConfigs != null && testConfigs.getShowRecordsConsumed() == false ? SmartUtils.prettyPrintJson(gson.toJson((Object)new ConsumerRawRecords((size = jsonRecords.size()) == 0 ? rawRecords.size() : size))) : (testConfigs != null && "RAW".equals(testConfigs.getRecordType()) ? SmartUtils.prettyPrintJson(gson.toJson((Object)new ConsumerRawRecords(rawRecords))) : (testConfigs != null && ("JSON".equals(testConfigs.getRecordType()) || "PROTO".equalsIgnoreCase(testConfigs.getRecordType()) || "AVRO".equalsIgnoreCase(testConfigs.getRecordType())) ? SmartUtils.prettyPrintJson(objectMapper.writeValueAsString((Object)new ConsumerJsonRecords(jsonRecords))) : "{\"error\" : \"recordType Undecided, Please chose recordType as JSON or RAW\"}"));
        return result;
    }

    public static void handleCommitSyncAsync(Consumer<Long, String> consumer, ConsumerCommonConfigs consumerCommonConfigs, ConsumerLocalConfigs consumeLocalTestProps) {
        Boolean effectiveCommitAsync;
        Boolean effectiveCommitSync;
        if (consumeLocalTestProps == null) {
            LOGGER.warn("[No local test configs]-Kafka client neither did `commitAsync()` nor `commitSync()`");
            return;
        }
        Boolean localCommitSync = consumeLocalTestProps.getCommitSync();
        Boolean localCommitAsync = consumeLocalTestProps.getCommitAsync();
        if (localCommitSync == null && localCommitAsync == null) {
            effectiveCommitSync = consumerCommonConfigs.getCommitSync();
            effectiveCommitAsync = consumerCommonConfigs.getCommitAsync();
        } else {
            effectiveCommitSync = localCommitSync;
            effectiveCommitAsync = localCommitAsync;
        }
        if (effectiveCommitSync != null && effectiveCommitSync.booleanValue()) {
            consumer.commitSync();
        } else if (effectiveCommitAsync != null && effectiveCommitAsync.booleanValue()) {
            consumer.commitAsync();
        } else {
            LOGGER.warn("Kafka client neither configured for `commitAsync()` nor `commitSync()`");
        }
    }

    public static void handleSeekOffset(ConsumerLocalConfigs effectiveLocal, Consumer consumer) {
        String seek = effectiveLocal.getSeek();
        if (!StringUtils.isEmpty((CharSequence)seek)) {
            String[] seekParts = effectiveLocal.getSeekTopicPartitionOffset();
            String topic = seekParts[0];
            int partition = Integer.parseInt(seekParts[1]);
            long offset = Long.parseLong(seekParts[2]);
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            HashSet<TopicPartition> topicPartitions = new HashSet<TopicPartition>();
            topicPartitions.add(topicPartition);
            consumer.unsubscribe();
            consumer.assign(topicPartitions);
            if (offset <= -1L) {
                consumer.seekToEnd(topicPartitions);
                consumer.seek(topicPartition, consumer.position(topicPartition) + offset);
            } else {
                consumer.seek(topicPartition, offset);
            }
        }
    }

    private static void validateCommitFlags(Boolean commitSync, Boolean commitAsync) {
        if (commitSync != null && commitAsync != null && commitSync.booleanValue() && commitAsync.booleanValue()) {
            throw new RuntimeException("\n********* Both commitSync and commitAsync can not be true *********\n");
        }
    }

    private static void validateSeekConfig(ConsumerLocalConfigs localConfigs) {
        String[] split;
        String seek = localConfigs.getSeek();
        if (!(StringUtils.isEmpty((CharSequence)seek) || (split = seek.split(",")) != null && split.length >= 3)) {
            throw new RuntimeException("\n------> 'seek' should contain 'topic,partition,offset' e.g. 'topic1,0,2' ");
        }
    }
}

