/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.elasticsearch;

import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.elasticsearch.ThrowingConsumer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ProcessFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticsearchIO {
    private static long documentSize(DocWriteRequest docWriteRequest) {
        if (docWriteRequest instanceof IndexRequest) {
            return ((IndexRequest)docWriteRequest).source().length();
        }
        if (docWriteRequest instanceof UpdateRequest) {
            return ((UpdateRequest)docWriteRequest).doc().source().length();
        }
        if (docWriteRequest instanceof DeleteRequest) {
            return 0L;
        }
        throw new IllegalArgumentException("Encountered unknown subclass of DocWriteRequest");
    }

    public static class Write {
        private static final Logger LOG = LoggerFactory.getLogger(Write.class);
        private static final String RETRY_ATTEMPT_LOG = "Error writing to Elasticsearch. Retry attempt[%d]";
        private static final String RETRY_FAILED_LOG = "Error writing to ES after %d attempt(s). No more attempts allowed";

        public static <T> Bound<T> withClusterName(String string) {
            return new Bound().withClusterName(string);
        }

        public static <T> Bound<T> withServers(InetSocketAddress[] inetSocketAddressArray) {
            return new Bound().withServers(inetSocketAddressArray);
        }

        public static <T> Bound withFlushInterval(Duration duration) {
            return new Bound().withFlushInterval(duration);
        }

        public static <T> Bound withFunction(SerializableFunction<T, Iterable<DocWriteRequest<?>>> serializableFunction) {
            return new Bound<T>().withFunction(serializableFunction);
        }

        public static <T> Bound withNumOfShard(long l) {
            return new Bound().withNumOfShard(l);
        }

        public static <T> Bound withError(ThrowingConsumer<BulkExecutionException> throwingConsumer) {
            return new Bound().withError(throwingConsumer);
        }

        public static <T> Bound withMaxBulkRequestSize(int n) {
            return new Bound().withMaxBulkRequestSize(n);
        }

        public static <T> Bound withMaxBulkRequestBytes(long l) {
            return new Bound().withMaxBulkRequestBytes(l);
        }

        public static <T> Bound withMaxRetries(int n) {
            return new Bound().withMaxRetries(n);
        }

        public static <T> Bound withRetryPause(Duration duration) {
            return new Bound().withRetryPause(duration);
        }

        private static ProcessFunction<BulkRequest, BulkResponse> request(ClientSupplier clientSupplier, ThrowingConsumer<BulkExecutionException> throwingConsumer) {
            return (ProcessFunction & Serializable)bulkRequest -> {
                BulkResponse bulkResponse = (BulkResponse)clientSupplier.get().bulk(bulkRequest).get();
                if (bulkResponse.hasFailures()) {
                    throwingConsumer.accept(new BulkExecutionException(bulkResponse));
                }
                return bulkResponse;
            };
        }

        private static ProcessFunction<BulkRequest, BulkResponse> retry(ProcessFunction<BulkRequest, BulkResponse> processFunction, FluentBackoff fluentBackoff) {
            return (ProcessFunction & Serializable)bulkRequest -> {
                BackOff backOff = fluentBackoff.backoff();
                int n = 0;
                BulkResponse bulkResponse = null;
                Exception exception = null;
                while (bulkResponse == null && BackOffUtils.next((Sleeper)Sleeper.DEFAULT, (BackOff)backOff)) {
                    LOG.warn(String.format(RETRY_ATTEMPT_LOG, ++n));
                    try {
                        bulkResponse = (BulkResponse)processFunction.apply(bulkRequest);
                        exception = null;
                    }
                    catch (Exception exception2) {
                        exception = exception2;
                    }
                }
                if (exception != null) {
                    throw new Exception(String.format(RETRY_FAILED_LOG, n), exception);
                }
                return bulkResponse;
            };
        }

        private static ThrowingConsumer<BulkExecutionException> defaultErrorHandler() {
            return bulkExecutionException -> {
                throw bulkExecutionException;
            };
        }

        public static class BulkExecutionException
        extends IOException {
            private final Iterable<Throwable> failures;

            BulkExecutionException(BulkResponse bulkResponse) {
                super(bulkResponse.buildFailureMessage());
                this.failures = Arrays.stream(bulkResponse.getItems()).map(BulkItemResponse::getFailure).filter(Objects::nonNull).map(BulkItemResponse.Failure::getCause).collect(Collectors.toList());
            }

            public Iterable<Throwable> getFailures() {
                return this.failures;
            }
        }

        private static class ClientSupplier
        implements Supplier<Client>,
        Serializable {
            private final AtomicReference<Client> CLIENT = new AtomicReference();
            private final String clusterName;
            private final InetSocketAddress[] addresses;

            public ClientSupplier(String string, InetSocketAddress[] inetSocketAddressArray) {
                this.clusterName = string;
                this.addresses = inetSocketAddressArray;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Client get() {
                if (this.CLIENT.get() == null) {
                    AtomicReference<Client> atomicReference = this.CLIENT;
                    synchronized (atomicReference) {
                        if (this.CLIENT.get() == null) {
                            this.CLIENT.set((Client)this.create(this.clusterName, this.addresses));
                        }
                    }
                }
                return this.CLIENT.get();
            }

            private TransportClient create(String string, InetSocketAddress[] inetSocketAddressArray) {
                Settings settings = Settings.builder().put("cluster.name", string).build();
                TransportAddress[] transportAddressArray = (TransportAddress[])Arrays.stream(inetSocketAddressArray).map(TransportAddress::new).toArray(TransportAddress[]::new);
                return new PreBuiltTransportClient(settings, new Class[0]).addTransportAddresses(transportAddressArray);
            }
        }

        private static class ElasticsearchShardWriter<T>
        extends DoFn<KV<Long, Iterable<T>>, Void> {
            private final ClientSupplier clientSupplier;
            private final SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests;
            private final ThrowingConsumer<BulkExecutionException> error;
            private final int maxBulkRequestSize;
            private final long maxBulkRequestBytes;
            private final int maxRetries;
            private final Duration retryPause;
            private ProcessFunction<BulkRequest, BulkResponse> requestFn;
            private ProcessFunction<BulkRequest, BulkResponse> retryFn;

            public ElasticsearchShardWriter(String string, InetSocketAddress[] inetSocketAddressArray, int n, long l, SerializableFunction<T, Iterable<DocWriteRequest<?>>> serializableFunction, ThrowingConsumer<BulkExecutionException> throwingConsumer, int n2, Duration duration) {
                this.maxBulkRequestSize = n;
                this.maxBulkRequestBytes = l;
                this.clientSupplier = new ClientSupplier(string, inetSocketAddressArray);
                this.toDocWriteRequests = serializableFunction;
                this.error = throwingConsumer;
                this.maxRetries = n2;
                this.retryPause = duration;
            }

            @DoFn.Setup
            public void setup() throws Exception {
                FluentBackoff fluentBackoff = FluentBackoff.DEFAULT.withMaxRetries(this.maxRetries).withInitialBackoff(this.retryPause);
                this.requestFn = Write.request(this.clientSupplier, this.error);
                this.retryFn = Write.retry((ProcessFunction<BulkRequest, BulkResponse>)this.requestFn, fluentBackoff);
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext processContext) throws Exception {
                Iterable iterable2 = (Iterable)((KV)processContext.element()).getValue();
                if (!iterable2.iterator().hasNext()) {
                    LOG.info("ElasticsearchWriter: no requests to send");
                    return;
                }
                Stream stream = StreamSupport.stream(iterable2.spliterator(), false).map(arg_0 -> this.toDocWriteRequests.apply(arg_0)).flatMap(iterable -> StreamSupport.stream(iterable.spliterator(), false));
                int n = 0;
                long l = 0L;
                BulkRequest bulkRequest = new BulkRequest();
                for (DocWriteRequest docWriteRequest : stream::iterator) {
                    long l2 = ElasticsearchIO.documentSize(docWriteRequest);
                    if (n < this.maxBulkRequestSize && l + l2 < this.maxBulkRequestBytes) {
                        bulkRequest.add(docWriteRequest);
                        ++n;
                        l += l2;
                        continue;
                    }
                    this.flush(bulkRequest);
                    bulkRequest = new BulkRequest().add(docWriteRequest);
                    n = 1;
                    l = l2;
                }
                this.flush(bulkRequest);
            }

            private void flush(BulkRequest bulkRequest) throws Exception {
                if (bulkRequest.numberOfActions() < 1) {
                    return;
                }
                try {
                    this.requestFn.apply((Object)bulkRequest);
                }
                catch (Exception exception) {
                    this.retryFn.apply((Object)bulkRequest);
                }
            }
        }

        private static class ElasticsearchWriter<T>
        extends DoFn<T, Void> {
            private BulkRequest chunk;
            private long currentSize;
            private long currentBytes;
            private final ClientSupplier clientSupplier;
            private final SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests;
            private final ThrowingConsumer<BulkExecutionException> error;
            private final int maxBulkRequestSize;
            private final long maxBulkRequestBytes;
            private final int maxRetries;
            private final Duration retryPause;
            private ProcessFunction<BulkRequest, BulkResponse> requestFn;
            private ProcessFunction<BulkRequest, BulkResponse> retryFn;

            public ElasticsearchWriter(String string, InetSocketAddress[] inetSocketAddressArray, int n, long l, SerializableFunction<T, Iterable<DocWriteRequest<?>>> serializableFunction, ThrowingConsumer<BulkExecutionException> throwingConsumer, int n2, Duration duration) {
                this.maxBulkRequestSize = n;
                this.maxBulkRequestBytes = l;
                this.clientSupplier = new ClientSupplier(string, inetSocketAddressArray);
                this.toDocWriteRequests = serializableFunction;
                this.error = throwingConsumer;
                this.maxRetries = n2;
                this.retryPause = duration;
            }

            @DoFn.Setup
            public void setup() throws Exception {
                FluentBackoff fluentBackoff = FluentBackoff.DEFAULT.withMaxRetries(this.maxRetries).withInitialBackoff(this.retryPause);
                this.requestFn = Write.request(this.clientSupplier, this.error);
                this.retryFn = Write.retry((ProcessFunction<BulkRequest, BulkResponse>)this.requestFn, fluentBackoff);
            }

            @DoFn.StartBundle
            public void startBundle(DoFn.StartBundleContext startBundleContext) {
                this.chunk = new BulkRequest();
                this.currentSize = 0L;
                this.currentBytes = 0L;
            }

            @DoFn.FinishBundle
            public void finishBundle() throws Exception {
                this.flush();
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext processContext) throws Exception {
                Object object = processContext.element();
                Iterable iterable = (Iterable)this.toDocWriteRequests.apply(object);
                for (DocWriteRequest docWriteRequest : iterable) {
                    long l = ElasticsearchIO.documentSize(docWriteRequest);
                    if (this.currentSize < (long)this.maxBulkRequestSize && this.currentBytes + l < this.maxBulkRequestBytes) {
                        this.chunk.add(docWriteRequest);
                        ++this.currentSize;
                        this.currentBytes += l;
                        continue;
                    }
                    this.flush();
                    this.chunk = new BulkRequest().add(docWriteRequest);
                    this.currentSize = 1L;
                    this.currentBytes = l;
                }
            }

            private void flush() throws Exception {
                if (this.chunk.numberOfActions() < 1) {
                    return;
                }
                try {
                    this.requestFn.apply((Object)this.chunk);
                }
                catch (Exception exception) {
                    this.retryFn.apply((Object)this.chunk);
                }
            }
        }

        private static class AssignToShard<T>
        extends DoFn<T, KV<Long, T>> {
            private final long numOfShard;

            public AssignToShard(long l) {
                this.numOfShard = l;
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext processContext) throws Exception {
                long l = ThreadLocalRandom.current().nextLong(this.numOfShard);
                processContext.output((Object)KV.of((Object)l, (Object)processContext.element()));
            }
        }

        public static class Bound<T>
        extends PTransform<PCollection<T>, PDone> {
            private static final int CHUNK_SIZE = 3000;
            private static final long CHUNK_BYTES = 0x500000L;
            private static final int DEFAULT_RETRIES = 3;
            private static final Duration DEFAULT_RETRY_PAUSE = Duration.millis((long)35000L);
            private final String clusterName;
            private final InetSocketAddress[] servers;
            private final Duration flushInterval;
            private final SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests;
            private final long numOfShard;
            private final int maxBulkRequestSize;
            private final long maxBulkRequestBytes;
            private final int maxRetries;
            private final Duration retryPause;
            private final ThrowingConsumer<BulkExecutionException> error;

            private Bound(String string, InetSocketAddress[] inetSocketAddressArray, Duration duration, SerializableFunction<T, Iterable<DocWriteRequest<?>>> serializableFunction, long l, int n, long l2, int n2, Duration duration2, ThrowingConsumer<BulkExecutionException> throwingConsumer) {
                this.clusterName = string;
                this.servers = inetSocketAddressArray;
                this.flushInterval = duration;
                this.toDocWriteRequests = serializableFunction;
                this.numOfShard = l;
                this.maxBulkRequestSize = n;
                this.maxBulkRequestBytes = l2;
                this.maxRetries = n2;
                this.retryPause = duration2;
                this.error = throwingConsumer;
            }

            Bound() {
                this(null, null, null, null, 0L, 3000, 0x500000L, 3, DEFAULT_RETRY_PAUSE, Write.defaultErrorHandler());
            }

            public Bound<T> withClusterName(String string) {
                return new Bound<T>(string, this.servers, this.flushInterval, this.toDocWriteRequests, this.numOfShard, this.maxBulkRequestSize, this.maxBulkRequestBytes, this.maxRetries, this.retryPause, this.error);
            }

            public Bound<T> withServers(InetSocketAddress[] inetSocketAddressArray) {
                return new Bound<T>(this.clusterName, inetSocketAddressArray, this.flushInterval, this.toDocWriteRequests, this.numOfShard, this.maxBulkRequestSize, this.maxBulkRequestBytes, this.maxRetries, this.retryPause, this.error);
            }

            public Bound<T> withFlushInterval(Duration duration) {
                return new Bound<T>(this.clusterName, this.servers, duration, this.toDocWriteRequests, this.numOfShard, this.maxBulkRequestSize, this.maxBulkRequestBytes, this.maxRetries, this.retryPause, this.error);
            }

            public Bound<T> withFunction(SerializableFunction<T, Iterable<DocWriteRequest<?>>> serializableFunction) {
                return new Bound<T>(this.clusterName, this.servers, this.flushInterval, serializableFunction, this.numOfShard, this.maxBulkRequestSize, this.maxBulkRequestBytes, this.maxRetries, this.retryPause, this.error);
            }

            public Bound<T> withNumOfShard(long l) {
                return new Bound<T>(this.clusterName, this.servers, this.flushInterval, this.toDocWriteRequests, l, this.maxBulkRequestSize, this.maxBulkRequestBytes, this.maxRetries, this.retryPause, this.error);
            }

            public Bound<T> withError(ThrowingConsumer<BulkExecutionException> throwingConsumer) {
                return new Bound<T>(this.clusterName, this.servers, this.flushInterval, this.toDocWriteRequests, this.numOfShard, this.maxBulkRequestSize, this.maxBulkRequestBytes, this.maxRetries, this.retryPause, throwingConsumer);
            }

            public Bound<T> withMaxBulkRequestSize(int n) {
                return new Bound<T>(this.clusterName, this.servers, this.flushInterval, this.toDocWriteRequests, this.numOfShard, n, this.maxBulkRequestBytes, this.maxRetries, this.retryPause, this.error);
            }

            public Bound<T> withMaxBulkRequestBytes(long l) {
                return new Bound<T>(this.clusterName, this.servers, this.flushInterval, this.toDocWriteRequests, this.numOfShard, this.maxBulkRequestSize, l, this.maxRetries, this.retryPause, this.error);
            }

            public Bound<T> withMaxRetries(int n) {
                return new Bound<T>(this.clusterName, this.servers, this.flushInterval, this.toDocWriteRequests, this.numOfShard, this.maxBulkRequestSize, this.maxBulkRequestBytes, n, this.retryPause, this.error);
            }

            public Bound<T> withRetryPause(Duration duration) {
                return new Bound<T>(this.clusterName, this.servers, this.flushInterval, this.toDocWriteRequests, this.numOfShard, this.maxBulkRequestSize, this.maxBulkRequestBytes, this.maxRetries, duration, this.error);
            }

            public PDone expand(PCollection<T> pCollection) {
                Preconditions.checkNotNull((Object)this.clusterName);
                Preconditions.checkNotNull((Object)this.servers);
                Preconditions.checkNotNull(this.toDocWriteRequests);
                Preconditions.checkNotNull((Object)this.flushInterval);
                Preconditions.checkArgument((this.numOfShard >= 0L ? 1 : 0) != 0);
                Preconditions.checkArgument((this.maxBulkRequestSize > 0 ? 1 : 0) != 0);
                Preconditions.checkArgument((this.maxBulkRequestBytes > 0L ? 1 : 0) != 0);
                Preconditions.checkArgument((this.maxRetries >= 0 ? 1 : 0) != 0);
                Preconditions.checkArgument((this.retryPause.getMillis() >= 0L ? 1 : 0) != 0);
                if (this.numOfShard == 0L) {
                    pCollection.apply((PTransform)ParDo.of(new ElasticsearchWriter<T>(this.clusterName, this.servers, this.maxBulkRequestSize, this.maxBulkRequestBytes, this.toDocWriteRequests, this.error, this.maxRetries, this.retryPause)));
                } else {
                    ((PCollection)((PCollection)((PCollection)pCollection.apply("Assign To Shard", (PTransform)ParDo.of(new AssignToShard(this.numOfShard)))).apply("Re-Window to Global Window", (PTransform)Window.into((WindowFn)new GlobalWindows()).triggering((Trigger)Repeatedly.forever((Trigger)AfterProcessingTime.pastFirstElementInPane().plusDelayOf(this.flushInterval))).discardingFiredPanes().withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))).apply((PTransform)GroupByKey.create())).apply("Write to Elasticsearch", (PTransform)ParDo.of(new ElasticsearchShardWriter<T>(this.clusterName, this.servers, this.maxBulkRequestSize, this.maxBulkRequestBytes, this.toDocWriteRequests, this.error, this.maxRetries, this.retryPause)));
                }
                return PDone.in((Pipeline)pCollection.getPipeline());
            }
        }
    }
}

