/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.control.impl;

import io.pravega.client.admin.KeyValueTableInfo;
import io.pravega.client.control.impl.CancellableRequest;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.control.impl.ControllerFailureException;
import io.pravega.client.control.impl.ControllerImplConfig;
import io.pravega.client.control.impl.ControllerResolverFactory;
import io.pravega.client.control.impl.ModelHelper;
import io.pravega.client.control.impl.PravegaCredentialsWrapper;
import io.pravega.client.control.impl.ReaderGroupConfigRejectedException;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.stream.InvalidStreamException;
import io.pravega.client.stream.NoSuchScopeException;
import io.pravega.client.stream.PingFailedException;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TxnFailedException;
import io.pravega.client.stream.impl.SegmentWithRange;
import io.pravega.client.stream.impl.StreamImpl;
import io.pravega.client.stream.impl.StreamSegmentSuccessors;
import io.pravega.client.stream.impl.StreamSegments;
import io.pravega.client.stream.impl.StreamSegmentsWithPredecessors;
import io.pravega.client.stream.impl.TxnSegments;
import io.pravega.client.stream.impl.WriterPosition;
import io.pravega.client.tables.KeyValueTableConfiguration;
import io.pravega.client.tables.impl.KeyValueTableSegments;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.function.Callbacks;
import io.pravega.common.hash.RandomFactory;
import io.pravega.common.tracing.RequestTracker;
import io.pravega.common.tracing.TagLogger;
import io.pravega.common.util.AsyncIterator;
import io.pravega.common.util.ContinuationTokenAsyncIterator;
import io.pravega.common.util.Retry;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.controller.stream.api.grpc.v1.ControllerServiceGrpc;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shaded.com.google.common.base.Strings;
import io.pravega.shaded.com.google.protobuf.ProtocolStringList;
import io.pravega.shaded.io.grpc.ManagedChannel;
import io.pravega.shaded.io.grpc.ManagedChannelBuilder;
import io.pravega.shaded.io.grpc.Status;
import io.pravega.shaded.io.grpc.StatusRuntimeException;
import io.pravega.shaded.io.grpc.auth.MoreCallCredentials;
import io.pravega.shaded.io.grpc.netty.GrpcSslContexts;
import io.pravega.shaded.io.grpc.netty.NegotiationType;
import io.pravega.shaded.io.grpc.netty.NettyChannelBuilder;
import io.pravega.shaded.io.grpc.stub.StreamObserver;
import io.pravega.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.pravega.shared.NameUtils;
import io.pravega.shared.controller.tracing.RPCTracingHelpers;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
import io.pravega.shared.security.auth.AccessOperation;
import io.pravega.shared.security.auth.Credentials;
import java.io.File;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ControllerImpl
implements Controller {
    // Class-wide logger. Wrapped in TagLogger so log calls can take a request id as their first argument.
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(ControllerImpl.class));
    // Keep-alive interval in minutes; the public constructor passes the same value (6 minutes) to keepAliveTime.
    private static final long DEFAULT_KEEPALIVE_TIME_MINUTES = 6L;
    // Retry policy built by createRetryConfig(); each RPC below is started through retryConfig.runAsync.
    private final Retry.RetryAndThrowConditionally retryConfig;
    // Executor supplied by the caller; handed to retryConfig.runAsync for every RPC.
    private final ScheduledExecutorService executor;
    // Checked at the start of every operation via Exceptions.checkNotClosed.
    // NOTE(review): presumably set by a close() method outside this view - confirm.
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // gRPC channel built in the constructor from the supplied channel builder.
    private final ManagedChannel channel;
    // Async stub bound to the channel; carries call credentials when the client config provides them.
    private final ControllerServiceGrpc.ControllerServiceStub client;
    // Produces the request id attached to each call (a random long from RandomFactory).
    private final Supplier<Long> requestIdGenerator = RandomFactory.create()::nextLong;
    // Value of config.getTimeoutMillis(); passed to ControllerClientTagger for every RPC.
    private final long timeoutMillis;

    public ControllerImpl(ControllerImplConfig config, ScheduledExecutorService executor) {
        this(((NettyChannelBuilder)((NettyChannelBuilder)NettyChannelBuilder.forTarget(config.getClientConfig().getControllerURI().toString()).nameResolverFactory(new ControllerResolverFactory(executor))).defaultLoadBalancingPolicy("round_robin")).keepAliveTime(6L, TimeUnit.MINUTES), config, executor);
        log.info("Controller client connecting to server at {}", (Object)config.getClientConfig().getControllerURI().getAuthority());
    }

    /**
     * Creates a controller client on top of the supplied channel builder.
     * <p>
     * When TLS to the controller is enabled, the builder is given a client SSL context (using
     * the configured trust store file if one is set) and TLS negotiation; otherwise plaintext
     * negotiation is selected. Both paths need Netty-specific settings, so the builder is cast
     * to {@link NettyChannelBuilder} in each of them.
     *
     * @param channelBuilder channel builder to configure and build; must not be null.
     * @param config         controller client configuration.
     * @param executor       executor used for running RPCs with retries.
     * @throws CompletionException wrapping the {@link SSLException} if the SSL context cannot be built.
     */
    @VisibleForTesting
    public ControllerImpl(ManagedChannelBuilder<?> channelBuilder, ControllerImplConfig config, ScheduledExecutorService executor) {
        Preconditions.checkNotNull(channelBuilder, "channelBuilder");
        this.executor = executor;
        this.retryConfig = this.createRetryConfig(config);
        if (config.getClientConfig().isEnableTlsToController()) {
            log.debug("Setting up a SSL/TLS channel builder");
            String trustStore = config.getClientConfig().getTrustStore();
            SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
            if (!Strings.isNullOrEmpty(trustStore)) {
                sslContextBuilder = sslContextBuilder.trustManager(new File(trustStore));
            }
            try {
                // sslContext(...) only exists on the Netty builder, not on ManagedChannelBuilder.
                channelBuilder = ((NettyChannelBuilder) channelBuilder)
                        .sslContext(sslContextBuilder.build())
                        .negotiationType(NegotiationType.TLS);
            } catch (SSLException e) {
                throw new CompletionException(e);
            }
        } else {
            log.debug("Using a plaintext channel builder");
            channelBuilder = ((NettyChannelBuilder) channelBuilder).negotiationType(NegotiationType.PLAINTEXT);
        }
        // Attach the tracing interceptor to every call made through this channel.
        channelBuilder = channelBuilder.intercept(RPCTracingHelpers.getClientInterceptor());
        this.channel = channelBuilder.build();
        this.client = this.getClientWithCredentials(config);
        this.timeoutMillis = config.getTimeoutMillis();
    }

    /**
     * Creates the asynchronous stub on {@code this.channel}, attaching call credentials when the
     * client configuration supplies them.
     *
     * @param config controller client configuration to read the credentials from.
     * @return the stub, with call credentials applied if credentials are configured.
     */
    private ControllerServiceGrpc.ControllerServiceStub getClientWithCredentials(ControllerImplConfig config) {
        ControllerServiceGrpc.ControllerServiceStub stub = ControllerServiceGrpc.newStub(this.channel);
        try {
            Credentials credentials = config.getClientConfig().getCredentials();
            if (credentials == null) {
                return stub;
            }
            PravegaCredentialsWrapper wrapped = new PravegaCredentialsWrapper(credentials);
            return stub.withCallCredentials(MoreCallCredentials.from(wrapped));
        } catch (Exception e) {
            // The channel already exists at this point; close it before propagating the failure.
            log.error("Error while setting credentials to controller client", e);
            this.closeChannel();
            throw e;
        }
    }

    /**
     * Builds the exponential-backoff retry policy from the configured initial backoff, multiplier,
     * attempt count and maximum backoff.
     * <p>
     * Only failures whose unwrapped cause is a {@link StatusRuntimeException} are retried, and of
     * those only the status codes not listed as non-retryable below.
     *
     * @param config controller client configuration providing the backoff parameters.
     * @return the retry policy.
     */
    private Retry.RetryAndThrowConditionally createRetryConfig(ControllerImplConfig config) {
        return Retry.withExpBackoff(config.getInitialBackoffMillis(), config.getBackoffMultiple(),
                        config.getRetryAttempts(), config.getMaxBackoffMillis())
                .retryWhen(failure -> {
                    Throwable rootCause = Exceptions.unwrap(failure);
                    if (!(rootCause instanceof StatusRuntimeException)) {
                        return false;
                    }
                    switch (((StatusRuntimeException) rootCause).getStatus().getCode()) {
                        // Codes for which repeating the same request is not attempted.
                        case ALREADY_EXISTS:
                        case INVALID_ARGUMENT:
                        case NOT_FOUND:
                        case OK:
                        case OUT_OF_RANGE:
                        case PERMISSION_DENIED:
                        case UNAUTHENTICATED:
                        case UNIMPLEMENTED:
                            return false;
                        // Every other code (ABORTED, CANCELLED, DATA_LOSS, DEADLINE_EXCEEDED,
                        // FAILED_PRECONDITION, INTERNAL, RESOURCE_EXHAUSTED, UNAVAILABLE, UNKNOWN, ...) is retried.
                        default:
                            return true;
                    }
                });
    }

    /**
     * Sends a create-scope request to the controller, with retries.
     *
     * @param scopeName name of the scope to create.
     * @return a future that yields {@code true} when the scope was created and {@code false} when
     *         it already exists; it fails with {@link ControllerFailureException} on a FAILURE or
     *         unrecognised status and with {@link IllegalArgumentException} on an invalid name.
     */
    @Override
    public CompletableFuture<Boolean> createScope(String scopeName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "createScope", scopeName, requestId);
        CompletableFuture<Controller.CreateScopeStatus> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.CreateScopeStatus> callback = new RPCAsyncCallback<>(requestId, "createScope", scopeName);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "createScope", scopeName)
                    .createScope(Controller.ScopeInfo.newBuilder().setScope(scopeName).build(), callback);
            return callback.getFuture();
        }, this.executor);
        return rpcResult.thenApply(response -> {
            switch (response.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to create scope: {}", scopeName);
                    throw new ControllerFailureException("Failed to create scope: " + scopeName);
                case INVALID_SCOPE_NAME:
                    log.warn(requestId, "Illegal scope name: {}", scopeName);
                    throw new IllegalArgumentException("Illegal scope name: " + scopeName);
                case SCOPE_EXISTS:
                    log.warn(requestId, "Scope already exists: {}", scopeName);
                    return false;
                case SUCCESS:
                    log.info(requestId, "Scope created successfully: {}", scopeName);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status creating scope " + scopeName + " " + response.getStatus());
            }
        }).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn(requestId, "createScope {} failed: ", scopeName, failure);
            }
            LoggerHelpers.traceLeave(log, "createScope", traceId, scopeName, requestId);
        });
    }

    /**
     * Asks the controller, with retries, whether the given scope exists.
     *
     * @param scopeName name of the scope to look up.
     * @return a future holding the {@code exists} flag of the controller's response.
     */
    @Override
    public CompletableFuture<Boolean> checkScopeExists(String scopeName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "checkScopeExists", scopeName, requestId);
        CompletableFuture<Boolean> exists = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.ExistsResponse> callback = new RPCAsyncCallback<>(requestId, "checkScopeExists", scopeName);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "checkScopeExists", scopeName)
                    .checkScopeExists(Controller.ScopeInfo.newBuilder().setScope(scopeName).build(), callback);
            return callback.getFuture().thenApply(Controller.ExistsResponse::getExists);
        }, this.executor);
        return exists.whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn(requestId, "checkScopeExists {} failed: ", scopeName, failure);
            }
            LoggerHelpers.traceLeave(log, "checkScopeExists", traceId, scopeName, requestId);
        });
    }

    /**
     * Returns an iterator over scope names that fetches them from the controller one page at a
     * time. Each page request is run through the retry policy and sends the continuation token
     * returned by the previous page; the first request uses an empty token.
     *
     * @return an asynchronous iterator over scope names.
     */
    @Override
    public AsyncIterator<String> listScopes() {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long traceId = LoggerHelpers.traceEnter(log, "listScopes", new Object[0]);
        long requestId = this.requestIdGenerator.get();
        try {
            Function<Controller.ContinuationToken, CompletableFuture> function = token -> this.retryConfig.runAsync(() -> {
                RPCAsyncCallback<Controller.ScopesResponse> callback = new RPCAsyncCallback<Controller.ScopesResponse>(requestId, "listScopes", new Object[0]);
                new ControllerClientTagger(this.client, this.timeoutMillis).withTag(requestId, "listScopes").listScopes(Controller.ScopesRequest.newBuilder().setContinuationToken((Controller.ContinuationToken)token).build(), callback);
                return callback.getFuture().thenApply(x -> {
                    ProtocolStringList result = x.getScopesList();
                    // Pair the page with the token needed to request the next one.
                    return new AbstractMap.SimpleEntry<Controller.ContinuationToken, ProtocolStringList>(x.getContinuationToken(), result);
                });
            }, this.executor);
            return new ContinuationTokenAsyncIterator(function, Controller.ContinuationToken.newBuilder().build());
        } finally {
            // Only iterator construction is traced here; the page fetches happen later.
            // The trace must name this method (it previously reported "listStreams").
            LoggerHelpers.traceLeave((Logger)log, "listScopes", traceId, new Object[0]);
        }
    }

    /**
     * Returns an iterator over the streams of a scope that fetches them from the controller one
     * page at a time. Each page request is run through the retry policy and sends the continuation
     * token returned by the previous page; the first request uses an empty token.
     * <p>
     * A page request fails with {@link NoSuchScopeException} when the controller reports the scope
     * as missing, and with {@link RuntimeException} when it reports a failure.
     *
     * @param scopeName scope whose streams are listed.
     * @return an asynchronous iterator over the streams of the scope.
     */
    @Override
    public AsyncIterator<Stream> listStreams(String scopeName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long traceId = LoggerHelpers.traceEnter(log, "listStreams", scopeName);
        long requestId = this.requestIdGenerator.get();
        try {
            Function<Controller.ContinuationToken, CompletableFuture> pageFetcher = token -> this.retryConfig.runAsync(() -> {
                RPCAsyncCallback<Controller.StreamsInScopeResponse> callback = new RPCAsyncCallback<>(requestId, "listStreams", scopeName);
                Controller.ScopeInfo scopeInfo = Controller.ScopeInfo.newBuilder().setScope(scopeName).build();
                new ControllerClientTagger(this.client, this.timeoutMillis)
                        .withTag(requestId, "listStreams", scopeName)
                        .listStreamsInScope(Controller.StreamsInScopeRequest.newBuilder().setScope(scopeInfo).setContinuationToken(token).build(), callback);
                return callback.getFuture().thenApply(response -> {
                    switch (response.getStatus()) {
                        case SCOPE_NOT_FOUND:
                            log.warn(requestId, "Scope not found: {}", scopeName);
                            throw new NoSuchScopeException();
                        case FAILURE:
                            log.warn(requestId, "Internal Server Error while trying to list streams in scope: {}", scopeName);
                            throw new RuntimeException("Failure while trying to list streams");
                    }
                    // Any other status: convert the page and pair it with the next continuation token.
                    List<StreamImpl> streams = response.getStreamsList().stream()
                            .map(info -> new StreamImpl(info.getScope(), info.getStream()))
                            .collect(Collectors.toList());
                    return new AbstractMap.SimpleEntry(response.getContinuationToken(), streams);
                });
            }, this.executor);
            return new ContinuationTokenAsyncIterator(pageFetcher, Controller.ContinuationToken.newBuilder().build());
        } finally {
            // Only iterator construction is traced here; the page fetches happen later.
            LoggerHelpers.traceLeave(log, "listStreams", traceId, new Object[0]);
        }
    }

    /**
     * Sends a delete-scope request to the controller, with retries.
     *
     * @param scopeName name of the scope to delete.
     * @return a future that yields {@code true} when the scope was deleted and {@code false} when
     *         it was not found; it fails with {@link ControllerFailureException} on a FAILURE or
     *         unrecognised status and with {@link IllegalStateException} when the scope is not empty.
     */
    @Override
    public CompletableFuture<Boolean> deleteScope(String scopeName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "deleteScope", scopeName, requestId);
        CompletableFuture<Controller.DeleteScopeStatus> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.DeleteScopeStatus> callback = new RPCAsyncCallback<>(requestId, "deleteScope", scopeName);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "deleteScope", scopeName)
                    .deleteScope(Controller.ScopeInfo.newBuilder().setScope(scopeName).build(), callback);
            return callback.getFuture();
        }, this.executor);
        return rpcResult.thenApply(response -> {
            switch (response.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to delete scope: {}", scopeName);
                    throw new ControllerFailureException("Failed to delete scope: " + scopeName);
                case SCOPE_NOT_EMPTY:
                    log.warn(requestId, "Cannot delete non empty scope: {}", scopeName);
                    throw new IllegalStateException("Scope " + scopeName + " is not empty.");
                case SCOPE_NOT_FOUND:
                    log.warn(requestId, "Scope not found: {}", scopeName);
                    return false;
                case SUCCESS:
                    log.info(requestId, "Scope deleted successfully: {}", scopeName);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status deleting scope " + scopeName + " " + response.getStatus());
            }
        }).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn(requestId, "deleteScope {} failed: ", scopeName, failure);
            }
            LoggerHelpers.traceLeave(log, "deleteScope", traceId, scopeName, requestId);
        });
    }

    /**
     * Sends a create-stream request to the controller, with retries.
     *
     * @param scope        scope in which to create the stream; must not be null or empty.
     * @param streamName   name of the stream to create.
     * @param streamConfig configuration of the new stream; must not be null.
     * @return a future that yields {@code true} when the stream was created and {@code false} when
     *         it already exists; it fails with {@link ControllerFailureException} on a FAILURE or
     *         unrecognised status and with {@link IllegalArgumentException} when the stream name
     *         is invalid or the scope does not exist.
     */
    @Override
    public CompletableFuture<Boolean> createStream(String scope, String streamName, StreamConfiguration streamConfig) {
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(streamConfig, "streamConfig");
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "createStream", streamConfig, requestId);
        CompletableFuture<Controller.CreateStreamStatus> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.CreateStreamStatus> callback = new RPCAsyncCallback<>(requestId, "createStream", scope, streamName, streamConfig);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "createStream", scope, streamName)
                    .createStream(ModelHelper.decode(scope, streamName, streamConfig), callback);
            return callback.getFuture();
        }, this.executor);
        return rpcResult.thenApply(response -> {
            switch (response.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to create stream: {}", streamName);
                    throw new ControllerFailureException("Failed to create stream: " + streamConfig);
                case INVALID_STREAM_NAME:
                    log.warn(requestId, "Illegal stream name: {}", streamName);
                    // Report the rejected name itself, not the configuration object.
                    throw new IllegalArgumentException("Illegal stream name: " + streamName);
                case SCOPE_NOT_FOUND:
                    log.warn(requestId, "Scope not found: {}", scope);
                    // Report the missing scope itself, not the configuration object.
                    throw new IllegalArgumentException("Scope does not exist: " + scope);
                case STREAM_EXISTS:
                    log.warn(requestId, "Stream already exists: {}", streamName);
                    return false;
                case SUCCESS:
                    log.info(requestId, "Stream created successfully: {}", streamName);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status creating stream " + streamConfig + " " + response.getStatus());
            }
        }).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn(requestId, "createStream {}/{} failed: ", scope, streamName, failure);
            }
            LoggerHelpers.traceLeave(log, "createStream", traceId, streamConfig, requestId);
        });
    }

    /**
     * Asks the controller, with retries, whether the given stream exists.
     *
     * @param scopeName  scope of the stream.
     * @param streamName name of the stream.
     * @return a future holding the {@code exists} flag of the controller's response.
     */
    @Override
    public CompletableFuture<Boolean> checkStreamExists(String scopeName, String streamName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "checkStreamExists", scopeName, streamName, requestId);
        CompletableFuture<Boolean> exists = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.ExistsResponse> callback = new RPCAsyncCallback<>(requestId, "checkStreamExists", scopeName, streamName);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "checkStreamExists", scopeName, streamName)
                    .checkStreamExists(Controller.StreamInfo.newBuilder().setScope(scopeName).setStream(streamName).build(), callback);
            return callback.getFuture().thenApply(Controller.ExistsResponse::getExists);
        }, this.executor);
        return exists.whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn(requestId, "checkStreamExists {}/{} failed: ", scopeName, streamName, failure);
            }
            LoggerHelpers.traceLeave(log, "checkStreamExists", traceId, scopeName, streamName, requestId);
        });
    }

    /**
     * Sends an update-stream request carrying the new configuration to the controller, with retries.
     *
     * @param scope        scope of the stream.
     * @param streamName   name of the stream to update.
     * @param streamConfig new stream configuration; must not be null.
     * @return a future that yields {@code true} on success; it fails with
     *         {@link ControllerFailureException} on a FAILURE or unrecognised status and with
     *         {@link IllegalArgumentException} when the scope or the stream does not exist.
     */
    @Override
    public CompletableFuture<Boolean> updateStream(String scope, String streamName, StreamConfiguration streamConfig) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(streamConfig, "streamConfig");
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "updateStream", streamConfig, requestId);
        CompletableFuture<Controller.UpdateStreamStatus> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.UpdateStreamStatus> callback = new RPCAsyncCallback<>(requestId, "updateStream", scope, streamName, streamConfig);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "updateStream", scope, streamName)
                    .updateStream(ModelHelper.decode(scope, streamName, streamConfig), callback);
            return callback.getFuture();
        }, this.executor);
        return rpcResult.thenApply(response -> {
            switch (response.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to update stream: {}", streamName);
                    throw new ControllerFailureException("Failed to update stream: " + streamConfig);
                case SCOPE_NOT_FOUND:
                    log.warn(requestId, "Scope not found: {}", scope);
                    // Report the missing scope itself, not the configuration object.
                    throw new IllegalArgumentException("Scope does not exist: " + scope);
                case STREAM_NOT_FOUND:
                    log.warn(requestId, "Stream does not exist: {}", streamName);
                    // Report the missing stream itself, not the configuration object.
                    throw new IllegalArgumentException("Stream does not exist: " + streamName);
                case SUCCESS:
                    log.info(requestId, "Successfully updated stream: {}", streamName);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status updating stream " + streamConfig + " " + response.getStatus());
            }
        }).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn(requestId, "updateStream {}/{} failed: ", scope, streamName, failure);
            }
            LoggerHelpers.traceLeave(log, "updateStream", traceId, streamConfig, requestId);
        });
    }

    /**
     * Requests the list of subscribers of a stream from the controller, with retries.
     *
     * @param scope      scope of the stream; must not be null.
     * @param streamName name of the stream; must not be null.
     * @return a future holding a copy of the subscriber list from the response; it fails with
     *         {@link ControllerFailureException} on a FAILURE or unrecognised status and with
     *         {@link IllegalArgumentException} when the stream does not exist.
     */
    @Override
    public CompletableFuture<List<String>> listSubscribers(String scope, String streamName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(scope, "scope");
        Preconditions.checkNotNull(streamName, "stream");
        long traceId = LoggerHelpers.traceEnter(log, "listSubscribers", new Object[0]);
        long requestId = this.requestIdGenerator.get();
        CompletableFuture<Controller.SubscribersResponse> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.SubscribersResponse> callback = new RPCAsyncCallback<>(requestId, "listSubscribers", new Object[0]);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "listSubscribers")
                    .listSubscribers(ModelHelper.createStreamInfo(scope, streamName), callback);
            return callback.getFuture();
        }, this.executor);
        return rpcResult.thenApply(response -> {
            switch (response.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to list subscribers for stream {}/{}", scope, streamName);
                    // Space added before the stream name; the message used to run the two words together.
                    throw new ControllerFailureException("Failed to list subscribers for stream " + streamName);
                case STREAM_NOT_FOUND:
                    log.warn(requestId, "Stream does not exist: {}", streamName);
                    throw new IllegalArgumentException("Stream does not exist: " + streamName);
                case SUCCESS:
                    log.info(requestId, "Successfully listed subscribers for stream: {}/{}", scope, streamName);
                    return response.getSubscribersList().stream().collect(Collectors.toList());
                default:
                    throw new ControllerFailureException("Unknown return status listing subscribers " + response.getStatus());
            }
        }).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn(requestId, "listSubscribers for stream {}/{} failed: ", scope, streamName, failure);
            }
            LoggerHelpers.traceLeave(log, "listSubscribers", traceId, requestId);
        });
    }

    /**
     * Sends an update of a subscriber's truncation stream cut to the controller, with retries.
     *
     * @param scope         scope of the stream; must not be null.
     * @param streamName    name of the stream; must not be null.
     * @param subscriber    name of the subscriber; must not be null.
     * @param readerGroupId id of the reader group; must not be null.
     * @param generation    generation value sent with the request.
     * @param streamCut     stream cut to record, converted with {@code getStreamCutMap}.
     * @return a future that yields {@code true} on success; it fails with
     *         {@link ControllerFailureException} on a FAILURE or unrecognised status and with
     *         {@link IllegalArgumentException} when the stream or subscriber does not exist, the
     *         stream cut is not valid, or the generation does not match.
     */
    @Override
    public CompletableFuture<Boolean> updateSubscriberStreamCut(String scope, String streamName, String subscriber, UUID readerGroupId, long generation, StreamCut streamCut) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(scope, "scope");
        Preconditions.checkNotNull(streamName, "stream");
        Preconditions.checkNotNull(subscriber, "subscriber");
        Preconditions.checkNotNull(readerGroupId, "readerGroupId");
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "updateTruncationStreamCut", subscriber, requestId);
        CompletableFuture<Controller.UpdateSubscriberStatus> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.UpdateSubscriberStatus> callback = new RPCAsyncCallback<>(requestId, "updateTruncationStreamCut", scope, streamName, subscriber, streamCut);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "updateTruncationStreamCut", scope, streamName)
                    .updateSubscriberStreamCut(ModelHelper.decode(scope, streamName, subscriber, readerGroupId, generation, this.getStreamCutMap(streamCut)), callback);
            return callback.getFuture();
        }, this.executor);
        return rpcResult.thenApply(response -> {
            switch (response.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to update stream cut for Reader Group: {}", subscriber);
                    throw new ControllerFailureException("Failed to update stream cut for Reader Group:" + subscriber);
                case STREAM_NOT_FOUND:
                    log.warn(requestId, "Stream does not exist: {}", streamName);
                    throw new IllegalArgumentException("Stream does not exist: " + streamName);
                case SUBSCRIBER_NOT_FOUND:
                    log.warn(requestId, "Subscriber does not exist: {} for stream {}/{}", subscriber, scope, streamName);
                    throw new IllegalArgumentException("Subscriber does not exist: " + subscriber);
                case STREAM_CUT_NOT_VALID:
                    log.warn(requestId, "StreamCut not valid for stream {}/{} subscriber {}.", scope, streamName, subscriber);
                    throw new IllegalArgumentException("StreamCut not valid for stream " + scope + "/" + streamName + ": subscriber:" + subscriber);
                case GENERATION_MISMATCH:
                    log.warn(requestId, "Invalid generation for ReaderGroup {}.", subscriber);
                    throw new IllegalArgumentException("Invalid generation for ReaderGroup " + subscriber);
                case SUCCESS:
                    log.info(requestId, "Successfully updated truncationStreamCut for subscriber {} for stream: {}/{}", subscriber, scope, streamName);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status for updateTruncationStreamCut for Stream :" + scope + "/" + streamName + ": subscriber:" + subscriber + ": status=" + response.getStatus());
            }
        }).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn(requestId, "updateTruncationStreamCut for Subscriber {} for stream {}/{} failed: ", subscriber, scope, streamName, failure);
            }
            LoggerHelpers.traceLeave(log, "updateTruncationStreamCut", traceId, subscriber, requestId);
        });
    }

    /**
     * Truncates a stream at the given stream cut by converting the cut with
     * {@code getStreamCutMap} and delegating to the map-based overload.
     *
     * @param scope     scope of the stream.
     * @param stream    name of the stream.
     * @param streamCut stream cut at which to truncate.
     * @return the future returned by the map-based overload.
     */
    @Override
    public CompletableFuture<Boolean> truncateStream(String scope, String stream, StreamCut streamCut) {
        Map<Long, Long> cutAsMap = this.getStreamCutMap(streamCut);
        return this.truncateStream(scope, stream, cutAsMap);
    }

    /**
     * Sends a truncate-stream request to the controller, with retries.
     *
     * @param scope     scope of the stream.
     * @param stream    name of the stream.
     * @param streamCut stream cut in map form, as passed to {@code ModelHelper.decode}; must not be null.
     * @return a future that yields {@code true} on success; it fails with
     *         {@link ControllerFailureException} on a FAILURE or unrecognised status and with
     *         {@link IllegalArgumentException} when the scope or the stream does not exist.
     */
    private CompletableFuture<Boolean> truncateStream(String scope, String stream, Map<Long, Long> streamCut) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(streamCut, "streamCut");
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "truncateStream", streamCut, requestId);
        CompletableFuture<Controller.UpdateStreamStatus> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.UpdateStreamStatus> callback = new RPCAsyncCallback<>(requestId, "truncateStream", scope, stream);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "truncateStream", scope, stream)
                    .truncateStream(ModelHelper.decode(scope, stream, streamCut), callback);
            return callback.getFuture();
        }, this.executor);
        return rpcResult.thenApply(response -> {
            switch (response.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to truncate stream: {}/{}", scope, stream);
                    throw new ControllerFailureException("Failed to truncate stream: " + scope + "/" + stream);
                case SCOPE_NOT_FOUND:
                    log.warn(requestId, "Scope not found: {}", scope);
                    throw new IllegalArgumentException("Scope does not exist: " + scope);
                case STREAM_NOT_FOUND:
                    log.warn(requestId, "Stream does not exist: {}/{}", scope, stream);
                    throw new IllegalArgumentException("Stream does not exist: " + stream);
                case SUCCESS:
                    // Name the operation that succeeded; the log used to say "updated".
                    log.info(requestId, "Successfully truncated stream: {}/{}", scope, stream);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status truncating stream " + scope + "/" + stream + " " + response.getStatus());
            }
        }).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn(requestId, "truncateStream {}/{} failed: ", scope, stream, failure);
            }
            LoggerHelpers.traceLeave(log, "truncateStream", traceId, streamCut, requestId);
        });
    }

    /**
     * Starts a scale operation and returns a cancellable request that tracks its completion.
     *
     * <p>Once the start-scale call completes, the returned request is started in one of three ways:
     * with a failed future if the call failed or its response could not be handled, with
     * {@link #checkScaleStatus} when the scale was started, or with an already completed
     * {@code false} future when it was not.
     *
     * @param stream         stream to scale
     * @param sealedSegments ids of the segments to seal
     * @param newKeyRanges   key ranges (start to end) of the segments to create
     * @param executor       executor handed to the cancellable request
     * @return the cancellable request; it is started asynchronously
     */
    @Override
    public CancellableRequest<Boolean> scaleStream(Stream stream, List<Long> sealedSegments, Map<Double, Double> newKeyRanges, ScheduledExecutorService executor) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        CancellableRequest<Boolean> request = new CancellableRequest<Boolean>();
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "scaleStream", stream, requestId);
        this.startScaleInternal(stream, sealedSegments, newKeyRanges, "scaleStream", requestId).whenComplete((response, failure) -> {
            if (failure != null) {
                log.error(requestId, "Failed to start scale for stream {}", stream, failure);
                request.start(() -> Futures.failedFuture(failure), ignored -> true, executor);
                return;
            }
            try {
                boolean scaleStarted = this.handleScaleResponse(stream, response, requestId);
                request.start(() -> {
                    if (!scaleStarted) {
                        return CompletableFuture.completedFuture(false);
                    }
                    return this.checkScaleStatus(stream, response.getEpoch());
                }, done -> !scaleStarted || done, executor);
                LoggerHelpers.traceLeave((Logger)log, "scaleStream", traceId, stream, requestId);
            }
            catch (Exception ex) {
                log.warn(requestId, "Failed to handle scale response: ", ex);
                request.start(() -> Futures.failedFuture(ex), ignored -> true, executor);
            }
        });
        return request;
    }

    /**
     * Starts a scale operation without waiting for it to finish.
     *
     * @param stream         stream to scale
     * @param sealedSegments ids of the segments to seal
     * @param newKeyRanges   key ranges (start to end) of the segments to create
     * @return a future yielding the value computed by {@code handleScaleResponse}: {@code true}
     *         when the scale was started, {@code false} when the precondition failed; it fails
     *         with {@link ControllerFailureException} for any other status
     */
    @Override
    public CompletableFuture<Boolean> startScale(Stream stream, List<Long> sealedSegments, Map<Double, Double> newKeyRanges) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "scaleStream", stream, requestId);
        return this.startScaleInternal(stream, sealedSegments, newKeyRanges, "scaleStream", requestId)
                // handleScaleResponse tags its log lines with the id it is given, so it must
                // receive the request id (as scaleStream does), not the trace id.
                .thenApply(response -> this.handleScaleResponse(stream, response, requestId))
                .whenComplete((started, e) -> {
                    if (e != null) {
                        log.warn(requestId, "Failed to start scale of stream: {} ", stream.getStreamName(), e);
                    }
                    LoggerHelpers.traceLeave((Logger)log, "scaleStream", traceId, stream, requestId);
                });
    }

    /**
     * Translates the status of a scale response into a started / not-started flag.
     *
     * @param stream    stream the scale was requested for (used in log and error messages)
     * @param response  response returned by the controller
     * @param requestId id used to tag the log lines
     * @return {@code true} for STARTED, {@code false} for PRECONDITION_FAILED
     * @throws ControllerFailureException for FAILURE and for any other status
     */
    private Boolean handleScaleResponse(Stream stream, Controller.ScaleResponse response, long requestId) {
        switch (response.getStatus()) {
            case STARTED:
                log.info(requestId, "Successfully started scale stream: {}", stream.getStreamName());
                return true;
            case PRECONDITION_FAILED:
                log.warn(requestId, "Precondition failed for scale stream: {}", stream.getStreamName());
                return false;
            case FAILURE:
                log.warn(requestId, "Failed to scale stream: {}", stream.getStreamName());
                throw new ControllerFailureException("Failed to scale stream: " + stream);
            default:
                throw new ControllerFailureException("Unknown return status scaling stream " + stream + " " + response.getStatus());
        }
    }

    /**
     * Asks the controller whether the scale operation for the given epoch has completed.
     *
     * @param stream     stream being scaled; must not be null
     * @param scaleEpoch epoch of the scale operation; must be non-negative
     * @return a future yielding {@code true} for SUCCESS and {@code false} for IN_PROGRESS; it
     *         fails with {@link ControllerFailureException} for INVALID_INPUT or any other status
     */
    @Override
    public CompletableFuture<Boolean> checkScaleStatus(Stream stream, int scaleEpoch) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkArgument(scaleEpoch >= 0);
        long traceId = LoggerHelpers.traceEnter(log, "checkScale", stream);
        CompletableFuture<Controller.ScaleStatusResponse> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.ScaleStatusResponse> rpcCallback = new RPCAsyncCallback<Controller.ScaleStatusResponse>(traceId, "checkScale", stream, scaleEpoch);
            ControllerServiceGrpc.ControllerServiceStub stub = (ControllerServiceGrpc.ControllerServiceStub)this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            Controller.ScaleStatusRequest statusRequest = Controller.ScaleStatusRequest.newBuilder()
                    .setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName()))
                    .setEpoch(scaleEpoch)
                    .build();
            stub.checkScale(statusRequest, rpcCallback);
            return rpcCallback.getFuture();
        }, this.executor);
        return rpcResult.thenApply(reply -> {
            switch (reply.getStatus()) {
                case SUCCESS:
                    return true;
                case IN_PROGRESS:
                    return false;
                case INVALID_INPUT:
                    log.warn("Failed to check scale status of stream " + stream.getStreamName() + " because of invalid input");
                    throw new ControllerFailureException("invalid input");
                default:
                    throw new ControllerFailureException("Unknown return status checking scale of stream " + stream + " " + reply.getStatus());
            }
        }).whenComplete((done, error) -> {
            if (error != null) {
                log.warn("checkScaleStatus {} failed: ", (Object)stream.getStreamName(), error);
            }
            LoggerHelpers.traceLeave((Logger)log, "checkScale", traceId);
        });
    }

    /**
     * Builds and sends the scale request through the retry wrapper.
     *
     * <p>The scale timestamp is taken from {@code System.currentTimeMillis()} on each attempt and
     * is used both in the request and as one of the request tags.
     *
     * @param stream         stream to scale; must not be null
     * @param sealedSegments ids of the segments to seal; must not be null
     * @param newKeyRanges   key ranges (start to end) of the new segments; must not be null
     * @param method         method name used for the callback and the request tag
     * @param requestId      id used for the callback and the request tag
     * @return future holding the raw scale response
     */
    private CompletableFuture<Controller.ScaleResponse> startScaleInternal(Stream stream, List<Long> sealedSegments, Map<Double, Double> newKeyRanges, String method, long requestId) {
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(sealedSegments, "sealedSegments");
        Preconditions.checkNotNull(newKeyRanges, "newKeyRanges");
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.ScaleResponse> rpcCallback = new RPCAsyncCallback<Controller.ScaleResponse>(requestId, method, stream);
            long scaleTimestamp = System.currentTimeMillis();
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, method, stream.getScope(), stream.getStreamName(), String.valueOf(scaleTimestamp))
                    .scale(Controller.ScaleRequest.newBuilder()
                            .setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName()))
                            .addAllSealedSegments(sealedSegments)
                            .addAllNewKeyRanges(newKeyRanges.entrySet().stream()
                                    .map(range -> Controller.ScaleRequest.KeyRangeEntry.newBuilder()
                                            .setStart(range.getKey())
                                            .setEnd(range.getValue())
                                            .build())
                                    .collect(Collectors.toList()))
                            .setScaleTimestamp(scaleTimestamp)
                            .build(), rpcCallback);
            return rpcCallback.getFuture();
        }, this.executor);
    }

    /**
     * Seals the given stream.
     *
     * @param scope      scope of the stream; must not be null or empty
     * @param streamName name of the stream; must not be null or empty
     * @return a future yielding {@code true} on SUCCESS; it fails with
     *         {@link InvalidStreamException} when the scope or stream is not found and with
     *         {@link ControllerFailureException} on FAILURE or any other status
     */
    @Override
    public CompletableFuture<Boolean> sealStream(String scope, String streamName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotNullOrEmpty(streamName, "streamName");
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "sealStream", scope, streamName, requestId);
        CompletableFuture<Controller.UpdateStreamStatus> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.UpdateStreamStatus> rpcCallback = new RPCAsyncCallback<Controller.UpdateStreamStatus>(requestId, "sealStream", scope, streamName);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "sealStream", scope, streamName)
                    .sealStream(ModelHelper.createStreamInfo(scope, streamName), rpcCallback);
            return rpcCallback.getFuture();
        }, this.executor);
        return rpcResult.thenApply(reply -> {
            switch (reply.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to seal stream: {}", streamName);
                    throw new ControllerFailureException("Failed to seal stream: " + streamName);
                case SCOPE_NOT_FOUND:
                    log.warn(requestId, "Scope not found: {}", scope);
                    throw new InvalidStreamException("Scope does not exist: " + scope);
                case STREAM_NOT_FOUND:
                    log.warn(requestId, "Stream does not exist: {}", streamName);
                    throw new InvalidStreamException("Stream does not exist: " + streamName);
                case SUCCESS:
                    log.info(requestId, "Successfully sealed stream: {}", streamName);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status sealing stream " + streamName + " " + reply.getStatus());
            }
        }).whenComplete((outcome, error) -> {
            if (error != null) {
                log.warn(requestId, "sealStream {}/{} failed: ", scope, streamName, error);
            }
            LoggerHelpers.traceLeave((Logger)log, "sealStream", traceId, scope, streamName, requestId);
        });
    }

    /**
     * Deletes the given stream.
     *
     * @param scope      scope of the stream; must not be null or empty
     * @param streamName name of the stream; must not be null or empty
     * @return a future yielding {@code true} on SUCCESS and {@code false} when the stream was not
     *         found; it fails with {@link IllegalArgumentException} when the stream is not sealed
     *         and with {@link ControllerFailureException} on FAILURE or any other status
     */
    @Override
    public CompletableFuture<Boolean> deleteStream(String scope, String streamName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotNullOrEmpty(streamName, "streamName");
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "deleteStream", scope, streamName, requestId);
        CompletableFuture<Controller.DeleteStreamStatus> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.DeleteStreamStatus> rpcCallback = new RPCAsyncCallback<Controller.DeleteStreamStatus>(requestId, "deleteStream", scope, streamName);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "deleteStream", scope, streamName)
                    .deleteStream(ModelHelper.createStreamInfo(scope, streamName), rpcCallback);
            return rpcCallback.getFuture();
        }, this.executor);
        return rpcResult.thenApply(reply -> {
            switch (reply.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to delete stream: {}", streamName);
                    throw new ControllerFailureException("Failed to delete stream: " + streamName);
                case STREAM_NOT_FOUND:
                    log.warn(requestId, "Stream does not exist: {}", streamName);
                    return false;
                case STREAM_NOT_SEALED:
                    log.warn(requestId, "Stream is not sealed: {}", streamName);
                    throw new IllegalArgumentException("Stream is not sealed: " + streamName);
                case SUCCESS:
                    log.info(requestId, "Successfully deleted stream: {}", streamName);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status deleting stream " + streamName + " " + reply.getStatus());
            }
        }).whenComplete((outcome, error) -> {
            if (error != null) {
                log.warn(requestId, "deleteStream {}/{} failed: ", scope, streamName, error);
            }
            LoggerHelpers.traceLeave((Logger)log, "deleteStream", traceId, scope, streamName, requestId);
        });
    }

    /**
     * Fetches the segments of a stream, with their offsets, for the given timestamp.
     *
     * @param stream    stream to query; must not be null
     * @param timestamp timestamp sent to the controller in the request
     * @return future holding a map from each returned segment to its offset
     */
    @Override
    public CompletableFuture<Map<Segment, Long>> getSegmentsAtTime(Stream stream, long timestamp) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        long traceId = LoggerHelpers.traceEnter(log, "getSegmentsAtTime", stream, timestamp);
        CompletableFuture<Controller.SegmentsAtTime> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.SegmentsAtTime> rpcCallback = new RPCAsyncCallback<Controller.SegmentsAtTime>(traceId, "getSegmentsAtTime", stream, timestamp);
            Controller.StreamInfo info = ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName(), AccessOperation.NONE);
            Controller.GetSegmentsRequest segmentsRequest = Controller.GetSegmentsRequest.newBuilder()
                    .setStreamInfo(info)
                    .setTimestamp(timestamp)
                    .build();
            ((ControllerServiceGrpc.ControllerServiceStub)this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS)).getSegments(segmentsRequest, rpcCallback);
            return rpcCallback.getFuture();
        }, this.executor);
        return rpcResult.thenApply(reply -> {
            log.debug("Received the following data from the controller {}", (Object)reply.getSegmentsList());
            return reply.getSegmentsList().stream()
                    .collect(Collectors.toMap(entry -> ModelHelper.encode(entry.getSegmentId()), entry -> entry.getOffset()));
        }).whenComplete((segments, error) -> {
            if (error != null) {
                log.warn("get Segments of {} at time {} failed: ", stream.getStreamName(), timestamp, error);
            }
            LoggerHelpers.traceLeave((Logger)log, "getSegmentsAtTime", traceId);
        });
    }

    /**
     * Fetches the segments that immediately follow the given segment.
     *
     * @param segment segment whose successors are requested
     * @return future holding the successors (each mapped to the value list returned for it by the
     *         controller) together with the delegation token from the response
     */
    @Override
    public CompletableFuture<StreamSegmentsWithPredecessors> getSuccessors(Segment segment) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long traceId = LoggerHelpers.traceEnter(log, "getSuccessors", segment);
        CompletableFuture<Controller.SuccessorResponse> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.SuccessorResponse> rpcCallback = new RPCAsyncCallback<Controller.SuccessorResponse>(traceId, "getSuccessors", segment);
            ControllerServiceGrpc.ControllerServiceStub stub = (ControllerServiceGrpc.ControllerServiceStub)this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            stub.getSegmentsImmediatelyFollowing(ModelHelper.decode(segment), rpcCallback);
            return rpcCallback.getFuture();
        }, this.executor);
        return rpcResult.thenApply(reply -> {
            log.debug("Received the following data from the controller {}", (Object)reply.getSegmentsList());
            HashMap<SegmentWithRange, List<Long>> bySuccessor = new HashMap<SegmentWithRange, List<Long>>();
            for (Controller.SuccessorResponse.SegmentEntry successor : reply.getSegmentsList()) {
                bySuccessor.put(ModelHelper.encode(successor.getSegment()), successor.getValueList());
            }
            return new StreamSegmentsWithPredecessors(bySuccessor, reply.getDelegationToken());
        }).whenComplete((successors, error) -> {
            if (error != null) {
                log.warn("getSuccessors of segment {} failed: ", (Object)segment.getSegmentId(), error);
            }
            LoggerHelpers.traceLeave((Logger)log, "getSuccessors", traceId);
        });
    }

    /**
     * Fetches the segments between the given stream cut and {@code StreamCut.UNBOUNDED}.
     *
     * @param from stream cut to start from
     * @return future holding the segments returned by the controller
     */
    @Override
    public CompletableFuture<StreamSegmentSuccessors> getSuccessors(StreamCut from) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Stream sourceStream = from.asImpl().getStream();
        long traceId = LoggerHelpers.traceEnter(log, "getSuccessors", sourceStream);
        CompletableFuture<StreamSegmentSuccessors> successors = this.getSegmentsBetweenStreamCuts(from, StreamCut.UNBOUNDED);
        return successors.whenComplete((segments, error) -> {
            if (error != null) {
                log.warn("getSuccessorsFromCut for {} failed: ", (Object)sourceStream.getStreamName(), error);
            }
            LoggerHelpers.traceLeave((Logger)log, "getSuccessors", traceId);
        });
    }

    /**
     * Fetches the segments that lie between two stream cuts of the same stream.
     *
     * @param fromStreamCut starting stream cut; must not be null
     * @param toStreamCut   ending stream cut; must not be null and must refer to the same stream
     * @return future holding the segments returned by the controller
     */
    @Override
    public CompletableFuture<StreamSegmentSuccessors> getSegments(StreamCut fromStreamCut, StreamCut toStreamCut) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(fromStreamCut, "fromStreamCut");
        Preconditions.checkNotNull(toStreamCut, "toStreamCut");
        Preconditions.checkArgument(fromStreamCut.asImpl().getStream().equals(toStreamCut.asImpl().getStream()), "Ensure streamCuts for the same stream is passed");
        Stream sourceStream = fromStreamCut.asImpl().getStream();
        long traceId = LoggerHelpers.traceEnter(log, "getSegments", sourceStream);
        CompletableFuture<StreamSegmentSuccessors> between = this.getSegmentsBetweenStreamCuts(fromStreamCut, toStreamCut);
        return between.whenComplete((segments, error) -> {
            if (error != null) {
                log.warn("getSegments for {} failed: ", (Object)sourceStream.getStreamName(), error);
            }
            LoggerHelpers.traceLeave((Logger)log, "getSegments", traceId);
        });
    }

    /**
     * Sends the getSegmentsBetween request for the stream of {@code fromStreamCut}.
     *
     * @param fromStreamCut starting stream cut; its stream determines the scope and stream name used
     * @param toStreamCut   ending stream cut
     * @return future holding the encoded segments and the delegation token from the response
     */
    private CompletableFuture<StreamSegmentSuccessors> getSegmentsBetweenStreamCuts(StreamCut fromStreamCut, StreamCut toStreamCut) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Stream sourceStream = fromStreamCut.asImpl().getStream();
        long traceId = LoggerHelpers.traceEnter(log, "getSegments", sourceStream);
        CompletableFuture<Controller.StreamCutRangeResponse> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.StreamCutRangeResponse> rpcCallback = new RPCAsyncCallback<Controller.StreamCutRangeResponse>(traceId, "getSuccessorsFromCut", new Object[0]);
            ControllerServiceGrpc.ControllerServiceStub stub = (ControllerServiceGrpc.ControllerServiceStub)this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            stub.getSegmentsBetween(
                    ModelHelper.decode(sourceStream.getScope(), sourceStream.getStreamName(), this.getStreamCutMap(fromStreamCut), this.getStreamCutMap(toStreamCut)),
                    rpcCallback);
            return rpcCallback.getFuture();
        }, this.executor);
        return rpcResult.thenApply(reply -> {
            log.debug("Received the following data from the controller {}", (Object)reply.getSegmentsList());
            return new StreamSegmentSuccessors(
                    reply.getSegmentsList().stream().map(ModelHelper::encode).collect(Collectors.toSet()),
                    reply.getDelegationToken());
        });
    }

    /**
     * Converts a stream cut into a map keyed by segment id.
     *
     * @param streamCut stream cut to convert
     * @return an empty map when the cut equals {@code StreamCut.UNBOUNDED}; otherwise each
     *         position of the cut, keyed by its segment's id
     */
    private Map<Long, Long> getStreamCutMap(StreamCut streamCut) {
        boolean unbounded = streamCut.equals(StreamCut.UNBOUNDED);
        if (!unbounded) {
            return streamCut.asImpl().getPositions().entrySet().stream()
                    .collect(Collectors.toMap(position -> ((Segment)position.getKey()).getSegmentId(), Map.Entry::getValue));
        }
        return Collections.emptyMap();
    }

    /**
     * Fetches the currently active segments of a stream.
     *
     * @param scope  scope of the stream; must not be null or empty
     * @param stream name of the stream; must not be null or empty
     * @return future holding the current segments; a warning is logged when the controller
     *         returns zero segments
     */
    @Override
    public CompletableFuture<StreamSegments> getCurrentSegments(String scope, String stream) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotNullOrEmpty(stream, "stream");
        long traceId = LoggerHelpers.traceEnter(log, "getCurrentSegments", scope, stream);
        CompletableFuture<Controller.SegmentRanges> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.SegmentRanges> callback = new RPCAsyncCallback<Controller.SegmentRanges>(traceId, "getCurrentSegments", scope, stream);
            ((ControllerServiceGrpc.ControllerServiceStub)this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS)).getCurrentSegments(ModelHelper.createStreamInfo(scope, stream, AccessOperation.NONE), callback);
            return callback.getFuture();
        }, this.executor);
        return result.thenApply(this::getStreamSegments).whenComplete((segments, e) -> {
            if (e != null) {
                log.warn("getCurrentSegments for {}/{} failed: ", scope, stream, e);
            } else if (segments.getNumberOfSegments() == 0) {
                // Only inspect the result on success: on failure it is null, and dereferencing it
                // would throw inside this callback and skip the traceLeave below.
                log.warn("getCurrentSegments for {}/{} returned zero segments since the Stream is sealed", (Object)scope, (Object)stream);
            }
            LoggerHelpers.traceLeave((Logger)log, "getCurrentSegments", traceId, new Object[0]);
        });
    }

    /**
     * Fetches the segments of a stream for the given epoch.
     *
     * @param scope  scope of the stream; must not be null or empty
     * @param stream name of the stream; must not be null or empty
     * @param epoch  epoch to query; must be non-negative
     * @return future holding the segments of that epoch
     */
    @Override
    public CompletableFuture<StreamSegments> getEpochSegments(String scope, String stream, int epoch) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotNullOrEmpty(stream, "stream");
        Exceptions.checkArgument(epoch >= 0, "epoch", "Should be a positive integer", new Object[0]);
        long traceId = LoggerHelpers.traceEnter(log, "getEpochSegments", scope, stream);
        CompletableFuture<Controller.SegmentRanges> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.SegmentRanges> rpcCallback = new RPCAsyncCallback<Controller.SegmentRanges>(traceId, "getEpochSegments", scope, stream);
            Controller.GetEpochSegmentsRequest epochRequest = Controller.GetEpochSegmentsRequest.newBuilder()
                    .setStreamInfo(ModelHelper.createStreamInfo(scope, stream))
                    .setEpoch(epoch)
                    .build();
            ((ControllerServiceGrpc.ControllerServiceStub)this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS)).getEpochSegments(epochRequest, rpcCallback);
            return rpcCallback.getFuture();
        }, this.executor);
        CompletableFuture<StreamSegments> segmentsFuture = rpcResult.thenApply(this::getStreamSegments);
        return segmentsFuture.whenComplete((segments, error) -> {
            if (error != null) {
                log.warn("getEpochSegments for {}/{} with for epoch {} failed: ", scope, stream, epoch, error);
            }
            LoggerHelpers.traceLeave((Logger)log, "getEpochSegments", traceId);
        });
    }

    /**
     * Converts the segment ranges returned by the controller into a {@link StreamSegments}.
     *
     * <p>Each range is stored under its maximum key; a range whose minimum key exceeds its
     * maximum key is rejected with an {@link IllegalStateException}.
     *
     * @param ranges segment ranges received from the controller
     * @return the segments indexed by the upper bound of their key range
     */
    private StreamSegments getStreamSegments(Controller.SegmentRanges ranges) {
        log.debug("Received the following data from the controller {}", (Object)ranges.getSegmentRangesList());
        TreeMap<Double, SegmentWithRange> byUpperBound = new TreeMap<Double, SegmentWithRange>();
        for (Controller.SegmentRange range : ranges.getSegmentRangesList()) {
            double minKey = range.getMinKey();
            double maxKey = range.getMaxKey();
            Preconditions.checkState(minKey <= maxKey, "Min keyrange %s was not less than maximum keyRange %s for segment %s", (Object)minKey, (Object)maxKey, (Object)range.getSegmentId());
            SegmentWithRange segment = new SegmentWithRange(ModelHelper.encode(range.getSegmentId()), minKey, maxKey);
            byUpperBound.put(maxKey, segment);
        }
        return new StreamSegments(byUpperBound);
    }

    /**
     * Looks up the node URI that the controller reports for a segment.
     *
     * @param qualifiedSegmentName scoped segment name; must not be null or empty. It is parsed
     *                             with {@code Segment.fromScopedName} on each attempt.
     * @return future holding the encoded node URI
     */
    @Override
    public CompletableFuture<PravegaNodeUri> getEndpointForSegment(String qualifiedSegmentName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(qualifiedSegmentName, "qualifiedSegmentName");
        long traceId = LoggerHelpers.traceEnter(log, "getEndpointForSegment", qualifiedSegmentName);
        CompletableFuture<Controller.NodeUri> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.NodeUri> rpcCallback = new RPCAsyncCallback<Controller.NodeUri>(traceId, "getEndpointForSegment", qualifiedSegmentName);
            Segment parsed = Segment.fromScopedName(qualifiedSegmentName);
            ControllerServiceGrpc.ControllerServiceStub stub = (ControllerServiceGrpc.ControllerServiceStub)this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            stub.getURI(ModelHelper.createSegmentId(parsed.getScope(), parsed.getStreamName(), parsed.getSegmentId()), rpcCallback);
            return rpcCallback.getFuture();
        }, this.executor);
        CompletableFuture<PravegaNodeUri> endpoint = rpcResult.thenApply(ModelHelper::encode);
        return endpoint.whenComplete((uri, error) -> {
            if (error != null) {
                log.warn("getEndpointForSegment {} failed: ", (Object)qualifiedSegmentName, error);
            }
            LoggerHelpers.traceLeave((Logger)log, "getEndpointForSegment", traceId);
        });
    }

    /**
     * Asks the controller whether the given segment is valid.
     *
     * @param segment segment to check
     * @return future holding the {@code response} flag of the controller's validity response
     */
    @Override
    public CompletableFuture<Boolean> isSegmentOpen(Segment segment) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long traceId = LoggerHelpers.traceEnter(log, "isSegmentOpen", segment);
        CompletableFuture<Controller.SegmentValidityResponse> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.SegmentValidityResponse> rpcCallback = new RPCAsyncCallback<Controller.SegmentValidityResponse>(traceId, "isSegmentOpen", segment);
            ControllerServiceGrpc.ControllerServiceStub stub = (ControllerServiceGrpc.ControllerServiceStub)this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            stub.isSegmentValid(ModelHelper.createSegmentId(segment.getScope(), segment.getStreamName(), segment.getSegmentId()), rpcCallback);
            return rpcCallback.getFuture();
        }, this.executor);
        CompletableFuture<Boolean> validity = rpcResult.thenApply(Controller.SegmentValidityResponse::getResponse);
        return validity.whenComplete((open, error) -> {
            if (error != null) {
                log.warn("isSegmentOpen for segment {} failed: ", (Object)segment, error);
            }
            LoggerHelpers.traceLeave((Logger)log, "isSegmentOpen", traceId);
        });
    }

    /**
     * Creates a transaction on the given stream.
     *
     * @param stream stream to create the transaction on; must not be null
     * @param lease  lease value sent in the create request
     * @return future holding the transaction id and the active segments from the response
     */
    @Override
    public CompletableFuture<TxnSegments> createTransaction(Stream stream, long lease) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        long traceId = LoggerHelpers.traceEnter(log, "createTransaction", stream, lease);
        CompletableFuture<Controller.CreateTxnResponse> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.CreateTxnResponse> rpcCallback = new RPCAsyncCallback<Controller.CreateTxnResponse>(traceId, "createTransaction", stream, lease);
            ControllerServiceGrpc.ControllerServiceStub stub = (ControllerServiceGrpc.ControllerServiceStub)this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            Controller.CreateTxnRequest createRequest = Controller.CreateTxnRequest.newBuilder()
                    .setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName()))
                    .setLease(lease)
                    .build();
            stub.createTransaction(createRequest, rpcCallback);
            return rpcCallback.getFuture();
        }, this.executor);
        CompletableFuture<TxnSegments> created = rpcResult.thenApply(this::convert);
        return created.whenComplete((txn, error) -> {
            if (error != null) {
                log.warn("createTransaction on stream {} failed: ", (Object)stream.getStreamName(), error);
            }
            LoggerHelpers.traceLeave((Logger)log, "createTransaction", traceId);
        });
    }

    /**
     * Converts a create-transaction response into a {@link TxnSegments}.
     *
     * <p>Each active segment is stored under its maximum key; a range whose minimum key exceeds
     * its maximum key is rejected with an {@link IllegalStateException}.
     *
     * @param response response received from the controller
     * @return the active segments paired with the encoded transaction id
     */
    private TxnSegments convert(Controller.CreateTxnResponse response) {
        TreeMap<Double, SegmentWithRange> byUpperBound = new TreeMap<Double, SegmentWithRange>();
        for (Controller.SegmentRange range : response.getActiveSegmentsList()) {
            double minKey = range.getMinKey();
            double maxKey = range.getMaxKey();
            Preconditions.checkState(minKey <= maxKey);
            byUpperBound.put(maxKey, new SegmentWithRange(ModelHelper.encode(range.getSegmentId()), minKey, maxKey));
        }
        return new TxnSegments(new StreamSegments(byUpperBound), ModelHelper.encode(response.getTxnId()));
    }

    /**
     * Pings a transaction with the given lease value.
     *
     * @param stream stream the transaction belongs to
     * @param txId   id of the transaction
     * @param lease  lease value sent in the ping request
     * @return future holding the encoded ping status; a {@link PingFailedException} raised while
     *         encoding is rethrown wrapped in a {@link CompletionException}
     */
    @Override
    public CompletableFuture<Transaction.PingStatus> pingTransaction(Stream stream, UUID txId, long lease) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long traceId = LoggerHelpers.traceEnter(log, "pingTransaction", stream, txId, lease);
        CompletableFuture<Controller.PingTxnStatus> rpcResult = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.PingTxnStatus> rpcCallback = new RPCAsyncCallback<Controller.PingTxnStatus>(traceId, "pingTransaction", txId, lease);
            ControllerServiceGrpc.ControllerServiceStub stub = (ControllerServiceGrpc.ControllerServiceStub)this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            Controller.PingTxnRequest pingRequest = Controller.PingTxnRequest.newBuilder()
                    .setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName()))
                    .setTxnId(ModelHelper.decode(txId))
                    .setLease(lease)
                    .build();
            stub.pingTransaction(pingRequest, rpcCallback);
            return rpcCallback.getFuture();
        }, this.executor);
        return rpcResult.thenApply(reply -> {
            try {
                return ModelHelper.encode(reply.getStatus(), stream + " " + txId);
            }
            catch (PingFailedException pingFailure) {
                throw new CompletionException(pingFailure);
            }
        }).whenComplete((pingStatus, error) -> {
            if (error != null) {
                log.warn("PingTransaction {} failed:", (Object)txId, error);
            }
            LoggerHelpers.traceLeave((Logger)log, "pingTransaction", traceId);
        });
    }

    /**
     * Commits a transaction.
     *
     * @param stream    stream the transaction belongs to; must not be null
     * @param writerId  id of the writer, sent in the request
     * @param timestamp timestamp sent in the request; {@code Long.MIN_VALUE} is sent when null
     * @param txId      id of the transaction; must not be null
     * @return a future that completes normally on SUCCESS; it fails with
     *         {@link InvalidStreamException} when the stream is not found and with
     *         {@link TxnFailedException} when the transaction is not found or any other status
     *         is returned
     */
    @Override
    public CompletableFuture<Void> commitTransaction(Stream stream, String writerId, Long timestamp, UUID txId) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(txId, "txId");
        long traceId = LoggerHelpers.traceEnter(log, "commitTransaction", stream, txId);
        CompletableFuture<Controller.TxnStatus> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.TxnStatus> callback = new RPCAsyncCallback<Controller.TxnStatus>(traceId, "commitTransaction", stream, writerId, timestamp, txId);
            Controller.TxnRequest.Builder txnRequest = Controller.TxnRequest.newBuilder().setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName())).setWriterId(writerId).setTxnId(ModelHelper.decode(txId));
            if (timestamp != null) {
                txnRequest.setTimestamp(timestamp);
            } else {
                txnRequest.setTimestamp(Long.MIN_VALUE);
            }
            ((ControllerServiceGrpc.ControllerServiceStub)this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS)).commitTransaction(txnRequest.build(), callback);
            return callback.getFuture();
        }, this.executor);
        CompletableFuture<Void> committed = result.thenApply(txnStatus -> {
            if (txnStatus.getStatus().equals(Controller.TxnStatus.Status.STREAM_NOT_FOUND)) {
                log.warn("Stream not found: {}", (Object)stream.getStreamName());
                throw new InvalidStreamException("Stream no longer exists: " + stream);
            }
            if (txnStatus.getStatus().equals(Controller.TxnStatus.Status.TRANSACTION_NOT_FOUND)) {
                log.warn("transaction not found: {}", (Object)txId);
                throw Exceptions.sneakyThrow(new TxnFailedException("Transaction was already either committed or aborted"));
            }
            if (txnStatus.getStatus().equals(Controller.TxnStatus.Status.SUCCESS)) {
                return null;
            }
            log.warn("Unable to commit transaction {} commit status is {}", (Object)txId, (Object)txnStatus.getStatus());
            throw Exceptions.sneakyThrow(new TxnFailedException("Commit transaction failed with status: " + txnStatus.getStatus()));
        });
        // Pair the traceEnter above with a traceLeave on both success and failure.
        return committed.whenComplete((ignored, e) -> LoggerHelpers.traceLeave((Logger)log, "commitTransaction", traceId, new Object[0]));
    }

    /**
     * Aborts a transaction.
     *
     * @param stream stream the transaction belongs to; must not be null
     * @param txId   id of the transaction; must not be null
     * @return a future that completes normally on SUCCESS; it fails with
     *         {@link InvalidStreamException} when the stream is not found, with
     *         {@link TxnFailedException} when the transaction is not found, and with
     *         {@link RuntimeException} for any other status
     */
    @Override
    public CompletableFuture<Void> abortTransaction(Stream stream, UUID txId) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(txId, "txId");
        long traceId = LoggerHelpers.traceEnter(log, "abortTransaction", stream, txId);
        CompletableFuture<Controller.TxnStatus> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.TxnStatus> callback = new RPCAsyncCallback<Controller.TxnStatus>(traceId, "abortTransaction", stream, txId);
            ((ControllerServiceGrpc.ControllerServiceStub)this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS)).abortTransaction(Controller.TxnRequest.newBuilder().setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName())).setTxnId(ModelHelper.decode(txId)).build(), callback);
            return callback.getFuture();
        }, this.executor);
        CompletableFuture<Void> aborted = result.thenApply(txnStatus -> {
            if (txnStatus.getStatus().equals(Controller.TxnStatus.Status.STREAM_NOT_FOUND)) {
                log.warn("Stream not found: {}", (Object)stream.getStreamName());
                throw new InvalidStreamException("Stream no longer exists: " + stream);
            }
            if (txnStatus.getStatus().equals(Controller.TxnStatus.Status.TRANSACTION_NOT_FOUND)) {
                log.warn("transaction not found: {}", (Object)txId);
                throw Exceptions.sneakyThrow(new TxnFailedException("Transaction was already either committed or aborted"));
            }
            if (txnStatus.getStatus().equals(Controller.TxnStatus.Status.SUCCESS)) {
                return null;
            }
            log.warn("Unable to abort transaction {} abort status is {} ", (Object)txId, (Object)txnStatus.getStatus());
            throw new RuntimeException("Error aborting transaction: " + txnStatus.getStatus());
        });
        // traceLeave runs here rather than inside thenApply so that it is also emitted when the
        // RPC future itself fails and the status-mapping function never runs.
        return aborted.whenComplete((ignored, e) -> LoggerHelpers.traceLeave((Logger)log, "abortTransaction", traceId, new Object[0]));
    }

    /**
     * Queries the controller for the state of a transaction and converts it to a client-side status.
     *
     * <p>The conversion is delegated to {@code ModelHelper.encode}, which is given the controller state and a
     * {@code "<stream> <txId>"} label. A failure of the returned future is logged at warn level, and the
     * trace-leave record is written whether the call succeeds or fails.
     *
     * @param stream stream the transaction belongs to; must not be null.
     * @param txId   id of the transaction to look up; must not be null.
     * @return a future holding the converted transaction status.
     */
    @Override
    public CompletableFuture<Transaction.Status> checkTransactionStatus(Stream stream, UUID txId) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(txId, "txId");
        long traceId = LoggerHelpers.traceEnter(log, "checkTransactionStatus", stream, txId);
        // Parameterized (not raw) so no unchecked cast is needed before whenComplete.
        CompletableFuture<Controller.TxnState> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.TxnState> callback = new RPCAsyncCallback<>(traceId, "checkTransactionStatus", stream, txId);
            Controller.TxnRequest request = Controller.TxnRequest.newBuilder()
                    .setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName()))
                    .setTxnId(ModelHelper.decode(txId))
                    .build();
            this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS).checkTransactionState(request, callback);
            return callback.getFuture();
        }, this.executor);
        return result
                .thenApply(state -> ModelHelper.encode(state.getState(), stream + " " + txId))
                .whenComplete((x, e) -> {
                    if (e != null) {
                        log.warn("checkTransactionStatus on " + stream + " " + txId + " failed: ", e);
                    }
                    LoggerHelpers.traceLeave(log, "checkTransactionStatus", traceId);
                });
    }

    /**
     * Reports a writer's timestamp, together with the position it had written up to, to the controller.
     *
     * @param writer              id of the reporting writer; must not be null.
     * @param stream              stream the position refers to; must not be null.
     * @param timestamp           timestamp value forwarded to the controller unchanged.
     * @param lastWrittenPosition writer position converted to a stream cut for the request; must not be null.
     * @return a future that completes with {@code null} when the controller reports {@code SUCCESS} and fails
     *         with a {@link RuntimeException} carrying the reported result otherwise.
     */
    @Override
    public CompletableFuture<Void> noteTimestampFromWriter(String writer, Stream stream, long timestamp, WriterPosition lastWrittenPosition) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(writer, "writer");
        Preconditions.checkNotNull(lastWrittenPosition, "lastWrittenPosition");
        long traceId = LoggerHelpers.traceEnter(log, "noteTimestampFromWriter", writer, stream);
        CompletableFuture<Controller.TimestampResponse> result = this.retryConfig.runAsync(() -> {
            // The callback is labelled with this method's name so its records line up with the trace above
            // (it previously carried the unrelated label "lastWrittenPosition").
            RPCAsyncCallback<Controller.TimestampResponse> callback = new RPCAsyncCallback<>(traceId, "noteTimestampFromWriter", writer, stream, timestamp);
            Controller.TimestampFromWriter request = Controller.TimestampFromWriter.newBuilder()
                    .setWriter(writer)
                    .setTimestamp(timestamp)
                    .setPosition(ModelHelper.createStreamCut(stream, lastWrittenPosition))
                    .build();
            this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS).noteTimestampFromWriter(request, callback);
            return callback.getFuture();
        }, this.executor);
        return result.thenApply(response -> {
            LoggerHelpers.traceLeave(log, "noteTimestampFromWriter", traceId);
            if (!response.getResult().equals(Controller.TimestampResponse.Status.SUCCESS)) {
                log.warn("Writer " + writer + " failed to note time because: " + response.getResult() + " time was: " + timestamp + " position=" + lastWrittenPosition);
                throw new RuntimeException("failed to note time because: " + response.getResult());
            }
            return null;
        });
    }

    /**
     * Tells the controller that a writer is going away for the given stream.
     *
     * <p>Trace and callback records for this call are labelled {@code "writerShutdown"}.
     *
     * @param writerId id of the writer to remove; must not be null.
     * @param stream   stream the writer was writing to; must not be null.
     * @return a future that completes with {@code null} when the controller reports {@code SUCCESS} and fails
     *         with a {@link RuntimeException} carrying the reported result otherwise.
     */
    @Override
    public CompletableFuture<Void> removeWriter(String writerId, Stream stream) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(writerId, "writerId");
        long traceId = LoggerHelpers.traceEnter(log, "writerShutdown", writerId, stream);
        // Parameterized (not raw) so the continuation below receives a Controller.RemoveWriterResponse.
        CompletableFuture<Controller.RemoveWriterResponse> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.RemoveWriterResponse> callback = new RPCAsyncCallback<>(traceId, "writerShutdown", writerId, stream);
            Controller.RemoveWriterRequest request = Controller.RemoveWriterRequest.newBuilder()
                    .setWriter(writerId)
                    .setStream(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName()))
                    .build();
            this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS).removeWriter(request, callback);
            return callback.getFuture();
        }, this.executor);
        return result.thenApply(response -> {
            LoggerHelpers.traceLeave(log, "writerShutdown", traceId);
            if (!response.getResult().equals(Controller.RemoveWriterResponse.Status.SUCCESS)) {
                log.warn("Notifying the controller of writer shutdown failed for writer: " + writerId + " because of " + response.getResult());
                throw new RuntimeException("Unable to remove writer due to: " + response.getResult());
            }
            return null;
        });
    }

    /**
     * Marks this client as closed and shuts down its channel.
     *
     * <p>Only the call that flips the {@code closed} flag performs the shutdown; later calls return
     * immediately. A failure raised by the shutdown is passed to a handler that logs it at error level.
     */
    @Override
    public void close() {
        boolean alreadyClosed = this.closed.getAndSet(true);
        if (alreadyClosed) {
            return;
        }
        Callbacks.invokeSafely(this::closeChannel, ex -> log.error("Error while closing ControllerImpl.", ex));
    }

    /**
     * Shuts the channel down immediately, then waits up to 20 seconds for it to terminate and logs the
     * outcome at debug level. The wait runs inside {@code Exceptions.handleInterrupted}.
     */
    private void closeChannel() {
        this.channel.shutdownNow();
        Exceptions.handleInterrupted(() -> {
            boolean terminated = this.channel.awaitTermination(20L, TimeUnit.SECONDS);
            log.debug("Controller client shutdown has been initiated. Channel status: channel.isTerminated():{}", terminated);
        });
    }

    /**
     * Requests a delegation token from the controller for a stream and access operation.
     *
     * <p>A failure of the returned future is logged at warn level, and the trace-leave record is written
     * whether the call succeeds or fails.
     *
     * @param scope           scope of the stream; must not be null or empty.
     * @param streamName      name of the stream; must not be null or empty.
     * @param accessOperation operation passed through to the stream info sent to the controller.
     * @return a future holding the delegation token string from the controller's response.
     */
    @Override
    public CompletableFuture<String> getOrRefreshDelegationTokenFor(String scope, String streamName, AccessOperation accessOperation) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotNullOrEmpty(streamName, "stream");
        long traceId = LoggerHelpers.traceEnter(log, "getOrRefreshDelegationTokenFor", scope, streamName);
        // Parameterized (not raw) so no unchecked cast is needed before whenComplete.
        CompletableFuture<Controller.DelegationToken> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.DelegationToken> callback = new RPCAsyncCallback<>(traceId, "getOrRefreshDelegationTokenFor", scope, streamName);
            this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS)
                    .getDelegationToken(ModelHelper.createStreamInfo(scope, streamName, accessOperation), callback);
            return callback.getFuture();
        }, this.executor);
        return result
                .thenApply(token -> token.getDelegationToken())
                .whenComplete((x, e) -> {
                    if (e != null) {
                        log.warn("getOrRefreshDelegationTokenFor {}/{} failed: ", scope, streamName, e);
                    }
                    LoggerHelpers.traceLeave(log, "getOrRefreshDelegationTokenFor", traceId);
                });
    }

    /**
     * Creates a key-value table in the given scope.
     *
     * @param scope     scope to create the table in; must not be null or empty.
     * @param kvtName   name of the table to create.
     * @param kvtConfig configuration of the new table; must not be null.
     * @return a future holding {@code true} when the table was created and {@code false} when it already
     *         existed. It fails with {@link IllegalArgumentException} for an invalid table name or a missing
     *         scope, and with {@link ControllerFailureException} for a reported failure or an unknown status.
     */
    @Override
    public CompletableFuture<Boolean> createKeyValueTable(String scope, String kvtName, KeyValueTableConfiguration kvtConfig) {
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(kvtConfig, "KeyValueTableConfig");
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "createKeyValueTable", kvtConfig, requestId);
        // Parameterized (not raw) so the status switch below type-checks without casts.
        CompletableFuture<Controller.CreateKeyValueTableStatus> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.CreateKeyValueTableStatus> callback = new RPCAsyncCallback<>(requestId, "createKeyValueTable", scope, kvtName, kvtConfig);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "createKeyValueTable", scope, kvtName)
                    .createKeyValueTable(ModelHelper.decode(scope, kvtName, kvtConfig), callback);
            return callback.getFuture();
        }, this.executor);
        return result.thenApply(x -> {
            switch (x.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to create KeyValueTable: {}", kvtName);
                    throw new ControllerFailureException("Failed to create KeyValueTable: " + kvtConfig);
                case INVALID_TABLE_NAME:
                    log.warn(requestId, "Illegal KeyValueTable name: {}", kvtName);
                    throw new IllegalArgumentException("Illegal KeyValueTable name: " + kvtName);
                case SCOPE_NOT_FOUND:
                    log.warn(requestId, "Scope not found: {}", scope);
                    throw new IllegalArgumentException("Scope does not exist: " + scope);
                case TABLE_EXISTS:
                    log.warn(requestId, "KeyValueTable already exists: {}", kvtName);
                    return false;
                case SUCCESS:
                    log.info(requestId, "KeyValueTable created successfully: {}/{}", scope, kvtName);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status creating KeyValueTable " + kvtConfig + " " + x.getStatus());
            }
        }).whenComplete((x, e) -> {
            if (e != null) {
                log.warn(requestId, "createKeyValueTable {}/{} failed: ", scope, kvtName, e);
            }
            LoggerHelpers.traceLeave(log, "createKeyValueTable", traceId, kvtName, requestId);
        });
    }

    /**
     * Returns an iterator over the key-value tables in a scope, fetched page by page from the controller.
     *
     * <p>Each page request is issued through {@code retryConfig} on {@code executor}, starting from an empty
     * continuation token. A page future fails with {@link NoSuchScopeException} when the controller reports
     * {@code SCOPE_NOT_FOUND} and with a {@link RuntimeException} when it reports {@code FAILURE}; every other
     * status is treated as a successful page. The trace-leave record is written when the iterator has been
     * built, not when iteration finishes.
     *
     * @param scopeName scope whose key-value tables are listed.
     * @return an asynchronous iterator over the tables in {@code scopeName}.
     */
    @Override
    public AsyncIterator<KeyValueTableInfo> listKeyValueTables(String scopeName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long traceId = LoggerHelpers.traceEnter(log, "listKeyValueTables", scopeName);
        long requestId = this.requestIdGenerator.get();
        try {
            // NOTE(review): the raw Function/iterator types are kept as found because the generic signature of
            // ContinuationTokenAsyncIterator is declared outside this block.
            Function<Controller.ContinuationToken, CompletableFuture> function = token -> this.retryConfig.runAsync(() -> {
                RPCAsyncCallback<Controller.KVTablesInScopeResponse> callback = new RPCAsyncCallback<>(requestId, "listKeyValueTables", scopeName);
                Controller.ScopeInfo scopeInfo = Controller.ScopeInfo.newBuilder().setScope(scopeName).build();
                Controller.KVTablesInScopeRequest request = Controller.KVTablesInScopeRequest.newBuilder()
                        .setScope(scopeInfo)
                        .setContinuationToken(token)
                        .build();
                new ControllerClientTagger(this.client, this.timeoutMillis)
                        .withTag(requestId, "listKeyValueTables", scopeName)
                        .listKeyValueTables(request, callback);
                return callback.getFuture().thenApply(x -> {
                    switch (x.getStatus()) {
                        case SCOPE_NOT_FOUND:
                            log.warn(requestId, "Scope not found: {}", scopeName);
                            throw new NoSuchScopeException();
                        case FAILURE:
                            // These messages previously said "list streams", copied from the stream listing.
                            log.warn(requestId, "Internal Server Error while trying to list KeyValueTables in scope: {}", scopeName);
                            throw new RuntimeException("Failure while trying to list KeyValueTables");
                        default:
                            // Any other status is handled as a successful page.
                            break;
                    }
                    List<KeyValueTableInfo> kvtList = x.getKvtablesList().stream()
                            .map(y -> new KeyValueTableInfo(y.getScope(), y.getKvtName()))
                            .collect(Collectors.toList());
                    return new AbstractMap.SimpleEntry(x.getContinuationToken(), kvtList);
                });
            }, this.executor);
            return new ContinuationTokenAsyncIterator(function, Controller.ContinuationToken.newBuilder().build());
        }
        finally {
            LoggerHelpers.traceLeave(log, "listKeyValueTables", traceId);
        }
    }

    /**
     * Deletes a key-value table.
     *
     * @param scope   scope of the table; must not be null or empty.
     * @param kvtName name of the table; must not be null or empty.
     * @return a future holding {@code true} when the table was deleted and {@code false} when it did not
     *         exist. It fails with {@link ControllerFailureException} for a reported failure or an unknown
     *         status.
     */
    @Override
    public CompletableFuture<Boolean> deleteKeyValueTable(String scope, String kvtName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotNullOrEmpty(kvtName, "KeyValueTableName");
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "deleteKeyValueTable", scope, kvtName, requestId);
        // Parameterized (not raw) so the status switch below type-checks without casts.
        CompletableFuture<Controller.DeleteKVTableStatus> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.DeleteKVTableStatus> callback = new RPCAsyncCallback<>(requestId, "deleteKeyValueTable", scope, kvtName);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "deleteKeyValueTable", scope, kvtName)
                    .deleteKeyValueTable(ModelHelper.createKeyValueTableInfo(scope, kvtName), callback);
            return callback.getFuture();
        }, this.executor);
        return result.thenApply(x -> {
            switch (x.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to delete KeyValueTable: {}", kvtName);
                    throw new ControllerFailureException("Failed to delete KeyValueTable: " + kvtName);
                case TABLE_NOT_FOUND:
                    log.warn(requestId, "KeyValueTable does not exist: {}", kvtName);
                    return false;
                case SUCCESS:
                    log.info(requestId, "Successfully deleted KeyValueTable: {}", kvtName);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status deleting KeyValueTable " + kvtName + " " + x.getStatus());
            }
        }).whenComplete((x, e) -> {
            if (e != null) {
                log.warn(requestId, "deleteKeyValueTable {}/{} failed: ", scope, kvtName, e);
            }
            LoggerHelpers.traceLeave(log, "deleteKeyValueTable", traceId, scope, kvtName, requestId);
        });
    }

    /**
     * Fetches the current segments of a key-value table together with their key ranges.
     *
     * <p>Each returned range is checked to have {@code minKey <= maxKey} and is stored in a sorted map keyed
     * by its maximum key.
     *
     * @param scope   scope of the table; must not be null or empty.
     * @param kvtName name of the table; must not be null or empty.
     * @return a future holding the table's segments. It fails with {@link IllegalStateException} when a range
     *         has {@code minKey > maxKey}.
     */
    @Override
    public CompletableFuture<KeyValueTableSegments> getCurrentSegmentsForKeyValueTable(String scope, String kvtName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        // Reported under the table-name label used by deleteKeyValueTable (was mislabelled "stream").
        Exceptions.checkNotNullOrEmpty(kvtName, "KeyValueTableName");
        long traceId = LoggerHelpers.traceEnter(log, "getCurrentSegmentsForKeyValueTable", scope, kvtName);
        CompletableFuture<Controller.SegmentRanges> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.SegmentRanges> callback = new RPCAsyncCallback<>(traceId, "getCurrentSegmentsKeyValueTable", scope, kvtName);
            this.client.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS)
                    .getCurrentSegmentsKeyValueTable(ModelHelper.createKeyValueTableInfo(scope, kvtName), callback);
            return callback.getFuture();
        }, this.executor);
        return result.thenApply(ranges -> {
            log.debug("Received the following data from the controller {}", ranges.getSegmentRangesList());
            TreeMap<Double, SegmentWithRange> rangeMap = new TreeMap<>();
            for (Controller.SegmentRange r : ranges.getSegmentRangesList()) {
                Preconditions.checkState(r.getMinKey() <= r.getMaxKey(), "Min keyrange %s was not less than maximum keyRange %s for segment %s", r.getMinKey(), r.getMaxKey(), r.getSegmentId());
                rangeMap.put(r.getMaxKey(), new SegmentWithRange(ModelHelper.encode(r.getSegmentId()), r.getMinKey(), r.getMaxKey()));
            }
            return new KeyValueTableSegments(rangeMap);
        }).whenComplete((x, e) -> {
            if (e != null) {
                log.warn("getCurrentSegmentsForKeyValueTable for {}/{} failed: ", scope, kvtName, e);
            }
            LoggerHelpers.traceLeave(log, "getCurrentSegmentsForKeyValueTable", traceId);
        });
    }

    /**
     * Creates a reader group on the controller.
     *
     * @param scope    scope of the reader group; must not be null or empty.
     * @param rgName   name of the reader group; must not be null or empty.
     * @param rgConfig configuration to create the group with; must not be null.
     * @return a future holding the configuration returned by the controller on success. It fails with
     *         {@link IllegalArgumentException} for an invalid name or a missing scope, and with
     *         {@link ControllerFailureException} for a reported failure or an unknown status.
     */
    @Override
    public CompletableFuture<ReaderGroupConfig> createReaderGroup(String scope, String rgName, ReaderGroupConfig rgConfig) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotNullOrEmpty(rgName, "rgName");
        Preconditions.checkNotNull(rgConfig, "rgConfig");
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "createReaderGroup", rgConfig, requestId);
        // Parameterized (not raw) so the status switch below type-checks without casts.
        CompletableFuture<Controller.CreateReaderGroupResponse> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.CreateReaderGroupResponse> callback = new RPCAsyncCallback<>(requestId, "createReaderGroup", scope, rgName, rgConfig);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "createReaderGroup", scope, rgName)
                    .createReaderGroup(ModelHelper.decode(scope, rgName, rgConfig), callback);
            return callback.getFuture();
        }, this.executor);
        return result.thenApply(x -> {
            switch (x.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to create reader group: {}", rgName);
                    throw new ControllerFailureException("Failed to create reader group: " + rgName);
                case INVALID_RG_NAME:
                    log.warn(requestId, "Illegal Reader Group Name: {}", rgName);
                    throw new IllegalArgumentException("Illegal readergroup name: " + rgName);
                case SCOPE_NOT_FOUND:
                    log.warn(requestId, "Scope not found: {}", scope);
                    throw new IllegalArgumentException("Scope does not exist: " + scope);
                case SUCCESS:
                    log.info(requestId, "ReaderGroup created successfully: {}", rgName);
                    return ModelHelper.encode(x.getConfig());
                default:
                    throw new ControllerFailureException("Unknown return status creating reader group " + rgName + " " + x.getStatus());
            }
        }).whenComplete((x, e) -> {
            if (e != null) {
                log.warn(requestId, "createReaderGroup {}/{} failed: ", scope, rgName, e);
            }
            LoggerHelpers.traceLeave(log, "createReaderGroup", traceId, rgConfig, requestId);
        });
    }

    /**
     * Updates the configuration of an existing reader group.
     *
     * @param scope    scope of the reader group; must not be null or empty.
     * @param rgName   name of the reader group; must not be null or empty.
     * @param rgConfig new configuration; must not be null.
     * @return a future holding the generation value returned by the controller on success. It fails with
     *         {@link ReaderGroupConfigRejectedException} when the controller rejects the configuration, with
     *         {@link IllegalArgumentException} when the reader group is not found, and with
     *         {@link ControllerFailureException} for a reported failure or an unknown status.
     */
    @Override
    public CompletableFuture<Long> updateReaderGroup(String scope, String rgName, ReaderGroupConfig rgConfig) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotNullOrEmpty(rgName, "rgName");
        Preconditions.checkNotNull(rgConfig, "rgConfig");
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "updateReaderGroup", rgConfig, requestId);
        CompletableFuture<Controller.UpdateReaderGroupResponse> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.UpdateReaderGroupResponse> callback = new RPCAsyncCallback<>(requestId, "updateReaderGroup", scope, rgName, rgConfig);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "updateReaderGroup", scope, rgName)
                    .updateReaderGroup(ModelHelper.decode(scope, rgName, rgConfig), callback);
            return callback.getFuture();
        }, this.executor);
        // The messages below describe an update; they previously reused the wording of createReaderGroup
        // and reported RG_NOT_FOUND as a missing scope.
        return result.thenApply(x -> {
            switch (x.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to update reader group: {}", rgName);
                    throw new ControllerFailureException("Failed to update reader group: " + rgName);
                case INVALID_CONFIG:
                    log.warn(requestId, "Illegal Reader Group Config for reader group {}: {}", rgName, rgConfig);
                    throw new ReaderGroupConfigRejectedException("Invalid Reader Group Config: " + rgConfig.toString());
                case RG_NOT_FOUND:
                    log.warn(requestId, "ReaderGroup not found: {}", rgName);
                    throw new IllegalArgumentException("ReaderGroup does not exist: " + rgName);
                case SUCCESS:
                    log.info(requestId, "ReaderGroup updated successfully: {}", rgName);
                    return x.getGeneration();
                default:
                    throw new ControllerFailureException("Unknown return status updating reader group " + rgName + " " + x.getStatus());
            }
        }).whenComplete((x, e) -> {
            if (e != null) {
                log.warn(requestId, "updateReaderGroup {}/{} failed: ", scope, rgName, e);
            }
            // Uses the same method label as the traceEnter call above.
            LoggerHelpers.traceLeave(log, "updateReaderGroup", traceId, rgConfig, requestId);
        });
    }

    /**
     * Fetches the configuration of a reader group from the controller.
     *
     * <p>The request carries an empty reader group id and a generation of {@code 0}.
     *
     * @param scope  scope of the reader group; must not be null or empty.
     * @param rgName name of the reader group.
     * @return a future holding the reader group's configuration. It fails with
     *         {@link IllegalArgumentException} when the reader group is not found and with
     *         {@link ControllerFailureException} for a reported failure or an unknown status.
     */
    @Override
    public CompletableFuture<ReaderGroupConfig> getReaderGroupConfig(String scope, String rgName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        String emptyUUID = "";
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "getReaderGroupConfig", scope, rgName, requestId);
        String scopedRGName = NameUtils.getScopedReaderGroupName(scope, rgName);
        CompletableFuture<Controller.ReaderGroupConfigResponse> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.ReaderGroupConfigResponse> callback = new RPCAsyncCallback<>(requestId, "getReaderGroupConfig", scope, rgName);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "getReaderGroupConfig", scope, rgName)
                    .getReaderGroupConfig(ModelHelper.createReaderGroupInfo(scope, rgName, emptyUUID, 0L), callback);
            return callback.getFuture();
        }, this.executor);
        return result.thenApply(x -> {
            switch (x.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to get config for reader group: {}", scopedRGName);
                    throw new ControllerFailureException("Failed to get config for reader group: " + scopedRGName);
                case RG_NOT_FOUND:
                    log.warn(requestId, "ReaderGroup not found: {}", scopedRGName);
                    throw new IllegalArgumentException("ReaderGroup does not exist: " + scopedRGName);
                case SUCCESS:
                    log.info(requestId, "Successfully got config for Reader Group: {}", scopedRGName);
                    return ModelHelper.encode(x.getConfig());
                default:
                    throw new ControllerFailureException("Unknown return status getting config for ReaderGroup " + scopedRGName + " " + x.getStatus());
            }
        }).whenComplete((x, e) -> {
            if (e != null) {
                // The "{}" placeholder is required; without it the reader group name never reached the log.
                log.warn(requestId, "getReaderGroupConfig failed for Reader Group: {}", scopedRGName, e);
            }
            LoggerHelpers.traceLeave(log, "getReaderGroupConfig", traceId, scope, rgName, requestId);
        });
    }

    /**
     * Deletes a reader group.
     *
     * <p>The request carries the string form of {@code readerGroupId} and a generation of {@code 0}.
     *
     * @param scope         scope of the reader group; must not be null or empty.
     * @param rgName        name of the reader group; must not be null or empty.
     * @param readerGroupId id of the reader group to delete; must not be null.
     * @return a future holding {@code true} when the reader group was deleted. It fails with
     *         {@link IllegalArgumentException} when the reader group is not found and with
     *         {@link ControllerFailureException} for a reported failure or an unknown status.
     */
    @Override
    public CompletableFuture<Boolean> deleteReaderGroup(String scope, String rgName, UUID readerGroupId) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotNullOrEmpty(rgName, "rgName");
        Preconditions.checkNotNull(readerGroupId, "rgId");
        long requestId = this.requestIdGenerator.get();
        long traceId = LoggerHelpers.traceEnter(log, "deleteReaderGroup", scope, rgName, requestId);
        String scopedRGName = NameUtils.getScopedReaderGroupName(scope, rgName);
        CompletableFuture<Controller.DeleteReaderGroupStatus> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.DeleteReaderGroupStatus> callback = new RPCAsyncCallback<>(requestId, "deleteReaderGroup", scope, rgName);
            new ControllerClientTagger(this.client, this.timeoutMillis)
                    .withTag(requestId, "deleteReaderGroup", scope, rgName)
                    .deleteReaderGroup(ModelHelper.createReaderGroupInfo(scope, rgName, readerGroupId.toString(), 0L), callback);
            return callback.getFuture();
        }, this.executor);
        return result.thenApply(x -> {
            switch (x.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to delete reader group: {}", scopedRGName);
                    throw new ControllerFailureException("Failed to delete reader group: " + scopedRGName);
                case RG_NOT_FOUND:
                    log.warn(requestId, "ReaderGroup not found: {}", scopedRGName);
                    throw new IllegalArgumentException("ReaderGroup does not exist: " + scopedRGName);
                case SUCCESS:
                    log.info(requestId, "Successfully deleted Reader Group: {}", scopedRGName);
                    return true;
                default:
                    // Previously worded as "getting config for", copied from getReaderGroupConfig.
                    throw new ControllerFailureException("Unknown return status deleting ReaderGroup " + scopedRGName + " " + x.getStatus());
            }
        }).whenComplete((x, e) -> {
            if (e != null) {
                // The "{}" placeholder is required; without it the reader group name never reached the log.
                log.warn(requestId, "deleteReaderGroup failed for Reader Group: {}", scopedRGName, e);
            }
            LoggerHelpers.traceLeave(log, "deleteReaderGroup", traceId, scope, rgName, requestId);
        });
    }

    private static class ControllerClientTagger {
        private ControllerServiceGrpc.ControllerServiceStub clientStub;
        private final long timeoutMillis;

        /**
         * Wraps a controller stub together with the timeout used for each RPC issued through this tagger.
         *
         * @param clientStub    stub the RPCs are sent through.
         * @param timeoutMillis deadline, in milliseconds, applied to each RPC.
         */
        ControllerClientTagger(ControllerServiceGrpc.ControllerServiceStub clientStub, long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
            this.clientStub = clientStub;
        }

        /**
         * Attaches request-tracing call options to the held stub.
         *
         * <p>Builds a descriptor from {@code requestInfo}, logs it, and replaces the held stub with one that
         * carries the descriptor and the request id (as a string) as call options.
         *
         * @param requestId   id of the request being tagged.
         * @param requestInfo parts passed to {@code RequestTracker.buildRequestDescriptor}.
         * @return this tagger, so the RPC call can be chained.
         */
        ControllerClientTagger withTag(long requestId, String ... requestInfo) {
            String descriptor = RequestTracker.buildRequestDescriptor(requestInfo);
            log.info(requestId, "Tagging client request ({}).", descriptor);
            ControllerServiceGrpc.ControllerServiceStub described = this.clientStub.withOption(RPCTracingHelpers.REQUEST_DESCRIPTOR_CALL_OPTION, descriptor);
            this.clientStub = described.withOption(RPCTracingHelpers.REQUEST_ID_CALL_OPTION, String.valueOf(requestId));
            return this;
        }

        /** Sends {@code createScope} through the held stub with the configured deadline applied. */
        public void createScope(Controller.ScopeInfo scopeInfo, RPCAsyncCallback<Controller.CreateScopeStatus> callback) {
            ControllerServiceGrpc.ControllerServiceStub deadlineStub = this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.createScope(scopeInfo, callback);
        }

        /** Sends {@code listScopes} through the held stub with the configured deadline applied. */
        public void listScopes(Controller.ScopesRequest request, RPCAsyncCallback<Controller.ScopesResponse> callback) {
            ControllerServiceGrpc.ControllerServiceStub deadlineStub = this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.listScopes(request, callback);
        }

        /** Sends {@code checkScopeExists} through the held stub with the configured deadline applied. */
        public void checkScopeExists(Controller.ScopeInfo request, RPCAsyncCallback<Controller.ExistsResponse> callback) {
            ControllerServiceGrpc.ControllerServiceStub deadlineStub = this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.checkScopeExists(request, callback);
        }

        /** Sends {@code checkStreamExists} through the held stub with the configured deadline applied. */
        public void checkStreamExists(Controller.StreamInfo request, RPCAsyncCallback<Controller.ExistsResponse> callback) {
            ControllerServiceGrpc.ControllerServiceStub deadlineStub = this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.checkStreamExists(request, callback);
        }

        /** Sends {@code listStreamsInScope} through the held stub with the configured deadline applied. */
        public void listStreamsInScope(Controller.StreamsInScopeRequest request, RPCAsyncCallback<Controller.StreamsInScopeResponse> callback) {
            ControllerServiceGrpc.ControllerServiceStub deadlineStub = this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.listStreamsInScope(request, callback);
        }

        /** Sends {@code deleteScope} through the held stub with the configured deadline applied. */
        public void deleteScope(Controller.ScopeInfo scopeInfo, RPCAsyncCallback<Controller.DeleteScopeStatus> callback) {
            ControllerServiceGrpc.ControllerServiceStub deadlineStub = this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.deleteScope(scopeInfo, callback);
        }

        /** Sends {@code createStream} through the held stub with the configured deadline applied. */
        public void createStream(Controller.StreamConfig streamConfig, RPCAsyncCallback<Controller.CreateStreamStatus> callback) {
            ControllerServiceGrpc.ControllerServiceStub deadlineStub = this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.createStream(streamConfig, callback);
        }

        /** Sends {@code scale} through the held stub with the configured deadline applied. */
        public void scale(Controller.ScaleRequest scaleRequest, RPCAsyncCallback<Controller.ScaleResponse> callback) {
            ControllerServiceGrpc.ControllerServiceStub deadlineStub = this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.scale(scaleRequest, callback);
        }

        /** Sends {@code updateStream} through the held stub with the configured deadline applied. */
        public void updateStream(Controller.StreamConfig streamConfig, RPCAsyncCallback<Controller.UpdateStreamStatus> callback) {
            ControllerServiceGrpc.ControllerServiceStub deadlineStub = this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.updateStream(streamConfig, callback);
        }

        /** Sends {@code truncateStream} through the held stub with the configured deadline applied. */
        public void truncateStream(Controller.StreamCut streamCut, RPCAsyncCallback<Controller.UpdateStreamStatus> callback) {
            ControllerServiceGrpc.ControllerServiceStub deadlineStub = this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.truncateStream(streamCut, callback);
        }

        /** Sends {@code sealStream} through the held stub with the configured deadline applied. */
        public void sealStream(Controller.StreamInfo streamInfo, RPCAsyncCallback<Controller.UpdateStreamStatus> callback) {
            ControllerServiceGrpc.ControllerServiceStub deadlineStub = this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.sealStream(streamInfo, callback);
        }

        /** Sends {@code deleteStream} through the held stub with the configured deadline applied. */
        public void deleteStream(Controller.StreamInfo streamInfo, RPCAsyncCallback<Controller.DeleteStreamStatus> callback) {
            ControllerServiceGrpc.ControllerServiceStub deadlineStub = this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.deleteStream(streamInfo, callback);
        }

        /** Sends {@code updateSubscriberStreamCut} through the held stub with the configured deadline applied. */
        public void updateSubscriberStreamCut(Controller.SubscriberStreamCut subscriberStreamCut, RPCAsyncCallback<Controller.UpdateSubscriberStatus> callback) {
            ControllerServiceGrpc.ControllerServiceStub deadlineStub = this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.updateSubscriberStreamCut(subscriberStreamCut, callback);
        }

        /** Sends {@code listSubscribers} through the held stub with the configured deadline applied. */
        public void listSubscribers(Controller.StreamInfo request, RPCAsyncCallback<Controller.SubscribersResponse> callback) {
            ControllerServiceGrpc.ControllerServiceStub deadlineStub = this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.listSubscribers(request, callback);
        }

        /**
         * Issues the {@code createKeyValueTable} RPC through the client stub, bounded by a deadline
         * of {@code timeoutMillis} milliseconds.
         *
         * @param kvtConfig request message forwarded unchanged to the stub.
         * @param callback  observer passed to the stub to receive the call's outcome.
         */
        public void createKeyValueTable(Controller.KeyValueTableConfig kvtConfig, RPCAsyncCallback<Controller.CreateKeyValueTableStatus> callback) {
            final ControllerServiceGrpc.ControllerServiceStub deadlineStub =
                    (ControllerServiceGrpc.ControllerServiceStub) this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.createKeyValueTable(kvtConfig, callback);
        }

        /**
         * Issues the {@code listKeyValueTablesInScope} RPC through the client stub, bounded by a
         * deadline of {@code timeoutMillis} milliseconds.
         *
         * @param request  request message forwarded unchanged to the stub.
         * @param callback observer passed to the stub to receive the call's outcome.
         */
        void listKeyValueTables(Controller.KVTablesInScopeRequest request, RPCAsyncCallback<Controller.KVTablesInScopeResponse> callback) {
            final ControllerServiceGrpc.ControllerServiceStub deadlineStub =
                    (ControllerServiceGrpc.ControllerServiceStub) this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            // The stub method carries the "InScope" suffix that this wrapper's name omits.
            deadlineStub.listKeyValueTablesInScope(request, callback);
        }

        /**
         * Issues the {@code deleteKeyValueTable} RPC through the client stub, bounded by a deadline
         * of {@code timeoutMillis} milliseconds.
         *
         * @param kvtInfo  request message forwarded unchanged to the stub.
         * @param callback observer passed to the stub to receive the call's outcome.
         */
        void deleteKeyValueTable(Controller.KeyValueTableInfo kvtInfo, RPCAsyncCallback<Controller.DeleteKVTableStatus> callback) {
            final ControllerServiceGrpc.ControllerServiceStub deadlineStub =
                    (ControllerServiceGrpc.ControllerServiceStub) this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.deleteKeyValueTable(kvtInfo, callback);
        }

        /**
         * Issues the {@code createReaderGroup} RPC through the client stub, bounded by a deadline
         * of {@code timeoutMillis} milliseconds.
         *
         * @param rgConfig request message forwarded unchanged to the stub.
         * @param callback observer passed to the stub to receive the call's outcome.
         */
        void createReaderGroup(Controller.ReaderGroupConfiguration rgConfig, RPCAsyncCallback<Controller.CreateReaderGroupResponse> callback) {
            final ControllerServiceGrpc.ControllerServiceStub deadlineStub =
                    (ControllerServiceGrpc.ControllerServiceStub) this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.createReaderGroup(rgConfig, callback);
        }

        /**
         * Issues the {@code getReaderGroupConfig} RPC through the client stub, bounded by a
         * deadline of {@code timeoutMillis} milliseconds.
         *
         * @param readerGroupInfo request message forwarded unchanged to the stub.
         * @param callback        observer passed to the stub to receive the call's outcome.
         */
        void getReaderGroupConfig(Controller.ReaderGroupInfo readerGroupInfo, RPCAsyncCallback<Controller.ReaderGroupConfigResponse> callback) {
            final ControllerServiceGrpc.ControllerServiceStub deadlineStub =
                    (ControllerServiceGrpc.ControllerServiceStub) this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.getReaderGroupConfig(readerGroupInfo, callback);
        }

        /**
         * Issues the {@code deleteReaderGroup} RPC through the client stub, bounded by a deadline
         * of {@code timeoutMillis} milliseconds.
         *
         * @param readerGroupInfo request message forwarded unchanged to the stub.
         * @param callback        observer passed to the stub to receive the call's outcome.
         */
        void deleteReaderGroup(Controller.ReaderGroupInfo readerGroupInfo, RPCAsyncCallback<Controller.DeleteReaderGroupStatus> callback) {
            final ControllerServiceGrpc.ControllerServiceStub deadlineStub =
                    (ControllerServiceGrpc.ControllerServiceStub) this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.deleteReaderGroup(readerGroupInfo, callback);
        }

        /**
         * Issues the {@code updateReaderGroup} RPC through the client stub, bounded by a deadline
         * of {@code timeoutMillis} milliseconds.
         *
         * @param rgConfig request message forwarded unchanged to the stub.
         * @param callback observer passed to the stub to receive the call's outcome.
         */
        void updateReaderGroup(Controller.ReaderGroupConfiguration rgConfig, RPCAsyncCallback<Controller.UpdateReaderGroupResponse> callback) {
            final ControllerServiceGrpc.ControllerServiceStub deadlineStub =
                    (ControllerServiceGrpc.ControllerServiceStub) this.clientStub.withDeadlineAfter(this.timeoutMillis, TimeUnit.MILLISECONDS);
            deadlineStub.updateReaderGroup(rgConfig, callback);
        }
    }

    /**
     * {@link StreamObserver} that exposes the outcome of a call as a {@link CompletableFuture}.
     *
     * <p>The most recent value delivered to {@link #onNext(Object)} is remembered and used to
     * complete the future when {@link #onCompleted()} is invoked. A call to
     * {@link #onError(Throwable)} logs the failure and completes the future exceptionally.
     *
     * @param <T> type of the response value.
     */
    private static final class RPCAsyncCallback<T>
    implements StreamObserver<T> {
        /** Trace id included in the failure log message. */
        private final long traceId;
        /** Method name included in the failure log message. */
        private final String method;
        /** Call parameters included in the failure log message. */
        private final Object[] parameters;
        /** Last value received via {@link #onNext(Object)}; {@code null} until one arrives. */
        private T result = null;
        // Diamond instead of the raw `new CompletableFuture()` to avoid an unchecked conversion.
        private final CompletableFuture<T> future = new CompletableFuture<>();

        /**
         * Creates a callback carrying the context used when logging a failure.
         *
         * @param traceId trace id reported in the failure log message.
         * @param method  method name reported in the failure log message.
         * @param args    call parameters reported in the failure log message; the array is stored
         *                as passed, without copying.
         */
        RPCAsyncCallback(long traceId, String method, Object ... args) {
            this.traceId = traceId;
            this.method = method;
            this.parameters = args;
        }

        /**
         * Records {@code value} as the current result, replacing any previously received value.
         *
         * @param value the response value.
         */
        @Override
        public void onNext(T value) {
            this.result = value;
        }

        /**
         * Logs the failure at warn level and completes the future exceptionally.
         *
         * <p>A {@link RuntimeException} is propagated as is; any other {@link Throwable} is
         * wrapped in a new {@link RuntimeException} with {@code t} as its cause.
         *
         * @param t the error reported for the call.
         */
        @Override
        public void onError(Throwable t) {
            log.warn("gRPC call for {} with trace id {} and parameters {} failed with server error.", this.method, this.traceId, this.parameters, t);
            final Throwable failure = t instanceof RuntimeException ? t : new RuntimeException(t);
            this.future.completeExceptionally(failure);
        }

        /**
         * Completes the future with the last value received, or {@code null} if
         * {@link #onNext(Object)} was never invoked.
         */
        @Override
        public void onCompleted() {
            this.future.complete(this.result);
        }

        /**
         * Returns the future that is completed by {@link #onCompleted()} or {@link #onError(Throwable)}.
         *
         * @return the future tracking this callback's outcome.
         */
        public CompletableFuture<T> getFuture() {
            return this.future;
        }
    }
}

