/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.RepartitionTopicConfig;
import org.apache.kafka.streams.processor.internals.UnwindowedUnversionedChangelogTopicConfig;
import org.apache.kafka.streams.processor.internals.VersionedChangelogTopicConfig;
import org.apache.kafka.streams.processor.internals.WindowedChangelogTopicConfig;
import org.slf4j.Logger;

public class InternalTopicManager {
    private static final String BUG_ERROR_MESSAGE = "This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).";
    private static final String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).";
    private final Logger log;
    private final Time time;
    private final Admin adminClient;
    private final short replicationFactor;
    private final long windowChangeLogAdditionalRetention;
    private final long retryBackOffMs;
    private final long retryTimeoutMs;
    private final Map<String, String> defaultTopicConfigs = new HashMap<String, String>();

    public InternalTopicManager(Time time, Admin adminClient, StreamsConfig streamsConfig) {
        this.time = time;
        this.adminClient = adminClient;
        LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName()));
        this.log = logContext.logger(this.getClass());
        this.replicationFactor = streamsConfig.getInt("replication.factor").shortValue();
        this.windowChangeLogAdditionalRetention = streamsConfig.getLong("windowstore.changelog.additional.retention.ms");
        this.retryBackOffMs = streamsConfig.getLong("retry.backoff.ms");
        Map<String, Object> consumerConfig = streamsConfig.getMainConsumerConfigs("dummy", "dummy", -1);
        consumerConfig.put("key.deserializer", ByteArrayDeserializer.class);
        consumerConfig.put("value.deserializer", ByteArrayDeserializer.class);
        this.retryTimeoutMs = (long)new ClientUtils.QuietConsumerConfig(consumerConfig).getInt("max.poll.interval.ms").intValue() / 2L;
        this.log.debug("Configs:" + Utils.NL + "\t{} = {}" + Utils.NL + "\t{} = {}", new Object[]{"replication.factor", this.replicationFactor, "windowstore.changelog.additional.retention.ms", this.windowChangeLogAdditionalRetention});
        for (Map.Entry entry : streamsConfig.originalsWithPrefix("topic.").entrySet()) {
            if (entry.getValue() == null) continue;
            this.defaultTopicConfigs.put((String)entry.getKey(), entry.getValue().toString());
        }
    }

    public ValidationResult validate(Map<String, InternalTopicConfig> topicConfigs) {
        this.log.info("Starting to validate internal topics {}.", topicConfigs.keySet());
        long now = this.time.milliseconds();
        long deadline = now + this.retryTimeoutMs;
        ValidationResult validationResult = new ValidationResult();
        HashSet<String> topicDescriptionsStillToValidate = new HashSet<String>(topicConfigs.keySet());
        HashSet<String> topicConfigsStillToValidate = new HashSet<String>(topicConfigs.keySet());
        while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
            Map descriptionsForTopic = Collections.emptyMap();
            if (!topicDescriptionsStillToValidate.isEmpty()) {
                DescribeTopicsResult describeTopicsResult = this.adminClient.describeTopics(topicDescriptionsStillToValidate);
                descriptionsForTopic = describeTopicsResult.topicNameValues();
            }
            Map configsForTopic = Collections.emptyMap();
            if (!topicConfigsStillToValidate.isEmpty()) {
                DescribeConfigsResult describeConfigsResult = this.adminClient.describeConfigs((Collection)topicConfigsStillToValidate.stream().map(topic -> new ConfigResource(ConfigResource.Type.TOPIC, topic)).collect(Collectors.toSet()));
                configsForTopic = describeConfigsResult.values().entrySet().stream().collect(Collectors.toMap(entry -> ((ConfigResource)entry.getKey()).name(), Map.Entry::getValue));
            }
            while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
                if (!descriptionsForTopic.isEmpty()) {
                    this.doValidateTopic(validationResult, descriptionsForTopic, topicConfigs, topicDescriptionsStillToValidate, (streamsSide, brokerSide) -> this.validatePartitionCount(validationResult, (InternalTopicConfig)streamsSide, (TopicDescription)brokerSide));
                }
                if (!configsForTopic.isEmpty()) {
                    this.doValidateTopic(validationResult, configsForTopic, topicConfigs, topicConfigsStillToValidate, (streamsSide, brokerSide) -> this.validateCleanupPolicy(validationResult, (InternalTopicConfig)streamsSide, (Config)brokerSide));
                }
                this.maybeThrowTimeoutException(Arrays.asList(topicDescriptionsStillToValidate, topicConfigsStillToValidate), deadline, String.format("Could not validate internal topics within %d milliseconds. This can happen if the Kafka cluster is temporarily not available.", this.retryTimeoutMs));
                if (descriptionsForTopic.isEmpty() && configsForTopic.isEmpty()) continue;
                Utils.sleep((long)100L);
            }
            this.maybeSleep(Arrays.asList(topicDescriptionsStillToValidate, topicConfigsStillToValidate), deadline, "validated");
        }
        this.log.info("Completed validation of internal topics {}.", topicConfigs.keySet());
        return validationResult;
    }

    private <V> void doValidateTopic(ValidationResult validationResult, Map<String, KafkaFuture<V>> futuresForTopic, Map<String, InternalTopicConfig> topicsConfigs, Set<String> topicsStillToValidate, BiConsumer<InternalTopicConfig, V> validator) {
        for (String topicName : new HashSet<String>(topicsStillToValidate)) {
            if (!futuresForTopic.containsKey(topicName)) {
                throw new IllegalStateException("Description results do not contain topics to validate. This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).");
            }
            KafkaFuture<V> future = futuresForTopic.get(topicName);
            if (!future.isDone()) continue;
            try {
                Object brokerSideTopicConfig = future.get();
                InternalTopicConfig streamsSideTopicConfig = topicsConfigs.get(topicName);
                validator.accept(streamsSideTopicConfig, brokerSideTopicConfig);
                topicsStillToValidate.remove(topicName);
            }
            catch (ExecutionException executionException) {
                Throwable cause = executionException.getCause();
                if (cause instanceof UnknownTopicOrPartitionException) {
                    this.log.info("Internal topic {} is missing", (Object)topicName);
                    validationResult.addMissingTopic(topicName);
                    topicsStillToValidate.remove(topicName);
                    continue;
                }
                if (cause instanceof LeaderNotAvailableException) {
                    this.log.info("The leader of internal topic {} is not available.", (Object)topicName);
                    continue;
                }
                if (cause instanceof TimeoutException) {
                    this.log.info("Retrieving data for internal topic {} timed out.", (Object)topicName);
                    continue;
                }
                this.log.error("Unexpected error during internal topic validation: ", cause);
                throw new StreamsException(String.format("Could not validate internal topic %s for the following reason: ", topicName), cause);
            }
            catch (InterruptedException interruptedException) {
                throw new InterruptException(interruptedException);
            }
            finally {
                futuresForTopic.remove(topicName);
            }
        }
    }

    private void validatePartitionCount(ValidationResult validationResult, InternalTopicConfig topicConfig, TopicDescription topicDescription) {
        String topicName = topicConfig.name();
        int requiredPartitionCount = topicConfig.numberOfPartitions().orElseThrow(() -> new IllegalStateException("No partition count is specified for internal topic " + topicName + ". " + BUG_ERROR_MESSAGE));
        int actualPartitionCount = topicDescription.partitions().size();
        if (actualPartitionCount != requiredPartitionCount) {
            validationResult.addMisconfiguration(topicName, "Internal topic " + topicName + " requires " + requiredPartitionCount + " partitions, but the existing topic on the broker has " + actualPartitionCount + " partitions.");
        }
    }

    private void validateCleanupPolicy(ValidationResult validationResult, InternalTopicConfig topicConfig, Config brokerSideTopicConfig) {
        if (topicConfig instanceof UnwindowedUnversionedChangelogTopicConfig) {
            this.validateCleanupPolicyForUnwindowedUnversionedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
        } else if (topicConfig instanceof WindowedChangelogTopicConfig) {
            this.validateCleanupPolicyForWindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
        } else if (topicConfig instanceof VersionedChangelogTopicConfig) {
            this.validateCleanupPolicyForVersionedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
        } else if (topicConfig instanceof RepartitionTopicConfig) {
            this.validateCleanupPolicyForRepartitionTopic(validationResult, topicConfig, brokerSideTopicConfig);
        } else {
            throw new IllegalStateException("Internal topic " + topicConfig.name() + " has unknown type.");
        }
    }

    private void validateCleanupPolicyForUnwindowedUnversionedChangelogs(ValidationResult validationResult, InternalTopicConfig topicConfig, Config brokerSideTopicConfig) {
        String topicName = topicConfig.name();
        String cleanupPolicy = this.getBrokerSideConfigValue(brokerSideTopicConfig, "cleanup.policy", topicName);
        if (cleanupPolicy.contains("delete")) {
            validationResult.addMisconfiguration(topicName, "Cleanup policy (cleanup.policy) of existing internal topic " + topicName + " should not contain \"" + "delete" + "\".");
        }
    }

    private void validateCleanupPolicyForWindowedChangelogs(ValidationResult validationResult, InternalTopicConfig topicConfig, Config brokerSideTopicConfig) {
        String topicName = topicConfig.name();
        String cleanupPolicy = this.getBrokerSideConfigValue(brokerSideTopicConfig, "cleanup.policy", topicName);
        if (cleanupPolicy.contains("delete")) {
            String brokerSideRetentionBytes;
            Map<String, String> streamsSideConfig;
            long streamsSideRetentionMs;
            long brokerSideRetentionMs = Long.parseLong(this.getBrokerSideConfigValue(brokerSideTopicConfig, "retention.ms", topicName));
            if (brokerSideRetentionMs < (streamsSideRetentionMs = Long.parseLong((streamsSideConfig = topicConfig.getProperties(this.defaultTopicConfigs, this.windowChangeLogAdditionalRetention)).get("retention.ms")))) {
                validationResult.addMisconfiguration(topicName, "Retention time (retention.ms) of existing internal topic " + topicName + " is " + brokerSideRetentionMs + " but should be " + streamsSideRetentionMs + " or larger.");
            }
            if ((brokerSideRetentionBytes = this.getBrokerSideConfigValue(brokerSideTopicConfig, "retention.bytes", topicName)) != null) {
                validationResult.addMisconfiguration(topicName, "Retention byte (retention.bytes) of existing internal topic " + topicName + " is set but it should be unset.");
            }
        }
    }

    private void validateCleanupPolicyForVersionedChangelogs(ValidationResult validationResult, InternalTopicConfig topicConfig, Config brokerSideTopicConfig) {
        Map<String, String> streamsSideConfig;
        long streamsSideCompactionLagMs;
        long brokerSideCompactionLagMs;
        String topicName = topicConfig.name();
        String cleanupPolicy = this.getBrokerSideConfigValue(brokerSideTopicConfig, "cleanup.policy", topicName);
        if (cleanupPolicy.contains("delete")) {
            validationResult.addMisconfiguration(topicName, "Cleanup policy (cleanup.policy) of existing internal topic " + topicName + " should not contain \"" + "delete" + "\".");
        }
        if ((brokerSideCompactionLagMs = Long.parseLong(this.getBrokerSideConfigValue(brokerSideTopicConfig, "min.compaction.lag.ms", topicName))) < (streamsSideCompactionLagMs = Long.parseLong((streamsSideConfig = topicConfig.getProperties(this.defaultTopicConfigs, this.windowChangeLogAdditionalRetention)).get("min.compaction.lag.ms")))) {
            validationResult.addMisconfiguration(topicName, "Min compaction lag (min.compaction.lag.ms) of existing internal topic " + topicName + " is " + brokerSideCompactionLagMs + " but should be " + streamsSideCompactionLagMs + " or larger.");
        }
    }

    private void validateCleanupPolicyForRepartitionTopic(ValidationResult validationResult, InternalTopicConfig topicConfig, Config brokerSideTopicConfig) {
        String topicName = topicConfig.name();
        String cleanupPolicy = this.getBrokerSideConfigValue(brokerSideTopicConfig, "cleanup.policy", topicName);
        if (cleanupPolicy.contains("compact")) {
            validationResult.addMisconfiguration(topicName, "Cleanup policy (cleanup.policy) of existing internal topic " + topicName + " should not contain \"" + "compact" + "\".");
        } else if (cleanupPolicy.contains("delete")) {
            String brokerSideRetentionBytes;
            long brokerSideRetentionMs = Long.parseLong(this.getBrokerSideConfigValue(brokerSideTopicConfig, "retention.ms", topicName));
            if (brokerSideRetentionMs != -1L) {
                validationResult.addMisconfiguration(topicName, "Retention time (retention.ms) of existing internal topic " + topicName + " is " + brokerSideRetentionMs + " but should be -1.");
            }
            if ((brokerSideRetentionBytes = this.getBrokerSideConfigValue(brokerSideTopicConfig, "retention.bytes", topicName)) != null) {
                validationResult.addMisconfiguration(topicName, "Retention byte (retention.bytes) of existing internal topic " + topicName + " is set but it should be unset.");
            }
        }
    }

    private String getBrokerSideConfigValue(Config brokerSideTopicConfig, String configName, String topicName) {
        ConfigEntry brokerSideConfigEntry = brokerSideTopicConfig.get(configName);
        if (brokerSideConfigEntry == null) {
            throw new IllegalStateException("The config " + configName + " for topic " + topicName + " could not be retrieved from the brokers. " + BUG_ERROR_MESSAGE);
        }
        return brokerSideConfigEntry.value();
    }

    public Set<String> makeReady(Map<String, InternalTopicConfig> topics) {
        this.log.debug("Starting to validate internal topics {} in partition assignor.", topics);
        long currentWallClockMs = this.time.milliseconds();
        long deadlineMs = currentWallClockMs + this.retryTimeoutMs;
        Set<String> topicsNotReady = new HashSet<String>(topics.keySet());
        HashSet<String> newlyCreatedTopics = new HashSet<String>();
        while (!topicsNotReady.isEmpty()) {
            HashSet<String> tempUnknownTopics = new HashSet<String>();
            topicsNotReady = this.validateTopics(topicsNotReady, topics, tempUnknownTopics);
            newlyCreatedTopics.addAll(topicsNotReady);
            if (!topicsNotReady.isEmpty()) {
                HashSet<NewTopic> newTopics = new HashSet<NewTopic>();
                for (String topicName : topicsNotReady) {
                    if (tempUnknownTopics.contains(topicName)) continue;
                    InternalTopicConfig internalTopicConfig = Objects.requireNonNull(topics.get(topicName));
                    Map<String, String> topicConfig = internalTopicConfig.getProperties(this.defaultTopicConfigs, this.windowChangeLogAdditionalRetention);
                    this.log.debug("Going to create topic {} with {} partitions and config {}.", new Object[]{internalTopicConfig.name(), internalTopicConfig.numberOfPartitions(), topicConfig});
                    newTopics.add(new NewTopic(internalTopicConfig.name(), internalTopicConfig.numberOfPartitions(), Optional.of(this.replicationFactor)).configs(topicConfig));
                }
                if (!newTopics.isEmpty()) {
                    CreateTopicsResult createTopicsResult = this.adminClient.createTopics(newTopics);
                    for (Map.Entry createTopicResult : createTopicsResult.values().entrySet()) {
                        String topicName = (String)createTopicResult.getKey();
                        try {
                            ((KafkaFuture)createTopicResult.getValue()).get();
                            topicsNotReady.remove(topicName);
                        }
                        catch (InterruptedException fatalException) {
                            Thread.currentThread().interrupt();
                            this.log.error(INTERRUPTED_ERROR_MESSAGE, (Throwable)fatalException);
                            throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
                        }
                        catch (ExecutionException executionException) {
                            Throwable cause = executionException.getCause();
                            if (cause instanceof TopicExistsException) {
                                this.log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\nWill retry to create this topic in {} ms (to let broker finish async delete operation first).\nError message was: {}", new Object[]{topicName, this.retryBackOffMs, cause.toString()});
                                continue;
                            }
                            this.log.error("Unexpected error during topic creation for {}.\nError message was: {}", (Object)topicName, (Object)cause.toString());
                            if (cause instanceof UnsupportedVersionException) {
                                String errorMessage = cause.getMessage();
                                if (errorMessage == null || !errorMessage.startsWith("Creating topics with default partitions/replication factor are only supported in CreateTopicRequest version 4+")) continue;
                                throw new StreamsException(String.format("Could not create topic %s, because brokers don't support configuration replication.factor=-1. You can change the replication.factor config or upgrade your brokers to version 2.4 or newer to avoid this error.", topicName));
                            }
                            if (cause instanceof TimeoutException) {
                                this.log.error("Creating topic {} timed out.\nError message was: {}", (Object)topicName, (Object)cause.toString());
                                continue;
                            }
                            throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
                        }
                    }
                }
            }
            if (topicsNotReady.isEmpty()) continue;
            currentWallClockMs = this.time.milliseconds();
            if (currentWallClockMs >= deadlineMs) {
                String timeoutError = String.format("Could not create topics within %d milliseconds. This can happen if the Kafka cluster is temporarily not available.", this.retryTimeoutMs);
                this.log.error(timeoutError);
                throw new TimeoutException(timeoutError);
            }
            this.log.info("Topics {} could not be made ready. Will retry in {} milliseconds. Remaining time in milliseconds: {}", new Object[]{topicsNotReady, this.retryBackOffMs, deadlineMs - currentWallClockMs});
            Utils.sleep((long)this.retryBackOffMs);
        }
        this.log.debug("Completed validating internal topics and created {}", newlyCreatedTopics);
        return newlyCreatedTopics;
    }

    protected Map<String, Integer> getNumPartitions(Set<String> topics, Set<String> tempUnknownTopics) {
        this.log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);
        DescribeTopicsResult describeTopicsResult = this.adminClient.describeTopics(topics);
        Map futures = describeTopicsResult.topicNameValues();
        HashMap<String, Integer> existedTopicPartition = new HashMap<String, Integer>();
        for (Map.Entry topicFuture : futures.entrySet()) {
            String topicName = (String)topicFuture.getKey();
            try {
                TopicDescription topicDescription = (TopicDescription)((KafkaFuture)topicFuture.getValue()).get();
                existedTopicPartition.put(topicName, topicDescription.partitions().size());
            }
            catch (InterruptedException fatalException) {
                Thread.currentThread().interrupt();
                this.log.error(INTERRUPTED_ERROR_MESSAGE, (Throwable)fatalException);
                throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
            }
            catch (ExecutionException couldNotDescribeTopicException) {
                Throwable cause = couldNotDescribeTopicException.getCause();
                if (cause instanceof UnknownTopicOrPartitionException) {
                    this.log.debug("Topic {} is unknown or not found, hence not existed yet.\nError message was: {}", (Object)topicName, (Object)cause.toString());
                    continue;
                }
                if (cause instanceof LeaderNotAvailableException) {
                    tempUnknownTopics.add(topicName);
                    this.log.debug("The leader of topic {} is not available.\nError message was: {}", (Object)topicName, (Object)cause.toString());
                    continue;
                }
                if (cause instanceof TimeoutException) {
                    tempUnknownTopics.add(topicName);
                    this.log.debug("Describing topic {} (to get number of partitions) timed out.\nError message was: {}", (Object)topicName, (Object)cause.toString());
                    continue;
                }
                this.log.error("Unexpected error during topic description for {}.\nError message was: {}", (Object)topicName, (Object)cause.toString());
                throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
            }
        }
        return existedTopicPartition;
    }

    private Set<String> validateTopics(Set<String> topicsToValidate, Map<String, InternalTopicConfig> topicsMap, Set<String> tempUnknownTopics) {
        if (!topicsMap.keySet().containsAll(topicsToValidate)) {
            throw new IllegalStateException("The topics map " + topicsMap.keySet() + " does not contain all the topics " + topicsToValidate + " trying to validate.");
        }
        Map<String, Integer> existedTopicPartition = this.getNumPartitions(topicsToValidate, tempUnknownTopics);
        HashSet<String> topicsToCreate = new HashSet<String>();
        for (String topicName : topicsToValidate) {
            Optional<Integer> numberOfPartitions = topicsMap.get(topicName).numberOfPartitions();
            if (!numberOfPartitions.isPresent()) {
                this.log.error("Found undefined number of partitions for topic {}", (Object)topicName);
                throw new StreamsException("Topic " + topicName + " number of partitions not defined");
            }
            if (existedTopicPartition.containsKey(topicName)) {
                if (existedTopicPartition.get(topicName).equals(numberOfPartitions.get())) continue;
                String errorMsg = String.format("Existing internal topic %s has invalid partitions: expected: %d; actual: %d. Use 'org.apache.kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.", topicName, numberOfPartitions.get(), existedTopicPartition.get(topicName));
                this.log.error(errorMsg);
                throw new StreamsException(errorMsg);
            }
            topicsToCreate.add(topicName);
        }
        return topicsToCreate;
    }

    public void setup(Map<String, InternalTopicConfig> topicConfigs) {
        this.log.info("Starting to setup internal topics {}.", topicConfigs.keySet());
        long now = this.time.milliseconds();
        long deadline = now + this.retryTimeoutMs;
        Map<String, Map> streamsSideTopicConfigs = topicConfigs.values().stream().collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig.getProperties(this.defaultTopicConfigs, this.windowChangeLogAdditionalRetention)));
        HashSet<String> createdTopics = new HashSet<String>();
        HashSet<String> topicStillToCreate = new HashSet<String>(topicConfigs.keySet());
        while (!topicStillToCreate.isEmpty()) {
            Set newTopics = topicStillToCreate.stream().map(topicName -> new NewTopic(topicName, ((InternalTopicConfig)topicConfigs.get(topicName)).numberOfPartitions(), Optional.of(this.replicationFactor)).configs((Map)streamsSideTopicConfigs.get(topicName))).collect(Collectors.toSet());
            this.log.info("Going to create internal topics: " + newTopics);
            CreateTopicsResult createTopicsResult = this.adminClient.createTopics(newTopics);
            this.processCreateTopicResults(createTopicsResult, topicStillToCreate, createdTopics, deadline);
            this.maybeSleep(Collections.singletonList(topicStillToCreate), deadline, "created");
        }
        this.log.info("Completed setup of internal topics {}.", topicConfigs.keySet());
    }

    private void processCreateTopicResults(CreateTopicsResult createTopicsResult, Set<String> topicStillToCreate, Set<String> createdTopics, long deadline) {
        HashMap<String, Throwable> lastErrorsSeenForTopic = new HashMap<String, Throwable>();
        Map createResultForTopic = createTopicsResult.values();
        while (!createResultForTopic.isEmpty()) {
            for (String topicName : new HashSet<String>(topicStillToCreate)) {
                if (!createResultForTopic.containsKey(topicName)) {
                    this.cleanUpCreatedTopics(createdTopics);
                    throw new IllegalStateException("Create topic results do not contain internal topic " + topicName + " to setup. " + BUG_ERROR_MESSAGE);
                }
                KafkaFuture createResult = (KafkaFuture)createResultForTopic.get(topicName);
                if (!createResult.isDone()) continue;
                try {
                    createResult.get();
                    createdTopics.add(topicName);
                    topicStillToCreate.remove(topicName);
                }
                catch (ExecutionException executionException) {
                    Throwable cause = executionException.getCause();
                    if (cause instanceof TopicExistsException) {
                        lastErrorsSeenForTopic.put(topicName, cause);
                        this.log.info("Internal topic {} already exists. Topic is probably marked for deletion. Will retry to create this topic later (to let broker complete async delete operation first)", (Object)topicName);
                        continue;
                    }
                    if (cause instanceof TimeoutException) {
                        lastErrorsSeenForTopic.put(topicName, cause);
                        this.log.info("Creating internal topic {} timed out.", (Object)topicName);
                        continue;
                    }
                    this.cleanUpCreatedTopics(createdTopics);
                    this.log.error("Unexpected error during creation of internal topic: ", cause);
                    throw new StreamsException(String.format("Could not create internal topic %s for the following reason: ", topicName), cause);
                }
                catch (InterruptedException interruptedException) {
                    throw new InterruptException(interruptedException);
                }
                finally {
                    createResultForTopic.remove(topicName);
                }
            }
            this.maybeThrowTimeoutExceptionDuringSetup(topicStillToCreate, createdTopics, lastErrorsSeenForTopic, deadline);
            if (createResultForTopic.isEmpty()) continue;
            Utils.sleep((long)100L);
        }
    }

    private void cleanUpCreatedTopics(Set<String> topicsToCleanUp) {
        this.log.info("Starting to clean up internal topics {}.", topicsToCleanUp);
        long now = this.time.milliseconds();
        long deadline = now + this.retryTimeoutMs;
        HashSet<String> topicsStillToCleanup = new HashSet<String>(topicsToCleanUp);
        while (!topicsStillToCleanup.isEmpty()) {
            this.log.info("Going to cleanup internal topics: " + topicsStillToCleanup);
            DeleteTopicsResult deleteTopicsResult = this.adminClient.deleteTopics(topicsStillToCleanup);
            Map deleteResultForTopic = deleteTopicsResult.topicNameValues();
            while (!deleteResultForTopic.isEmpty()) {
                for (String topicName : new HashSet<String>(topicsStillToCleanup)) {
                    if (!deleteResultForTopic.containsKey(topicName)) {
                        throw new IllegalStateException("Delete topic results do not contain internal topic " + topicName + " to clean up. " + BUG_ERROR_MESSAGE);
                    }
                    KafkaFuture deleteResult = (KafkaFuture)deleteResultForTopic.get(topicName);
                    if (!deleteResult.isDone()) continue;
                    try {
                        deleteResult.get();
                        topicsStillToCleanup.remove(topicName);
                    }
                    catch (ExecutionException executionException) {
                        Throwable cause = executionException.getCause();
                        if (cause instanceof UnknownTopicOrPartitionException) {
                            this.log.info("Internal topic {} to clean up is missing", (Object)topicName);
                            continue;
                        }
                        if (cause instanceof LeaderNotAvailableException) {
                            this.log.info("The leader of internal topic {} to clean up is not available.", (Object)topicName);
                            continue;
                        }
                        if (cause instanceof TimeoutException) {
                            this.log.info("Cleaning up internal topic {} timed out.", (Object)topicName);
                            continue;
                        }
                        this.log.error("Unexpected error during cleanup of internal topics: ", cause);
                        throw new StreamsException(String.format("Could not clean up internal topics %s, because during the cleanup of topic %s the following error occurred: ", topicsStillToCleanup, topicName), cause);
                    }
                    catch (InterruptedException interruptedException) {
                        throw new InterruptException(interruptedException);
                    }
                    finally {
                        deleteResultForTopic.remove(topicName);
                    }
                }
                this.maybeThrowTimeoutException(Collections.singletonList(topicsStillToCleanup), deadline, String.format("Could not cleanup internal topics within %d milliseconds. This can happen if the Kafka cluster is temporarily not available or the broker did not complete topic creation before the cleanup. The following internal topics could not be cleaned up: %s", this.retryTimeoutMs, topicsStillToCleanup));
                if (deleteResultForTopic.isEmpty()) continue;
                Utils.sleep((long)100L);
            }
            this.maybeSleep(Collections.singletonList(topicsStillToCleanup), deadline, "validated");
        }
        this.log.info("Completed cleanup of internal topics {}.", topicsToCleanUp);
    }

    private void maybeThrowTimeoutException(List<Set<String>> topicStillToProcess, long deadline, String errorMessage) {
        long now;
        if (topicStillToProcess.stream().anyMatch(resultSet -> !resultSet.isEmpty()) && (now = this.time.milliseconds()) >= deadline) {
            this.log.error(errorMessage);
            throw new TimeoutException(errorMessage);
        }
    }

    private void maybeThrowTimeoutExceptionDuringSetup(Set<String> topicStillToProcess, Set<String> createdTopics, Map<String, Throwable> lastErrorsSeenForTopic, long deadline) {
        long now;
        if (topicStillToProcess.stream().anyMatch(resultSet -> !resultSet.isEmpty()) && (now = this.time.milliseconds()) >= deadline) {
            this.cleanUpCreatedTopics(createdTopics);
            String errorMessage = String.format("Could not create internal topics within %d milliseconds. This can happen if the Kafka cluster is temporarily not available or a topic is marked for deletion and the broker did not complete its deletion within the timeout. The last errors seen per topic are: %s", this.retryTimeoutMs, lastErrorsSeenForTopic);
            this.log.error(errorMessage);
            throw new TimeoutException(errorMessage);
        }
    }

    private void maybeSleep(List<Set<String>> resultSetsStillToValidate, long deadline, String action) {
        if (resultSetsStillToValidate.stream().anyMatch(resultSet -> !resultSet.isEmpty())) {
            long now = this.time.milliseconds();
            this.log.info("Internal topics {} could not be {}. Will retry in {} milliseconds. Remaining time in milliseconds: {}", new Object[]{resultSetsStillToValidate.stream().flatMap(Collection::stream).collect(Collectors.toSet()), action, this.retryBackOffMs, deadline - now});
            Utils.sleep((long)this.retryBackOffMs);
        }
    }

    static class ValidationResult {
        private final Set<String> missingTopics = new HashSet<String>();
        private final Map<String, List<String>> misconfigurationsForTopics = new HashMap<String, List<String>>();

        ValidationResult() {
        }

        public void addMissingTopic(String topic) {
            this.missingTopics.add(topic);
        }

        public Set<String> missingTopics() {
            return Collections.unmodifiableSet(this.missingTopics);
        }

        public void addMisconfiguration(String topic, String message) {
            this.misconfigurationsForTopics.computeIfAbsent(topic, ignored -> new ArrayList()).add(message);
        }

        public Map<String, List<String>> misconfigurationsForTopics() {
            return Collections.unmodifiableMap(this.misconfigurationsForTopics);
        }
    }
}

