/*
 * Decompiled with CFR 0.152.
 */
package team.bangbang.common.queue.kafka;

import com.alibaba.fastjson.JSONObject;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.WakeupException;
import team.bangbang.common.config.Config;
import team.bangbang.common.log.Logger;
import team.bangbang.common.queue.IQueueManager;
import team.bangbang.common.queue.Publisher;
import team.bangbang.common.queue.Subscriber;
import team.bangbang.common.queue.kafka.KafkaPublisher;

public class KafkaManager
implements IQueueManager {
    private Logger log = Logger.getInstance(KafkaManager.class);
    private static Map<String, Object> props_p = new HashMap<String, Object>();
    private static Map<String, Object> props_s = new HashMap<String, Object>();
    private static AdminClient adminClient = null;

    private static String getGroupId(Subscriber subscriber) {
        return subscriber.getTopic() + "_group_" + subscriber.getIndex();
    }

    @Override
    public Publisher createPublisher(String topic) {
        KafkaProducer producer = new KafkaProducer(props_p);
        return new KafkaPublisher(producer, topic);
    }

    @Override
    public boolean subscribe(Subscriber subscriber) {
        String topic = subscriber.getTopic();
        String groupId = KafkaManager.getGroupId(subscriber);
        props_s.put("group.id", groupId);
        HashSet<String> gIds = new HashSet<String>();
        gIds.add(groupId);
        try {
            ArrayList<String> topics = new ArrayList<String>();
            topics.add(topic);
            KafkaConsumer consumer = new KafkaConsumer(props_s);
            consumer.subscribe(topics);
            Duration d = Duration.ofMillis(1000L);
            Runnable r = new Runnable((Consumer)consumer, d, subscriber){
                final /* synthetic */ Consumer val$consumer;
                final /* synthetic */ Duration val$d;
                final /* synthetic */ Subscriber val$subscriber;
                {
                    this.val$consumer = consumer;
                    this.val$d = duration;
                    this.val$subscriber = subscriber;
                }

                @Override
                public void run() {
                    while (true) {
                        ConsumerRecords records = this.val$consumer.poll(this.val$d);
                        for (ConsumerRecord record : records) {
                            String value = (String)record.value();
                            JSONObject json = JSONObject.parseObject((String)value);
                            try {
                                this.val$subscriber.consume(json);
                            }
                            catch (Exception ex) {
                                ex.printStackTrace();
                                KafkaManager.this.log.log(9994, (Object)ex.getMessage());
                                KafkaManager.this.log.log(9994, (Object)("\u961f\u5217\u6d88\u8d39\u8005\uff08" + this.val$subscriber.getTopic() + ":" + this.val$subscriber.getIndex() + "\uff09\u56e0\u524d\u8ff0\u5f02\u5e38\u6682\u505c"));
                                break;
                            }
                        }
                        try {
                            Thread.sleep(10L);
                            continue;
                        }
                        catch (InterruptedException e) {
                            e.printStackTrace();
                            continue;
                        }
                        break;
                    }
                }
            };
            Thread t = new Thread(r);
            t.start();
        }
        catch (Exception ex) {
            ex.printStackTrace();
            return false;
        }
        return true;
    }

    @Override
    public boolean remove(Subscriber subscriber) {
        if (subscriber == null || adminClient == null) {
            return false;
        }
        String groupId = KafkaManager.getGroupId(subscriber);
        ArrayList<String> groupIds = new ArrayList<String>();
        groupIds.add(groupId);
        adminClient.deleteConsumerGroups(groupIds);
        return true;
    }

    static {
        props_p.put("bootstrap.servers", Config.getProperty("spring.kafka.bootstrap-servers"));
        props_p.put("acks", "all");
        props_p.put("retries", 3);
        props_p.put("batch.size", 16384);
        props_p.put("linger.ms", 1);
        props_p.put("buffer.memory", 0x2000000);
        props_p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props_p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props_s.put("bootstrap.servers", Config.getProperty("spring.kafka.bootstrap-servers"));
        props_s.put("enable.auto.commit", "true");
        props_s.put("auto.commit.interval.ms", "1000");
        props_s.put("auto.offset.reset", "latest");
        props_s.put("session.timeout.ms", "30000");
        props_s.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props_s.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        adminClient = AdminClient.create(props_s);
    }
}

