/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws.dynamodb;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.aws.dynamodb.AttributeValueCoder;
import org.apache.beam.sdk.io.aws.dynamodb.AutoValue_DynamoDBIO_Read;
import org.apache.beam.sdk.io.aws.dynamodb.AutoValue_DynamoDBIO_RetryConfiguration;
import org.apache.beam.sdk.io.aws.dynamodb.AutoValue_DynamoDBIO_Write;
import org.apache.beam.sdk.io.aws.dynamodb.AwsClientsProvider;
import org.apache.beam.sdk.io.aws.dynamodb.BasicDynamoDBProvider;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public final class DynamoDBIO {
    public static <T> Read<T> read() {
        return new AutoValue_DynamoDBIO_Read.Builder().build();
    }

    public static <T> Write<T> write() {
        return new AutoValue_DynamoDBIO_Write.Builder().build();
    }

    @AutoValue
    public static abstract class Write<T>
    extends PTransform<PCollection<T>, PCollection<Void>> {
        @Nullable
        abstract AwsClientsProvider getAwsClientsProvider();

        @Nullable
        abstract RetryConfiguration getRetryConfiguration();

        @Nullable
        abstract SerializableFunction<T, KV<String, WriteRequest>> getWriteItemMapperFn();

        abstract Builder<T> builder();

        public Write<T> withAwsClientsProvider(AwsClientsProvider awsClientsProvider) {
            return this.builder().setAwsClientsProvider(awsClientsProvider).build();
        }

        public Write<T> withAwsClientsProvider(String awsAccessKey, String awsSecretKey, Regions region, String serviceEndpoint) {
            return this.withAwsClientsProvider(new BasicDynamoDBProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
        }

        public Write<T> withAwsClientsProvider(String awsAccessKey, String awsSecretKey, Regions region) {
            return this.withAwsClientsProvider(awsAccessKey, awsSecretKey, region, null);
        }

        public Write<T> withRetryConfiguration(RetryConfiguration retryConfiguration) {
            Preconditions.checkArgument((retryConfiguration != null ? 1 : 0) != 0, (Object)"retryConfiguration is required");
            return this.builder().setRetryConfiguration(retryConfiguration).build();
        }

        public Write<T> withWriteRequestMapperFn(SerializableFunction<T, KV<String, WriteRequest>> writeItemMapperFn) {
            return this.builder().setWriteItemMapperFn(writeItemMapperFn).build();
        }

        public PCollection<Void> expand(PCollection<T> input) {
            return (PCollection)input.apply((PTransform)ParDo.of(new WriteFn(this)));
        }

        static class WriteFn<T>
        extends DoFn<T, Void> {
            @VisibleForTesting
            static final String RETRY_ATTEMPT_LOG = "Error writing to DynamoDB. Retry attempt[%d]";
            private static final Duration RETRY_INITIAL_BACKOFF = Duration.standardSeconds((long)5L);
            private transient FluentBackoff retryBackoff;
            private static final Logger LOG = LoggerFactory.getLogger(WriteFn.class);
            private static final Counter DYNAMO_DB_WRITE_FAILURES = Metrics.counter(WriteFn.class, (String)"DynamoDB_Write_Failures");
            private static final int BATCH_SIZE = 25;
            private transient AmazonDynamoDB client;
            private final Write spec;
            private List<KV<String, WriteRequest>> batch;

            WriteFn(Write spec) {
                this.spec = spec;
            }

            @DoFn.Setup
            public void setup() {
                this.client = this.spec.getAwsClientsProvider().createDynamoDB();
                this.retryBackoff = FluentBackoff.DEFAULT.withMaxRetries(0).withInitialBackoff(RETRY_INITIAL_BACKOFF);
                if (this.spec.getRetryConfiguration() != null) {
                    this.retryBackoff = this.retryBackoff.withMaxRetries(this.spec.getRetryConfiguration().getMaxAttempts() - 1).withMaxCumulativeBackoff(this.spec.getRetryConfiguration().getMaxDuration());
                }
            }

            @DoFn.StartBundle
            public void startBundle(DoFn.StartBundleContext context) {
                this.batch = new ArrayList<KV<String, WriteRequest>>();
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context) throws Exception {
                KV writeRequest = (KV)this.spec.getWriteItemMapperFn().apply(context.element());
                this.batch.add((KV<String, WriteRequest>)writeRequest);
                if (this.batch.size() >= 25) {
                    this.flushBatch();
                }
            }

            @DoFn.FinishBundle
            public void finishBundle(DoFn.FinishBundleContext context) throws Exception {
                this.flushBatch();
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            private void flushBatch() throws IOException, InterruptedException {
                if (this.batch.isEmpty()) {
                    return;
                }
                try {
                    Map mapTableRequest = this.batch.stream().collect(Collectors.groupingBy(KV::getKey, Collectors.mapping(KV::getValue, Collectors.toList())));
                    BatchWriteItemRequest batchRequest = new BatchWriteItemRequest();
                    mapTableRequest.entrySet().forEach(entry -> batchRequest.addRequestItemsEntry((String)entry.getKey(), (List)entry.getValue()));
                    Sleeper sleeper = Sleeper.DEFAULT;
                    BackOff backoff = this.retryBackoff.backoff();
                    int attempt = 0;
                    while (true) {
                        ++attempt;
                        try {
                            this.client.batchWriteItem(batchRequest);
                        }
                        catch (Exception ex) {
                            if (this.spec.getRetryConfiguration() == null || !this.spec.getRetryConfiguration().getRetryPredicate().test(ex)) {
                                DYNAMO_DB_WRITE_FAILURES.inc();
                                LOG.info("Unable to write batch items {} due to {} ", batchRequest.getRequestItems().entrySet(), (Object)ex);
                                throw new IOException("Error writing to DynamoDB (no attempt made to retry)", ex);
                            }
                            if (!BackOffUtils.next((Sleeper)sleeper, (BackOff)backoff)) {
                                throw new IOException(String.format("Error writing to DynamoDB after %d attempt(s). No more attempts allowed", attempt), ex);
                            }
                            LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), (Throwable)ex);
                            continue;
                        }
                        break;
                    }
                }
                finally {
                    this.batch.clear();
                }
            }

            @DoFn.Teardown
            public void tearDown() {
                if (this.client != null) {
                    this.client.shutdown();
                    this.client = null;
                }
            }
        }

        @AutoValue.Builder
        static abstract class Builder<T> {
            Builder() {
            }

            abstract Builder<T> setAwsClientsProvider(AwsClientsProvider var1);

            abstract Builder<T> setRetryConfiguration(RetryConfiguration var1);

            abstract Builder<T> setWriteItemMapperFn(SerializableFunction<T, KV<String, WriteRequest>> var1);

            abstract Write<T> build();
        }
    }

    @AutoValue
    public static abstract class RetryConfiguration
    implements Serializable {
        @VisibleForTesting
        static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();

        abstract int getMaxAttempts();

        abstract Duration getMaxDuration();

        abstract RetryPredicate getRetryPredicate();

        abstract Builder builder();

        public static RetryConfiguration create(int maxAttempts, Duration maxDuration) {
            Preconditions.checkArgument((maxAttempts > 0 ? 1 : 0) != 0, (Object)"maxAttempts should be greater than 0");
            Preconditions.checkArgument((maxDuration != null && maxDuration.isLongerThan((ReadableDuration)Duration.ZERO) ? 1 : 0) != 0, (Object)"maxDuration should be greater than 0");
            return new AutoValue_DynamoDBIO_RetryConfiguration.Builder().setMaxAttempts(maxAttempts).setMaxDuration(maxDuration).setRetryPredicate(DEFAULT_RETRY_PREDICATE).build();
        }

        private static class DefaultRetryPredicate
        implements RetryPredicate {
            private static final ImmutableSet<Integer> ELIGIBLE_CODES = ImmutableSet.of((Object)503);

            private DefaultRetryPredicate() {
            }

            @Override
            public boolean test(Throwable throwable) {
                return throwable instanceof IOException || throwable instanceof AmazonDynamoDBException || throwable instanceof AmazonDynamoDBException && ELIGIBLE_CODES.contains((Object)((AmazonDynamoDBException)throwable).getStatusCode());
            }
        }

        @FunctionalInterface
        static interface RetryPredicate
        extends Predicate<Throwable>,
        Serializable {
        }

        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract Builder setMaxAttempts(int var1);

            abstract Builder setMaxDuration(Duration var1);

            abstract Builder setRetryPredicate(RetryPredicate var1);

            abstract RetryConfiguration build();
        }
    }

    @AutoValue
    public static abstract class Read<T>
    extends PTransform<PBegin, PCollection<T>> {
        @Nullable
        abstract AwsClientsProvider getAwsClientsProvider();

        @Nullable
        abstract SerializableFunction<Void, ScanRequest> getScanRequestFn();

        @Nullable
        abstract Integer getSegmentId();

        @Nullable
        abstract SerializableFunction<ScanResult, T> getScanResultMapperFn();

        @Nullable
        abstract Coder<T> getCoder();

        abstract Builder<T> toBuilder();

        public Read<T> withAwsClientsProvider(AwsClientsProvider awsClientsProvider) {
            return this.toBuilder().setAwsClientsProvider(awsClientsProvider).build();
        }

        public Read<T> withAwsClientsProvider(String awsAccessKey, String awsSecretKey, Regions region, String serviceEndpoint) {
            return this.withAwsClientsProvider(new BasicDynamoDBProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
        }

        public Read<T> withAwsClientsProvider(String awsAccessKey, String awsSecretKey, Regions region) {
            return this.withAwsClientsProvider(awsAccessKey, awsSecretKey, region, null);
        }

        public Read<T> withScanRequestFn(SerializableFunction<Void, ScanRequest> fn) {
            return this.toBuilder().setScanRequestFn(fn).build();
        }

        private Read<T> withSegmentId(Integer segmentId) {
            Preconditions.checkArgument((segmentId != null ? 1 : 0) != 0, (Object)"segmentId can not be null");
            return this.toBuilder().setSegmentId(segmentId).build();
        }

        public Read<T> withScanResultMapperFn(SerializableFunction<ScanResult, T> scanResultMapperFn) {
            Preconditions.checkArgument((scanResultMapperFn != null ? 1 : 0) != 0, (Object)"scanResultMapper can not be null");
            return this.toBuilder().setScanResultMapperFn(scanResultMapperFn).build();
        }

        public Read<List<Map<String, AttributeValue>>> items() {
            return this.withScanResultMapperFn(new ItemsMapper()).withCoder((Coder<List<Map<String, AttributeValue>>>)ListCoder.of((Coder)MapCoder.of((Coder)StringUtf8Coder.of(), (Coder)AttributeValueCoder.of())));
        }

        public Read<T> withCoder(Coder<T> coder) {
            Preconditions.checkArgument((coder != null ? 1 : 0) != 0, (Object)"coder can not be null");
            return this.toBuilder().setCoder(coder).build();
        }

        public PCollection<T> expand(PBegin input) {
            Preconditions.checkArgument((this.getScanRequestFn() != null ? 1 : 0) != 0, (Object)"withScanRequestFn() is required");
            Preconditions.checkArgument((this.getAwsClientsProvider() != null ? 1 : 0) != 0, (Object)"withAwsClientsProvider() is required");
            ScanRequest scanRequest = (ScanRequest)this.getScanRequestFn().apply(null);
            Preconditions.checkArgument((scanRequest.getTotalSegments() != null && scanRequest.getTotalSegments() > 0 ? 1 : 0) != 0, (Object)"TotalSegments is required with withScanRequestFn() and greater zero");
            PCollection splits = (PCollection)((PCollection)input.apply("Create", (PTransform)Create.of((Object)((Object)this), (Object[])new Read[0]))).apply("Split", (PTransform)ParDo.of(new SplitFn()));
            splits.setCoder((Coder)SerializableCoder.of((TypeDescriptor)new TypeDescriptor<Read<T>>(){}));
            PCollection output = (PCollection)((PCollection)splits.apply("Reshuffle", (PTransform)Reshuffle.viaRandomKey())).apply("Read", (PTransform)ParDo.of(new ReadFn()));
            output.setCoder(this.getCoder());
            return output;
        }

        static final class ItemsMapper<T>
        implements SerializableFunction<ScanResult, List<Map<String, AttributeValue>>> {
            ItemsMapper() {
            }

            public List<Map<String, AttributeValue>> apply(@Nullable ScanResult scanResult) {
                if (scanResult == null) {
                    return Collections.emptyList();
                }
                return scanResult.getItems();
            }
        }

        private static class ReadFn<T>
        extends DoFn<Read<T>, T> {
            private ReadFn() {
            }

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element Read<T> spec, DoFn.OutputReceiver<T> out) {
                AmazonDynamoDB client = spec.getAwsClientsProvider().createDynamoDB();
                ScanRequest scanRequest = (ScanRequest)spec.getScanRequestFn().apply(null);
                scanRequest.setSegment(spec.getSegmentId());
                ScanResult scanResult = client.scan(scanRequest);
                out.output(spec.getScanResultMapperFn().apply((Object)scanResult));
            }
        }

        private static class SplitFn<T>
        extends DoFn<Read<T>, Read<T>> {
            private SplitFn() {
            }

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element Read<T> spec, DoFn.OutputReceiver<Read<T>> out) {
                ScanRequest scanRequest = (ScanRequest)spec.getScanRequestFn().apply(null);
                for (int i = 0; i < scanRequest.getTotalSegments(); ++i) {
                    out.output((Object)((Read)spec).withSegmentId(i));
                }
            }
        }

        @AutoValue.Builder
        static abstract class Builder<T> {
            Builder() {
            }

            abstract Builder<T> setAwsClientsProvider(AwsClientsProvider var1);

            abstract Builder<T> setScanRequestFn(SerializableFunction<Void, ScanRequest> var1);

            abstract Builder<T> setSegmentId(Integer var1);

            abstract Builder<T> setScanResultMapperFn(SerializableFunction<ScanResult, T> var1);

            abstract Builder<T> setCoder(Coder<T> var1);

            abstract Read<T> build();
        }
    }
}

