/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.state.AbstractStateMachine;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.core.state.NotConnectedException;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.dcp.conductor.Conductor;
import com.couchbase.client.dcp.conductor.DcpChannelControlHandler;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.message.DcpCloseStreamRequest;
import com.couchbase.client.dcp.message.DcpFailoverLogRequest;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpGetPartitionSeqnosRequest;
import com.couchbase.client.dcp.message.DcpOpenStreamRequest;
import com.couchbase.client.dcp.message.VbucketState;
import com.couchbase.client.dcp.transport.netty.ChannelUtils;
import com.couchbase.client.dcp.transport.netty.DcpPipeline;
import com.couchbase.client.dcp.util.retry.RetryBuilder;
import com.couchbase.client.deps.io.netty.bootstrap.Bootstrap;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.PooledByteBufAllocator;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.deps.io.netty.buffer.UnpooledByteBufAllocator;
import com.couchbase.client.deps.io.netty.channel.Channel;
import com.couchbase.client.deps.io.netty.channel.ChannelFuture;
import com.couchbase.client.deps.io.netty.channel.ChannelFutureListener;
import com.couchbase.client.deps.io.netty.channel.ChannelHandler;
import com.couchbase.client.deps.io.netty.channel.ChannelOption;
import com.couchbase.client.deps.io.netty.channel.ChannelPromise;
import com.couchbase.client.deps.io.netty.util.concurrent.DefaultPromise;
import com.couchbase.client.deps.io.netty.util.concurrent.EventExecutor;
import com.couchbase.client.deps.io.netty.util.concurrent.Future;
import com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener;
import com.couchbase.client.deps.io.netty.util.concurrent.Promise;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import rx.Completable;
import rx.CompletableSubscriber;
import rx.Single;
import rx.SingleSubscriber;
import rx.Subscription;
import rx.functions.Action4;
import rx.functions.Func1;

public class DcpChannel
extends AbstractStateMachine<LifecycleState> {
    private static final AtomicInteger OPAQUE = new AtomicInteger(0);
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(DcpChannel.class);
    private final DcpChannelControlHandler controlHandler;
    private volatile boolean isShutdown;
    private volatile Channel channel;
    private volatile ChannelFuture connectFuture;
    final ClientEnvironment env;
    final InetSocketAddress inetAddress;
    final Map<Integer, Promise<?>> outstandingPromises;
    final Map<Integer, Short> outstandingVbucketInfos;
    final AtomicIntegerArray openStreams;
    final Conductor conductor;

    public DcpChannel(InetSocketAddress inetAddress, ClientEnvironment env, Conductor conductor) {
        super((Enum)LifecycleState.DISCONNECTED);
        this.inetAddress = inetAddress;
        this.env = env;
        this.conductor = conductor;
        this.outstandingPromises = new ConcurrentHashMap();
        this.outstandingVbucketInfos = new ConcurrentHashMap<Integer, Short>();
        this.controlHandler = new DcpChannelControlHandler(this);
        this.openStreams = new AtomicIntegerArray(1024);
        this.isShutdown = false;
    }

    public Completable connect() {
        return Completable.create((Completable.OnSubscribe)new Completable.OnSubscribe(){

            public void call(final CompletableSubscriber subscriber) {
                if (DcpChannel.this.isShutdown || DcpChannel.this.state() != LifecycleState.DISCONNECTED) {
                    subscriber.onCompleted();
                    return;
                }
                PooledByteBufAllocator allocator = DcpChannel.this.env.poolBuffers() ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT;
                Bootstrap bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().option(ChannelOption.ALLOCATOR, (Object)allocator)).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)((int)DcpChannel.this.env.socketConnectTimeout()))).remoteAddress((SocketAddress)DcpChannel.this.inetAddress).channel(ChannelUtils.channelForEventLoopGroup(DcpChannel.this.env.eventLoopGroup()))).handler((ChannelHandler)new DcpPipeline(DcpChannel.this.env, DcpChannel.this.controlHandler))).group(DcpChannel.this.env.eventLoopGroup());
                DcpChannel.this.transitionState((Enum)LifecycleState.CONNECTING);
                DcpChannel.this.connectFuture = bootstrap.connect();
                DcpChannel.this.connectFuture.addListener((GenericFutureListener)new GenericFutureListener<ChannelFuture>(){

                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            DcpChannel.this.channel = future.channel();
                            if (DcpChannel.this.isShutdown) {
                                LOGGER.info("Connected Node {}, but got instructed to disconnect in the meantime.", (Object)DcpChannel.this.inetAddress);
                                DcpChannel.this.disconnect().subscribe(new CompletableSubscriber(){

                                    public void onCompleted() {
                                        subscriber.onCompleted();
                                    }

                                    public void onError(Throwable e) {
                                        LOGGER.warn("Got error during disconnect.", e);
                                    }

                                    public void onSubscribe(Subscription d) {
                                    }
                                });
                            } else {
                                DcpChannel.this.transitionState((Enum)LifecycleState.CONNECTED);
                                LOGGER.info("Connected to Node {}", (Object)DcpChannel.this.inetAddress);
                                DcpChannel.this.channel.closeFuture().addListener((GenericFutureListener)new GenericFutureListener<ChannelFuture>(){

                                    public void operationComplete(ChannelFuture future) throws Exception {
                                        LOGGER.debug("Got notified of channel close on Node {}", (Object)DcpChannel.this.inetAddress);
                                        DcpChannel.this.transitionState((Enum)LifecycleState.DISCONNECTED);
                                        if (!DcpChannel.this.isShutdown) {
                                            DcpChannel.this.dispatchReconnect();
                                        }
                                        DcpChannel.this.channel = null;
                                    }
                                });
                                subscriber.onCompleted();
                            }
                        } else {
                            LOGGER.info("Connect attempt to {} failed.", (Object)DcpChannel.this.inetAddress, (Object)future.cause());
                            DcpChannel.this.transitionState((Enum)LifecycleState.DISCONNECTED);
                            subscriber.onError(future.cause());
                        }
                    }
                });
            }
        });
    }

    private void dispatchReconnect() {
        if (this.isShutdown) {
            LOGGER.debug("Ignoring reconnect on {} because already shutdown.", (Object)this.inetAddress);
            return;
        }
        LOGGER.info("Node {} socket closed, initiating reconnect.", (Object)this.inetAddress);
        this.connect().retryWhen((Func1)RetryBuilder.any().max(Integer.MAX_VALUE).delay(Delay.exponential((TimeUnit)TimeUnit.MILLISECONDS, (long)4096L, (long)32L)).doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>(){

            public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
                LOGGER.debug("Rescheduling Node reconnect for DCP channel {}", (Object)DcpChannel.this.inetAddress);
            }
        }).build()).subscribe(new CompletableSubscriber(){

            public void onCompleted() {
                LOGGER.debug("Completed Node connect for DCP channel {}", (Object)DcpChannel.this.inetAddress);
                for (short vbid = 0; vbid < DcpChannel.this.openStreams.length(); vbid = (short)(vbid + 1)) {
                    if (DcpChannel.this.openStreams.get(vbid) == 0) continue;
                    DcpChannel.this.conductor.maybeMovePartition(vbid);
                }
            }

            public void onError(Throwable e) {
                LOGGER.warn("Got error during connect (maybe retried) for node {}" + DcpChannel.this.inetAddress, e);
            }

            public void onSubscribe(Subscription d) {
            }
        });
    }

    public boolean isShutdown() {
        return this.isShutdown;
    }

    public Completable disconnect() {
        return Completable.create((Completable.OnSubscribe)new Completable.OnSubscribe(){

            public void call(final CompletableSubscriber subscriber) {
                DcpChannel.this.isShutdown = true;
                if (DcpChannel.this.channel != null) {
                    DcpChannel.this.transitionState((Enum)LifecycleState.DISCONNECTING);
                    DcpChannel.this.channel.close().addListener((GenericFutureListener)new GenericFutureListener<ChannelFuture>(){

                        public void operationComplete(ChannelFuture future) throws Exception {
                            DcpChannel.this.transitionState((Enum)LifecycleState.DISCONNECTED);
                            LOGGER.info("Disconnected from Node " + DcpChannel.this.address());
                            if (future.isSuccess()) {
                                subscriber.onCompleted();
                            } else {
                                LOGGER.debug("Error during channel close.", future.cause());
                                subscriber.onError(future.cause());
                            }
                        }
                    });
                } else if (DcpChannel.this.connectFuture != null) {
                    DcpChannel.this.connectFuture.addListener((GenericFutureListener)new ChannelFutureListener(){

                        public void operationComplete(ChannelFuture channelFuture) throws Exception {
                            if (channelFuture.isSuccess()) {
                                channelFuture.channel().closeFuture().addListener((GenericFutureListener)new ChannelFutureListener(){

                                    public void operationComplete(ChannelFuture closeFuture) throws Exception {
                                        if (closeFuture.isSuccess()) {
                                            subscriber.onCompleted();
                                        } else {
                                            subscriber.onError(closeFuture.cause());
                                        }
                                    }
                                });
                            } else {
                                subscriber.onCompleted();
                            }
                        }
                    });
                } else {
                    subscriber.onCompleted();
                }
            }
        });
    }

    public InetSocketAddress address() {
        return this.inetAddress;
    }

    public Completable openStream(final short vbid, final long vbuuid, final long startSeqno, final long endSeqno, final long snapshotStartSeqno, final long snapshotEndSeqno) {
        return Completable.create((Completable.OnSubscribe)new Completable.OnSubscribe(){

            public void call(final CompletableSubscriber subscriber) {
                if (DcpChannel.this.state() != LifecycleState.CONNECTED) {
                    subscriber.onError((Throwable)new NotConnectedException());
                    return;
                }
                LOGGER.debug("Opening Stream against {} with vbid: {}, vbuuid: {}, startSeqno: {}, endSeqno: {},  snapshotStartSeqno: {}, snapshotEndSeqno: {}", new Object[]{DcpChannel.this.channel.remoteAddress(), vbid, vbuuid, startSeqno, endSeqno, snapshotStartSeqno, snapshotEndSeqno});
                int opaque = OPAQUE.incrementAndGet();
                ChannelPromise promise = DcpChannel.this.channel.newPromise();
                ByteBuf buffer = Unpooled.buffer();
                DcpOpenStreamRequest.init(buffer, vbid);
                DcpOpenStreamRequest.opaque(buffer, opaque);
                DcpOpenStreamRequest.vbuuid(buffer, vbuuid);
                DcpOpenStreamRequest.startSeqno(buffer, startSeqno);
                DcpOpenStreamRequest.endSeqno(buffer, endSeqno);
                DcpOpenStreamRequest.snapshotStartSeqno(buffer, snapshotStartSeqno);
                DcpOpenStreamRequest.snapshotEndSeqno(buffer, snapshotEndSeqno);
                DcpChannel.this.outstandingPromises.put(opaque, (Promise<?>)promise);
                DcpChannel.this.outstandingVbucketInfos.put(opaque, vbid);
                DcpChannel.this.channel.writeAndFlush((Object)buffer);
                promise.addListener((GenericFutureListener)new GenericFutureListener<ChannelFuture>(){

                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            LOGGER.debug("Opened Stream against {} with vbid: {}", (Object)DcpChannel.this.channel.remoteAddress(), (Object)vbid);
                            DcpChannel.this.openStreams.set(vbid, 1);
                            subscriber.onCompleted();
                        } else {
                            LOGGER.debug("Failed open Stream against {} with vbid: {}", (Object)DcpChannel.this.channel.remoteAddress(), (Object)vbid);
                            DcpChannel.this.openStreams.set(vbid, 0);
                            subscriber.onError(future.cause());
                        }
                    }
                });
            }
        });
    }

    public Completable closeStream(final short vbid) {
        return Completable.create((Completable.OnSubscribe)new Completable.OnSubscribe(){

            public void call(final CompletableSubscriber subscriber) {
                if (DcpChannel.this.state() != LifecycleState.CONNECTED) {
                    subscriber.onError((Throwable)new NotConnectedException());
                    return;
                }
                LOGGER.debug("Closing Stream against {} with vbid: {}", (Object)DcpChannel.this.channel.remoteAddress(), (Object)vbid);
                int opaque = OPAQUE.incrementAndGet();
                ChannelPromise promise = DcpChannel.this.channel.newPromise();
                ByteBuf buffer = Unpooled.buffer();
                DcpCloseStreamRequest.init(buffer);
                DcpCloseStreamRequest.vbucket(buffer, vbid);
                DcpCloseStreamRequest.opaque(buffer, opaque);
                DcpChannel.this.outstandingPromises.put(opaque, (Promise<?>)promise);
                DcpChannel.this.channel.writeAndFlush((Object)buffer);
                promise.addListener((GenericFutureListener)new GenericFutureListener<ChannelFuture>(){

                    public void operationComplete(ChannelFuture future) throws Exception {
                        DcpChannel.this.openStreams.set(vbid, 0);
                        if (future.isSuccess()) {
                            LOGGER.debug("Closed Stream against {} with vbid: {}", (Object)DcpChannel.this.channel.remoteAddress(), (Object)vbid);
                            subscriber.onCompleted();
                        } else {
                            LOGGER.debug("Failed close Stream against {} with vbid: {}", (Object)DcpChannel.this.channel.remoteAddress(), (Object)vbid);
                            subscriber.onError(future.cause());
                        }
                    }
                });
            }
        });
    }

    public Single<ByteBuf> getSeqnos() {
        return Single.create((Single.OnSubscribe)new Single.OnSubscribe<ByteBuf>(){

            public void call(final SingleSubscriber<? super ByteBuf> subscriber) {
                if (DcpChannel.this.state() != LifecycleState.CONNECTED) {
                    subscriber.onError((Throwable)new NotConnectedException());
                    return;
                }
                int opaque = OPAQUE.incrementAndGet();
                DefaultPromise promise = new DefaultPromise((EventExecutor)DcpChannel.this.channel.eventLoop());
                ByteBuf buffer = Unpooled.buffer();
                DcpGetPartitionSeqnosRequest.init(buffer);
                DcpGetPartitionSeqnosRequest.opaque(buffer, opaque);
                DcpGetPartitionSeqnosRequest.vbucketState(buffer, VbucketState.ACTIVE);
                DcpChannel.this.outstandingPromises.put(opaque, (Promise<?>)promise);
                DcpChannel.this.channel.writeAndFlush((Object)buffer);
                promise.addListener((GenericFutureListener)new GenericFutureListener<Future<ByteBuf>>(){

                    public void operationComplete(Future<ByteBuf> future) throws Exception {
                        if (future.isSuccess()) {
                            subscriber.onSuccess(future.getNow());
                        } else {
                            subscriber.onError(future.cause());
                        }
                    }
                });
            }
        });
    }

    public Single<ByteBuf> getFailoverLog(final short vbid) {
        return Single.create((Single.OnSubscribe)new Single.OnSubscribe<ByteBuf>(){

            public void call(final SingleSubscriber<? super ByteBuf> subscriber) {
                if (DcpChannel.this.state() != LifecycleState.CONNECTED) {
                    subscriber.onError((Throwable)new NotConnectedException());
                    return;
                }
                int opaque = OPAQUE.incrementAndGet();
                DefaultPromise promise = new DefaultPromise((EventExecutor)DcpChannel.this.channel.eventLoop());
                ByteBuf buffer = Unpooled.buffer();
                DcpFailoverLogRequest.init(buffer);
                DcpFailoverLogRequest.opaque(buffer, opaque);
                DcpFailoverLogRequest.vbucket(buffer, vbid);
                DcpChannel.this.outstandingPromises.put(opaque, (Promise<?>)promise);
                DcpChannel.this.outstandingVbucketInfos.put(opaque, vbid);
                DcpChannel.this.channel.writeAndFlush((Object)buffer);
                LOGGER.debug("Asked for failover log on {} for vbid: {}", (Object)DcpChannel.this.channel.remoteAddress(), (Object)vbid);
                promise.addListener((GenericFutureListener)new GenericFutureListener<Future<ByteBuf>>(){

                    public void operationComplete(Future<ByteBuf> future) throws Exception {
                        if (future.isSuccess()) {
                            LOGGER.debug("Failover log for vbid {} is {}", (Object)vbid, (Object)DcpFailoverLogResponse.toString((ByteBuf)future.getNow()));
                            subscriber.onSuccess(future.getNow());
                        } else {
                            LOGGER.debug("Failed to ask for failover log on {} for vbid: {}", (Object)DcpChannel.this.channel.remoteAddress(), (Object)vbid);
                            subscriber.onError(future.cause());
                        }
                    }
                });
            }
        });
    }

    public boolean streamIsOpen(short vbid) {
        return this.openStreams.get(vbid) == 1;
    }

    public boolean equals(Object o) {
        if (o instanceof InetAddress) {
            return this.inetAddress.equals(o);
        }
        if (o instanceof DcpChannel) {
            return this.inetAddress.equals(((DcpChannel)((Object)o)).inetAddress);
        }
        return false;
    }

    public int hashCode() {
        return this.inetAddress.hashCode();
    }

    public String toString() {
        return "DcpChannel{inetAddress=" + this.inetAddress + '}';
    }
}

