/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.proto;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.auth.AuthToken;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.proto.AuthHandler;
import org.apache.bookkeeper.proto.BKStats;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.apache.bookkeeper.proto.GetBookieInfoProcessorV3;
import org.apache.bookkeeper.proto.LongPollReadEntryProcessorV3;
import org.apache.bookkeeper.proto.ReadEntryProcessor;
import org.apache.bookkeeper.proto.ReadEntryProcessorV3;
import org.apache.bookkeeper.proto.ReadLacProcessorV3;
import org.apache.bookkeeper.proto.RequestUtils;
import org.apache.bookkeeper.proto.ResponseBuilder;
import org.apache.bookkeeper.proto.WriteEntryProcessor;
import org.apache.bookkeeper.proto.WriteEntryProcessorV3;
import org.apache.bookkeeper.proto.WriteLacProcessorV3;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BookieRequestProcessor
implements RequestProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class);
    private final ServerConfiguration serverCfg;
    final Bookie bookie;
    private final OrderedExecutor readThreadPool;
    private final OrderedExecutor writeThreadPool;
    private final SecurityHandlerFactory shFactory;
    private final OrderedExecutor longPollThreadPool;
    private final OrderedExecutor highPriorityThreadPool;
    private final HashedWheelTimer requestTimer;
    private final BKStats bkStats = BKStats.getInstance();
    private final boolean statsEnabled;
    private final OpStatsLogger addRequestStats;
    private final OpStatsLogger addEntryStats;
    final OpStatsLogger readRequestStats;
    final OpStatsLogger readEntryStats;
    final OpStatsLogger fenceReadRequestStats;
    final OpStatsLogger fenceReadEntryStats;
    final OpStatsLogger fenceReadWaitStats;
    final OpStatsLogger readEntrySchedulingDelayStats;
    final OpStatsLogger longPollPreWaitStats;
    final OpStatsLogger longPollWaitStats;
    final OpStatsLogger longPollReadStats;
    final OpStatsLogger longPollReadRequestStats;
    final Counter readLastEntryNoEntryErrorCounter;
    final OpStatsLogger writeLacRequestStats;
    final OpStatsLogger writeLacStats;
    final OpStatsLogger readLacRequestStats;
    final OpStatsLogger readLacStats;
    final OpStatsLogger getBookieInfoRequestStats;
    final OpStatsLogger getBookieInfoStats;
    final OpStatsLogger channelWriteStats;

    public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, StatsLogger statsLogger, SecurityHandlerFactory shFactory) throws SecurityException {
        this.serverCfg = serverCfg;
        this.bookie = bookie;
        this.readThreadPool = this.createExecutor(this.serverCfg.getNumReadWorkerThreads(), "BookieReadThreadPool", serverCfg.getMaxPendingReadRequestPerThread(), statsLogger);
        this.writeThreadPool = this.createExecutor(this.serverCfg.getNumAddWorkerThreads(), "BookieWriteThreadPool", serverCfg.getMaxPendingAddRequestPerThread(), statsLogger);
        if (serverCfg.getNumLongPollWorkerThreads() <= 0 && this.readThreadPool != null) {
            this.longPollThreadPool = this.readThreadPool;
        } else {
            int numThreads = this.serverCfg.getNumLongPollWorkerThreads();
            if (numThreads <= 0) {
                numThreads = Runtime.getRuntime().availableProcessors();
            }
            this.longPollThreadPool = this.createExecutor(numThreads, "BookieLongPollThread-" + serverCfg.getBookiePort(), -1, statsLogger);
        }
        this.highPriorityThreadPool = this.createExecutor(this.serverCfg.getNumHighPriorityWorkerThreads(), "BookieHighPriorityThread-" + serverCfg.getBookiePort(), -1, statsLogger);
        this.requestTimer = new HashedWheelTimer(new ThreadFactoryBuilder().setNameFormat("BookieRequestTimer-%d").build(), (long)this.serverCfg.getRequestTimerTickDurationMs(), TimeUnit.MILLISECONDS, this.serverCfg.getRequestTimerNumTicks());
        this.shFactory = shFactory;
        if (shFactory != null) {
            shFactory.init(SecurityHandlerFactory.NodeType.Server, serverCfg);
        }
        this.statsEnabled = serverCfg.isStatisticsEnabled();
        this.addEntryStats = statsLogger.getOpStatsLogger("ADD_ENTRY");
        this.addRequestStats = statsLogger.getOpStatsLogger("ADD_ENTRY_REQUEST");
        this.readEntryStats = statsLogger.getOpStatsLogger("READ_ENTRY");
        this.readRequestStats = statsLogger.getOpStatsLogger("READ_ENTRY_REQUEST");
        this.fenceReadEntryStats = statsLogger.getOpStatsLogger("READ_ENTRY_FENCE_READ");
        this.fenceReadRequestStats = statsLogger.getOpStatsLogger("READ_ENTRY_FENCE_REQUEST");
        this.fenceReadWaitStats = statsLogger.getOpStatsLogger("READ_ENTRY_FENCE_WAIT");
        this.readEntrySchedulingDelayStats = statsLogger.getOpStatsLogger("READ_ENTRY_SCHEDULING_DELAY");
        this.longPollPreWaitStats = statsLogger.getOpStatsLogger("READ_ENTRY_LONG_POLL_PRE_WAIT");
        this.longPollWaitStats = statsLogger.getOpStatsLogger("READ_ENTRY_LONG_POLL_WAIT");
        this.longPollReadStats = statsLogger.getOpStatsLogger("READ_ENTRY_LONG_POLL_READ");
        this.longPollReadRequestStats = statsLogger.getOpStatsLogger("READ_ENTRY_LONG_POLL_REQUEST");
        this.readLastEntryNoEntryErrorCounter = statsLogger.getCounter("READ_LAST_ENTRY_NOENTRY_ERROR");
        this.writeLacStats = statsLogger.getOpStatsLogger("WRITE_LAC");
        this.writeLacRequestStats = statsLogger.getOpStatsLogger("WRITE_LAC_REQUEST");
        this.readLacStats = statsLogger.getOpStatsLogger("READ_LAC");
        this.readLacRequestStats = statsLogger.getOpStatsLogger("READ_LAC_REQUEST");
        this.getBookieInfoStats = statsLogger.getOpStatsLogger("GET_BOOKIE_INFO");
        this.getBookieInfoRequestStats = statsLogger.getOpStatsLogger("GET_BOOKIE_INFO_REQUEST");
        this.channelWriteStats = statsLogger.getOpStatsLogger("CHANNEL_WRITE");
    }

    @Override
    public void close() {
        this.shutdownExecutor(this.writeThreadPool);
        this.shutdownExecutor(this.readThreadPool);
        if (this.serverCfg.getNumLongPollWorkerThreads() > 0 || this.readThreadPool == null) {
            this.shutdownExecutor(this.longPollThreadPool);
        }
        this.shutdownExecutor(this.highPriorityThreadPool);
    }

    private OrderedExecutor createExecutor(int numThreads, String nameFormat, int maxTasksInQueue, StatsLogger statsLogger) {
        if (numThreads <= 0) {
            return null;
        }
        return OrderedExecutor.newBuilder().numThreads(numThreads).name(nameFormat).traceTaskExecution(this.serverCfg.getEnableTaskExecutionStats()).statsLogger(statsLogger).maxTasksInQueue(maxTasksInQueue).build();
    }

    private void shutdownExecutor(OrderedExecutor service) {
        if (null != service) {
            service.shutdown();
        }
    }

    @Override
    public void processRequest(Object msg, Channel c) {
        if (msg instanceof BookkeeperProtocol.Request) {
            BookkeeperProtocol.Request r = (BookkeeperProtocol.Request)msg;
            BookkeeperProtocol.BKPacketHeader header = r.getHeader();
            switch (header.getOperation()) {
                case ADD_ENTRY: {
                    this.processAddRequestV3(r, c);
                    break;
                }
                case READ_ENTRY: {
                    this.processReadRequestV3(r, c);
                    break;
                }
                case AUTH: {
                    LOG.info("Ignoring auth operation from client {}", (Object)c.remoteAddress());
                    BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage.newBuilder().setAuthPluginName("AuthDisabledPlugin").setPayload(ByteString.copyFrom((byte[])AuthToken.NULL.getData())).build();
                    BookkeeperProtocol.Response.Builder authResponse = BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader()).setStatus(BookkeeperProtocol.StatusCode.EOK).setAuthResponse(message);
                    c.writeAndFlush((Object)authResponse.build());
                    break;
                }
                case WRITE_LAC: {
                    this.processWriteLacRequestV3(r, c);
                    break;
                }
                case READ_LAC: {
                    this.processReadLacRequestV3(r, c);
                    break;
                }
                case GET_BOOKIE_INFO: {
                    this.processGetBookieInfoRequestV3(r, c);
                    break;
                }
                case START_TLS: {
                    this.processStartTLSRequestV3(r, c);
                    break;
                }
                default: {
                    LOG.info("Unknown operation type {}", (Object)header.getOperation());
                    BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader()).setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
                    c.writeAndFlush((Object)response.build());
                    if (this.statsEnabled) {
                        this.bkStats.getOpStats(2).incrementFailedOps();
                        break;
                    } else {
                        break;
                    }
                }
            }
        } else {
            BookieProtocol.Request r = (BookieProtocol.Request)msg;
            switch (r.getOpCode()) {
                case 1: {
                    Preconditions.checkArgument((boolean)(r instanceof BookieProtocol.ParsedAddRequest));
                    this.processAddRequest((BookieProtocol.ParsedAddRequest)r, c);
                    break;
                }
                case 2: {
                    Preconditions.checkArgument((boolean)(r instanceof BookieProtocol.ReadRequest));
                    this.processReadRequest((BookieProtocol.ReadRequest)r, c);
                    break;
                }
                case 3: {
                    LOG.info("Ignoring auth operation from client {}", (Object)c.remoteAddress());
                    BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage.newBuilder().setAuthPluginName("AuthDisabledPlugin").setPayload(ByteString.copyFrom((byte[])AuthToken.NULL.getData())).build();
                    c.writeAndFlush((Object)new BookieProtocol.AuthResponse(2, message));
                    break;
                }
                default: {
                    LOG.error("Unknown op type {}, sending error", (Object)r.getOpCode());
                    c.writeAndFlush((Object)ResponseBuilder.buildErrorResponse(100, r));
                    if (!this.statsEnabled) break;
                    this.bkStats.getOpStats(2).incrementFailedOps();
                }
            }
        }
    }

    private void processWriteLacRequestV3(BookkeeperProtocol.Request r, Channel c) {
        WriteLacProcessorV3 writeLac = new WriteLacProcessorV3(r, c, this);
        if (null == this.writeThreadPool) {
            writeLac.run();
        } else {
            this.writeThreadPool.executeOrdered(r.getAddRequest().getLedgerId(), (SafeRunnable)writeLac);
        }
    }

    private void processReadLacRequestV3(BookkeeperProtocol.Request r, Channel c) {
        ReadLacProcessorV3 readLac = new ReadLacProcessorV3(r, c, this);
        if (null == this.readThreadPool) {
            readLac.run();
        } else {
            this.readThreadPool.executeOrdered(r.getAddRequest().getLedgerId(), (SafeRunnable)readLac);
        }
    }

    private void processAddRequestV3(BookkeeperProtocol.Request r, Channel c) {
        WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, this);
        OrderedExecutor threadPool = RequestUtils.isHighPriority(r) ? this.highPriorityThreadPool : this.writeThreadPool;
        if (null == threadPool) {
            write.run();
        } else {
            try {
                threadPool.executeOrdered(r.getAddRequest().getLedgerId(), (SafeRunnable)write);
            }
            catch (RejectedExecutionException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Failed to process request to add entry at {}:{}. Too many pending requests", (Object)r.getAddRequest().getLedgerId(), (Object)r.getAddRequest().getEntryId());
                }
                BookkeeperProtocol.AddResponse.Builder addResponse = BookkeeperProtocol.AddResponse.newBuilder().setLedgerId(r.getAddRequest().getLedgerId()).setEntryId(r.getAddRequest().getEntryId()).setStatus(BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS);
                BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder().setHeader(write.getHeader()).setStatus(addResponse.getStatus()).setAddResponse(addResponse);
                BookkeeperProtocol.Response resp = response.build();
                write.sendResponse(addResponse.getStatus(), resp, this.addRequestStats);
            }
        }
    }

    private void processReadRequestV3(BookkeeperProtocol.Request r, Channel c) {
        OrderedExecutor threadPool;
        ReadEntryProcessorV3 read;
        ExecutorService fenceThread;
        ExecutorService executorService = fenceThread = null == this.highPriorityThreadPool ? null : this.highPriorityThreadPool.chooseThread((Object)c);
        if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) {
            ExecutorService lpThread = this.longPollThreadPool.chooseThread((Object)c);
            read = new LongPollReadEntryProcessorV3(r, c, this, fenceThread, lpThread, this.requestTimer);
            threadPool = this.longPollThreadPool;
        } else {
            read = new ReadEntryProcessorV3(r, c, this, fenceThread);
            boolean isHighPriority = RequestUtils.isHighPriority(r) || RequestUtils.hasFlag(r.getReadRequest(), BookkeeperProtocol.ReadRequest.Flag.FENCE_LEDGER);
            threadPool = isHighPriority ? this.highPriorityThreadPool : this.readThreadPool;
        }
        if (null == threadPool) {
            read.run();
        } else {
            try {
                threadPool.executeOrdered(r.getReadRequest().getLedgerId(), (SafeRunnable)read);
            }
            catch (RejectedExecutionException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Failed to process request to read entry at {}:{}. Too many pending requests", (Object)r.getReadRequest().getLedgerId(), (Object)r.getReadRequest().getEntryId());
                }
                BookkeeperProtocol.ReadResponse.Builder readResponse = BookkeeperProtocol.ReadResponse.newBuilder().setLedgerId(r.getReadRequest().getLedgerId()).setEntryId(r.getReadRequest().getEntryId()).setStatus(BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS);
                BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder().setHeader(read.getHeader()).setStatus(readResponse.getStatus()).setReadResponse(readResponse);
                BookkeeperProtocol.Response resp = response.build();
                read.sendResponse(readResponse.getStatus(), resp, this.readRequestStats);
            }
        }
    }

    private void processStartTLSRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
        BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder();
        BookkeeperProtocol.BKPacketHeader.Builder header = BookkeeperProtocol.BKPacketHeader.newBuilder();
        header.setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE);
        header.setOperation(r.getHeader().getOperation());
        header.setTxnId(r.getHeader().getTxnId());
        response.setHeader(header.build());
        if (this.shFactory == null) {
            LOG.error("Got StartTLS request but TLS not configured");
            response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
            c.writeAndFlush((Object)response.build());
        } else {
            final SslHandler sslHandler = this.shFactory.newTLSHandler();
            c.pipeline().addFirst("tls", (ChannelHandler)sslHandler);
            response.setStatus(BookkeeperProtocol.StatusCode.EOK);
            BookkeeperProtocol.StartTLSResponse.Builder builder = BookkeeperProtocol.StartTLSResponse.newBuilder();
            response.setStartTLSResponse(builder.build());
            sslHandler.handshakeFuture().addListener((GenericFutureListener)new GenericFutureListener<Future<Channel>>(){

                public void operationComplete(Future<Channel> future) throws Exception {
                    AuthHandler.ServerSideHandler authHandler = (AuthHandler.ServerSideHandler)c.pipeline().get(AuthHandler.ServerSideHandler.class);
                    authHandler.authProvider.onProtocolUpgrade();
                    if (future.isSuccess()) {
                        LOG.info("Session is protected by: {}", (Object)sslHandler.engine().getSession().getCipherSuite());
                    } else {
                        LOG.error("TLS Handshake failure: {}", future.cause());
                        BookkeeperProtocol.Response.Builder errResponse = BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader()).setStatus(BookkeeperProtocol.StatusCode.EIO);
                        c.writeAndFlush((Object)errResponse.build());
                        if (BookieRequestProcessor.this.statsEnabled) {
                            BookieRequestProcessor.this.bkStats.getOpStats(2).incrementFailedOps();
                        }
                    }
                }
            });
            c.writeAndFlush((Object)response.build());
        }
    }

    private void processGetBookieInfoRequestV3(BookkeeperProtocol.Request r, Channel c) {
        GetBookieInfoProcessorV3 getBookieInfo = new GetBookieInfoProcessorV3(r, c, this);
        if (null == this.readThreadPool) {
            getBookieInfo.run();
        } else {
            this.readThreadPool.submit((Runnable)getBookieInfo);
        }
    }

    private void processAddRequest(BookieProtocol.ParsedAddRequest r, Channel c) {
        WriteEntryProcessor write = WriteEntryProcessor.create(r, c, this);
        OrderedExecutor threadPool = r.isHighPriority() ? this.highPriorityThreadPool : this.writeThreadPool;
        if (null == threadPool) {
            write.run();
        } else {
            try {
                threadPool.executeOrdered(r.getLedgerId(), (SafeRunnable)write);
            }
            catch (RejectedExecutionException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Failed to process request to add entry at {}:{}. Too many pending requests", (Object)r.ledgerId, (Object)r.entryId);
                }
                write.sendResponse(106, ResponseBuilder.buildErrorResponse(106, r), this.addRequestStats);
            }
        }
    }

    private void processReadRequest(BookieProtocol.ReadRequest r, Channel c) {
        ReadEntryProcessor read = ReadEntryProcessor.create(r, c, this);
        OrderedExecutor threadPool = r.isHighPriority() || r.isFencing() ? this.highPriorityThreadPool : this.readThreadPool;
        if (null == threadPool) {
            read.run();
        } else {
            try {
                threadPool.executeOrdered(r.getLedgerId(), (SafeRunnable)read);
            }
            catch (RejectedExecutionException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Failed to process request to read entry at {}:{}. Too many pending requests", (Object)r.ledgerId, (Object)r.entryId);
                }
                read.sendResponse(106, ResponseBuilder.buildErrorResponse(106, r), this.readRequestStats);
            }
        }
    }

    ServerConfiguration getServerCfg() {
        return this.serverCfg;
    }

    Bookie getBookie() {
        return this.bookie;
    }

    OrderedExecutor getReadThreadPool() {
        return this.readThreadPool;
    }

    OrderedExecutor getWriteThreadPool() {
        return this.writeThreadPool;
    }

    SecurityHandlerFactory getShFactory() {
        return this.shFactory;
    }

    OrderedExecutor getLongPollThreadPool() {
        return this.longPollThreadPool;
    }

    OrderedExecutor getHighPriorityThreadPool() {
        return this.highPriorityThreadPool;
    }

    HashedWheelTimer getRequestTimer() {
        return this.requestTimer;
    }

    BKStats getBkStats() {
        return this.bkStats;
    }

    boolean isStatsEnabled() {
        return this.statsEnabled;
    }

    OpStatsLogger getAddRequestStats() {
        return this.addRequestStats;
    }

    OpStatsLogger getAddEntryStats() {
        return this.addEntryStats;
    }

    OpStatsLogger getReadRequestStats() {
        return this.readRequestStats;
    }

    OpStatsLogger getReadEntryStats() {
        return this.readEntryStats;
    }

    OpStatsLogger getFenceReadRequestStats() {
        return this.fenceReadRequestStats;
    }

    OpStatsLogger getFenceReadEntryStats() {
        return this.fenceReadEntryStats;
    }

    OpStatsLogger getFenceReadWaitStats() {
        return this.fenceReadWaitStats;
    }

    OpStatsLogger getReadEntrySchedulingDelayStats() {
        return this.readEntrySchedulingDelayStats;
    }

    OpStatsLogger getLongPollPreWaitStats() {
        return this.longPollPreWaitStats;
    }

    OpStatsLogger getLongPollWaitStats() {
        return this.longPollWaitStats;
    }

    OpStatsLogger getLongPollReadStats() {
        return this.longPollReadStats;
    }

    OpStatsLogger getLongPollReadRequestStats() {
        return this.longPollReadRequestStats;
    }

    Counter getReadLastEntryNoEntryErrorCounter() {
        return this.readLastEntryNoEntryErrorCounter;
    }

    OpStatsLogger getWriteLacRequestStats() {
        return this.writeLacRequestStats;
    }

    OpStatsLogger getWriteLacStats() {
        return this.writeLacStats;
    }

    OpStatsLogger getReadLacRequestStats() {
        return this.readLacRequestStats;
    }

    OpStatsLogger getReadLacStats() {
        return this.readLacStats;
    }

    OpStatsLogger getGetBookieInfoRequestStats() {
        return this.getBookieInfoRequestStats;
    }

    OpStatsLogger getGetBookieInfoStats() {
        return this.getBookieInfoStats;
    }

    OpStatsLogger getChannelWriteStats() {
        return this.channelWriteStats;
    }
}

