/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp;

import com.couchbase.client.core.event.EventBus;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.core.utils.ConnectionString;
import com.couchbase.client.dcp.ConnectionNameGenerator;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.DefaultConnectionNameGenerator;
import com.couchbase.client.dcp.StreamFrom;
import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.SystemEventHandler;
import com.couchbase.client.dcp.conductor.Conductor;
import com.couchbase.client.dcp.conductor.ConfigProvider;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.config.DcpControl;
import com.couchbase.client.dcp.error.BootstrapException;
import com.couchbase.client.dcp.error.RollbackException;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpExpirationMessage;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.state.SessionState;
import com.couchbase.client.dcp.state.StateFormat;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.dcp.util.MathUtils;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.channel.EventLoopGroup;
import com.couchbase.client.deps.io.netty.channel.nio.NioEventLoopGroup;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import rx.Completable;
import rx.CompletableSubscriber;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;

public class Client {
    /** Logger shared by all instances of this class. */
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Client.class);
    /** Conductor that the connect, streaming and failover-log operations of this class delegate to; created in the constructor. */
    private final Conductor conductor;
    /** Environment assembled from the {@link Builder} settings; the event handlers are registered on it as well. */
    private final ClientEnvironment env;
    /** Value of {@code env.dcpControl().bufferAckEnabled()} captured at construction time. */
    private final boolean bufferAckEnabled;

    private Client(Builder builder) {
        NioEventLoopGroup eventLoopGroup = builder.eventLoopGroup == null ? new NioEventLoopGroup() : builder.eventLoopGroup;
        this.env = ClientEnvironment.builder().setClusterAt(builder.clusterAt).setConnectionNameGenerator(builder.connectionNameGenerator).setBucket(builder.bucket).setUsername(builder.username == null ? builder.bucket : builder.username).setPassword(builder.password).setDcpControl(builder.dcpControl).setEventLoopGroup((EventLoopGroup)eventLoopGroup, builder.eventLoopGroup == null).setBufferAckWatermark(builder.bufferAckWatermark).setBufferPooling(builder.poolBuffers).setConnectTimeout(builder.connectTimeout).setBootstrapTimeout(builder.bootstrapTimeout).setSocketConnectTimeout(builder.socketConnectTimeout).setConfigProviderReconnectDelay(builder.configProviderReconnectDelay).setConfigProviderReconnectMaxAttempts(builder.configProviderReconnectMaxAttempts).setDcpChannelsReconnectDelay(builder.dcpChannelsReconnectDelay).setDcpChannelsReconnectMaxAttempts(builder.dcpChannelsReconnectMaxAttempts).setEventBus(builder.eventBus).setSslEnabled(builder.sslEnabled).setSslKeystoreFile(builder.sslKeystoreFile).setSslKeystorePassword(builder.sslKeystorePassword).setSslKeystore(builder.sslKeystore).build();
        this.bufferAckEnabled = this.env.dcpControl().bufferAckEnabled();
        if (this.bufferAckEnabled && this.env.bufferAckWatermark() == 0) {
            throw new IllegalArgumentException("The bufferAckWatermark needs to be set if bufferAck is enabled.");
        }
        this.conductor = new Conductor(this.env, builder.configProvider);
        LOGGER.info("Environment Configuration Used: {}", (Object)this.env);
    }

    /**
     * Entry point for configuring a new client.
     *
     * @return a fresh {@link Builder} carrying the default settings.
     */
    public static Builder configure() {
        Builder builder = new Builder();
        return builder;
    }

    /**
     * Asks the conductor for sequence numbers and decodes every response buffer into
     * two-element arrays.
     *
     * <p>Each buffer is read as consecutive 10-byte entries: a short at the entry offset followed
     * by a long two bytes later. Offsets are absolute buffer indices starting at 0. The buffer is
     * released once it has been decoded.
     *
     * @return an observable emitting one {@code long[]{shortValue, longValue}} per decoded entry.
     */
    private Observable<long[]> getSeqnos() {
        return this.conductor.getSeqnos().flatMap(new Func1<ByteBuf, Observable<long[]>>(){

            @Override
            public Observable<long[]> call(ByteBuf buf) {
                final int entrySize = 10;
                int entryCount = buf.readableBytes() / entrySize;
                ArrayList<long[]> entries = new ArrayList<long[]>(entryCount);
                int end = entryCount * entrySize;
                for (int offset = 0; offset < end; offset += entrySize) {
                    long first = buf.getShort(offset);
                    long second = buf.getLong(offset + 2);
                    entries.add(new long[]{first, second});
                }
                buf.release();
                return Observable.from(entries);
            }
        });
    }

    /**
     * Exposes the session state kept by the conductor.
     *
     * @return whatever {@code conductor.sessionState()} returns.
     */
    public SessionState sessionState() {
        SessionState current = conductor.sessionState();
        return current;
    }

    /**
     * Registers the handler for control events, wrapped so that the session state is kept
     * up to date before the event is passed on.
     *
     * <ul>
     *   <li>Snapshot markers: the partition's snapshot start/end seqnos are stored, then the event
     *       is forwarded.</li>
     *   <li>Failover log responses: applied to the session state, released, and <b>not</b>
     *       forwarded to the supplied handler.</li>
     *   <li>Rollback messages: logged at warn level, then forwarded.</li>
     *   <li>Anything else: forwarded unchanged.</li>
     * </ul>
     *
     * @param controlEventHandler the handler that receives the forwarded events.
     */
    public void controlEventHandler(final ControlEventHandler controlEventHandler) {
        ControlEventHandler stateTrackingHandler = new ControlEventHandler(){

            @Override
            public void onEvent(ChannelFlowController flowController, ByteBuf event) {
                if (DcpSnapshotMarkerRequest.is(event)) {
                    short vbid = DcpSnapshotMarkerRequest.partition(event);
                    PartitionState state = Client.this.sessionState().get(vbid);
                    state.setSnapshotStartSeqno(DcpSnapshotMarkerRequest.startSeqno(event));
                    state.setSnapshotEndSeqno(DcpSnapshotMarkerRequest.endSeqno(event));
                    Client.this.sessionState().set(vbid, state);
                } else if (DcpFailoverLogResponse.is(event)) {
                    // Consumed here: the failover log goes into the session state only.
                    Client.this.handleFailoverLogResponse(event);
                    event.release();
                    return;
                } else if (RollbackMessage.is(event)) {
                    LOGGER.warn("Received rollback for vbucket {} to seqno {}", (Object)RollbackMessage.vbucket(event), (Object)RollbackMessage.seqno(event));
                }
                controlEventHandler.onEvent(flowController, event);
            }
        };
        this.env.setControlEventHandler(stateTrackingHandler);
    }

    /**
     * Registers the handler for system events on the environment, without any wrapping.
     *
     * @param systemEventHandler the handler to store on the environment.
     */
    public void systemEventHandler(SystemEventHandler systemEventHandler) {
        env.setSystemEventHandler(systemEventHandler);
    }

    /**
     * Copies every entry of a failover log response into the partition state of the vbucket
     * named in the response and writes that state back to the session state.
     *
     * <p>The buffer is only read here; releasing it is left to the caller.
     *
     * @param event the failover log response buffer.
     */
    private void handleFailoverLogResponse(ByteBuf event) {
        short vbid = DcpFailoverLogResponse.vbucket(event);
        int entryCount = DcpFailoverLogResponse.numLogEntries(event);
        PartitionState state = sessionState().get(vbid);
        int index = 0;
        while (index < entryCount) {
            state.addToFailoverLog(DcpFailoverLogResponse.seqnoEntry(event, index), DcpFailoverLogResponse.vbuuidEntry(event, index));
            index++;
        }
        sessionState().set(vbid, state);
    }

    /**
     * Registers the handler for data events, wrapped so that the start seqno of the affected
     * partition follows every mutation, deletion and expiration.
     *
     * <p>For those three message types the partition's start seqno is set to the message's
     * by-seqno before the event is forwarded; every event, tracked or not, is then handed to the
     * supplied handler.
     *
     * @param dataEventHandler the handler that receives the forwarded events.
     */
    public void dataEventHandler(final DataEventHandler dataEventHandler) {
        DataEventHandler seqnoTrackingHandler = new DataEventHandler(){

            @Override
            public void onEvent(ChannelFlowController flowController, ByteBuf event) {
                if (DcpMutationMessage.is(event)) {
                    this.recordSeqno(DcpMutationMessage.partition(event), DcpMutationMessage.bySeqno(event));
                } else if (DcpDeletionMessage.is(event)) {
                    this.recordSeqno(DcpDeletionMessage.partition(event), DcpDeletionMessage.bySeqno(event));
                } else if (DcpExpirationMessage.is(event)) {
                    this.recordSeqno(DcpExpirationMessage.partition(event), DcpExpirationMessage.bySeqno(event));
                }
                dataEventHandler.onEvent(flowController, event);
            }

            /** Stores {@code seqno} as the start seqno of {@code vbid} in the session state. */
            private void recordSeqno(short vbid, long seqno) {
                PartitionState state = Client.this.sessionState().get(vbid);
                state.setStartSeqno(seqno);
                Client.this.sessionState().set(vbid, state);
            }
        };
        this.env.setDataEventHandler(seqnoTrackingHandler);
    }

    /**
     * Connects the conductor, provided it is currently disconnected.
     *
     * <p>If the conductor does not report itself as disconnected, the call is treated as a
     * duplicate and completes immediately. If connecting fails, the conductor is stopped and the
     * failure is surfaced as a {@link BootstrapException} wrapping the original cause.
     *
     * @return a completable for the connect attempt.
     * @throws IllegalArgumentException synchronously, if no data event handler or no control
     *         event handler has been registered.
     */
    public Completable connect() {
        boolean alreadyActive = !this.conductor.disconnected();
        if (alreadyActive) {
            LOGGER.debug("Ignoring duplicate connect attempt, already connecting/connected.");
            return Completable.complete();
        }
        if (null == this.env.dataEventHandler()) {
            throw new IllegalArgumentException("A DataEventHandler needs to be provided!");
        }
        if (null == this.env.controlEventHandler()) {
            throw new IllegalArgumentException("A ControlEventHandler needs to be provided!");
        }
        LOGGER.info("Connecting to seed nodes and bootstrapping bucket {}.", (Object)this.env.bucket());

        Func1<Throwable, Completable> stopAndFail = new Func1<Throwable, Completable>(){

            @Override
            public Completable call(Throwable cause) {
                Completable failure = Completable.error(new BootstrapException("Could not connect to Cluster/Bucket", cause));
                return Client.this.conductor.stop().andThen(failure);
            }
        };
        return this.conductor.connect().onErrorResumeNext(stopAndFail);
    }

    /**
     * Stops the conductor and then continues with whatever {@code env.shutdown()} returns.
     *
     * @return a completable covering both steps.
     */
    public Completable disconnect() {
        Completable stopConductor = conductor.stop();
        return stopConductor.andThen(env.shutdown());
    }

    /**
     * Opens streams for the given partitions, or for all partitions when none are given.
     *
     * <p>Only partitions that pass {@code selectInitializedPartitions} are opened; if none
     * qualify, the result completes immediately. The partition selection happens when this method
     * is called, whereas the seqnos passed to the conductor are read from the session state when
     * the stream for a partition is actually started. A {@link RollbackException} raised while
     * opening a stream is swallowed; any other error is propagated.
     *
     * @param vbids the partition ids to stream; empty means all partitions.
     * @return a completable that finishes once all selected streams have been started.
     */
    public Completable startStreaming(Short ... vbids) {
        int clusterPartitions = numPartitions();
        List<Short> requested = Client.partitionsForVbids(clusterPartitions, vbids);
        List<Short> toOpen = selectInitializedPartitions(clusterPartitions, requested);
        if (toOpen.isEmpty()) {
            LOGGER.info("The configured session state does not require any streams to be opened. Completing immediately.");
            return Completable.complete();
        }
        LOGGER.info("Starting to Stream for " + toOpen.size() + " partitions");
        LOGGER.debug("Stream start against partitions: {}", toOpen);

        // A rollback is not an error for the caller of startStreaming.
        final Func1<Throwable, Completable> ignoreRollback = new Func1<Throwable, Completable>(){

            @Override
            public Completable call(Throwable failure) {
                return failure instanceof RollbackException ? Completable.complete() : Completable.error(failure);
            }
        };
        Func1<Short, Observable<?>> openStream = new Func1<Short, Observable<?>>(){

            @Override
            public Observable<?> call(Short vbid) {
                PartitionState state = Client.this.sessionState().get(vbid.shortValue());
                return Client.this.conductor.startStreamForPartition(vbid, state.getLastUuid(), state.getStartSeqno(), state.getEndSeqno(), state.getSnapshotStartSeqno(), state.getSnapshotEndSeqno()).onErrorResumeNext(ignoreRollback).toObservable();
            }
        };
        return Observable.from(toOpen).flatMap(openStream).toCompletable();
    }

    /**
     * Filters the requested partitions down to those that have something left to stream.
     *
     * <p>A partition is kept when it has a state in the session state and its start seqno is
     * lower than its end seqno under an unsigned comparison. Everything else is skipped with a
     * debug log entry.
     *
     * @param clusterPartitions the number of partitions the cluster has.
     * @param partitions the candidate partition ids.
     * @return the partitions that qualify, in the order they were given.
     * @throws IllegalStateException if more partitions qualify than the cluster has.
     */
    private List<Short> selectInitializedPartitions(int clusterPartitions, List<Short> partitions) {
        List<Short> initializedPartitions = new ArrayList<Short>();
        SessionState state = this.sessionState();
        for (short partition : partitions) {
            PartitionState ps = state.get(partition);
            if (ps == null) {
                LOGGER.debug("Skipping partition {}, because its state is null", (Object)partition);
                continue;
            }
            if (!MathUtils.lessThanUnsigned(ps.getStartSeqno(), ps.getEndSeqno())) {
                LOGGER.debug("Skipping partition {}, because startSeqno({}) >= endSeqno({})", new Object[]{partition, ps.getStartSeqno(), ps.getEndSeqno()});
                continue;
            }
            initializedPartitions.add(partition);
        }
        if (initializedPartitions.size() > clusterPartitions) {
            // Report the count: the sentence compares two partition counts, not a list and a count.
            throw new IllegalStateException("Session State has " + initializedPartitions.size() + " partitions while the cluster has " + clusterPartitions + "!");
        }
        return initializedPartitions;
    }

    /**
     * Stops the streams for the given partitions, or for all partitions when none are given.
     *
     * @param vbids the partition ids to stop; empty means all partitions.
     * @return a completable that finishes once the conductor has stopped every requested stream.
     */
    public Completable stopStreaming(Short ... vbids) {
        List<Short> targets = Client.partitionsForVbids(numPartitions(), vbids);
        LOGGER.info("Stopping to Stream for " + targets.size() + " partitions");
        LOGGER.debug("Stream stop against partitions: {}", targets);

        Func1<Short, Observable<?>> closeStream = new Func1<Short, Observable<?>>(){

            @Override
            public Observable<?> call(Short vbid) {
                return Client.this.conductor.stopStreamForPartition(vbid).toObservable();
            }
        };
        return Observable.from(targets).flatMap(closeStream).toCompletable();
    }

    /**
     * Resolves the partitions an operation should target.
     *
     * <p>When explicit ids are given, a sorted copy of them is returned; the caller's array is
     * left untouched. When no ids are given, the ids {@code 0 .. numPartitions - 1} are returned
     * in ascending order.
     *
     * @param numPartitions the number of partitions to enumerate when no ids are given.
     * @param vbids the explicitly requested partition ids, possibly empty.
     * @return a new, mutable, ascending list of partition ids.
     */
    private static List<Short> partitionsForVbids(int numPartitions, Short ... vbids) {
        if (vbids.length > 0) {
            // Copy first: Arrays.asList is a view, so sorting it would reorder the caller's array.
            List<Short> requested = new ArrayList<Short>(Arrays.asList(vbids));
            Collections.sort(requested);
            return requested;
        }
        List<Short> all = new ArrayList<Short>(Math.max(numPartitions, 0));
        // An int counter cannot wrap before reaching numPartitions the way a short counter can.
        for (int id = 0; id < numPartitions; id++) {
            all.add((short)id);
        }
        return all;
    }

    /**
     * Requests the failover log of the given partitions, or of all partitions when none are
     * given.
     *
     * @param vbids the partition ids to query; empty means all partitions.
     * @return an observable emitting the buffers obtained from the conductor's
     *         {@code getFailoverLog} for each partition.
     */
    public Observable<ByteBuf> failoverLogs(Short ... vbids) {
        List<Short> targets = Client.partitionsForVbids(numPartitions(), vbids);
        LOGGER.debug("Asking for failover logs on partitions {}", targets);

        Func1<Short, Observable<ByteBuf>> fetchLog = new Func1<Short, Observable<ByteBuf>>(){

            @Override
            public Observable<ByteBuf> call(Short vbid) {
                return Client.this.conductor.getFailoverLog(vbid).toObservable();
            }
        };
        return Observable.from(targets).flatMap(fetchLog);
    }

    /**
     * Stops the stream of a partition, rolls its session state back to {@code seqno} and starts
     * the stream again, in that order.
     *
     * <p>NOTE(review): {@code startStreaming(partition)} is invoked while this chain is being
     * assembled, so its partition selection runs against the state as it is before the rollback
     * step executes. Confirm that this is intended.
     *
     * @param partition the partition to roll back.
     * @param seqno the position passed to {@code rollbackToPosition}.
     * @return a completable running the three steps sequentially.
     */
    public Completable rollbackAndRestartStream(final short partition, final long seqno) {
        Completable stop = stopStreaming(partition);
        Completable rollback = Completable.create(new Completable.OnSubscribe(){

            @Override
            public void call(CompletableSubscriber subscriber) {
                Client.this.sessionState().rollbackToPosition(partition, seqno);
                subscriber.onCompleted();
            }
        });
        Completable restart = startStreaming(partition);
        return stop.andThen(rollback).andThen(restart);
    }

    /**
     * Reports the partition count known to the conductor.
     *
     * @return the value of {@code conductor.numberOfPartitions()}.
     */
    public int numPartitions() {
        int count = conductor.numberOfPartitions();
        return count;
    }

    /**
     * Asks the conductor whether the stream for a partition is open.
     *
     * @param vbid the partition id.
     * @return the conductor's answer for that partition.
     */
    public boolean streamIsOpen(short vbid) {
        boolean open = conductor.streamIsOpen(vbid);
        return open;
    }

    /**
     * Initializes the session state for one of the supported from/to combinations:
     * BEGINNING to INFINITY, BEGINNING to NOW, or NOW to INFINITY.
     *
     * @param from where streaming should start.
     * @param to where streaming should end.
     * @return a completable performing the initialization.
     * @throws IllegalStateException for any other combination.
     */
    public Completable initializeState(StreamFrom from, StreamTo to) {
        boolean fromBeginning = from == StreamFrom.BEGINNING;
        boolean fromNow = from == StreamFrom.NOW;

        if (to == StreamTo.INFINITY && (fromBeginning || fromNow)) {
            Client.buzzMe();
            return fromBeginning ? initFromBeginningToInfinity() : initFromNowToInfinity();
        }
        if (fromBeginning && to == StreamTo.NOW) {
            return initFromBeginningToNow();
        }
        throw new IllegalStateException("Unsupported FROM/TO combination: " + from + " -> " + to);
    }

    /**
     * Restores the session state from a previously persisted representation.
     *
     * <p>Only {@link StateFormat#JSON} is handled; any other format is reported through
     * {@code onError} as an {@link IllegalStateException}. Exceptions raised while restoring are
     * reported through {@code onError} as well.
     *
     * @param format the format of the persisted state.
     * @param persistedState the persisted bytes; logged as a UTF-8 string on subscription.
     * @return a completable that performs the recovery when subscribed to.
     */
    public Completable recoverState(final StateFormat format, final byte[] persistedState) {
        Completable.OnSubscribe recovery = new Completable.OnSubscribe(){

            @Override
            public void call(CompletableSubscriber subscriber) {
                LOGGER.info("Recovering state from format {}", (Object)format);
                LOGGER.debug("PersistedState on recovery is: {}", (Object)new String(persistedState, CharsetUtil.UTF_8));
                try {
                    if (format != StateFormat.JSON) {
                        subscriber.onError(new IllegalStateException("Unsupported StateFormat " + format));
                        return;
                    }
                    Client.this.sessionState().setFromJson(persistedState);
                    subscriber.onCompleted();
                }
                catch (Exception failure) {
                    subscriber.onError(failure);
                }
            }
        };
        return Completable.create(recovery);
    }

    /**
     * Recovers the session state from {@code persistedState} when it is non-null and non-empty,
     * and otherwise initializes it from the given from/to combination.
     *
     * @param format the format of the persisted state.
     * @param persistedState the persisted bytes, possibly {@code null} or empty.
     * @param from where streaming should start if nothing was persisted.
     * @param to where streaming should end if nothing was persisted.
     * @return the completable of whichever path was chosen.
     */
    public Completable recoverOrInitializeState(StateFormat format, byte[] persistedState, StreamFrom from, StreamTo to) {
        boolean hasPersistedState = persistedState != null && persistedState.length > 0;
        return hasPersistedState ? recoverState(format, persistedState) : initializeState(from, to);
    }

    /**
     * Creates a completable that, on subscription, resets the session state via
     * {@code setToBeginningWithNoEnd} for the current number of partitions.
     *
     * <p>A failure is logged at warn level and reported through {@code onError}.
     *
     * @return the completable performing the reset.
     */
    private Completable initFromBeginningToInfinity() {
        Completable.OnSubscribe reset = new Completable.OnSubscribe(){

            @Override
            public void call(CompletableSubscriber subscriber) {
                LOGGER.info("Initializing state from beginning to no end.");
                try {
                    int partitionCount = Client.this.numPartitions();
                    Client.this.sessionState().setToBeginningWithNoEnd(partitionCount);
                    subscriber.onCompleted();
                }
                catch (Exception failure) {
                    LOGGER.warn("Failed to initialize state from beginning to no end.", (Throwable)failure);
                    subscriber.onError(failure);
                }
            }
        };
        return Completable.create(reset);
    }

    /**
     * Initializes the state so that every partition starts at the seqno reported for it:
     * the start seqno and both snapshot seqnos are set to that value.
     *
     * @return the completable produced by {@code initWithCallback}.
     */
    private Completable initFromNowToInfinity() {
        Action1<long[]> startAtCurrentSeqno = new Action1<long[]>(){

            @Override
            public void call(long[] pair) {
                // pair[0] carries the partition id, pair[1] the seqno.
                short vbid = (short)pair[0];
                long current = pair[1];
                PartitionState state = Client.this.sessionState().get(vbid);
                state.setStartSeqno(current);
                state.setSnapshotStartSeqno(current);
                state.setSnapshotEndSeqno(current);
                Client.this.sessionState().set(vbid, state);
            }
        };
        return initWithCallback(startAtCurrentSeqno);
    }

    /**
     * Initializes the state so that every partition ends at the seqno reported for it:
     * only the end seqno is set to that value.
     *
     * @return the completable produced by {@code initWithCallback}.
     */
    private Completable initFromBeginningToNow() {
        Action1<long[]> endAtCurrentSeqno = new Action1<long[]>(){

            @Override
            public void call(long[] pair) {
                // pair[0] carries the partition id, pair[1] the seqno.
                short vbid = (short)pair[0];
                long current = pair[1];
                PartitionState state = Client.this.sessionState().get(vbid);
                state.setEndSeqno(current);
                Client.this.sessionState().set(vbid, state);
            }
        };
        return initWithCallback(endAtCurrentSeqno);
    }

    /**
     * Shared initialization path for the seqno-based modes.
     *
     * <p>The session state is reset with {@code setToBeginningWithNoEnd} immediately, when this
     * method is called. The returned completable then, on subscription:
     * <ol>
     *   <li>fetches the seqno pairs and passes each one to {@code callback};</li>
     *   <li>collects the partition ids of all pairs into one list;</li>
     *   <li>requests the failover logs for those partitions;</li>
     *   <li>applies each failover log to the session state and releases its buffer.</li>
     * </ol>
     *
     * @param callback invoked for every {@code long[]{partition, seqno}} pair.
     * @return a completable that finishes after the last failover log has been applied.
     */
    private Completable initWithCallback(Action1<long[]> callback) {
        this.sessionState().setToBeginningWithNoEnd(this.numPartitions());

        Observable<long[]> seqnoPairs = this.getSeqnos().doOnNext(callback);

        Observable<ArrayList<Short>> partitionIds = seqnoPairs.reduce(new ArrayList<Short>(), new Func2<ArrayList<Short>, long[], ArrayList<Short>>(){

            @Override
            public ArrayList<Short> call(ArrayList<Short> collected, long[] pair) {
                collected.add((short)pair[0]);
                return collected;
            }
        });

        Observable<ByteBuf> failoverLogBuffers = partitionIds.flatMap(new Func1<ArrayList<Short>, Observable<ByteBuf>>(){

            @Override
            public Observable<ByteBuf> call(ArrayList<Short> ids) {
                return Client.this.failoverLogs(ids.toArray(new Short[0]));
            }
        });

        Observable<Short> appliedPartitions = failoverLogBuffers.map(new Func1<ByteBuf, Short>(){

            @Override
            public Short call(ByteBuf buf) {
                short vbid = DcpFailoverLogResponse.vbucket(buf);
                Client.this.handleFailoverLogResponse(buf);
                buf.release();
                return vbid;
            }
        });

        return appliedPartitions.last().toCompletable();
    }

    /** Writes a fixed debug log line; called by {@code initializeState} for the two INFINITY modes. */
    private static void buzzMe() {
        final String message = "To Infinity... AND BEYOND!";
        LOGGER.debug(message);
    }

    /**
     * Fluent builder that gathers every setting needed to construct a {@link Client}.
     *
     * <p>Each setter stores its argument and returns this builder so calls can be chained;
     * {@link #build()} hands the builder to the {@link Client} constructor.
     */
    public static class Builder {
        // Seed nodes; defaults to a single unresolved 127.0.0.1 address with port 0.
        private List<InetSocketAddress> clusterAt = Arrays.asList(InetSocketAddress.createUnresolved("127.0.0.1", 0));
        // Left null unless supplied; the Client constructor then creates its own group.
        private EventLoopGroup eventLoopGroup;
        private String bucket = "default";
        // Left null unless supplied; the Client constructor then falls back to the bucket name.
        private String username;
        private String password = "";
        private ConnectionNameGenerator connectionNameGenerator = DefaultConnectionNameGenerator.INSTANCE;
        private DcpControl dcpControl = new DcpControl();
        private ConfigProvider configProvider = null;
        // Percentage between 0 and 100; 0 unless set.
        private int bufferAckWatermark;
        private boolean poolBuffers = true;
        // NOTE(review): defaults to DEFAULT_SOCKET_CONNECT_TIMEOUT, the same constant used for
        // socketConnectTimeout below - confirm this is intended rather than a copy/paste slip.
        private long connectTimeout = ClientEnvironment.DEFAULT_SOCKET_CONNECT_TIMEOUT;
        private long bootstrapTimeout = ClientEnvironment.DEFAULT_BOOTSTRAP_TIMEOUT;
        private long socketConnectTimeout = ClientEnvironment.DEFAULT_SOCKET_CONNECT_TIMEOUT;
        private Delay configProviderReconnectDelay = ClientEnvironment.DEFAULT_CONFIG_PROVIDER_RECONNECT_DELAY;
        private int configProviderReconnectMaxAttempts = Integer.MAX_VALUE;
        private int dcpChannelsReconnectMaxAttempts = Integer.MAX_VALUE;
        private Delay dcpChannelsReconnectDelay = ClientEnvironment.DEFAULT_DCP_CHANNELS_RECONNECT_DELAY;
        private EventBus eventBus;
        private boolean sslEnabled = false;
        private String sslKeystoreFile;
        private String sslKeystorePassword;
        private KeyStore sslKeystore;

        /**
         * Sets the buffer acknowledgement watermark.
         *
         * @param watermark a percentage in the range 0 to 100, both inclusive.
         * @return this builder.
         * @throws IllegalArgumentException if the value lies outside that range.
         */
        public Builder bufferAckWatermark(int watermark) {
            boolean withinPercentRange = watermark >= 0 && watermark <= 100;
            if (!withinPercentRange) {
                throw new IllegalArgumentException("The bufferAckWatermark is percents, so it needs to be between 0 and 100");
            }
            this.bufferAckWatermark = watermark;
            return this;
        }

        /**
         * Replaces the seed nodes with the given hostnames; each one becomes an
         * {@link InetSocketAddress} with port 0.
         *
         * @param hostnames the seed node hostnames.
         * @return this builder.
         */
        public Builder hostnames(List<String> hostnames) {
            this.clusterAt = new ArrayList<InetSocketAddress>(hostnames.size());
            for (String host : hostnames) {
                InetSocketAddress address = new InetSocketAddress(host, 0);
                this.clusterAt.add(address);
            }
            return this;
        }

        /**
         * Varargs variant of {@link #hostnames(List)}.
         *
         * @param hostnames the seed node hostnames.
         * @return this builder.
         */
        public Builder hostnames(String ... hostnames) {
            List<String> asList = Arrays.asList(hostnames);
            return this.hostnames(asList);
        }

        /**
         * Replaces the seed nodes with the hosts parsed from a connection string. Only the hosts
         * of the parsed connection string are used here.
         *
         * @param connectionString the connection string to parse.
         * @return this builder.
         */
        public Builder connectionString(String connectionString) {
            this.clusterAt = ConnectionString.create(connectionString).hosts();
            return this;
        }

        /**
         * Supplies the event loop group to use instead of letting the client create one.
         *
         * @param eventLoopGroup the group to use.
         * @return this builder.
         */
        public Builder eventLoopGroup(EventLoopGroup eventLoopGroup) {
            this.eventLoopGroup = eventLoopGroup;
            return this;
        }

        /**
         * Sets the bucket name.
         *
         * @param bucket the bucket name.
         * @return this builder.
         */
        public Builder bucket(String bucket) {
            this.bucket = bucket;
            return this;
        }

        /**
         * Sets the username.
         *
         * @param username the username.
         * @return this builder.
         */
        public Builder username(String username) {
            this.username = username;
            return this;
        }

        /**
         * Sets the password.
         *
         * @param password the password.
         * @return this builder.
         */
        public Builder password(String password) {
            this.password = password;
            return this;
        }

        /**
         * Sets the generator used for connection names.
         *
         * @param connectionNameGenerator the generator.
         * @return this builder.
         */
        public Builder connectionNameGenerator(ConnectionNameGenerator connectionNameGenerator) {
            this.connectionNameGenerator = connectionNameGenerator;
            return this;
        }

        /**
         * Stores a DCP control parameter; the value is stored as its {@code toString()} form.
         *
         * @param name the control parameter name.
         * @param value the value; must not be {@code null} because {@code toString()} is called on it.
         * @return this builder.
         */
        public Builder controlParam(DcpControl.Names name, Object value) {
            String rendered = value.toString();
            this.dcpControl.put(name, rendered);
            return this;
        }

        /**
         * Sets the config provider handed to the conductor.
         *
         * @param configProvider the config provider.
         * @return this builder.
         */
        public Builder configProvider(ConfigProvider configProvider) {
            this.configProvider = configProvider;
            return this;
        }

        /**
         * Sets the buffer pooling flag.
         *
         * @param pool the flag value.
         * @return this builder.
         */
        public Builder poolBuffers(boolean pool) {
            this.poolBuffers = pool;
            return this;
        }

        /**
         * Sets the socket connect timeout.
         *
         * @param socketConnectTimeout the timeout value.
         * @return this builder.
         */
        public Builder socketConnectTimeout(long socketConnectTimeout) {
            this.socketConnectTimeout = socketConnectTimeout;
            return this;
        }

        /**
         * Sets the bootstrap timeout.
         *
         * @param bootstrapTimeout the timeout value.
         * @return this builder.
         */
        public Builder bootstrapTimeout(long bootstrapTimeout) {
            this.bootstrapTimeout = bootstrapTimeout;
            return this;
        }

        /**
         * Sets the connect timeout.
         *
         * @param connectTimeout the timeout value.
         * @return this builder.
         */
        public Builder connectTimeout(long connectTimeout) {
            this.connectTimeout = connectTimeout;
            return this;
        }

        /**
         * Sets the reconnect delay for the config provider.
         *
         * @param configProviderReconnectDelay the delay.
         * @return this builder.
         */
        public Builder configProviderReconnectDelay(Delay configProviderReconnectDelay) {
            this.configProviderReconnectDelay = configProviderReconnectDelay;
            return this;
        }

        /**
         * Sets the maximum number of reconnect attempts for the config provider.
         *
         * @param configProviderReconnectMaxAttempts the maximum number of attempts.
         * @return this builder.
         */
        public Builder configProviderReconnectMaxAttempts(int configProviderReconnectMaxAttempts) {
            this.configProviderReconnectMaxAttempts = configProviderReconnectMaxAttempts;
            return this;
        }

        /**
         * Sets the maximum number of reconnect attempts for the DCP channels.
         *
         * @param dcpChannelsReconnectMaxAttempts the maximum number of attempts.
         * @return this builder.
         */
        public Builder dcpChannelsReconnectMaxAttempts(int dcpChannelsReconnectMaxAttempts) {
            this.dcpChannelsReconnectMaxAttempts = dcpChannelsReconnectMaxAttempts;
            return this;
        }

        /**
         * Sets the reconnect delay for the DCP channels.
         *
         * @param dcpChannelsReconnectDelay the delay.
         * @return this builder.
         */
        public Builder dcpChannelsReconnectDelay(Delay dcpChannelsReconnectDelay) {
            this.dcpChannelsReconnectDelay = dcpChannelsReconnectDelay;
            return this;
        }

        /**
         * Sets the event bus.
         *
         * @param eventBus the event bus.
         * @return this builder.
         */
        public Builder eventBus(EventBus eventBus) {
            this.eventBus = eventBus;
            return this;
        }

        /**
         * Sets the SSL flag.
         *
         * @param sslEnabled the flag value.
         * @return this builder.
         */
        public Builder sslEnabled(boolean sslEnabled) {
            this.sslEnabled = sslEnabled;
            return this;
        }

        /**
         * Sets the SSL keystore file.
         *
         * @param sslKeystoreFile the keystore file.
         * @return this builder.
         */
        public Builder sslKeystoreFile(String sslKeystoreFile) {
            this.sslKeystoreFile = sslKeystoreFile;
            return this;
        }

        /**
         * Sets the SSL keystore password.
         *
         * @param sslKeystorePassword the keystore password.
         * @return this builder.
         */
        public Builder sslKeystorePassword(String sslKeystorePassword) {
            this.sslKeystorePassword = sslKeystorePassword;
            return this;
        }

        /**
         * Sets the SSL keystore instance.
         *
         * @param sslKeystore the keystore.
         * @return this builder.
         */
        public Builder sslKeystore(KeyStore sslKeystore) {
            this.sslKeystore = sslKeystore;
            return this;
        }

        /**
         * Creates the client from the settings collected so far.
         *
         * @return a new {@link Client}.
         */
        public Client build() {
            Client client = new Client(this);
            return client;
        }
    }
}

