/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.util.StringUtils;
import org.apache.hudi.org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.BufferChain;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.RpcResponse;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.SimpleRpcServer;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection;
import org.apache.hudi.org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
class SimpleRpcServerResponder
extends Thread {
    private final SimpleRpcServer simpleRpcServer;
    private final Selector writeSelector;
    private final Set<SimpleServerRpcConnection> writingCons = Collections.newSetFromMap(new ConcurrentHashMap());

    SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException {
        this.simpleRpcServer = simpleRpcServer;
        this.setName("RpcServer.responder");
        this.setDaemon(true);
        this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
        this.writeSelector = Selector.open();
    }

    @Override
    public void run() {
        SimpleRpcServer.LOG.debug(this.getName() + ": starting");
        try {
            this.doRunLoop();
        }
        finally {
            SimpleRpcServer.LOG.info(this.getName() + ": stopping");
            try {
                this.writeSelector.close();
            }
            catch (IOException ioe) {
                SimpleRpcServer.LOG.error(this.getName() + ": couldn't close write selector", (Throwable)ioe);
            }
        }
    }

    private void registerWrites() {
        Iterator<SimpleServerRpcConnection> it = this.writingCons.iterator();
        while (it.hasNext()) {
            SimpleServerRpcConnection c = it.next();
            it.remove();
            SelectionKey sk = c.channel.keyFor(this.writeSelector);
            try {
                if (sk == null) {
                    try {
                        c.channel.register(this.writeSelector, 4, c);
                    }
                    catch (ClosedChannelException e) {
                        if (!SimpleRpcServer.LOG.isTraceEnabled()) continue;
                        SimpleRpcServer.LOG.trace("ignored", (Throwable)e);
                    }
                    continue;
                }
                sk.interestOps(4);
            }
            catch (CancelledKeyException e) {
                if (!SimpleRpcServer.LOG.isTraceEnabled()) continue;
                SimpleRpcServer.LOG.trace("ignored", (Throwable)e);
            }
        }
    }

    public void registerForWrite(SimpleServerRpcConnection c) {
        if (this.writingCons.add(c)) {
            this.writeSelector.wakeup();
        }
    }

    private void doRunLoop() {
        long lastPurgeTime = 0L;
        while (this.simpleRpcServer.running) {
            try {
                this.registerWrites();
                int keyCt = this.writeSelector.select(this.simpleRpcServer.purgeTimeout);
                if (keyCt == 0) continue;
                Set<SelectionKey> keys2 = this.writeSelector.selectedKeys();
                Iterator<SelectionKey> iter = keys2.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    try {
                        if (!key.isValid() || !key.isWritable()) continue;
                        this.doAsyncWrite(key);
                    }
                    catch (IOException e) {
                        SimpleRpcServer.LOG.debug(this.getName() + ": asyncWrite", (Throwable)e);
                    }
                }
                lastPurgeTime = this.purge(lastPurgeTime);
            }
            catch (OutOfMemoryError e) {
                if (this.simpleRpcServer.errorHandler != null) {
                    if (!this.simpleRpcServer.errorHandler.checkOOME(e)) continue;
                    SimpleRpcServer.LOG.info(this.getName() + ": exiting on OutOfMemoryError");
                    return;
                }
                SimpleRpcServer.LOG.warn(this.getName() + ": OutOfMemoryError in server select", (Throwable)e);
                try {
                    Thread.sleep(60000L);
                }
                catch (InterruptedException ex) {
                    SimpleRpcServer.LOG.debug("Interrupted while sleeping");
                    return;
                }
            }
            catch (Exception e) {
                SimpleRpcServer.LOG.warn(this.getName() + ": exception in Responder " + StringUtils.stringifyException((Throwable)e), (Throwable)e);
            }
        }
        SimpleRpcServer.LOG.info(this.getName() + ": stopped");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long purge(long lastPurgeTime) {
        long now = System.currentTimeMillis();
        if (now < lastPurgeTime + this.simpleRpcServer.purgeTimeout) {
            return lastPurgeTime;
        }
        ArrayList<SimpleServerRpcConnection> conWithOldCalls = new ArrayList<SimpleServerRpcConnection>();
        Set<SelectionKey> set = this.writeSelector.keys();
        synchronized (set) {
            for (SelectionKey key : this.writeSelector.keys()) {
                SimpleServerRpcConnection connection = (SimpleServerRpcConnection)key.attachment();
                if (connection == null) {
                    throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
                }
                if (connection.lastSentTime <= 0L || now <= connection.lastSentTime + this.simpleRpcServer.purgeTimeout) continue;
                conWithOldCalls.add(connection);
            }
        }
        for (SimpleServerRpcConnection connection : conWithOldCalls) {
            this.simpleRpcServer.closeConnection(connection);
        }
        return now;
    }

    private void doAsyncWrite(SelectionKey key) throws IOException {
        SimpleServerRpcConnection connection = (SimpleServerRpcConnection)key.attachment();
        if (connection == null) {
            throw new IOException("doAsyncWrite: no connection");
        }
        if (key.channel() != connection.channel) {
            throw new IOException("doAsyncWrite: bad channel");
        }
        if (this.processAllResponses(connection)) {
            try {
                key.interestOps(0);
            }
            catch (CancelledKeyException e) {
                SimpleRpcServer.LOG.warn("Exception while changing ops : " + e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean processResponse(SimpleServerRpcConnection conn, RpcResponse resp) throws IOException {
        boolean error = true;
        BufferChain buf = resp.getResponse();
        try {
            long numBytes = this.simpleRpcServer.channelWrite(conn.channel, buf);
            if (numBytes < 0L) {
                throw new HBaseIOException("Error writing on the socket " + conn);
            }
            error = false;
        }
        finally {
            if (error) {
                SimpleRpcServer.LOG.debug(conn + ": output error -- closing");
                resp.done();
                this.simpleRpcServer.closeConnection(conn);
            }
        }
        if (!buf.hasRemaining()) {
            resp.done();
            return true;
        }
        conn.lastSentTime = System.currentTimeMillis();
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean processAllResponses(SimpleServerRpcConnection connection) throws IOException {
        connection.responseWriteLock.lock();
        try {
            for (int i = 0; i < 20; ++i) {
                RpcResponse resp = connection.responseQueue.pollFirst();
                if (resp == null) {
                    boolean bl = true;
                    return bl;
                }
                if (this.processResponse(connection, resp)) continue;
                connection.responseQueue.addFirst(resp);
                boolean bl = false;
                return bl;
            }
        }
        finally {
            connection.responseWriteLock.unlock();
        }
        return connection.responseQueue.isEmpty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws IOException {
        boolean added = false;
        if (conn.responseQueue.isEmpty() && conn.responseWriteLock.tryLock()) {
            try {
                if (conn.responseQueue.isEmpty()) {
                    if (this.processResponse(conn, resp)) {
                        return;
                    }
                    conn.responseQueue.addFirst(resp);
                    added = true;
                }
            }
            finally {
                conn.responseWriteLock.unlock();
            }
        }
        if (!added) {
            conn.responseQueue.addLast(resp);
        }
        this.registerForWrite(conn);
    }
}

