/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.trogdor.common;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

/**
 * Static helpers for aborting a task with an error, converting rates, merging client
 * configuration into {@link Properties}, and creating / verifying / listing topics through
 * the Kafka {@link Admin} client.
 */
public final class WorkerUtils {
    /** Timeout, in milliseconds, applied to individual admin client requests. */
    private static final int ADMIN_REQUEST_TIMEOUT = 25000;
    /** Time budget, in milliseconds, after which the create-topics retry loop gives up. */
    private static final int CREATE_TOPICS_CALL_TIMEOUT = 180000;
    /** Maximum number of topics sent to the admin client in one createTopics call. */
    private static final int MAX_CREATE_TOPICS_BATCH_SIZE = 10;

    /**
     * Logs the exception, completes {@code doneFuture} with an error string, and then throws.
     *
     * <p>The future is completed with the exception's message, or with the canonical class
     * name of the exception when the message is null or empty.
     *
     * @param log        logger that receives the warning
     * @param what       description of the component that caught the exception
     * @param exception  the exception being handled; must not be null
     * @param doneFuture future to complete with the error string
     * @throws KafkaException always; it wraps {@code exception}
     */
    public static void abort(Logger log, String what, Throwable exception, KafkaFutureImpl<String> doneFuture) throws KafkaException {
        log.warn("{} caught an exception", what, exception);
        String message = exception.getMessage();
        if (message == null || message.isEmpty()) {
            doneFuture.complete(exception.getClass().getCanonicalName());
        } else {
            doneFuture.complete(message);
        }
        throw new KafkaException(exception);
    }

    /**
     * Converts a per-second rate into an amount per period.
     *
     * @param perSec   rate per second
     * @param periodMs period length in milliseconds
     * @return {@code perSec * periodMs / 1000}, raised to at least 1 and then truncated to an int
     */
    public static int perSecToPerPeriod(float perSec, long periodMs) {
        float period = (float) periodMs / 1000.0f;
        float perPeriod = perSec * period;
        perPeriod = Math.max(1.0f, perPeriod);
        return (int) perPeriod;
    }

    /**
     * Copies both configuration maps into {@code props}.
     *
     * <p>{@code commonConf} is applied first and {@code clientConf} second, so a key present
     * in both ends up with the {@code clientConf} value.
     *
     * @param props      properties object to populate
     * @param commonConf configuration applied first
     * @param clientConf configuration applied second, overriding {@code commonConf} on key clashes
     */
    public static void addConfigsToProperties(Properties props, Map<String, String> commonConf, Map<String, String> clientConf) {
        for (Map.Entry<String, String> commonEntry : commonConf.entrySet()) {
            props.setProperty(commonEntry.getKey(), commonEntry.getValue());
        }
        for (Map.Entry<String, String> entry : clientConf.entrySet()) {
            props.setProperty(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Creates the given topics using a freshly created admin client, which is closed before
     * this method returns.
     *
     * @param log              logger to use
     * @param bootstrapServers value for the admin client's {@code bootstrap.servers} setting
     * @param commonClientConf configuration applied to the admin client first
     * @param adminClientConf  configuration applied to the admin client second
     * @param topics           topics to create, keyed by topic name
     * @param failOnExisting   if true, fail when any requested topic already exists; otherwise
     *                         verify the partition count of the existing topics
     * @throws Throwable any failure from creating or verifying the topics; exceptions are
     *                   logged and rethrown unchanged
     */
    public static void createTopics(Logger log, String bootstrapServers, Map<String, String> commonClientConf, Map<String, String> adminClientConf, Map<String, NewTopic> topics, boolean failOnExisting) throws Throwable {
        try (Admin adminClient = createAdminClient(bootstrapServers, commonClientConf, adminClientConf)) {
            createTopics(log, adminClient, topics, failOnExisting);
        } catch (Exception e) {
            log.warn("Failed to create or verify topics {}", topics, e);
            throw e;
        }
    }

    /**
     * Creates the given topics with the supplied admin client.
     *
     * <p>An empty map only produces a warning. When some topics already exist, either a
     * {@link TopicExistsException} is thrown ({@code failOnExisting == true}) or the existing
     * topics are checked against the requested partition counts.
     *
     * @param log            logger to use
     * @param adminClient    admin client used for all requests
     * @param topics         topics to create, keyed by topic name
     * @param failOnExisting whether an already existing topic is an error
     * @throws Throwable on creation failure, on timeout, or on a partition count mismatch
     */
    static void createTopics(Logger log, Admin adminClient, Map<String, NewTopic> topics, boolean failOnExisting) throws Throwable {
        if (topics.isEmpty()) {
            log.warn("Request to create topics has an empty topic list.");
            return;
        }
        Collection<String> topicsExists = createTopics(log, adminClient, topics.values());
        if (topicsExists.isEmpty()) {
            return;
        }
        if (failOnExisting) {
            log.warn("Topic(s) {} already exist.", topicsExists);
            throw new TopicExistsException("One or more topics already exist.");
        }
        // Up to 3 describe attempts, 2500 ms apart.
        verifyTopics(log, adminClient, topicsExists, topics, 3, 2500L);
    }

    /**
     * Creates the topics in batches and retries the ones that failed with a retriable error
     * until all succeed or the overall time budget is exhausted.
     *
     * @param log         logger to use
     * @param adminClient admin client used for the create requests
     * @param topics      topics to create
     * @return names of the topics that were reported as already existing
     * @throws TimeoutException if topics are still pending once the time budget has elapsed
     * @throws Throwable        the cause of any non-retriable creation failure, or the caught
     *                          exception itself when it carries no cause
     */
    private static Collection<String> createTopics(Logger log, Admin adminClient, Collection<NewTopic> topics) throws Throwable {
        long startMs = Time.SYSTEM.milliseconds();
        int tries = 0;
        List<String> existingTopics = new ArrayList<>();

        Map<String, NewTopic> newTopics = new HashMap<>();
        for (NewTopic newTopic : topics) {
            newTopics.put(newTopic.name(), newTopic);
        }
        List<String> topicsToCreate = new ArrayList<>(newTopics.keySet());
        do {
            tries++;
            log.info("Attempting to create {} topics (try {})...", topicsToCreate.size(), tries);

            // Submit every pending topic, at most MAX_CREATE_TOPICS_BATCH_SIZE per request.
            Map<String, KafkaFuture<Void>> creations = new HashMap<>();
            for (int start = 0; start < topicsToCreate.size(); start += MAX_CREATE_TOPICS_BATCH_SIZE) {
                int end = Math.min(start + MAX_CREATE_TOPICS_BATCH_SIZE, topicsToCreate.size());
                List<NewTopic> newTopicsBatch = new ArrayList<>(end - start);
                for (String topicName : topicsToCreate.subList(start, end)) {
                    newTopicsBatch.add(newTopics.get(topicName));
                }
                creations.putAll(adminClient.createTopics(newTopicsBatch).values());
            }
            // Everything has been submitted; the list is refilled below with topics to retry.
            topicsToCreate.clear();

            for (Map.Entry<String, KafkaFuture<Void>> entry : creations.entrySet()) {
                String topicName = entry.getKey();
                try {
                    entry.getValue().get();
                    log.debug("Successfully created {}.", topicName);
                } catch (Exception e) {
                    Throwable cause = e.getCause();
                    if (cause instanceof TimeoutException || cause instanceof NotEnoughReplicasException) {
                        log.warn("Attempt to create topic `{}` failed: {}", topicName, cause.getMessage());
                        topicsToCreate.add(topicName);
                    } else if (cause instanceof TopicExistsException) {
                        log.info("Topic {} already exists.", topicName);
                        existingTopics.add(topicName);
                    } else {
                        // The caught exception may have no cause (e.g. InterruptedException);
                        // rethrow it directly rather than executing `throw null`.
                        Throwable failure = cause != null ? cause : e;
                        log.warn("Failed to create {}", topicName, failure);
                        throw failure;
                    }
                }
            }
            if (topicsToCreate.isEmpty()) {
                return existingTopics;
            }
        } while (Time.SYSTEM.milliseconds() <= startMs + CREATE_TOPICS_CALL_TIMEOUT);

        String str = "Unable to create topic(s): " + Utils.join(topicsToCreate, ", ") + " after " + tries + " attempt(s)";
        log.warn(str);
        throw new TimeoutException(str);
    }

    /**
     * Checks that each described topic has the partition count requested in {@code topicsInfo}.
     * A requested count of -1 disables the check for that topic.
     *
     * @param log            logger to use
     * @param adminClient    admin client used to describe the topics
     * @param topicsToVerify names of the topics to describe
     * @param topicsInfo     requested topic settings, keyed by topic name; expected to contain
     *                       an entry for every described topic
     * @param retryCount     maximum number of describe attempts; must be positive
     * @param retryBackoffMs sleep between describe attempts, in milliseconds
     * @throws RuntimeException if a topic's partition count differs from the requested count
     * @throws Throwable        any failure from describing the topics
     */
    static void verifyTopics(Logger log, Admin adminClient, Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo, int retryCount, long retryBackoffMs) throws Throwable {
        Map<String, TopicDescription> topicDescriptionMap = topicDescriptions(topicsToVerify, adminClient, retryCount, retryBackoffMs);
        for (TopicDescription desc : topicDescriptionMap.values()) {
            int partitions = topicsInfo.get(desc.name()).numPartitions();
            if (partitions == -1 || desc.partitions().size() == partitions) {
                continue;
            }
            String str = "Topic '" + desc.name() + "' exists, but has " + desc.partitions().size() + " partitions, while requested  number of partitions is " + partitions;
            log.warn(str);
            throw new RuntimeException(str);
        }
    }

    /**
     * Describes the given topics, retrying while the request fails with
     * {@link UnknownTopicOrPartitionException}.
     *
     * @param topicsToVerify names of the topics to describe
     * @param adminClient    admin client used for the request
     * @param retryCount     maximum number of attempts; must be positive
     * @param retryBackoffMs sleep after each failed attempt, in milliseconds
     * @return topic descriptions keyed by topic name
     * @throws IllegalArgumentException         if {@code retryCount} is not positive
     * @throws UnknownTopicOrPartitionException if every attempt failed with that cause
     * @throws ExecutionException               if an attempt failed with any other cause
     * @throws InterruptedException             if interrupted while waiting or sleeping
     */
    private static Map<String, TopicDescription> topicDescriptions(Collection<String> topicsToVerify, Admin adminClient, int retryCount, long retryBackoffMs) throws ExecutionException, InterruptedException {
        if (retryCount <= 0) {
            // Without this guard no attempt is made and the final throw would be `throw null`.
            throw new IllegalArgumentException("retryCount must be positive, but was " + retryCount);
        }
        UnknownTopicOrPartitionException lastException = null;
        for (int i = 0; i < retryCount; ++i) {
            try {
                DescribeTopicsResult topicsResult = adminClient.describeTopics(topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
                return topicsResult.all().get();
            } catch (ExecutionException exception) {
                if (!(exception.getCause() instanceof UnknownTopicOrPartitionException)) {
                    throw exception;
                }
                lastException = (UnknownTopicOrPartitionException) exception.getCause();
                Thread.sleep(retryBackoffMs);
            }
        }
        throw lastException;
    }

    /**
     * Finds the partitions of all non-internal topics whose full name matches
     * {@code topicRegex} and whose partition index lies in
     * {@code [startPartition, endPartition]} (both inclusive).
     *
     * @param adminClient    admin client used to list and describe topics
     * @param topicRegex     regular expression the whole topic name must match
     * @param startPartition lowest partition index to include
     * @param endPartition   highest partition index to include
     * @return the matching topic partitions
     * @throws Throwable any failure from listing or describing topics
     */
    static Collection<TopicPartition> getMatchingTopicPartitions(Admin adminClient, String topicRegex, int startPartition, int endPartition) throws Throwable {
        Pattern topicNamePattern = Pattern.compile(topicRegex);

        List<String> matchedTopics = new ArrayList<>();
        ListTopicsResult res = adminClient.listTopics(new ListTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
        Map<String, TopicListing> topicListingMap = res.namesToListings().get();
        for (Map.Entry<String, TopicListing> topicListingEntry : topicListingMap.entrySet()) {
            if (!topicListingEntry.getValue().isInternal() && topicNamePattern.matcher(topicListingEntry.getKey()).matches()) {
                matchedTopics.add(topicListingEntry.getKey());
            }
        }

        List<TopicPartition> out = new ArrayList<>();
        DescribeTopicsResult topicsResult = adminClient.describeTopics(matchedTopics, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
        Map<String, TopicDescription> topicDescriptionMap = topicsResult.all().get();
        for (TopicDescription desc : topicDescriptionMap.values()) {
            List<TopicPartitionInfo> partitions = desc.partitions();
            for (TopicPartitionInfo info : partitions) {
                if (info.partition() >= startPartition && info.partition() <= endPartition) {
                    out.add(new TopicPartition(desc.name(), info.partition()));
                }
            }
        }
        return out;
    }

    /**
     * Builds an admin client for the given bootstrap servers.
     *
     * <p>The bootstrap servers and the default request timeout are set first, so either may be
     * overridden by an entry in the supplied configuration maps.
     *
     * @param bootstrapServers value for {@code bootstrap.servers}
     * @param commonClientConf configuration applied first
     * @param adminClientConf  configuration applied second
     * @return a new admin client; the caller is responsible for closing it
     */
    private static Admin createAdminClient(String bootstrapServers, Map<String, String> commonClientConf, Map<String, String> adminClientConf) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("request.timeout.ms", ADMIN_REQUEST_TIMEOUT);
        addConfigsToProperties(props, commonClientConf, adminClientConf);
        return Admin.create(props);
    }
}

