/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.routing;

import com.google.common.collect.Iterators;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.meta.AnnotatedObject;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.core.api.InternalEvent;
import org.mule.runtime.core.api.exception.MessagingException;
import org.mule.runtime.core.api.processor.AbstractMessageProcessorOwner;
import org.mule.runtime.core.api.processor.MessageProcessorChain;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.Scope;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.util.collection.SplittingStrategy;
import org.mule.runtime.core.internal.routing.ExpressionSplittingStrategy;
import org.mule.runtime.core.internal.routing.outbound.EventBuilderConfigurer;
import org.mule.runtime.core.internal.routing.outbound.EventBuilderConfigurerIterator;
import org.mule.runtime.core.internal.routing.outbound.EventBuilderConfigurerList;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Scope that splits an incoming event into parts and runs the nested processor chain once per
 * part, one part at a time.
 *
 * <p>The parts come from the configured collection expression (default {@code #[payload]}).
 * While iterating, two variables are set on the event: the root-message variable holds the
 * message the scope was invoked with, and the counter variable holds the 1-based index of the
 * current part (or batch, when {@code batchSize > 1}). Both variables are put back to whatever
 * they were before the scope ran once iteration finishes or fails with a
 * {@link MessagingException}, which is what allows scopes to be nested.
 */
public class Foreach
extends AbstractMessageProcessorOwner
implements Initialisable,
Scope {
    static final String DEFAULT_ROOT_MESSAGE_VARIABLE = "rootMessage";
    static final String DEFAULT_COUNTER_VARIABLE = "counter";
    private static final Logger LOGGER = LoggerFactory.getLogger(Foreach.class);
    static final String MAP_NOT_SUPPORTED_MESSAGE = "Foreach does not support 'java.util.Map' with no collection expression. To iterate over Map entries use '#[dw::core::Objects::entrySet(payload)]'";
    /** Collection expression used when none is configured. */
    private static final String DEFAULT_SPLIT_EXPRESSION = "#[payload]";
    private List<Processor> messageProcessors;
    private String expression = DEFAULT_SPLIT_EXPRESSION;
    private int batchSize = 1;
    private SplittingStrategy<InternalEvent, Iterator<TypedValue<?>>> splittingStrategy;
    private String rootMessageVariableName = DEFAULT_ROOT_MESSAGE_VARIABLE;
    private String counterVariableName = DEFAULT_COUNTER_VARIABLE;
    private MessageProcessorChain nestedChain;

    /**
     * Blocking entry point; delegates to the reactive pipeline built by {@link #apply(Publisher)}.
     *
     * @param event the event to iterate over
     * @return the event produced by the reactive pipeline
     * @throws MuleException if processing fails
     */
    @Override
    public InternalEvent process(InternalEvent event) throws MuleException {
        return MessageProcessors.processToApply(event, this);
    }

    /**
     * Builds the reactive pipeline for this scope.
     *
     * <p>For every incoming event the pipeline:
     * <ol>
     * <li>rejects a {@link Map} payload when the default expression is in use;</li>
     * <li>remembers the current counter / root-message variables;</li>
     * <li>stores the incoming message in the root-message variable and iterates;</li>
     * <li>emits the resulting event carrying the original message and the remembered variables.</li>
     * </ol>
     * A {@link MessagingException} raised while iterating gets the same variable restoration applied
     * to its event; any other error is wrapped in a {@link MessagingException} that references the
     * original event.
     *
     * @param publisher the stream of events entering the scope
     * @return the stream of events leaving the scope
     */
    @Override
    public Publisher<InternalEvent> apply(Publisher<InternalEvent> publisher) {
        return Flux.from(publisher).doOnNext(event -> {
            // Constant-first comparison: tolerates a null expression and matches splitRequest().
            if (DEFAULT_SPLIT_EXPRESSION.equals(this.expression) && Map.class.isAssignableFrom(event.getMessage().getPayload().getDataType().getType())) {
                throw new IllegalArgumentException(MAP_NOT_SUPPORTED_MESSAGE);
            }
        }).flatMap(originalEvent -> {
            // Keep the TypedValue wrappers (null == variable absent) rather than the unwrapped values,
            // so that a variable that exists with a null value is not mistaken for a missing one.
            TypedValue<?> previousCounterVar = originalEvent.getVariables().get(this.counterVariableName);
            TypedValue<?> previousRootMessageVar = originalEvent.getVariables().get(this.rootMessageVariableName);
            InternalEvent requestEvent = InternalEvent.builder(originalEvent).addVariable(this.rootMessageVariableName, originalEvent.getMessage()).build();
            return this.splitAndProcess(requestEvent).map(result -> {
                InternalEvent.Builder responseBuilder = InternalEvent.builder(result).message(originalEvent.getMessage());
                this.restoreVariables(previousCounterVar, previousRootMessageVar, responseBuilder);
                return responseBuilder.build();
            }).onErrorMap(MessagingException.class, me -> {
                InternalEvent.Builder exceptionEventBuilder = InternalEvent.builder(me.getEvent());
                this.restoreVariables(previousCounterVar, previousRootMessageVar, exceptionEventBuilder);
                me.setProcessedEvent(exceptionEventBuilder.build());
                return me;
            }).onErrorMap(throwable -> !(throwable instanceof MessagingException), throwable -> new MessagingException((InternalEvent)originalEvent, (Throwable)throwable, (AnnotatedObject)this));
        });
    }

    /**
     * Puts the counter and root-message variables back to the state captured before iteration.
     *
     * @param previousCounterVar the counter variable as it was before the scope ran, or {@code null} if it did not exist
     * @param previousRootMessageVar the root-message variable as it was before the scope ran, or {@code null} if it did not exist
     * @param responseBuilder the builder of the event leaving the scope
     */
    private void restoreVariables(TypedValue<?> previousCounterVar, TypedValue<?> previousRootMessageVar, InternalEvent.Builder responseBuilder) {
        Foreach.restoreVariable(this.counterVariableName, previousCounterVar, responseBuilder);
        Foreach.restoreVariable(this.rootMessageVariableName, previousRootMessageVar, responseBuilder);
    }

    /** Re-adds a single variable with its previous value, or removes it when it did not exist before. */
    private static void restoreVariable(String name, TypedValue<?> previous, InternalEvent.Builder builder) {
        if (previous != null) {
            builder.addVariable(name, previous.getValue());
        } else {
            builder.removeVariable(name);
        }
    }

    /**
     * Splits the request and pushes each part (or batch of parts) through the nested chain in order.
     *
     * <p>Each part is processed on top of the event produced by the previous part, so changes made
     * by one iteration are visible to the next. The single event emitted at the end is the last
     * iteration's event with the request's message put back. When there is nothing to iterate
     * over, the request itself is emitted so the event is not lost.
     *
     * @param request the event to split; already carries the root-message variable
     * @return a flux emitting exactly one event on success
     */
    private Flux<InternalEvent> splitAndProcess(InternalEvent request) {
        AtomicInteger count = new AtomicInteger();
        AtomicReference<InternalEvent> currentEvent = new AtomicReference<InternalEvent>(request);
        return Flux.fromIterable(() -> this.splitRequest(request))
            // With batchSize > 1 the parts are grouped into lists and each list becomes one part.
            .transform(p -> this.batchSize > 1 ? Flux.from((Publisher)p).buffer(this.batchSize).map(this.typedValueListToTypedValue()) : p)
            .concatMap(typedValue -> {
                InternalEvent.Builder partEventBuilder = InternalEvent.builder((InternalEvent)currentEvent.get());
                if (typedValue.getValue() instanceof EventBuilderConfigurer) {
                    ((EventBuilderConfigurer)typedValue.getValue()).configure(partEventBuilder);
                } else if (typedValue.getValue() instanceof Message) {
                    partEventBuilder.message((Message)typedValue.getValue());
                } else {
                    partEventBuilder.message(Message.builder().payload((TypedValue<?>)typedValue).build());
                }
                return Mono.just((Object)partEventBuilder.addVariable(this.counterVariableName, count.incrementAndGet()).build()).transform((Function)this.nestedChain).doOnNext(result -> currentEvent.set(InternalEvent.builder(result).build()));
            })
            // Only the completion of the last part matters; its event is read from currentEvent.
            .takeLast(1)
            .map(s -> InternalEvent.builder((InternalEvent)currentEvent.get()).message(request.getMessage()).build())
            // Nothing was iterated: without this the stream would complete empty and the event vanish.
            .switchIfEmpty(Mono.just(request));
    }

    /**
     * Produces the iterator of parts for the given request.
     *
     * <p>With the default expression, payloads that are an {@link EventBuilderConfigurerList} or an
     * {@link EventBuilderConfigurerIterator} are iterated directly; everything else goes through the
     * configured splitting strategy.
     */
    private Iterator<TypedValue<?>> splitRequest(InternalEvent request) {
        Object payloadValue = request.getMessage().getPayload().getValue();
        if (DEFAULT_SPLIT_EXPRESSION.equals(this.expression)) {
            if (payloadValue instanceof EventBuilderConfigurerList) {
                return Iterators.transform(((EventBuilderConfigurerList)payloadValue).eventBuilderConfigurerIterator(), input -> TypedValue.of(input));
            }
            if (payloadValue instanceof EventBuilderConfigurerIterator) {
                return new EventBuilderConfigurerIteratorWrapper((EventBuilderConfigurerIterator)payloadValue);
            }
        }
        return this.splittingStrategy.split(request);
    }

    /**
     * Returns a function that collapses a batch of typed values into one typed value holding the
     * list of raw values. The item type of the resulting list data type is the batch's common
     * data type when all elements share one, and {@link DataType#OBJECT} otherwise.
     */
    private Function<List<TypedValue<?>>, TypedValue<List>> typedValueListToTypedValue() {
        return list -> {
            DataType dataType = DataType.OBJECT;
            // Exactly one distinct data type implies the list is non-empty, so get(0) is safe.
            if (list.stream().map(i -> i.getDataType()).distinct().count() == 1L) {
                dataType = list.get(0).getDataType();
            }
            return new TypedValue(list.stream().map(tv -> tv.getValue()).collect(Collectors.toList()), DataType.builder().collectionType(List.class).itemType(dataType.getType()).itemMediaType(dataType.getMediaType()).build());
        };
    }

    /** Returns the nested chain as the only processor owned by this scope. */
    @Override
    protected List<Processor> getOwnedMessageProcessors() {
        return Collections.singletonList(this.nestedChain);
    }

    /**
     * Sets the processors that make up the nested chain; the chain itself is built in
     * {@link #initialise()}.
     */
    public void setMessageProcessors(List<Processor> messageProcessors) throws MuleException {
        this.messageProcessors = messageProcessors;
    }

    /**
     * Builds the nested chain and the expression-based splitting strategy from the current
     * configuration, then runs the inherited initialisation.
     */
    @Override
    public void initialise() throws InitialisationException {
        Optional<ProcessingStrategy> processingStrategy = MessageProcessors.getProcessingStrategy(this.muleContext, this.getRootContainerName());
        this.nestedChain = MessageProcessors.newChain(processingStrategy, this.messageProcessors);
        this.splittingStrategy = new ExpressionSplittingStrategy(this.muleContext.getExpressionManager(), this.expression);
        super.initialise();
    }

    /** Sets the expression whose result is iterated; defaults to {@code #[payload]}. */
    public void setCollectionExpression(String expression) {
        this.expression = expression;
    }

    /** Sets how many parts are grouped into each iteration; values above 1 enable batching. */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Sets the name of the variable that holds the message the scope was invoked with. */
    public void setRootMessageVariableName(String rootMessageVariableName) {
        this.rootMessageVariableName = rootMessageVariableName;
    }

    /** Sets the name of the variable that holds the 1-based iteration counter. */
    public void setCounterVariableName(String counterVariableName) {
        this.counterVariableName = counterVariableName;
    }

    /**
     * Adapts an {@link EventBuilderConfigurerIterator} to a plain {@link Iterator} of typed values,
     * wrapping each configurer with {@link TypedValue#of(Object)}.
     */
    private static class EventBuilderConfigurerIteratorWrapper
    implements Iterator<TypedValue<?>> {
        private final EventBuilderConfigurerIterator configurerIterator;

        public EventBuilderConfigurerIteratorWrapper(EventBuilderConfigurerIterator configurerIterator) {
            this.configurerIterator = configurerIterator;
        }

        @Override
        public boolean hasNext() {
            return this.configurerIterator.hasNext();
        }

        @Override
        public TypedValue<?> next() {
            return TypedValue.of(this.configurerIterator.nextEventBuilderConfigurer());
        }
    }
}

