/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.admin.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.client.admin.KeyValueTableInfo;
import io.pravega.client.admin.StreamInfo;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.admin.impl.StreamCutHelper;
import io.pravega.client.connection.impl.ConnectionPool;
import io.pravega.client.connection.impl.ConnectionPoolImpl;
import io.pravega.client.connection.impl.SocketConnectionFactoryImpl;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.control.impl.ControllerFailureException;
import io.pravega.client.control.impl.ControllerImpl;
import io.pravega.client.control.impl.ControllerImplConfig;
import io.pravega.client.segment.impl.EndOfSegmentException;
import io.pravega.client.segment.impl.EventSegmentReader;
import io.pravega.client.segment.impl.NoSuchEventException;
import io.pravega.client.segment.impl.NoSuchSegmentException;
import io.pravega.client.segment.impl.SegmentInputStreamFactory;
import io.pravega.client.segment.impl.SegmentInputStreamFactoryImpl;
import io.pravega.client.segment.impl.SegmentTruncatedException;
import io.pravega.client.stream.DeleteScopeFailedException;
import io.pravega.client.stream.EventPointer;
import io.pravega.client.stream.InvalidStreamException;
import io.pravega.client.stream.ReaderGroupNotFoundException;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.TransactionInfo;
import io.pravega.client.stream.impl.EventSegmentReaderUtility;
import io.pravega.client.stream.impl.StreamCutImpl;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.function.Callbacks;
import io.pravega.common.util.AsyncIterator;
import io.pravega.shared.NameUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamManagerImpl
implements StreamManager {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamManagerImpl.class);
    private final Controller controller;
    @VisibleForTesting
    private final ConnectionPool connectionPool;
    private final StreamCutHelper streamCutHelper;
    private final SegmentInputStreamFactory inputStreamFactory;
    private final EventSegmentReaderUtility eventSegmentReaderUtility;

    public StreamManagerImpl(ClientConfig clientConfig) {
        this(clientConfig, ControllerImplConfig.builder().clientConfig(clientConfig).build());
    }

    @VisibleForTesting
    public StreamManagerImpl(ClientConfig clientConfig, ControllerImplConfig controllerConfig) {
        this(controllerConfig, (ConnectionPool)new ConnectionPoolImpl(clientConfig, new SocketConnectionFactoryImpl(clientConfig)));
    }

    private StreamManagerImpl(ControllerImplConfig controllerConfig, ConnectionPool connectionPool) {
        this(new ControllerImpl(controllerConfig, connectionPool.getInternalExecutor()), connectionPool);
    }

    @VisibleForTesting
    public StreamManagerImpl(Controller controller, ConnectionPool connectionPool) {
        this(controller, connectionPool, new SegmentInputStreamFactoryImpl(controller, connectionPool));
    }

    @VisibleForTesting
    public StreamManagerImpl(Controller controller, ConnectionPool connectionPool, SegmentInputStreamFactory inputStreamFactory) {
        this.connectionPool = connectionPool;
        this.controller = controller;
        this.streamCutHelper = new StreamCutHelper(controller, connectionPool);
        this.inputStreamFactory = inputStreamFactory;
        this.eventSegmentReaderUtility = new EventSegmentReaderUtility(inputStreamFactory);
    }

    @Override
    public boolean createStream(String scopeName, String streamName, StreamConfiguration config) {
        NameUtils.validateUserStreamName((String)streamName);
        NameUtils.validateUserScopeName((String)scopeName);
        log.info("Creating scope/stream: {}/{} with configuration: {}", new Object[]{scopeName, streamName, config});
        return (Boolean)Futures.getThrowingException(this.controller.createStream(scopeName, streamName, StreamConfiguration.builder().scalingPolicy(config.getScalingPolicy()).retentionPolicy(config.getRetentionPolicy()).tags(config.getTags()).build()));
    }

    @Override
    public boolean updateStream(String scopeName, String streamName, StreamConfiguration config) {
        NameUtils.validateUserStreamName((String)streamName);
        NameUtils.validateUserScopeName((String)scopeName);
        log.info("Updating scope/stream: {}/{} with configuration: {}", new Object[]{scopeName, streamName, config});
        return (Boolean)Futures.getThrowingException(this.controller.updateStream(scopeName, streamName, StreamConfiguration.builder().scalingPolicy(config.getScalingPolicy()).retentionPolicy(config.getRetentionPolicy()).tags(config.getTags()).build()));
    }

    @Override
    public boolean truncateStream(String scopeName, String streamName, StreamCut streamCut) {
        NameUtils.validateUserStreamName((String)streamName);
        NameUtils.validateUserScopeName((String)scopeName);
        Preconditions.checkNotNull((Object)streamCut);
        log.info("Truncating scope/stream: {}/{} with stream cut: {}", new Object[]{scopeName, streamName, streamCut});
        return (Boolean)Futures.getThrowingException(this.controller.truncateStream(scopeName, streamName, streamCut));
    }

    @Override
    public boolean sealStream(String scopeName, String streamName) {
        NameUtils.validateUserStreamName((String)streamName);
        NameUtils.validateUserScopeName((String)scopeName);
        log.info("Sealing scope/stream: {}/{}", (Object)scopeName, (Object)streamName);
        return (Boolean)Futures.getThrowingException(this.controller.sealStream(scopeName, streamName));
    }

    @Override
    public boolean deleteStream(String scopeName, String streamName) {
        NameUtils.validateUserStreamName((String)streamName);
        NameUtils.validateUserScopeName((String)scopeName);
        log.info("Deleting scope/stream: {}/{}", (Object)scopeName, (Object)streamName);
        return (Boolean)Futures.getThrowingException(this.controller.deleteStream(scopeName, streamName));
    }

    @Override
    public Iterator<String> listScopes() {
        log.info("Listing scopes");
        AsyncIterator<String> asyncIterator = this.controller.listScopes();
        return asyncIterator.asIterator();
    }

    @Override
    public boolean createScope(String scopeName) {
        NameUtils.validateUserScopeName((String)scopeName);
        log.info("Creating scope: {}", (Object)scopeName);
        return (Boolean)Futures.getThrowingException(this.controller.createScope(scopeName));
    }

    @Override
    public boolean checkScopeExists(String scopeName) {
        log.info("Checking if scope {} exists", (Object)scopeName);
        return (Boolean)Futures.getThrowingException(this.controller.checkScopeExists(scopeName));
    }

    @Override
    public List<TransactionInfo> listCompletedTransactions(Stream stream) {
        log.info("Listing completed transactions for stream : {}", (Object)stream.getStreamName());
        return (List)Futures.getThrowingException(this.controller.listCompletedTransactions(stream));
    }

    @Override
    public Iterator<Stream> listStreams(String scopeName) {
        NameUtils.validateUserScopeName((String)scopeName);
        log.info("Listing streams in scope: {}", (Object)scopeName);
        AsyncIterator<Stream> asyncIterator = this.controller.listStreams(scopeName);
        return asyncIterator.asIterator();
    }

    @Override
    public Iterator<Stream> listStreams(String scopeName, String tagName) {
        NameUtils.validateUserScopeName((String)scopeName);
        log.info("Listing streams in scope: {} which has tag: {}", (Object)scopeName, (Object)tagName);
        AsyncIterator<Stream> asyncIterator = this.controller.listStreamsForTag(scopeName, tagName);
        return asyncIterator.asIterator();
    }

    @Override
    public Collection<String> getStreamTags(String scopeName, String streamName) {
        NameUtils.validateUserScopeName((String)scopeName);
        NameUtils.validateUserStreamName((String)streamName);
        log.info("Fetching tags associated with stream: {}/{}", (Object)scopeName, (Object)streamName);
        return (Collection)Futures.getThrowingException((Future)((Object)this.controller.getStreamConfiguration(scopeName, streamName).thenApply(StreamConfiguration::getTags)));
    }

    @Override
    public boolean checkStreamExists(String scopeName, String streamName) {
        log.info("Checking if stream {} exists in scope {}", (Object)streamName, (Object)scopeName);
        return (Boolean)Futures.getThrowingException(this.controller.checkStreamExists(scopeName, streamName));
    }

    @Override
    public boolean deleteScopeRecursive(String scopeName) {
        NameUtils.validateUserScopeName((String)scopeName);
        log.info("Deleting scope recursively: {}", (Object)scopeName);
        return (Boolean)Futures.getThrowingException(this.controller.deleteScopeRecursive(scopeName));
    }

    @Override
    public boolean deleteScope(String scopeName) {
        NameUtils.validateUserScopeName((String)scopeName);
        log.info("Deleting scope: {}", (Object)scopeName);
        return (Boolean)Futures.getThrowingException(this.controller.deleteScope(scopeName));
    }

    @Override
    @Deprecated
    public boolean deleteScope(String scopeName, boolean forceDelete) throws DeleteScopeFailedException {
        NameUtils.validateUserScopeName((String)scopeName);
        if (forceDelete) {
            log.info("Deleting scope recursively: {}", (Object)scopeName);
            ArrayList<String> readerGroupList = new ArrayList<String>();
            Iterator<Stream> iterator = this.listStreams(scopeName);
            while (iterator.hasNext()) {
                Stream stream = iterator.next();
                if (stream.getStreamName().startsWith("_RG")) {
                    readerGroupList.add(stream.getStreamName().substring("_RG".length()));
                }
                try {
                    Futures.getThrowingException((Future)((Object)Futures.exceptionallyExpecting(this.controller.sealStream(stream.getScope(), stream.getStreamName()), e -> {
                        Throwable unwrap = Exceptions.unwrap((Throwable)e);
                        return unwrap instanceof InvalidStreamException || unwrap instanceof ControllerFailureException;
                    }, (Object)false).thenCompose(sealed -> this.controller.deleteStream(stream.getScope(), stream.getStreamName()))));
                }
                catch (Exception e2) {
                    String message = String.format("Failed to seal and delete stream %s", stream.getStreamName());
                    throw new DeleteScopeFailedException(message, e2);
                }
            }
            Iterator kvtIterator = this.controller.listKeyValueTables(scopeName).asIterator();
            while (kvtIterator.hasNext()) {
                KeyValueTableInfo kvt = (KeyValueTableInfo)kvtIterator.next();
                try {
                    Futures.getThrowingException(this.controller.deleteKeyValueTable(scopeName, kvt.getKeyValueTableName()));
                }
                catch (Exception e3) {
                    String message = String.format("Failed to delete key-value table %s", kvt.getKeyValueTableName());
                    throw new DeleteScopeFailedException(message, e3);
                }
            }
            for (String groupName : readerGroupList) {
                try {
                    Futures.getThrowingException((Future)((Object)this.controller.getReaderGroupConfig(scopeName, groupName).thenCompose(conf -> this.controller.deleteReaderGroup(scopeName, groupName, conf.getReaderGroupId()))));
                }
                catch (Exception e4) {
                    if (Exceptions.unwrap((Throwable)e4) instanceof ReaderGroupNotFoundException) continue;
                    String message = String.format("Failed to delete reader group %s", groupName);
                    throw new DeleteScopeFailedException(message, e4);
                }
            }
        }
        return (Boolean)Futures.getThrowingException(this.controller.deleteScope(scopeName));
    }

    @Override
    public StreamInfo getStreamInfo(String scopeName, String streamName) {
        NameUtils.validateUserStreamName((String)streamName);
        NameUtils.validateUserScopeName((String)scopeName);
        log.info("Fetching StreamInfo for scope/stream: {}/{}", (Object)scopeName, (Object)streamName);
        return (StreamInfo)Futures.getThrowingException(this.getStreamInfo(Stream.of(scopeName, streamName)));
    }

    private CompletableFuture<StreamInfo> getStreamInfo(Stream stream) {
        CompletableFuture<StreamConfiguration> streamConfiguration = this.controller.getStreamConfiguration(stream.getScope(), stream.getStreamName());
        CompletableFuture<StreamCut> currentTailStreamCut = this.streamCutHelper.fetchTailStreamCut(stream);
        CompletableFuture<StreamCut> currentHeadStreamCut = this.streamCutHelper.fetchHeadStreamCut(stream);
        return CompletableFuture.allOf(streamConfiguration, currentHeadStreamCut, currentTailStreamCut).thenApply(v -> {
            boolean isSealed = ((StreamCutImpl)currentTailStreamCut.join()).getPositions().isEmpty();
            return new StreamInfo(stream.getScope(), stream.getStreamName(), (StreamConfiguration)streamConfiguration.join(), (StreamCut)currentTailStreamCut.join(), (StreamCut)currentHeadStreamCut.join(), isSealed);
        });
    }

    @Override
    public <T> CompletableFuture<T> fetchEvent(EventPointer pointer, Serializer<T> serializer) {
        Preconditions.checkNotNull((Object)pointer);
        Preconditions.checkNotNull(serializer);
        CompletableFuture<Object> completableFuture = CompletableFuture.supplyAsync(() -> {
            EventSegmentReader inputStream = this.eventSegmentReaderUtility.createEventSegmentReader(pointer);
            try {
                ByteBuffer buffer = inputStream.read();
                Object t = serializer.deserialize(buffer);
                return t;
            }
            catch (EndOfSegmentException e) {
                throw Exceptions.sneakyThrow((Throwable)new NoSuchEventException(e.getMessage()));
            }
            catch (NoSuchSegmentException | SegmentTruncatedException e) {
                throw Exceptions.sneakyThrow((Throwable)new NoSuchEventException("Event no longer exists."));
            }
            finally {
                if (Collections.singletonList(inputStream).get(0) != null) {
                    inputStream.close();
                }
            }
        });
        return completableFuture;
    }

    @Override
    public void close() {
        if (this.controller != null) {
            Callbacks.invokeSafely(this.controller::close, ex -> log.error("Unable to close Controller client.", ex));
        }
        if (this.connectionPool != null) {
            this.connectionPool.close();
        }
    }

    @SuppressFBWarnings(justification="generated code")
    @Generated
    ConnectionPool getConnectionPool() {
        return this.connectionPool;
    }
}

