/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.streamnative.pulsar.handlers.kop.DelayedCreatePartitions;
import io.streamnative.pulsar.handlers.kop.DelayedCreateTopics;
import io.streamnative.pulsar.handlers.kop.KafkaLogConfig;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bridges Kafka admin requests (create topics, create partitions, describe configs, delete and
 * truncate topics) onto the asynchronous {@link PulsarAdmin} client, and keeps a per-listener
 * cache of broker nodes together with a "controller" id chosen for each listener.
 *
 * <p>The broker cache and the controller-id map are published through {@code volatile} fields and
 * are replaced wholesale by {@link #setBrokers(Map)}; readers never take the lock.
 */
public class AdminManager {
    private static final Logger log = LoggerFactory.getLogger(AdminManager.class);

    /** Purgatory in which delayed create-topic / create-partition operations are registered. */
    private final DelayedOperationPurgatory<DelayedOperation> topicPurgatory = DelayedOperationPurgatory.builder().purgatoryName("topic").timeoutTimer(SystemTimer.builder().executorName("topic").build()).build();
    private final PulsarAdmin admin;
    /** Partition count used when a create-topic request asks for {@code -1} partitions. */
    private final int defaultNumPartitions;
    /** Listener name -> known broker nodes; replaced as a whole by {@link #setBrokers(Map)}. */
    private volatile Map<String, Set<Node>> brokersCache = Maps.newHashMap();
    /** Write lock serialises {@link #setBrokers(Map)} so both maps are swapped by one writer at a time. */
    private final ReentrantReadWriteLock brokersCacheLock = new ReentrantReadWriteLock();
    private final Random random = new Random();
    /** Listener name -> id of the node picked as controller (or -1 when the listener has no brokers). */
    private volatile Map<String, Integer> controllerId = Maps.newHashMap();

    /**
     * @param admin Pulsar admin client used for every topic operation
     * @param conf  service configuration; only the default partition count is read from it
     */
    public AdminManager(PulsarAdmin admin, KafkaServiceConfiguration conf) {
        this.admin = admin;
        this.defaultNumPartitions = conf.getDefaultNumPartitions();
    }

    /** Shuts down the topic purgatory. */
    public void shutdown() {
        this.topicPurgatory.shutdown();
    }

    /**
     * Creates one partitioned topic per entry of {@code createInfo}.
     *
     * <p>Each topic gets its own {@link ApiError} future. The returned future is completed once
     * every topic has reported a result, or when the completion callback is triggered earlier
     * (immediately when {@code timeoutMs <= 0}, otherwise through the delayed operation that is
     * registered in the purgatory); topics that have no result at that moment are reported as
     * {@link Errors#REQUEST_TIMED_OUT}.
     *
     * @param createInfo      topic name -> creation details
     * @param timeoutMs       request timeout in milliseconds; non-positive means "do not wait"
     * @param namespacePrefix namespace prefix passed to {@link KopTopic} to resolve the full name
     * @return future holding a topic name -> error map
     */
    public CompletableFuture<Map<String, ApiError>> createTopicsAsync(Map<String, CreateTopicsRequestData.CreatableTopic> createInfo, int timeoutMs, String namespacePrefix) {
        final Map<String, CompletableFuture<ApiError>> futureMap = new ConcurrentHashMap<>();
        final AtomicInteger numTopics = new AtomicInteger(createInfo.size());
        final CompletableFuture<Map<String, ApiError>> resultFuture = new CompletableFuture<>();
        final Runnable complete = newCompletionCallback(futureMap, numTopics, resultFuture);

        createInfo.forEach((topic, detail) -> {
            final CompletableFuture<ApiError> errorFuture = new CompletableFuture<>();
            futureMap.put(topic, errorFuture);

            final KopTopic kopTopic;
            try {
                kopTopic = new KopTopic(topic, namespacePrefix);
            } catch (KoPTopicException e) {
                errorFuture.complete(ApiError.fromThrowable(e));
                markTopicDone(numTopics, complete);
                return;
            }

            // -1 is the "use the broker default" marker; any other negative value is rejected.
            int numPartitions = detail.numPartitions();
            if (numPartitions == -1) {
                numPartitions = this.defaultNumPartitions;
            }
            if (numPartitions < 0) {
                errorFuture.complete(ApiError.fromThrowable(new InvalidRequestException("The partition '" + numPartitions + "' is negative")));
                markTopicDone(numTopics, complete);
                return;
            }

            this.admin.topics().createPartitionedTopicAsync(kopTopic.getFullName(), numPartitions).whenComplete((ignored, e) -> {
                if (e == null) {
                    if (log.isDebugEnabled()) {
                        log.debug("Successfully create topic '{}'", topic);
                    }
                    errorFuture.complete(ApiError.NONE);
                } else {
                    log.error("Failed to create topic '{}': {}", topic, e);
                    if (e instanceof PulsarAdminException.ConflictException) {
                        errorFuture.complete(ApiError.fromThrowable(new TopicExistsException("Topic '" + topic + "' already exists.")));
                    } else {
                        errorFuture.complete(ApiError.fromThrowable(e));
                    }
                }
                markTopicDone(numTopics, complete);
            });
        });

        if (timeoutMs <= 0) {
            complete.run();
        } else {
            List<Object> delayedCreateKeys = createInfo.keySet().stream().map(DelayedOperationKey.TopicKey::new).collect(Collectors.toList());
            DelayedCreateTopics delayedCreate = new DelayedCreateTopics(timeoutMs, numTopics, complete);
            this.topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys);
        }
        return resultFuture;
    }

    /**
     * Describes the configuration of every requested resource.
     *
     * <p>Topics answer with the entries of {@link KafkaLogConfig#getEntries()} when they exist and
     * are partitioned; brokers answer with a small fixed set of entries; any other resource type
     * yields an {@link InvalidRequestException} error for that resource.
     *
     * @param resourceToConfigNames resources to describe; the requested config names are not read
     * @param namespacePrefix       namespace prefix passed to {@link KopTopic}
     * @return future holding a resource -> config map
     */
    CompletableFuture<Map<ConfigResource, DescribeConfigsResponse.Config>> describeConfigsAsync(Map<ConfigResource, Optional<Set<String>>> resourceToConfigNames, String namespacePrefix) {
        final List<DescribeConfigsResponse.ConfigEntry> defaultEntries = KafkaLogConfig.getEntries().entrySet().stream()
                .map(entry -> new DescribeConfigsResponse.ConfigEntry(entry.getKey(), entry.getValue(), DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList()))
                .collect(Collectors.toList());
        final DescribeConfigsResponse.Config defaultTopicConfig = new DescribeConfigsResponse.Config(ApiError.NONE, defaultEntries);

        final Map<ConfigResource, CompletableFuture<DescribeConfigsResponse.Config>> futureMap = resourceToConfigNames.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> describeResourceAsync(entry.getKey(), namespacePrefix, defaultTopicConfig)));

        final CompletableFuture<Map<ConfigResource, DescribeConfigsResponse.Config>> resultFuture = new CompletableFuture<>();
        CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])).whenComplete((ignored, e) -> {
            if (e != null) {
                resultFuture.completeExceptionally(e);
                return;
            }
            // Every per-resource future is done here, so getNow never falls back to null.
            resultFuture.complete(futureMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getNow(null))));
        });
        return resultFuture;
    }

    /**
     * Resolves the config of a single resource; failures are reported inside the returned
     * {@link DescribeConfigsResponse.Config} rather than by completing the future exceptionally.
     */
    private CompletableFuture<DescribeConfigsResponse.Config> describeResourceAsync(ConfigResource resource, String namespacePrefix, DescribeConfigsResponse.Config defaultTopicConfig) {
        try {
            final CompletableFuture<DescribeConfigsResponse.Config> future = new CompletableFuture<>();
            switch (resource.type()) {
                case TOPIC: {
                    final KopTopic kopTopic = new KopTopic(resource.name(), namespacePrefix);
                    this.admin.topics().getPartitionedTopicMetadataAsync(kopTopic.getFullName()).whenComplete((metadata, e) -> {
                        if (e != null) {
                            if (e instanceof PulsarAdminException.NotFoundException) {
                                ApiError error = new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "Topic " + kopTopic.getOriginalName() + " doesn't exist");
                                future.complete(new DescribeConfigsResponse.Config(error, Collections.emptyList()));
                            } else {
                                future.complete(new DescribeConfigsResponse.Config(ApiError.fromThrowable(e), Collections.emptyList()));
                            }
                        } else if (metadata.partitions > 0) {
                            future.complete(defaultTopicConfig);
                        } else {
                            ApiError error = new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic " + kopTopic.getOriginalName() + " is non-partitioned");
                            future.complete(new DescribeConfigsResponse.Config(error, Collections.emptyList()));
                        }
                    });
                    break;
                }
                case BROKER: {
                    final List<DescribeConfigsResponse.ConfigEntry> dummyConfig = new ArrayList<>();
                    dummyConfig.add(this.buildDummyEntryConfig("num.partitions", "" + this.defaultNumPartitions));
                    dummyConfig.add(this.buildDummyEntryConfig("default.replication.factor", "1"));
                    dummyConfig.add(this.buildDummyEntryConfig("delete.topic.enable", "true"));
                    future.complete(new DescribeConfigsResponse.Config(ApiError.NONE, dummyConfig));
                    break;
                }
                default: {
                    return CompletableFuture.completedFuture(new DescribeConfigsResponse.Config(ApiError.fromThrowable(new InvalidRequestException("Unsupported resource type: " + resource.type())), Collections.emptyList()));
                }
            }
            return future;
        } catch (Exception ex) {
            return CompletableFuture.completedFuture(new DescribeConfigsResponse.Config(ApiError.fromThrowable(ex), Collections.emptyList()));
        }
    }

    /** Builds a config entry with source {@code DEFAULT_CONFIG} and both boolean flags set to {@code true}. */
    private DescribeConfigsResponse.ConfigEntry buildDummyEntryConfig(String configName, String configValue) {
        return new DescribeConfigsResponse.ConfigEntry(configName, configValue, DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, true, true, Collections.emptyList());
    }

    /**
     * Deletes a partitioned topic and reports the outcome through the given callbacks.
     *
     * @param topicToDelete   topic name handed to the Pulsar admin client
     * @param successConsumer receives the topic name after a successful deletion
     * @param errorConsumer   receives the topic name when the deletion (or the success callback) fails
     */
    public void deleteTopic(String topicToDelete, Consumer<String> successConsumer, Consumer<String> errorConsumer) {
        this.admin.topics().deletePartitionedTopicAsync(topicToDelete)
                .thenRun(() -> {
                    log.info("delete topic {} successfully.", topicToDelete);
                    successConsumer.accept(topicToDelete);
                })
                .exceptionally(e -> {
                    log.error("delete topic {} failed, exception: ", topicToDelete, e);
                    errorConsumer.accept(topicToDelete);
                    return null;
                });
    }

    /**
     * Truncates a topic. Only truncation at {@link PositionImpl#LATEST} is supported; a
     * {@code null} or any other position is reported to {@code errorConsumer} with a message.
     *
     * @param topicToDelete   topic name handed to the Pulsar admin client
     * @param offset          requested Kafka offset (only logged)
     * @param position        Pulsar position corresponding to the offset, may be {@code null}
     * @param successConsumer receives the topic name after a successful truncation
     * @param errorConsumer   receives the topic name on failure, or a message when the position is unsupported
     */
    public void truncateTopic(String topicToDelete, long offset, Position position, Consumer<String> successConsumer, Consumer<String> errorConsumer) {
        log.info("truncateTopic {} at offset {}, pulsar position {}", topicToDelete, offset, position);
        if (position == null) {
            errorConsumer.accept("Cannot find position");
            return;
        }
        if (!position.equals(PositionImpl.LATEST)) {
            errorConsumer.accept("Not implemented truncate topic at position " + position);
            return;
        }
        this.admin.topics().truncateAsync(topicToDelete)
                .thenRun(() -> {
                    log.info("truncated topic {} successfully.", topicToDelete);
                    successConsumer.accept(topicToDelete);
                })
                .exceptionally(e -> {
                    log.error("truncated topic {} failed, exception: ", topicToDelete, e);
                    errorConsumer.accept(topicToDelete);
                    return null;
                });
    }

    /**
     * Raises the partition count of each topic in {@code createInfo}.
     *
     * <p>Negative counts and requests carrying manual replica assignments are rejected per topic.
     * Completion and timeout handling mirror {@link #createTopicsAsync(Map, int, String)}.
     *
     * @param createInfo      topic name -> requested partition settings
     * @param timeoutMs       request timeout in milliseconds; non-positive means "do not wait"
     * @param namespacePrefix namespace prefix passed to {@link KopTopic}
     * @return future holding a topic name -> error map
     */
    CompletableFuture<Map<String, ApiError>> createPartitionsAsync(Map<String, CreatePartitionsRequestData.CreatePartitionsTopic> createInfo, int timeoutMs, String namespacePrefix) {
        final Map<String, CompletableFuture<ApiError>> futureMap = new ConcurrentHashMap<>();
        final AtomicInteger numTopics = new AtomicInteger(createInfo.size());
        final CompletableFuture<Map<String, ApiError>> resultFuture = new CompletableFuture<>();
        final Runnable complete = newCompletionCallback(futureMap, numTopics, resultFuture);

        createInfo.forEach((topic, newPartitions) -> {
            final CompletableFuture<ApiError> errorFuture = new CompletableFuture<>();
            futureMap.put(topic, errorFuture);
            try {
                final KopTopic kopTopic = new KopTopic(topic, namespacePrefix);
                final int numPartitions = newPartitions.count();
                if (numPartitions < 0) {
                    errorFuture.complete(ApiError.fromThrowable(new InvalidPartitionsException("The partition '" + numPartitions + "' is negative")));
                    markTopicDone(numTopics, complete);
                } else if (newPartitions.assignments() != null && !newPartitions.assignments().isEmpty()) {
                    errorFuture.complete(ApiError.fromThrowable(new InvalidRequestException("Kop server currently doesn't support manual assignment replica sets '" + newPartitions.assignments().stream().map(CreatePartitionsRequestData.CreatePartitionsAssignment::brokerIds).map(String::valueOf).collect(Collectors.joining(", ", "[", "]")) + "' the number of partitions must be specified ")));
                    markTopicDone(numTopics, complete);
                } else {
                    this.handleUpdatePartitionsAsync(topic, kopTopic, numPartitions, errorFuture, numTopics, complete);
                }
            } catch (KoPTopicException e) {
                errorFuture.complete(ApiError.fromThrowable(e));
                markTopicDone(numTopics, complete);
            }
        });

        if (timeoutMs <= 0) {
            complete.run();
        } else {
            List<Object> delayedCreateKeys = createInfo.keySet().stream().map(DelayedOperationKey.TopicKey::new).collect(Collectors.toList());
            DelayedCreatePartitions delayedCreate = new DelayedCreatePartitions(timeoutMs, numTopics, complete);
            this.topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys);
        }
        return resultFuture;
    }

    /**
     * Looks up the current partition count of {@code kopTopic} and, unless it is already higher
     * than {@code newPartitions}, asks Pulsar to update it. The outcome is written to
     * {@code errorFuture} and the topic is then counted as done.
     */
    private void handleUpdatePartitionsAsync(String topic, KopTopic kopTopic, int newPartitions, CompletableFuture<ApiError> errorFuture, AtomicInteger numTopics, Runnable complete) {
        this.admin.topics().getPartitionedTopicMetadataAsync(kopTopic.getFullName()).whenComplete((metadata, t) -> {
            if (t != null) {
                if (t instanceof PulsarAdminException.NotFoundException) {
                    errorFuture.complete(ApiError.fromThrowable(new UnknownTopicOrPartitionException("Topic '" + topic + "' doesn't exist.")));
                } else {
                    errorFuture.complete(ApiError.fromThrowable(t));
                }
                markTopicDone(numTopics, complete);
                return;
            }

            final int oldPartitions = metadata.partitions;
            if (oldPartitions > newPartitions) {
                errorFuture.complete(ApiError.fromThrowable(new InvalidPartitionsException("Topic currently has '" + oldPartitions + "' partitions, which is higher than the requested '" + newPartitions + "'.")));
                markTopicDone(numTopics, complete);
                return;
            }

            this.admin.topics().updatePartitionedTopicAsync(kopTopic.getFullName(), newPartitions).whenComplete((ignored, e) -> {
                if (e == null) {
                    if (log.isDebugEnabled()) {
                        log.debug("Successfully create topic '{}' new partitions '{}'", topic, newPartitions);
                    }
                    errorFuture.complete(ApiError.NONE);
                } else {
                    log.error("Failed to create topic '{}' new partitions '{}': {}", topic, newPartitions, e);
                    errorFuture.complete(ApiError.fromThrowable(e));
                }
                markTopicDone(numTopics, complete);
            });
        });
    }

    /**
     * Builds the callback that finishes a multi-topic request: it zeroes the pending counter,
     * marks every still-pending topic future as {@link Errors#REQUEST_TIMED_OUT} and completes
     * {@code resultFuture} with the per-topic errors. Running it more than once is harmless
     * because completing an already completed future is a no-op.
     */
    private static Runnable newCompletionCallback(Map<String, CompletableFuture<ApiError>> futureMap, AtomicInteger numTopics, CompletableFuture<Map<String, ApiError>> resultFuture) {
        return () -> {
            numTopics.set(0);
            futureMap.values().forEach(future -> {
                if (!future.isDone()) {
                    future.complete(new ApiError(Errors.REQUEST_TIMED_OUT, null));
                }
            });
            resultFuture.complete(futureMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getNow(ApiError.NONE))));
        };
    }

    /** Counts one topic as finished and runs {@code complete} when the counter reaches exactly zero. */
    private static void markTopicDone(AtomicInteger numTopics, Runnable complete) {
        if (numTopics.decrementAndGet() == 0) {
            complete.run();
        }
    }

    /**
     * Returns the brokers known for {@code listenerName}, or an empty collection when there are none.
     *
     * <p>The volatile cache is read exactly once into a local so that a concurrent
     * {@link #setBrokers(Map)} cannot swap the map between the lookup and the read, which
     * previously could make this method return {@code null}.
     */
    public Collection<? extends Node> getBrokers(String listenerName) {
        final Map<String, Set<Node>> snapshot = this.brokersCache;
        final Set<Node> brokers = snapshot.get(listenerName);
        if (brokers != null) {
            return brokers;
        }
        return Collections.emptyList();
    }

    /** Returns the current listener -> brokers map (the cached instance itself, not a copy). */
    public Map<String, Set<Node>> getAllBrokers() {
        return this.brokersCache;
    }

    /**
     * Replaces the broker cache and re-picks a controller id for every listener.
     *
     * @param newBrokers listener name -> broker nodes; stored as-is, without copying
     */
    public void setBrokers(Map<String, Set<Node>> newBrokers) {
        this.brokersCacheLock.writeLock().lock();
        try {
            this.setControllerId(newBrokers);
            this.brokersCache = newBrokers;
        } finally {
            this.brokersCacheLock.writeLock().unlock();
        }
    }

    /**
     * Picks a controller per listener: -1 when the listener has no brokers, the only broker when
     * there is exactly one, otherwise a randomly selected broker.
     */
    private void setControllerId(Map<String, Set<Node>> newBrokers) {
        final Map<String, Integer> newControllerId = Maps.newHashMap();
        newBrokers.forEach((listenerName, brokers) -> {
            if (brokers.size() == 0) {
                newControllerId.put(listenerName, -1);
            } else {
                final List<Node> nodes = Lists.newArrayList(brokers);
                final Node chosen = nodes.size() > 1 ? nodes.get(this.random.nextInt(brokers.size())) : nodes.get(0);
                newControllerId.put(listenerName, chosen.id());
            }
        });
        this.controllerId = newControllerId;
    }

    /** Returns the controller id chosen for {@code listenerName}, or -1 when the listener is unknown. */
    public int getControllerId(String listenerName) {
        return this.controllerId.getOrDefault(listenerName, -1);
    }
}

