/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.server.rpc.grpc.v1;

import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.pravega.auth.AuthHandler;
import io.pravega.client.stream.impl.ModelHelper;
import io.pravega.common.Exceptions;
import io.pravega.common.hash.RandomFactory;
import io.pravega.common.tracing.RequestTag;
import io.pravega.common.tracing.RequestTracker;
import io.pravega.common.tracing.TagLogger;
import io.pravega.controller.server.AuthResourceRepresentation;
import io.pravega.controller.server.ControllerService;
import io.pravega.controller.server.rpc.auth.AuthContext;
import io.pravega.controller.server.rpc.auth.GrpcAuthHelper;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.task.LockFailedException;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.controller.stream.api.grpc.v1.ControllerServiceGrpc;
import java.beans.ConstructorProperties;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.LoggerFactory;

/**
 * gRPC service implementation built on {@link ControllerServiceGrpc.ControllerServiceImplBase}.
 *
 * Each RPC method logs the call, runs an authorization step through {@link GrpcAuthHelper},
 * delegates the actual work to {@link ControllerService}, and relays the outcome of the
 * returned future to the caller's {@link StreamObserver}.
 */
public class ControllerServiceImpl
extends ControllerServiceGrpc.ControllerServiceImplBase {
    // Logger wrapper that also accepts a request id as the first argument of a log call.
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(ControllerServiceImpl.class));
    // Same value (1000) that the four-argument constructor passes as listStreamsInScopeLimit.
    private static final int LIST_STREAMS_IN_SCOPE_LIMIT = 1000;
    // Backend that performs the actual controller operations.
    private final ControllerService controllerService;
    // Performs authorization checks and, for some calls, creates delegation tokens.
    private final GrpcAuthHelper grpcAuthHelper;
    // Registry in which request tags are tracked and later untracked.
    private final RequestTracker requestTracker;
    // When true, error descriptions sent to clients contain the server-side stack trace.
    private final boolean replyWithStackTraceOnError;
    // Produces the ids used when creating request tags (next long from a RandomFactory-created generator).
    private final Supplier<Long> requestIdGenerator = RandomFactory.create()::nextLong;
    // Limit passed to ControllerService.listStreams by listStreamsInScope.
    private final int listStreamsInScopeLimit;

    /**
     * Creates the service using {@link #LIST_STREAMS_IN_SCOPE_LIMIT} as the limit for
     * {@code listStreamsInScope}.
     *
     * @param controllerService          backend that executes controller operations
     * @param authHelper                 helper used for authorization checks and token creation
     * @param requestTracker             registry used to track request tags
     * @param replyWithStackTraceOnError whether error descriptions sent to clients include the stack trace
     */
    public ControllerServiceImpl(ControllerService controllerService, GrpcAuthHelper authHelper, RequestTracker requestTracker, boolean replyWithStackTraceOnError) {
        // Use the named constant rather than repeating the literal so the default lives in one place.
        this(controllerService, authHelper, requestTracker, replyWithStackTraceOnError, LIST_STREAMS_IN_SCOPE_LIMIT);
    }

    /**
     * Replies with the controller server list obtained from the controller service.
     *
     * @param request          the (unused) server request
     * @param responseObserver observer that receives the {@code ServerResponse} or an error
     */
    public void getControllerServerList(Controller.ServerRequest request, StreamObserver<Controller.ServerResponse> responseObserver) {
        log.info("getControllerServerList called.");
        // No authorization check is made here; the authenticator simply yields an empty token.
        final Supplier<String> authorizer = () -> "";
        final Function<String, CompletableFuture<Controller.ServerResponse>> operation = delegationToken ->
                this.controllerService.getControllerServerList()
                        .thenApply(servers -> Controller.ServerResponse.newBuilder().addAllNodeURI(servers).build());
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Creates a stream from the supplied configuration.
     *
     * Requires READ_UPDATE permission on the streams-in-scope resource of the target scope.
     * The request is tagged so that its completion is logged and untracked by the helper.
     *
     * @param request          stream configuration, including the scope and stream names
     * @param responseObserver observer that receives the {@code CreateStreamStatus} or an error
     */
    public void createStream(Controller.StreamConfig request, StreamObserver<Controller.CreateStreamStatus> responseObserver) {
        final Controller.StreamInfo streamInfo = request.getStreamInfo();
        final String scope = streamInfo.getScope();
        final String stream = streamInfo.getStream();
        final RequestTag requestTag = this.requestTracker.initializeAndTrackRequestTag(
                this.requestIdGenerator.get(), "createStream", scope, stream);
        log.info(requestTag.getRequestId(), "createStream called for stream {}/{}.", scope, stream);

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorizationAndCreateToken(
                AuthResourceRepresentation.ofStreamsInScope(scope), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.CreateStreamStatus>> operation = delegationToken ->
                this.controllerService.createStream(scope, stream, ModelHelper.encode(request), System.currentTimeMillis());
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver, requestTag);
    }

    /**
     * Updates the configuration of an existing stream.
     *
     * Requires READ_UPDATE permission on the stream. The request is tagged for tracking.
     *
     * @param request          new stream configuration, including the scope and stream names
     * @param responseObserver observer that receives the {@code UpdateStreamStatus} or an error
     */
    public void updateStream(Controller.StreamConfig request, StreamObserver<Controller.UpdateStreamStatus> responseObserver) {
        final Controller.StreamInfo streamInfo = request.getStreamInfo();
        final String scope = streamInfo.getScope();
        final String stream = streamInfo.getStream();
        final RequestTag requestTag = this.requestTracker.initializeAndTrackRequestTag(
                this.requestIdGenerator.get(), "updateStream", scope, stream);
        log.info(requestTag.getRequestId(), "updateStream called for stream {}/{}.", scope, stream);

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.UpdateStreamStatus>> operation = delegationToken ->
                this.controllerService.updateStream(scope, stream, ModelHelper.encode(request));
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver, requestTag);
    }

    /**
     * Truncates a stream at the supplied stream cut.
     *
     * Requires READ_UPDATE permission on the stream. The request is tagged for tracking.
     *
     * @param request          stream cut identifying the stream and the truncation point
     * @param responseObserver observer that receives the {@code UpdateStreamStatus} or an error
     */
    public void truncateStream(Controller.StreamCut request, StreamObserver<Controller.UpdateStreamStatus> responseObserver) {
        final String scope = request.getStreamInfo().getScope();
        final String stream = request.getStreamInfo().getStream();
        final RequestTag requestTag = this.requestTracker.initializeAndTrackRequestTag(
                this.requestIdGenerator.get(), "truncateStream", scope, stream);
        log.info(requestTag.getRequestId(), "truncateStream called for stream {}/{}.", scope, stream);

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.UpdateStreamStatus>> operation = delegationToken ->
                this.controllerService.truncateStream(scope, stream, ModelHelper.encode(request));
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver, requestTag);
    }

    /**
     * Seals a stream.
     *
     * Requires READ_UPDATE permission on the stream. The request is tagged for tracking.
     *
     * @param request          scope and stream names of the stream to seal
     * @param responseObserver observer that receives the {@code UpdateStreamStatus} or an error
     */
    public void sealStream(Controller.StreamInfo request, StreamObserver<Controller.UpdateStreamStatus> responseObserver) {
        final String scope = request.getScope();
        final String stream = request.getStream();
        final RequestTag requestTag = this.requestTracker.initializeAndTrackRequestTag(
                this.requestIdGenerator.get(), "sealStream", scope, stream);
        log.info(requestTag.getRequestId(), "sealStream called for stream {}/{}.", scope, stream);

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.UpdateStreamStatus>> operation = delegationToken ->
                this.controllerService.sealStream(scope, stream);
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver, requestTag);
    }

    /**
     * Deletes a stream.
     *
     * Requires READ_UPDATE permission on the stream. The request is tagged for tracking.
     *
     * @param request          scope and stream names of the stream to delete
     * @param responseObserver observer that receives the {@code DeleteStreamStatus} or an error
     */
    public void deleteStream(Controller.StreamInfo request, StreamObserver<Controller.DeleteStreamStatus> responseObserver) {
        final String scope = request.getScope();
        final String stream = request.getStream();
        final RequestTag requestTag = this.requestTracker.initializeAndTrackRequestTag(
                this.requestIdGenerator.get(), "deleteStream", scope, stream);
        log.info(requestTag.getRequestId(), "deleteStream called for stream {}/{}.", scope, stream);

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.DeleteStreamStatus>> operation = delegationToken ->
                this.controllerService.deleteStream(scope, stream);
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver, requestTag);
    }

    /**
     * Replies with the current segment ranges of a stream together with a delegation token.
     *
     * Requires READ_UPDATE permission on the stream; the token created by the authorization
     * step is embedded in the response.
     *
     * @param request          scope and stream names
     * @param responseObserver observer that receives the {@code SegmentRanges} or an error
     */
    public void getCurrentSegments(Controller.StreamInfo request, StreamObserver<Controller.SegmentRanges> responseObserver) {
        final String scope = request.getScope();
        final String stream = request.getStream();
        log.info("getCurrentSegments called for stream {}/{}.", scope, stream);

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorizationAndCreateToken(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.SegmentRanges>> operation = delegationToken -> {
            this.logIfEmpty(delegationToken, "getCurrentSegments", scope, stream);
            return this.controllerService.getCurrentSegments(scope, stream)
                    .thenApply(segmentRanges -> Controller.SegmentRanges.newBuilder()
                            .addAllSegmentRanges(segmentRanges)
                            .setDelegationToken(delegationToken)
                            .build());
        };
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Replies with the segments returned by {@code ControllerService.getSegmentsAtHead} for the
     * requested stream, each paired with its offset, together with a delegation token.
     *
     * Requires READ_UPDATE permission on the stream; the token created by the authorization
     * step is embedded in the response.
     *
     * @param request          request carrying the scope and stream names
     * @param responseObserver observer that receives the {@code SegmentsAtTime} or an error
     */
    public void getSegments(Controller.GetSegmentsRequest request, StreamObserver<Controller.SegmentsAtTime> responseObserver) {
        final String scope = request.getStreamInfo().getScope();
        final String stream = request.getStreamInfo().getStream();
        // Parameterized logging: the message is only assembled when debug logging is enabled.
        log.debug("getSegments called for stream {}/{}", scope, stream);

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorizationAndCreateToken(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.SegmentsAtTime>> operation = delegationToken -> {
            this.logIfEmpty(delegationToken, "getSegments", scope, stream);
            return this.controllerService.getSegmentsAtHead(scope, stream).thenApply(segments -> {
                final Controller.SegmentsAtTime.Builder builder =
                        Controller.SegmentsAtTime.newBuilder().setDelegationToken(delegationToken);
                // One SegmentLocation per (segment id, offset) pair of the returned map.
                for (Map.Entry<Controller.SegmentId, Long> entry : segments.entrySet()) {
                    builder.addSegments(Controller.SegmentsAtTime.SegmentLocation.newBuilder()
                            .setSegmentId(entry.getKey())
                            .setOffset(entry.getValue().longValue())
                            .build());
                }
                return builder.build();
            });
        };
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Replies with the successors of the given segment.
     *
     * Requires READ permission on the segment's stream. The value returned by the authorization
     * step is set as the delegation token of the response.
     *
     * @param segmentId        segment whose successors are requested
     * @param responseObserver observer that receives the {@code SuccessorResponse} or an error
     */
    public void getSegmentsImmediatelyFollowing(Controller.SegmentId segmentId, StreamObserver<Controller.SuccessorResponse> responseObserver) {
        log.info("getSegmentsImmediatelyFollowing called for segment {} ", segmentId);

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofStreamInScope(segmentId.getStreamInfo().getScope(), segmentId.getStreamInfo().getStream()),
                AuthHandler.Permissions.READ);
        final Function<String, CompletableFuture<Controller.SuccessorResponse>> operation = delegationToken ->
                this.controllerService.getSegmentsImmediatelyFollowing(segmentId)
                        .thenApply(ModelHelper::createSuccessorResponse)
                        .thenApply(responseBuilder -> {
                            responseBuilder.setDelegationToken(delegationToken);
                            return responseBuilder.build();
                        });
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Variant of {@link #getSegmentsImmediatelyFollowing} under a differently spelled name;
     * it logs the call and then delegates to that method.
     *
     * @param segmentId        segment whose successors are requested
     * @param responseObserver observer that receives the {@code SuccessorResponse} or an error
     */
    public void getSegmentsImmediatlyFollowing(Controller.SegmentId segmentId, StreamObserver<Controller.SuccessorResponse> responseObserver) {
        log.info("getSegmentsImmediatlyFollowing called for segment {} ", segmentId);
        getSegmentsImmediatelyFollowing(segmentId, responseObserver);
    }

    /**
     * Replies with the segments lying between the two stream cuts of the request, together
     * with a delegation token.
     *
     * Requires READ permission on the stream.
     *
     * @param request          stream plus the "from" and "to" stream cuts
     * @param responseObserver observer that receives the {@code StreamCutRangeResponse} or an error
     */
    public void getSegmentsBetween(Controller.StreamCutRange request, StreamObserver<Controller.StreamCutRangeResponse> responseObserver) {
        final String scope = request.getStreamInfo().getScope();
        final String stream = request.getStreamInfo().getStream();
        log.info("getSegmentsBetweenStreamCuts called for stream {} for cuts from {} to {}",
                request.getStreamInfo(), request.getFromMap(), request.getToMap());

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorizationAndCreateToken(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ);
        final Function<String, CompletableFuture<Controller.StreamCutRangeResponse>> operation = delegationToken -> {
            this.logIfEmpty(delegationToken, "getSegmentsBetween", scope, stream);
            return this.controllerService.getSegmentsBetweenStreamCuts(request)
                    .thenApply(segments -> ModelHelper.createStreamCutRangeResponse(
                            scope,
                            stream,
                            segments.stream()
                                    .map(segment -> ModelHelper.createSegmentId(scope, stream, segment.segmentId()))
                                    .collect(Collectors.toList()),
                            delegationToken));
        };
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Starts a scale operation that seals the listed segments and replaces them with the
     * requested key ranges.
     *
     * Requires READ_UPDATE permission on the stream. A request tag that includes the scale
     * timestamp is created and tracked.
     *
     * @param request          stream, segments to seal, new key ranges and scale timestamp
     * @param responseObserver observer that receives the {@code ScaleResponse} or an error
     */
    public void scale(Controller.ScaleRequest request, StreamObserver<Controller.ScaleResponse> responseObserver) {
        final String scope = request.getStreamInfo().getScope();
        final String stream = request.getStreamInfo().getStream();
        final RequestTag requestTag = this.requestTracker.initializeAndTrackRequestTag(
                this.requestIdGenerator.get(), "scaleStream", scope, stream, String.valueOf(request.getScaleTimestamp()));
        log.info(requestTag.getRequestId(), "scale called for stream {}/{}.", scope, stream);

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.ScaleResponse>> operation = delegationToken ->
                this.controllerService.scale(
                        scope,
                        stream,
                        request.getSealedSegmentsList(),
                        request.getNewKeyRangesList().stream()
                                .collect(Collectors.toMap(entry -> entry.getStart(), entry -> entry.getEnd())),
                        request.getScaleTimestamp());
        // NOTE(review): the tag created above is not handed to the helper, so this method never
        // untracks it. Confirm this is intentional (the tag may be needed after the RPC returns).
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Replies with the scale status of a stream for the epoch given in the request.
     *
     * Requires READ permission on the stream.
     *
     * @param request          stream and epoch to check
     * @param responseObserver observer that receives the {@code ScaleStatusResponse} or an error
     */
    public void checkScale(Controller.ScaleStatusRequest request, StreamObserver<Controller.ScaleStatusResponse> responseObserver) {
        final String scope = request.getStreamInfo().getScope();
        final String stream = request.getStreamInfo().getStream();
        log.debug("check scale status called for stream {}/{}.", scope, stream);

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ);
        final Function<String, CompletableFuture<Controller.ScaleStatusResponse>> operation = delegationToken ->
                this.controllerService.checkScale(scope, stream, request.getEpoch());
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Replies with the node URI that the controller service reports for a segment.
     *
     * Requires READ permission on the segment's stream.
     *
     * @param request          segment to look up
     * @param responseObserver observer that receives the {@code NodeUri} or an error
     */
    public void getURI(Controller.SegmentId request, StreamObserver<Controller.NodeUri> responseObserver) {
        final String scope = request.getStreamInfo().getScope();
        final String stream = request.getStreamInfo().getStream();
        log.info("getURI called for segment {}/{}/{}.", scope, stream, request.getSegmentId());

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ);
        final Function<String, CompletableFuture<Controller.NodeUri>> operation = delegationToken ->
                this.controllerService.getURI(request);
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Replies with whether the controller service considers the given segment valid.
     *
     * Requires READ permission on the segment's stream.
     *
     * @param request          segment to validate
     * @param responseObserver observer that receives the {@code SegmentValidityResponse} or an error
     */
    public void isSegmentValid(Controller.SegmentId request, StreamObserver<Controller.SegmentValidityResponse> responseObserver) {
        final String scope = request.getStreamInfo().getScope();
        final String stream = request.getStreamInfo().getStream();
        log.info("isSegmentValid called for segment {}/{}/{}.", scope, stream, request.getSegmentId());

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ);
        final Function<String, CompletableFuture<Controller.SegmentValidityResponse>> operation = delegationToken ->
                this.controllerService.isSegmentValid(scope, stream, request.getSegmentId())
                        .thenApply(valid -> Controller.SegmentValidityResponse.newBuilder()
                                .setResponse(valid.booleanValue())
                                .build());
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Replies with whether the controller service considers the given stream cut valid.
     *
     * Requires READ permission on the stream. The authorization step also creates a token,
     * which this method does not use in the response.
     *
     * @param request          stream cut to validate
     * @param responseObserver observer that receives the {@code StreamCutValidityResponse} or an error
     */
    public void isStreamCutValid(Controller.StreamCut request, StreamObserver<Controller.StreamCutValidityResponse> responseObserver) {
        final String scope = request.getStreamInfo().getScope();
        final String stream = request.getStreamInfo().getStream();
        log.info("isStreamCutValid called for stream {}/{} streamcut {}.", scope, stream, request.getCutMap());

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorizationAndCreateToken(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ);
        final Function<String, CompletableFuture<Controller.StreamCutValidityResponse>> operation = delegationToken ->
                this.controllerService.isStreamCutValid(scope, stream, request.getCutMap())
                        .thenApply(valid -> Controller.StreamCutValidityResponse.newBuilder()
                                .setResponse(valid.booleanValue())
                                .build());
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Creates a transaction on a stream and replies with its id, the active segments and a
     * delegation token.
     *
     * Requires READ_UPDATE permission on the stream.
     *
     * @param request          stream and lease for the new transaction
     * @param responseObserver observer that receives the {@code CreateTxnResponse} or an error
     */
    public void createTransaction(Controller.CreateTxnRequest request, StreamObserver<Controller.CreateTxnResponse> responseObserver) {
        final String scope = request.getStreamInfo().getScope();
        final String stream = request.getStreamInfo().getStream();
        log.info("createTransaction called for stream {}/{}.", scope, stream);

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorizationAndCreateToken(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ_UPDATE);
        // The service yields a pair: key = transaction UUID, value = active segments.
        final Function<String, CompletableFuture<Controller.CreateTxnResponse>> operation = delegationToken ->
                this.controllerService.createTransaction(scope, stream, request.getLease())
                        .thenApply(pair -> Controller.CreateTxnResponse.newBuilder()
                                .setDelegationToken(delegationToken)
                                .setTxnId(ModelHelper.decode(pair.getKey()))
                                .addAllActiveSegments(pair.getValue())
                                .build());
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Commits a transaction, forwarding the writer id and timestamp of the request.
     *
     * Requires READ_UPDATE permission on the stream.
     *
     * @param request          stream, transaction id, writer id and timestamp
     * @param responseObserver observer that receives the {@code TxnStatus} or an error
     */
    public void commitTransaction(Controller.TxnRequest request, StreamObserver<Controller.TxnStatus> responseObserver) {
        final String scope = request.getStreamInfo().getScope();
        final String stream = request.getStreamInfo().getStream();
        log.info("commitTransaction called for stream {}/{}, txnId={}.", scope, stream, request.getTxnId());

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.TxnStatus>> operation = delegationToken ->
                this.controllerService.commitTransaction(scope, stream, request.getTxnId(), request.getWriterId(), request.getTimestamp());
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Aborts a transaction.
     *
     * Requires READ_UPDATE permission on the stream.
     *
     * @param request          stream and transaction id
     * @param responseObserver observer that receives the {@code TxnStatus} or an error
     */
    public void abortTransaction(Controller.TxnRequest request, StreamObserver<Controller.TxnStatus> responseObserver) {
        final String scope = request.getStreamInfo().getScope();
        final String stream = request.getStreamInfo().getStream();
        log.info("abortTransaction called for stream {}/{}, txnId={}.", scope, stream, request.getTxnId());

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.TxnStatus>> operation = delegationToken ->
                this.controllerService.abortTransaction(scope, stream, request.getTxnId());
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Pings a transaction with the lease carried by the request.
     *
     * Requires READ_UPDATE permission on the stream.
     *
     * @param request          stream, transaction id and lease
     * @param responseObserver observer that receives the {@code PingTxnStatus} or an error
     */
    public void pingTransaction(Controller.PingTxnRequest request, StreamObserver<Controller.PingTxnStatus> responseObserver) {
        final String scope = request.getStreamInfo().getScope();
        final String stream = request.getStreamInfo().getStream();
        log.info("pingTransaction called for stream {}/{}, txnId={}", scope, stream, request.getTxnId());

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.PingTxnStatus>> operation = delegationToken ->
                this.controllerService.pingTransaction(scope, stream, request.getTxnId(), request.getLease());
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Replies with the state of a transaction.
     *
     * Requires READ permission on the stream.
     *
     * @param request          stream and transaction id
     * @param responseObserver observer that receives the {@code TxnState} or an error
     */
    public void checkTransactionState(Controller.TxnRequest request, StreamObserver<Controller.TxnState> responseObserver) {
        final String scope = request.getStreamInfo().getScope();
        final String stream = request.getStreamInfo().getStream();
        log.info("checkTransactionState called for stream {}/{}, txnId={}.", scope, stream, request.getTxnId());

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ);
        final Function<String, CompletableFuture<Controller.TxnState>> operation = delegationToken ->
                this.controllerService.checkTransactionStatus(scope, stream, request.getTxnId());
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Creates a scope.
     *
     * Requires READ_UPDATE permission on the scopes resource. The request is tagged for tracking.
     *
     * @param request          name of the scope to create
     * @param responseObserver observer that receives the {@code CreateScopeStatus} or an error
     */
    public void createScope(Controller.ScopeInfo request, StreamObserver<Controller.CreateScopeStatus> responseObserver) {
        final String scope = request.getScope();
        final RequestTag requestTag = this.requestTracker.initializeAndTrackRequestTag(
                this.requestIdGenerator.get(), "createScope", scope);
        log.info(requestTag.getRequestId(), "createScope called for scope {}.", scope);

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofScopes(), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.CreateScopeStatus>> operation = delegationToken ->
                this.controllerService.createScope(scope);
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver, requestTag);
    }

    /**
     * Replies with one page of the streams in a scope, restricted to the streams the caller
     * may read.
     *
     * The caller needs READ permission on the scope. Each stream of the fetched page is then
     * checked individually for READ permission and dropped from the response when the check
     * fails. If listing fails with a {@code DataNotFoundException}, the response carries status
     * {@code SCOPE_NOT_FOUND}; any other failure is propagated as an error.
     *
     * @param request          scope name and continuation token
     * @param responseObserver observer that receives the {@code StreamsInScopeResponse} or an error
     */
    public void listStreamsInScope(Controller.StreamsInScopeRequest request, StreamObserver<Controller.StreamsInScopeResponse> responseObserver) {
        final String scopeName = request.getScope().getScope();
        final RequestTag requestTag = this.requestTracker.initializeAndTrackRequestTag(
                this.requestIdGenerator.get(), "listStream", scopeName);
        log.info(requestTag.getRequestId(), "listStream called for scope {}.", scopeName);

        // The auth context is captured up front and reused by the per-stream checks in the callback below.
        final AuthContext ctx;
        if (this.grpcAuthHelper.isAuthEnabled()) {
            ctx = AuthContext.current();
        } else {
            ctx = null;
        }

        final Supplier<String> authorizer = () -> {
            String result = this.grpcAuthHelper.checkAuthorization(
                    AuthResourceRepresentation.ofScope(scopeName), AuthHandler.Permissions.READ, ctx);
            log.debug("Result of authorization for [{}] and READ permission is: [{}]",
                    AuthResourceRepresentation.ofScope(scopeName), result);
            return result;
        };

        final Function<String, CompletableFuture<Controller.StreamsInScopeResponse>> operation = delegationToken ->
                this.controllerService.listStreams(scopeName, request.getContinuationToken().getToken(), this.listStreamsInScopeLimit)
                        .handle((page, failure) -> {
                            if (failure == null) {
                                log.debug("All streams in scope with continuation token: {}", page);
                                // Keep only the streams that pass the per-stream READ check.
                                List<Controller.StreamInfo> readable = page.getKey().stream()
                                        .filter(streamName -> {
                                            String streamAuthResource = AuthResourceRepresentation.ofStreamInScope(scopeName, streamName);
                                            boolean isAuthorized = this.grpcAuthHelper.isAuthorized(streamAuthResource, AuthHandler.Permissions.READ, ctx);
                                            log.debug("Authorization for [{}] for READ permission was [{}]", streamAuthResource, isAuthorized);
                                            return isAuthorized;
                                        })
                                        .map(streamName -> Controller.StreamInfo.newBuilder().setScope(scopeName).setStream(streamName).build())
                                        .collect(Collectors.toList());
                                return Controller.StreamsInScopeResponse.newBuilder()
                                        .addAllStreams(readable)
                                        .setContinuationToken(Controller.ContinuationToken.newBuilder().setToken(page.getValue()).build())
                                        .setStatus(Controller.StreamsInScopeResponse.Status.SUCCESS)
                                        .build();
                            }
                            if (Exceptions.unwrap(failure) instanceof StoreException.DataNotFoundException) {
                                return Controller.StreamsInScopeResponse.newBuilder()
                                        .setStatus(Controller.StreamsInScopeResponse.Status.SCOPE_NOT_FOUND)
                                        .build();
                            }
                            throw new CompletionException(failure);
                        });
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver, requestTag);
    }

    /**
     * Deletes a scope.
     *
     * Requires READ_UPDATE permission on the scopes resource. The request is tagged for tracking.
     *
     * @param request          name of the scope to delete
     * @param responseObserver observer that receives the {@code DeleteScopeStatus} or an error
     */
    public void deleteScope(Controller.ScopeInfo request, StreamObserver<Controller.DeleteScopeStatus> responseObserver) {
        final String scope = request.getScope();
        final RequestTag requestTag = this.requestTracker.initializeAndTrackRequestTag(
                this.requestIdGenerator.get(), "deleteScope", scope);
        log.info(requestTag.getRequestId(), "deleteScope called for scope {}.", scope);

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofScopes(), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.DeleteScopeStatus>> operation = delegationToken ->
                this.controllerService.deleteScope(scope);
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver, requestTag);
    }

    /**
     * Replies with a delegation token for a stream.
     *
     * Requires READ_UPDATE permission on the stream; the token produced by the authorization
     * step is returned as-is, wrapped in an already completed future.
     *
     * @param request          scope and stream names
     * @param responseObserver observer that receives the {@code DelegationToken} or an error
     */
    public void getDelegationToken(Controller.StreamInfo request, StreamObserver<Controller.DelegationToken> responseObserver) {
        final String scope = request.getScope();
        final String stream = request.getStream();
        log.info("getDelegationToken called for stream {}/{}.", scope, stream);

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorizationAndCreateToken(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.DelegationToken>> operation = delegationToken -> {
            this.logIfEmpty(delegationToken, "getDelegationToken", scope, stream);
            final Controller.DelegationToken reply =
                    Controller.DelegationToken.newBuilder().setDelegationToken(delegationToken).build();
            return CompletableFuture.completedFuture(reply);
        };
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Records a writer's timestamp together with its position in the stream.
     *
     * The stream is taken from the position carried by the request. Requires READ_UPDATE
     * permission on that stream.
     *
     * @param request          writer id, timestamp and position (stream cut)
     * @param responseObserver observer that receives the {@code TimestampResponse} or an error
     */
    public void noteTimestampFromWriter(Controller.TimestampFromWriter request, StreamObserver<Controller.TimestampResponse> responseObserver) {
        final Controller.StreamInfo streamInfo = request.getPosition().getStreamInfo();
        final String scope = streamInfo.getScope();
        final String stream = streamInfo.getStream();
        log.info("noteWriterMark called for stream {}/{}, writer={} time={}",
                scope, stream, request.getWriter(), request.getTimestamp());

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.TimestampResponse>> operation = delegationToken ->
                this.controllerService.noteTimestampFromWriter(
                        scope, stream, request.getWriter(), request.getTimestamp(), request.getPosition().getCutMap());
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Removes a writer from a stream.
     *
     * Requires READ_UPDATE permission on the stream.
     *
     * @param request          stream and writer id
     * @param responseObserver observer that receives the {@code RemoveWriterResponse} or an error
     */
    public void removeWriter(Controller.RemoveWriterRequest request, StreamObserver<Controller.RemoveWriterResponse> responseObserver) {
        final Controller.StreamInfo streamInfo = request.getStream();
        final String scope = streamInfo.getScope();
        final String stream = streamInfo.getStream();
        log.info("writerShutdown called for stream {}/{}, writer={}", scope, stream, request.getWriter());

        final Supplier<String> authorizer = () -> this.grpcAuthHelper.checkAuthorization(
                AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ_UPDATE);
        final Function<String, CompletableFuture<Controller.RemoveWriterResponse>> operation = delegationToken ->
                this.controllerService.removeWriter(scope, stream, request.getWriter());
        this.authenticateExecuteAndProcessResults(authorizer, operation, responseObserver);
    }

    /**
     * Logs a warning when auth is enabled but the delegation token is null or empty.
     *
     * @param delegationToken token to inspect
     * @param requestName     name of the RPC, used in the log message
     * @param scopeName       scope the request targets
     * @param streamName      stream the request targets
     */
    private void logIfEmpty(String delegationToken, String requestName, String scopeName, String streamName) {
        // Nothing to report when auth is off or a non-empty token is present.
        if (!this.isAuthEnabled() || !Strings.isNullOrEmpty(delegationToken)) {
            return;
        }
        log.warn("Delegation token for request [{}] with scope [{}] and stream [{}], is: [{}]",
                requestName, scopeName, streamName, delegationToken);
    }

    /**
     * Runs the authorization step, invokes the operation with the resulting token and relays
     * the outcome of the returned future to the observer.
     *
     * <ul>
     *   <li>If the future fails, the unwrapped cause is logged, mapped to a gRPC status by
     *       {@link #getStatusFromException} and sent through {@code onError}. The description
     *       is the full stack trace when {@code replyWithStackTraceOnError} is set, otherwise
     *       the cause's message.</li>
     *   <li>If the future completes with a non-null value, the value is sent followed by
     *       {@code onCompleted}. A {@code null} value with no error sends nothing.</li>
     *   <li>If the authenticator or {@code call.apply} throws synchronously, the observer
     *       receives {@code UNAUTHENTICATED} regardless of the exception type.</li>
     * </ul>
     *
     * In every case the request tag, if any, is untracked afterwards.
     *
     * @param authenticator  supplies the delegation token, or throws when authorization fails
     * @param call           operation to run; receives the token from {@code authenticator}
     * @param streamObserver observer to notify with the result or the error
     * @param requestTag     tag to untrack when the call finishes; may be {@code null}
     * @param <T>            response message type
     */
    private <T> void authenticateExecuteAndProcessResults(Supplier<String> authenticator, Function<String, CompletableFuture<T>> call, StreamObserver<T> streamObserver, RequestTag requestTag) {
        try {
            String delegationToken = authenticator.get();
            CompletableFuture<T> result = call.apply(delegationToken);
            result.whenComplete((value, ex) -> {
                try {
                    log.debug("result =  {}", value);
                    if (ex != null) {
                        Throwable cause = Exceptions.unwrap(ex);
                        this.logError(requestTag, cause);
                        String errorDescription = this.replyWithStackTraceOnError
                                ? "controllerStackTrace=" + Throwables.getStackTraceAsString(ex)
                                : cause.getMessage();
                        streamObserver.onError(this.getStatusFromException(cause)
                                .withCause(cause)
                                .withDescription(errorDescription)
                                .asRuntimeException());
                    } else if (value != null) {
                        streamObserver.onNext(value);
                        streamObserver.onCompleted();
                    }
                } finally {
                    // Untrack even if notifying the observer throws; otherwise the exception
                    // would skip the cleanup and leave the tag tracked.
                    this.logAndUntrackRequestTag(requestTag);
                }
            });
        }
        catch (Exception e) {
            log.error(e.getMessage(), e);
            this.logAndUntrackRequestTag(requestTag);
            streamObserver.onError(Status.UNAUTHENTICATED.withDescription("Authentication failed").asRuntimeException());
        }
    }

    /**
     * Convenience overload for calls that carry no request tag; delegates to the four-argument
     * variant with a {@code null} tag.
     *
     * @param authenticator  supplies the delegation token, or throws when authorization fails
     * @param call           operation to run; receives the token from {@code authenticator}
     * @param streamObserver observer to notify with the result or the error
     * @param <T>            response message type
     */
    private <T> void authenticateExecuteAndProcessResults(Supplier<String> authenticator, Function<String, CompletableFuture<T>> call, StreamObserver<T> streamObserver) {
        final RequestTag noTag = null;
        this.authenticateExecuteAndProcessResults(authenticator, call, streamObserver, noTag);
    }

    /**
     * Maps a store exception to the gRPC status reported to the client.
     *
     * The checks run in the order listed; anything unrecognized maps to {@code UNKNOWN}.
     *
     * @param cause the (already unwrapped) failure
     * @return the matching gRPC status
     */
    private Status getStatusFromException(Throwable cause) {
        final Status status;
        if (cause instanceof StoreException.DataExistsException) {
            status = Status.ALREADY_EXISTS;
        } else if (cause instanceof StoreException.DataNotFoundException) {
            status = Status.NOT_FOUND;
        } else if (cause instanceof StoreException.DataNotEmptyException) {
            status = Status.FAILED_PRECONDITION;
        } else if (cause instanceof StoreException.WriteConflictException) {
            status = Status.FAILED_PRECONDITION;
        } else if (cause instanceof StoreException.IllegalStateException) {
            status = Status.INTERNAL;
        } else if (cause instanceof StoreException.OperationNotAllowedException) {
            status = Status.PERMISSION_DENIED;
        } else if (cause instanceof StoreException.StoreConnectionException) {
            status = Status.INTERNAL;
        } else {
            status = Status.UNKNOWN;
        }
        return status;
    }

    /**
     * Untracks the given request tag and logs that fact at debug level, using the value
     * returned by the tracker as the request id of the log call. Does nothing for a
     * {@code null} tag.
     *
     * @param requestTag tag to untrack; may be {@code null}
     */
    private void logAndUntrackRequestTag(RequestTag requestTag) {
        if (requestTag == null) {
            return;
        }
        log.debug(this.requestTracker.untrackRequest(requestTag.getRequestDescriptor()),
                "Untracking request: {}.", requestTag.getRequestDescriptor());
    }

    /**
     * Logs a failed controller API call.
     *
     * A {@link LockFailedException} is logged as a warning with its message only; every other
     * failure is logged as an error with the throwable attached.
     *
     * @param requestTag tag of the failed request, or {@code null} (logged as "none")
     * @param cause      the failure
     */
    private void logError(RequestTag requestTag, Throwable cause) {
        final String tag;
        if (requestTag == null) {
            tag = "none";
        } else {
            tag = requestTag.getRequestDescriptor();
        }

        if (cause instanceof LockFailedException) {
            log.warn("Controller API call with tag {} failed with: {}", tag, cause.getMessage());
            return;
        }
        log.error("Controller API call with tag {} failed with error: ", tag, cause);
    }

    /**
     * Reports whether the auth helper has auth enabled.
     *
     * @return {@code true} if {@code grpcAuthHelper.isAuthEnabled()} says so
     */
    private boolean isAuthEnabled() {
        final boolean authEnabled = this.grpcAuthHelper.isAuthEnabled();
        return authEnabled;
    }

    /**
     * Creates the service with every collaborator and setting supplied explicitly.
     *
     * @param controllerService          backend that executes controller operations
     * @param grpcAuthHelper             helper used for authorization checks and token creation
     * @param requestTracker             registry used to track request tags
     * @param replyWithStackTraceOnError whether error descriptions sent to clients include the stack trace
     * @param listStreamsInScopeLimit    limit passed to the controller service when listing streams in a scope
     */
    @ConstructorProperties(value={"controllerService", "grpcAuthHelper", "requestTracker", "replyWithStackTraceOnError", "listStreamsInScopeLimit"})
    @SuppressFBWarnings(justification="generated code")
    public ControllerServiceImpl(ControllerService controllerService, GrpcAuthHelper grpcAuthHelper, RequestTracker requestTracker, boolean replyWithStackTraceOnError, int listStreamsInScopeLimit) {
        // Settings first, then collaborators; the assignments are independent of each other.
        this.listStreamsInScopeLimit = listStreamsInScopeLimit;
        this.replyWithStackTraceOnError = replyWithStackTraceOnError;
        this.requestTracker = requestTracker;
        this.grpcAuthHelper = grpcAuthHelper;
        this.controllerService = controllerService;
    }
}

