/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.stream.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.client.ClientFactory;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.SynchronizerClientFactory;
import io.pravega.client.admin.impl.ReaderGroupManagerImpl;
import io.pravega.client.batch.impl.BatchClientFactoryImpl;
import io.pravega.client.byteStream.impl.ByteStreamClientImpl;
import io.pravega.client.netty.impl.ConnectionFactory;
import io.pravega.client.netty.impl.ConnectionFactoryImpl;
import io.pravega.client.security.auth.DelegationTokenProvider;
import io.pravega.client.security.auth.DelegationTokenProviderFactory;
import io.pravega.client.segment.impl.ConditionalOutputStream;
import io.pravega.client.segment.impl.ConditionalOutputStreamFactory;
import io.pravega.client.segment.impl.ConditionalOutputStreamFactoryImpl;
import io.pravega.client.segment.impl.EventSegmentReader;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentInputStreamFactory;
import io.pravega.client.segment.impl.SegmentInputStreamFactoryImpl;
import io.pravega.client.segment.impl.SegmentMetadataClient;
import io.pravega.client.segment.impl.SegmentMetadataClientFactory;
import io.pravega.client.segment.impl.SegmentMetadataClientFactoryImpl;
import io.pravega.client.segment.impl.SegmentOutputStreamFactory;
import io.pravega.client.segment.impl.SegmentOutputStreamFactoryImpl;
import io.pravega.client.state.InitialUpdate;
import io.pravega.client.state.Revisioned;
import io.pravega.client.state.RevisionedStreamClient;
import io.pravega.client.state.StateSynchronizer;
import io.pravega.client.state.SynchronizerConfig;
import io.pravega.client.state.Update;
import io.pravega.client.state.impl.RevisionedStreamClientImpl;
import io.pravega.client.state.impl.StateSynchronizerImpl;
import io.pravega.client.state.impl.UpdateOrInitSerializer;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.InvalidStreamException;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.TransactionalEventStreamWriter;
import io.pravega.client.stream.impl.Controller;
import io.pravega.client.stream.impl.EventStreamReaderImpl;
import io.pravega.client.stream.impl.EventStreamWriterImpl;
import io.pravega.client.stream.impl.Orderer;
import io.pravega.client.stream.impl.ReaderGroupState;
import io.pravega.client.stream.impl.ReaderGroupStateManager;
import io.pravega.client.stream.impl.StreamImpl;
import io.pravega.client.stream.impl.StreamSegments;
import io.pravega.client.stream.impl.TransactionalEventStreamWriterImpl;
import io.pravega.client.stream.impl.WatermarkReaderImpl;
import io.pravega.client.watermark.WatermarkSerializer;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
import io.pravega.common.concurrent.Futures;
import io.pravega.shared.NameUtils;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import io.pravega.shared.watermarks.Watermark;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory bound to a single scope that creates event writers, transactional writers, readers,
 * revisioned stream clients, state synchronizers, batch clients and byte-stream clients on top of
 * one shared {@link Controller} and one shared {@link ConnectionFactory}.
 *
 * <p>Calling {@link #close()} closes the controller and the connection factory and shuts down the
 * internal scheduler used by watermark readers.
 */
public class ClientFactoryImpl implements ClientFactory, EventStreamClientFactory, SynchronizerClientFactory {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(ClientFactoryImpl.class);

    /** Scope in which every stream name passed to this factory is resolved. */
    private final String scope;
    private final Controller controller;
    private final SegmentInputStreamFactory inFactory;
    private final SegmentOutputStreamFactory outFactory;
    private final ConditionalOutputStreamFactory condFactory;
    private final SegmentMetadataClientFactory metaFactory;
    private final ConnectionFactory connectionFactory;

    /**
     * Scheduler handed to every {@link WatermarkReaderImpl} built by
     * {@link #createReader(String, String, Serializer, ReaderConfig, Supplier, Supplier)}.
     * It is owned by this factory and shut down in {@link #close()}.
     */
    private final ScheduledExecutorService watermarkReaderThreads =
            ExecutorServiceHelpers.newScheduledThreadPool(getThreadPoolSize(), "WatermarkReader");

    /**
     * Creates a factory that builds its own {@link ConnectionFactoryImpl} from a default
     * {@link ClientConfig} and derives the segment-level factories from it.
     *
     * @param scope      scope used for all streams created through this factory; must not be null
     * @param controller controller client; must not be null
     */
    public ClientFactoryImpl(String scope, Controller controller) {
        // Validate before allocating the connection factory so nothing is created on bad input.
        Preconditions.checkNotNull(scope);
        Preconditions.checkNotNull(controller);
        this.scope = scope;
        this.controller = controller;
        this.connectionFactory = new ConnectionFactoryImpl(ClientConfig.builder().build());
        this.inFactory = new SegmentInputStreamFactoryImpl(controller, this.connectionFactory);
        this.outFactory = new SegmentOutputStreamFactoryImpl(controller, this.connectionFactory);
        this.condFactory = new ConditionalOutputStreamFactoryImpl(controller, this.connectionFactory);
        this.metaFactory = new SegmentMetadataClientFactoryImpl(controller, this.connectionFactory);
    }

    /**
     * Creates a factory around a caller-supplied connection factory; the segment-level factories
     * are derived from {@code controller} and {@code connectionFactory}.
     *
     * @param scope             scope used for all streams created through this factory
     * @param controller        controller client
     * @param connectionFactory connection factory shared by all clients created here
     */
    @VisibleForTesting
    public ClientFactoryImpl(String scope, Controller controller, ConnectionFactory connectionFactory) {
        this(scope,
             controller,
             connectionFactory,
             new SegmentInputStreamFactoryImpl(controller, connectionFactory),
             new SegmentOutputStreamFactoryImpl(controller, connectionFactory),
             new ConditionalOutputStreamFactoryImpl(controller, connectionFactory),
             new SegmentMetadataClientFactoryImpl(controller, connectionFactory));
    }

    /**
     * Creates a factory with every collaborator supplied by the caller.
     *
     * @param scope             scope used for all streams; must not be null
     * @param controller        controller client; must not be null
     * @param connectionFactory connection factory (not null-checked here)
     * @param inFactory         segment input stream factory; must not be null
     * @param outFactory        segment output stream factory; must not be null
     * @param condFactory       conditional output stream factory; must not be null
     * @param metaFactory       segment metadata client factory; must not be null
     */
    @VisibleForTesting
    public ClientFactoryImpl(String scope, Controller controller, ConnectionFactory connectionFactory, SegmentInputStreamFactory inFactory, SegmentOutputStreamFactory outFactory, ConditionalOutputStreamFactory condFactory, SegmentMetadataClientFactory metaFactory) {
        Preconditions.checkNotNull(scope);
        Preconditions.checkNotNull(controller);
        Preconditions.checkNotNull(inFactory);
        Preconditions.checkNotNull(outFactory);
        Preconditions.checkNotNull(condFactory);
        Preconditions.checkNotNull(metaFactory);
        this.scope = scope;
        this.controller = controller;
        this.connectionFactory = connectionFactory;
        this.inFactory = inFactory;
        this.outFactory = outFactory;
        this.condFactory = condFactory;
        this.metaFactory = metaFactory;
    }

    /**
     * Creates an event writer with a randomly generated writer id.
     *
     * @param streamName name of the stream, resolved in this factory's scope
     * @param s          serializer for events
     * @param config     writer configuration
     * @return a new event writer
     */
    @Override
    public <T> EventStreamWriter<T> createEventWriter(String streamName, Serializer<T> s, EventWriterConfig config) {
        return this.createEventWriter(UUID.randomUUID().toString(), streamName, s, config);
    }

    /**
     * Creates an event writer with the given writer id. A dedicated shrinking executor is created
     * for the writer and passed to it together with the connection factory's internal executor.
     *
     * @param writerId   identifier of the writer
     * @param streamName name of the stream, resolved in this factory's scope
     * @param s          serializer for events
     * @param config     writer configuration
     * @return a new event writer
     */
    @Override
    public <T> EventStreamWriter<T> createEventWriter(String writerId, String streamName, Serializer<T> s, EventWriterConfig config) {
        log.info("Creating writer: {} for stream: {} with configuration: {}", writerId, streamName, config);
        StreamImpl stream = new StreamImpl(this.scope, streamName);
        ThreadPoolExecutor retransmitPool =
                ExecutorServiceHelpers.getShrinkingExecutor(1, 100, "ScalingRetransmition-" + stream.getScopedName());
        return new EventStreamWriterImpl<T>(stream, writerId, this.controller, this.outFactory, s, config, retransmitPool, this.connectionFactory.getInternalExecutor());
    }

    /**
     * Creates a transactional event writer with the given writer id.
     *
     * @param writerId   identifier of the writer
     * @param streamName name of the stream, resolved in this factory's scope
     * @param s          serializer for events
     * @param config     writer configuration
     * @return a new transactional event writer
     */
    @Override
    public <T> TransactionalEventStreamWriter<T> createTransactionalEventWriter(String writerId, String streamName, Serializer<T> s, EventWriterConfig config) {
        log.info("Creating transactional writer:{} for stream: {} with configuration: {}", writerId, streamName, config);
        StreamImpl stream = new StreamImpl(this.scope, streamName);
        return new TransactionalEventStreamWriterImpl<T>(stream, writerId, this.controller, this.outFactory, s, config, this.connectionFactory.getInternalExecutor());
    }

    /**
     * Creates a transactional event writer with a randomly generated writer id.
     *
     * @param streamName name of the stream, resolved in this factory's scope
     * @param s          serializer for events
     * @param config     writer configuration
     * @return a new transactional event writer
     */
    @Override
    public <T> TransactionalEventStreamWriter<T> createTransactionalEventWriter(String streamName, Serializer<T> s, EventWriterConfig config) {
        return this.createTransactionalEventWriter(UUID.randomUUID().toString(), streamName, s, config);
    }

    /**
     * Creates a reader that uses {@link System#nanoTime()} and {@link System#currentTimeMillis()}
     * as its clocks.
     *
     * @param readerId    identifier of the reader
     * @param readerGroup name of the reader group the reader joins
     * @param s           serializer for events
     * @param config      reader configuration
     * @return a new event reader
     */
    @Override
    public <T> EventStreamReader<T> createReader(String readerId, String readerGroup, Serializer<T> s, ReaderConfig config) {
        // The delegate logs the creation; logging here as well would emit the same line twice.
        return this.createReader(readerId, readerGroup, s, config, System::nanoTime, System::currentTimeMillis);
    }

    /**
     * Creates a reader with caller-supplied clocks.
     *
     * <p>A state synchronizer for the reader group is created and the reader is initialized in it.
     * Unless time windows are disabled in {@code config}, one {@link WatermarkReaderImpl} is
     * created for each stream reported by the reader group state manager.
     *
     * @param readerId    identifier of the reader
     * @param readerGroup name of the reader group the reader joins
     * @param s           serializer for events
     * @param config      reader configuration
     * @param nanoTime    clock passed to the reader group state manager
     * @param milliTime   clock passed to the reader
     * @return a new event reader
     */
    @VisibleForTesting
    public <T> EventStreamReader<T> createReader(String readerId, String readerGroup, Serializer<T> s, ReaderConfig config, Supplier<Long> nanoTime, Supplier<Long> milliTime) {
        log.info("Creating reader: {} under readerGroup: {} with configuration: {}", readerId, readerGroup, config);
        SynchronizerConfig synchronizerConfig = SynchronizerConfig.builder().build();
        StateSynchronizer<ReaderGroupState> sync = this.createStateSynchronizer(
                NameUtils.getStreamForReaderGroup(readerGroup),
                new ReaderGroupManagerImpl.ReaderGroupStateUpdatesSerializer(),
                new ReaderGroupManagerImpl.ReaderGroupStateInitSerializer(),
                synchronizerConfig);
        ReaderGroupStateManager stateManager = new ReaderGroupStateManager(readerId, sync, this.controller, nanoTime);
        stateManager.initializeReader(config.getInitialAllocationDelay());

        ImmutableMap.Builder<Stream, WatermarkReaderImpl> watermarkReaders = ImmutableMap.builder();
        if (!config.isDisableTimeWindows()) {
            for (Stream stream : stateManager.getStreams()) {
                // Watermarks are read from the stream's companion "mark" stream in the stream's own scope.
                String markStreamName = StreamSegmentNameUtils.getMarkForStream(stream.getStreamName());
                Segment markSegment = this.getSegmentForRevisionedClient(stream.getScope(), markStreamName);
                RevisionedStreamClient<Watermark> client = this.createRevisionedStreamClient(
                        markSegment,
                        new WatermarkSerializer(),
                        SynchronizerConfig.builder().readBufferSize(4096).build());
                watermarkReaders.put(stream, new WatermarkReaderImpl(stream, client, this.watermarkReaderThreads));
            }
        }
        return new EventStreamReaderImpl<T>(this.inFactory, this.metaFactory, s, stateManager, new Orderer(), milliTime, config, watermarkReaders.build(), this.controller);
    }

    /**
     * Creates a revisioned stream client for a stream in this factory's scope.
     *
     * @param streamName name of the stream
     * @param serializer serializer for the stored values
     * @param config     synchronizer configuration
     * @return a new revisioned stream client
     * @throws InvalidStreamException if the controller reports no segments for the stream
     */
    @Override
    public <T> RevisionedStreamClient<T> createRevisionedStreamClient(String streamName, Serializer<T> serializer, SynchronizerConfig config) {
        log.info("Creating revisioned stream client for stream: {} with synchronizer configuration: {}", streamName, config);
        return this.createRevisionedStreamClient(this.getSegmentForRevisionedClient(this.scope, streamName), serializer, config);
    }

    /**
     * Builds a revisioned stream client on top of the given segment: an event reader, a
     * conditional output stream and a metadata client are created for it, the latter two sharing
     * one delegation token provider seeded with a token obtained from the controller.
     */
    private <T> RevisionedStreamClient<T> createRevisionedStreamClient(Segment segment, Serializer<T> serializer, SynchronizerConfig config) {
        EventSegmentReader in = this.inFactory.createEventReaderForSegment(segment, config.getReadBufferSize());
        String delegationToken = Futures.getAndHandleExceptions(
                this.controller.getOrRefreshDelegationTokenFor(segment.getScope(), segment.getStreamName()),
                RuntimeException::new);
        DelegationTokenProvider delegationTokenProvider = DelegationTokenProviderFactory.create(delegationToken, this.controller, segment);
        ConditionalOutputStream cond = this.condFactory.createConditionalOutputStream(segment, delegationTokenProvider, config.getEventWriterConfig());
        SegmentMetadataClient meta = this.metaFactory.createSegmentMetadataClient(segment, delegationTokenProvider);
        return new RevisionedStreamClientImpl<T>(segment, in, this.outFactory, cond, meta, serializer, config.getEventWriterConfig(), delegationTokenProvider);
    }

    /**
     * Creates a state synchronizer backed by a revisioned stream client on the given stream.
     *
     * @param streamName        name of the stream, resolved in this factory's scope
     * @param updateSerializer  serializer for updates
     * @param initialSerializer serializer for initial updates
     * @param config            synchronizer configuration
     * @return a new state synchronizer
     * @throws InvalidStreamException if the controller reports no segments for the stream
     */
    @Override
    public <StateT extends Revisioned, UpdateT extends Update<StateT>, InitT extends InitialUpdate<StateT>> StateSynchronizer<StateT> createStateSynchronizer(String streamName, Serializer<UpdateT> updateSerializer, Serializer<InitT> initialSerializer, SynchronizerConfig config) {
        log.info("Creating state synchronizer with stream: {} and configuration: {}", streamName, config);
        UpdateOrInitSerializer<StateT, UpdateT, InitT> serializer = new UpdateOrInitSerializer<>(updateSerializer, initialSerializer);
        Segment segment = this.getSegmentForRevisionedClient(this.scope, streamName);
        return new StateSynchronizerImpl<StateT>(segment, this.createRevisionedStreamClient(segment, serializer, config));
    }

    /**
     * Asks the controller for the current segments of {@code scope/streamName} and returns the
     * segment that key {@code 0.0} maps to.
     *
     * @throws InvalidStreamException if the controller returns null or an empty segment set;
     *         failures of the controller call are converted through
     *         {@code InvalidStreamException::new} by {@link Futures#getAndHandleExceptions}
     */
    private Segment getSegmentForRevisionedClient(String scope, String streamName) {
        StreamSegments currentSegments = Futures.getAndHandleExceptions(
                this.controller.getCurrentSegments(scope, streamName),
                InvalidStreamException::new);
        if (currentSegments == null || currentSegments.getSegments().isEmpty()) {
            throw new InvalidStreamException("Stream does not exist: " + streamName);
        }
        return currentSegments.getSegmentForKey(0.0);
    }

    /**
     * Creates a batch client sharing this factory's controller and connection factory.
     *
     * @return a new batch client factory
     */
    @Override
    @Deprecated
    public BatchClientFactoryImpl createBatchClient() {
        return new BatchClientFactoryImpl(this.controller, this.connectionFactory);
    }

    /**
     * Creates a byte-stream client for this factory's scope, sharing its controller and
     * connection factory.
     *
     * @return a new byte-stream client
     */
    @Override
    @Deprecated
    public ByteStreamClientImpl createByteStreamClient() {
        return new ByteStreamClientImpl(this.scope, this.controller, this.connectionFactory);
    }

    /**
     * Closes the controller and the connection factory and shuts down the watermark reader
     * scheduler. Each step runs even if an earlier one throws; the exception still propagates.
     */
    @Override
    public void close() {
        try {
            this.controller.close();
        } finally {
            try {
                this.connectionFactory.close();
            } finally {
                // The scheduler is created by this factory, so it must be released here;
                // otherwise every factory instance leaks its thread pool.
                this.watermarkReaderThreads.shutdown();
            }
        }
    }

    /**
     * Returns the size of the watermark reader thread pool: the integer value of the system
     * property {@code pravega.client.internal.threadpool.size} when it is set, otherwise the
     * number of available processors.
     *
     * @throws NumberFormatException if the property is set but is not a parsable integer
     */
    private static int getThreadPoolSize() {
        String configuredThreads = System.getProperty("pravega.client.internal.threadpool.size", null);
        if (configuredThreads != null) {
            return Integer.parseInt(configuredThreads);
        }
        return Runtime.getRuntime().availableProcessors();
    }
}

