/*
 * Decompiled with CFR 0.152.
 */
package com.scin.sdk;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import com.scin.sdk.api.AppUser;
import com.scin.sdk.api.Authority;
import com.scin.sdk.bean.base.Cat;
import com.scin.sdk.bean.base.Constant;
import com.scin.sdk.bean.base.Tiger;
import com.scin.sdk.enums.SystemStatusEnum;
import com.scin.sdk.exception.BusinessException;
import com.scin.sdk.utils.HttpUtil;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerClient {
    private final Logger logger = LoggerFactory.getLogger(ConsumerClient.class);
    private Properties kafkaConfig;
    private Authority authority;
    private String url = "https://api-iot-company.hzzxxd.com/company/application/subAuth";
    public Map<Integer, Class> parseClassMap;

    public ConsumerClient(String key, String secret) {
        this.authority = HttpUtil.sign(this.url, new AppUser(key, secret));
        this.kafkaConfig = this.initKafkaConfig();
    }

    public ConsumerClient(String url, String key, String secret) {
        this.authority = HttpUtil.sign(url, new AppUser(key, secret));
        this.parseClassMap = this.initParseClassMap();
        this.kafkaConfig = this.initKafkaConfig();
    }

    private Map<Integer, Class> initParseClassMap() {
        switch (this.authority.getIdentity()) {
            case "battery": {
                return Constant.BATTERY_DATA_CLASS;
            }
            case "vehicle": {
                return Constant.VEHICLE_DATA_CLASS;
            }
            case "cabinet": {
                return Constant.CABINET_DATA_CLASS;
            }
        }
        throw BusinessException.of(SystemStatusEnum.IDENTITY_NOT_SUPPORT);
    }

    private Properties initKafkaConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.authority.getServers());
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("enable.auto.commit", "true");
        props.put("max.poll.records", (Object)12);
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + this.authority.getUsername() + "\" password=\"" + this.authority.getPassword() + "\";");
        return props;
    }

    public void raw(String topic, int groupNum, Consumer<List<Cat>> action) {
        this.raw(Collections.singleton(topic), groupNum, action);
    }

    public void raw(Set<String> topics, int groupNum, Consumer<List<Cat>> action) {
        Type type = new TypeReference<Cat>(){}.getType();
        this.listen(topics, groupNum, type, action);
    }

    public <T> void ripe(Class<T> clazz, String topic, int groupNum, Consumer<List<Tiger>> action) {
        this.ripe(clazz, Collections.singleton(topic), groupNum, action);
    }

    public void ripe(String topic, int groupNum, Consumer<List<Tiger>> action) {
        this.ripe(Collections.singleton(topic), groupNum, action);
    }

    public void ripe(Set<String> topics, int groupNum, Consumer<List<Tiger>> action) {
        this.ripe(null, topics, groupNum, action);
    }

    public <T> void ripe(Class<T> clazz, Set<String> topics, int groupNum, Consumer<List<Tiger>> action) {
        Consumer<List<T>> messages = this.findConsumerOfCmdKey(clazz, action);
        Type type = new TypeReference<Tiger>(){}.getType();
        this.listen(topics, groupNum, type, messages);
    }

    private Consumer<List<Tiger>> findConsumerOfCmdKey(Class clazz, Consumer<List<Tiger>> action) {
        Integer cmdKey = this.findCmdKey(clazz);
        if (cmdKey == null) {
            return tigers -> {
                List<Tiger> messages = tigers.stream().filter(t -> this.parseClassMap.containsKey(t.getCmdKey())).collect(Collectors.toList());
                if (CollectionUtils.isNotEmpty(messages)) {
                    messages.forEach(m -> m.setDataClazz(this.parseClassMap.get(m.getCmdKey())));
                    action.accept((List<Tiger>)tigers);
                }
            };
        }
        return tigers -> {
            List<Tiger> messages = tigers.stream().filter(t -> t.getCmdKey().equals(cmdKey)).filter(t -> this.parseClassMap.containsKey(t.getCmdKey())).collect(Collectors.toList());
            if (CollectionUtils.isNotEmpty(messages)) {
                messages.forEach(m -> m.setDataClazz(clazz));
                action.accept((List<Tiger>)tigers);
            }
        };
    }

    private Integer findCmdKey(Class clazz) {
        Integer cmdKey = null;
        if (clazz == null) {
            return cmdKey;
        }
        for (Map.Entry<Integer, Class> entry : this.parseClassMap.entrySet()) {
            if (entry.getValue() != clazz) continue;
            cmdKey = entry.getKey();
            break;
        }
        if (cmdKey == null) {
            throw BusinessException.of(SystemStatusEnum.PARSE_CLASS_SUPPORT);
        }
        return cmdKey;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> void listen(Set<String> topics, int groupNum, Type type, Consumer<List<T>> action) {
        if (topics == null || topics.isEmpty()) {
            throw BusinessException.of(SystemStatusEnum.SUBSCRIBE_TOPIC_EMPTY);
        }
        topics.stream().forEach(topic -> {
            if (!this.authority.getTopics().contains(topic)) {
                throw BusinessException.of(SystemStatusEnum.APP_TOPIC_NOT_AUTHORIZED);
            }
        });
        if (groupNum < 1 && groupNum > this.authority.getGroupNum()) {
            throw BusinessException.of(SystemStatusEnum.APP_GROUP_NOT_AUTHORIZED);
        }
        this.kafkaConfig.put("group.id", "app." + this.authority.getUsername() + "." + groupNum);
        KafkaConsumer consumer = null;
        try {
            try {
                consumer = new KafkaConsumer(this.kafkaConfig);
                consumer.subscribe(topics);
                while (true) {
                    ConsumerRecords records;
                    if ((records = consumer.poll(Duration.ofMillis(1000L))).isEmpty()) {
                        continue;
                    }
                    ArrayList<T> messages = new ArrayList<T>(16);
                    for (ConsumerRecord record : records) {
                        T t = this.parse((ConsumerRecord<String, byte[]>)record, type);
                        if (t == null) continue;
                        messages.add(t);
                    }
                    if (!CollectionUtils.isNotEmpty(messages)) continue;
                    action.accept(messages);
                }
            }
            catch (SaslAuthenticationException e) {
                throw BusinessException.of(SystemStatusEnum.KAFKA_USER_NOT_AUTHORIZED);
            }
            catch (TopicAuthorizationException e) {
                throw BusinessException.of(SystemStatusEnum.KAFKA_TOPIC_NOT_AUTHORIZED);
            }
            catch (GroupAuthorizationException e) {
                throw BusinessException.of(SystemStatusEnum.KAFKA_GROUP_NOT_AUTHORIZED);
            }
            catch (Exception e) {
                this.logger.error("\u8ba2\u9605\u5f02\u5e38:" + e.getMessage(), (Throwable)e);
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        catch (Throwable throwable) {
            if (consumer != null) {
                consumer.close();
            }
            throw throwable;
        }
    }

    private <T> T parse(ConsumerRecord<String, byte[]> record, Type type) {
        try {
            Object t = JSONObject.parseObject((byte[])((byte[])record.value()), (Type)type, (Feature[])new Feature[0]);
            return (T)t;
        }
        catch (Exception e) {
            this.logger.error("\u6570\u636e\u5e8f\u5217\u5316\u5931\u8d25:" + e.getMessage(), (Throwable)e);
            return null;
        }
    }

    public void setKafkaConfig(Properties kafkaConfig) {
        this.kafkaConfig = kafkaConfig;
    }
}

