/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.retrieval.fanout;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalFactory;
import software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher;

/**
 * {@link RetrievalFactory} that produces {@link FanOutRecordsPublisher} instances.
 *
 * <p>For every publisher it creates, the factory resolves a consumer ARN: an explicitly
 * supplied ARN is used as-is, otherwise one is obtained from {@code consumerArnCreator}
 * and remembered per {@link StreamIdentifier} so the creator is not invoked again for a
 * stream that already has a cached value.
 *
 * <p>NOTE(review): the ARN cache is a plain {@link HashMap} with no synchronization in this
 * class; confirm that callers create publishers from a single thread.
 */
@KinesisClientInternalApi
public class FanOutRetrievalFactory implements RetrievalFactory {
    /** Client passed through to every {@link FanOutRecordsPublisher} this factory creates. */
    private final KinesisAsyncClient kinesisClient;
    /** Stream name used when the shard carries no serialized stream identifier. */
    private final String defaultStreamName;
    /** Consumer ARN used for the default stream; may be {@code null}, in which case one is created. */
    private final String defaultConsumerArn;
    /** Maps a stream name to a consumer ARN when no explicit ARN was supplied. */
    private final Function<String, String> consumerArnCreator;
    /** ARNs obtained from {@link #consumerArnCreator}, keyed by the stream they were created for. */
    private final Map<StreamIdentifier, String> implicitConsumerArnTracker = new HashMap<>();

    /**
     * Creates a factory.
     *
     * @param kinesisClient      client handed to each created publisher
     * @param defaultStreamName  stream name used for shards without a serialized stream identifier
     * @param defaultConsumerArn consumer ARN for the default stream, or {@code null} to have one
     *                           produced by {@code consumerArnCreator}
     * @param consumerArnCreator function from stream name to consumer ARN, invoked when no explicit
     *                           ARN is available for a stream
     */
    public FanOutRetrievalFactory(KinesisAsyncClient kinesisClient, String defaultStreamName, String defaultConsumerArn, Function<String, String> consumerArnCreator) {
        this.kinesisClient = kinesisClient;
        this.defaultStreamName = defaultStreamName;
        this.defaultConsumerArn = defaultConsumerArn;
        this.consumerArnCreator = consumerArnCreator;
    }

    /**
     * Not provided by this factory.
     *
     * @return always {@code null}
     */
    @Override
    public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(ShardInfo shardInfo, MetricsFactory metricsFactory) {
        // NOTE(review): presumably null because this factory only deals in FanOutRecordsPublisher
        // and has no GetRecords-based strategy to offer; confirm callers tolerate null.
        return null;
    }

    /**
     * Creates a {@link FanOutRecordsPublisher} for the given shard.
     *
     * <p>When {@code shardInfo} carries a serialized stream identifier, the publisher is built for
     * that stream and the consumer ARN is taken from {@code streamConfig} (or created if the config
     * has none). Otherwise the publisher is built for the default stream using the default consumer
     * ARN (or a created one), and {@code streamConfig} is not consulted.
     *
     * @param shardInfo      shard to create the publisher for; must not be {@code null}
     * @param streamConfig   configuration of the shard's stream; must not be {@code null} when
     *                       {@code shardInfo} carries a stream identifier
     * @param metricsFactory unused by this implementation
     * @return a new publisher for the shard
     * @throws NullPointerException if {@code shardInfo} is {@code null}, or if {@code streamConfig}
     *                              is {@code null} while {@code shardInfo} carries a stream identifier
     */
    @Override
    public RecordsPublisher createGetRecordsCache(@NonNull ShardInfo shardInfo, StreamConfig streamConfig, MetricsFactory metricsFactory) {
        if (shardInfo == null) {
            throw new NullPointerException("shardInfo");
        }
        Optional<String> streamIdentifierStr = shardInfo.streamIdentifierSerOpt();
        if (streamIdentifierStr.isPresent()) {
            String serializedStreamIdentifier = streamIdentifierStr.get();
            StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(serializedStreamIdentifier);
            // Fail with a descriptive message instead of an anonymous NPE raised from the
            // middle of the constructor argument list below.
            if (streamConfig == null) {
                throw new NullPointerException("streamConfig must not be null when shardInfo carries a stream identifier");
            }
            String consumerArn = this.getOrCreateConsumerArn(streamIdentifier, streamConfig.consumerArn());
            return new FanOutRecordsPublisher(this.kinesisClient, shardInfo.shardId(), consumerArn, serializedStreamIdentifier);
        }
        StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance(this.defaultStreamName);
        String consumerArn = this.getOrCreateConsumerArn(streamIdentifier, this.defaultConsumerArn);
        return new FanOutRecordsPublisher(this.kinesisClient, shardInfo.shardId(), consumerArn);
    }

    /**
     * Unsupported overload: this factory requires a {@link StreamConfig}.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory) {
        throw new UnsupportedOperationException("FanoutRetrievalFactory needs StreamConfig Info");
    }

    /**
     * Resolves the consumer ARN to use for a stream.
     *
     * @param streamIdentifier stream the ARN is for; used as the cache key
     * @param consumerArn      explicitly configured ARN, or {@code null} if none
     * @return {@code consumerArn} when it is non-null; otherwise the cached ARN for the stream,
     *         invoking {@link #consumerArnCreator} with the stream name when nothing is cached yet
     */
    private String getOrCreateConsumerArn(StreamIdentifier streamIdentifier, String consumerArn) {
        if (consumerArn != null) {
            return consumerArn;
        }
        // computeIfAbsent records no mapping when the creator returns null, so a null
        // result is returned as-is and the creator is invoked again on the next call.
        return this.implicitConsumerArnTracker.computeIfAbsent(streamIdentifier, sId -> this.consumerArnCreator.apply(sId.streamName()));
    }
}

