/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.client.MetadataUpdateLoop;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.MockLedgerManager;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.commons.lang3.tuple.Triple;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetadataUpdateLoopTest {
    static final Logger LOG = LoggerFactory.getLogger(MetadataUpdateLoopTest.class);

    @Test
    public void testBasicUpdate() throws Exception {
        try (MockLedgerManager lm = new MockLedgerManager();){
            long ledgerId = 1234L;
            LedgerMetadata initMeta = LedgerMetadataBuilder.create().withId(ledgerId).withEnsembleSize(5).withDigestType(DigestType.CRC32C).withPassword(new byte[0]).newEnsembleEntry(0L, (List)Lists.newArrayList((Object[])new BookieId[]{BookieId.parse((String)"0.0.0.0:3181"), BookieId.parse((String)"0.0.0.1:3181"), BookieId.parse((String)"0.0.0.2:3181"), BookieId.parse((String)"0.0.0.3:3181"), BookieId.parse((String)"0.0.0.4:3181")})).build();
            Versioned writtenMetadata = (Versioned)lm.createLedgerMetadata(ledgerId, initMeta).get();
            AtomicReference<Versioned> reference = new AtomicReference<Versioned>(writtenMetadata);
            BookieId newAddress = BookieId.parse((String)"0.0.0.5:3181");
            MetadataUpdateLoop loop = new MetadataUpdateLoop((LedgerManager)lm, ledgerId, reference::get, currentMetadata -> true, currentMetadata -> {
                ArrayList ensemble = Lists.newArrayList((Iterable)currentMetadata.getEnsembleAt(0L));
                ensemble.set(0, newAddress);
                return LedgerMetadataBuilder.from((LedgerMetadata)currentMetadata).replaceEnsembleEntry(0L, (List)ensemble).build();
            }, reference::compareAndSet);
            loop.run().get();
            Assert.assertNotEquals((Object)reference.get(), (Object)writtenMetadata);
            Assert.assertEquals(((LedgerMetadata)reference.get().getValue()).getEnsembleAt(0L).get(0), (Object)newAddress);
        }
    }

    @Test
    public void testConflictOnWrite() throws Exception {
        try (BlockableMockLedgerManager lm = (BlockableMockLedgerManager)Mockito.spy((Object)new BlockableMockLedgerManager());){
            lm.blockWrites();
            long ledgerId = 1234L;
            BookieId b0 = BookieId.parse((String)"0.0.0.0:3181");
            BookieId b1 = BookieId.parse((String)"0.0.0.1:3181");
            BookieId b2 = BookieId.parse((String)"0.0.0.2:3181");
            BookieId b3 = BookieId.parse((String)"0.0.0.3:3181");
            LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2).withId(ledgerId).withDigestType(DigestType.CRC32C).withPassword(new byte[0]).withWriteQuorumSize(2).newEnsembleEntry(0L, (List)Lists.newArrayList((Object[])new BookieId[]{b0, b1})).build();
            Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
            AtomicReference<Versioned<LedgerMetadata>> reference1 = new AtomicReference<Versioned<LedgerMetadata>>(writtenMetadata);
            CompletableFuture loop1 = new MetadataUpdateLoop((LedgerManager)lm, ledgerId, reference1::get, currentMetadata -> currentMetadata.getEnsembleAt(0L).contains(b0), currentMetadata -> {
                ArrayList ensemble = Lists.newArrayList((Iterable)currentMetadata.getEnsembleAt(0L));
                ensemble.set(0, b2);
                return LedgerMetadataBuilder.from((LedgerMetadata)currentMetadata).replaceEnsembleEntry(0L, (List)ensemble).build();
            }, reference1::compareAndSet).run();
            AtomicReference<Versioned<LedgerMetadata>> reference2 = new AtomicReference<Versioned<LedgerMetadata>>(writtenMetadata);
            CompletableFuture loop2 = new MetadataUpdateLoop((LedgerManager)lm, ledgerId, reference2::get, currentMetadata -> currentMetadata.getEnsembleAt(0L).contains(b1), currentMetadata -> {
                ArrayList ensemble = Lists.newArrayList((Iterable)currentMetadata.getEnsembleAt(0L));
                ensemble.set(1, b3);
                return LedgerMetadataBuilder.from((LedgerMetadata)currentMetadata).replaceEnsembleEntry(0L, (List)ensemble).build();
            }, reference2::compareAndSet).run();
            lm.releaseWrites();
            Versioned l1meta = (Versioned)loop1.get();
            Versioned l2meta = (Versioned)loop2.get();
            Assert.assertEquals((Object)l1meta, reference1.get());
            Assert.assertEquals((Object)l2meta, reference2.get());
            Assert.assertEquals((Object)l1meta.getVersion().compare(l2meta.getVersion()), (Object)Version.Occurred.BEFORE);
            Assert.assertEquals(((LedgerMetadata)l1meta.getValue()).getEnsembleAt(0L).get(0), (Object)b2);
            Assert.assertEquals(((LedgerMetadata)l1meta.getValue()).getEnsembleAt(0L).get(1), (Object)b1);
            Assert.assertEquals(((LedgerMetadata)l2meta.getValue()).getEnsembleAt(0L).get(0), (Object)b2);
            Assert.assertEquals(((LedgerMetadata)l2meta.getValue()).getEnsembleAt(0L).get(1), (Object)b3);
            ((BlockableMockLedgerManager)Mockito.verify((Object)lm, (VerificationMode)Mockito.times((int)3))).writeLedgerMetadata(ArgumentMatchers.anyLong(), (LedgerMetadata)ArgumentMatchers.any(), (Version)ArgumentMatchers.any());
        }
    }

    @Test
    public void testConflictOnWriteBothWritingSame() throws Exception {
        try (BlockableMockLedgerManager lm = (BlockableMockLedgerManager)Mockito.spy((Object)new BlockableMockLedgerManager());){
            lm.blockWrites();
            long ledgerId = 1234L;
            BookieId b0 = BookieId.parse((String)"0.0.0.0:3181");
            BookieId b1 = BookieId.parse((String)"0.0.0.1:3181");
            BookieId b2 = BookieId.parse((String)"0.0.0.2:3181");
            LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2).withId(ledgerId).withDigestType(DigestType.CRC32C).withPassword(new byte[0]).withWriteQuorumSize(2).newEnsembleEntry(0L, (List)Lists.newArrayList((Object[])new BookieId[]{b0, b1})).build();
            Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
            AtomicReference<Versioned<LedgerMetadata>> reference = new AtomicReference<Versioned<LedgerMetadata>>(writtenMetadata);
            CompletableFuture loop1 = new MetadataUpdateLoop((LedgerManager)lm, ledgerId, reference::get, currentMetadata -> currentMetadata.getEnsembleAt(0L).contains(b0), currentMetadata -> {
                ArrayList ensemble = Lists.newArrayList((Iterable)currentMetadata.getEnsembleAt(0L));
                ensemble.set(0, b2);
                return LedgerMetadataBuilder.from((LedgerMetadata)currentMetadata).replaceEnsembleEntry(0L, (List)ensemble).build();
            }, reference::compareAndSet).run();
            CompletableFuture loop2 = new MetadataUpdateLoop((LedgerManager)lm, ledgerId, reference::get, currentMetadata -> currentMetadata.getEnsembleAt(0L).contains(b0), currentMetadata -> {
                ArrayList ensemble = Lists.newArrayList((Iterable)currentMetadata.getEnsembleAt(0L));
                ensemble.set(0, b2);
                return LedgerMetadataBuilder.from((LedgerMetadata)currentMetadata).replaceEnsembleEntry(0L, (List)ensemble).build();
            }, reference::compareAndSet).run();
            lm.releaseWrites();
            Assert.assertEquals(loop1.get(), loop2.get());
            Assert.assertEquals(loop1.get(), reference.get());
            Assert.assertEquals(((LedgerMetadata)reference.get().getValue()).getEnsembleAt(0L).get(0), (Object)b2);
            Assert.assertEquals(((LedgerMetadata)reference.get().getValue()).getEnsembleAt(0L).get(1), (Object)b1);
            ((BlockableMockLedgerManager)Mockito.verify((Object)lm, (VerificationMode)Mockito.times((int)2))).writeLedgerMetadata(ArgumentMatchers.anyLong(), (LedgerMetadata)ArgumentMatchers.any(), (Version)ArgumentMatchers.any());
        }
    }

    @Test
    public void testConflictOnLocalUpdate() throws Exception {
        try (DeferCallbacksMockLedgerManager lm = (DeferCallbacksMockLedgerManager)Mockito.spy((Object)new DeferCallbacksMockLedgerManager(1));){
            long ledgerId = 1234L;
            BookieId b0 = BookieId.parse((String)"0.0.0.0:3181");
            BookieId b1 = BookieId.parse((String)"0.0.0.1:3181");
            BookieId b2 = BookieId.parse((String)"0.0.0.2:3181");
            BookieId b3 = BookieId.parse((String)"0.0.0.3:3181");
            LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2).withId(ledgerId).withDigestType(DigestType.CRC32C).withPassword(new byte[0]).withWriteQuorumSize(2).newEnsembleEntry(0L, (List)Lists.newArrayList((Object[])new BookieId[]{b0, b1})).build();
            Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
            AtomicReference<Versioned<LedgerMetadata>> reference = new AtomicReference<Versioned<LedgerMetadata>>(writtenMetadata);
            CompletableFuture loop1 = new MetadataUpdateLoop((LedgerManager)lm, ledgerId, reference::get, currentMetadata -> currentMetadata.getEnsembleAt(0L).contains(b0), currentMetadata -> {
                ArrayList ensemble = Lists.newArrayList((Iterable)currentMetadata.getEnsembleAt(0L));
                ensemble.set(0, b2);
                return LedgerMetadataBuilder.from((LedgerMetadata)currentMetadata).replaceEnsembleEntry(0L, (List)ensemble).build();
            }, reference::compareAndSet).run();
            lm.waitForWriteCount(1);
            CompletableFuture loop2 = new MetadataUpdateLoop((LedgerManager)lm, ledgerId, reference::get, currentMetadata -> currentMetadata.getEnsembleAt(0L).contains(b1), currentMetadata -> {
                ArrayList ensemble = Lists.newArrayList((Iterable)currentMetadata.getEnsembleAt(0L));
                ensemble.set(1, b3);
                return LedgerMetadataBuilder.from((LedgerMetadata)currentMetadata).replaceEnsembleEntry(0L, (List)ensemble).build();
            }, reference::compareAndSet).run();
            Assert.assertEquals(loop2.get(), reference.get());
            lm.runDeferred();
            Assert.assertEquals(loop1.get(), reference.get());
            Assert.assertEquals(((LedgerMetadata)reference.get().getValue()).getEnsembleAt(0L).get(0), (Object)b2);
            Assert.assertEquals(((LedgerMetadata)reference.get().getValue()).getEnsembleAt(0L).get(1), (Object)b3);
            ((DeferCallbacksMockLedgerManager)Mockito.verify((Object)lm, (VerificationMode)Mockito.times((int)3))).writeLedgerMetadata(ArgumentMatchers.anyLong(), (LedgerMetadata)ArgumentMatchers.any(), (Version)ArgumentMatchers.any());
        }
    }

    private static BookieId address(String s) {
        try {
            return BookieId.parse((String)s);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    public void testHammer() throws Exception {
        try (NonDeterministicMockLedgerManager lm = new NonDeterministicMockLedgerManager();){
            long ledgerId = 1234L;
            int ensembleSize = 100;
            List initialEnsemble = IntStream.range(0, ensembleSize).mapToObj(i -> MetadataUpdateLoopTest.address(String.format("0.0.0.%d:3181", i))).collect(Collectors.toList());
            LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(ensembleSize).withId(ledgerId).withDigestType(DigestType.CRC32C).withPassword(new byte[0]).newEnsembleEntry(0L, initialEnsemble).build();
            Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
            AtomicReference<Versioned<LedgerMetadata>> reference = new AtomicReference<Versioned<LedgerMetadata>>(writtenMetadata);
            List replacementBookies = IntStream.range(0, ensembleSize).mapToObj(i -> MetadataUpdateLoopTest.address(String.format("0.0.%d.1:3181", i))).collect(Collectors.toList());
            List<CompletableFuture> loops = IntStream.range(0, ensembleSize).mapToObj(i -> new MetadataUpdateLoop((LedgerManager)lm, ledgerId, reference::get, currentMetadata -> currentMetadata.getEnsembleAt(0L).contains(initialEnsemble.get(i)), currentMetadata -> {
                ArrayList ensemble = Lists.newArrayList((Iterable)currentMetadata.getEnsembleAt(0L));
                ensemble.set(i, (BookieId)replacementBookies.get(i));
                return LedgerMetadataBuilder.from((LedgerMetadata)currentMetadata).replaceEnsembleEntry(0L, (List)ensemble).build();
            }, reference::compareAndSet).run()).collect(Collectors.toList());
            loops.forEach(l -> l.join());
            Assert.assertEquals((Object)((LedgerMetadata)reference.get().getValue()).getEnsembleAt(0L), replacementBookies);
        }
    }

    @Test
    public void testNewestValueCannotBeUsedAfterReadBack() throws Exception {
        try (BlockableMockLedgerManager lm = (BlockableMockLedgerManager)Mockito.spy((Object)new BlockableMockLedgerManager());){
            lm.blockWrites();
            long ledgerId = 1234L;
            BookieId b0 = new BookieSocketAddress("0.0.0.0:3181").toBookieId();
            BookieId b1 = new BookieSocketAddress("0.0.0.1:3181").toBookieId();
            LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(1).withId(ledgerId).withDigestType(DigestType.CRC32C).withPassword(new byte[0]).withWriteQuorumSize(1).withAckQuorumSize(1).newEnsembleEntry(0L, (List)Lists.newArrayList((Object[])new BookieId[]{b0})).build();
            Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
            AtomicReference<Versioned<LedgerMetadata>> reference = new AtomicReference<Versioned<LedgerMetadata>>(writtenMetadata);
            CompletableFuture loop1 = new MetadataUpdateLoop((LedgerManager)lm, ledgerId, reference::get, currentMetadata -> !currentMetadata.isClosed(), currentMetadata -> LedgerMetadataBuilder.from((LedgerMetadata)currentMetadata).withClosedState().withLastEntryId(10L).withLength(100L).build(), reference::compareAndSet).run();
            CompletableFuture loop2 = new MetadataUpdateLoop((LedgerManager)lm, ledgerId, reference::get, currentMetadata -> {
                if (currentMetadata.isClosed()) {
                    throw new BKException.BKLedgerClosedException();
                }
                return currentMetadata.getEnsembleAt(0L).contains(b0);
            }, currentMetadata -> {
                ArrayList ensemble = Lists.newArrayList((Iterable)currentMetadata.getEnsembleAt(0L));
                ensemble.set(0, b1);
                return LedgerMetadataBuilder.from((LedgerMetadata)currentMetadata).replaceEnsembleEntry(0L, (List)ensemble).build();
            }, reference::compareAndSet).run();
            lm.releaseWrites();
            Versioned l1meta = (Versioned)loop1.get();
            try {
                loop2.get();
                Assert.fail((String)"Update loop should have failed");
            }
            catch (ExecutionException ee) {
                Assert.assertEquals(ee.getCause().getClass(), BKException.BKLedgerClosedException.class);
            }
            Assert.assertEquals((Object)l1meta, reference.get());
            Assert.assertEquals(((LedgerMetadata)l1meta.getValue()).getEnsembleAt(0L).get(0), (Object)b0);
            Assert.assertTrue((boolean)((LedgerMetadata)l1meta.getValue()).isClosed());
            ((BlockableMockLedgerManager)Mockito.verify((Object)lm, (VerificationMode)Mockito.times((int)2))).writeLedgerMetadata(ArgumentMatchers.anyLong(), (LedgerMetadata)ArgumentMatchers.any(), (Version)ArgumentMatchers.any());
        }
    }

    static class BlockableMockLedgerManager
    extends MockLedgerManager {
        boolean blocking = false;
        List<DeferredUpdate> reqs = Lists.newArrayList();

        BlockableMockLedgerManager() {
        }

        synchronized void blockWrites() {
            this.blocking = true;
        }

        synchronized void releaseWrites() {
            this.blocking = false;
            this.reqs.forEach(r -> super.writeLedgerMetadata(r.getLedgerId(), r.getMetadata(), r.getCurrentVersion()).whenComplete((written, exception) -> {
                if (exception != null) {
                    r.getPromise().completeExceptionally((Throwable)exception);
                } else {
                    r.getPromise().complete((Versioned<LedgerMetadata>)written);
                }
            }));
        }

        @Override
        public synchronized CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, Version currentVersion) {
            if (this.blocking) {
                CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<Versioned<LedgerMetadata>>();
                this.reqs.add(new DeferredUpdate(promise, ledgerId, metadata, currentVersion));
                return promise;
            }
            return super.writeLedgerMetadata(ledgerId, metadata, currentVersion);
        }
    }

    static class DeferredUpdate {
        final CompletableFuture<Versioned<LedgerMetadata>> promise;
        final long ledgerId;
        final LedgerMetadata metadata;
        final Version currentVersion;

        public CompletableFuture<Versioned<LedgerMetadata>> getPromise() {
            return this.promise;
        }

        public long getLedgerId() {
            return this.ledgerId;
        }

        public LedgerMetadata getMetadata() {
            return this.metadata;
        }

        public Version getCurrentVersion() {
            return this.currentVersion;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof DeferredUpdate)) {
                return false;
            }
            DeferredUpdate other = (DeferredUpdate)o;
            if (!other.canEqual(this)) {
                return false;
            }
            if (this.getLedgerId() != other.getLedgerId()) {
                return false;
            }
            CompletableFuture<Versioned<LedgerMetadata>> this$promise = this.getPromise();
            CompletableFuture<Versioned<LedgerMetadata>> other$promise = other.getPromise();
            if (this$promise == null ? other$promise != null : !this$promise.equals(other$promise)) {
                return false;
            }
            LedgerMetadata this$metadata = this.getMetadata();
            LedgerMetadata other$metadata = other.getMetadata();
            if (this$metadata == null ? other$metadata != null : !this$metadata.equals(other$metadata)) {
                return false;
            }
            Version this$currentVersion = this.getCurrentVersion();
            Version other$currentVersion = other.getCurrentVersion();
            return !(this$currentVersion == null ? other$currentVersion != null : !this$currentVersion.equals(other$currentVersion));
        }

        protected boolean canEqual(Object other) {
            return other instanceof DeferredUpdate;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            long $ledgerId = this.getLedgerId();
            result = result * 59 + (int)($ledgerId >>> 32 ^ $ledgerId);
            CompletableFuture<Versioned<LedgerMetadata>> $promise = this.getPromise();
            result = result * 59 + ($promise == null ? 43 : $promise.hashCode());
            LedgerMetadata $metadata = this.getMetadata();
            result = result * 59 + ($metadata == null ? 43 : $metadata.hashCode());
            Version $currentVersion = this.getCurrentVersion();
            result = result * 59 + ($currentVersion == null ? 43 : $currentVersion.hashCode());
            return result;
        }

        public String toString() {
            return "MetadataUpdateLoopTest.DeferredUpdate(promise=" + this.getPromise() + ", ledgerId=" + this.getLedgerId() + ", metadata=" + this.getMetadata() + ", currentVersion=" + this.getCurrentVersion() + ")";
        }

        public DeferredUpdate(CompletableFuture<Versioned<LedgerMetadata>> promise, long ledgerId, LedgerMetadata metadata, Version currentVersion) {
            this.promise = promise;
            this.ledgerId = ledgerId;
            this.metadata = metadata;
            this.currentVersion = currentVersion;
        }
    }

    static class DeferCallbacksMockLedgerManager
    extends MockLedgerManager {
        int writeCount = 0;
        final int numToDefer;
        List<Triple<CompletableFuture<Versioned<LedgerMetadata>>, Versioned<LedgerMetadata>, Throwable>> deferred = Lists.newArrayList();

        DeferCallbacksMockLedgerManager(int numToDefer) {
            this.numToDefer = numToDefer;
        }

        synchronized void runDeferred() {
            this.deferred.forEach(d -> {
                Throwable t = (Throwable)d.getRight();
                if (t != null) {
                    ((CompletableFuture)d.getLeft()).completeExceptionally(t);
                } else {
                    ((CompletableFuture)d.getLeft()).complete((Versioned)d.getMiddle());
                }
            });
        }

        synchronized void waitForWriteCount(int count) throws Exception {
            while (this.writeCount < count) {
                this.wait();
            }
        }

        @Override
        public synchronized CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, Version currentVersion) {
            CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<Versioned<LedgerMetadata>>();
            super.writeLedgerMetadata(ledgerId, metadata, currentVersion).whenComplete((written, exception) -> {
                DeferCallbacksMockLedgerManager deferCallbacksMockLedgerManager = this;
                synchronized (deferCallbacksMockLedgerManager) {
                    if (this.writeCount++ < this.numToDefer) {
                        LOG.info("Added to deferals");
                        this.deferred.add((Triple<CompletableFuture<Versioned<LedgerMetadata>>, Versioned<LedgerMetadata>, Throwable>)Triple.of((Object)promise, (Object)written, (Object)exception));
                    } else {
                        LOG.info("Completing {}", (Object)this.numToDefer);
                        if (exception != null) {
                            promise.completeExceptionally((Throwable)exception);
                        } else {
                            promise.complete((Versioned<LedgerMetadata>)written);
                        }
                    }
                    this.notifyAll();
                }
            });
            return promise;
        }
    }

    static class NonDeterministicMockLedgerManager
    extends MockLedgerManager {
        final ExecutorService cbExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("non-deter-%d").build());

        NonDeterministicMockLedgerManager() {
        }

        @Override
        public void executeCallback(Runnable r) {
            this.cbExecutor.execute(r);
        }

        @Override
        public void close() {
            this.cbExecutor.shutdownNow();
            super.close();
        }
    }
}

