/*
 * Decompiled with CFR 0.152.
 */
package com.bakdata.kafka;

import com.bakdata.kafka.util.ConsumerGroupClient;
import com.bakdata.kafka.util.ImprovedAdminClient;
import com.bakdata.kafka.util.SchemaTopicClient;
import com.bakdata.kafka.util.TopologyInformation;
import com.google.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import kafka.tools.StreamsResetter;
import lombok.Generated;
import lombok.NonNull;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CleanUpRunner {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(CleanUpRunner.class);
    private static final int EXIT_CODE_SUCCESS = 0;
    private final String appId;
    private final KafkaStreams streams;
    private final TopologyInformation topologyInformation;
    @NonNull
    private final ImprovedAdminClient adminClient;

    private CleanUpRunner(@NonNull Topology topology, @NonNull String appId, @NonNull ImprovedAdminClient adminClient, @NonNull KafkaStreams streams) {
        if (topology == null) {
            throw new NullPointerException("topology is marked non-null but is null");
        }
        if (appId == null) {
            throw new NullPointerException("appId is marked non-null but is null");
        }
        if (adminClient == null) {
            throw new NullPointerException("adminClient is marked non-null but is null");
        }
        if (streams == null) {
            throw new NullPointerException("streams is marked non-null but is null");
        }
        this.appId = appId;
        this.adminClient = adminClient;
        this.streams = streams;
        this.topologyInformation = new TopologyInformation(topology, appId);
    }

    public static void runResetter(Collection<String> inputTopics, Collection<String> intermediateTopics, ImprovedAdminClient adminClient, String appId) {
        String[] args;
        StreamsResetter resetter;
        int returnCode;
        Collection<String> existingIntermediateTopics;
        File tempFile = CleanUpRunner.createTemporaryPropertiesFile(appId, adminClient.getProperties());
        ImmutableList.Builder argList = ImmutableList.builder().add((Object[])new String[]{"--application-id", appId}).add((Object[])new String[]{"--bootstrap-servers", adminClient.getBootstrapServers()}).add((Object[])new String[]{"--config-file", tempFile.toString()});
        Collection<String> allTopics = adminClient.getTopicClient().listTopics();
        Collection<String> existingInputTopics = CleanUpRunner.filterExistingTopics(inputTopics, allTopics);
        if (!existingInputTopics.isEmpty()) {
            argList.add((Object[])new String[]{"--input-topics", String.join((CharSequence)",", existingInputTopics)});
        }
        if (!(existingIntermediateTopics = CleanUpRunner.filterExistingTopics(intermediateTopics, allTopics)).isEmpty()) {
            argList.add((Object[])new String[]{"--intermediate-topics", String.join((CharSequence)",", existingIntermediateTopics)});
        }
        if ((returnCode = (resetter = new StreamsResetter()).run(args = (String[])argList.build().toArray(String[]::new))) != 0) {
            throw new RuntimeException("Error running streams resetter. Exit code " + returnCode);
        }
    }

    private static Collection<String> filterExistingTopics(Collection<String> topics, Collection<String> allTopics) {
        return topics.stream().filter(topicName -> {
            boolean exists = allTopics.contains(topicName);
            if (!exists) {
                log.warn("Not resetting missing topic {}", topicName);
            }
            return exists;
        }).collect(Collectors.toList());
    }

    static File createTemporaryPropertiesFile(String appId, Map<Object, Object> config) {
        Properties parsedProperties = CleanUpRunner.toStringBasedProperties(config);
        try {
            File tempFile = File.createTempFile(appId + "-reset", "temp");
            tempFile.deleteOnExit();
            try (FileOutputStream out = new FileOutputStream(tempFile);){
                parsedProperties.store(out, "");
            }
            return tempFile;
        }
        catch (IOException e) {
            throw new RuntimeException("Could not run StreamsResetter", e);
        }
    }

    static Properties toStringBasedProperties(Map<Object, Object> config) {
        Properties parsedProperties = new Properties();
        config.forEach((key, value) -> parsedProperties.setProperty(key.toString(), value.toString()));
        return parsedProperties;
    }

    public void run(boolean deleteOutputTopic) {
        List<String> inputTopics = this.topologyInformation.getExternalSourceTopics();
        List<String> intermediateTopics = this.topologyInformation.getIntermediateTopics();
        CleanUpRunner.runResetter(inputTopics, intermediateTopics, this.adminClient, this.appId);
        this.topologyInformation.getInternalTopics().forEach(this.adminClient.getSchemaTopicClient()::resetSchemaRegistry);
        if (deleteOutputTopic) {
            this.deleteTopics();
            this.deleteConsumerGroup();
        }
        this.streams.cleanUp();
        try {
            Thread.sleep(5000L);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Error waiting for clean up", e);
        }
    }

    public void deleteTopics() {
        SchemaTopicClient schemaTopicClient = this.adminClient.getSchemaTopicClient();
        List<String> externalTopics = this.topologyInformation.getExternalSinkTopics();
        externalTopics.forEach(schemaTopicClient::deleteTopicAndResetSchemaRegistry);
    }

    private void deleteConsumerGroup() {
        ConsumerGroupClient consumerGroupClient = this.adminClient.getConsumerGroupClient();
        consumerGroupClient.deleteGroupIfExists(this.appId);
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public static CleanUpRunnerBuilder builder() {
        return new CleanUpRunnerBuilder();
    }

    @NonNull
    @SuppressFBWarnings(justification="generated code")
    @Generated
    public ImprovedAdminClient getAdminClient() {
        return this.adminClient;
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    public static class CleanUpRunnerBuilder {
        @SuppressFBWarnings(justification="generated code")
        @Generated
        private Topology topology;
        @SuppressFBWarnings(justification="generated code")
        @Generated
        private String appId;
        @SuppressFBWarnings(justification="generated code")
        @Generated
        private ImprovedAdminClient adminClient;
        @SuppressFBWarnings(justification="generated code")
        @Generated
        private KafkaStreams streams;

        @SuppressFBWarnings(justification="generated code")
        @Generated
        CleanUpRunnerBuilder() {
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public CleanUpRunnerBuilder topology(@NonNull Topology topology) {
            if (topology == null) {
                throw new NullPointerException("topology is marked non-null but is null");
            }
            this.topology = topology;
            return this;
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public CleanUpRunnerBuilder appId(@NonNull String appId) {
            if (appId == null) {
                throw new NullPointerException("appId is marked non-null but is null");
            }
            this.appId = appId;
            return this;
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public CleanUpRunnerBuilder adminClient(@NonNull ImprovedAdminClient adminClient) {
            if (adminClient == null) {
                throw new NullPointerException("adminClient is marked non-null but is null");
            }
            this.adminClient = adminClient;
            return this;
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public CleanUpRunnerBuilder streams(@NonNull KafkaStreams streams) {
            if (streams == null) {
                throw new NullPointerException("streams is marked non-null but is null");
            }
            this.streams = streams;
            return this;
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public CleanUpRunner build() {
            return new CleanUpRunner(this.topology, this.appId, this.adminClient, this.streams);
        }

        @SuppressFBWarnings(justification="generated code")
        @Generated
        public String toString() {
            return "CleanUpRunner.CleanUpRunnerBuilder(topology=" + this.topology + ", appId=" + this.appId + ", adminClient=" + this.adminClient + ", streams=" + this.streams + ")";
        }
    }
}

