/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.s3;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.function.TriFunction;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.SourceBuilder;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Object;

public final class S3Sources {
    private static final int LOCAL_PARALLELISM = 2;

    private S3Sources() {
    }

    @Nonnull
    public static BatchSource<String> s3(@Nonnull List<String> bucketNames, @Nullable String prefix, @Nonnull SupplierEx<? extends S3Client> clientSupplier) {
        return S3Sources.s3(bucketNames, prefix, StandardCharsets.UTF_8, clientSupplier, (BiFunctionEx & Serializable)(name, line) -> line);
    }

    @Nonnull
    public static <T> BatchSource<T> s3(@Nonnull List<String> bucketNames, @Nullable String prefix, @Nonnull Charset charset, @Nonnull SupplierEx<? extends S3Client> clientSupplier, @Nonnull BiFunctionEx<String, String, ? extends T> mapFn) {
        String charsetName = charset.name();
        FunctionEx & Serializable readFileFn = (FunctionEx & Serializable)responseInputStream -> {
            BufferedReader reader = new BufferedReader(new InputStreamReader((InputStream)responseInputStream, Charset.forName(charsetName)));
            return reader.lines();
        };
        return S3Sources.s3(bucketNames, prefix, clientSupplier, readFileFn, mapFn);
    }

    @Nonnull
    public static <I, T> BatchSource<T> s3(@Nonnull List<String> bucketNames, @Nullable String prefix, @Nonnull SupplierEx<? extends S3Client> clientSupplier, @Nonnull FunctionEx<? super InputStream, ? extends Stream<I>> readFileFn, @Nonnull BiFunctionEx<String, ? super I, ? extends T> mapFn) {
        TriFunction & Serializable adaptedFunction = (TriFunction & Serializable)(inputStream, key, bucketName) -> (Stream)readFileFn.apply(inputStream);
        return SourceBuilder.batch((String)"s3-source", (FunctionEx & Serializable)context -> new S3SourceContext(bucketNames, prefix, (Processor.Context)context, (SupplierEx<S3Client>)clientSupplier, adaptedFunction, mapFn)).fillBufferFn(S3SourceContext::fillBuffer).distributed(2).destroyFn(S3SourceContext::close).build();
    }

    @Nonnull
    public static <I, T> BatchSource<T> s3(@Nonnull List<String> bucketNames, @Nullable String prefix, @Nonnull SupplierEx<? extends S3Client> clientSupplier, @Nonnull TriFunction<? super InputStream, String, String, ? extends Stream<I>> readFileFn, @Nonnull BiFunctionEx<String, ? super I, ? extends T> mapFn) {
        return SourceBuilder.batch((String)"s3Source", (FunctionEx & Serializable)context -> new S3SourceContext(bucketNames, prefix, (Processor.Context)context, (SupplierEx<S3Client>)clientSupplier, readFileFn, mapFn)).fillBufferFn(S3SourceContext::fillBuffer).distributed(2).destroyFn(S3SourceContext::close).build();
    }

    private static final class S3SourceContext<I, T> {
        private static final int BATCH_COUNT = 1024;
        private final String prefix;
        private final S3Client amazonS3;
        private final TriFunction<? super InputStream, String, String, ? extends Stream<I>> readFileFn;
        private final BiFunctionEx<String, ? super I, ? extends T> mapFn;
        private final int processorIndex;
        private final int totalParallelism;
        private Iterator<Map.Entry<String, String>> objectIterator;
        private Traverser<I> itemTraverser;
        private String currentKey;

        private S3SourceContext(List<String> bucketNames, String prefix, Processor.Context context, SupplierEx<? extends S3Client> clientSupplier, TriFunction<? super InputStream, String, String, ? extends Stream<I>> readFileFn, BiFunctionEx<String, ? super I, ? extends T> mapFn) {
            this.prefix = prefix;
            this.amazonS3 = (S3Client)clientSupplier.get();
            this.readFileFn = readFileFn;
            this.mapFn = mapFn;
            this.processorIndex = context.globalProcessorIndex();
            this.totalParallelism = context.totalParallelism();
            this.objectIterator = bucketNames.stream().flatMap(bucket -> this.amazonS3.listObjectsV2Paginator(b -> b.bucket(bucket).prefix(this.prefix)).contents().stream().map(S3Object::key).filter(this::belongsToThisProcessor).map(key -> Util.entry((Object)bucket, (Object)key))).iterator();
        }

        private void fillBuffer(SourceBuilder.SourceBuffer<? super T> buffer) {
            if (this.itemTraverser != null) {
                this.addBatchToBuffer(buffer);
                return;
            }
            if (this.objectIterator.hasNext()) {
                Map.Entry<String, String> entry = this.objectIterator.next();
                String bucketName = entry.getKey();
                String key = entry.getValue();
                GetObjectRequest getObjectRequest = (GetObjectRequest)GetObjectRequest.builder().bucket(bucketName).key(key).build();
                ResponseInputStream responseInputStream = this.amazonS3.getObject(getObjectRequest);
                this.currentKey = key;
                this.itemTraverser = Traversers.traverseStream((Stream)((Stream)this.readFileFn.apply((Object)responseInputStream, (Object)key, (Object)bucketName)));
                this.addBatchToBuffer(buffer);
            } else {
                buffer.close();
                this.objectIterator = null;
            }
        }

        private void addBatchToBuffer(SourceBuilder.SourceBuffer<? super T> buffer) {
            assert (this.currentKey != null) : "currentKey must not be null";
            for (int i = 0; i < 1024; ++i) {
                Object item = this.itemTraverser.next();
                if (item == null) {
                    this.itemTraverser = null;
                    this.currentKey = null;
                    return;
                }
                buffer.add(this.mapFn.apply((Object)this.currentKey, item));
            }
        }

        private boolean belongsToThisProcessor(String key) {
            return Math.floorMod(key.hashCode(), this.totalParallelism) == this.processorIndex;
        }

        private void close() {
            this.amazonS3.close();
        }
    }
}

