/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.shaded.opensearch2.org.opensearch.ingest;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import org.graylog.shaded.opensearch2.org.opensearch.OpenSearchException;
import org.graylog.shaded.opensearch2.org.opensearch.common.collect.Tuple;
import org.graylog.shaded.opensearch2.org.opensearch.common.metrics.OperationMetrics;
import org.graylog.shaded.opensearch2.org.opensearch.ingest.IngestDocument;
import org.graylog.shaded.opensearch2.org.opensearch.ingest.IngestDocumentWrapper;
import org.graylog.shaded.opensearch2.org.opensearch.ingest.IngestProcessorException;
import org.graylog.shaded.opensearch2.org.opensearch.ingest.Processor;

/**
 * A {@link Processor} that runs a list of processors one after another and, when one of them
 * fails, optionally runs a second list of on-failure processors.
 *
 * <p>Failure handling, as implemented below:
 * <ul>
 *   <li>If {@code ignoreFailure} is set, a failing processor is counted as failed in its metrics
 *       and the chain continues with the document that was handed to that processor.</li>
 *   <li>Otherwise the exception is wrapped by
 *       {@link #newCompoundProcessorException(Exception, Processor, IngestDocument)} and either
 *       reported to the handler (no on-failure processors) or passed to
 *       {@link #executeOnFailureAsync(int, IngestDocument, OpenSearchException, BiConsumer)}.</li>
 * </ul>
 *
 * <p>Each regular processor is paired with its own {@link OperationMetrics} instance.
 */
public class CompoundProcessor implements Processor {
    /** Ingest-metadata key holding the root-cause message while on-failure processors run. */
    public static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
    /** Ingest-metadata key holding the failed processor's type while on-failure processors run. */
    public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
    /** Ingest-metadata key holding the failed processor's tag while on-failure processors run. */
    public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
    /** Ingest-metadata key holding the originating pipeline, set only when that header is present. */
    public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline";
    private final boolean ignoreFailure;
    private final List<Processor> processors;
    private final List<Processor> onFailureProcessors;
    private final List<Tuple<Processor, OperationMetrics>> processorsWithMetrics;
    private final LongSupplier relativeTimeProvider;

    /**
     * Creates a compound processor that does not ignore failures, has no on-failure processors
     * and reads time from the given supplier.
     *
     * @param relativeTimeProvider supplier of the current relative time in nanoseconds
     * @param processor the processors to run, in order
     */
    CompoundProcessor(LongSupplier relativeTimeProvider, Processor ... processor) {
        this(false, Arrays.asList(processor), Collections.emptyList(), relativeTimeProvider);
    }

    /**
     * Creates a compound processor that does not ignore failures and has no on-failure processors.
     *
     * @param processor the processors to run, in order
     */
    public CompoundProcessor(Processor ... processor) {
        this(false, Arrays.asList(processor), Collections.emptyList());
    }

    /**
     * Creates a compound processor that measures elapsed time with {@link System#nanoTime()}.
     *
     * @param ignoreFailure whether a failing processor is skipped instead of aborting the chain
     * @param processors the processors to run, in order
     * @param onFailureProcessors the processors to run when a processor fails and failures are not ignored
     */
    public CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors) {
        this(ignoreFailure, processors, onFailureProcessors, System::nanoTime);
    }

    /**
     * Creates a compound processor and allocates one {@link OperationMetrics} per processor.
     *
     * <p>The given lists are stored as passed in (no defensive copy is made).
     *
     * @param ignoreFailure whether a failing processor is skipped instead of aborting the chain
     * @param processors the processors to run, in order
     * @param onFailureProcessors the processors to run when a processor fails and failures are not ignored
     * @param relativeTimeProvider supplier of the current relative time in nanoseconds
     */
    CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors, LongSupplier relativeTimeProvider) {
        this.ignoreFailure = ignoreFailure;
        this.processors = processors;
        this.onFailureProcessors = onFailureProcessors;
        this.relativeTimeProvider = relativeTimeProvider;
        this.processorsWithMetrics = new ArrayList<>(processors.size());
        for (Processor p : processors) {
            this.processorsWithMetrics.add(new Tuple<>(p, new OperationMetrics()));
        }
    }

    /** @return the processors paired with their metrics, in execution order */
    List<Tuple<Processor, OperationMetrics>> getProcessorsWithMetrics() {
        return this.processorsWithMetrics;
    }

    /** @return {@code true} if a failing processor is skipped rather than aborting the chain */
    public boolean isIgnoreFailure() {
        return this.ignoreFailure;
    }

    /** @return the on-failure processor list this instance was constructed with */
    public List<Processor> getOnFailureProcessors() {
        return this.onFailureProcessors;
    }

    /** @return the processor list this instance was constructed with */
    public List<Processor> getProcessors() {
        return this.processors;
    }

    /**
     * Returns every leaf processor reachable from this instance: first those reachable from the
     * regular processors, then those reachable from the on-failure processors. Nested
     * {@code CompoundProcessor}s are expanded recursively instead of being listed themselves.
     *
     * @return a new list of the flattened processors
     */
    public List<Processor> flattenProcessors() {
        List<Processor> allProcessors = new ArrayList<>(CompoundProcessor.flattenProcessors(this.processors));
        allProcessors.addAll(CompoundProcessor.flattenProcessors(this.onFailureProcessors));
        return allProcessors;
    }

    /**
     * Expands any {@code CompoundProcessor} in the given list into its flattened members and
     * keeps every other processor as-is, preserving order.
     */
    private static List<Processor> flattenProcessors(List<Processor> processors) {
        List<Processor> flattened = new ArrayList<>();
        for (Processor processor : processors) {
            if (processor instanceof CompoundProcessor) {
                flattened.addAll(((CompoundProcessor) processor).flattenProcessors());
            } else {
                flattened.add(processor);
            }
        }
        return flattened;
    }

    @Override
    public String getType() {
        return "compound";
    }

    /**
     * @return {@code "CompoundProcessor-"} followed by the tags of all flattened processors,
     *         joined with {@code "-"}
     */
    @Override
    public String getTag() {
        return "CompoundProcessor-" + this.flattenProcessors().stream().map(Processor::getTag).collect(Collectors.joining("-"));
    }

    /** @return always {@code null}; this processor has no description */
    @Override
    public String getDescription() {
        return null;
    }

    /**
     * Not supported; use the handler-based or batch variants instead.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
        throw new UnsupportedOperationException("this method should not get executed");
    }

    /**
     * Runs the processor chain on a single document, starting at the first processor.
     *
     * @param ingestDocument the document to process
     * @param handler receives the resulting document or the failure
     */
    @Override
    public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
        this.innerExecute(0, ingestDocument, handler);
    }

    /**
     * Runs the processor chain on a batch of documents, starting at the first processor.
     *
     * @param ingestDocumentWrappers the documents to process
     * @param handler receives finished documents; it may be invoked several times with
     *                different subsets of the batch
     */
    @Override
    public void batchExecute(List<IngestDocumentWrapper> ingestDocumentWrappers, Consumer<List<IngestDocumentWrapper>> handler) {
        this.innerBatchExecute(0, ingestDocumentWrappers, handler);
    }

    /**
     * Runs the processor at index {@code currentProcessor} on the batch and, once results for
     * the whole batch have been reported, routes each document: dropped documents go to the
     * handler, surviving documents go to the next processor, and failed documents go either to
     * the handler or through the on-failure processors.
     *
     * @param currentProcessor index of the processor to run; when it equals the number of
     *                         processors the batch is handed to {@code handler} unchanged
     * @param ingestDocumentWrappers the documents to process
     * @param handler receives finished documents
     */
    void innerBatchExecute(int currentProcessor, List<IngestDocumentWrapper> ingestDocumentWrappers, Consumer<List<IngestDocumentWrapper>> handler) {
        if (currentProcessor == this.processorsWithMetrics.size()) {
            handler.accept(ingestDocumentWrappers);
            return;
        }
        Tuple<Processor, OperationMetrics> processorWithMetric = this.processorsWithMetrics.get(currentProcessor);
        Processor processor = processorWithMetric.v1();
        OperationMetrics metric = processorWithMetric.v2();
        long startTimeInNanos = this.relativeTimeProvider.getAsLong();
        int size = ingestDocumentWrappers.size();
        metric.beforeN(size);
        // Counts down the documents still awaited from the processor's callback(s).
        AtomicInteger counter = new AtomicInteger(size);
        // Typed (not raw) so the results can be iterated as IngestDocumentWrapper below.
        List<IngestDocumentWrapper> allResults = Collections.synchronizedList(new ArrayList<IngestDocumentWrapper>());
        Map<Integer, IngestDocumentWrapper> slotToWrapperMap = this.createSlotIngestDocumentWrapperMap(ingestDocumentWrappers);
        processor.batchExecute(ingestDocumentWrappers, results -> {
            if (results.isEmpty()) {
                return;
            }
            allResults.addAll(results);
            // Only the callback that brings the outstanding count to zero performs the routing.
            if (counter.addAndGet(-results.size()) == 0) {
                long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(this.relativeTimeProvider.getAsLong() - startTimeInNanos);
                metric.afterN(allResults.size(), ingestTimeInMillis);
                List<IngestDocumentWrapper> documentsDropped = new ArrayList<>();
                List<IngestDocumentWrapper> documentsWithException = new ArrayList<>();
                List<IngestDocumentWrapper> documentsToContinue = new ArrayList<>();
                int totalFailed = 0;
                for (IngestDocumentWrapper resultDocumentWrapper : allResults) {
                    IngestDocumentWrapper originalDocumentWrapper = slotToWrapperMap.get(resultDocumentWrapper.getSlot());
                    if (resultDocumentWrapper.getException() != null) {
                        ++totalFailed;
                        if (this.ignoreFailure) {
                            // Failure ignored: carry on with the wrapper that was passed in.
                            documentsToContinue.add(originalDocumentWrapper);
                            continue;
                        }
                        IngestProcessorException compoundProcessorException = CompoundProcessor.newCompoundProcessorException(resultDocumentWrapper.getException(), processor, originalDocumentWrapper.getIngestDocument());
                        documentsWithException.add(new IngestDocumentWrapper(resultDocumentWrapper.getSlot(), originalDocumentWrapper.getIngestDocument(), compoundProcessorException));
                        continue;
                    }
                    if (resultDocumentWrapper.getIngestDocument() == null) {
                        // No exception and no document: treated as dropped.
                        documentsDropped.add(resultDocumentWrapper);
                        continue;
                    }
                    documentsToContinue.add(resultDocumentWrapper);
                }
                if (totalFailed > 0) {
                    metric.failedN(totalFailed);
                }
                if (!documentsDropped.isEmpty()) {
                    handler.accept(documentsDropped);
                }
                if (!documentsToContinue.isEmpty()) {
                    this.innerBatchExecute(currentProcessor + 1, documentsToContinue, handler);
                }
                if (!documentsWithException.isEmpty()) {
                    if (this.onFailureProcessors.isEmpty()) {
                        handler.accept(documentsWithException);
                    } else {
                        // On-failure processors run per document; each outcome is reported as a
                        // single-element list. The exception stored above is always an
                        // IngestProcessorException, hence the downcast.
                        documentsWithException.forEach(doc -> this.executeOnFailureAsync(0, doc.getIngestDocument(), (IngestProcessorException) doc.getException(), (result, ex) -> handler.accept(Collections.singletonList(new IngestDocumentWrapper(doc.getSlot(), result, ex)))));
                    }
                }
            }
            assert (counter.get() >= 0);
        });
    }

    /**
     * Runs the processor at index {@code currentProcessor} on the document and continues with
     * the next processor, the on-failure processors, or the handler depending on the outcome.
     *
     * @param currentProcessor index of the processor to run; when it equals the number of
     *                         processors the document is handed to {@code handler}
     * @param ingestDocument the document to process
     * @param handler receives {@code (document, null)} on success, {@code (null, exception)} on
     *                failure, or {@code (null, null)} when a processor yields no document
     */
    void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
        if (currentProcessor == this.processorsWithMetrics.size()) {
            handler.accept(ingestDocument, null);
            return;
        }
        Tuple<Processor, OperationMetrics> processorWithMetric = this.processorsWithMetrics.get(currentProcessor);
        Processor processor = processorWithMetric.v1();
        OperationMetrics metric = processorWithMetric.v2();
        long startTimeInNanos = this.relativeTimeProvider.getAsLong();
        metric.before();
        processor.execute(ingestDocument, (result, e) -> {
            long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(this.relativeTimeProvider.getAsLong() - startTimeInNanos);
            metric.after(ingestTimeInMillis);
            if (e != null) {
                metric.failed();
                if (this.ignoreFailure) {
                    // Failure ignored: continue with the document that was passed in.
                    this.innerExecute(currentProcessor + 1, ingestDocument, handler);
                } else {
                    IngestProcessorException compoundProcessorException = CompoundProcessor.newCompoundProcessorException(e, processor, ingestDocument);
                    if (this.onFailureProcessors.isEmpty()) {
                        handler.accept(null, compoundProcessorException);
                    } else {
                        this.executeOnFailureAsync(0, ingestDocument, compoundProcessorException, handler);
                    }
                }
            } else if (result != null) {
                this.innerExecute(currentProcessor + 1, result, handler);
            } else {
                // No exception and no document: report the document as absent.
                handler.accept(null, null);
            }
        });
    }

    /**
     * Runs the on-failure processor at index {@code currentOnFailureProcessor}.
     *
     * <p>Failure metadata is written to the document before the first on-failure processor runs
     * and removed again before the handler is invoked on every exit path. Each on-failure
     * processor is given the same {@code ingestDocument} instance.
     *
     * @param currentOnFailureProcessor index of the on-failure processor to run
     * @param ingestDocument the document whose processing failed
     * @param exception the failure that triggered on-failure handling
     * @param handler receives {@code (ingestDocument, null)} once all on-failure processors
     *                completed, {@code (null, exception)} if one of them failed, or
     *                {@code (null, null)} if one of them yielded no document
     */
    void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestDocument, OpenSearchException exception, BiConsumer<IngestDocument, Exception> handler) {
        if (currentOnFailureProcessor == 0) {
            this.putFailureMetadata(ingestDocument, exception);
        }
        if (currentOnFailureProcessor == this.onFailureProcessors.size()) {
            this.removeFailureMetadata(ingestDocument);
            handler.accept(ingestDocument, null);
            return;
        }
        Processor onFailureProcessor = this.onFailureProcessors.get(currentOnFailureProcessor);
        onFailureProcessor.execute(ingestDocument, (result, e) -> {
            if (e != null) {
                this.removeFailureMetadata(ingestDocument);
                handler.accept(null, CompoundProcessor.newCompoundProcessorException(e, onFailureProcessor, ingestDocument));
                return;
            }
            if (result == null) {
                this.removeFailureMetadata(ingestDocument);
                handler.accept(null, null);
                return;
            }
            this.executeOnFailureAsync(currentOnFailureProcessor + 1, ingestDocument, exception, handler);
        });
    }

    /**
     * Copies details of the failure into the document's ingest metadata: the root-cause message
     * and the failed processor's type and tag (either may be {@code null}), plus the pipeline
     * origin when that header is present.
     */
    private void putFailureMetadata(IngestDocument ingestDocument, OpenSearchException cause) {
        String failedProcessorType = CompoundProcessor.firstHeaderValue(cause, "processor_type");
        String failedProcessorTag = CompoundProcessor.firstHeaderValue(cause, "processor_tag");
        String failedPipelineId = CompoundProcessor.firstHeaderValue(cause, "pipeline_origin");
        Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
        ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getRootCause().getMessage());
        ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
        ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
        if (failedPipelineId != null) {
            ingestMetadata.put(ON_FAILURE_PIPELINE_FIELD, failedPipelineId);
        }
    }

    /**
     * Returns the first value of the named exception header, or {@code null} when the header is
     * missing or carries no values.
     */
    private static String firstHeaderValue(OpenSearchException cause, String headerName) {
        List<String> values = cause.getHeader(headerName);
        return values == null || values.isEmpty() ? null : values.get(0);
    }

    /** Removes every failure-metadata key from the document's ingest metadata. */
    private void removeFailureMetadata(IngestDocument ingestDocument) {
        Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
        ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
        ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
        ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
        ingestMetadata.remove(ON_FAILURE_PIPELINE_FIELD);
    }

    /**
     * Wraps an exception in an {@link IngestProcessorException} carrying the failing processor's
     * type and tag as headers, plus the document's pipeline stack when it has more than one entry.
     *
     * <p>An {@code IngestProcessorException} that already has a {@code processor_type} header is
     * returned unchanged, so the headers of the first wrapping are kept.
     *
     * @param e the exception to wrap
     * @param processor the processor that failed
     * @param document the document being processed when the failure occurred
     * @return the wrapped (or already wrapped) exception
     */
    static IngestProcessorException newCompoundProcessorException(Exception e, Processor processor, IngestDocument document) {
        if (e instanceof IngestProcessorException && ((IngestProcessorException) e).getHeader("processor_type") != null) {
            return (IngestProcessorException) e;
        }
        IngestProcessorException exception = new IngestProcessorException(e);
        String processorType = processor.getType();
        if (processorType != null) {
            exception.addHeader("processor_type", processorType);
        }
        String processorTag = processor.getTag();
        if (processorTag != null) {
            exception.addHeader("processor_tag", processorTag);
        }
        List<String> pipelineStack = document.getPipelineStack();
        if (pipelineStack.size() > 1) {
            exception.addHeader("pipeline_origin", pipelineStack);
        }
        return exception;
    }

    /**
     * Indexes the given wrappers by slot. If several wrappers share a slot, the last one wins.
     */
    private Map<Integer, IngestDocumentWrapper> createSlotIngestDocumentWrapperMap(List<IngestDocumentWrapper> ingestDocumentWrappers) {
        Map<Integer, IngestDocumentWrapper> slotIngestDocumentWrapperMap = new HashMap<>();
        for (IngestDocumentWrapper ingestDocumentWrapper : ingestDocumentWrappers) {
            slotIngestDocumentWrapperMap.put(ingestDocumentWrapper.getSlot(), ingestDocumentWrapper);
        }
        return slotIngestDocumentWrapperMap;
    }
}

