/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.admin.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.client.admin.StreamInfo;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.admin.impl.StreamCutHelper;
import io.pravega.client.connection.impl.ConnectionPool;
import io.pravega.client.connection.impl.ConnectionPoolImpl;
import io.pravega.client.connection.impl.SocketConnectionFactoryImpl;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.control.impl.ControllerFailureException;
import io.pravega.client.control.impl.ControllerImpl;
import io.pravega.client.control.impl.ControllerImplConfig;
import io.pravega.client.stream.DeleteScopeFailedException;
import io.pravega.client.stream.InvalidStreamException;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.impl.StreamCutImpl;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.function.Callbacks;
import io.pravega.common.util.AsyncIterator;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shared.NameUtils;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamManagerImpl
implements StreamManager {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamManagerImpl.class);
    private final Controller controller;
    private final ConnectionPool connectionPool;
    private final ScheduledExecutorService executor;
    private final StreamCutHelper streamCutHelper;

    public StreamManagerImpl(ClientConfig clientConfig) {
        this.executor = ExecutorServiceHelpers.newScheduledThreadPool(1, "StreamManager-Controller");
        this.controller = new ControllerImpl(ControllerImplConfig.builder().clientConfig(clientConfig).build(), this.executor);
        this.connectionPool = new ConnectionPoolImpl(clientConfig, new SocketConnectionFactoryImpl(clientConfig));
        this.streamCutHelper = new StreamCutHelper(this.controller, this.connectionPool);
    }

    @VisibleForTesting
    public StreamManagerImpl(Controller controller, ConnectionPool connectionPool) {
        this.executor = null;
        this.controller = controller;
        this.connectionPool = connectionPool;
        this.streamCutHelper = new StreamCutHelper(controller, connectionPool);
    }

    @Override
    public boolean createStream(String scopeName, String streamName, StreamConfiguration config) {
        NameUtils.validateUserStreamName(streamName);
        NameUtils.validateUserScopeName(scopeName);
        log.info("Creating scope/stream: {}/{} with configuration: {}", new Object[]{scopeName, streamName, config});
        return Futures.getThrowingException(this.controller.createStream(scopeName, streamName, StreamConfiguration.builder().scalingPolicy(config.getScalingPolicy()).retentionPolicy(config.getRetentionPolicy()).build()));
    }

    @Override
    public boolean updateStream(String scopeName, String streamName, StreamConfiguration config) {
        NameUtils.validateUserStreamName(streamName);
        NameUtils.validateUserScopeName(scopeName);
        log.info("Updating scope/stream: {}/{} with configuration: {}", new Object[]{scopeName, streamName, config});
        return Futures.getThrowingException(this.controller.updateStream(scopeName, streamName, StreamConfiguration.builder().scalingPolicy(config.getScalingPolicy()).retentionPolicy(config.getRetentionPolicy()).build()));
    }

    @Override
    public boolean truncateStream(String scopeName, String streamName, StreamCut streamCut) {
        NameUtils.validateUserStreamName(streamName);
        NameUtils.validateUserScopeName(scopeName);
        Preconditions.checkNotNull(streamCut);
        log.info("Truncating scope/stream: {}/{} with stream cut: {}", new Object[]{scopeName, streamName, streamCut});
        return Futures.getThrowingException(this.controller.truncateStream(scopeName, streamName, streamCut));
    }

    @Override
    public boolean sealStream(String scopeName, String streamName) {
        NameUtils.validateUserStreamName(streamName);
        NameUtils.validateUserScopeName(scopeName);
        log.info("Sealing scope/stream: {}/{}", (Object)scopeName, (Object)streamName);
        return Futures.getThrowingException(this.controller.sealStream(scopeName, streamName));
    }

    @Override
    public boolean deleteStream(String scopeName, String streamName) {
        NameUtils.validateUserStreamName(streamName);
        NameUtils.validateUserScopeName(scopeName);
        log.info("Deleting scope/stream: {}/{}", (Object)scopeName, (Object)streamName);
        return Futures.getThrowingException(this.controller.deleteStream(scopeName, streamName));
    }

    @Override
    public Iterator<String> listScopes() {
        log.info("Listing scopes");
        AsyncIterator<String> asyncIterator = this.controller.listScopes();
        return asyncIterator.asIterator();
    }

    @Override
    public boolean createScope(String scopeName) {
        NameUtils.validateUserScopeName(scopeName);
        log.info("Creating scope: {}", (Object)scopeName);
        return Futures.getThrowingException(this.controller.createScope(scopeName));
    }

    @Override
    public boolean checkScopeExists(String scopeName) {
        log.info("Checking if scope {} exists", (Object)scopeName);
        return Futures.getThrowingException(this.controller.checkScopeExists(scopeName));
    }

    @Override
    public Iterator<Stream> listStreams(String scopeName) {
        NameUtils.validateUserScopeName(scopeName);
        log.info("Listing streams in scope: {}", (Object)scopeName);
        AsyncIterator<Stream> asyncIterator = this.controller.listStreams(scopeName);
        return asyncIterator.asIterator();
    }

    @Override
    public boolean checkStreamExists(String scopeName, String streamName) {
        log.info("Checking if stream {} exists in scope {}", (Object)streamName, (Object)scopeName);
        return Futures.getThrowingException(this.controller.checkStreamExists(scopeName, streamName));
    }

    @Override
    public boolean deleteScope(String scopeName) {
        NameUtils.validateUserScopeName(scopeName);
        log.info("Deleting scope: {}", (Object)scopeName);
        return Futures.getThrowingException(this.controller.deleteScope(scopeName));
    }

    @Override
    public boolean deleteScope(String scopeName, boolean deleteStreams) throws DeleteScopeFailedException {
        NameUtils.validateUserScopeName(scopeName);
        log.info("Deleting scope: {}", (Object)scopeName);
        if (deleteStreams) {
            Iterator<Stream> iterator = this.listStreams(scopeName);
            while (iterator.hasNext()) {
                Stream stream = iterator.next();
                try {
                    Futures.getThrowingException(Futures.exceptionallyExpecting(this.controller.sealStream(stream.getScope(), stream.getStreamName()), e -> {
                        Throwable unwrap = Exceptions.unwrap(e);
                        return unwrap instanceof InvalidStreamException || unwrap instanceof ControllerFailureException;
                    }, false).thenCompose(sealed -> this.controller.deleteStream(stream.getScope(), stream.getStreamName())));
                }
                catch (Exception e2) {
                    String message = String.format("Failed to seal and delete stream %s", stream.getStreamName());
                    throw new DeleteScopeFailedException(message, e2);
                }
            }
        }
        return Futures.getThrowingException(this.controller.deleteScope(scopeName));
    }

    @Override
    public StreamInfo getStreamInfo(String scopeName, String streamName) {
        NameUtils.validateUserStreamName(streamName);
        NameUtils.validateUserScopeName(scopeName);
        log.info("Fetching StreamInfo for scope/stream: {}/{}", (Object)scopeName, (Object)streamName);
        return Futures.getThrowingException(this.getStreamInfo(Stream.of(scopeName, streamName)));
    }

    private CompletableFuture<StreamInfo> getStreamInfo(Stream stream) {
        CompletableFuture<StreamCut> currentTailStreamCut = this.streamCutHelper.fetchTailStreamCut(stream);
        CompletableFuture<StreamCut> currentHeadStreamCut = this.streamCutHelper.fetchHeadStreamCut(stream);
        return currentTailStreamCut.thenCombine(currentHeadStreamCut, (tailSC, headSC) -> {
            boolean isSealed = ((StreamCutImpl)tailSC).getPositions().isEmpty();
            return new StreamInfo(stream.getScope(), stream.getStreamName(), (StreamCut)tailSC, (StreamCut)headSC, isSealed);
        });
    }

    @Override
    public void close() {
        if (this.controller != null) {
            Callbacks.invokeSafely(this.controller::close, ex -> log.error("Unable to close Controller client.", ex));
        }
        if (this.executor != null) {
            ExecutorServiceHelpers.shutdown(this.executor);
        }
        if (this.connectionPool != null) {
            this.connectionPool.close();
        }
    }
}

