/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.api;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.avro.ipc.CallFuture;
import org.apache.avro.ipc.Callback;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.AbstractRpcClient;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.flume.source.avro.Status;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyAvroRpcClient
extends AbstractRpcClient
implements RpcClient {
    private ExecutorService callTimeoutPool;
    private final ReentrantLock stateLock = new ReentrantLock();
    private ConnState connState;
    private InetSocketAddress address;
    private Transceiver transceiver;
    private AvroSourceProtocol.Callback avroClient;
    private static final Logger logger = LoggerFactory.getLogger(NettyAvroRpcClient.class);

    protected NettyAvroRpcClient() {
    }

    private void connect() throws FlumeException {
        this.connect(this.connectTimeout, TimeUnit.MILLISECONDS);
    }

    private void connect(long timeout, TimeUnit tu) throws FlumeException {
        this.callTimeoutPool = Executors.newCachedThreadPool(new TransceiverThreadFactory("Flume Avro RPC Client Call Invoker"));
        try {
            this.transceiver = new NettyTransceiver(this.address, (ChannelFactory)new NioClientSocketChannelFactory((Executor)Executors.newCachedThreadPool(new TransceiverThreadFactory("Avro " + NettyTransceiver.class.getSimpleName() + " Boss")), (Executor)Executors.newCachedThreadPool(new TransceiverThreadFactory("Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))), Long.valueOf(tu.toMillis(timeout)));
            this.avroClient = (AvroSourceProtocol.Callback)SpecificRequestor.getClient(AvroSourceProtocol.Callback.class, (Transceiver)this.transceiver);
        }
        catch (IOException ex) {
            throw new FlumeException(this + ": RPC connection error", ex);
        }
        this.setState(ConnState.READY);
    }

    @Override
    public void close() throws FlumeException {
        if (this.callTimeoutPool != null) {
            this.callTimeoutPool.shutdown();
            while (!this.callTimeoutPool.isTerminated()) {
                try {
                    this.callTimeoutPool.awaitTermination(this.requestTimeout, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException ex) {
                    logger.warn(this + ": Interrupted during close", (Throwable)ex);
                    this.callTimeoutPool.shutdownNow();
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            this.callTimeoutPool = null;
        }
        try {
            this.transceiver.close();
        }
        catch (IOException ex) {
            throw new FlumeException(this + ": Error closing transceiver.", ex);
        }
        finally {
            this.setState(ConnState.DEAD);
        }
    }

    public String toString() {
        return "NettyAvroRpcClient { host: " + this.address.getHostName() + ", port: " + this.address.getPort() + " }";
    }

    @Override
    public void append(Event event) throws EventDeliveryException {
        try {
            this.append(event, this.requestTimeout, TimeUnit.MILLISECONDS);
        }
        catch (Throwable t) {
            this.setState(ConnState.DEAD);
            if (t instanceof Error) {
                throw (Error)t;
            }
            if (t instanceof TimeoutException) {
                throw new EventDeliveryException(this + ": Failed to send event. " + "RPC request timed out after " + this.requestTimeout + "ms", t);
            }
            throw new EventDeliveryException(this + ": Failed to send event", t);
        }
    }

    private void append(Event event, long timeout, TimeUnit tu) throws EventDeliveryException {
        Future<Void> handshake;
        this.assertReady();
        final CallFuture callFuture = new CallFuture();
        final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
        avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
        avroEvent.setHeaders(NettyAvroRpcClient.toCharSeqMap(event.getHeaders()));
        try {
            handshake = this.callTimeoutPool.submit(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    NettyAvroRpcClient.this.avroClient.append(avroEvent, (Callback<Status>)callFuture);
                    return null;
                }
            });
        }
        catch (RejectedExecutionException ex) {
            throw new EventDeliveryException(this + ": Executor error", ex);
        }
        try {
            handshake.get(this.connectTimeout, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException ex) {
            throw new EventDeliveryException(this + ": Handshake timed out after " + this.connectTimeout + " ms", ex);
        }
        catch (InterruptedException ex) {
            throw new EventDeliveryException(this + ": Interrupted in handshake", ex);
        }
        catch (ExecutionException ex) {
            throw new EventDeliveryException(this + ": RPC request exception", ex);
        }
        catch (CancellationException ex) {
            throw new EventDeliveryException(this + ": RPC request cancelled", ex);
        }
        finally {
            if (!handshake.isDone()) {
                handshake.cancel(true);
            }
        }
        this.waitForStatusOK((CallFuture<Status>)callFuture, timeout, tu);
    }

    @Override
    public void appendBatch(List<Event> events) throws EventDeliveryException {
        try {
            this.appendBatch(events, this.requestTimeout, TimeUnit.MILLISECONDS);
        }
        catch (Throwable t) {
            this.setState(ConnState.DEAD);
            if (t instanceof Error) {
                throw (Error)t;
            }
            if (t instanceof TimeoutException) {
                throw new EventDeliveryException(this + ": Failed to send event. " + "RPC request timed out after " + this.requestTimeout + " ms", t);
            }
            throw new EventDeliveryException(this + ": Failed to send batch", t);
        }
    }

    private void appendBatch(List<Event> events, long timeout, TimeUnit tu) throws EventDeliveryException {
        this.assertReady();
        Iterator<Event> iter = events.iterator();
        final LinkedList<AvroFlumeEvent> avroEvents = new LinkedList<AvroFlumeEvent>();
        while (iter.hasNext()) {
            Future<Void> handshake;
            avroEvents.clear();
            for (int i = 0; i < this.batchSize && iter.hasNext(); ++i) {
                Event event = iter.next();
                AvroFlumeEvent avroEvent = new AvroFlumeEvent();
                avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
                avroEvent.setHeaders(NettyAvroRpcClient.toCharSeqMap(event.getHeaders()));
                avroEvents.add(avroEvent);
            }
            final CallFuture callFuture = new CallFuture();
            try {
                handshake = this.callTimeoutPool.submit(new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        NettyAvroRpcClient.this.avroClient.appendBatch(avroEvents, (Callback<Status>)callFuture);
                        return null;
                    }
                });
            }
            catch (RejectedExecutionException ex) {
                throw new EventDeliveryException(this + ": Executor error", ex);
            }
            try {
                handshake.get(this.connectTimeout, TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException ex) {
                throw new EventDeliveryException(this + ": Handshake timed out after " + this.connectTimeout + "ms", ex);
            }
            catch (InterruptedException ex) {
                throw new EventDeliveryException(this + ": Interrupted in handshake", ex);
            }
            catch (ExecutionException ex) {
                throw new EventDeliveryException(this + ": RPC request exception", ex);
            }
            catch (CancellationException ex) {
                throw new EventDeliveryException(this + ": RPC request cancelled", ex);
            }
            finally {
                if (!handshake.isDone()) {
                    handshake.cancel(true);
                }
            }
            this.waitForStatusOK((CallFuture<Status>)callFuture, timeout, tu);
        }
    }

    private void waitForStatusOK(CallFuture<Status> callFuture, long timeout, TimeUnit tu) throws EventDeliveryException {
        try {
            Status status = (Status)((Object)callFuture.get(timeout, tu));
            if (status != Status.OK) {
                throw new EventDeliveryException(this + ": Avro RPC call returned " + "Status: " + (Object)((Object)status));
            }
        }
        catch (CancellationException ex) {
            throw new EventDeliveryException(this + ": RPC future was cancelled", ex);
        }
        catch (ExecutionException ex) {
            throw new EventDeliveryException(this + ": Exception thrown from " + "remote handler", ex);
        }
        catch (TimeoutException ex) {
            throw new EventDeliveryException(this + ": RPC request timed out", ex);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new EventDeliveryException(this + ": RPC request interrupted", ex);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setState(ConnState newState) {
        this.stateLock.lock();
        try {
            if (this.connState == ConnState.DEAD && this.connState != newState) {
                throw new IllegalStateException("Cannot transition from CLOSED state.");
            }
            this.connState = newState;
        }
        finally {
            this.stateLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void assertReady() throws EventDeliveryException {
        this.stateLock.lock();
        try {
            ConnState curState = this.connState;
            if (curState != ConnState.READY) {
                throw new EventDeliveryException("RPC failed, client in an invalid state: " + (Object)((Object)curState));
            }
        }
        finally {
            this.stateLock.unlock();
        }
    }

    private static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, String> stringMap) {
        HashMap<CharSequence, CharSequence> charSeqMap = new HashMap<CharSequence, CharSequence>();
        for (Map.Entry<String, String> entry : stringMap.entrySet()) {
            charSeqMap.put(entry.getKey(), entry.getValue());
        }
        return charSeqMap;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isActive() {
        this.stateLock.lock();
        try {
            boolean bl = this.connState == ConnState.READY;
            return bl;
        }
        finally {
            this.stateLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void configure(Properties properties) throws FlumeException {
        String host;
        this.stateLock.lock();
        try {
            if (this.connState == ConnState.READY || this.connState == ConnState.DEAD) {
                throw new FlumeException("This client was already configured, cannot reconfigure.");
            }
        }
        finally {
            this.stateLock.unlock();
        }
        String strBatchSize = properties.getProperty("batch-size");
        logger.debug("Batch size string = " + strBatchSize);
        this.batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
        if (strBatchSize != null && !strBatchSize.isEmpty()) {
            try {
                int parsedBatch = Integer.parseInt(strBatchSize);
                if (parsedBatch < 1) {
                    logger.warn("Invalid value for batchSize: {}; Using default value.", (Object)parsedBatch);
                } else {
                    this.batchSize = parsedBatch;
                }
            }
            catch (NumberFormatException e) {
                logger.warn("Batchsize is not valid for RpcClient: " + strBatchSize + ". Default value assigned.", (Throwable)e);
            }
        }
        String hostNames = properties.getProperty("hosts");
        String[] hosts = null;
        if (hostNames == null || hostNames.isEmpty()) {
            throw new FlumeException("Hosts list is invalid: " + hostNames);
        }
        hosts = hostNames.split("\\s+");
        if (hosts.length > 1) {
            logger.warn("More than one hosts are specified for the default client. Only the first host will be used and others ignored. Specified: " + hostNames + "; to be used: " + hosts[0]);
        }
        if ((host = properties.getProperty("hosts." + hosts[0])) == null || host.isEmpty()) {
            throw new FlumeException("Host not found: " + hosts[0]);
        }
        String[] hostAndPort = host.split(":");
        if (hostAndPort.length != 2) {
            throw new FlumeException("Invalid hostname: " + hosts[0]);
        }
        Integer port = null;
        try {
            port = Integer.parseInt(hostAndPort[1]);
        }
        catch (NumberFormatException e) {
            throw new FlumeException("Invalid Port: " + hostAndPort[1], e);
        }
        this.address = new InetSocketAddress(hostAndPort[0], (int)port);
        this.connectTimeout = RpcClientConfigurationConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS;
        String strConnTimeout = properties.getProperty("connect-timeout");
        if (strConnTimeout != null && strConnTimeout.trim().length() > 0) {
            try {
                this.connectTimeout = Long.parseLong(strConnTimeout);
                if (this.connectTimeout < 1000L) {
                    logger.warn("Connection timeout specified less than 1s. Using default value instead.");
                    this.connectTimeout = RpcClientConfigurationConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS;
                }
            }
            catch (NumberFormatException ex) {
                logger.error("Invalid connect timeout specified: " + strConnTimeout);
            }
        }
        this.requestTimeout = RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
        String strReqTimeout = properties.getProperty("request-timeout");
        if (strReqTimeout != null && strReqTimeout.trim().length() > 0) {
            try {
                this.requestTimeout = Long.parseLong(strReqTimeout);
                if (this.requestTimeout < 1000L) {
                    logger.warn("Request timeout specified less than 1s. Using default value instead.");
                    this.requestTimeout = RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
                }
            }
            catch (NumberFormatException ex) {
                logger.error("Invalid request timeout specified: " + strReqTimeout);
            }
        }
        this.connect();
    }

    private static class TransceiverThreadFactory
    implements ThreadFactory {
        private final AtomicInteger threadId = new AtomicInteger(0);
        private final String prefix;

        public TransceiverThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName(this.prefix + " " + this.threadId.incrementAndGet());
            return thread;
        }
    }

    private static enum ConnState {
        INIT,
        READY,
        DEAD;

    }
}

