/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.source;

import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.Source;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark-project.guava.base.Charsets;
import org.spark-project.guava.util.concurrent.ThreadFactoryBuilder;

public class NetcatSource
extends AbstractSource
implements Configurable,
EventDrivenSource {
    private static final Logger logger = LoggerFactory.getLogger(NetcatSource.class);
    private String hostName;
    private int port = 0;
    private int maxLineLength;
    private boolean ackEveryEvent;
    private CounterGroup counterGroup = new CounterGroup();
    private ServerSocketChannel serverSocket;
    private AtomicBoolean acceptThreadShouldStop = new AtomicBoolean(false);
    private Thread acceptThread;
    private ExecutorService handlerService;

    @Override
    public void configure(Context context) {
        String hostKey = "bind";
        String portKey = "port";
        String ackEventKey = "ack-every-event";
        Configurables.ensureRequiredNonNull(context, hostKey, portKey);
        this.hostName = context.getString(hostKey);
        this.port = context.getInteger(portKey);
        this.ackEveryEvent = context.getBoolean(ackEventKey, true);
        this.maxLineLength = context.getInteger("max-line-length", 512);
    }

    @Override
    public void start() {
        logger.info("Source starting");
        this.counterGroup.incrementAndGet("open.attempts");
        this.handlerService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("netcat-handler-%d").build());
        try {
            InetSocketAddress bindPoint = new InetSocketAddress(this.hostName, this.port);
            this.serverSocket = ServerSocketChannel.open();
            this.serverSocket.socket().setReuseAddress(true);
            this.serverSocket.socket().bind(bindPoint);
            logger.info("Created serverSocket:{}", (Object)this.serverSocket);
        }
        catch (IOException e) {
            this.counterGroup.incrementAndGet("open.errors");
            logger.error("Unable to bind to socket. Exception follows.", (Throwable)e);
            throw new FlumeException(e);
        }
        AcceptHandler acceptRunnable = new AcceptHandler(this.maxLineLength);
        this.acceptThreadShouldStop.set(false);
        acceptRunnable.counterGroup = this.counterGroup;
        acceptRunnable.handlerService = this.handlerService;
        acceptRunnable.shouldStop = this.acceptThreadShouldStop;
        acceptRunnable.ackEveryEvent = this.ackEveryEvent;
        acceptRunnable.source = this;
        acceptRunnable.serverSocket = this.serverSocket;
        this.acceptThread = new Thread(acceptRunnable);
        this.acceptThread.start();
        logger.debug("Source started");
        super.start();
    }

    @Override
    public void stop() {
        logger.info("Source stopping");
        this.acceptThreadShouldStop.set(true);
        if (this.acceptThread != null) {
            logger.debug("Stopping accept handler thread");
            while (this.acceptThread.isAlive()) {
                try {
                    logger.debug("Waiting for accept handler to finish");
                    this.acceptThread.interrupt();
                    this.acceptThread.join(500L);
                }
                catch (InterruptedException e) {
                    logger.debug("Interrupted while waiting for accept handler to finish");
                    Thread.currentThread().interrupt();
                }
            }
            logger.debug("Stopped accept handler thread");
        }
        if (this.serverSocket != null) {
            try {
                this.serverSocket.close();
            }
            catch (IOException e) {
                logger.error("Unable to close socket. Exception follows.", (Throwable)e);
                return;
            }
        }
        if (this.handlerService != null) {
            this.handlerService.shutdown();
            logger.debug("Waiting for handler service to stop");
            try {
                this.handlerService.awaitTermination(500L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                logger.debug("Interrupted while waiting for netcat handler service to stop");
                Thread.currentThread().interrupt();
            }
            if (!this.handlerService.isShutdown()) {
                this.handlerService.shutdownNow();
            }
            logger.debug("Handler service stopped");
        }
        logger.debug("Source stopped. Event metrics:{}", (Object)this.counterGroup);
        super.stop();
    }

    private static class NetcatSocketHandler
    implements Runnable {
        private Source source;
        private CounterGroup counterGroup;
        private SocketChannel socketChannel;
        private boolean ackEveryEvent;
        private final int maxLineLength;

        public NetcatSocketHandler(int maxLineLength) {
            this.maxLineLength = maxLineLength;
        }

        @Override
        public void run() {
            logger.debug("Starting connection handler");
            Object event = null;
            try {
                block3: {
                    int eventsProcessed;
                    int charsRead;
                    Reader reader = Channels.newReader((ReadableByteChannel)this.socketChannel, "utf-8");
                    Writer writer = Channels.newWriter((WritableByteChannel)this.socketChannel, "utf-8");
                    CharBuffer buffer = CharBuffer.allocate(this.maxLineLength);
                    buffer.flip();
                    do {
                        charsRead = this.fill(buffer, reader);
                        logger.debug("Chars read = {}", (Object)charsRead);
                        eventsProcessed = this.processEvents(buffer, writer);
                        logger.debug("Events processed = {}", (Object)eventsProcessed);
                        if (charsRead == -1) break block3;
                    } while (charsRead != 0 || eventsProcessed != 0 || buffer.remaining() != buffer.capacity());
                    logger.warn("Client sent event exceeding the maximum length");
                    this.counterGroup.incrementAndGet("events.failed");
                    writer.write("FAILED: Event exceeds the maximum length (" + buffer.capacity() + " chars, including newline)\n");
                    writer.flush();
                }
                this.socketChannel.close();
                this.counterGroup.incrementAndGet("sessions.completed");
            }
            catch (IOException e) {
                this.counterGroup.incrementAndGet("sessions.broken");
            }
            logger.debug("Connection handler exiting");
        }

        private int processEvents(CharBuffer buffer, Writer writer) throws IOException {
            int numProcessed = 0;
            boolean foundNewLine = true;
            block2: while (foundNewLine) {
                foundNewLine = false;
                int limit = buffer.limit();
                for (int pos = buffer.position(); pos < limit; ++pos) {
                    if (buffer.get(pos) != '\n') continue;
                    buffer.limit(pos);
                    ByteBuffer bytes = Charsets.UTF_8.encode(buffer);
                    buffer.limit(limit);
                    byte[] body = new byte[bytes.remaining()];
                    bytes.get(body);
                    Event event = EventBuilder.withBody(body);
                    ChannelException ex = null;
                    try {
                        this.source.getChannelProcessor().processEvent(event);
                    }
                    catch (ChannelException chEx) {
                        ex = chEx;
                    }
                    if (ex == null) {
                        this.counterGroup.incrementAndGet("events.processed");
                        ++numProcessed;
                        if (this.ackEveryEvent) {
                            writer.write("OK\n");
                        }
                    } else {
                        this.counterGroup.incrementAndGet("events.failed");
                        logger.warn("Error processing event. Exception follows.", (Throwable)ex);
                        writer.write("FAILED: " + ex.getMessage() + "\n");
                    }
                    writer.flush();
                    buffer.position(pos + 1);
                    foundNewLine = true;
                    continue block2;
                }
            }
            return numProcessed;
        }

        private int fill(CharBuffer buffer, Reader reader) throws IOException {
            buffer.compact();
            int charsRead = reader.read(buffer);
            this.counterGroup.addAndGet("characters.received", Long.valueOf(charsRead));
            buffer.flip();
            return charsRead;
        }
    }

    private static class AcceptHandler
    implements Runnable {
        private ServerSocketChannel serverSocket;
        private CounterGroup counterGroup;
        private ExecutorService handlerService;
        private EventDrivenSource source;
        private AtomicBoolean shouldStop;
        private boolean ackEveryEvent;
        private final int maxLineLength;

        public AcceptHandler(int maxLineLength) {
            this.maxLineLength = maxLineLength;
        }

        @Override
        public void run() {
            logger.debug("Starting accept handler");
            while (!this.shouldStop.get()) {
                try {
                    SocketChannel socketChannel = this.serverSocket.accept();
                    NetcatSocketHandler request = new NetcatSocketHandler(this.maxLineLength);
                    request.socketChannel = socketChannel;
                    request.counterGroup = this.counterGroup;
                    request.source = this.source;
                    request.ackEveryEvent = this.ackEveryEvent;
                    this.handlerService.submit(request);
                    this.counterGroup.incrementAndGet("accept.succeeded");
                }
                catch (ClosedByInterruptException e) {
                }
                catch (IOException e) {
                    logger.error("Unable to accept connection. Exception follows.", (Throwable)e);
                    this.counterGroup.incrementAndGet("accept.failed");
                }
            }
            logger.debug("Accept handler exiting");
        }
    }
}

