/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.conductor;

import com.couchbase.client.dcp.buffer.DcpOps;
import com.couchbase.client.dcp.conductor.Conductor;
import com.couchbase.client.dcp.conductor.DcpChannelControlHandler;
import com.couchbase.client.dcp.conductor.NotMyVbucketException;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.core.logging.RedactableArgument;
import com.couchbase.client.dcp.core.state.AbstractStateMachine;
import com.couchbase.client.dcp.core.state.LifecycleState;
import com.couchbase.client.dcp.core.state.NotConnectedException;
import com.couchbase.client.dcp.core.time.Delay;
import com.couchbase.client.dcp.core.utils.DefaultObjectMapper;
import com.couchbase.client.dcp.deps.io.netty.bootstrap.Bootstrap;
import com.couchbase.client.dcp.deps.io.netty.buffer.AbstractByteBufAllocator;
import com.couchbase.client.dcp.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.dcp.deps.io.netty.buffer.PooledByteBufAllocator;
import com.couchbase.client.dcp.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.dcp.deps.io.netty.buffer.UnpooledByteBufAllocator;
import com.couchbase.client.dcp.deps.io.netty.channel.Channel;
import com.couchbase.client.dcp.deps.io.netty.channel.ChannelFuture;
import com.couchbase.client.dcp.deps.io.netty.channel.ChannelFutureListener;
import com.couchbase.client.dcp.deps.io.netty.channel.ChannelOption;
import com.couchbase.client.dcp.deps.io.netty.util.ReferenceCountUtil;
import com.couchbase.client.dcp.deps.io.netty.util.concurrent.Future;
import com.couchbase.client.dcp.deps.io.netty.util.concurrent.GenericFutureListener;
import com.couchbase.client.dcp.deps.io.netty.util.concurrent.ImmediateEventExecutor;
import com.couchbase.client.dcp.error.RollbackException;
import com.couchbase.client.dcp.highlevel.StreamOffset;
import com.couchbase.client.dcp.highlevel.internal.CollectionsManifest;
import com.couchbase.client.dcp.message.DcpCloseStreamRequest;
import com.couchbase.client.dcp.message.DcpFailoverLogRequest;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpGetPartitionSeqnosRequest;
import com.couchbase.client.dcp.message.DcpOpenStreamRequest;
import com.couchbase.client.dcp.message.DcpOpenStreamResponse;
import com.couchbase.client.dcp.message.HelloFeature;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.message.ResponseStatus;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.message.VbucketState;
import com.couchbase.client.dcp.metrics.DcpChannelMetrics;
import com.couchbase.client.dcp.metrics.MetricsContext;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.dcp.transport.netty.ChannelUtils;
import com.couchbase.client.dcp.transport.netty.DcpMessageHandler;
import com.couchbase.client.dcp.transport.netty.DcpPipeline;
import com.couchbase.client.dcp.transport.netty.DcpResponse;
import com.couchbase.client.dcp.transport.netty.DcpResponseListener;
import com.couchbase.client.dcp.util.AdaptiveDelay;
import com.couchbase.client.dcp.util.AtomicBooleanArray;
import com.couchbase.client.dcp.util.retry.RetryBuilder;
import io.micrometer.core.instrument.Tags;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
import rx.CompletableSubscriber;
import rx.Single;
import rx.SingleSubscriber;
import rx.Subscription;
import rx.functions.Action4;
import rx.functions.Func1;

public class DcpChannel
extends AbstractStateMachine<LifecycleState> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DcpChannel.class);
    private final DcpChannelControlHandler controlHandler;
    private volatile boolean isShutdown;
    private volatile Channel channel;
    private volatile ChannelFuture connectFuture;
    private final DcpChannelMetrics metrics;
    private final AdaptiveDelay reconnectDelay = new AdaptiveDelay(Delay.exponential(TimeUnit.MILLISECONDS, 4096L, 32L), Duration.ofSeconds(10L));
    final ClientEnvironment env;
    final InetSocketAddress inetAddress;
    final AtomicBooleanArray streamIsOpen = new AtomicBooleanArray(1024);
    final Conductor conductor;

    public DcpChannel(InetSocketAddress inetAddress, ClientEnvironment env, Conductor conductor) {
        super(LifecycleState.DISCONNECTED);
        this.inetAddress = inetAddress;
        this.env = env;
        this.conductor = conductor;
        this.controlHandler = new DcpChannelControlHandler(this);
        this.isShutdown = false;
        this.metrics = new DcpChannelMetrics(new MetricsContext("dcp", Tags.of((String)"remote", (String)inetAddress.toString())));
    }

    public Future<DcpResponse> sendRequest(ByteBuf message) {
        if (this.channel == null) {
            ReferenceCountUtil.safeRelease(message);
            return ImmediateEventExecutor.INSTANCE.newFailedFuture(new NotConnectedException("Failed to issue request; channel is not active."));
        }
        return this.channel.pipeline().get(DcpMessageHandler.class).sendRequest(message);
    }

    public Completable connect() {
        return Completable.create((Completable.OnSubscribe)new Completable.OnSubscribe(){

            public void call(final CompletableSubscriber subscriber) {
                if (DcpChannel.this.isShutdown || DcpChannel.this.state() != LifecycleState.DISCONNECTED) {
                    subscriber.onCompleted();
                    return;
                }
                AbstractByteBufAllocator allocator = DcpChannel.this.env.poolBuffers() ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT;
                Bootstrap bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().option(ChannelOption.ALLOCATOR, allocator)).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)DcpChannel.this.env.socketConnectTimeout())).remoteAddress(DcpChannel.this.inetAddress).channel(ChannelUtils.channelForEventLoopGroup(DcpChannel.this.env.eventLoopGroup()))).handler(new DcpPipeline(DcpChannel.this.env, DcpChannel.this.controlHandler, DcpChannel.this.conductor.bucketConfigArbiter(), DcpChannel.this.metrics))).group(DcpChannel.this.env.eventLoopGroup());
                DcpChannel.this.transitionState(LifecycleState.CONNECTING);
                DcpChannel.this.connectFuture = DcpChannel.this.metrics.trackConnect(bootstrap.connect());
                DcpChannel.this.connectFuture.addListener((GenericFutureListener<? extends Future<? super Void>>)new GenericFutureListener<ChannelFuture>(){

                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            DcpChannel.this.channel = future.channel();
                            DcpChannel.this.metrics.trackDisconnect(DcpChannel.this.channel.closeFuture());
                            if (DcpChannel.this.isShutdown) {
                                LOGGER.info("Connected Node {}, but got instructed to disconnect in the meantime.", (Object)RedactableArgument.system(DcpChannel.this.inetAddress));
                                DcpChannel.this.disconnect().subscribe(new CompletableSubscriber(){

                                    public void onCompleted() {
                                        subscriber.onCompleted();
                                    }

                                    public void onError(Throwable e) {
                                        LOGGER.warn("Got error during disconnect.", e);
                                    }

                                    public void onSubscribe(Subscription d) {
                                    }
                                });
                            } else {
                                DcpChannel.this.transitionState(LifecycleState.CONNECTED);
                                LOGGER.info("Connected to Node {}", (Object)RedactableArgument.system(DcpChannel.this.inetAddress));
                                DcpChannel.this.channel.closeFuture().addListener((GenericFutureListener<? extends Future<? super Void>>)new GenericFutureListener<ChannelFuture>(){

                                    @Override
                                    public void operationComplete(ChannelFuture future) throws Exception {
                                        LOGGER.debug("Got notified of channel close on Node {}", (Object)DcpChannel.this.inetAddress);
                                        if (DcpChannel.this.env.persistencePollingEnabled()) {
                                            for (short vbid = 0; vbid < DcpChannel.this.streamIsOpen.length(); vbid = (short)((short)(vbid + 1))) {
                                                if (!DcpChannel.this.streamIsOpen.get(vbid)) continue;
                                                DcpChannel.this.env.streamEventBuffer().clear(vbid);
                                            }
                                        }
                                        DcpChannel.this.transitionState(LifecycleState.DISCONNECTED);
                                        if (!DcpChannel.this.isShutdown) {
                                            DcpChannel.this.dispatchReconnect();
                                        }
                                        DcpChannel.this.channel = null;
                                    }
                                });
                                subscriber.onCompleted();
                            }
                        } else {
                            LOGGER.info("Connect attempt to {} failed.", (Object)RedactableArgument.system(DcpChannel.this.inetAddress), (Object)future.cause());
                            DcpChannel.this.transitionState(LifecycleState.DISCONNECTED);
                            subscriber.onError(future.cause());
                        }
                    }
                });
            }
        });
    }

    private void dispatchReconnect() {
        if (this.isShutdown) {
            LOGGER.debug("Ignoring reconnect on {} because already shutdown.", (Object)this.inetAddress);
            return;
        }
        LOGGER.info("Node {} socket closed, initiating reconnect.", (Object)RedactableArgument.system(this.inetAddress));
        long delayMillis = this.reconnectDelay.calculate().toMillis();
        if (delayMillis > 0L) {
            LOGGER.info("Delaying reconnection attempt by {}ms", (Object)delayMillis);
        }
        Completable.timer((long)delayMillis, (TimeUnit)TimeUnit.MILLISECONDS).andThen(this.connect().retryWhen((Func1)RetryBuilder.any().max(Integer.MAX_VALUE).delay(Delay.exponential(TimeUnit.MILLISECONDS, 4096L, 32L)).doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>(){

            public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
                LOGGER.debug("Rescheduling Node reconnect for DCP channel {}", (Object)DcpChannel.this.inetAddress);
            }
        }).build())).subscribe(new CompletableSubscriber(){

            public void onCompleted() {
                LOGGER.debug("Completed Node connect for DCP channel {}", (Object)DcpChannel.this.inetAddress);
                for (short vbid = 0; vbid < DcpChannel.this.streamIsOpen.length(); vbid = (short)(vbid + 1)) {
                    if (!DcpChannel.this.streamIsOpen.get(vbid)) continue;
                    DcpChannel.this.conductor.maybeMovePartition(vbid);
                }
            }

            public void onError(Throwable e) {
                LOGGER.warn("Got error during connect (maybe retried) for node {}", (Object)RedactableArgument.system(DcpChannel.this.inetAddress), (Object)e);
            }

            public void onSubscribe(Subscription d) {
            }
        });
    }

    public boolean isShutdown() {
        return this.isShutdown;
    }

    public Completable disconnect() {
        return Completable.create((Completable.OnSubscribe)new Completable.OnSubscribe(){

            public void call(final CompletableSubscriber subscriber) {
                DcpChannel.this.isShutdown = true;
                if (DcpChannel.this.channel != null) {
                    DcpChannel.this.transitionState(LifecycleState.DISCONNECTING);
                    ChannelFuture closeFuture = DcpChannel.this.metrics.trackDisconnect(DcpChannel.this.channel.close());
                    closeFuture.addListener((GenericFutureListener<? extends Future<? super Void>>)new GenericFutureListener<ChannelFuture>(){

                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            DcpChannel.this.transitionState(LifecycleState.DISCONNECTED);
                            LOGGER.info("Disconnected from Node {}", (Object)RedactableArgument.system(DcpChannel.this.address()));
                            if (future.isSuccess()) {
                                subscriber.onCompleted();
                            } else {
                                LOGGER.debug("Error during channel close.", future.cause());
                                subscriber.onError(future.cause());
                            }
                        }
                    });
                } else if (DcpChannel.this.connectFuture != null) {
                    DcpChannel.this.connectFuture.addListener(new ChannelFutureListener(){

                        @Override
                        public void operationComplete(ChannelFuture channelFuture) throws Exception {
                            if (channelFuture.isSuccess()) {
                                channelFuture.channel().closeFuture().addListener(new ChannelFutureListener(){

                                    @Override
                                    public void operationComplete(ChannelFuture closeFuture) throws Exception {
                                        if (closeFuture.isSuccess()) {
                                            subscriber.onCompleted();
                                        } else {
                                            subscriber.onError(closeFuture.cause());
                                        }
                                    }
                                });
                            } else {
                                subscriber.onCompleted();
                            }
                        }
                    });
                } else {
                    subscriber.onCompleted();
                }
            }
        });
    }

    public InetSocketAddress address() {
        return this.inetAddress;
    }

    public Single<Optional<CollectionsManifest>> getCollectionsManifest() {
        return Single.create((Single.OnSubscribe)new Single.OnSubscribe<Optional<CollectionsManifest>>(){

            public void call(final SingleSubscriber<? super Optional<CollectionsManifest>> subscriber) {
                if (DcpChannel.this.state() != LifecycleState.CONNECTED) {
                    subscriber.onError((Throwable)new NotConnectedException());
                    return;
                }
                if (!HelloFeature.COLLECTIONS.isEnabled(DcpChannel.this.channel)) {
                    subscriber.onSuccess(Optional.empty());
                    return;
                }
                ByteBuf buffer = Unpooled.buffer();
                MessageUtil.initRequest((byte)-70, buffer);
                DcpChannel.this.sendRequest(buffer).addListener(new DcpResponseListener(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void operationComplete(Future<DcpResponse> future) throws Exception {
                        if (!future.isSuccess()) {
                            if (future.cause() instanceof NotConnectedException) {
                                LOGGER.debug("Failed to get collections manifest from {}; {}", (Object)DcpChannel.this.inetAddress, (Object)future.cause().toString());
                            } else {
                                LOGGER.warn("Failed to get collections manifest from {}; {}", (Object)DcpChannel.this.inetAddress, (Object)future.cause().toString());
                            }
                            subscriber.onError(future.cause());
                            return;
                        }
                        DcpResponse dcpResponse = future.getNow();
                        ByteBuf buf = dcpResponse.buffer();
                        try {
                            ResponseStatus status = dcpResponse.status();
                            if (!status.isSuccess()) {
                                LOGGER.warn("Failed to get collections manifest from {}, response status: {}", (Object)DcpChannel.this.inetAddress, (Object)status);
                                subscriber.onError((Throwable)new DcpOps.BadResponseStatusException(status));
                                return;
                            }
                            byte[] manifestJsonBytes = MessageUtil.getContentAsByteArray(buf);
                            LOGGER.debug("Got collections manifest from {} ; {}", (Object)DcpChannel.this.inetAddress, (Object)new String(manifestJsonBytes, StandardCharsets.UTF_8));
                            try {
                                CollectionsManifest manifest = CollectionsManifest.fromJson(manifestJsonBytes);
                                subscriber.onSuccess(Optional.of(manifest));
                            }
                            catch (Exception e) {
                                LOGGER.error("Unparsable collections manifest from {} ; {}", new Object[]{RedactableArgument.system(DcpChannel.this.inetAddress), RedactableArgument.user(new String(manifestJsonBytes, StandardCharsets.UTF_8)), e});
                                subscriber.onError((Throwable)new RuntimeException("Failed to parse collections manifest", e));
                            }
                        }
                        finally {
                            buf.release();
                        }
                    }
                });
            }
        });
    }

    public Completable openStream(final short vbid, final StreamOffset startOffset, final long endSeqno, final CollectionsManifest manifest) {
        return Completable.create((Completable.OnSubscribe)new Completable.OnSubscribe(){

            public void call(final CompletableSubscriber subscriber) {
                long snapshotEndSeqno;
                long snapshotStartSeqno;
                if (DcpChannel.this.state() != LifecycleState.CONNECTED) {
                    subscriber.onError((Throwable)new NotConnectedException());
                    return;
                }
                long startSeqno = startOffset.getSeqno();
                long origSnapshotStartSeqno = startOffset.getSnapshot().getStartSeqno();
                long origSnapshotEndSeqno = startOffset.getSnapshot().getEndSeqno();
                long vbuuid = startOffset.getVbuuid();
                long collectionsManifestuid = startOffset.getCollectionsManifestUid();
                if (origSnapshotStartSeqno == startSeqno + 1L) {
                    LOGGER.debug("Disregarding snapshot marker from the future.");
                    snapshotStartSeqno = startSeqno;
                    snapshotEndSeqno = startSeqno;
                } else {
                    snapshotEndSeqno = origSnapshotEndSeqno;
                    snapshotStartSeqno = origSnapshotStartSeqno;
                }
                LOGGER.debug("Opening Stream against {} with vbid: {}, vbuuid: {}, startSeqno: {}, endSeqno: {},  snapshotStartSeqno: {}, snapshotEndSeqno: {}, manifest: {}", new Object[]{DcpChannel.this.inetAddress, vbid, vbuuid, startSeqno, endSeqno, snapshotStartSeqno, snapshotEndSeqno, manifest});
                ByteBuf buffer = Unpooled.buffer();
                DcpOpenStreamRequest.init(buffer, Collections.emptySet(), vbid);
                DcpOpenStreamRequest.vbuuid(buffer, vbuuid);
                DcpOpenStreamRequest.startSeqno(buffer, startSeqno);
                DcpOpenStreamRequest.endSeqno(buffer, endSeqno);
                DcpOpenStreamRequest.snapshotStartSeqno(buffer, snapshotStartSeqno);
                DcpOpenStreamRequest.snapshotEndSeqno(buffer, snapshotEndSeqno);
                if (HelloFeature.COLLECTIONS.isEnabled(DcpChannel.this.channel)) {
                    OptionalLong scopeId;
                    HashSet<Long> collectionIds = new HashSet<Long>(DcpChannel.this.env.collectionIds());
                    DcpChannel.this.env.collectionNames().forEach(name -> {
                        CollectionsManifest.CollectionInfo c = manifest.getCollection((String)name);
                        if (c == null) {
                            subscriber.onError((Throwable)new RuntimeException("Can't stream from collection '" + name + "' because it does not exist (not present in the collections manifest)."));
                            return;
                        }
                        LOGGER.debug("resolved collection name '{}' to UID {}", name, (Object)c.id());
                        collectionIds.add(c.id());
                    });
                    if (DcpChannel.this.env.scopeName().isPresent()) {
                        String scopeName = DcpChannel.this.env.scopeName().get();
                        CollectionsManifest.ScopeInfo s = manifest.getScope(scopeName);
                        if (s == null) {
                            subscriber.onError((Throwable)new RuntimeException("Can't stream from scope '" + scopeName + "' because it does not exist (not present in the collections manifest)."));
                            return;
                        }
                        LOGGER.debug("resolved scope name '{}' to UID {}", (Object)scopeName, (Object)s.id());
                        scopeId = OptionalLong.of(s.id());
                    } else {
                        scopeId = DcpChannel.this.env.scopeId();
                    }
                    HashMap<String, Object> value = new HashMap<String, Object>();
                    value.put("uid", DcpChannel.formatUid(collectionsManifestuid));
                    if (!collectionIds.isEmpty()) {
                        value.put("collections", DcpChannel.formatUids(collectionIds));
                    } else if (scopeId.isPresent()) {
                        value.put("scope", DcpChannel.formatUid(scopeId.getAsLong()));
                    }
                    try {
                        byte[] bytes = DefaultObjectMapper.writeValueAsBytes(value);
                        LOGGER.debug("opening stream for partition {} with value: {}", (Object)vbid, (Object)new String(bytes, StandardCharsets.UTF_8));
                        MessageUtil.setContent(bytes, buffer);
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                DcpChannel.this.sendRequest(buffer).addListener(new DcpResponseListener(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void operationComplete(Future<DcpResponse> future) throws Exception {
                        if (!future.isSuccess()) {
                            LOGGER.debug("Failed open Stream against {} with vbid: {}", (Object)DcpChannel.this.inetAddress, (Object)vbid);
                            DcpChannel.this.streamIsOpen.set(vbid, false);
                            subscriber.onError(future.cause());
                            return;
                        }
                        DcpResponse dcpResponse = future.getNow();
                        ByteBuf buf = dcpResponse.buffer();
                        try {
                            ResponseStatus status = dcpResponse.status();
                            if (status == ResponseStatus.KEY_EXISTS) {
                                LOGGER.debug("Stream already open against {} with vbid: {}", (Object)DcpChannel.this.inetAddress, (Object)vbid);
                                subscriber.onCompleted();
                                return;
                            }
                            if (!status.isSuccess()) {
                                LOGGER.debug("Failed open Stream against {} with vbid: {}", (Object)DcpChannel.this.inetAddress, (Object)vbid);
                                DcpChannel.this.streamIsOpen.set(vbid, false);
                            }
                            if (status.isSuccess()) {
                                LOGGER.debug("Opened Stream against {} with vbid: {}", (Object)DcpChannel.this.inetAddress, (Object)vbid);
                                DcpChannel.this.streamIsOpen.set(vbid, true);
                                subscriber.onCompleted();
                                ByteBuf flog = Unpooled.buffer();
                                DcpFailoverLogResponse.init(flog);
                                DcpFailoverLogResponse.vbucket(flog, DcpOpenStreamResponse.vbucket(buf));
                                ByteBuf content = MessageUtil.getContent(buf).copy().writeShort(vbid);
                                MessageUtil.setContent(content, flog);
                                content.release();
                                DcpChannel.this.env.controlEventHandler().onEvent(ChannelFlowController.dummy, flog);
                            } else if (status == ResponseStatus.ROLLBACK_REQUIRED) {
                                subscriber.onError((Throwable)new RollbackException());
                                ByteBuf rb = Unpooled.buffer();
                                RollbackMessage.init(rb, vbid, DcpOpenStreamResponse.rollbackSeqno(buf));
                                DcpChannel.this.env.controlEventHandler().onEvent(ChannelFlowController.dummy, rb);
                            } else if (status == ResponseStatus.NOT_MY_VBUCKET) {
                                subscriber.onError((Throwable)new NotMyVbucketException());
                            } else {
                                subscriber.onError((Throwable)new DcpOps.BadResponseStatusException(status));
                            }
                        }
                        finally {
                            buf.release();
                        }
                    }
                });
            }
        });
    }

    private static String formatUid(long uid) {
        return Long.toHexString(uid);
    }

    private static List<String> formatUids(Collection<Long> uids) {
        return uids.stream().map(DcpChannel::formatUid).collect(Collectors.toList());
    }

    public Completable closeStream(final short vbid) {
        return Completable.create((Completable.OnSubscribe)new Completable.OnSubscribe(){

            public void call(final CompletableSubscriber subscriber) {
                if (DcpChannel.this.state() != LifecycleState.CONNECTED) {
                    subscriber.onError((Throwable)new NotConnectedException());
                    return;
                }
                LOGGER.debug("Closing Stream against {} with vbid: {}", (Object)DcpChannel.this.inetAddress, (Object)vbid);
                ByteBuf buffer = Unpooled.buffer();
                DcpCloseStreamRequest.init(buffer);
                DcpCloseStreamRequest.vbucket(buffer, vbid);
                DcpChannel.this.sendRequest(buffer).addListener(new DcpResponseListener(){

                    @Override
                    public void operationComplete(Future<DcpResponse> future) throws Exception {
                        DcpChannel.this.streamIsOpen.set(vbid, false);
                        if (future.isSuccess()) {
                            future.getNow().buffer().release();
                            LOGGER.debug("Closed Stream against {} with vbid: {}", (Object)DcpChannel.this.inetAddress, (Object)vbid);
                            subscriber.onCompleted();
                        } else {
                            LOGGER.debug("Failed close Stream against {} with vbid: {}", (Object)DcpChannel.this.inetAddress, (Object)vbid);
                            subscriber.onError(future.cause());
                        }
                    }
                });
            }
        });
    }

    public Single<ByteBuf> getSeqnos() {
        return Single.create((Single.OnSubscribe)new Single.OnSubscribe<ByteBuf>(){

            public void call(final SingleSubscriber<? super ByteBuf> subscriber) {
                if (DcpChannel.this.state() != LifecycleState.CONNECTED) {
                    subscriber.onError((Throwable)new NotConnectedException());
                    return;
                }
                ByteBuf buffer = Unpooled.buffer();
                DcpGetPartitionSeqnosRequest.init(buffer);
                DcpGetPartitionSeqnosRequest.vbucketState(buffer, VbucketState.ACTIVE);
                DcpChannel.this.sendRequest(buffer).addListener(new DcpResponseListener(){

                    @Override
                    public void operationComplete(Future<DcpResponse> future) throws Exception {
                        if (future.isSuccess()) {
                            ByteBuf buf = future.getNow().buffer();
                            try {
                                subscriber.onSuccess((Object)MessageUtil.getContent(buf).copy());
                            }
                            finally {
                                buf.release();
                            }
                        } else {
                            subscriber.onError(future.cause());
                        }
                    }
                });
            }
        });
    }

    public Single<ByteBuf> getFailoverLog(final short vbid) {
        return Single.create((Single.OnSubscribe)new Single.OnSubscribe<ByteBuf>(){

            public void call(final SingleSubscriber<? super ByteBuf> subscriber) {
                if (DcpChannel.this.state() != LifecycleState.CONNECTED) {
                    subscriber.onError((Throwable)new NotConnectedException());
                    return;
                }
                ByteBuf buffer = Unpooled.buffer();
                DcpFailoverLogRequest.init(buffer);
                DcpFailoverLogRequest.vbucket(buffer, vbid);
                LOGGER.debug("Asked for failover log on {} for vbid: {}", (Object)DcpChannel.this.inetAddress, (Object)vbid);
                DcpChannel.this.sendRequest(buffer).addListener(new DcpResponseListener(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void operationComplete(Future<DcpResponse> future) throws Exception {
                        if (!future.isSuccess()) {
                            LOGGER.debug("Failed to ask for failover log on {} for vbid: {}", (Object)DcpChannel.this.inetAddress, (Object)vbid);
                            subscriber.onError(future.cause());
                            return;
                        }
                        ByteBuf buf = future.getNow().buffer();
                        try {
                            ByteBuf flog = Unpooled.buffer();
                            DcpFailoverLogResponse.init(flog);
                            DcpFailoverLogResponse.vbucket(flog, DcpFailoverLogResponse.vbucket(buf));
                            ByteBuf copiedBuf = MessageUtil.getContent(buf).copy().writeShort(vbid);
                            MessageUtil.setContent(copiedBuf, flog);
                            copiedBuf.release();
                            LOGGER.debug("Failover log for vbid {} is {}", (Object)vbid, (Object)DcpFailoverLogResponse.toString(flog));
                            subscriber.onSuccess((Object)flog);
                        }
                        finally {
                            buf.release();
                        }
                    }
                });
            }
        });
    }

    public boolean streamIsOpen(short vbid) {
        return this.streamIsOpen.get(vbid);
    }

    public boolean equals(Object o) {
        if (o instanceof InetAddress) {
            return this.inetAddress.equals(o);
        }
        if (o instanceof DcpChannel) {
            return this.inetAddress.equals(((DcpChannel)o).inetAddress);
        }
        return false;
    }

    public int hashCode() {
        return this.inetAddress.hashCode();
    }

    public String toString() {
        return "DcpChannel{inetAddress=" + this.inetAddress + '}';
    }
}

