/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.stream.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.handler.ssl.SslContextBuilder;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.stream.InvalidStreamException;
import io.pravega.client.stream.NoSuchScopeException;
import io.pravega.client.stream.PingFailedException;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TxnFailedException;
import io.pravega.client.stream.impl.CancellableRequest;
import io.pravega.client.stream.impl.Controller;
import io.pravega.client.stream.impl.ControllerFailureException;
import io.pravega.client.stream.impl.ControllerImplConfig;
import io.pravega.client.stream.impl.ControllerResolverFactory;
import io.pravega.client.stream.impl.Credentials;
import io.pravega.client.stream.impl.ModelHelper;
import io.pravega.client.stream.impl.PravegaCredentialsWrapper;
import io.pravega.client.stream.impl.SegmentWithRange;
import io.pravega.client.stream.impl.StreamImpl;
import io.pravega.client.stream.impl.StreamSegmentSuccessors;
import io.pravega.client.stream.impl.StreamSegments;
import io.pravega.client.stream.impl.StreamSegmentsWithPredecessors;
import io.pravega.client.stream.impl.TxnSegments;
import io.pravega.client.stream.impl.WriterPosition;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.hash.RandomFactory;
import io.pravega.common.tracing.RequestTracker;
import io.pravega.common.tracing.TagLogger;
import io.pravega.common.util.AsyncIterator;
import io.pravega.common.util.ContinuationTokenAsyncIterator;
import io.pravega.common.util.Retry;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.controller.stream.api.grpc.v1.ControllerServiceGrpc;
import io.pravega.shared.controller.tracing.RPCTracingHelpers;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
import java.io.File;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ControllerImpl
implements Controller {
    // Logger wrapper; the methods of this class use its overloads that take a request id as first argument.
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(ControllerImpl.class));
    // Default keep-alive time, expressed in minutes.
    private static final long DEFAULT_KEEPALIVE_TIME_MINUTES = 6L;
    // Retry policy applied to every RPC issued through retryConfig.runAsync(...); built by createRetryConfig.
    private final Retry.RetryAndThrowConditionally retryConfig;
    // Executor handed to retryConfig.runAsync(...) for every call.
    private final ScheduledExecutorService executor;
    // Closed flag; each public operation checks it first through Exceptions.checkNotClosed.
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // gRPC channel built in the constructor from the supplied channel builder.
    private final ManagedChannel channel;
    // Asynchronous gRPC stub bound to the channel (with call credentials attached when configured).
    private final ControllerServiceGrpc.ControllerServiceStub client;
    // Produces the request id used to tag RPCs and log lines; backed by a single Random from RandomFactory.
    private final Supplier<Long> requestIdGenerator = RandomFactory.create()::nextLong;

    /**
     * Creates a controller client for the controller URI found in the client configuration.
     *
     * The channel builder targets that URI, resolves names through {@link ControllerResolverFactory},
     * uses the "round_robin" load balancer provider and enables keep-alive with
     * {@link #DEFAULT_KEEPALIVE_TIME_MINUTES}.
     *
     * @param config   The configuration for this client (controller URI, TLS, credentials, retry settings).
     * @param executor The executor used for retries and asynchronous continuations.
     */
    public ControllerImpl(ControllerImplConfig config, ScheduledExecutorService executor) {
        this(NettyChannelBuilder.forTarget(config.getClientConfig().getControllerURI().toString())
                                .nameResolverFactory(new ControllerResolverFactory())
                                .loadBalancerFactory(LoadBalancerRegistry.getDefaultRegistry().getProvider("round_robin"))
                                // Use the named constant rather than a duplicated literal so the two cannot drift.
                                .keepAliveTime(DEFAULT_KEEPALIVE_TIME_MINUTES, TimeUnit.MINUTES),
             config, executor);
        log.info("Controller client connecting to server at {}", config.getClientConfig().getControllerURI().getAuthority());
    }

    /**
     * Creates a controller client on top of the supplied channel builder.
     *
     * Depending on the client configuration the channel is set up for TLS (optionally trusting the
     * certificates in the configured trust store file) or for plaintext. The tracing client interceptor
     * is always installed, and call credentials are attached to the stub when credentials are configured.
     *
     * @param channelBuilder The channel builder to connect to the service instance. Both the TLS and the
     *                       plaintext branch cast it to {@link NettyChannelBuilder}, so any other builder
     *                       type fails with a {@link ClassCastException}.
     * @param config         The configuration for this client.
     * @param executor       The executor used for retries and asynchronous continuations.
     * @throws CompletionException if the SSL context cannot be built.
     */
    @VisibleForTesting
    public ControllerImpl(ManagedChannelBuilder<?> channelBuilder, ControllerImplConfig config, ScheduledExecutorService executor) {
        Preconditions.checkNotNull(channelBuilder, "channelBuilder");
        this.executor = executor;
        this.retryConfig = this.createRetryConfig(config);

        if (config.getClientConfig().isEnableTlsToController()) {
            log.debug("Setting up a SSL/TLS channel builder");
            String trustStore = config.getClientConfig().getTrustStore();
            SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
            if (!Strings.isNullOrEmpty(trustStore)) {
                sslContextBuilder = sslContextBuilder.trustManager(new File(trustStore));
            }
            try {
                // sslContext/negotiationType are Netty-specific: they do not exist on ManagedChannelBuilder,
                // so the builder must be narrowed first (exactly as the plaintext branch does).
                channelBuilder = ((NettyChannelBuilder) channelBuilder).sslContext(sslContextBuilder.build())
                                                                       .negotiationType(NegotiationType.TLS);
            } catch (SSLException e) {
                throw new CompletionException(e);
            }
        } else {
            log.debug("Using a plaintext channel builder");
            channelBuilder = ((NettyChannelBuilder) channelBuilder).negotiationType(NegotiationType.PLAINTEXT);
        }

        // Trace request identifiers across the RPC boundary.
        channelBuilder = channelBuilder.intercept(RPCTracingHelpers.getClientInterceptor());
        this.channel = channelBuilder.build();

        ControllerServiceGrpc.ControllerServiceStub stub = ControllerServiceGrpc.newStub(this.channel);
        Credentials credentials = config.getClientConfig().getCredentials();
        if (credentials != null) {
            PravegaCredentialsWrapper wrapper = new PravegaCredentialsWrapper(credentials);
            stub = stub.withCallCredentials(MoreCallCredentials.from(wrapper));
        }
        this.client = stub;
    }

    /**
     * Builds the retry policy used for every controller RPC.
     *
     * Back-off parameters come from the supplied configuration. A failure is retried only when its
     * unwrapped cause is a {@link StatusRuntimeException} whose status code is not one of the
     * non-retryable codes listed in the switch below; any other kind of failure is not retried.
     *
     * @param config The client configuration carrying the back-off settings.
     * @return The retry policy.
     */
    private Retry.RetryAndThrowConditionally createRetryConfig(ControllerImplConfig config) {
        return Retry.withExpBackoff((long) config.getInitialBackoffMillis(), (int) config.getBackoffMultiple(),
                                    (int) config.getRetryAttempts(), (long) config.getMaxBackoffMillis())
                    .retryWhen(failure -> {
                        final Throwable rootCause = Exceptions.unwrap((Throwable) failure);
                        if (!(rootCause instanceof StatusRuntimeException)) {
                            // Only gRPC status failures are candidates for a retry.
                            return false;
                        }
                        final Status.Code statusCode = ((StatusRuntimeException) rootCause).getStatus().getCode();
                        switch (statusCode) {
                            // Codes for which repeating the same request cannot succeed.
                            case ALREADY_EXISTS:
                            case INVALID_ARGUMENT:
                            case NOT_FOUND:
                            case OK:
                            case OUT_OF_RANGE:
                            case PERMISSION_DENIED:
                            case UNAUTHENTICATED:
                            case UNIMPLEMENTED:
                                return false;
                            // ABORTED, CANCELLED, DATA_LOSS, DEADLINE_EXCEEDED, FAILED_PRECONDITION, INTERNAL,
                            // RESOURCE_EXHAUSTED, UNAVAILABLE, UNKNOWN and any other code are retried.
                            default:
                                return true;
                        }
                    });
    }

    /**
     * Asks the controller to create a scope.
     *
     * The RPC is issued through the retry policy. The returned future completes with {@code true}
     * when the scope was created and {@code false} when it already exists. It completes exceptionally
     * with {@link IllegalArgumentException} for an invalid scope name and with
     * {@link ControllerFailureException} for a reported failure or an unrecognised status.
     *
     * @param scopeName The name of the scope to create.
     * @return A future holding the outcome of the operation.
     */
    @Override
    public CompletableFuture<Boolean> createScope(String scopeName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        final long requestId = this.requestIdGenerator.get();
        final long traceId = LoggerHelpers.traceEnter(log, "createScope", scopeName, requestId);

        // Typed (not raw) future so that the status accessors below are checked by the compiler.
        final CompletableFuture<Controller.CreateScopeStatus> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.CreateScopeStatus> callback = new RPCAsyncCallback<>(requestId, "createScope");
            new ControllerClientTagger(this.client).withTag(requestId, "createScope", scopeName)
                    .createScope(Controller.ScopeInfo.newBuilder().setScope(scopeName).build(), callback);
            return callback.getFuture();
        }, this.executor);

        return result.thenApply(x -> {
            switch (x.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to create scope: {}", scopeName);
                    throw new ControllerFailureException("Failed to create scope: " + scopeName);
                case INVALID_SCOPE_NAME:
                    log.warn(requestId, "Illegal scope name: {}", scopeName);
                    throw new IllegalArgumentException("Illegal scope name: " + scopeName);
                case SCOPE_EXISTS:
                    log.warn(requestId, "Scope already exists: {}", scopeName);
                    return false;
                case SUCCESS:
                    log.info(requestId, "Scope created successfully: {}", scopeName);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status creating scope " + scopeName + " " + x.getStatus());
            }
        }).whenComplete((x, e) -> {
            if (e != null) {
                log.warn(requestId, "createScope failed: ", e);
            }
            LoggerHelpers.traceLeave(log, "createScope", traceId, scopeName, requestId);
        });
    }

    /**
     * Returns an asynchronous iterator over the streams of a scope.
     *
     * No RPC is issued by this method itself: it only builds the page-fetching function and wraps it
     * in a {@code ContinuationTokenAsyncIterator} that starts from an empty continuation token.
     * Each page is requested through the retry policy. A page request fails with
     * {@link NoSuchScopeException} when the controller reports SCOPE_NOT_FOUND and with
     * {@link RuntimeException} when it reports FAILURE; every other status is treated as a valid page.
     *
     * NOTE(review): the raw types below are decompilation artefacts; the element types should be
     * restored against the ContinuationTokenAsyncIterator constructor signature.
     *
     * @param scopeName The name of the scope whose streams are listed.
     * @return An iterator producing the streams of the scope.
     */
    @Override
    public AsyncIterator<Stream> listStreams(String scopeName) {
        Exceptions.checkNotClosed((boolean)this.closed.get(), (Object)this);
        long traceId = LoggerHelpers.traceEnter((Logger)log, (String)"listStreams", (Object[])new Object[]{scopeName});
        long requestId = this.requestIdGenerator.get();
        try {
            // Fetches one page of streams starting at the given continuation token.
            Function<Controller.ContinuationToken, CompletableFuture> function = token -> this.retryConfig.runAsync(() -> {
                RPCAsyncCallback<Controller.StreamsInScopeResponse> callback = new RPCAsyncCallback<Controller.StreamsInScopeResponse>(requestId, "listStreams");
                Controller.ScopeInfo scopeInfo = Controller.ScopeInfo.newBuilder().setScope(scopeName).build();
                new ControllerClientTagger(this.client).withTag(requestId, "listStreams", scopeName).listStreamsInScope(Controller.StreamsInScopeRequest.newBuilder().setScope(scopeInfo).setContinuationToken(token).build(), callback);
                return callback.getFuture().thenApply(x -> {
                    switch (x.getStatus()) {
                        case SCOPE_NOT_FOUND: {
                            log.warn(requestId, "Scope not found: {}", new Object[]{scopeName});
                            throw new NoSuchScopeException();
                        }
                        case FAILURE: {
                            log.warn(requestId, "Internal Server Error while trying to list streams in scope: {}", new Object[]{scopeName});
                            throw new RuntimeException("Failure while trying to list streams");
                        }
                    }
                    // Any other status: convert the returned stream descriptors and pair them with the next token.
                    List result = x.getStreamsList().stream().map(y -> new StreamImpl(y.getScope(), y.getStream())).collect(Collectors.toList());
                    return new AbstractMap.SimpleEntry(x.getContinuationToken(), result);
                });
            }, this.executor);
            ContinuationTokenAsyncIterator continuationTokenAsyncIterator = new ContinuationTokenAsyncIterator(function, (Object)Controller.ContinuationToken.newBuilder().build());
            return continuationTokenAsyncIterator;
        }
        finally {
            // Runs as soon as the iterator has been constructed, not when iteration finishes.
            LoggerHelpers.traceLeave((Logger)log, (String)"listStreams", (long)traceId, (Object[])new Object[0]);
        }
    }

    /**
     * Asks the controller to delete a scope.
     *
     * The RPC is issued through the retry policy. The returned future completes with {@code true}
     * when the scope was deleted and {@code false} when it was not found. It completes exceptionally
     * with {@link IllegalStateException} when the scope is not empty and with
     * {@link ControllerFailureException} for a reported failure or an unrecognised status.
     *
     * @param scopeName The name of the scope to delete.
     * @return A future holding the outcome of the operation.
     */
    @Override
    public CompletableFuture<Boolean> deleteScope(String scopeName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        final long requestId = this.requestIdGenerator.get();
        final long traceId = LoggerHelpers.traceEnter(log, "deleteScope", scopeName, requestId);

        // Typed (not raw) future so that the status accessors below are checked by the compiler.
        final CompletableFuture<Controller.DeleteScopeStatus> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.DeleteScopeStatus> callback = new RPCAsyncCallback<>(requestId, "deleteScope");
            new ControllerClientTagger(this.client).withTag(requestId, "deleteScope", scopeName)
                    .deleteScope(Controller.ScopeInfo.newBuilder().setScope(scopeName).build(), callback);
            return callback.getFuture();
        }, this.executor);

        return result.thenApply(x -> {
            switch (x.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to delete scope: {}", scopeName);
                    throw new ControllerFailureException("Failed to delete scope: " + scopeName);
                case SCOPE_NOT_EMPTY:
                    log.warn(requestId, "Cannot delete non empty scope: {}", scopeName);
                    throw new IllegalStateException("Scope " + scopeName + " is not empty.");
                case SCOPE_NOT_FOUND:
                    log.warn(requestId, "Scope not found: {}", scopeName);
                    return false;
                case SUCCESS:
                    log.info(requestId, "Scope deleted successfully: {}", scopeName);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status deleting scope " + scopeName + " " + x.getStatus());
            }
        }).whenComplete((x, e) -> {
            if (e != null) {
                log.warn(requestId, "deleteScope failed: ", e);
            }
            LoggerHelpers.traceLeave(log, "deleteScope", traceId, scopeName, requestId);
        });
    }

    /**
     * Asks the controller to create a stream.
     *
     * The RPC is issued through the retry policy. The returned future completes with {@code true}
     * when the stream was created and {@code false} when it already exists. It completes exceptionally
     * with {@link IllegalArgumentException} for an invalid stream name or a missing scope and with
     * {@link ControllerFailureException} for a reported failure or an unrecognised status.
     *
     * @param scope        The scope of the stream; must not be null or empty.
     * @param streamName   The name of the stream.
     * @param streamConfig The stream configuration; must not be null.
     * @return A future holding the outcome of the operation.
     */
    @Override
    public CompletableFuture<Boolean> createStream(String scope, String streamName, StreamConfiguration streamConfig) {
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(streamConfig, "streamConfig");
        final long requestId = this.requestIdGenerator.get();
        final long traceId = LoggerHelpers.traceEnter(log, "createStream", streamConfig, requestId);

        // Typed (not raw) future so that the status accessors below are checked by the compiler.
        final CompletableFuture<Controller.CreateStreamStatus> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.CreateStreamStatus> callback = new RPCAsyncCallback<>(requestId, "createStream");
            new ControllerClientTagger(this.client).withTag(requestId, "createStream", scope, streamName)
                    .createStream(ModelHelper.decode(scope, streamName, streamConfig), callback);
            return callback.getFuture();
        }, this.executor);

        return result.thenApply(x -> {
            switch (x.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to create stream: {}", streamName);
                    throw new ControllerFailureException("Failed to create stream: " + streamConfig);
                case INVALID_STREAM_NAME:
                    log.warn(requestId, "Illegal stream name: {}", streamName);
                    throw new IllegalArgumentException("Illegal stream name: " + streamConfig);
                case SCOPE_NOT_FOUND:
                    log.warn(requestId, "Scope not found: {}", scope);
                    throw new IllegalArgumentException("Scope does not exist: " + streamConfig);
                case STREAM_EXISTS:
                    log.warn(requestId, "Stream already exists: {}", streamName);
                    return false;
                case SUCCESS:
                    log.info(requestId, "Stream created successfully: {}", streamName);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status creating stream " + streamConfig + " " + x.getStatus());
            }
        }).whenComplete((x, e) -> {
            if (e != null) {
                log.warn(requestId, "createStream failed: ", e);
            }
            LoggerHelpers.traceLeave(log, "createStream", traceId, streamConfig, requestId);
        });
    }

    /**
     * Asks the controller to update the configuration of a stream.
     *
     * The RPC is issued through the retry policy. The returned future completes with {@code true}
     * on success. It completes exceptionally with {@link IllegalArgumentException} when the scope or
     * the stream does not exist and with {@link ControllerFailureException} for a reported failure or
     * an unrecognised status.
     *
     * @param scope        The scope of the stream.
     * @param streamName   The name of the stream.
     * @param streamConfig The new stream configuration; must not be null.
     * @return A future holding the outcome of the operation.
     */
    @Override
    public CompletableFuture<Boolean> updateStream(String scope, String streamName, StreamConfiguration streamConfig) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(streamConfig, "streamConfig");
        final long requestId = this.requestIdGenerator.get();
        final long traceId = LoggerHelpers.traceEnter(log, "updateStream", streamConfig, requestId);

        // Typed (not raw) future so that the status accessors below are checked by the compiler.
        final CompletableFuture<Controller.UpdateStreamStatus> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.UpdateStreamStatus> callback = new RPCAsyncCallback<>(requestId, "updateStream");
            new ControllerClientTagger(this.client).withTag(requestId, "updateStream", scope, streamName)
                    .updateStream(ModelHelper.decode(scope, streamName, streamConfig), callback);
            return callback.getFuture();
        }, this.executor);

        return result.thenApply(x -> {
            switch (x.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to update stream: {}", streamName);
                    throw new ControllerFailureException("Failed to update stream: " + streamConfig);
                case SCOPE_NOT_FOUND:
                    log.warn(requestId, "Scope not found: {}", scope);
                    throw new IllegalArgumentException("Scope does not exist: " + streamConfig);
                case STREAM_NOT_FOUND:
                    log.warn(requestId, "Stream does not exist: {}", streamName);
                    throw new IllegalArgumentException("Stream does not exist: " + streamConfig);
                case SUCCESS:
                    log.info(requestId, "Successfully updated stream: {}", streamName);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status updating stream " + streamConfig + " " + x.getStatus());
            }
        }).whenComplete((x, e) -> {
            if (e != null) {
                log.warn(requestId, "updateStream failed: ", e);
            }
            LoggerHelpers.traceLeave(log, "updateStream", traceId, streamConfig, requestId);
        });
    }

    /**
     * Truncates a stream at the given stream cut.
     *
     * The stream cut is converted with {@code getStreamCutMap} and the work is delegated to the
     * map-based overload of this method.
     *
     * @param scope     The scope of the stream.
     * @param stream    The name of the stream.
     * @param streamCut The stream cut at which to truncate.
     * @return A future holding the outcome of the operation.
     */
    @Override
    public CompletableFuture<Boolean> truncateStream(String scope, String stream, StreamCut streamCut) {
        final Map<Long, Long> cutMap = this.getStreamCutMap(streamCut);
        return this.truncateStream(scope, stream, cutMap);
    }

    /**
     * Asks the controller to truncate a stream at the given stream cut map.
     *
     * The RPC is issued through the retry policy. The returned future completes with {@code true}
     * on success. It completes exceptionally with {@link IllegalArgumentException} when the scope or
     * the stream does not exist and with {@link ControllerFailureException} for a reported failure or
     * an unrecognised status.
     *
     * @param scope     The scope of the stream.
     * @param stream    The name of the stream.
     * @param streamCut The stream cut in map form, as passed to {@code ModelHelper.decode}; must not be null.
     * @return A future holding the outcome of the operation.
     */
    private CompletableFuture<Boolean> truncateStream(String scope, String stream, Map<Long, Long> streamCut) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(streamCut, "streamCut");
        final long requestId = this.requestIdGenerator.get();
        final long traceId = LoggerHelpers.traceEnter(log, "truncateStream", streamCut, requestId);

        // Typed (not raw) future so that the status accessors below are checked by the compiler.
        final CompletableFuture<Controller.UpdateStreamStatus> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.UpdateStreamStatus> callback = new RPCAsyncCallback<>(requestId, "truncateStream");
            new ControllerClientTagger(this.client).withTag(requestId, "truncateStream", scope, stream)
                    .truncateStream(ModelHelper.decode(scope, stream, streamCut), callback);
            return callback.getFuture();
        }, this.executor);

        return result.thenApply(x -> {
            switch (x.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to truncate stream: {}/{}", scope, stream);
                    throw new ControllerFailureException("Failed to truncate stream: " + scope + "/" + stream);
                case SCOPE_NOT_FOUND:
                    log.warn(requestId, "Scope not found: {}", scope);
                    throw new IllegalArgumentException("Scope does not exist: " + scope);
                case STREAM_NOT_FOUND:
                    log.warn(requestId, "Stream does not exist: {}/{}", scope, stream);
                    throw new IllegalArgumentException("Stream does not exist: " + stream);
                case SUCCESS:
                    // The message previously said "updated", copied over from updateStream.
                    log.info(requestId, "Successfully truncated stream: {}/{}", scope, stream);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status truncating stream " + scope + "/" + stream + " " + x.getStatus());
            }
        }).whenComplete((x, e) -> {
            if (e != null) {
                log.warn(requestId, "truncateStream failed: ", e);
            }
            LoggerHelpers.traceLeave(log, "truncateStream", traceId, streamCut, requestId);
        });
    }

    /**
     * Starts a scale operation and returns a cancellable request that tracks its completion.
     *
     * The scale is first started through {@code startScaleInternal}. When it was started, the returned
     * request evaluates {@link #checkScaleStatus(Stream, int)} with the epoch of the scale response and
     * is done once that check yields {@code true}. When the scale was not started, the request yields
     * {@code false} and is done immediately. Any failure is surfaced as a failed future through the
     * returned request.
     *
     * @param stream         The stream to scale.
     * @param sealedSegments The ids of the segments to seal.
     * @param newKeyRanges   The key ranges of the segments to create (start mapped to end).
     * @param executor       The executor handed to the cancellable request.
     * @return The cancellable request representing the scale operation.
     */
    @Override
    public CancellableRequest<Boolean> scaleStream(Stream stream, List<Long> sealedSegments, Map<Double, Double> newKeyRanges, ScheduledExecutorService executor) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        final CancellableRequest<Boolean> cancellableRequest = new CancellableRequest<>();
        final long requestId = this.requestIdGenerator.get();
        final long traceId = LoggerHelpers.traceEnter(log, "scaleStream", stream, requestId);

        this.startScaleInternal(stream, sealedSegments, newKeyRanges, "scaleStream", requestId)
            .whenComplete((startScaleResponse, e) -> {
                try {
                    if (e != null) {
                        log.error(requestId, "failed to start scale {}", e);
                        cancellableRequest.start(() -> Futures.failedFuture(e), any -> true, executor);
                    } else {
                        try {
                            final boolean started = this.handleScaleResponse(stream, startScaleResponse, requestId);
                            cancellableRequest.start(() -> {
                                if (started) {
                                    return this.checkScaleStatus(stream, startScaleResponse.getEpoch());
                                }
                                return CompletableFuture.completedFuture(false);
                            }, isDone -> !started || isDone, executor);
                        } catch (Exception ex) {
                            // Previously this failure was forwarded without being logged.
                            log.warn(requestId, "scaleStream failed: ", ex);
                            cancellableRequest.start(() -> Futures.failedFuture(ex), any -> true, executor);
                        }
                    }
                } finally {
                    // Balance the traceEnter above on every path, not only on success.
                    LoggerHelpers.traceLeave(log, "scaleStream", traceId, stream, requestId);
                }
            });
        return cancellableRequest;
    }

    /**
     * Starts a scale operation without waiting for it to complete.
     *
     * The returned future completes with {@code true} when the controller reports the scale as
     * started and {@code false} when its precondition failed. It completes exceptionally with
     * {@link ControllerFailureException} for a reported failure or an unrecognised status.
     *
     * @param stream         The stream to scale.
     * @param sealedSegments The ids of the segments to seal.
     * @param newKeyRanges   The key ranges of the segments to create (start mapped to end).
     * @return A future indicating whether the scale was started.
     */
    @Override
    public CompletableFuture<Boolean> startScale(Stream stream, List<Long> sealedSegments, Map<Double, Double> newKeyRanges) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        final long requestId = this.requestIdGenerator.get();
        final long traceId = LoggerHelpers.traceEnter(log, "scaleStream", stream, requestId);
        return this.startScaleInternal(stream, sealedSegments, newKeyRanges, "scaleStream", requestId)
                // handleScaleResponse tags its log lines with the id it is given: it must be the request
                // id (as in scaleStream), not the trace id.
                .thenApply(response -> this.handleScaleResponse(stream, response, requestId))
                .whenComplete((x, e) -> {
                    if (e != null) {
                        log.warn(requestId, "scaleStream failed: ", e);
                    }
                    LoggerHelpers.traceLeave(log, "scaleStream", traceId, stream, requestId);
                });
    }

    /**
     * Translates the status of a scale response into a boolean outcome.
     *
     * @param stream    The stream being scaled (used for log and exception messages).
     * @param response  The response returned by the controller.
     * @param requestId The id used to tag the log lines.
     * @return {@code true} when the scale was started, {@code false} when its precondition failed.
     * @throws ControllerFailureException when the controller reports a failure or an unrecognised status.
     */
    private Boolean handleScaleResponse(Stream stream, Controller.ScaleResponse response, long requestId) {
        switch (response.getStatus()) {
            case STARTED:
                log.info(requestId, "Successfully started scale stream: {}", new Object[]{stream.getStreamName()});
                return true;
            case PRECONDITION_FAILED:
                log.warn(requestId, "Precondition failed for scale stream: {}", new Object[]{stream.getStreamName()});
                return false;
            case FAILURE:
                log.warn(requestId, "Failed to scale stream: {}", new Object[]{stream.getStreamName()});
                throw new ControllerFailureException("Failed to scale stream: " + stream);
            default:
                throw new ControllerFailureException("Unknown return status scaling stream " + stream + " " + response.getStatus());
        }
    }

    /**
     * Checks whether the scale operation identified by the given epoch has completed.
     *
     * The RPC is issued through the retry policy. The returned future completes with {@code true}
     * when the controller reports SUCCESS and {@code false} when it reports IN_PROGRESS. It completes
     * exceptionally with {@link ControllerFailureException} for INVALID_INPUT or any other status.
     *
     * @param stream     The stream being scaled; must not be null.
     * @param scaleEpoch The epoch of the scale operation; must not be negative.
     * @return A future indicating whether the scale has completed.
     */
    @Override
    public CompletableFuture<Boolean> checkScaleStatus(Stream stream, int scaleEpoch) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkArgument(scaleEpoch >= 0);
        final long traceId = LoggerHelpers.traceEnter(log, "checkScale", stream);

        // Typed (not raw) callback and future so that getStatus() below is checked by the compiler.
        final CompletableFuture<Controller.ScaleStatusResponse> result = this.retryConfig.runAsync(() -> {
            // No request id is generated for this call; the trace id is used to tag the callback.
            RPCAsyncCallback<Controller.ScaleStatusResponse> callback = new RPCAsyncCallback<>(traceId, "checkScale");
            this.client.checkScale(Controller.ScaleStatusRequest.newBuilder()
                    .setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName()))
                    .setEpoch(scaleEpoch)
                    .build(), callback);
            return callback.getFuture();
        }, this.executor);

        return result.thenApply(response -> {
            switch (response.getStatus()) {
                case IN_PROGRESS:
                    return false;
                case SUCCESS:
                    return true;
                case INVALID_INPUT:
                    throw new ControllerFailureException("invalid input");
                default:
                    throw new ControllerFailureException("unknown error");
            }
        }).whenComplete((x, e) -> {
            if (e != null) {
                log.warn("checking status failed: ", e);
            }
            LoggerHelpers.traceLeave(log, "checkScale", traceId);
        });
    }

    /**
     * Issues the scale RPC through the retry policy and returns the raw controller response.
     *
     * The scale timestamp is taken from {@link System#currentTimeMillis()} on every attempt and is
     * sent both in the request and as part of the request tag.
     *
     * @param stream         The stream to scale; must not be null.
     * @param sealedSegments The ids of the segments to seal; must not be null.
     * @param newKeyRanges   The key ranges of the segments to create (start mapped to end); must not be null.
     * @param method         The method name used to tag the request and the callback.
     * @param requestId      The id used to tag the request and the callback.
     * @return A future holding the controller's scale response.
     */
    private CompletableFuture<Controller.ScaleResponse> startScaleInternal(Stream stream, List<Long> sealedSegments, Map<Double, Double> newKeyRanges, String method, long requestId) {
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(sealedSegments, "sealedSegments");
        Preconditions.checkNotNull(newKeyRanges, "newKeyRanges");

        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.ScaleResponse> callback = new RPCAsyncCallback<>(requestId, method);
            long scaleTimestamp = System.currentTimeMillis();

            // One KeyRangeEntry per (start, end) pair of the requested key ranges.
            List<Controller.ScaleRequest.KeyRangeEntry> keyRanges = newKeyRanges.entrySet().stream()
                    .map(x -> Controller.ScaleRequest.KeyRangeEntry.newBuilder()
                            .setStart(x.getKey())
                            .setEnd(x.getValue())
                            .build())
                    .collect(Collectors.toList());

            Controller.ScaleRequest request = Controller.ScaleRequest.newBuilder()
                    .setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName()))
                    .addAllSealedSegments(sealedSegments)
                    .addAllNewKeyRanges(keyRanges)
                    .setScaleTimestamp(scaleTimestamp)
                    .build();

            new ControllerClientTagger(this.client)
                    .withTag(requestId, method, stream.getScope(), stream.getStreamName(), String.valueOf(scaleTimestamp))
                    .scale(request, callback);
            return callback.getFuture();
        }, this.executor);
    }

    /**
     * Asks the controller to seal a stream.
     *
     * The RPC is issued through the retry policy. The returned future completes with {@code true}
     * on success. It completes exceptionally with {@link InvalidStreamException} when the scope or
     * the stream does not exist and with {@link ControllerFailureException} for a reported failure or
     * an unrecognised status.
     *
     * @param scope      The scope of the stream; must not be null or empty.
     * @param streamName The name of the stream; must not be null or empty.
     * @return A future holding the outcome of the operation.
     */
    @Override
    public CompletableFuture<Boolean> sealStream(String scope, String streamName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotNullOrEmpty(streamName, "streamName");
        final long requestId = this.requestIdGenerator.get();
        final long traceId = LoggerHelpers.traceEnter(log, "sealStream", scope, streamName, requestId);

        // Typed (not raw) future so that the status accessors below are checked by the compiler.
        final CompletableFuture<Controller.UpdateStreamStatus> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.UpdateStreamStatus> callback = new RPCAsyncCallback<>(requestId, "sealStream");
            new ControllerClientTagger(this.client).withTag(requestId, "sealStream", scope, streamName)
                    .sealStream(ModelHelper.createStreamInfo(scope, streamName), callback);
            return callback.getFuture();
        }, this.executor);

        return result.thenApply(x -> {
            switch (x.getStatus()) {
                case FAILURE:
                    log.warn(requestId, "Failed to seal stream: {}", streamName);
                    throw new ControllerFailureException("Failed to seal stream: " + streamName);
                case SCOPE_NOT_FOUND:
                    log.warn(requestId, "Scope not found: {}", scope);
                    throw new InvalidStreamException("Scope does not exist: " + scope);
                case STREAM_NOT_FOUND:
                    log.warn(requestId, "Stream does not exist: {}", streamName);
                    throw new InvalidStreamException("Stream does not exist: " + streamName);
                case SUCCESS:
                    log.info(requestId, "Successfully sealed stream: {}", streamName);
                    return true;
                default:
                    throw new ControllerFailureException("Unknown return status sealing stream " + streamName + " " + x.getStatus());
            }
        }).whenComplete((x, e) -> {
            if (e != null) {
                log.warn(requestId, "sealStream failed: ", e);
            }
            LoggerHelpers.traceLeave(log, "sealStream", traceId, scope, streamName, requestId);
        });
    }

    /**
     * Requests that the controller delete the given stream.
     *
     * <p>The RPC is issued through {@code retryConfig} on {@code executor}. The returned
     * status is mapped as follows: {@code SUCCESS} completes the future with {@code true};
     * {@code STREAM_NOT_FOUND} completes it with {@code false};
     * {@code STREAM_NOT_SEALED} fails it with {@link IllegalArgumentException};
     * {@code FAILURE} and any other status fail it with {@link ControllerFailureException}.
     *
     * @param scope      scope of the stream; validated with {@code checkNotNullOrEmpty}
     * @param streamName name of the stream; validated with {@code checkNotNullOrEmpty}
     * @return a future holding {@code true} if the stream was deleted, {@code false} if it
     *         was not found
     */
    @Override
    public CompletableFuture<Boolean> deleteStream(String scope, String streamName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotNullOrEmpty(streamName, "streamName");
        final long requestId = this.requestIdGenerator.get();
        final long traceId = LoggerHelpers.traceEnter(log, "deleteStream", new Object[]{scope, streamName, requestId});
        final CompletableFuture<Controller.DeleteStreamStatus> deleteResponse = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.DeleteStreamStatus> callback = new RPCAsyncCallback<>(requestId, "deleteStream");
            ControllerClientTagger taggedClient = new ControllerClientTagger(this.client).withTag(requestId, "deleteStream", scope, streamName);
            taggedClient.deleteStream(ModelHelper.createStreamInfo(scope, streamName), callback);
            return callback.getFuture();
        }, this.executor);
        return deleteResponse.thenApply(response -> {
            switch (response.getStatus()) {
                case SUCCESS:
                    log.info(requestId, "Successfully deleted stream: {}", new Object[]{streamName});
                    return true;
                case STREAM_NOT_FOUND:
                    // A missing stream is not an error here; it is reported as "nothing deleted".
                    log.warn(requestId, "Stream does not exist: {}", new Object[]{streamName});
                    return false;
                case STREAM_NOT_SEALED:
                    log.warn(requestId, "Stream is not sealed: {}", new Object[]{streamName});
                    throw new IllegalArgumentException("Stream is not sealed: " + streamName);
                case FAILURE:
                    log.warn(requestId, "Failed to delete stream: {}", new Object[]{streamName});
                    throw new ControllerFailureException("Failed to delete stream: " + streamName);
                default:
                    // Any status not handled above is reported as a controller failure.
                    throw new ControllerFailureException("Unknown return status deleting stream " + streamName + " " + response.getStatus());
            }
        }).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn(requestId, "deleteStream failed: ", new Object[]{failure});
            }
            LoggerHelpers.traceLeave(log, "deleteStream", traceId, new Object[]{scope, streamName, requestId});
        });
    }

    /**
     * Asks the controller for the segments of a stream at the given timestamp.
     *
     * <p>Each entry of the response's segment list is converted into a map entry whose key
     * is the encoded segment id and whose value is the entry's offset.
     *
     * @param stream    the stream to query; must not be null
     * @param timestamp the timestamp placed in the request
     * @return a future holding the segment-to-offset map built from the response
     */
    @Override
    public CompletableFuture<Map<Segment, Long>> getSegmentsAtTime(Stream stream, long timestamp) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        final long traceId = LoggerHelpers.traceEnter(log, "getSegmentsAtTime", new Object[]{stream, timestamp});
        CompletableFuture response = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback callback = new RPCAsyncCallback(traceId, "getSegmentsAtTime");
            Controller.GetSegmentsRequest request = Controller.GetSegmentsRequest.newBuilder()
                    .setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName()))
                    .setTimestamp(timestamp)
                    .build();
            this.client.getSegments(request, callback);
            return callback.getFuture();
        }, this.executor);
        return response.thenApply(segments -> {
            log.debug("Received the following data from the controller {}", (Object)segments.getSegmentsList());
            return segments.getSegmentsList().stream().collect(Collectors.toMap(
                    location -> ModelHelper.encode(location.getSegmentId()),
                    location -> location.getOffset()));
        }).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn("getSegmentsAtTime failed: ", failure);
            }
            LoggerHelpers.traceLeave(log, "getSegmentsAtTime", traceId, new Object[0]);
        });
    }

    /**
     * Asks the controller for the segments immediately following the given segment.
     *
     * <p>Every entry in the response is stored in a map keyed by the encoded segment, with
     * the entry's value list as the map value; the map and the response's delegation token
     * are wrapped in a {@link StreamSegmentsWithPredecessors}.
     *
     * @param segment the segment whose followers are requested
     * @return a future holding the converted response
     */
    @Override
    public CompletableFuture<StreamSegmentsWithPredecessors> getSuccessors(Segment segment) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        final long traceId = LoggerHelpers.traceEnter(log, "getSuccessors", new Object[]{segment});
        final CompletableFuture<Controller.SuccessorResponse> response = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.SuccessorResponse> callback = new RPCAsyncCallback<>(traceId, "getSuccessors");
            this.client.getSegmentsImmediatelyFollowing(ModelHelper.decode(segment), callback);
            return callback.getFuture();
        }, this.executor);
        return response.thenApply(successors -> {
            log.debug("Received the following data from the controller {}", (Object)successors.getSegmentsList());
            final HashMap<SegmentWithRange, List<Long>> bySegment = new HashMap<>();
            successors.getSegmentsList().forEach(
                    entry -> bySegment.put(ModelHelper.encode(entry.getSegment()), entry.getValueList()));
            return new StreamSegmentsWithPredecessors(bySegment, successors.getDelegationToken());
        }).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn("getSuccessors failed: ", failure);
            }
            LoggerHelpers.traceLeave(log, "getSuccessors", traceId, new Object[0]);
        });
    }

    /**
     * Fetches the segments from the given stream cut up to {@code StreamCut.UNBOUNDED} by
     * delegating to {@code getSegmentsBetweenStreamCuts}.
     *
     * <p>The trace enter/leave pair and the failure log all use the label
     * {@code "getSuccessorsFromCut"} so that trace lines for this overload can be paired up
     * and told apart from {@code getSuccessors(Segment)}.
     *
     * @param from the starting stream cut; must not be null
     * @return a future holding the segments returned by the controller
     */
    @Override
    public CompletableFuture<StreamSegmentSuccessors> getSuccessors(StreamCut from) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        // Fail with a named message instead of an anonymous NPE from from.asImpl().
        Preconditions.checkNotNull(from, "from");
        final Stream stream = from.asImpl().getStream();
        final long traceId = LoggerHelpers.traceEnter(log, "getSuccessorsFromCut", new Object[]{stream});
        return this.getSegmentsBetweenStreamCuts(from, StreamCut.UNBOUNDED).whenComplete((x, e) -> {
            if (e != null) {
                log.warn("getSuccessorsFromCut failed: ", e);
            }
            LoggerHelpers.traceLeave(log, "getSuccessorsFromCut", traceId, new Object[0]);
        });
    }

    /**
     * Fetches the segments lying between two stream cuts of the same stream by delegating
     * to {@code getSegmentsBetweenStreamCuts}.
     *
     * <p>The failure log and trace-leave label are {@code "getSegments"}, matching the
     * trace-enter label (they previously said "getSuccessors").
     *
     * @param fromStreamCut the starting stream cut; must not be null
     * @param toStreamCut   the ending stream cut; must not be null and must belong to the
     *                      same stream as {@code fromStreamCut}
     * @return a future holding the segments returned by the controller
     */
    @Override
    public CompletableFuture<StreamSegmentSuccessors> getSegments(StreamCut fromStreamCut, StreamCut toStreamCut) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(fromStreamCut, "fromStreamCut");
        Preconditions.checkNotNull(toStreamCut, "toStreamCut");
        Preconditions.checkArgument(fromStreamCut.asImpl().getStream().equals(toStreamCut.asImpl().getStream()), "Ensure streamCuts for the same stream is passed");
        final Stream stream = fromStreamCut.asImpl().getStream();
        final long traceId = LoggerHelpers.traceEnter(log, "getSegments", new Object[]{stream});
        return this.getSegmentsBetweenStreamCuts(fromStreamCut, toStreamCut).whenComplete((x, e) -> {
            if (e != null) {
                log.warn("getSegments failed: ", e);
            }
            LoggerHelpers.traceLeave(log, "getSegments", traceId, new Object[0]);
        });
    }

    /**
     * Issues the {@code getSegmentsBetween} RPC for the stream of {@code fromStreamCut} and
     * converts the response into a {@link StreamSegmentSuccessors}.
     *
     * <p>Both stream cuts are translated with {@code getStreamCutMap} before being sent.
     * The response's segment list is encoded into a set and paired with the response's
     * delegation token.
     *
     * @param fromStreamCut the starting stream cut; its stream identifies the target stream
     * @param toStreamCut   the ending stream cut
     * @return a future holding the converted response
     */
    private CompletableFuture<StreamSegmentSuccessors> getSegmentsBetweenStreamCuts(StreamCut fromStreamCut, StreamCut toStreamCut) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        final Stream stream = fromStreamCut.asImpl().getStream();
        final long traceId = LoggerHelpers.traceEnter(log, "getSegments", new Object[]{stream});
        // NOTE(review): the delegation-token future requested here is never consumed by
        // the rest of this method. The call is kept so behaviour is unchanged; confirm
        // whether it is still needed.
        this.getOrRefreshDelegationTokenFor(stream.getScope(), stream.getStreamName());
        CompletableFuture rangeResponse = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback callback = new RPCAsyncCallback(traceId, "getSuccessorsFromCut");
            this.client.getSegmentsBetween(
                    ModelHelper.decode(stream.getScope(), stream.getStreamName(), this.getStreamCutMap(fromStreamCut), this.getStreamCutMap(toStreamCut)),
                    callback);
            return callback.getFuture();
        }, this.executor);
        return rangeResponse.thenApply(response -> {
            log.debug("Received the following data from the controller {}", (Object)response.getSegmentsList());
            return new StreamSegmentSuccessors(
                    response.getSegmentsList().stream().map(ModelHelper::encode).collect(Collectors.toSet()),
                    response.getDelegationToken());
        });
    }

    /**
     * Converts a stream cut into a map from segment id to the position recorded for that
     * segment.
     *
     * @param streamCut the stream cut to convert
     * @return an empty map when {@code streamCut} equals {@code StreamCut.UNBOUNDED};
     *         otherwise one entry per position in the stream cut
     */
    private Map<Long, Long> getStreamCutMap(StreamCut streamCut) {
        if (!streamCut.equals(StreamCut.UNBOUNDED)) {
            return streamCut.asImpl().getPositions().entrySet().stream()
                    .collect(Collectors.toMap(position -> ((Segment)position.getKey()).getSegmentId(), Map.Entry::getValue));
        }
        // The unbounded cut is sent as "no positions".
        return Collections.emptyMap();
    }

    /**
     * Asks the controller for the current segments of a stream.
     *
     * <p>Each returned segment range is checked to have {@code minKey <= maxKey} and is
     * stored in a {@link TreeMap} keyed by its max key; the map and the response's
     * delegation token are wrapped in a {@link StreamSegments}.
     *
     * @param scope  scope of the stream; validated with {@code checkNotNullOrEmpty}
     * @param stream name of the stream; validated with {@code checkNotNullOrEmpty}
     * @return a future holding the converted response
     */
    @Override
    public CompletableFuture<StreamSegments> getCurrentSegments(String scope, String stream) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotNullOrEmpty(stream, "stream");
        final long traceId = LoggerHelpers.traceEnter(log, "getCurrentSegments", new Object[]{scope, stream});
        CompletableFuture response = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback callback = new RPCAsyncCallback(traceId, "getCurrentSegments");
            this.client.getCurrentSegments(ModelHelper.createStreamInfo(scope, stream), callback);
            return callback.getFuture();
        }, this.executor);
        return response.thenApply(ranges -> {
            log.debug("Received the following data from the controller {}", (Object)ranges.getSegmentRangesList());
            final TreeMap<Double, SegmentWithRange> byMaxKey = new TreeMap<>();
            for (Controller.SegmentRange range : ranges.getSegmentRangesList()) {
                Preconditions.checkState(range.getMinKey() <= range.getMaxKey(), "Min keyrange %s was not less than maximum keyRange %s for segment %s", (Object)range.getMinKey(), (Object)range.getMaxKey(), (Object)range.getSegmentId());
                SegmentWithRange segment = new SegmentWithRange(ModelHelper.encode(range.getSegmentId()), range.getMinKey(), range.getMaxKey());
                byMaxKey.put(range.getMaxKey(), segment);
            }
            return new StreamSegments(byMaxKey, ranges.getDelegationToken());
        }).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn("getCurrentSegments failed: ", failure);
            }
            LoggerHelpers.traceLeave(log, "getCurrentSegments", traceId, new Object[0]);
        });
    }

    /**
     * Asks the controller for the URI of the given segment.
     *
     * <p>The qualified name is parsed with {@code Segment.fromScopedName} inside the
     * retried call, and the response is converted with {@code ModelHelper.encode}.
     *
     * @param qualifiedSegmentName scoped segment name; validated with
     *                             {@code checkNotNullOrEmpty}
     * @return a future holding the encoded node URI
     */
    @Override
    public CompletableFuture<PravegaNodeUri> getEndpointForSegment(String qualifiedSegmentName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(qualifiedSegmentName, "qualifiedSegmentName");
        final long traceId = LoggerHelpers.traceEnter(log, "getEndpointForSegment", new Object[]{qualifiedSegmentName});
        CompletableFuture uriResponse = this.retryConfig.runAsync(() -> {
            Segment parsed = Segment.fromScopedName(qualifiedSegmentName);
            RPCAsyncCallback callback = new RPCAsyncCallback(traceId, "getEndpointForSegment");
            this.client.getURI(ModelHelper.createSegmentId(parsed.getScope(), parsed.getStreamName(), parsed.getSegmentId()), callback);
            return callback.getFuture();
        }, this.executor);
        return uriResponse.thenApply(ModelHelper::encode).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn("getEndpointForSegment failed: ", failure);
            }
            LoggerHelpers.traceLeave(log, "getEndpointForSegment", traceId, new Object[0]);
        });
    }

    /**
     * Asks the controller whether the given segment is valid, via the
     * {@code isSegmentValid} RPC.
     *
     * @param segment the segment to check
     * @return a future holding the boolean carried by the validity response
     */
    @Override
    public CompletableFuture<Boolean> isSegmentOpen(Segment segment) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        final long traceId = LoggerHelpers.traceEnter(log, "isSegmentOpen", new Object[]{segment});
        final CompletableFuture<Controller.SegmentValidityResponse> validity = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.SegmentValidityResponse> callback = new RPCAsyncCallback<>(traceId, "isSegmentOpen");
            this.client.isSegmentValid(ModelHelper.createSegmentId(segment.getScope(), segment.getStreamName(), segment.getSegmentId()), callback);
            return callback.getFuture();
        }, this.executor);
        return validity.thenApply(Controller.SegmentValidityResponse::getResponse).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn("isSegmentOpen failed: ", failure);
            }
            LoggerHelpers.traceLeave(log, "isSegmentOpen", traceId, new Object[0]);
        });
    }

    /**
     * Asks the controller to create a transaction on the given stream.
     *
     * @param stream the stream on which to create the transaction; must not be null
     * @param lease  the lease value placed in the request
     * @return a future holding the response converted by {@code convert}
     */
    @Override
    public CompletableFuture<TxnSegments> createTransaction(Stream stream, long lease) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        final long traceId = LoggerHelpers.traceEnter(log, "createTransaction", new Object[]{stream, lease});
        final CompletableFuture<Controller.CreateTxnResponse> created = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.CreateTxnResponse> callback = new RPCAsyncCallback<>(traceId, "createTransaction");
            Controller.CreateTxnRequest request = Controller.CreateTxnRequest.newBuilder()
                    .setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName()))
                    .setLease(lease)
                    .build();
            this.client.createTransaction(request, callback);
            return callback.getFuture();
        }, this.executor);
        return created.thenApply(this::convert).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn("createTransaction failed: ", failure);
            }
            LoggerHelpers.traceLeave(log, "createTransaction", traceId, new Object[0]);
        });
    }

    /**
     * Converts a create-transaction response into a {@link TxnSegments}.
     *
     * <p>Each active segment range is checked to have {@code minKey <= maxKey} and stored
     * in a {@link TreeMap} keyed by its max key. The map and the response's delegation
     * token form the {@link StreamSegments}, which is paired with the encoded
     * transaction id.
     *
     * @param response the controller's create-transaction response
     * @return the converted segments and transaction id
     */
    private TxnSegments convert(Controller.CreateTxnResponse response) {
        final TreeMap<Double, SegmentWithRange> byMaxKey = new TreeMap<>();
        for (Controller.SegmentRange range : response.getActiveSegmentsList()) {
            Preconditions.checkState(range.getMinKey() <= range.getMaxKey());
            SegmentWithRange segment = new SegmentWithRange(ModelHelper.encode(range.getSegmentId()), range.getMinKey(), range.getMaxKey());
            byMaxKey.put(range.getMaxKey(), segment);
        }
        return new TxnSegments(
                new StreamSegments(byMaxKey, response.getDelegationToken()),
                ModelHelper.encode(response.getTxnId()));
    }

    /**
     * Sends a ping for the given transaction, carrying the supplied lease value.
     *
     * <p>The response status is converted with {@code ModelHelper.encode}; a
     * {@link PingFailedException} raised by that conversion is rethrown wrapped in a
     * {@link CompletionException} so that it fails the returned future.
     *
     * @param stream the stream the transaction belongs to
     * @param txId   the transaction id
     * @param lease  the lease value placed in the request
     * @return a future holding the encoded ping status
     */
    @Override
    public CompletableFuture<Transaction.PingStatus> pingTransaction(Stream stream, UUID txId, long lease) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        final long traceId = LoggerHelpers.traceEnter(log, "pingTransaction", new Object[]{stream, txId, lease});
        CompletableFuture pingResponse = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback callback = new RPCAsyncCallback(traceId, "pingTransaction");
            Controller.PingTxnRequest request = Controller.PingTxnRequest.newBuilder()
                    .setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName()))
                    .setTxnId(ModelHelper.decode(txId))
                    .setLease(lease)
                    .build();
            this.client.pingTransaction(request, callback);
            return callback.getFuture();
        }, this.executor);
        return pingResponse.thenApply(status -> {
            try {
                return ModelHelper.encode(status.getStatus(), stream + " " + txId);
            }
            catch (PingFailedException pingFailure) {
                // Checked exception: wrap it so it can propagate through the future.
                throw new CompletionException(pingFailure);
            }
        }).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn("Ping Transaction failed:", failure);
            }
            LoggerHelpers.traceLeave(log, "pingTransaction", traceId, new Object[0]);
        });
    }

    /**
     * Asks the controller to commit the given transaction.
     *
     * <p>The request carries {@code timestamp} when it is non-null and
     * {@code Long.MIN_VALUE} otherwise. The response status is mapped as follows:
     * {@code SUCCESS} completes the future normally; {@code STREAM_NOT_FOUND} fails it
     * with {@link InvalidStreamException}; {@code TRANSACTION_NOT_FOUND} and every other
     * status fail it with {@link TxnFailedException}.
     *
     * <p>Like the sibling RPC methods, any failure is logged and the trace-leave record is
     * emitted whether the call succeeded or failed.
     *
     * @param stream    the stream the transaction belongs to; must not be null
     * @param writerId  the writer id placed in the request
     * @param timestamp optional timestamp; {@code null} is sent as {@code Long.MIN_VALUE}
     * @param txId      the transaction id; must not be null
     * @return a future that completes when the controller reports success
     */
    @Override
    public CompletableFuture<Void> commitTransaction(Stream stream, String writerId, Long timestamp, UUID txId) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(txId, "txId");
        final long traceId = LoggerHelpers.traceEnter(log, "commitTransaction", new Object[]{stream, txId});
        final CompletableFuture<Controller.TxnStatus> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.TxnStatus> callback = new RPCAsyncCallback<>(traceId, "commitTransaction");
            Controller.TxnRequest.Builder txnRequest = Controller.TxnRequest.newBuilder()
                    .setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName()))
                    .setWriterId(writerId)
                    .setTxnId(ModelHelper.decode(txId));
            // Long.MIN_VALUE is sent when the caller supplied no timestamp.
            txnRequest.setTimestamp(timestamp != null ? timestamp.longValue() : Long.MIN_VALUE);
            this.client.commitTransaction(txnRequest.build(), callback);
            return callback.getFuture();
        }, this.executor);
        final CompletableFuture<Void> outcome = result.thenApply(txnStatus -> {
            final Controller.TxnStatus.Status status = txnStatus.getStatus();
            if (status.equals(Controller.TxnStatus.Status.STREAM_NOT_FOUND)) {
                throw new InvalidStreamException("Stream no longer exists: " + stream);
            }
            if (status.equals(Controller.TxnStatus.Status.TRANSACTION_NOT_FOUND)) {
                throw Exceptions.sneakyThrow(new TxnFailedException("Transaction was already either committed or aborted"));
            }
            if (status.equals(Controller.TxnStatus.Status.SUCCESS)) {
                return null;
            }
            // Identify the transaction rather than printing the status message twice.
            log.info("Unable to commit transaction " + txId + " on stream " + stream + " because of " + status);
            throw Exceptions.sneakyThrow(new TxnFailedException("Commit transaction failed with status: " + status));
        });
        return outcome.whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn("commitTransaction failed: ", failure);
            }
            LoggerHelpers.traceLeave(log, "commitTransaction", traceId, new Object[0]);
        });
    }

    /**
     * Asks the controller to abort the given transaction.
     *
     * <p>The response status is mapped as follows: {@code SUCCESS} completes the future
     * normally; {@code STREAM_NOT_FOUND} fails it with {@link InvalidStreamException};
     * {@code TRANSACTION_NOT_FOUND} fails it with {@link TxnFailedException}; any other
     * status fails it with {@link RuntimeException}.
     *
     * <p>The trace-leave record is emitted whether the call succeeded or failed
     * (previously it was skipped when the RPC itself failed), and failures are logged.
     *
     * @param stream the stream the transaction belongs to; must not be null
     * @param txId   the transaction id; must not be null
     * @return a future that completes when the controller reports success
     */
    @Override
    public CompletableFuture<Void> abortTransaction(Stream stream, UUID txId) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(txId, "txId");
        final long traceId = LoggerHelpers.traceEnter(log, "abortTransaction", new Object[]{stream, txId});
        final CompletableFuture<Controller.TxnStatus> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.TxnStatus> callback = new RPCAsyncCallback<>(traceId, "abortTransaction");
            Controller.TxnRequest request = Controller.TxnRequest.newBuilder()
                    .setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName()))
                    .setTxnId(ModelHelper.decode(txId))
                    .build();
            this.client.abortTransaction(request, callback);
            return callback.getFuture();
        }, this.executor);
        final CompletableFuture<Void> outcome = result.thenApply(txnStatus -> {
            final Controller.TxnStatus.Status status = txnStatus.getStatus();
            if (status.equals(Controller.TxnStatus.Status.STREAM_NOT_FOUND)) {
                throw new InvalidStreamException("Stream no longer exists: " + stream);
            }
            if (status.equals(Controller.TxnStatus.Status.TRANSACTION_NOT_FOUND)) {
                throw Exceptions.sneakyThrow(new TxnFailedException("Transaction was already either committed or aborted"));
            }
            if (status.equals(Controller.TxnStatus.Status.SUCCESS)) {
                return null;
            }
            // Identify the transaction rather than printing the status message twice.
            log.info("Unable to abort transaction " + txId + " on stream " + stream + " because of " + status);
            throw new RuntimeException("Error aborting transaction: " + status);
        });
        return outcome.whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn("abortTransaction failed: ", failure);
            }
            LoggerHelpers.traceLeave(log, "abortTransaction", traceId, new Object[0]);
        });
    }

    /**
     * Asks the controller for the state of the given transaction, via the
     * {@code checkTransactionState} RPC.
     *
     * @param stream the stream the transaction belongs to; must not be null
     * @param txId   the transaction id; must not be null
     * @return a future holding the response state converted by {@code ModelHelper.encode}
     */
    @Override
    public CompletableFuture<Transaction.Status> checkTransactionStatus(Stream stream, UUID txId) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(txId, "txId");
        final long traceId = LoggerHelpers.traceEnter(log, "checkTransactionStatus", new Object[]{stream, txId});
        CompletableFuture stateResponse = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback callback = new RPCAsyncCallback(traceId, "checkTransactionStatus");
            Controller.TxnRequest request = Controller.TxnRequest.newBuilder()
                    .setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName()))
                    .setTxnId(ModelHelper.decode(txId))
                    .build();
            this.client.checkTransactionState(request, callback);
            return callback.getFuture();
        }, this.executor);
        return stateResponse.thenApply(status -> ModelHelper.encode(status.getState(), stream + " " + txId)).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn("checkTransactionStatus failed: ", failure);
            }
            LoggerHelpers.traceLeave(log, "checkTransactionStatus", traceId, new Object[0]);
        });
    }

    /**
     * Reports a writer's timestamp and last written position to the controller.
     *
     * <p>The future completes normally when the response result is {@code SUCCESS};
     * any other result is logged and fails the future with {@link RuntimeException}.
     *
     * <p>The RPC callback is labelled with this method's name (it previously carried the
     * label "lastWrittenPosition"). The trace-leave record is emitted whether the call
     * succeeded or failed, and failures are logged.
     *
     * @param writer              the writer id; must not be null
     * @param stream              the stream being written to; must not be null
     * @param timestamp           the timestamp to record
     * @param lastWrittenPosition the writer's position; must not be null
     * @return a future that completes when the controller reports success
     */
    @Override
    public CompletableFuture<Void> noteTimestampFromWriter(String writer, Stream stream, long timestamp, WriterPosition lastWrittenPosition) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(writer, "writer");
        Preconditions.checkNotNull(lastWrittenPosition, "lastWrittenPosition");
        final long traceId = LoggerHelpers.traceEnter(log, "noteTimestampFromWriter", new Object[]{writer, stream});
        final CompletableFuture<Controller.TimestampResponse> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.TimestampResponse> callback = new RPCAsyncCallback<>(traceId, "noteTimestampFromWriter");
            Controller.TimestampFromWriter request = Controller.TimestampFromWriter.newBuilder()
                    .setWriter(writer)
                    .setTimestamp(timestamp)
                    .setPosition(ModelHelper.createStreamCut(stream, lastWrittenPosition))
                    .build();
            this.client.noteTimestampFromWriter(request, callback);
            return callback.getFuture();
        }, this.executor);
        final CompletableFuture<Void> outcome = result.thenApply(response -> {
            if (response.getResult().equals(Controller.TimestampResponse.Status.SUCCESS)) {
                return null;
            }
            log.warn("Writer " + writer + " failed to note time because: " + response.getResult() + " time was: " + timestamp + " position=" + lastWrittenPosition);
            throw new RuntimeException("failed to note time because: " + response.getResult());
        });
        return outcome.whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn("noteTimestampFromWriter failed: ", failure);
            }
            LoggerHelpers.traceLeave(log, "noteTimestampFromWriter", traceId, new Object[0]);
        });
    }

    /**
     * Notifies the controller that a writer should be removed from a stream.
     *
     * <p>The future completes normally when the response result is {@code SUCCESS};
     * any other result is logged and fails the future with {@link RuntimeException}.
     *
     * <p>Tracing uses the label {@code "writerShutdown"}. The trace-leave record is
     * emitted whether the call succeeded or failed (previously it was skipped when the
     * RPC itself failed), and failures are logged.
     *
     * @param writerId the id of the writer to remove; must not be null
     * @param stream   the stream the writer was writing to; must not be null
     * @return a future that completes when the controller reports success
     */
    @Override
    public CompletableFuture<Void> removeWriter(String writerId, Stream stream) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(writerId, "writerId");
        final long traceId = LoggerHelpers.traceEnter(log, "writerShutdown", new Object[]{writerId, stream});
        final CompletableFuture<Controller.RemoveWriterResponse> result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback<Controller.RemoveWriterResponse> callback = new RPCAsyncCallback<>(traceId, "writerShutdown");
            Controller.RemoveWriterRequest request = Controller.RemoveWriterRequest.newBuilder()
                    .setWriter(writerId)
                    .setStream(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName()))
                    .build();
            this.client.removeWriter(request, callback);
            return callback.getFuture();
        }, this.executor);
        final CompletableFuture<Void> outcome = result.thenApply(response -> {
            if (response.getResult().equals(Controller.RemoveWriterResponse.Status.SUCCESS)) {
                return null;
            }
            log.warn("Notifying the controller of writer shutdown failed for writer: " + writerId + " because of " + response.getResult());
            throw new RuntimeException("Unable to remove writer due to: " + response.getResult());
        });
        return outcome.whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.warn("writerShutdown failed: ", failure);
            }
            LoggerHelpers.traceLeave(log, "writerShutdown", traceId, new Object[0]);
        });
    }

    /**
     * Marks this instance as closed and shuts the channel down immediately.
     *
     * <p>Only the first call has an effect: the closed flag is flipped atomically with
     * {@code getAndSet}, and later calls return without touching the channel.
     */
    @Override
    public void close() {
        final boolean alreadyClosed = this.closed.getAndSet(true);
        if (alreadyClosed) {
            return;
        }
        this.channel.shutdownNow();
    }

    /**
     * Asks the controller for a delegation token for the given stream, via the
     * {@code getDelegationToken} RPC.
     *
     * <p>The failure log and trace-leave label use this method's name, matching the
     * trace-enter label (they previously said "getCurrentSegments").
     *
     * @param scope      scope of the stream; validated with {@code checkNotNullOrEmpty}
     * @param streamName name of the stream; validated with {@code checkNotNullOrEmpty}
     * @return a future holding the delegation token string from the response
     */
    @Override
    public CompletableFuture<String> getOrRefreshDelegationTokenFor(String scope, String streamName) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(scope, "scope");
        Exceptions.checkNotNullOrEmpty(streamName, "stream");
        final long traceId = LoggerHelpers.traceEnter(log, "getOrRefreshDelegationTokenFor", new Object[]{scope, streamName});
        CompletableFuture result = this.retryConfig.runAsync(() -> {
            RPCAsyncCallback callback = new RPCAsyncCallback(traceId, "getOrRefreshDelegationTokenFor");
            this.client.getDelegationToken(ModelHelper.createStreamInfo(scope, streamName), callback);
            return callback.getFuture();
        }, this.executor);
        return ((CompletableFuture)result.thenApply(token -> token.getDelegationToken())).whenComplete((x, e) -> {
            if (e != null) {
                log.warn("getOrRefreshDelegationTokenFor failed: ", e);
            }
            LoggerHelpers.traceLeave(log, "getOrRefreshDelegationTokenFor", traceId, new Object[0]);
        });
    }

    /**
     * Wraps a controller service stub so that a request id and request descriptor can be
     * attached as call options before an RPC is issued.
     *
     * <p>{@link #withTag} replaces the held stub with a copy carrying the two options;
     * every other method forwards to the identically named method on the held stub.
     */
    private static class ControllerClientTagger {
        private ControllerServiceGrpc.ControllerServiceStub clientStub;

        ControllerClientTagger(ControllerServiceGrpc.ControllerServiceStub clientStub) {
            this.clientStub = clientStub;
        }

        /**
         * Attaches the request descriptor (built from {@code requestInfo}) and the request
         * id to the held stub as call options.
         *
         * @param requestId   id of the request; sent as its string form
         * @param requestInfo parts from which the request descriptor is built
         * @return this tagger, so that the RPC call can be chained
         */
        ControllerClientTagger withTag(long requestId, String ... requestInfo) {
            final String descriptor = RequestTracker.buildRequestDescriptor(requestInfo);
            log.info(requestId, "Tagging client request ({}).", new Object[]{descriptor});
            final ControllerServiceGrpc.ControllerServiceStub withDescriptor =
                    this.clientStub.withOption(RPCTracingHelpers.REQUEST_DESCRIPTOR_CALL_OPTION, descriptor);
            this.clientStub = withDescriptor.withOption(RPCTracingHelpers.REQUEST_ID_CALL_OPTION, String.valueOf(requestId));
            return this;
        }

        /** Forwards to {@code createScope} on the held stub. */
        public void createScope(Controller.ScopeInfo scopeInfo, RPCAsyncCallback<Controller.CreateScopeStatus> callback) {
            this.clientStub.createScope(scopeInfo, callback);
        }

        /** Forwards to {@code listStreamsInScope} on the held stub. */
        public void listStreamsInScope(Controller.StreamsInScopeRequest request, RPCAsyncCallback<Controller.StreamsInScopeResponse> callback) {
            this.clientStub.listStreamsInScope(request, callback);
        }

        /** Forwards to {@code deleteScope} on the held stub. */
        public void deleteScope(Controller.ScopeInfo scopeInfo, RPCAsyncCallback<Controller.DeleteScopeStatus> callback) {
            this.clientStub.deleteScope(scopeInfo, callback);
        }

        /** Forwards to {@code createStream} on the held stub. */
        public void createStream(Controller.StreamConfig streamConfig, RPCAsyncCallback<Controller.CreateStreamStatus> callback) {
            this.clientStub.createStream(streamConfig, callback);
        }

        /** Forwards to {@code scale} on the held stub. */
        public void scale(Controller.ScaleRequest scaleRequest, RPCAsyncCallback<Controller.ScaleResponse> callback) {
            this.clientStub.scale(scaleRequest, callback);
        }

        /** Forwards to {@code updateStream} on the held stub. */
        public void updateStream(Controller.StreamConfig streamConfig, RPCAsyncCallback<Controller.UpdateStreamStatus> callback) {
            this.clientStub.updateStream(streamConfig, callback);
        }

        /** Forwards to {@code truncateStream} on the held stub. */
        public void truncateStream(Controller.StreamCut streamCut, RPCAsyncCallback<Controller.UpdateStreamStatus> callback) {
            this.clientStub.truncateStream(streamCut, callback);
        }

        /** Forwards to {@code sealStream} on the held stub. */
        public void sealStream(Controller.StreamInfo streamInfo, RPCAsyncCallback<Controller.UpdateStreamStatus> callback) {
            this.clientStub.sealStream(streamInfo, callback);
        }

        /** Forwards to {@code deleteStream} on the held stub. */
        public void deleteStream(Controller.StreamInfo streamInfo, RPCAsyncCallback<Controller.DeleteStreamStatus> callback) {
            this.clientStub.deleteStream(streamInfo, callback);
        }
    }

    private static final class RPCAsyncCallback<T>
    implements StreamObserver<T> {
        private final long traceId;
        private final String method;
        private T result = null;
        private final CompletableFuture<T> future = new CompletableFuture();

        RPCAsyncCallback(long traceId, String method) {
            this.traceId = traceId;
            this.method = method;
        }

        public void onNext(T value) {
            this.result = value;
        }

        public void onError(Throwable t) {
            log.warn("gRPC call for {} with trace id {} failed with server error.", new Object[]{this.method, this.traceId, t});
            if (t instanceof RuntimeException) {
                this.future.completeExceptionally(t);
            } else {
                this.future.completeExceptionally(new RuntimeException(t));
            }
        }

        public void onCompleted() {
            this.future.complete(this.result);
        }

        public CompletableFuture<T> getFuture() {
            return this.future;
        }
    }
}

