/*
 * Decompiled with CFR 0.152.
 */
package org.kie.remote.impl.consumer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.kie.remote.TopicsConfig;
import org.kie.remote.impl.consumer.ListenerThread;
import org.kie.remote.message.ResultMessage;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaListenerThread
implements ListenerThread {
    private static Logger logger = LoggerFactory.getLogger(KafkaListenerThread.class);
    private Properties configuration;
    private TopicsConfig topicsConfig;
    private Map<String, CompletableFuture<Object>> requestsStore;
    private KafkaConsumer consumer;
    private volatile boolean running = true;

    public KafkaListenerThread(Properties configuration, TopicsConfig config, Map<String, CompletableFuture<Object>> requestsStore) {
        this.configuration = configuration;
        this.topicsConfig = config;
        this.requestsStore = requestsStore;
        this.prepareConsumer();
    }

    private void prepareConsumer() {
        this.consumer = new KafkaConsumer(this.configuration);
        List infos = this.consumer.partitionsFor(this.topicsConfig.getKieSessionInfosTopicName());
        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
        if (infos != null) {
            for (PartitionInfo partition : infos) {
                partitions.add(new TopicPartition(this.topicsConfig.getKieSessionInfosTopicName(), partition.partition()));
            }
        }
        this.consumer.assign(partitions);
        Map offsets = this.consumer.endOffsets(partitions);
        Long lastOffset = 0L;
        for (Map.Entry entry : offsets.entrySet()) {
            lastOffset = (Long)entry.getValue();
        }
        if (lastOffset == 0L) {
            lastOffset = 1L;
        }
        Set assignments = this.consumer.assignment();
        for (TopicPartition part : assignments) {
            this.consumer.seek(part, lastOffset - 1L);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            while (this.running) {
                ConsumerRecords records = this.consumer.poll(Duration.of(1000L, ChronoUnit.MILLIS));
                for (Object item : records) {
                    ConsumerRecord record = (ConsumerRecord)item;
                    Object msg = SerializationUtil.deserialize((byte[])record.value());
                    if (msg instanceof ResultMessage) {
                        this.complete(this.requestsStore, (ResultMessage)msg, logger);
                        continue;
                    }
                    if (msg == null) continue;
                    throw new IllegalStateException("Wrong type of response message: found " + msg.getClass().getCanonicalName() + " instead of " + ResultMessage.class.getCanonicalName());
                }
            }
        }
        catch (Exception ex) {
            logger.error(ex.getMessage(), (Throwable)ex);
        }
        finally {
            this.consumer.close();
        }
    }

    @Override
    public void stop() {
        this.running = false;
    }
}

