/*
 * Decompiled with CFR 0.152.
 */
package com.taotao.cloud.mq.pulsar.model;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.taotao.cloud.common.utils.log.LogUtils;
import com.taotao.cloud.mq.pulsar.model.DemoPulsarClientInit;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;

public class DemoPulsarDynamicProducerInit {
    private final AsyncLoadingCache<String, Producer<byte[]>> producerCache;
    private final Timer timer = new HashedWheelTimer();

    public DemoPulsarDynamicProducerInit() {
        this.producerCache = Caffeine.newBuilder().expireAfterAccess(600L, TimeUnit.SECONDS).maximumSize(3000L).removalListener((topic, value, cause) -> {
            LogUtils.info((String)"topic {} cache removed, because of {}", (Object[])new Object[]{topic, cause});
            if (value == null) {
                return;
            }
            try {
                value.close();
            }
            catch (Exception e) {
                LogUtils.error((String)"close failed, ", (Object[])new Object[]{e});
            }
        }).buildAsync((AsyncCacheLoader)new AsyncCacheLoader<String, Producer<byte[]>>(){

            public CompletableFuture<Producer<byte[]>> asyncLoad(String topic, Executor executor) {
                return DemoPulsarDynamicProducerInit.this.acquireFuture(topic);
            }

            public CompletableFuture<Producer<byte[]>> asyncReload(String topic, Producer<byte[]> oldValue, Executor executor) {
                return DemoPulsarDynamicProducerInit.this.acquireFuture(topic);
            }
        });
    }

    private CompletableFuture<Producer<byte[]>> acquireFuture(String topic) {
        CompletableFuture<Producer<byte[]>> future = new CompletableFuture<Producer<byte[]>>();
        try {
            ProducerBuilder builder = DemoPulsarClientInit.getInstance().getPulsarClient().newProducer().enableBatching(true);
            Producer producer = builder.topic(topic).create();
            future.complete(producer);
        }
        catch (Exception e) {
            LogUtils.error((String)"create producer exception ", (Object[])new Object[]{e});
            future.completeExceptionally(e);
        }
        return future;
    }

    public void sendMsg(String topic, byte[] msg) {
        CompletableFuture cacheFuture = this.producerCache.get((Object)topic);
        cacheFuture.whenComplete((producer, e) -> {
            if (e != null) {
                LogUtils.error((String)"create pulsar client exception ", (Object[])new Object[]{e});
                return;
            }
            try {
                producer.sendAsync((Object)msg).whenComplete((messageId, throwable) -> {
                    if (throwable == null) {
                        LogUtils.info((String)"topic {} send success, msg id is {}", (Object[])new Object[]{topic, messageId});
                        return;
                    }
                    LogUtils.error((String)"send producer msg error ", (Object[])new Object[]{throwable});
                });
            }
            catch (Exception ex) {
                LogUtils.error((String)"send async failed ", (Object[])new Object[]{ex});
            }
        });
    }

    public void sendMsgWithRetry(String topic, byte[] msg, int retryTimes, int maxRetryTimes) {
        CompletableFuture cacheFuture = this.producerCache.get((Object)topic);
        cacheFuture.whenComplete((producer, e) -> {
            if (e != null) {
                LogUtils.error((String)"create pulsar client exception ", (Object[])new Object[]{e});
                return;
            }
            try {
                producer.sendAsync((Object)msg).whenComplete((messageId, throwable) -> {
                    if (throwable == null) {
                        LogUtils.info((String)"topic {} send success, msg id is {}", (Object[])new Object[]{topic, messageId});
                        return;
                    }
                    if (retryTimes < maxRetryTimes) {
                        LogUtils.warn((String)"topic {} send failed, begin to retry {} times exception is ", (Object[])new Object[]{topic, retryTimes, throwable});
                        this.timer.newTimeout(timeout -> this.sendMsgWithRetry(topic, msg, retryTimes + 1, maxRetryTimes), 1L << retryTimes, TimeUnit.SECONDS);
                    }
                    LogUtils.error((String)"send producer msg error ", (Object[])new Object[]{throwable});
                });
            }
            catch (Exception ex) {
                LogUtils.error((String)"send async failed ", (Object[])new Object[]{ex});
            }
        });
    }
}

