/*
 * Decompiled with CFR 0.152.
 */
package com.bakdata.kafka;

import com.bakdata.kafka.CleanUpRunner;
import com.bakdata.kafka.KafkaApplication;
import com.bakdata.kafka.util.ImprovedAdminClient;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import lombok.Generated;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

public abstract class KafkaStreamsApplication
extends KafkaApplication
implements AutoCloseable {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsApplication.class);
    private static String appPackageName = KafkaStreamsApplication.class.getPackageName();
    @CommandLine.Option(names={"--input-topics"}, description={"Input topics"}, split=",")
    protected List<String> inputTopics = new ArrayList<String>();
    @CommandLine.Option(names={"--error-topic"}, description={"Error topic (default: ${DEFAULT-VALUE}"})
    protected String errorTopic = "error_topic";
    @CommandLine.Option(names={"--extra-input-topics"}, split=",", description={"Additional input topics"})
    protected Map<String, String> extraInputTopics = new HashMap<String, String>();
    @CommandLine.Option(names={"--productive"}, arity="1")
    private boolean productive = true;
    @CommandLine.Option(names={"--delete-output"}, arity="0..1", description={"Delete the output topic during the clean up."})
    private boolean deleteOutputTopic = false;
    private KafkaStreams streams;

    protected static void startApplication(KafkaStreamsApplication app, String[] args) {
        appPackageName = app.getClass().getPackageName();
        String[] populatedArgs = KafkaStreamsApplication.addEnvironmentVariablesArguments(args);
        CommandLine.run((Runnable)app, (PrintStream)System.out, (String[])populatedArgs);
    }

    @Override
    public void run() {
        log.info("Starting application");
        if (this.debug) {
            Configurator.setLevel((String)"com.bakdata", (Level)Level.DEBUG);
            Configurator.setLevel((String)appPackageName, (Level)Level.DEBUG);
        }
        log.debug(this.toString());
        try {
            Properties kafkaProperties = this.getKafkaProperties();
            this.streams = new KafkaStreams(this.createTopology(), kafkaProperties);
            this.getUncaughtExceptionHandler().ifPresent(arg_0 -> ((KafkaStreams)this.streams).setUncaughtExceptionHandler(arg_0));
            Optional.ofNullable(this.getStateListener()).ifPresent(arg_0 -> ((KafkaStreams)this.streams).setStateListener(arg_0));
            if (this.cleanUp) {
                this.runCleanUp();
            } else {
                this.runStreamsApplication();
            }
        }
        catch (Throwable e) {
            this.closeResources();
            throw e;
        }
    }

    @Override
    public void close() {
        log.info("Stopping application");
        if (this.streams != null) {
            this.streams.close();
        }
        this.closeResources();
    }

    public abstract void buildTopology(StreamsBuilder var1);

    public abstract String getUniqueAppId();

    public Topology createTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        this.buildTopology(builder);
        return builder.build();
    }

    public String getInputTopic() {
        if (this.getInputTopics().isEmpty() || this.getInputTopics().get(0).isBlank()) {
            throw new IllegalArgumentException("One input topic required");
        }
        return this.getInputTopics().get(0);
    }

    protected String getInputTopic(String role) {
        String topic = this.extraInputTopics.get(role);
        Preconditions.checkNotNull((Object)topic, (String)"No input topic for role '%s' available", (Object)role);
        return topic;
    }

    protected Optional<StreamsUncaughtExceptionHandler> getUncaughtExceptionHandler() {
        return Optional.empty();
    }

    @Override
    protected Properties createKafkaProperties() {
        Properties kafkaConfig = new Properties();
        kafkaConfig.setProperty("processing.guarantee", "exactly_once_v2");
        kafkaConfig.put(StreamsConfig.producerPrefix((String)"max.in.flight.requests.per.connection"), (Object)1);
        if (this.productive) {
            kafkaConfig.put("replication.factor", (Object)3);
        }
        kafkaConfig.setProperty(StreamsConfig.producerPrefix((String)"acks"), "all");
        kafkaConfig.setProperty(StreamsConfig.producerPrefix((String)"compression.type"), "gzip");
        kafkaConfig.put("application.id", this.getUniqueAppId());
        kafkaConfig.put("default.key.serde", SpecificAvroSerde.class);
        kafkaConfig.put("default.value.serde", SpecificAvroSerde.class);
        kafkaConfig.setProperty("schema.registry.url", this.getSchemaRegistryUrl());
        kafkaConfig.setProperty("bootstrap.servers", this.getBrokers());
        return kafkaConfig;
    }

    protected void runStreamsApplication() {
        this.streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
    }

    protected void closeResources() {
    }

    protected KafkaStreams.StateListener getStateListener() {
        return (newState, oldState) -> {
            if (newState == KafkaStreams.State.ERROR) {
                log.debug("Closing resources because of state transition from {} to {}", (Object)oldState, (Object)KafkaStreams.State.ERROR);
                this.closeResources();
            }
        };
    }

    @Override
    protected void runCleanUp() {
        try (ImprovedAdminClient adminClient = this.createAdminClient();){
            CleanUpRunner cleanUpRunner = CleanUpRunner.builder().topology(this.createTopology()).appId(this.getUniqueAppId()).adminClient(adminClient).streams(this.streams).build();
            this.cleanUpRun(cleanUpRunner);
        }
        this.close();
    }

    protected void cleanUpRun(CleanUpRunner cleanUpRunner) {
        cleanUpRunner.run(this.deleteOutputTopic);
    }

    @Override
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public String toString() {
        return "KafkaStreamsApplication(super=" + super.toString() + ", inputTopics=" + this.getInputTopics() + ", errorTopic=" + this.getErrorTopic() + ", extraInputTopics=" + this.getExtraInputTopics() + ", productive=" + this.isProductive() + ", deleteOutputTopic=" + this.isDeleteOutputTopic() + ", streams=" + this.getStreams() + ")";
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public List<String> getInputTopics() {
        return this.inputTopics;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public String getErrorTopic() {
        return this.errorTopic;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public Map<String, String> getExtraInputTopics() {
        return this.extraInputTopics;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public boolean isProductive() {
        return this.productive;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public boolean isDeleteOutputTopic() {
        return this.deleteOutputTopic;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public KafkaStreams getStreams() {
        return this.streams;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public void setInputTopics(List<String> inputTopics) {
        this.inputTopics = inputTopics;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public void setErrorTopic(String errorTopic) {
        this.errorTopic = errorTopic;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public void setExtraInputTopics(Map<String, String> extraInputTopics) {
        this.extraInputTopics = extraInputTopics;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public void setProductive(boolean productive) {
        this.productive = productive;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public void setDeleteOutputTopic(boolean deleteOutputTopic) {
        this.deleteOutputTopic = deleteOutputTopic;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public void setStreams(KafkaStreams streams) {
        this.streams = streams;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public KafkaStreamsApplication() {
    }
}

