/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.proto;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.DefaultPerChannelBookieClientPool;
import org.apache.bookkeeper.proto.PerChannelBookieClient;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.util.EventLoopUtil;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientSocketDisconnectTest
extends BookKeeperClusterTestCase {
    private static final Logger log = LoggerFactory.getLogger(ClientSocketDisconnectTest.class);

    public ClientSocketDisconnectTest() {
        super(1);
        this.useUUIDasBookieId = true;
    }

    @Test
    public void testAddEntriesCallbackWithBKClientThread() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bkc = (BookKeeper)org.apache.bookkeeper.client.api.BookKeeper.newBuilder((ClientConfiguration)conf).eventLoopGroup(EventLoopUtil.getClientEventLoopGroup((ClientConfiguration)conf, (ThreadFactory)new DefaultThreadFactory("test-io"))).build();
        BookieClientImpl bookieClient = (BookieClientImpl)bkc.getClientCtx().getBookieClient();
        LedgerHandle lh = (LedgerHandle)bkc.newCreateLedgerOp().withEnsembleSize(1).withWriteQuorumSize(1).withAckQuorumSize(1).withDigestType(DigestType.CRC32C).withPassword(new byte[0]).execute().join();
        ThreadCounter callbackThreadRecorder = new ThreadCounter();
        List ensemble = (List)lh.getLedgerMetadata().getAllEnsembles().entrySet().iterator().next().getValue();
        DefaultPerChannelBookieClientPool clientPool = (DefaultPerChannelBookieClientPool)bookieClient.lookupClient((BookieId)ensemble.get(0));
        PerChannelBookieClient[] clients = clientPool.clients;
        for (int i = 0; i < clients.length; ++i) {
            clients[i] = new PerChannelBookieClientDecorator(clients[i], (BookieId)ensemble.get(0), callbackThreadRecorder);
        }
        int addCount = 1000;
        CountDownLatch countDownLatch = new CountDownLatch(addCount);
        for (int i = 0; i < addCount; ++i) {
            lh.asyncAddEntry(new byte[]{1}, (rc, lh1, entryId, ctx) -> countDownLatch.countDown(), (Object)i);
        }
        countDownLatch.await();
        for (Thread callbackThread : callbackThreadRecorder.records.keySet()) {
            Assert.assertTrue((String)callbackThread.getName(), (boolean)callbackThread.getName().startsWith("BookKeeperClientWorker"));
        }
    }

    private static class ThreadCounter {
        private final Map<Thread, AtomicInteger> records = new ConcurrentHashMap<Thread, AtomicInteger>();

        private ThreadCounter() {
        }

        public void record() {
            Thread currentThread = Thread.currentThread();
            this.records.computeIfAbsent(currentThread, k -> new AtomicInteger());
            this.records.get(currentThread).incrementAndGet();
        }
    }

    public static class PerChannelBookieClientDecorator
    extends PerChannelBookieClient {
        private final ThreadCounter threadCounter;
        private final AtomicInteger failurePredicate = new AtomicInteger();

        public PerChannelBookieClientDecorator(PerChannelBookieClient client, BookieId addr, ThreadCounter tCounter) throws SecurityException {
            super(client.executor, client.eventLoopGroup, addr, client.bookieAddressResolver);
            this.threadCounter = tCounter;
        }

        protected void addChannelListeners(ChannelFuture future, long connectStartTime) {
            future.addListener((GenericFutureListener)((ChannelFutureListener)future1 -> {
                if (this.failurePredicate.incrementAndGet() % 2 == 1) {
                    future1.channel().close();
                }
            }));
            super.addChannelListeners(future, connectStartTime);
        }

        protected void connectIfNeededAndDoOp(BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient> op) {
            BookieClientImpl.ChannelReadyForAddEntryCallback callback = (BookieClientImpl.ChannelReadyForAddEntryCallback)op;
            BookkeeperInternalCallbacks.WriteCallback originalCallback = callback.cb;
            callback.cb = (rc, ledgerId, entryId, addr, ctx) -> {
                this.threadCounter.record();
                originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
            };
            super.connectIfNeededAndDoOp(op);
        }
    }
}

