/*
 * Decompiled with CFR 0.152.
 */
package io.wizzie.enricher.utils.bootstrap;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.wizzie.bootstrapper.builder.Config;
import io.wizzie.enricher.model.PlanModel;
import io.wizzie.enricher.model.exceptions.PlanBuilderException;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

/**
 * Command-line utility that writes or reads an enricher stream configuration
 * stored in the {@code __enricher_bootstrap} Kafka topic (partition 0), keyed
 * by application id.
 *
 * <ul>
 *   <li>Three arguments ({@code <bootstrap_kafka_servers> <app_id> <stream_config_path>}):
 *       read the JSON file, validate it as a {@link PlanModel} and publish it.</li>
 *   <li>Two arguments ({@code <bootstrap_kafka_servers> <app_id>}): scan the topic
 *       and print the last configuration stored under the given app id.</li>
 *   <li>Any other argument count: print the usage line.</li>
 * </ul>
 */
public class StreamerKafkaConfig {
    /** Topic that holds the stream configurations. */
    private static final String BOOTSTRAP_TOPIC = "__enricher_bootstrap";

    /** The only partition of {@link #BOOTSTRAP_TOPIC} this tool reads from and writes to. */
    private static final int BOOTSTRAP_PARTITION = 0;

    /**
     * Entry point; dispatches on the number of arguments (see the class documentation).
     *
     * @param args {@code <bootstrap_kafka_servers> <app_id> [stream_config_path]}
     * @throws IOException          if the config file cannot be read or the JSON cannot be
     *                              parsed or serialized
     * @throws PlanBuilderException propagated from {@code PlanModel.validate}
     */
    public static void main(String[] args) throws IOException, PlanBuilderException {
        if (args.length == 3) {
            publishConfig(args[0], args[1], args[2]);
        } else if (args.length == 2) {
            showConfig(args[0], args[1]);
        } else {
            // Use the class's real fully-qualified name so the printed command is runnable.
            System.out.println("Usage: java -cp enricher-selfcontained.jar io.wizzie.enricher.utils.bootstrap.StreamerKafkaConfig <bootstrap_kafka_servers> <app_id> [stream_config_path]");
        }
    }

    /**
     * Reads the stream configuration file, validates it and publishes it to the
     * bootstrap topic under the key {@code appId}, then prints the execution plan.
     */
    private static void publishConfig(String bootstrapServers, String appId, String configPath)
            throws IOException, PlanBuilderException {
        String streamConfig = readFile(configPath);

        // Parse and validate BEFORE touching Kafka so an invalid plan is never published.
        ObjectMapper objectMapper = new ObjectMapper();
        PlanModel model = objectMapper.readValue(streamConfig, PlanModel.class);
        model.validate(new Config());

        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("acks", "1");

        // try-with-resources: the producer is closed even if send/flush throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(BOOTSTRAP_TOPIC, BOOTSTRAP_PARTITION, appId, streamConfig);
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println(String.format("Wrote stream config with appID[%s] with offset: %d", appId, metadata.offset()));
                } else {
                    System.out.println(exception.getMessage());
                    exception.printStackTrace();
                }
            });
            // Force the asynchronous send to complete before the producer is closed.
            producer.flush();
        }

        System.out.println("New executing plan: ");
        System.out.println(model.printExecutionPlan());
    }

    /**
     * Scans the bootstrap topic from the beginning up to the end offset observed at
     * start-up and prints the last configuration whose key equals {@code appId}.
     */
    private static void showConfig(String bootstrapServers, String appId)
            throws IOException, PlanBuilderException {
        Properties consumerConfig = new Properties();
        consumerConfig.put("bootstrap.servers", bootstrapServers);
        consumerConfig.put("auto.offset.reset", "earliest");
        consumerConfig.put("enable.auto.commit", "false");
        consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // A random suffix gives every invocation its own consumer group id.
        consumerConfig.put("group.id", String.format("enricher-bootstraper-%s-%s", appId, UUID.randomUUID().toString()));

        String jsonStreamConfig = null;
        long jsonOffset = 0L;

        // try-with-resources: the consumer is closed on both normal and exceptional exit.
        try (KafkaConsumer<String, String> restoreConsumer = new KafkaConsumer<>(consumerConfig)) {
            TopicPartition storePartition = new TopicPartition(BOOTSTRAP_TOPIC, BOOTSTRAP_PARTITION);
            restoreConsumer.assign(Collections.singletonList(storePartition));

            // Remember where the log currently ends, then rewind and replay up to that point.
            restoreConsumer.seekToEnd(Collections.singleton(storePartition));
            long endOffset = restoreConsumer.position(storePartition);
            restoreConsumer.seekToBeginning(Collections.singleton(storePartition));

            long offset = 0L;
            while (offset < endOffset) {
                for (ConsumerRecord<String, String> record : restoreConsumer.poll(100L).records(storePartition)) {
                    // appId.equals(key) is null-safe: records with a null key are skipped
                    // instead of raising a NullPointerException.
                    if (!appId.equals(record.key())) {
                        continue;
                    }
                    // Later matches overwrite earlier ones, so the last record wins.
                    jsonStreamConfig = record.value();
                    jsonOffset = record.offset();
                }
                offset = restoreConsumer.position(storePartition);
            }
        }

        if (jsonStreamConfig != null) {
            System.out.println(String.format("Find stream configuration with app id [%s] with offset: %d", appId, jsonOffset));
            ObjectMapper objectMapper = new ObjectMapper();
            PlanModel model = objectMapper.readValue(jsonStreamConfig, PlanModel.class);
            model.validate(new Config());
            System.out.println("Current executing plan: ");
            System.out.println(model.printExecutionPlan());
            System.out.println("Stream json config: ");
            System.out.println(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(model));
        } else {
            System.out.println(String.format("Don't find any stream configuration with app id [%s]", appId));
        }
    }

    /**
     * Returns the content of {@code path} with every line terminated by {@code "\n"}.
     * The reader is always closed. NOTE(review): {@link FileReader} decodes with the
     * platform default charset on older JDKs; confirm config files are compatible.
     */
    private static String readFile(String path) throws IOException {
        StringBuilder content = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            String line;
            while ((line = reader.readLine()) != null) {
                content.append(line).append("\n");
            }
        }
        return content.toString();
    }
}

