/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.PartitionedTopicProducerStatsRecorderImpl;
import org.apache.pulsar.client.impl.PartitionsChangedListener;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.ProducerStatsRecorderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
import org.apache.pulsar.client.impl.SinglePartitionMessageRouterImpl;
import org.apache.pulsar.client.impl.TopicMetadataImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableList;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionedProducerImpl<T>
extends ProducerBase<T> {
    /** Class-wide logger. */
    private static final Logger log = LoggerFactory.getLogger(PartitionedProducerImpl.class);
    /** Internal per-partition producers, keyed by partition index. */
    private final Map<Integer, ProducerImpl<T>> producers = new ConcurrentHashMap<Integer, ProducerImpl<T>>();
    /** Routing policy that picks the target partition for each message; resolved once in the constructor. */
    private final MessageRouter routerPolicy;
    /** Aggregating stats recorder; {@code null} when the client's stats interval is not positive. */
    private final PartitionedTopicProducerStatsRecorderImpl stats;
    /** Partition count handed to the router; replaced by the partition-change listener when the topic grows. */
    private TopicMetadata topicMetadata;
    /** Index of the first partition started by the constructor; {@code getProducerName()} reads that producer. */
    private final int firstPartitionIndex;
    /** Name reported by the first internal producer, passed on to internal producers created afterwards. */
    private String overrideProducerName;
    /** Handle of the currently scheduled auto-update run; {@code null} when auto-update is off or after close. */
    private volatile Timeout partitionsAutoUpdateTimeout = null;
    /** Listener used by the auto-update timer task; only created when auto-update of partitions is enabled. */
    TopicsPartitionChangedListener topicsPartitionChangedListener;
    /** Future of the most recent partition check started by the timer task; {@code null} until the first check. */
    CompletableFuture<Void> partitionsAutoUpdateFuture = null;
    /**
     * Periodic task that asks {@link #topicsPartitionChangedListener} to re-check the
     * partition count of this topic.
     *
     * <p>A check is only started when the producer is {@code Ready} and the previous check
     * (if any) has finished. The task re-arms itself after every run so that a failed or
     * skipped run does not stop the periodic checks. It stops re-arming once the producer
     * is {@code Closing}/{@code Closed} or the timeout was cancelled; otherwise a run that
     * overlaps {@code closeAsync()} would schedule a new timeout that nobody cancels and
     * the task would keep rescheduling itself on a closed producer.
     */
    private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {

        @Override
        public void run(Timeout timeout) throws Exception {
            try {
                if (timeout.isCancelled() || getState() != HandlerState.State.Ready) {
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] run partitionsAutoUpdateTimerTask for partitioned producer", topic);
                }
                // Never run two partition checks concurrently.
                if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
                    partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(ImmutableList.of(topic));
                }
            } catch (Throwable th) {
                log.warn("Encountered error in partition auto update timer task for partition producer. Another task will be scheduled.", th);
            } finally {
                // closeAsync() moves the state to Closing before it cancels the timeout, so the
                // state check also covers a close that raced with this run.
                HandlerState.State state = getState();
                boolean shuttingDown = state == HandlerState.State.Closing || state == HandlerState.State.Closed;
                if (!shuttingDown && !timeout.isCancelled()) {
                    partitionsAutoUpdateTimeout = client.timer().newTimeout(this, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
                }
            }
        }
    };

    /**
     * Builds a producer for a partitioned topic and starts its internal per-partition producers.
     *
     * <p>The per-partition pending-message limit in {@code conf} is lowered to the smaller of
     * its current value and the across-partitions limit divided by {@code numPartitions}.
     * With lazy start enabled and {@code Shared} access mode only the partition chosen by the
     * router for an empty message is started; otherwise every partition is started.
     *
     * @param client                owning client
     * @param topic                 partitioned topic name
     * @param conf                  producer configuration (its max-pending-messages value is updated)
     * @param numPartitions         number of partitions of the topic
     * @param producerCreatedFuture future completed once creation succeeds or fails
     * @param schema                message schema
     * @param interceptors          producer interceptors
     */
    public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions, CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors interceptors) {
        super(client, topic, conf, producerCreatedFuture, schema, interceptors);
        // The router is built from topicMetadata, so the metadata has to be assigned first.
        this.topicMetadata = new TopicMetadataImpl(numPartitions);
        this.routerPolicy = getMessageRouter();
        if (client.getConfiguration().getStatsIntervalSeconds() > 0L) {
            this.stats = new PartitionedTopicProducerStatsRecorderImpl();
        } else {
            this.stats = null;
        }

        int ownLimit = conf.getMaxPendingMessages();
        int sharePerPartition = conf.getMaxPendingMessagesAcrossPartitions() / numPartitions;
        conf.setMaxPendingMessages(Math.min(ownLimit, sharePerPartition));

        List<Integer> indexList;
        boolean lazySharedStart = conf.isLazyStartPartitionedProducers() && conf.getAccessMode() == ProducerAccessMode.Shared;
        if (lazySharedStart) {
            Message<?> emptyMessage = ((TypedMessageBuilderImpl<T>) newMessage()).getMessage();
            indexList = Collections.singletonList(this.routerPolicy.choosePartition(emptyMessage, this.topicMetadata));
        } else {
            indexList = IntStream.range(0, this.topicMetadata.numPartitions()).boxed().collect(Collectors.toList());
        }
        this.firstPartitionIndex = indexList.get(0);
        start(indexList);

        if (conf.isAutoUpdatePartitions()) {
            this.topicsPartitionChangedListener = new TopicsPartitionChangedListener();
            this.partitionsAutoUpdateTimeout = client.timer().newTimeout(this.partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
        }
    }

    /**
     * Resolves the message router from the configured routing mode.
     *
     * <p>{@code CustomPartition} returns the configured custom router (which must not be
     * {@code null}); {@code SinglePartition} and every other mode build a router that starts
     * at a random partition index below the current partition count.
     *
     * @return the router to use for this producer
     */
    private MessageRouter getMessageRouter() {
        switch (this.conf.getMessageRoutingMode()) {
            case CustomPartition:
                return Objects.requireNonNull(this.conf.getCustomMessageRouter());
            case SinglePartition: {
                int randomPartition = ThreadLocalRandom.current().nextInt(this.topicMetadata.numPartitions());
                return new SinglePartitionMessageRouterImpl(randomPartition, this.conf.getHashingScheme());
            }
            default:
                return new RoundRobinPartitionMessageRouterImpl(
                        this.conf.getHashingScheme(),
                        ThreadLocalRandom.current().nextInt(this.topicMetadata.numPartitions()),
                        this.conf.isBatchingEnabled(),
                        TimeUnit.MICROSECONDS.toMillis(this.conf.batchingPartitionSwitchFrequencyIntervalMicros()));
        }
    }

    /**
     * Returns the producer name of the internal producer for the first started partition.
     *
     * @return that internal producer's name
     */
    public String getProducerName() {
        ProducerImpl<T> firstProducer = producers.get(firstPartitionIndex);
        return firstProducer.getProducerName();
    }

    /**
     * Returns the highest last-sequence-id among the internal producers.
     *
     * @return the maximum value, or {@code -1} when there is no internal producer
     */
    public long getLastSequenceId() {
        return producers.values()
                .stream()
                .mapToLong(ProducerImpl::getLastSequenceId)
                .max()
                .orElse(-1L);
    }

    /**
     * Creates the internal producers for the given partition indexes.
     *
     * <p>The producer for the first index is created on its own; once it is ready its name
     * is stored in {@code overrideProducerName} and handed to the producers created for the
     * remaining indexes. When every creation has reported back without an error this
     * producer becomes {@code Ready} and {@code producerCreatedFuture()} is completed with
     * it; otherwise this producer is closed and the creation future is failed with the
     * first recorded exception.
     *
     * @param indexList partition indexes to start; element 0 is created first
     */
    private void start(List<Integer> indexList) {
        // First creation failure seen; later failures do not overwrite it.
        AtomicReference createFail = new AtomicReference();
        // Number of internal producers whose creation has completed (successfully or not).
        AtomicInteger completed = new AtomicInteger();
        BiConsumer<Boolean, Throwable> afterCreatingProducer = (failFast, createException) -> {
            // Closes this partitioned producer, then fails the creation future and
            // unregisters it from the client.
            Runnable closeRunnable = () -> {
                log.error("[{}] Could not create partitioned producer.", (Object)this.topic, (Object)((Throwable)createFail.get()).getCause());
                this.closeAsync().handle((ok, closeException) -> {
                    this.producerCreatedFuture().completeExceptionally((Throwable)createFail.get());
                    this.client.cleanupProducer(this);
                    return null;
                });
            };
            if (createException != null) {
                this.setState(HandlerState.State.Failed);
                createFail.compareAndSet(null, createException);
                // failFast is passed as true only for the first producer: close right away.
                if (failFast.booleanValue()) {
                    closeRunnable.run();
                }
            }
            // NOTE(review): when indexList has a single element and its creation fails,
            // closeRunnable runs a second time from the else-branch below.
            if (completed.incrementAndGet() == indexList.size()) {
                if (createFail.get() == null) {
                    this.setState(HandlerState.State.Ready);
                    log.info("[{}] Created partitioned producer", (Object)this.topic);
                    this.producerCreatedFuture().complete(this);
                } else {
                    closeRunnable.run();
                }
            }
        };
        ProducerImpl<T> firstProducer = this.createProducer(indexList.get(0));
        ((CompletableFuture)firstProducer.producerCreatedFuture().handle((prod, createException) -> {
            afterCreatingProducer.accept(true, (Throwable)createException);
            if (createException != null) {
                // Failing this stage skips the thenApply below, so no further producers are created.
                throw new RuntimeException((Throwable)createException);
            }
            this.overrideProducerName = firstProducer.getProducerName();
            return Optional.of(this.overrideProducerName);
        })).thenApply(name -> {
            // Remaining partitions reuse the name obtained from the first producer.
            for (int i = 1; i < indexList.size(); ++i) {
                this.createProducer((Integer)indexList.get(i), (Optional<String>)name).producerCreatedFuture().handle((prod, createException) -> {
                    afterCreatingProducer.accept(false, (Throwable)createException);
                    return null;
                });
            }
            return null;
        });
    }

    /**
     * Returns the internal producer for a partition, creating it without a producer-name
     * override when it does not exist yet.
     *
     * @param partitionIndex index of the partition
     * @return the existing or newly created internal producer
     */
    private ProducerImpl<T> createProducer(int partitionIndex) {
        Optional<String> noNameOverride = Optional.empty();
        return createProducer(partitionIndex, noNameOverride);
    }

    /**
     * Returns the internal producer for a partition, creating and registering it in
     * {@code producers} when no producer is registered for that index yet.
     *
     * @param partitionIndex       index of the partition
     * @param overrideProducerName producer name to pass to a newly created internal producer
     * @return the existing or newly created internal producer
     */
    private ProducerImpl<T> createProducer(int partitionIndex, Optional<String> overrideProducerName) {
        return producers.computeIfAbsent(partitionIndex, key -> {
            int index = key;
            TopicName partitionTopic = TopicName.get(topic).getPartition(index);
            return client.newProducerImpl(partitionTopic.toString(), index, conf, schema, interceptors, new CompletableFuture<>(), overrideProducerName);
        });
    }

    /**
     * Sends a message outside of any transaction.
     *
     * @param message message to send
     * @return future of the resulting message id
     */
    @Override
    CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
        final Transaction noTransaction = null;
        return internalSendWithTxnAsync(message, noTransaction);
    }

    /**
     * Routes a message to a partition and sends it through that partition's internal producer.
     *
     * <p>With lazy start enabled, a missing internal producer is created on demand and this
     * method waits for that creation to finish. No producer is created when this partitioned
     * producer is already in a state that rejects sends (closed, fenced, terminated, failed),
     * so a send after close cannot leave a freshly created internal producer behind.
     *
     * @param message message to send
     * @param txn     transaction the send belongs to, or {@code null}
     * @return future of the resulting message id; completed exceptionally when the transaction
     *         is not open, the producer state rejects sends, or no internal producer is available
     * @throws IllegalArgumentException if the router picks a partition index outside
     *                                  {@code [0, numPartitions)}
     */
    @Override
    CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
        CompletableFuture<MessageId> txnCheckFuture = new CompletableFuture<>();
        if (txn != null && !((TransactionImpl) txn).checkIfOpen(txnCheckFuture)) {
            return txnCheckFuture;
        }

        final int partition = this.routerPolicy.choosePartition(message, this.topicMetadata);
        Preconditions.checkArgument(partition >= 0 && partition < this.topicMetadata.numPartitions(), "Illegal partition index chosen by the message routing policy: " + partition);

        if (this.conf.isLazyStartPartitionedProducers() && !this.producers.containsKey(partition)) {
            // Do not spin up a new internal producer for a producer that cannot send anyway.
            CompletableFuture<MessageId> rejectedBeforeCreate = failedFutureIfNotSendable();
            if (rejectedBeforeCreate != null) {
                return rejectedBeforeCreate;
            }
            final ProducerImpl<T> newProducer = createProducer(partition, Optional.ofNullable(this.overrideProducerName));
            // Blocks the caller until the internal producer is created or its creation failed.
            HandlerState.State createState = newProducer.producerCreatedFuture().handle((prod, createException) -> {
                if (createException != null) {
                    log.error("[{}] Could not create internal producer. partitionIndex: {}", this.topic, partition, createException);
                    try {
                        this.producers.remove(partition, newProducer);
                        newProducer.close();
                    } catch (PulsarClientException e) {
                        log.error("[{}] Could not close internal producer. partitionIndex: {}", this.topic, partition, e);
                    }
                    return HandlerState.State.Failed;
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Created internal producer. partitionIndex: {}", this.topic, partition);
                }
                return HandlerState.State.Ready;
            }).join();
            if (createState == HandlerState.State.Failed) {
                return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
            }
        }

        CompletableFuture<MessageId> rejected = failedFutureIfNotSendable();
        if (rejected != null) {
            return rejected;
        }

        // Another thread may have removed the producer after a failed lazy creation.
        ProducerImpl<T> producer = this.producers.get(partition);
        if (producer == null) {
            return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
        }
        return producer.internalSendWithTxnAsync(message, txn);
    }

    /**
     * Maps the current handler state to the failure a send must report.
     *
     * @return a failed future for states that reject sends, or {@code null} when sending may proceed
     */
    private CompletableFuture<MessageId> failedFutureIfNotSendable() {
        switch (getState()) {
            case Closing:
            case Closed:
                return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Producer already closed"));
            case ProducerFenced:
                return FutureUtil.failedFuture(new PulsarClientException.ProducerFencedException("Producer was fenced"));
            case Terminated:
                return FutureUtil.failedFuture(new PulsarClientException.TopicTerminatedException("Topic was terminated"));
            case Failed:
            case Uninitialized:
                return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
            default:
                return null;
        }
    }

    /**
     * Flushes every internal producer.
     *
     * @return future that completes when all internal flushes have completed
     */
    public CompletableFuture<Void> flushAsync() {
        CompletableFuture<?>[] pendingFlushes = producers.values()
                .stream()
                .map(ProducerImpl::flushAsync)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(pendingFlushes);
    }

    /** Triggers a flush on every internal producer. */
    @Override
    void triggerFlush() {
        for (ProducerImpl<T> producer : producers.values()) {
            producer.triggerFlush();
        }
    }

    /**
     * Reports whether every internal producer is connected.
     *
     * @return {@code false} as soon as one internal producer is not connected, otherwise {@code true}
     */
    public boolean isConnected() {
        for (ProducerImpl<T> producer : producers.values()) {
            if (!producer.isConnected()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the most recent disconnect timestamp among the internal producers.
     *
     * @return the largest timestamp, or {@code 0} when there is no internal producer
     */
    public long getLastDisconnectedTimestamp() {
        return producers.values()
                .stream()
                .mapToLong(ProducerImpl::getLastDisconnectedTimestamp)
                .max()
                .orElse(0L);
    }

    /**
     * Closes all internal producers and then this partitioned producer.
     *
     * <p>The auto-update timer is cancelled first. The set of producers to close is
     * snapshotted once so that the completion countdown always matches the producers that
     * are actually closed, even if the map is modified concurrently (for example by a lazy
     * producer creation). When there is nothing to close the returned future completes
     * immediately instead of never completing.
     *
     * @return a future completed once every internal producer has been closed; completed
     *         exceptionally with the first close failure. Already completed when the
     *         producer is closing or closed.
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        HandlerState.State stateOnEntry = getState();
        if (stateOnEntry == HandlerState.State.Closing || stateOnEntry == HandlerState.State.Closed) {
            return CompletableFuture.completedFuture(null);
        }
        setState(HandlerState.State.Closing);

        Timeout pendingAutoUpdate = this.partitionsAutoUpdateTimeout;
        if (pendingAutoUpdate != null) {
            pendingAutoUpdate.cancel();
            this.partitionsAutoUpdateTimeout = null;
        }

        // Snapshot: the countdown below must be derived from exactly the producers iterated.
        List<ProducerImpl<T>> toClose = this.producers.values().stream().collect(Collectors.toList());
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        if (toClose.isEmpty()) {
            setState(HandlerState.State.Closed);
            closeFuture.complete(null);
            log.info("[{}] Closed Partitioned Producer", this.topic);
            this.client.cleanupProducer(this);
            return closeFuture;
        }

        AtomicReference<Throwable> closeFail = new AtomicReference<>();
        AtomicInteger remaining = new AtomicInteger(toClose.size());
        for (ProducerImpl<T> producer : toClose) {
            producer.closeAsync().handle((closed, ex) -> {
                if (ex != null) {
                    // Only the first failure is reported.
                    closeFail.compareAndSet(null, ex);
                }
                if (remaining.decrementAndGet() == 0) {
                    Throwable failure = closeFail.get();
                    if (failure == null) {
                        setState(HandlerState.State.Closed);
                        closeFuture.complete(null);
                        log.info("[{}] Closed Partitioned Producer", this.topic);
                        this.client.cleanupProducer(this);
                    } else {
                        setState(HandlerState.State.Failed);
                        closeFuture.completeExceptionally(failure);
                        log.error("[{}] Could not close Partitioned Producer", this.topic, failure.getCause());
                    }
                }
                return null;
            });
        }
        return closeFuture;
    }

    /**
     * Rebuilds and returns the aggregated statistics of all internal producers.
     *
     * @return the recorder after resetting it and adding each internal producer's stats,
     *         or {@code null} when no recorder was created for this producer
     */
    public synchronized ProducerStatsRecorderImpl getStats() {
        if (stats != null) {
            stats.reset();
            for (ProducerImpl<T> producer : producers.values()) {
                stats.updateCumulativeStats(producer.getTopic(), producer.getStats());
            }
        }
        return stats;
    }

    /**
     * Returns the internal producers ordered by the partition index of their topic.
     *
     * @return a new list of the internal producers in ascending partition order
     */
    public List<ProducerImpl<T>> getProducers() {
        Comparator<ProducerImpl<T>> byPartitionIndex =
                Comparator.comparingInt(producer -> TopicName.getPartitionIndex(producer.getTopic()));
        return producers.values()
                .stream()
                .sorted(byPartitionIndex)
                .collect(Collectors.toList());
    }

    /**
     * Returns the fixed handler name of partitioned producers.
     *
     * @return the constant {@code "partition-producer"}
     */
    @Override
    String getHandlerName() {
        final String handlerName = "partition-producer";
        return handlerName;
    }

    /**
     * Test hook exposing the future of the most recent partition auto-update check.
     *
     * @return that future, or {@code null} when no check has been started yet
     */
    @VisibleForTesting
    public CompletableFuture<Void> getPartitionsAutoUpdateFuture() {
        return partitionsAutoUpdateFuture;
    }

    /**
     * Test hook exposing the handle of the currently scheduled auto-update run.
     *
     * @return the timeout handle, or {@code null} when none is scheduled
     */
    @VisibleForTesting
    public Timeout getPartitionsAutoUpdateTimeout() {
        return partitionsAutoUpdateTimeout;
    }

    /**
     * Test hook combining the original last-send futures of all internal producers.
     *
     * @return future that completes when every internal producer's last-send future has completed
     */
    @VisibleForTesting
    public CompletableFuture<Void> getOriginalLastSendFuture() {
        CompletableFuture<?>[] lastSends = producers.values()
                .stream()
                .map(ProducerImpl::getOriginalLastSendFuture)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(lastSends);
    }

    /**
     * Returns the partition count currently recorded in the topic metadata.
     *
     * @return number of partitions
     */
    public int getNumOfPartitions() {
        TopicMetadata currentMetadata = topicMetadata;
        return currentMetadata.numPartitions();
    }

    /**
     * Reacts to a growing partition count of this producer's topic.
     *
     * <p>Used by the auto-update timer task: it fetches the current partition list and,
     * when the topic has more partitions than before, either just records the new count
     * (lazy start with {@code Shared} access mode) or creates internal producers for the
     * added partitions and records the new count once all of them are ready.
     */
    private class TopicsPartitionChangedListener
    implements PartitionsChangedListener {
        private TopicsPartitionChangedListener() {
        }

        /**
         * Checks the partition count of this topic and adapts the internal producers.
         *
         * @param topicsExtended topics whose partition count may have changed
         * @return a future completed when the check is done; it completes normally when the
         *         topic is not in {@code topicsExtended}, the count is unchanged, or the
         *         growth was handled, and exceptionally when fetching the partitions fails,
         *         creating a producer fails, or the partition count shrank
         */
        @Override
        public CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtended) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            if (topicsExtended.isEmpty() || !topicsExtended.contains(topic)) {
                future.complete(null);
                return future;
            }
            client.getPartitionsForTopic(topic).thenCompose(list -> {
                int oldPartitionNumber = topicMetadata.numPartitions();
                int currentPartitionNumber = list.size();
                if (log.isDebugEnabled()) {
                    log.debug("[{}] partitions number. old: {}, new: {}", topic, oldPartitionNumber, currentPartitionNumber);
                }
                if (oldPartitionNumber == currentPartitionNumber) {
                    // Nothing changed.
                    future.complete(null);
                    return future;
                }
                if (oldPartitionNumber < currentPartitionNumber) {
                    if (conf.isLazyStartPartitionedProducers() && conf.getAccessMode() == ProducerAccessMode.Shared) {
                        // Lazy start: producers for the new partitions are created on first use.
                        topicMetadata = new TopicMetadataImpl(currentPartitionNumber);
                        future.complete(null);
                        onPartitionsChange(topic, currentPartitionNumber);
                        return future;
                    }
                    List<CompletableFuture<Producer<T>>> futureList = list.subList(oldPartitionNumber, currentPartitionNumber).stream().map(partitionName -> {
                        int partitionIndex = TopicName.getPartitionIndex(partitionName);
                        return producers.computeIfAbsent(partitionIndex, idx -> new ProducerImpl<>(client, partitionName, conf, new CompletableFuture<>(), idx, schema, interceptors, Optional.ofNullable(overrideProducerName))).producerCreatedFuture();
                    }).collect(Collectors.toList());
                    FutureUtil.waitForAll(futureList).thenAccept(finalFuture -> {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] success create producers for extended partitions. old: {}, new: {}", topic, oldPartitionNumber, currentPartitionNumber);
                        }
                        // Expose the new partitions to the router only once all producers exist.
                        topicMetadata = new TopicMetadataImpl(currentPartitionNumber);
                        future.complete(null);
                    }).exceptionally(ex -> {
                        log.warn("[{}] fail create producers for extended partitions. old: {}, new: {}", topic, oldPartitionNumber, currentPartitionNumber, ex);
                        // Roll back the producers registered for the added partitions. A missing
                        // entry is skipped so the cleanup cannot throw and leave 'future' incomplete.
                        IntStream.range(oldPartitionNumber, currentPartitionNumber).forEach(i -> {
                            ProducerImpl<T> added = producers.remove(i);
                            if (added != null) {
                                added.closeAsync();
                            }
                        });
                        future.completeExceptionally(ex);
                        return null;
                    });
                    onPartitionsChange(topic, currentPartitionNumber);
                    return future;
                }
                log.error("[{}] not support shrink topic partitions. old: {}, new: {}", topic, oldPartitionNumber, currentPartitionNumber);
                future.completeExceptionally(new PulsarClientException.NotSupportedException("not support shrink topic partitions"));
                return future;
            }).exceptionally(throwable -> {
                log.error("[{}] Auto getting partitions failed", topic, throwable);
                future.completeExceptionally(throwable);
                return null;
            });
            return future;
        }
    }
}

