/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processor.util.listen;

import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;

/**
 * Base class for processors that listen on a port through a {@link ChannelDispatcher}
 * and hand received {@link Event}s to the processor through an in-memory queue.
 *
 * <p>On scheduling, the processor reads its configuration, creates the dispatcher via
 * {@link #createDispatcher(ProcessContext, BlockingQueue)}, opens it and runs it on a
 * dedicated daemon thread. Subclasses retrieve events with
 * {@link #getMessage(boolean, boolean, ProcessSession)}.
 *
 * @param <E> the type of event produced by the dispatcher
 */
public abstract class AbstractListenEventProcessor<E extends Event>
extends AbstractProcessor {
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
            .name("Port")
            .description("The port to listen on for communication.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.PORT_VALIDATOR)
            .build();
    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
            .name("Character Set")
            .description("Specifies the character set of the received data.")
            .required(true)
            .defaultValue("UTF-8")
            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
            .build();
    public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder()
            .name("Receive Buffer Size")
            .description("The size of each buffer used to receive messages. Adjust this value appropriately based on the expected size of the incoming messages.")
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .defaultValue("65507 B")
            .required(true)
            .build();
    public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder()
            .name("Max Size of Socket Buffer")
            .description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped.")
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .defaultValue("1 MB")
            .required(true)
            .build();
    public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder()
            .name("Max Size of Message Queue")
            .description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total memory used by the processor.")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .defaultValue("10000")
            .required(true)
            .build();
    // Not registered by init(); a subclass that needs it has to supply it itself,
    // e.g. through getAdditionalProperties().
    public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
            .name("Max Number of TCP Connections")
            .description("The maximum number of concurrent TCP connections to accept.")
            .addValidator(StandardValidators.createLongValidator(1L, 65535L, true))
            .defaultValue("2")
            .required(true)
            .build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Messages received successfully will be sent out this relationship.")
            .build();
    /** Default timeout, in milliseconds, returned by {@link #getLongPollTimeout()}. */
    public static final int POLL_TIMEOUT_MS = 20;
    protected Set<Relationship> relationships;
    protected List<PropertyDescriptor> descriptors;
    protected volatile int port;
    protected volatile Charset charset;
    protected volatile ChannelDispatcher dispatcher;
    /** Main event queue; created in {@link #onScheduled(ProcessContext)}, so it is null before the first scheduling. */
    protected volatile BlockingQueue<E> events;
    /** Unbounded queue that {@link #getMessage(boolean, boolean, ProcessSession)} can poll ahead of the main queue. */
    protected volatile BlockingQueue<E> errorEvents = new LinkedBlockingQueue<E>();

    /**
     * Builds the unmodifiable property and relationship collections exposed by this
     * processor: the common listener properties plus {@link #getAdditionalProperties()},
     * and {@link #REL_SUCCESS} plus {@link #getAdditionalRelationships()}.
     *
     * @param context the initialization context (not used by this implementation)
     */
    protected void init(ProcessorInitializationContext context) {
        List<PropertyDescriptor> supportedProperties = new ArrayList<PropertyDescriptor>();
        supportedProperties.add(ListenerProperties.NETWORK_INTF_NAME);
        supportedProperties.add(PORT);
        supportedProperties.add(RECV_BUFFER_SIZE);
        supportedProperties.add(MAX_MESSAGE_QUEUE_SIZE);
        supportedProperties.add(MAX_SOCKET_BUFFER_SIZE);
        supportedProperties.add(CHARSET);
        supportedProperties.addAll(this.getAdditionalProperties());
        this.descriptors = Collections.unmodifiableList(supportedProperties);

        Set<Relationship> supportedRelationships = new HashSet<Relationship>();
        supportedRelationships.add(REL_SUCCESS);
        supportedRelationships.addAll(this.getAdditionalRelationships());
        this.relationships = Collections.unmodifiableSet(supportedRelationships);
    }

    /**
     * Hook for subclasses to contribute relationships beyond {@link #REL_SUCCESS}.
     *
     * @return the extra relationships; an empty list by default
     */
    protected List<Relationship> getAdditionalRelationships() {
        return Collections.emptyList();
    }

    /**
     * Hook for subclasses to contribute properties beyond the common listener properties.
     *
     * @return the extra property descriptors; an empty list by default
     */
    protected List<PropertyDescriptor> getAdditionalProperties() {
        return Collections.emptyList();
    }

    /**
     * @return the unmodifiable set of relationships built by {@link #init(ProcessorInitializationContext)}
     */
    public final Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /**
     * @return the unmodifiable list of properties built by {@link #init(ProcessorInitializationContext)}
     */
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    /**
     * Reads the configuration, creates a fresh bounded event queue, opens the dispatcher
     * and starts it on a daemon thread named after this class and the processor identifier.
     *
     * @param context the process context supplying the configured property values
     * @throws IOException if the configured network interface cannot be resolved to an
     *         address, or if creating or opening the dispatcher fails
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException {
        this.charset = Charset.forName(context.getProperty(CHARSET).getValue());
        this.port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
        this.events = new LinkedBlockingQueue<E>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger());
        String interfaceName = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
        int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();

        // A null address is passed through to the dispatcher when no interface is configured.
        InetAddress nicIPAddress = null;
        if (!StringUtils.isEmpty(interfaceName)) {
            nicIPAddress = resolveInterfaceAddress(interfaceName);
        }

        this.dispatcher = this.createDispatcher(context, this.events);
        this.dispatcher.open(nicIPAddress, this.port, maxChannelBufferSize);

        Thread readerThread = new Thread(this.dispatcher);
        readerThread.setName(this.getClass().getName() + " [" + this.getIdentifier() + "]");
        readerThread.setDaemon(true);
        readerThread.start();
    }

    /**
     * Resolves a network interface name to the first address the interface reports.
     *
     * @param interfaceName the non-empty name of the network interface
     * @return the first address of the interface
     * @throws IOException if no interface with that name exists, if it has no addresses,
     *         or if the lookup itself fails
     */
    private static InetAddress resolveInterfaceAddress(String interfaceName) throws IOException {
        // getByName returns null (rather than throwing) when the name is unknown.
        NetworkInterface netIF = NetworkInterface.getByName(interfaceName);
        if (netIF == null) {
            throw new IOException("Network interface '" + interfaceName + "' does not exist");
        }
        List<InetAddress> addresses = Collections.list(netIF.getInetAddresses());
        if (addresses.isEmpty()) {
            throw new IOException("Network interface '" + interfaceName + "' has no addresses");
        }
        return addresses.get(0);
    }

    /**
     * Creates the dispatcher that feeds the given queue.
     *
     * @param var1 the process context supplying the configured property values
     * @param var2 the queue the dispatcher is given for received events
     * @return the dispatcher to open and run
     * @throws IOException if the dispatcher cannot be created
     */
    protected abstract ChannelDispatcher createDispatcher(ProcessContext var1, BlockingQueue<E> var2) throws IOException;

    /**
     * @return the port reported by the dispatcher, or 0 if no dispatcher has been created
     */
    public final int getDispatcherPort() {
        return this.dispatcher == null ? 0 : this.dispatcher.getPort();
    }

    /**
     * @return the number of events currently in the error queue
     */
    public int getErrorQueueSize() {
        return this.errorEvents.size();
    }

    /**
     * @return the number of events currently in the main queue, or 0 if it has not been created
     */
    public int getQueueSize() {
        return this.events == null ? 0 : this.events.size();
    }

    /**
     * Closes the dispatcher, if one has been created.
     */
    @OnUnscheduled
    public void onUnscheduled() {
        if (this.dispatcher != null) {
            this.dispatcher.close();
        }
    }

    /**
     * Creates a bounded queue pre-filled with heap-allocated byte buffers.
     *
     * @param poolSize the number of buffers, which is also the queue capacity
     * @param bufferSize the capacity of each buffer in bytes
     * @return the filled buffer pool
     */
    protected BlockingQueue<ByteBuffer> createBufferPool(int poolSize, int bufferSize) {
        LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<ByteBuffer>(poolSize);
        for (int i = 0; i < poolSize; ++i) {
            bufferPool.offer(ByteBuffer.allocate(bufferSize));
        }
        return bufferPool;
    }

    /**
     * Retrieves the next event, optionally giving the error queue priority.
     *
     * <p>An event taken from the error queue is returned without touching the
     * "Messages Received" counter; the counter is incremented only for events taken
     * from the main queue.
     *
     * @param longPoll if true, wait up to {@link #getLongPollTimeout()} milliseconds for an
     *        event on the main queue; otherwise return immediately
     * @param pollErrorQueue if true, check the error queue before the main queue
     * @param session the session whose counter is adjusted when a main-queue event is returned
     * @return the next event, or null if none was available or the wait was interrupted
     */
    protected E getMessage(boolean longPoll, boolean pollErrorQueue, ProcessSession session) {
        if (pollErrorQueue) {
            E errorEvent = this.errorEvents.poll();
            if (errorEvent != null) {
                return errorEvent;
            }
        }

        E event;
        try {
            event = longPoll
                    ? this.events.poll(this.getLongPollTimeout(), TimeUnit.MILLISECONDS)
                    : this.events.poll();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            return null;
        }

        if (event != null) {
            session.adjustCounter("Messages Received", 1L, false);
        }
        return event;
    }

    /**
     * @return the time, in milliseconds, that a long poll waits on the main queue;
     *         {@link #POLL_TIMEOUT_MS} by default
     */
    protected long getLongPollTimeout() {
        return POLL_TIMEOUT_MS;
    }
}

