/*
 * Decompiled with CFR 0.152.
 */
package io.wizzie.enricher.builder;

import com.codahale.metrics.jvm.JmxAttributeGauge;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wizzie.bootstrapper.builder.Bootstrapper;
import io.wizzie.bootstrapper.builder.BootstrapperBuilder;
import io.wizzie.bootstrapper.builder.Config;
import io.wizzie.bootstrapper.builder.Listener;
import io.wizzie.bootstrapper.builder.SourceSystem;
import io.wizzie.enricher.base.utils.Utils;
import io.wizzie.enricher.builder.StreamBuilder;
import io.wizzie.enricher.model.PlanModel;
import io.wizzie.enricher.model.exceptions.PlanBuilderException;
import io.wizzie.enricher.serializers.JsonSerde;
import io.wizzie.metrics.MetricsManager;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Builder
implements Listener {
    private static final Logger log = LoggerFactory.getLogger(Builder.class);
    Config config;
    StreamBuilder streamBuilder;
    KafkaStreams streams;
    MetricsManager metricsManager;
    Bootstrapper bootstrapper;

    public Builder(Config config) throws Exception {
        this.config = config;
        config.put("default.key.serde", Serdes.StringSerde.class);
        config.put("default.value.serde", JsonSerde.class);
        this.metricsManager = new MetricsManager(config.clone().getMapConf());
        this.metricsManager.start();
        Map metricDataBag = config.getOrDefault("metric.databag", new HashMap());
        metricDataBag.put("host", Utils.getIdentifier());
        config.put("metric.databag", metricDataBag);
        this.streamBuilder = new StreamBuilder(config.clone(), this.metricsManager);
        this.bootstrapper = BootstrapperBuilder.makeBuilder().boostrapperClass((String)config.get("bootstraper.classname")).listener(this).withConfigInstance(config).build();
    }

    public void close() throws Exception {
        this.metricsManager.interrupt();
        this.streamBuilder.close();
        this.bootstrapper.close();
        if (this.streams != null) {
            this.streams.close();
        }
    }

    private void registerKafkaMetrics(Config config, MetricsManager metricsManager) {
        Integer streamThreads = config.getOrDefault("num.stream.threads", 1);
        String appId = (String)config.get("application.id");
        log.info("Register kafka jvm metrics: ");
        for (int i = 1; i <= streamThreads; ++i) {
            try {
                log.info(" * {}", (Object)("producer." + i + ".messages_send_per_sec"));
                metricsManager.registerMetric("producer.stream-" + i + ".messages_send_per_sec", new JmxAttributeGauge(new ObjectName("kafka.producer:type=producer-metrics,client-id=" + String.format("%s_%s", appId, "enricher") + "-StreamThread-" + i + "-producer"), "record-send-rate"));
                log.info(" * {}", (Object)("producer." + i + ".output_bytes_per_sec"));
                metricsManager.registerMetric("producer.stream-" + i + ".output_bytes_per_sec", new JmxAttributeGauge(new ObjectName("kafka.producer:type=producer-metrics,client-id=" + String.format("%s_%s", appId, "enricher") + "-StreamThread-" + i + "-producer"), "outgoing-byte-rate"));
                log.info(" * {}", (Object)("producer." + i + ".incoming_bytes_per_sec"));
                metricsManager.registerMetric("producer.stream-" + i + ".incoming_bytes_per_sec", new JmxAttributeGauge(new ObjectName("kafka.producer:type=producer-metrics,client-id=" + String.format("%s_%s", appId, "enricher") + "-StreamThread-" + i + "-producer"), "incoming-byte-rate"));
                log.info(" * {}", (Object)("consumer." + i + ".max_lag"));
                metricsManager.registerMetric("consumer.stream-" + i + ".max_lag", new JmxAttributeGauge(new ObjectName("kafka.consumer:type=consumer-fetch-manager-metrics,client-id=" + String.format("%s_%s", appId, "enricher") + "-StreamThread-" + i + "-consumer"), "records-lag-max"));
                log.info(" * {}", (Object)("consumer." + i + ".output_bytes_per_sec"));
                metricsManager.registerMetric("consumer.stream-" + i + ".output_bytes_per_sec", new JmxAttributeGauge(new ObjectName("kafka.consumer:type=consumer-metrics,client-id=" + String.format("%s_%s", appId, "enricher") + "-StreamThread-" + i + "-consumer"), "outgoing-byte-rate"));
                log.info(" * {}", (Object)("consumer." + i + ".incoming_bytes_per_sec"));
                metricsManager.registerMetric("consumer.stream-" + i + ".incoming_bytes_per_sec", new JmxAttributeGauge(new ObjectName("kafka.consumer:type=consumer-metrics,client-id=" + String.format("%s_%s", appId, "enricher") + "-StreamThread-" + i + "-consumer"), "incoming-byte-rate"));
                log.info(" * {}", (Object)("consumer." + i + ".records_per_sec"));
                metricsManager.registerMetric("consumer.stream-" + i + ".records_per_sec", new JmxAttributeGauge(new ObjectName("kafka.consumer:type=consumer-fetch-manager-metrics,client-id=" + String.format("%s_%s", appId, "enricher") + "-StreamThread-" + i + "-consumer"), "records-consumed-rate"));
                log.info(" * {}", (Object)("streams.stream-" + i + ".process-latency-ms"));
                metricsManager.registerMetric("streams.stream-" + i + ".process-latency-ms", new JmxAttributeGauge(new ObjectName("kafka.streams:type=stream-metrics,client-id=" + String.format("%s_%s", appId, "enricher") + "-StreamThread-" + i), "process-latency-avg"));
                log.info(" * {}", (Object)("streams.stream-" + i + ".poll-latency-ms"));
                metricsManager.registerMetric("streams.stream-" + i + ".poll-latency-ms", new JmxAttributeGauge(new ObjectName("kafka.streams:type=stream-metrics,client-id=" + String.format("%s_%s", appId, "enricher") + "-StreamThread-" + i), "poll-latency-avg"));
                log.info(" * {}", (Object)("streams.stream-" + i + ".commit-latency-ms"));
                metricsManager.registerMetric("streams.stream-" + i + ".commit-latency-ms", new JmxAttributeGauge(new ObjectName("kafka.streams:type=stream-metrics,client-id=" + String.format("%s_%s", appId, "enricher") + "-StreamThread-" + i), "commit-latency-avg"));
                continue;
            }
            catch (MalformedObjectNameException e) {
                log.error("kafka jvm metrics not found", e);
            }
        }
    }

    @Override
    public void updateConfig(SourceSystem sourceSystem, String streamConfig) {
        if (this.streams != null) {
            this.metricsManager.clean();
            this.streamBuilder.close();
            this.streams.close();
            log.info("Clean Enricher process");
        }
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            PlanModel model = objectMapper.readValue(streamConfig, PlanModel.class);
            log.info("Execution plan: {}", (Object)model.printExecutionPlan());
            log.info("-------- TOPOLOGY BUILD START --------");
            StreamsBuilder builder = this.streamBuilder.builder(model);
            log.info("--------  TOPOLOGY BUILD END  --------");
            Config configWithNewAppId = this.config.clone();
            String appId = (String)configWithNewAppId.get("application.id");
            configWithNewAppId.put("application.id", String.format("%s_%s", appId, "enricher"));
            configWithNewAppId.put("client.id", String.format("%s_%s", appId, "enricher"));
            Properties properties = configWithNewAppId.getProperties();
            properties.put(StreamsConfig.producerPrefix("retries"), (Object)Integer.MAX_VALUE);
            properties.put(StreamsConfig.producerPrefix("max.block.ms"), (Object)Integer.MAX_VALUE);
            properties.put("request.timeout.ms", (Object)305000);
            properties.put(StreamsConfig.consumerPrefix("max.poll.interval.ms"), (Object)Integer.MAX_VALUE);
            log.info(builder.build().describe().toString());
            this.streams = new KafkaStreams(builder.build(), properties);
            this.streams.setUncaughtExceptionHandler((thread, exception) -> {
                log.error(exception.getMessage(), exception);
                log.info("Stopping enricher engine");
                try {
                    this.close();
                }
                catch (Exception e) {
                    log.error(e.getMessage(), exception);
                }
                log.info("Closing enricher engine");
            });
            this.streams.start();
            this.registerKafkaMetrics(this.config, this.metricsManager);
            log.info("Started Enricher with conf {}", (Object)this.config.getProperties());
        }
        catch (PlanBuilderException | IOException e) {
            log.error(e.getMessage(), e);
        }
    }
}

