/*
 * Decompiled with CFR 0.152.
 */
package akka.stream.alpakka.kinesis.scaladsl;

import akka.NotUsed;
import akka.annotation.InternalApi;
import akka.dispatch.ExecutionContexts$;
import akka.stream.ThrottleMode;
import akka.stream.alpakka.kinesis.KinesisErrors;
import akka.stream.alpakka.kinesis.KinesisFlowSettings;
import akka.stream.alpakka.kinesis.KinesisFlowSettings$;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.FlowWithContext;
import akka.stream.scaladsl.FlowWithContext$;
import akka.util.ByteString;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletionStage;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Queue;
import scala.collection.immutable.Queue$;
import scala.compat.java8.FutureConverters;
import scala.compat.java8.FutureConverters$;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;

public final class KinesisFlow$ {
    public static KinesisFlow$ MODULE$;

    static {
        new KinesisFlow$();
    }

    public Flow<PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed> apply(String streamName, KinesisFlowSettings settings, KinesisAsyncClient kinesisClient) {
        return (Flow)((Flow)Flow$.MODULE$.apply().map((Function1 & Serializable & scala.Serializable)x$1 -> new Tuple2(x$1, (Object)BoxedUnit.UNIT))).via(this.withContext(streamName, settings, kinesisClient)).map((Function1 & Serializable & scala.Serializable)x$2 -> (PutRecordsResultEntry)x$2._1());
    }

    public KinesisFlowSettings apply$default$2() {
        return KinesisFlowSettings$.MODULE$.Defaults();
    }

    public <T> FlowWithContext<PutRecordsRequestEntry, T, PutRecordsResultEntry, T, NotUsed> withContext(String streamName, KinesisFlowSettings settings, KinesisAsyncClient kinesisClient) {
        return FlowWithContext$.MODULE$.fromTuples(this.batchingFlow(settings).via(this.batchWritingFlow(streamName, (Function1 & Serializable & scala.Serializable)batch -> (Function1 & Serializable & scala.Serializable)x0$1 -> {
            Try try_ = x0$1;
            if (try_ instanceof Success) {
                Success success = (Success)try_;
                PutRecordsResponse putRecordsResponse = (PutRecordsResponse)success.value();
                return new Success(MODULE$.handlePutRecordsSuccess((Iterable)batch, putRecordsResponse));
            }
            if (try_ instanceof Failure) {
                Failure failure = (Failure)try_;
                Throwable throwable = failure.exception();
                return new Failure((Throwable)new KinesisErrors.FailurePublishingRecords(throwable));
            }
            throw new MatchError((Object)try_);
        }, settings, kinesisClient)));
    }

    public <T> KinesisFlowSettings withContext$default$2() {
        return KinesisFlowSettings$.MODULE$.Defaults();
    }

    public <T> Flow<Tuple2<PutRecordsRequestEntry, T>, Iterable<Tuple2<PutRecordsRequestEntry, T>>, NotUsed> batchingFlow(KinesisFlowSettings settings) {
        return (Flow)Flow$.MODULE$.apply().throttle(settings.maxRecordsPerSecond(), new package.DurationInt(package$.MODULE$.DurationInt(1)).second(), settings.maxRecordsPerSecond(), (ThrottleMode)ThrottleMode.Shaping$.MODULE$).throttle(settings.maxBytesPerSecond(), new package.DurationInt(package$.MODULE$.DurationInt(1)).second(), settings.maxBytesPerSecond(), (Function1 & Serializable & scala.Serializable)record -> BoxesRunTime.boxToInteger((int)KinesisFlow$.MODULE$.getPayloadByteSize(record)), (ThrottleMode)ThrottleMode.Shaping$.MODULE$).batch((long)settings.maxBatchSize(), (Function1 & Serializable & scala.Serializable)x$3 -> Queue$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{x$3})), (Function2 & Serializable & scala.Serializable)(x$4, x$5) -> (Queue)x$4.$colon$plus(x$5, Queue$.MODULE$.canBuildFrom()));
    }

    public <S, T> Flow<Iterable<Tuple2<PutRecordsRequestEntry, T>>, Tuple2<S, T>, NotUsed> batchWritingFlow(String streamName, Function1<Iterable<Tuple2<PutRecordsRequestEntry, T>>, Function1<Try<PutRecordsResponse>, Try<Iterable<Tuple2<S, T>>>>> handleBatch, KinesisFlowSettings settings, KinesisAsyncClient kinesisClient) {
        this.checkClient(kinesisClient);
        return (Flow)Flow$.MODULE$.apply().mapAsync(settings.parallelism(), (Function1 & Serializable & scala.Serializable)entries -> FutureConverters.CompletionStageOps$.MODULE$.toScala$extension(FutureConverters$.MODULE$.CompletionStageOps((CompletionStage)kinesisClient.putRecords((PutRecordsRequest)PutRecordsRequest.builder().streamName(streamName).records(JavaConverters$.MODULE$.asJavaCollectionConverter((Iterable)entries.map((Function1 & Serializable & scala.Serializable)x$6 -> (PutRecordsRequestEntry)x$6._1(), Iterable$.MODULE$.canBuildFrom())).asJavaCollection()).build()))).transform((Function1)handleBatch.apply(entries), ExecutionContexts$.MODULE$.parasitic())).mapConcat((Function1 & Serializable & scala.Serializable)x -> (Iterable)Predef$.MODULE$.identity(x));
    }

    public <T> List<Tuple2<PutRecordsResultEntry, T>> handlePutRecordsSuccess(Iterable<Tuple2<PutRecordsRequestEntry, T>> entries, PutRecordsResponse result) {
        return (List)((List)((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(result.records()).asScala()).toList().zip(entries, List$.MODULE$.canBuildFrom())).map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 != null) {
                PutRecordsResultEntry res = (PutRecordsResultEntry)tuple2._1();
                Tuple2 tuple22 = (Tuple2)tuple2._2();
                if (tuple22 != null) {
                    Object t = tuple22._2();
                    return new Tuple2((Object)res, t);
                }
            }
            throw new MatchError((Object)tuple2);
        }, List$.MODULE$.canBuildFrom());
    }

    private <T> int getPayloadByteSize(Tuple2<PutRecordsRequestEntry, T> record) {
        Tuple2<PutRecordsRequestEntry, T> tuple2 = record;
        if (tuple2 != null) {
            PutRecordsRequestEntry request = (PutRecordsRequestEntry)tuple2._1();
            return request.partitionKey().length() + request.data().asByteBuffer().position();
        }
        throw new MatchError(tuple2);
    }

    public Flow<Tuple2<String, ByteBuffer>, PutRecordsResultEntry, NotUsed> byPartitionAndData(String streamName, KinesisFlowSettings settings, KinesisAsyncClient kinesisClient) {
        return ((Flow)Flow$.MODULE$.apply().map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 != null) {
                String partitionKey = (String)tuple2._1();
                ByteBuffer data = (ByteBuffer)tuple2._2();
                return (PutRecordsRequestEntry)PutRecordsRequestEntry.builder().partitionKey(partitionKey).data(SdkBytes.fromByteBuffer((ByteBuffer)data)).build();
            }
            throw new MatchError((Object)tuple2);
        })).via(this.apply(streamName, settings, kinesisClient));
    }

    public KinesisFlowSettings byPartitionAndData$default$2() {
        return KinesisFlowSettings$.MODULE$.Defaults();
    }

    public Flow<Tuple2<String, ByteString>, PutRecordsResultEntry, NotUsed> byPartitionAndBytes(String streamName, KinesisFlowSettings settings, KinesisAsyncClient kinesisClient) {
        return ((Flow)Flow$.MODULE$.apply().map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 != null) {
                String partitionKey = (String)tuple2._1();
                ByteString bytes = (ByteString)tuple2._2();
                return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)partitionKey), (Object)bytes.toByteBuffer());
            }
            throw new MatchError((Object)tuple2);
        })).via(this.byPartitionAndData(streamName, settings, kinesisClient));
    }

    public KinesisFlowSettings byPartitionAndBytes$default$2() {
        return KinesisFlowSettings$.MODULE$.Defaults();
    }

    @InternalApi
    public void checkClient(KinesisAsyncClient kinesisClient) {
        Predef$.MODULE$.require(kinesisClient != null, (Function0 & Serializable & scala.Serializable)() -> "The `KinesisAsyncClient` passed in may not be null.");
    }

    private KinesisFlow$() {
        MODULE$ = this;
    }
}

