/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.resourcegroup;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageConsumer;
import org.apache.pulsar.broker.resourcegroup.ResourceUsagePublisher;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsageInfo;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.shade.org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResourceUsageTransportManager
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceUsageTransportManager.class);
    public static final String RESOURCE_USAGE_TOPIC_NAME = "non-persistent://pulsar/system/resource-usage";
    private final PulsarService pulsarService;
    private final PulsarClient pulsarClient;
    private final ResourceUsageWriterTask pTask;
    private final ResourceUsageReader consumer;
    private final Map<String, ResourceUsagePublisher> publisherMap = new ConcurrentHashMap<String, ResourceUsagePublisher>();
    private final Map<String, ResourceUsageConsumer> consumerMap = new ConcurrentHashMap<String, ResourceUsageConsumer>();
    private long staleMessageCount = 0L;

    private void createTenantAndNamespace() throws PulsarServerException, PulsarAdminException {
        List<String> nsList;
        TopicName topicName = TopicName.get(RESOURCE_USAGE_TOPIC_NAME);
        PulsarAdmin admin = this.pulsarService.getAdminClient();
        ServiceConfiguration config = this.pulsarService.getConfig();
        String cluster = config.getClusterName();
        String tenant = topicName.getTenant();
        String namespace = topicName.getNamespace();
        List<String> tenantList = admin.tenants().getTenants();
        if (!tenantList.contains(tenant)) {
            try {
                admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
            }
            catch (PulsarAdminException ex1) {
                if (!(ex1 instanceof PulsarAdminException.ConflictException)) {
                    LOG.error("Unexpected exception {} when creating tenant {}", (Object)ex1, (Object)tenant);
                }
                throw ex1;
            }
        }
        if (!(nsList = admin.namespaces().getNamespaces(tenant)).contains(namespace)) {
            try {
                admin.namespaces().createNamespace(namespace);
            }
            catch (PulsarAdminException ex1) {
                if (!(ex1 instanceof PulsarAdminException.ConflictException)) {
                    LOG.error("Unexpected exception {} when creating namespace {}", (Object)ex1, (Object)namespace);
                }
                throw ex1;
            }
        }
    }

    public ResourceUsageTransportManager(PulsarService pulsarService) throws Exception {
        this.pulsarService = pulsarService;
        this.pulsarClient = pulsarService.getClient();
        try {
            this.createTenantAndNamespace();
            this.consumer = new ResourceUsageReader();
            this.pTask = new ResourceUsageWriterTask();
        }
        catch (Exception ex) {
            LOG.error("Error initializing resource usage transport manager", (Throwable)ex);
            throw ex;
        }
    }

    public void registerResourceUsagePublisher(ResourceUsagePublisher r) {
        this.publisherMap.put(r.getID(), r);
    }

    public void unregisterResourceUsageProducer(ResourceUsagePublisher r) {
        this.publisherMap.remove(r.getID());
    }

    public void registerResourceUsageConsumer(ResourceUsageConsumer r) {
        this.consumerMap.put(r.getID(), r);
    }

    public void unregisterResourceUsageConsumer(ResourceUsageConsumer r) {
        this.consumerMap.remove(r.getID());
    }

    @Override
    public void close() throws Exception {
        try {
            this.pTask.close();
            this.consumer.close();
        }
        catch (Exception ex1) {
            LOG.error("Error closing producer/consumer for resource-usage topic", (Throwable)ex1);
            throw ex1;
        }
    }

    private class ResourceUsageReader
    implements ReaderListener<byte[]>,
    AutoCloseable {
        private final ResourceUsageInfo recdUsageInfo = new ResourceUsageInfo();
        private final Reader<byte[]> consumer;

        public ResourceUsageReader() throws PulsarClientException {
            this.consumer = ResourceUsageTransportManager.this.pulsarClient.newReader().topic(ResourceUsageTransportManager.RESOURCE_USAGE_TOPIC_NAME).startMessageId(MessageId.latest).readerListener(this).create();
        }

        @Override
        public void close() throws Exception {
            this.consumer.close();
        }

        @Override
        public void received(Reader<byte[]> reader, Message<byte[]> msg) {
            long publishTime = msg.getPublishTime();
            long currentTime = System.currentTimeMillis();
            long timeDelta = currentTime - publishTime;
            this.recdUsageInfo.parseFrom(Unpooled.wrappedBuffer(msg.getData()), msg.getData().length);
            if (timeDelta > TimeUnit.SECONDS.toMillis(2 * ResourceUsageTransportManager.this.pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs())) {
                LOG.error("Stale resource usage msg from broker {} publish time {} current time{}", new Object[]{this.recdUsageInfo.getBroker(), publishTime, currentTime});
                ResourceUsageTransportManager.this.staleMessageCount++;
                return;
            }
            try {
                this.recdUsageInfo.getUsageMapsList().forEach(ru -> {
                    ResourceUsageConsumer owner = (ResourceUsageConsumer)ResourceUsageTransportManager.this.consumerMap.get(ru.getOwner());
                    if (owner != null) {
                        owner.acceptResourceUsage(this.recdUsageInfo.getBroker(), (ResourceUsage)ru);
                    }
                });
            }
            catch (IllegalStateException exception) {
                LOG.error("Resource usage reader: Error parsing incoming message", (Throwable)exception);
                throw exception;
            }
            catch (Exception exception) {
                LOG.error("Resource usage reader: Unknown exception while parsing message", (Throwable)exception);
                throw exception;
            }
        }
    }

    private class ResourceUsageWriterTask
    implements Runnable,
    AutoCloseable {
        private final Producer<byte[]> producer = this.createProducer();
        private final ScheduledFuture<?> resourceUsagePublishTask;

        private Producer<byte[]> createProducer() throws PulsarClientException {
            int publishDelayMilliSecs = 10;
            int sendTimeoutSecs = 10;
            return ResourceUsageTransportManager.this.pulsarClient.newProducer().topic(ResourceUsageTransportManager.RESOURCE_USAGE_TOPIC_NAME).batchingMaxPublishDelay(10L, TimeUnit.MILLISECONDS).sendTimeout(10, TimeUnit.SECONDS).blockIfQueueFull(false).compressionType(CompressionType.SNAPPY).create();
        }

        public ResourceUsageWriterTask() throws PulsarClientException {
            this.resourceUsagePublishTask = ResourceUsageTransportManager.this.pulsarService.getExecutor().scheduleAtFixedRate(this, ResourceUsageTransportManager.this.pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs(), ResourceUsageTransportManager.this.pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs(), TimeUnit.SECONDS);
        }

        @Override
        public void run() {
            if (!ResourceUsageTransportManager.this.publisherMap.isEmpty()) {
                ResourceUsageInfo rUsageInfo = new ResourceUsageInfo();
                rUsageInfo.setBroker(ResourceUsageTransportManager.this.pulsarService.getBrokerServiceUrl());
                ResourceUsageTransportManager.this.publisherMap.forEach((key, item) -> item.fillResourceUsage(rUsageInfo.addUsageMap()));
                ByteBuf buf = PulsarByteBufAllocator.DEFAULT.heapBuffer(rUsageInfo.getSerializedSize());
                rUsageInfo.writeTo(buf);
                byte[] bytes = buf.array();
                this.producer.sendAsync(bytes).whenComplete((id, ex) -> {
                    if (null != ex) {
                        LOG.error("Resource usage publisher: sending message ID {} error", id, ex);
                    }
                    buf.release();
                });
            }
        }

        @Override
        public void close() throws Exception {
            this.resourceUsagePublishTask.cancel(true);
            this.producer.close();
        }
    }
}

