/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.stream.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.SynchronizerClientFactory;
import io.pravega.client.admin.impl.ReaderGroupManagerImpl;
import io.pravega.client.connection.impl.ConnectionFactory;
import io.pravega.client.connection.impl.ConnectionPool;
import io.pravega.client.connection.impl.ConnectionPoolImpl;
import io.pravega.client.connection.impl.SocketConnectionFactoryImpl;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.security.auth.DelegationTokenProvider;
import io.pravega.client.security.auth.DelegationTokenProviderFactory;
import io.pravega.client.segment.impl.ConditionalOutputStream;
import io.pravega.client.segment.impl.ConditionalOutputStreamFactory;
import io.pravega.client.segment.impl.ConditionalOutputStreamFactoryImpl;
import io.pravega.client.segment.impl.EventSegmentReader;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentInputStreamFactory;
import io.pravega.client.segment.impl.SegmentInputStreamFactoryImpl;
import io.pravega.client.segment.impl.SegmentMetadataClient;
import io.pravega.client.segment.impl.SegmentMetadataClientFactory;
import io.pravega.client.segment.impl.SegmentMetadataClientFactoryImpl;
import io.pravega.client.segment.impl.SegmentOutputStreamFactory;
import io.pravega.client.segment.impl.SegmentOutputStreamFactoryImpl;
import io.pravega.client.state.InitialUpdate;
import io.pravega.client.state.Revisioned;
import io.pravega.client.state.RevisionedStreamClient;
import io.pravega.client.state.StateSynchronizer;
import io.pravega.client.state.SynchronizerConfig;
import io.pravega.client.state.Update;
import io.pravega.client.state.impl.RevisionedStreamClientImpl;
import io.pravega.client.state.impl.StateSynchronizerImpl;
import io.pravega.client.state.impl.UpdateOrInitSerializer;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.InvalidStreamException;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.TransactionalEventStreamWriter;
import io.pravega.client.stream.impl.AbstractClientFactoryImpl;
import io.pravega.client.stream.impl.EventStreamReaderImpl;
import io.pravega.client.stream.impl.EventStreamWriterImpl;
import io.pravega.client.stream.impl.Orderer;
import io.pravega.client.stream.impl.ReaderGroupState;
import io.pravega.client.stream.impl.ReaderGroupStateManager;
import io.pravega.client.stream.impl.StreamImpl;
import io.pravega.client.stream.impl.StreamSegments;
import io.pravega.client.stream.impl.TransactionalEventStreamWriterImpl;
import io.pravega.client.stream.impl.WatermarkReaderImpl;
import io.pravega.client.watermark.WatermarkSerializer;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
import io.pravega.common.concurrent.Futures;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shaded.com.google.common.collect.ImmutableMap;
import io.pravega.shared.NameUtils;
import io.pravega.shared.security.auth.AccessOperation;
import io.pravega.shared.watermarks.Watermark;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Supplier;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientFactoryImpl
extends AbstractClientFactoryImpl
implements EventStreamClientFactory,
SynchronizerClientFactory {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ClientFactoryImpl.class);
    private final SegmentInputStreamFactory inFactory;
    private final SegmentOutputStreamFactory outFactory;
    private final ConditionalOutputStreamFactory condFactory;
    private final SegmentMetadataClientFactory metaFactory;
    private final ScheduledExecutorService watermarkReaderThreads = ExecutorServiceHelpers.newScheduledThreadPool(this.getThreadPoolSize(), "WatermarkReader");

    public ClientFactoryImpl(String scope, Controller controller, ClientConfig config) {
        super(scope, controller, new ConnectionPoolImpl(config, new SocketConnectionFactoryImpl(config)));
        this.inFactory = new SegmentInputStreamFactoryImpl(controller, this.connectionPool);
        this.outFactory = new SegmentOutputStreamFactoryImpl(controller, this.connectionPool);
        this.condFactory = new ConditionalOutputStreamFactoryImpl(controller, this.connectionPool);
        this.metaFactory = new SegmentMetadataClientFactoryImpl(controller, this.connectionPool);
    }

    @VisibleForTesting
    public ClientFactoryImpl(String scope, Controller controller, ConnectionFactory connectionFactory) {
        this(scope, controller, new ConnectionPoolImpl(ClientConfig.builder().build(), connectionFactory));
    }

    @VisibleForTesting
    public ClientFactoryImpl(String scope, Controller controller, ConnectionPool pool) {
        super(scope, controller, pool);
        this.inFactory = new SegmentInputStreamFactoryImpl(controller, this.connectionPool);
        this.outFactory = new SegmentOutputStreamFactoryImpl(controller, this.connectionPool);
        this.condFactory = new ConditionalOutputStreamFactoryImpl(controller, this.connectionPool);
        this.metaFactory = new SegmentMetadataClientFactoryImpl(controller, this.connectionPool);
    }

    @VisibleForTesting
    public ClientFactoryImpl(String scope, Controller controller, ConnectionPool connectionPool, SegmentInputStreamFactory inFactory, SegmentOutputStreamFactory outFactory, ConditionalOutputStreamFactory condFactory, SegmentMetadataClientFactory metaFactory) {
        super(scope, controller, connectionPool);
        Preconditions.checkNotNull(inFactory);
        Preconditions.checkNotNull(outFactory);
        Preconditions.checkNotNull(condFactory);
        Preconditions.checkNotNull(metaFactory);
        this.inFactory = inFactory;
        this.outFactory = outFactory;
        this.condFactory = condFactory;
        this.metaFactory = metaFactory;
    }

    @Override
    public <T> EventStreamWriter<T> createEventWriter(String streamName, Serializer<T> s2, EventWriterConfig config) {
        return this.createEventWriter(UUID.randomUUID().toString(), streamName, s2, config);
    }

    @Override
    public <T> EventStreamWriter<T> createEventWriter(String writerId, String streamName, Serializer<T> s2, EventWriterConfig config) {
        NameUtils.validateWriterId(writerId);
        log.info("Creating writer: {} for stream: {} with configuration: {}", new Object[]{writerId, streamName, config});
        StreamImpl stream = new StreamImpl(this.scope, streamName);
        ThreadPoolExecutor retransmitPool = ExecutorServiceHelpers.getShrinkingExecutor(1, 100, "ScalingRetransmission-" + stream.getScopedName());
        try {
            return new EventStreamWriterImpl<T>(stream, writerId, this.controller, this.outFactory, s2, config, retransmitPool, this.connectionPool.getInternalExecutor());
        }
        catch (Throwable ex) {
            ExecutorServiceHelpers.shutdown(retransmitPool);
            throw ex;
        }
    }

    @Override
    public <T> TransactionalEventStreamWriter<T> createTransactionalEventWriter(String writerId, String streamName, Serializer<T> s2, EventWriterConfig config) {
        NameUtils.validateWriterId(writerId);
        log.info("Creating transactional writer:{} for stream: {} with configuration: {}", new Object[]{writerId, streamName, config});
        StreamImpl stream = new StreamImpl(this.scope, streamName);
        return new TransactionalEventStreamWriterImpl<T>(stream, writerId, this.controller, this.outFactory, s2, config, this.connectionPool.getInternalExecutor());
    }

    @Override
    public <T> TransactionalEventStreamWriter<T> createTransactionalEventWriter(String streamName, Serializer<T> s2, EventWriterConfig config) {
        return this.createTransactionalEventWriter(UUID.randomUUID().toString(), streamName, s2, config);
    }

    @Override
    public <T> EventStreamReader<T> createReader(String readerId, String readerGroup, Serializer<T> s2, ReaderConfig config) {
        log.info("Creating reader: {} under readerGroup: {} with configuration: {}", new Object[]{readerId, readerGroup, config});
        return this.createReader(readerId, readerGroup, s2, config, System::nanoTime, System::currentTimeMillis);
    }

    @VisibleForTesting
    public <T> EventStreamReader<T> createReader(String readerId, String readerGroup, Serializer<T> s2, ReaderConfig config, Supplier<Long> nanoTime, Supplier<Long> milliTime) {
        NameUtils.validateReaderId(readerId);
        log.info("Creating reader: {} under readerGroup: {} with configuration: {}", new Object[]{readerId, readerGroup, config});
        SynchronizerConfig synchronizerConfig = SynchronizerConfig.builder().build();
        StateSynchronizer<ReaderGroupState> sync = this.createStateSynchronizer(NameUtils.getStreamForReaderGroup(readerGroup), new ReaderGroupManagerImpl.ReaderGroupStateUpdatesSerializer(), new ReaderGroupManagerImpl.ReaderGroupStateInitSerializer(), synchronizerConfig);
        ReaderGroupStateManager stateManager = new ReaderGroupStateManager(this.scope, readerGroup, readerId, sync, this.controller, nanoTime);
        stateManager.initializeReader(config.getInitialAllocationDelay());
        ImmutableMap.Builder<Stream, WatermarkReaderImpl> watermarkReaders = ImmutableMap.builder();
        if (!config.isDisableTimeWindows()) {
            for (Stream stream : stateManager.getStreams()) {
                String streamName = NameUtils.getMarkStreamForStream(stream.getStreamName());
                RevisionedStreamClient<Watermark> client = this.createRevisionedStreamClient(this.getSegmentForRevisionedClient(stream.getScope(), streamName), new WatermarkSerializer(), SynchronizerConfig.builder().readBufferSize(4096).build());
                watermarkReaders.put(stream, new WatermarkReaderImpl(stream, client, this.watermarkReaderThreads));
            }
        }
        return new EventStreamReaderImpl<T>(this.inFactory, this.metaFactory, s2, stateManager, new Orderer(), milliTime, config, watermarkReaders.build(), this.controller);
    }

    @Override
    public <T> RevisionedStreamClient<T> createRevisionedStreamClient(String streamName, Serializer<T> serializer, SynchronizerConfig config) {
        log.info("Creating revisioned stream client for stream: {} with synchronizer configuration: {}", (Object)streamName, (Object)config);
        return this.createRevisionedStreamClient(this.getSegmentForRevisionedClient(this.scope, streamName), serializer, config);
    }

    private <T> RevisionedStreamClient<T> createRevisionedStreamClient(Segment segment, Serializer<T> serializer, SynchronizerConfig config) {
        EventSegmentReader in = this.inFactory.createEventReaderForSegment(segment, config.getReadBufferSize());
        DelegationTokenProvider delegationTokenProvider = DelegationTokenProviderFactory.create(this.controller, segment, AccessOperation.READ_WRITE);
        ConditionalOutputStream cond = this.condFactory.createConditionalOutputStream(segment, delegationTokenProvider, config.getEventWriterConfig());
        SegmentMetadataClient meta = this.metaFactory.createSegmentMetadataClient(segment, delegationTokenProvider);
        return new RevisionedStreamClientImpl<T>(segment, in, this.outFactory, cond, meta, serializer, config.getEventWriterConfig(), delegationTokenProvider);
    }

    @Override
    public <StateT extends Revisioned, UpdateT extends Update<StateT>, InitT extends InitialUpdate<StateT>> StateSynchronizer<StateT> createStateSynchronizer(String streamName, Serializer<UpdateT> updateSerializer, Serializer<InitT> initialSerializer, SynchronizerConfig config) {
        log.info("Creating state synchronizer with stream: {} and configuration: {}", (Object)streamName, (Object)config);
        UpdateOrInitSerializer serializer = new UpdateOrInitSerializer(updateSerializer, initialSerializer);
        Segment segment = this.getSegmentForRevisionedClient(this.scope, streamName);
        return new StateSynchronizerImpl(segment, this.createRevisionedStreamClient(segment, serializer, config));
    }

    private Segment getSegmentForRevisionedClient(String scope, String streamName) {
        StreamSegments currentSegments = Futures.getAndHandleExceptions(this.controller.getCurrentSegments(scope, streamName), InvalidStreamException::new);
        if (currentSegments == null || currentSegments.getSegments().size() == 0) {
            throw new InvalidStreamException("Stream does not exist: " + streamName);
        }
        return currentSegments.getSegmentForKey(0.0);
    }

    @Override
    public void close() {
        ExecutorServiceHelpers.shutdown(this.watermarkReaderThreads);
        this.connectionPool.close();
        this.controller.close();
    }

    private int getThreadPoolSize() {
        String configuredThreads = System.getProperty("pravega.client.internal.threadpool.size", null);
        if (configuredThreads != null) {
            return Integer.parseInt(configuredThreads);
        }
        return Runtime.getRuntime().availableProcessors();
    }
}

