/*
 * Decompiled with CFR 0.152.
 */
package com.bakdata.kafka;

import com.bakdata.kafka.KafkaApplication;
import com.bakdata.kafka.util.ImprovedAdminClient;
import com.bakdata.kafka.util.SchemaTopicClient;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import java.util.Map;
import java.util.Properties;
import lombok.Generated;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.jooq.lambda.Seq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

public abstract class KafkaProducerApplication
extends KafkaApplication {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaProducerApplication.class);
    private static String appPackageName = KafkaProducerApplication.class.getPackageName();

    protected static void startApplication(KafkaProducerApplication app, String[] args) {
        appPackageName = app.getClass().getPackageName();
        String[] populatedArgs = KafkaProducerApplication.addEnvironmentVariablesArguments(args);
        int exitCode = new CommandLine((Object)app).execute(populatedArgs);
        System.exit(exitCode);
    }

    @Override
    public void run() {
        log.info("Starting application");
        if (this.debug) {
            Configurator.setLevel((String)"com.bakdata", (Level)Level.DEBUG);
            Configurator.setLevel((String)appPackageName, (Level)Level.DEBUG);
        }
        log.debug(this.toString());
        if (this.cleanUp) {
            this.runCleanUp();
        } else {
            this.runApplication();
        }
    }

    protected abstract void runApplication();

    @Override
    protected Properties createKafkaProperties() {
        Properties kafkaConfig = new Properties();
        kafkaConfig.put("key.serializer", SpecificAvroSerializer.class);
        kafkaConfig.put("value.serializer", SpecificAvroSerializer.class);
        kafkaConfig.put("max.in.flight.requests.per.connection", (Object)1);
        kafkaConfig.setProperty("acks", "all");
        kafkaConfig.setProperty("compression.type", "gzip");
        kafkaConfig.setProperty("schema.registry.url", this.getSchemaRegistryUrl());
        kafkaConfig.setProperty("bootstrap.servers", this.brokers);
        return kafkaConfig;
    }

    protected <K, V> KafkaProducer<K, V> createProducer() {
        Properties properties = new Properties();
        properties.putAll((Map<?, ?>)this.getKafkaProperties());
        return new KafkaProducer(properties);
    }

    @Override
    protected void runCleanUp() {
        try (ImprovedAdminClient improvedAdminClient = this.createAdminClient();){
            this.cleanUpRun(improvedAdminClient.getSchemaTopicClient());
        }
    }

    protected void cleanUpRun(SchemaTopicClient schemaTopicClient) {
        Iterable<String> outputTopics = this.getAllOutputTopics();
        outputTopics.forEach(schemaTopicClient::deleteTopicAndResetSchemaRegistry);
        try {
            Thread.sleep(5000L);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Error waiting for clean up", e);
        }
    }

    private Iterable<String> getAllOutputTopics() {
        return Seq.of((Object)this.getOutputTopic()).concat(this.extraOutputTopics.values());
    }

    @Override
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public String toString() {
        return "KafkaProducerApplication(super=" + super.toString() + ")";
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public KafkaProducerApplication() {
    }
}

