/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.jms.processors;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.processors.AbstractJMSProcessor;
import org.apache.nifi.jms.processors.JMSConsumer;
import org.apache.nifi.jms.processors.PublishJMS;
import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriterCallback;
import org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy;
import org.apache.nifi.jms.processors.ioconcept.writer.record.RecordWriter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

@Tags(value={"jms", "get", "message", "receive", "consume"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@CapabilityDescription(value="Consumes JMS Message of type BytesMessage, TextMessage, ObjectMessage, MapMessage or StreamMessage transforming its content to a FlowFile and transitioning it to 'success' relationship. JMS attributes such as headers and properties will be copied as FlowFile attributes. MapMessages will be transformed into JSONs and then into byte arrays. The other types will have their raw contents as byte array transferred into the flowfile.")
@WritesAttributes(value={@WritesAttribute(attribute="jms_deliveryMode", description="The JMSDeliveryMode from the message header."), @WritesAttribute(attribute="jms_expiration", description="The JMSExpiration from the message header."), @WritesAttribute(attribute="jms_priority", description="The JMSPriority from the message header."), @WritesAttribute(attribute="jms_redelivered", description="The JMSRedelivered from the message header."), @WritesAttribute(attribute="jms_timestamp", description="The JMSTimestamp from the message header."), @WritesAttribute(attribute="jms_correlationId", description="The JMSCorrelationID from the message header."), @WritesAttribute(attribute="jms_messageId", description="The JMSMessageID from the message header."), @WritesAttribute(attribute="jms_type", description="The JMSType from the message header."), @WritesAttribute(attribute="jms_replyTo", description="The JMSReplyTo from the message header."), @WritesAttribute(attribute="jms_destination", description="The JMSDestination from the message header."), @WritesAttribute(attribute="jms.messagetype", description="The JMS message type, can be TextMessage, BytesMessage, ObjectMessage, MapMessage or StreamMessage)."), @WritesAttribute(attribute="other attributes", description="Each message property is written to an attribute.")})
@DynamicProperty(name="The name of a Connection Factory configuration property.", value="The value of a given Connection Factory configuration property.", description="Additional configuration property for the Connection Factory. It can be used when the Connection Factory is being configured via the 'JNDI *' or the 'JMS *'properties of the processor. For more information, see the Additional Details page.", expressionLanguageScope=ExpressionLanguageScope.ENVIRONMENT)
@SeeAlso(value={PublishJMS.class, JMSConnectionFactoryProvider.class})
@Restricted(restrictions={@Restriction(requiredPermission=RequiredPermission.REFERENCE_REMOTE_RESOURCES, explanation="Client Library Location can reference resources over HTTP")})
public class ConsumeJMS
extends AbstractJMSProcessor<JMSConsumer> {
    // Name of the FlowFile attribute that receives the JMS message type of a consumed message.
    public static final String JMS_MESSAGETYPE = "jms.messagetype";
    // Names of the session counters adjusted while consuming messages through a Record Reader.
    private static final String COUNTER_PARSE_FAILURES = "Parse Failures";
    private static final String COUNTER_RECORDS_RECEIVED = "Records Received";
    private static final String COUNTER_RECORDS_PROCESSED = "Records Processed";
    // Choices offered by the 'Acknowledgement Mode' property; each value is the string form of the
    // number shown in its display name (1, 2 or 3).
    static final AllowableValue AUTO_ACK = new AllowableValue(String.valueOf(1), "AUTO_ACKNOWLEDGE (1)", "Automatically acknowledges a client's receipt of a message, regardless if NiFi session has been commited. Can result in data loss in the event where NiFi abruptly stopped before session was commited.");
    static final AllowableValue CLIENT_ACK = new AllowableValue(String.valueOf(2), "CLIENT_ACKNOWLEDGE (2)", "(DEFAULT) Manually acknowledges a client's receipt of a message after NiFi Session was commited, thus ensuring no data loss");
    static final AllowableValue DUPS_OK = new AllowableValue(String.valueOf(3), "DUPS_OK_ACKNOWLEDGE (3)", "This acknowledgment mode instructs the session to lazily acknowledge the delivery of messages. May result in both data duplication and data loss while achieving the best throughput.");
    // Name of the FlowFile attribute that receives the destination name the message was consumed from.
    public static final String JMS_SOURCE_DESTINATION_NAME = "jms.source.destination";
    // Optional JMS message selector; forwarded to the consumer when messages are received.
    static final PropertyDescriptor MESSAGE_SELECTOR = new PropertyDescriptor.Builder().name("Message Selector").displayName("Message Selector").description("The JMS Message Selector to filter the messages that the processor will receive").required(false).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    // Required acknowledgement mode; defaults to CLIENT_ACK and is applied to the JmsTemplate as an integer.
    static final PropertyDescriptor ACKNOWLEDGEMENT_MODE = new PropertyDescriptor.Builder().name("Acknowledgement Mode").description("The JMS Acknowledgement Mode. Using Auto Acknowledge can cause messages to be lost on restart of NiFi but may provide better performance than Client Acknowledge.").required(true).allowableValues(new DescribedValue[]{AUTO_ACK, CLIENT_ACK, DUPS_OK}).defaultValue(CLIENT_ACK.getValue()).build();
    // "true"/"false" flag (default "false") read by isDurableSubscriber.
    static final PropertyDescriptor DURABLE_SUBSCRIBER = new PropertyDescriptor.Builder().name("Durable subscription").displayName("Durable Subscription").description("If destination is Topic if present then make it the consumer durable. @see https://jakarta.ee/specifications/platform/9/apidocs/jakarta/jms/session#createDurableConsumer-jakarta.jms.Topic-java.lang.String-").required(false).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).defaultValue("false").allowableValues(new String[]{"true", "false"}).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    // "true"/"false" flag (default "false") read by isShared.
    static final PropertyDescriptor SHARED_SUBSCRIBER = new PropertyDescriptor.Builder().name("Shared subscription").displayName("Shared Subscription").description("If destination is Topic if present then make it the consumer shared. @see https://jakarta.ee/specifications/platform/9/apidocs/jakarta/jms/session#createSharedConsumer-jakarta.jms.Topic-java.lang.String-").required(false).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).defaultValue("false").allowableValues(new String[]{"true", "false"}).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    // Optional subscription name; forwarded to the consumer together with the durable/shared flags.
    static final PropertyDescriptor SUBSCRIPTION_NAME = new PropertyDescriptor.Builder().name("Subscription Name").description("The name of the subscription to use if destination is Topic and is shared or durable.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    // Required receive timeout (default "1 sec"); converted to milliseconds for the JmsTemplate.
    static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder().name("Timeout").description("How long to wait to consume a message from the remote broker before giving up.").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("1 sec").expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    // Optional error queue name; customValidate rejects it unless the destination type is 'QUEUE'.
    // NOTE(review): the scope is FLOWFILE_ATTRIBUTES although this processor is annotated INPUT_FORBIDDEN
    // and the value is evaluated without a FlowFile - confirm this scope is intended.
    static final PropertyDescriptor ERROR_QUEUE = new PropertyDescriptor.Builder().name("Error Queue Name").description("The name of a JMS Queue where - if set - unprocessed messages will be routed. Usually provided by the administrator (e.g., 'queue://myErrorQueue' or 'myErrorQueue').Only applicable if 'Destination Type' is set to 'QUEUE'").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    // Record Reader derived from BASE_RECORD_READER; when it is set, messages are consumed as a record set.
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().fromPropertyDescriptor(BASE_RECORD_READER).description("The Record Reader to use for parsing received JMS Messages into Records.").build();
    // Record Writer derived from BASE_RECORD_WRITER; used together with the Record Reader.
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().fromPropertyDescriptor(BASE_RECORD_WRITER).description("The Record Writer to use for serializing Records before writing them to a FlowFile.").build();
    // Output strategy for record-oriented consumption; declared as depending on RECORD_READER.
    static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder().name("output-strategy").displayName("Output Strategy").description("The format used to output the JMS message into a FlowFile record.").dependsOn(RECORD_READER, new AllowableValue[0]).required(true).defaultValue(OutputStrategy.USE_VALUE.getValue()).allowableValues(OutputStrategy.class).build();
    // Relationship that receives every FlowFile created from consumed messages.
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received from the JMS Destination are routed to this relationship").build();
    // Relationship for messages the Record Reader could not parse; auto-terminated by default.
    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder().name("parse.failure").description("If a message cannot be parsed using the configured Record Reader, the contents of the message will be routed to this Relationship as its own individual FlowFile.").autoTerminateDefault(true).build();
    // Unmodifiable collections populated once by the static initializer of this class.
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;

    /**
     * Tells whether the consumer should be created as a durable subscriber.
     *
     * @param context the process context supplying the {@code DURABLE_SUBSCRIBER} property
     * @return {@code true} only when the evaluated property yields {@code true};
     *         an absent ({@code null}) value counts as {@code false}
     */
    private static boolean isDurableSubscriber(ProcessContext context) {
        return Boolean.TRUE.equals(context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean());
    }

    /**
     * Tells whether the consumer should be created as a shared subscriber.
     *
     * @param context the process context supplying the {@code SHARED_SUBSCRIBER} property
     * @return {@code true} only when the evaluated property yields {@code true};
     *         an absent ({@code null}) value counts as {@code false}
     */
    private static boolean isShared(ProcessContext context) {
        return Boolean.TRUE.equals(context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean());
    }

    /**
     * Refuses to schedule the processor when it is configured with more than one
     * concurrent task while using a durable subscription that is not shared.
     *
     * @param context the process context holding the scheduling and subscription settings
     * @throws ProcessException if the combination described above is configured
     */
    @OnScheduled
    public void onSchedule(ProcessContext context) {
        // A single task is always acceptable, whatever the subscription type.
        if (context.getMaxConcurrentTasks() <= 1) {
            return;
        }
        if (!ConsumeJMS.isDurableSubscriber(context)) {
            return;
        }
        if (ConsumeJMS.isShared(context)) {
            return;
        }
        throw new ProcessException("Durable non shared subscriptions cannot work on multiple threads. Check javax/jms/Session#createDurableConsumer API doc.");
    }

    /**
     * Extends the inherited validation with a check that an error queue is only
     * configured when the destination type is {@code QUEUE}.
     *
     * @param validationContext the context giving access to the configured property values
     * @return the inherited validation results plus, when applicable, one invalid
     *         result for the error queue property
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
        final String configuredDestinationType = validationContext.getProperty(DESTINATION_TYPE).getValue();
        final String configuredErrorQueue = validationContext.getProperty(ERROR_QUEUE).getValue();

        // Nothing to complain about when no error queue is set or the destination is a queue.
        if (configuredErrorQueue == null || "QUEUE".equals(configuredDestinationType)) {
            return problems;
        }

        final String errorQueueLabel = ERROR_QUEUE.getDisplayName();
        final String explanation = "'" + errorQueueLabel + "' is applicable only when '" + DESTINATION_TYPE.getDisplayName() + "'='QUEUE'";
        problems.add(new ValidationResult.Builder()
                .valid(false)
                .subject(errorQueueLabel)
                .explanation(explanation)
                .build());
        return problems;
    }

    /**
     * Reads the consumption settings from the context and consumes either a set of
     * messages (when a Record Reader is configured) or a single message.
     *
     * <p>On any exception the error is logged, the consumer is marked invalid, the
     * context is yielded and the exception is rethrown.</p>
     *
     * @param context        the process context supplying the property values
     * @param processSession the session used to create and transfer FlowFiles
     * @param consumer       the JMS consumer used to receive messages
     * @throws ProcessException declared by the overridden method
     */
    @Override
    protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession, JMSConsumer consumer) throws ProcessException {
        final String destination = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
        final String errorQueue = context.getProperty(ERROR_QUEUE).evaluateAttributeExpressions().getValue();
        final boolean durableSubscription = ConsumeJMS.isDurableSubscriber(context);
        final boolean sharedSubscription = ConsumeJMS.isShared(context);
        final String subscription = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
        final String selector = context.getProperty(MESSAGE_SELECTOR).evaluateAttributeExpressions().getValue();
        final String charsetName = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();

        try {
            final boolean recordOriented = context.getProperty(RECORD_READER).isSet();
            if (!recordOriented) {
                this.processSingleMessage(processSession, consumer, destination, errorQueue, durableSubscription, sharedSubscription, subscription, selector, charsetName);
            } else {
                this.processMessageSet(context, processSession, consumer, destination, errorQueue, durableSubscription, sharedSubscription, subscription, selector, charsetName);
            }
        } catch (Exception failure) {
            this.getLogger().error("Error while trying to process JMS message", (Throwable) failure);
            // Flag the consumer as invalid and back off before propagating the failure.
            consumer.setValid(false);
            context.yield();
            throw failure;
        }
    }

    /**
     * Consumes a single message and turns it into one FlowFile routed to {@code REL_SUCCESS}.
     *
     * <p>A {@code null} response is ignored. Otherwise the FlowFile is created, a provenance
     * receive event is reported, and the session is committed asynchronously: the first
     * callback acknowledges the message, the second one rejects it.</p>
     *
     * <p>If anything fails while handling the response, the message is rejected and the
     * original failure is rethrown. A failure of the rejection itself is attached to the
     * original failure as a suppressed exception so that the root cause is not lost.</p>
     *
     * @param processSession   the session used to create, transfer and commit the FlowFile
     * @param consumer         the JMS consumer used to receive the message
     * @param destinationName  name of the destination to consume from
     * @param errorQueueName   name of the error queue, forwarded to the consumer
     * @param durable          durable-subscription flag, forwarded to the consumer
     * @param shared           shared-subscription flag, forwarded to the consumer
     * @param subscriptionName subscription name, forwarded to the consumer
     * @param messageSelector  message selector, forwarded to the consumer
     * @param charset          charset name, forwarded to the consumer
     */
    private void processSingleMessage(ProcessSession processSession, JMSConsumer consumer, String destinationName, String errorQueueName, boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {
        consumer.consumeSingleMessage(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, response -> {
            if (response == null) {
                return;
            }
            try {
                FlowFile flowFile = this.createFlowFileFromMessage(processSession, destinationName, (JMSConsumer.JMSResponse)response);
                processSession.getProvenanceReporter().receive(flowFile, destinationName);
                processSession.transfer(flowFile, REL_SUCCESS);
                processSession.commitAsync(() -> this.withLog(() -> this.acknowledge((JMSConsumer.JMSResponse)response)), __ -> this.withLog(() -> response.reject()));
            }
            catch (Throwable t) {
                try {
                    response.reject();
                }
                catch (RuntimeException rejectFailure) {
                    // Keep the original failure as the primary exception; record the reject failure alongside it.
                    t.addSuppressed(rejectFailure);
                }
                throw t;
            }
        });
    }

    /**
     * Creates a FlowFile whose content is the message body and whose attributes are the
     * merged JMS headers and properties plus the source destination name and message type.
     *
     * <p>A {@code null} message body is written as empty content, matching how the
     * record-oriented path of this class treats a missing body.</p>
     *
     * @param processSession  the session used to create and populate the FlowFile
     * @param destinationName name stored in the {@code jms.source.destination} attribute
     * @param response        the consumed message supplying body, headers, properties and type
     * @return the populated FlowFile
     */
    private FlowFile createFlowFileFromMessage(ProcessSession processSession, String destinationName, JMSConsumer.JMSResponse response) {
        final byte[] messageBody = response.getMessageBody();
        // OutputStream.write(null) throws a NullPointerException, so fall back to an empty body.
        final byte[] content = messageBody == null ? new byte[0] : messageBody;
        FlowFile flowFile = processSession.create();
        flowFile = processSession.write(flowFile, out -> out.write(content));
        Map<String, String> jmsHeaders = response.getMessageHeaders();
        Map<String, String> jmsProperties = response.getMessageProperties();
        Map<String, String> attributes = this.mergeJmsAttributes(jmsHeaders, jmsProperties);
        attributes.put(JMS_SOURCE_DESTINATION_NAME, destinationName);
        attributes.put(JMS_MESSAGETYPE, response.getMessageType());
        return processSession.putAllAttributes(flowFile, attributes);
    }

    /**
     * Consumes a set of messages and writes them as records through a {@code RecordWriter}
     * built from the configured Record Reader, Record Writer and Output Strategy.
     *
     * <p>The callback handed to the writer reacts as follows:</p>
     * <ul>
     *   <li>success: reports a provenance receive event, adjusts the received/processed
     *       counters, transfers the FlowFile to {@code REL_SUCCESS} and commits asynchronously;
     *       the first commit callback acknowledges the batch, the second one rejects it;</li>
     *   <li>parse failure: increments the parse-failure counter and routes the individual
     *       message as its own FlowFile to {@code REL_PARSE_FAILURE};</li>
     *   <li>failure: rejects the batch and throws a {@code ProcessException} wrapping the
     *       original exception. A failure of the rejection itself is attached as a suppressed
     *       exception instead of replacing the original cause.</li>
     * </ul>
     *
     * @param context          the process context supplying reader, writer and output strategy
     * @param session          the session used to create, transfer and commit FlowFiles
     * @param consumer         the JMS consumer used to receive the messages
     * @param destinationName  name of the destination to consume from
     * @param errorQueueName   name of the error queue, forwarded to the consumer
     * @param durable          durable-subscription flag, forwarded to the consumer
     * @param shared           shared-subscription flag, forwarded to the consumer
     * @param subscriptionName subscription name, forwarded to the consumer
     * @param messageSelector  message selector, forwarded to the consumer
     * @param charset          charset name, forwarded to the consumer
     */
    private void processMessageSet(ProcessContext context, final ProcessSession session, JMSConsumer consumer, final String destinationName, String errorQueueName, boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {
        RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        OutputStrategy outputStrategy = OutputStrategy.valueOf(context.getProperty(OUTPUT_STRATEGY).getValue());
        RecordWriter<JMSConsumer.JMSResponse> flowFileWriter = new RecordWriter<JMSConsumer.JMSResponse>(readerFactory, writerFactory, message -> {
            // A missing body is handed to the reader as empty content.
            byte[] body = message.getMessageBody();
            return body == null ? new byte[]{} : body;
        }, message -> this.mergeJmsAttributes(message.getMessageHeaders(), message.getMessageProperties()), outputStrategy, this.getLogger());
        consumer.consumeMessageSet(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, jmsResponses -> flowFileWriter.write(session, (List<JMSConsumer.JMSResponse>)jmsResponses, new FlowFileWriterCallback<JMSConsumer.JMSResponse>(){

            @Override
            public void onSuccess(FlowFile flowFile, List<JMSConsumer.JMSResponse> processedMessages, List<JMSConsumer.JMSResponse> failedMessages) {
                session.getProvenanceReporter().receive(flowFile, destinationName);
                session.adjustCounter(ConsumeJMS.COUNTER_RECORDS_RECEIVED, (long)(processedMessages.size() + failedMessages.size()), false);
                session.adjustCounter(ConsumeJMS.COUNTER_RECORDS_PROCESSED, (long)processedMessages.size(), false);
                session.transfer(flowFile, REL_SUCCESS);
                session.commitAsync(() -> ConsumeJMS.this.withLog(() -> ConsumeJMS.this.acknowledge(processedMessages, failedMessages)), __ -> ConsumeJMS.this.withLog(() -> ConsumeJMS.this.reject(processedMessages, failedMessages)));
            }

            @Override
            public void onParseFailure(FlowFile flowFile, JMSConsumer.JMSResponse message, Exception e) {
                session.adjustCounter(ConsumeJMS.COUNTER_PARSE_FAILURES, 1L, false);
                FlowFile failedMessage = ConsumeJMS.this.createFlowFileFromMessage(session, destinationName, message);
                session.transfer(failedMessage, REL_PARSE_FAILURE);
            }

            @Override
            public void onFailure(FlowFile flowFile, List<JMSConsumer.JMSResponse> processedMessages, List<JMSConsumer.JMSResponse> failedMessages, Exception e) {
                // Build the primary exception first so a failing reject cannot mask the real cause.
                ProcessException failure = new ProcessException((Throwable)e);
                try {
                    ConsumeJMS.this.reject(processedMessages, failedMessages);
                }
                catch (RuntimeException rejectFailure) {
                    failure.addSuppressed(rejectFailure);
                }
                throw failure;
            }
        }));
    }

    /**
     * Acknowledges a single consumed message.
     *
     * @param response the consumed message to acknowledge
     * @throws ProcessException wrapping any exception raised by the acknowledgement,
     *                          after it has been logged
     */
    private void acknowledge(JMSConsumer.JMSResponse response) {
        try {
            response.acknowledge();
        } catch (Exception ackFailure) {
            final Throwable cause = ackFailure;
            this.getLogger().error("Failed to acknowledge JMS Message that was received", cause);
            throw new ProcessException(cause);
        }
    }

    /**
     * Acknowledges a batch by acknowledging the message with the highest batch order
     * among the processed and failed messages.
     *
     * @param processedMessages messages that were processed
     * @param failedMessages    messages that failed
     */
    private void acknowledge(List<JMSConsumer.JMSResponse> processedMessages, List<JMSConsumer.JMSResponse> failedMessages) {
        final JMSConsumer.JMSResponse lastInBatch = this.findLastBatchedJmsResponse(processedMessages, failedMessages);
        this.acknowledge(lastInBatch);
    }

    /**
     * Rejects a batch by rejecting the message with the highest batch order
     * among the processed and failed messages.
     *
     * @param processedMessages messages that were processed
     * @param failedMessages    messages that failed
     */
    private void reject(List<JMSConsumer.JMSResponse> processedMessages, List<JMSConsumer.JMSResponse> failedMessages) {
        final JMSConsumer.JMSResponse lastInBatch = this.findLastBatchedJmsResponse(processedMessages, failedMessages);
        lastInBatch.reject();
    }

    /**
     * Runs the given action and logs any exception it throws before rethrowing it.
     *
     * @param runnable the action to run
     */
    private void withLog(Runnable runnable) {
        try {
            runnable.run();
        } catch (Exception callbackFailure) {
            // Log here, then let the same exception continue to the caller.
            this.getLogger().error("An error happened during commitAsync callback", (Throwable) callbackFailure);
            throw callbackFailure;
        }
    }

    /**
     * Finds the message with the greatest batch order across both lists.
     *
     * @param processedMessages messages that were processed
     * @param failedMessages    messages that failed
     * @return the message whose {@code getBatchOrder()} value is the largest
     * @throws java.util.NoSuchElementException if both lists are empty
     */
    private JMSConsumer.JMSResponse findLastBatchedJmsResponse(List<JMSConsumer.JMSResponse> processedMessages, List<JMSConsumer.JMSResponse> failedMessages) {
        final Comparator<JMSConsumer.JMSResponse> byBatchOrder = Comparator.comparing(JMSConsumer.JMSResponse::getBatchOrder);
        // Processed messages come first, then failed ones, in a single stream.
        final Stream<JMSConsumer.JMSResponse> allMessages = Stream.concat(processedMessages.stream(), failedMessages.stream());
        return allMessages.max(byBatchOrder).get();
    }

    /**
     * Applies the configured acknowledgement mode and receive timeout to the template
     * and creates the consumer that uses it.
     *
     * @param connectionFactory the connection factory handed to the new consumer
     * @param jmsTemplate       the template to configure and hand to the new consumer
     * @param processContext    the process context supplying the property values
     * @return a new {@code JMSConsumer} built from the factory, the template and this processor's logger
     */
    @Override
    protected JMSConsumer finishBuildingJmsWorker(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext) {
        final Integer acknowledgeMode = processContext.getProperty(ACKNOWLEDGEMENT_MODE).asInteger();
        jmsTemplate.setSessionAcknowledgeMode(acknowledgeMode);

        // The timeout property is a time period; the template receives it in milliseconds.
        final Long receiveTimeoutMillis = processContext.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
        jmsTemplate.setReceiveTimeout(receiveTimeoutMillis);

        return new JMSConsumer(connectionFactory, jmsTemplate, this.getLogger());
    }

    /**
     * Returns the relationships of this processor.
     *
     * @return the unmodifiable set built by the static initializer, containing
     *         {@code REL_SUCCESS} and {@code REL_PARSE_FAILURE}
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Returns the property descriptors supported by this processor.
     *
     * @return the unmodifiable, ordered list built by the static initializer
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /**
     * Sets the client id on the connection factory.
     *
     * <p>For a durable subscription that is not shared, the value returned by
     * {@code getClientId(context)} is applied directly; in every other case the
     * inherited behaviour is used.</p>
     *
     * @param context        the process context supplying the subscription settings
     * @param cachingFactory the connection factory to configure
     */
    @Override
    protected void setClientId(ProcessContext context, SingleConnectionFactory cachingFactory) {
        if (!ConsumeJMS.isDurableSubscriber(context) || ConsumeJMS.isShared(context)) {
            super.setClientId(context, cachingFactory);
            return;
        }
        cachingFactory.setClientId(ConsumeJMS.getClientId(context));
    }

    /**
     * Combines JMS headers and JMS properties into one attribute map.
     *
     * <p>Headers are copied first; each property is then added, replacing a header of
     * the same name. Every such replacement is logged as a warning.</p>
     *
     * @param headers    the JMS header values keyed by attribute name
     * @param properties the JMS property values keyed by attribute name
     * @return a new mutable map holding the merged attributes
     */
    private Map<String, String> mergeJmsAttributes(Map<String, String> headers, Map<String, String> properties) {
        final Map<String, String> merged = new HashMap<>(headers);
        for (final Map.Entry<String, String> property : properties.entrySet()) {
            final String attributeName = property.getKey();
            if (merged.containsKey(attributeName)) {
                this.getLogger().warn("JMS Header and Property name collides as an attribute. JMS Property will override the JMS Header attribute. attributeName=[{}]", new Object[]{attributeName});
            }
            merged.put(attributeName, property.getValue());
        }
        return merged;
    }

    // Builds the ordered, unmodifiable property descriptor list and the unmodifiable relationship set.
    static {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        Collections.addAll(descriptors,
                CF_SERVICE,
                DESTINATION,
                DESTINATION_TYPE,
                MESSAGE_SELECTOR,
                USER,
                PASSWORD,
                CLIENT_ID);
        // The charset property is re-declared with an additional validator that evaluates expressions.
        descriptors.add(new PropertyDescriptor.Builder()
                .fromPropertyDescriptor(CHARSET)
                .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR_WITH_EVALUATION)
                .build());
        Collections.addAll(descriptors,
                ACKNOWLEDGEMENT_MODE,
                DURABLE_SUBSCRIBER,
                SHARED_SUBSCRIBER,
                SUBSCRIPTION_NAME,
                TIMEOUT,
                ERROR_QUEUE,
                RECORD_READER,
                RECORD_WRITER,
                OUTPUT_STRATEGY);
        descriptors.addAll(JNDI_JMS_CF_PROPERTIES);
        descriptors.addAll(JMS_CF_PROPERTIES);
        propertyDescriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> routes = new HashSet<>();
        Collections.addAll(routes, REL_SUCCESS, REL_PARSE_FAILURE);
        relationships = Collections.unmodifiableSet(routes);
    }
}

