/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.proto;

import com.google.protobuf.ExtensionRegistry;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.PerChannelBookieClient;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestPerChannelBookieClient
extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(TestPerChannelBookieClient.class);
    ExtensionRegistry extRegistry = ExtensionRegistry.newInstance();
    ClientAuthProvider.Factory authProvider = AuthProviderFactoryFactory.newClientAuthProviderFactory((ClientConfiguration)new ClientConfiguration());

    public TestPerChannelBookieClient() throws Exception {
        super(1);
    }

    @Test
    public void testConnectCloseRace() throws Exception {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        OrderedExecutor executor = this.getOrderedSafeExecutor();
        BookieSocketAddress addr = this.getBookie(0);
        for (int i = 0; i < 1000; ++i) {
            PerChannelBookieClient client = new PerChannelBookieClient(executor, (EventLoopGroup)eventLoopGroup, addr, this.authProvider, this.extRegistry);
            client.connectIfNeededAndDoOp((BookkeeperInternalCallbacks.GenericCallback)new BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>(){

                public void operationComplete(int rc, PerChannelBookieClient client) {
                }
            });
            client.close();
        }
        eventLoopGroup.shutdownGracefully();
        executor.shutdown();
    }

    public OrderedExecutor getOrderedSafeExecutor() {
        return OrderedExecutor.newBuilder().name("PCBC").numThreads(1).traceTaskExecution(true).traceTaskWarnTimeMicroSec(TimeUnit.MILLISECONDS.toMicros(100L)).build();
    }

    @Test
    public void testConnectRace() throws Exception {
        BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient> nullop = new BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>(){

            public void operationComplete(int rc, PerChannelBookieClient pcbc) {
            }
        };
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        OrderedExecutor executor = this.getOrderedSafeExecutor();
        BookieSocketAddress addr = this.getBookie(0);
        for (int i = 0; i < 100; ++i) {
            PerChannelBookieClient client = new PerChannelBookieClient(executor, (EventLoopGroup)eventLoopGroup, addr, this.authProvider, this.extRegistry);
            for (int j = i; j < 10; ++j) {
                client.connectIfNeededAndDoOp((BookkeeperInternalCallbacks.GenericCallback)nullop);
            }
            client.close();
        }
        eventLoopGroup.shutdownGracefully();
        executor.shutdown();
    }

    @Test
    public void testDisconnectRace() throws Exception {
        BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient> nullop = new BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>(){

            public void operationComplete(int rc, PerChannelBookieClient client) {
            }
        };
        int iterations = 100000;
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        OrderedExecutor executor = this.getOrderedSafeExecutor();
        BookieSocketAddress addr = this.getBookie(0);
        final PerChannelBookieClient client = new PerChannelBookieClient(executor, (EventLoopGroup)eventLoopGroup, addr, this.authProvider, this.extRegistry);
        final AtomicBoolean shouldFail = new AtomicBoolean(false);
        final AtomicBoolean running = new AtomicBoolean(true);
        final CountDownLatch disconnectRunning = new CountDownLatch(1);
        Thread connectThread = new Thread((BookkeeperInternalCallbacks.GenericCallback)nullop){
            final /* synthetic */ BookkeeperInternalCallbacks.GenericCallback val$nullop;
            {
                this.val$nullop = genericCallback;
            }

            @Override
            public void run() {
                try {
                    if (!disconnectRunning.await(10L, TimeUnit.SECONDS)) {
                        LOG.error("Disconnect thread never started");
                        shouldFail.set(true);
                    }
                }
                catch (InterruptedException ie) {
                    LOG.error("Connect thread interrupted", (Throwable)ie);
                    Thread.currentThread().interrupt();
                    running.set(false);
                }
                for (int i = 0; i < 100000 && running.get(); ++i) {
                    client.connectIfNeededAndDoOp(this.val$nullop);
                }
                running.set(false);
            }
        };
        Thread disconnectThread = new Thread(){

            @Override
            public void run() {
                disconnectRunning.countDown();
                while (running.get()) {
                    client.disconnect();
                }
            }
        };
        Thread checkThread = new Thread(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                while (running.get()) {
                    PerChannelBookieClient perChannelBookieClient = client;
                    synchronized (perChannelBookieClient) {
                        PerChannelBookieClient.ConnectionState state = client.state;
                        Channel channel = client.channel;
                        if (state == PerChannelBookieClient.ConnectionState.CONNECTED && (channel == null || !channel.isActive()) || state != PerChannelBookieClient.ConnectionState.CONNECTED && channel != null && channel.isActive()) {
                            LOG.error("State({}) and channel({}) inconsistent " + channel, (Object)state, channel == null ? null : Boolean.valueOf(channel.isActive()));
                            shouldFail.set(true);
                            running.set(false);
                        }
                    }
                }
            }
        };
        connectThread.start();
        disconnectThread.start();
        checkThread.start();
        connectThread.join();
        disconnectThread.join();
        checkThread.join();
        Assert.assertFalse((String)"Failure in threads, check logs", (boolean)shouldFail.get());
        client.close();
        eventLoopGroup.shutdownGracefully();
        executor.shutdown();
    }

    @Test
    public void testRequestCompletesAfterDisconnectRace() throws Exception {
        ServerConfiguration conf = this.killBookie(0);
        Bookie delayBookie = new Bookie(conf){

            public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, Bookie.NoLedgerException {
                try {
                    Thread.sleep(3000L);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted waiting", ie);
                }
                return super.readEntry(ledgerId, entryId);
            }
        };
        this.bsConfs.add(conf);
        this.bs.add(this.startBookie(conf, delayBookie));
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        final OrderedExecutor executor = this.getOrderedSafeExecutor();
        BookieSocketAddress addr = this.getBookie(0);
        final PerChannelBookieClient client = new PerChannelBookieClient(executor, (EventLoopGroup)eventLoopGroup, addr, this.authProvider, this.extRegistry);
        final CountDownLatch completion = new CountDownLatch(1);
        final BookkeeperInternalCallbacks.ReadEntryCallback cb = new BookkeeperInternalCallbacks.ReadEntryCallback(){

            public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx) {
                completion.countDown();
            }
        };
        client.connectIfNeededAndDoOp((BookkeeperInternalCallbacks.GenericCallback)new BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>(){

            public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
                if (rc != 0) {
                    executor.executeOrdered(1, (SafeRunnable)new org.apache.bookkeeper.util.SafeRunnable(){

                        public void safeRun() {
                            cb.readEntryComplete(rc, 1L, 1L, null, null);
                        }
                    });
                    return;
                }
                client.readEntry(1L, 1L, cb, null, 1, "00000111112222233333".getBytes());
            }
        });
        Thread.sleep(1000L);
        client.disconnect();
        client.close();
        Assert.assertTrue((String)"Request should have completed", (boolean)completion.await(5L, TimeUnit.SECONDS));
        eventLoopGroup.shutdownGracefully();
        executor.shutdown();
    }
}

