/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;

/**
 * A {@link PulsarClientImpl} subclass with runtime switches that alter how connections and
 * producers created through it behave.
 *
 * <p>The available switches are:
 * <ul>
 *   <li>a protocol-version override reported by connections built in {@link #createClientCnx()};</li>
 *   <li>a flag that makes {@link #getConnection(String)} fail instead of delegating to the superclass;</li>
 *   <li>a flag that makes producers built in {@code newProducerImpl} report that no connection is ready;</li>
 *   <li>an optional callback invoked for every {@code OpSendMsg} added to a producer's pending queue.</li>
 * </ul>
 *
 * <p>Every switch lives in a {@code volatile} field, so it may be flipped from a different thread
 * than the one that consults it. Code that consults a switch reads it once into a local so that a
 * concurrent change cannot be observed half-way through a decision.
 */
public class PulsarTestClient extends PulsarClientImpl {
    /** Protocol version reported by connections of this client; {@code 0} means "do not override". */
    private volatile int overrideRemoteEndpointProtocolVersion;
    /** When {@code true}, {@link #getConnection(String)} returns an already-failed future. */
    private volatile boolean rejectNewConnections;
    /** When {@code true}, producers of this client get {@code null} from {@code getCnxIfReady()}. */
    private volatile boolean dropOpSendMessages;
    /** Optional observer for messages added to a producer's pending queue; {@code null} disables it. */
    private volatile Consumer<ProducerImpl.OpSendMsg> pendingMessageCallback;

    /**
     * Builds a {@code PulsarTestClient} from the configuration held by the given builder.
     *
     * <p>The connection pool is handed a supplier that looks up the real connection factory through
     * an {@link AtomicReference}. The reference is filled in by the constructor, because the factory
     * ({@link #createClientCnx()}) is an instance method and the instance does not exist yet when
     * the pool is constructed.
     *
     * @param clientBuilder builder to take the configuration from; it is cast to
     *                      {@link ClientBuilderImpl}, so any other implementation fails with a
     *                      {@link ClassCastException}
     * @return the new client
     * @throws PulsarClientException propagated from the superclass constructor
     */
    public static PulsarTestClient create(ClientBuilder clientBuilder) throws PulsarClientException {
        ClientConfigurationData clientConfigurationData =
                ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
        // Typed as ThreadFactory so the same newEventLoopGroup overload is selected as before.
        ThreadFactory threadFactory =
                new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());
        EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(
                (int) clientConfigurationData.getNumIoThreads(), false, threadFactory);
        AtomicReference<Supplier<ClientCnx>> clientCnxSupplierReference = new AtomicReference<>();
        // The lambda dereferences the holder only when the pool actually asks for a connection,
        // by which time the constructor below has stored the real factory in it.
        ConnectionPool connectionPool = new ConnectionPool(clientConfigurationData, eventLoopGroup,
                () -> clientCnxSupplierReference.get().get());
        return new PulsarTestClient(clientConfigurationData, eventLoopGroup, connectionPool,
                clientCnxSupplierReference);
    }

    /**
     * Initialises the superclass and then publishes {@link #createClientCnx()} as the connection
     * factory used by the pool that was created in {@link #create(ClientBuilder)}.
     *
     * @param conf                       client configuration
     * @param eventLoopGroup             event loop group passed to the superclass
     * @param cnxPool                    connection pool passed to the superclass
     * @param clientCnxSupplierReference holder the pool's supplier reads the factory from
     * @throws PulsarClientException propagated from the superclass constructor
     */
    private PulsarTestClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool,
                             AtomicReference<Supplier<ClientCnx>> clientCnxSupplierReference)
            throws PulsarClientException {
        super(conf, eventLoopGroup, cnxPool);
        clientCnxSupplierReference.set(this::createClientCnx);
    }

    /**
     * Creates a {@link ClientCnx} whose reported remote protocol version can be overridden through
     * {@link #setOverrideRemoteEndpointProtocolVersion(int)}.
     *
     * @return a new connection object built from this client's configuration and event loop group
     */
    protected ClientCnx createClientCnx() {
        return new ClientCnx(this.conf, this.eventLoopGroup) {

            /**
             * Returns the configured override when it is non-zero, otherwise the superclass value.
             */
            public int getRemoteEndpointProtocolVersion() {
                // Single volatile read: a concurrent reset to 0 between the check and the return
                // must not leak out as a protocol version of 0.
                int override = PulsarTestClient.this.overrideRemoteEndpointProtocolVersion;
                return override != 0 ? override : super.getRemoteEndpointProtocolVersion();
            }
        };
    }

    /**
     * Returns a connection for the topic, or an already-failed future while new connections are
     * being rejected.
     *
     * @param topic topic the connection is requested for
     * @return the superclass result, or a future completed exceptionally with an
     *         {@link IOException} when {@link #setRejectNewConnections(boolean)} is enabled
     */
    public CompletableFuture<ClientCnx> getConnection(String topic) {
        if (this.rejectNewConnections) {
            CompletableFuture<ClientCnx> result = new CompletableFuture<>();
            result.completeExceptionally(new IOException("New connections are rejected."));
            return result;
        }
        return super.getConnection(topic);
    }

    /**
     * Creates a producer that is hooked into this client's switches: its pending-message queue
     * notifies the pending-message callback, and it reports no ready connection while
     * {@link #dropOpSendMessages()} is in effect.
     *
     * @param topic                 topic name
     * @param partitionIndex        partition index passed through to the producer
     * @param conf                  producer configuration
     * @param schema                message schema
     * @param interceptors          producer interceptors
     * @param producerCreatedFuture future passed through to the producer
     * @param <T>                   message payload type
     * @return the instrumented producer
     */
    protected <T> ProducerImpl<T> newProducerImpl(String topic, int partitionIndex, ProducerConfigurationData conf,
                                                  Schema<T> schema, ProducerInterceptors interceptors,
                                                  CompletableFuture<Producer<T>> producerCreatedFuture) {
        return new ProducerImpl<T>(this, topic, conf, producerCreatedFuture, partitionIndex, schema, interceptors) {

            /** Supplies a queue that reports every added message to the pending-message callback. */
            protected ProducerImpl.OpSendMsgQueue createPendingMessagesQueue() {
                return new ProducerImpl.OpSendMsgQueue() {

                    /**
                     * Adds the message via the superclass and then notifies the callback, if one
                     * is set. The callback is invoked regardless of the value returned by the
                     * superclass.
                     */
                    public boolean add(ProducerImpl.OpSendMsg opSendMsg) {
                        boolean added = super.add(opSendMsg);
                        // Snapshot the volatile field: the callback may be cleared concurrently,
                        // and re-reading it after the null check could dereference null.
                        Consumer<ProducerImpl.OpSendMsg> callback = PulsarTestClient.this.pendingMessageCallback;
                        if (callback != null) {
                            callback.accept(opSendMsg);
                        }
                        return added;
                    }
                };
            }

            /** Returns {@code null} while message dropping is enabled, else the superclass value. */
            protected ClientCnx getCnxIfReady() {
                if (PulsarTestClient.this.dropOpSendMessages) {
                    return null;
                }
                return super.getCnxIfReady();
            }
        };
    }

    /**
     * Sets the protocol version reported by connections of this client.
     *
     * @param overrideRemoteEndpointProtocolVersion version to report; {@code 0} removes the override
     */
    public void setOverrideRemoteEndpointProtocolVersion(int overrideRemoteEndpointProtocolVersion) {
        this.overrideRemoteEndpointProtocolVersion = overrideRemoteEndpointProtocolVersion;
    }

    /**
     * Enables or disables the rejection of connection requests made through
     * {@link #getConnection(String)}.
     *
     * @param rejectNewConnections {@code true} to fail such requests, {@code false} to delegate them
     */
    public void setRejectNewConnections(boolean rejectNewConnections) {
        this.rejectNewConnections = rejectNewConnections;
    }

    /**
     * Breaks the producer's current connection and prevents a new one from being obtained through
     * this client until {@link #allowReconnecting()} is called.
     *
     * <p>Unless message dropping is enabled or the producer is not connected, the method first
     * waits (via Awaitility) until the producer's pending queue is empty.
     *
     * @param producer producer to disconnect
     * @throws IOException declared for callers; kept for interface compatibility
     */
    public void disconnectProducerAndRejectReconnecting(ProducerImpl<?> producer) throws IOException {
        // Let in-flight messages drain first; skipped when messages are being dropped or the
        // producer has no connection, since the queue cannot drain in those states.
        Awaitility.await().untilAsserted(() -> {
            if (!this.dropOpSendMessages && producer.isConnected()) {
                Assert.assertEquals((int) producer.getPendingQueueSize(), 0);
            }
        });
        // Reject first, so a reconnect triggered by the close below cannot succeed.
        this.setRejectNewConnections(true);
        ClientCnx cnx = producer.cnx();
        // NOTE(review): assumes cnx() yields null when the producer currently has no connection
        // (the wait above explicitly tolerates that state). In that case there is nothing to tear
        // down, and dereferencing it would only raise a NullPointerException.
        if (cnx != null) {
            producer.connectionClosed(cnx);
            cnx.close();
        }
    }

    /**
     * Reverts both {@link #dropOpSendMessages()} and the rejection of new connections.
     */
    public void allowReconnecting() {
        this.dropOpSendMessages = false;
        this.setRejectNewConnections(false);
    }

    /**
     * Registers the observer notified for every message added to a pending queue of a producer
     * created by this client.
     *
     * @param pendingMessageCallback observer to install, or {@code null} to remove the current one
     */
    public void setPendingMessageCallback(Consumer<ProducerImpl.OpSendMsg> pendingMessageCallback) {
        this.pendingMessageCallback = pendingMessageCallback;
    }

    /**
     * Makes producers of this client report that no connection is ready, until
     * {@link #allowReconnecting()} is called.
     */
    public void dropOpSendMessages() {
        this.dropOpSendMessages = true;
    }
}

