/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.io.Serializable;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataEvent;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarMetadataEventSynchronizer
implements MetadataEventSynchronizer {
    private static final Logger log = LoggerFactory.getLogger(PulsarMetadataEventSynchronizer.class);
    protected PulsarService pulsar;
    protected BrokerService brokerService;
    protected String topicName;
    protected PulsarClientImpl client;
    protected volatile Producer<MetadataEvent> producer;
    protected volatile Consumer<MetadataEvent> consumer;
    private final CopyOnWriteArrayList<Function<MetadataEvent, CompletableFuture<Void>>> listeners = new CopyOnWriteArrayList();
    private volatile boolean started = false;
    public static final String SUBSCRIPTION_NAME = "metadata-syncer";
    private static final int MAX_PRODUCER_PENDING_SIZE = 1000;
    protected final Backoff backOff = new Backoff(100L, TimeUnit.MILLISECONDS, 1L, TimeUnit.MINUTES, 0L, TimeUnit.MILLISECONDS);

    public PulsarMetadataEventSynchronizer(PulsarService pulsar, String topicName) throws PulsarServerException {
        this.pulsar = pulsar;
        this.brokerService = pulsar.getBrokerService();
        this.topicName = topicName;
        if (!StringUtils.isNotBlank((CharSequence)topicName)) {
            log.info("Metadata synchronizer is disabled");
            return;
        }
    }

    public void start() throws PulsarServerException {
        if (StringUtils.isBlank((CharSequence)this.topicName)) {
            log.info("metadata topic doesn't exist.. skipping metadata synchronizer init..");
            return;
        }
        this.client = (PulsarClientImpl)this.pulsar.getClient();
        this.startProducer();
        this.startConsumer();
        log.info("Metadata event synchronizer started on topic {}", (Object)this.topicName);
    }

    public CompletableFuture<Void> notify(MetadataEvent event) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.publishAsync(event, future);
        return future;
    }

    public void registerSyncListener(Function<MetadataEvent, CompletableFuture<Void>> listener) {
        this.listeners.add(listener);
    }

    public String getClusterName() {
        return this.pulsar.getConfig().getClusterName();
    }

    private void publishAsync(MetadataEvent event, CompletableFuture<Void> future) {
        if (!this.started) {
            log.info("Producer is not started on {}, failed to publish {}", (Object)this.topicName, (Object)event);
            future.completeExceptionally(new IllegalStateException("producer is not started yet"));
        }
        ((CompletableFuture)this.producer.newMessage().value((Object)event).sendAsync().thenAccept(__ -> {
            log.info("successfully published metadata change event {}", (Object)event);
            future.complete(null);
        })).exceptionally(ex -> {
            log.warn("failed to publish metadata update {}, will retry in {}", new Object[]{this.topicName, 1000, ex});
            this.pulsar.getBrokerService().executor().schedule(() -> this.publishAsync(event, future), 1000L, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    private void startProducer() {
        log.info("[{}] Starting producer", (Object)this.topicName);
        ((CompletableFuture)this.client.newProducer(Schema.AVRO(MetadataEvent.class)).topic(this.topicName).messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).enableBatching(false).sendTimeout(0, TimeUnit.SECONDS).maxPendingMessages(1000).createAsync().thenAccept(prod -> {
            this.producer = prod;
            this.started = true;
            log.info("producer is created successfully {}", (Object)this.topicName);
        })).exceptionally(ex -> {
            long waitTimeMs = this.backOff.next();
            log.warn("[{}] Failed to create producer ({}), retrying in {} s", new Object[]{this.topicName, ex.getMessage(), (double)waitTimeMs / 1000.0});
            this.brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    private void startConsumer() {
        if (this.consumer != null) {
            return;
        }
        ConsumerBuilder consumerBuilder = this.client.newConsumer(Schema.AVRO(MetadataEvent.class)).topic(new String[]{this.topicName}).subscriptionName(SUBSCRIPTION_NAME).ackTimeout(60L, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Failover).messageListener((MessageListener & Serializable)(c, msg) -> {
            log.info("Processing metadata event for {} with listeners {}", (Object)((MetadataEvent)msg.getValue()).getPath(), (Object)this.listeners.size());
            try {
                if (this.listeners.size() == 0) {
                    c.acknowledgeAsync(msg);
                    return;
                }
                if (this.listeners.size() == 1) {
                    ((CompletableFuture)this.listeners.get(0).apply((MetadataEvent)msg.getValue()).thenApply(__ -> c.acknowledgeAsync(msg))).exceptionally(ex -> {
                        log.warn("Failed to synchronize {} for {}", new Object[]{msg.getMessageId(), this.topicName, ex.getCause()});
                        return null;
                    });
                } else {
                    ((CompletableFuture)FutureUtil.waitForAll((Collection)this.listeners.stream().map(listener -> (CompletableFuture)listener.apply((MetadataEvent)msg.getValue())).collect(Collectors.toList())).thenApply(__ -> c.acknowledgeAsync(msg))).exceptionally(ex -> {
                        log.warn("Failed to synchronize {} for {}", (Object)msg.getMessageId(), (Object)this.topicName);
                        return null;
                    });
                }
            }
            catch (Exception e) {
                log.warn("Failed to synchronize {} for {}", (Object)msg.getMessageId(), (Object)this.topicName);
            }
        });
        ((CompletableFuture)consumerBuilder.subscribeAsync().thenAccept(consumer -> {
            log.info("successfully created consumer {}", (Object)this.topicName);
            this.consumer = consumer;
        })).exceptionally(ex -> {
            long waitTimeMs = this.backOff.next();
            log.warn("[{}] Failed to create consumer ({}), retrying in {} s", new Object[]{this.topicName, ex.getMessage(), (double)waitTimeMs / 1000.0});
            this.brokerService.executor().schedule(this::startConsumer, waitTimeMs, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    public boolean isStarted() {
        return this.started;
    }

    public void close() {
        this.started = false;
        if (this.producer != null) {
            this.producer.closeAsync();
            this.producer = null;
        }
        if (this.consumer != null) {
            this.consumer.closeAsync();
            this.consumer = null;
        }
    }
}

