/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processor.util.listen;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processor.util.listen.event.Event;

public abstract class AbstractListenEventBatchingProcessor<E extends Event>
extends AbstractListenEventProcessor<E> {
    public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder().name("Max Batch Size").description("The maximum number of messages to add to a single FlowFile. If multiple messages are available, they will be concatenated along with the <Message Delimiter> up to this configured maximum number of messages").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(false).defaultValue("1").required(true).build();
    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder().name("Message Delimiter").displayName("Batching Message Delimiter").description("Specifies the delimiter to place between messages when multiple messages are bundled together (see <Max Batch Size> property).").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("\\n").required(true).build();
    protected volatile byte[] messageDemarcatorBytes;

    @Override
    protected void init(ProcessorInitializationContext context) {
        ArrayList<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
        descriptors.add(PORT);
        descriptors.add(RECV_BUFFER_SIZE);
        descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
        descriptors.add(MAX_SOCKET_BUFFER_SIZE);
        descriptors.add(CHARSET);
        descriptors.add(MAX_BATCH_SIZE);
        descriptors.add(MESSAGE_DELIMITER);
        descriptors.addAll(this.getAdditionalProperties());
        this.descriptors = Collections.unmodifiableList(descriptors);
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.addAll(this.getAdditionalRelationships());
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException {
        super.onScheduled(context);
        String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
        this.messageDemarcatorBytes = msgDemarcator.getBytes(this.charset);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
        Map<String, FlowFileEventBatch> batches = this.getBatches(session, maxBatchSize, this.messageDemarcatorBytes);
        if (batches.size() == 0) {
            return;
        }
        ArrayList allEvents = new ArrayList();
        for (Map.Entry<String, FlowFileEventBatch> entry : batches.entrySet()) {
            FlowFile flowFile = entry.getValue().getFlowFile();
            List events = entry.getValue().getEvents();
            if (flowFile.getSize() == 0L || events.size() == 0) {
                session.remove(flowFile);
                this.getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[]{entry.getKey()});
                continue;
            }
            Map<String, String> attributes = this.getAttributes(entry.getValue());
            flowFile = session.putAllAttributes(flowFile, attributes);
            this.getLogger().debug("Transferring {} to success", new Object[]{flowFile});
            session.transfer(flowFile, REL_SUCCESS);
            session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
            String transitUri = this.getTransitUri(entry.getValue());
            session.getProvenanceReporter().receive(flowFile, transitUri);
            allEvents.addAll(events);
        }
        this.postProcess(context, session, allEvents);
    }

    protected abstract Map<String, String> getAttributes(FlowFileEventBatch var1);

    protected abstract String getTransitUri(FlowFileEventBatch var1);

    protected void postProcess(ProcessContext context, ProcessSession session, List<E> events) {
    }

    protected Map<String, FlowFileEventBatch> getBatches(ProcessSession session, int totalBatchSize, final byte[] messageDemarcatorBytes) {
        Object event;
        HashMap<String, FlowFileEventBatch> batches = new HashMap<String, FlowFileEventBatch>();
        for (int i = 0; i < totalBatchSize && (event = this.getMessage(true, true, session)) != null; ++i) {
            String batchKey = this.getBatchKey(event);
            FlowFileEventBatch batch = (FlowFileEventBatch)batches.get(batchKey);
            if (batch == null) {
                batch = new FlowFileEventBatch(session.create(), new ArrayList());
                batches.put(batchKey, batch);
            }
            batch.getEvents().add(event);
            final boolean writeDemarcator = i > 0;
            try {
                final byte[] rawMessage = event.getData();
                FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback(){

                    public void process(OutputStream out) throws IOException {
                        if (writeDemarcator) {
                            out.write(messageDemarcatorBytes);
                        }
                        out.write(rawMessage);
                    }
                });
                batch.setFlowFile(appendedFlowFile);
                continue;
            }
            catch (Exception e) {
                this.getLogger().error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again", new Object[]{e.getMessage()}, (Throwable)e);
                this.errorEvents.offer(event);
                break;
            }
        }
        return batches;
    }

    protected String getBatchKey(E event) {
        return event.getSender();
    }

    protected final class FlowFileEventBatch {
        private FlowFile flowFile;
        private List<E> events;

        public FlowFileEventBatch(FlowFile flowFile, List<E> events) {
            this.flowFile = flowFile;
            this.events = events;
        }

        public FlowFile getFlowFile() {
            return this.flowFile;
        }

        public List<E> getEvents() {
            return this.events;
        }

        public void setFlowFile(FlowFile flowFile) {
            this.flowFile = flowFile;
        }
    }
}

