/*
 * Decompiled with CFR 0.152.
 */
package com.hsbc.cranker.mucranker;

import com.hsbc.cranker.mucranker.CrankerMuHandler;
import com.hsbc.cranker.mucranker.CrankerProtocolResponse;
import com.hsbc.cranker.mucranker.HeadersBuilder;
import com.hsbc.cranker.mucranker.ProxyInfo;
import com.hsbc.cranker.mucranker.ProxyListener;
import com.hsbc.cranker.mucranker.WebSocketFarmV3;
import io.muserver.AsyncHandle;
import io.muserver.BaseWebSocket;
import io.muserver.DoneCallback;
import io.muserver.HeaderNames;
import io.muserver.MuRequest;
import io.muserver.MuResponse;
import io.muserver.MuWebSocketSession;
import io.muserver.Mutils;
import io.muserver.RequestBodyListener;
import io.muserver.WebsocketSessionState;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.ws.rs.WebApplicationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RouterSocketV3
extends BaseWebSocket {
    static final byte MESSAGE_TYPE_DATA = 0;
    static final byte MESSAGE_TYPE_HEADER = 1;
    static final byte MESSAGE_TYPE_RST_STREAM = 3;
    static final byte MESSAGE_TYPE_WINDOW_UPDATE = 8;
    static final int ERROR_INTERNAL = 1;
    private static final Logger log = LoggerFactory.getLogger(RouterSocketV3.class);
    private static final List<String> RESPONSE_HEADERS_TO_NOT_SEND_BACK = Collections.singletonList("server");
    final String route;
    final String componentName;
    final String routerSocketID = UUID.randomUUID().toString();
    private final WebSocketFarmV3 webSocketFarmV3;
    private final String connectorInstanceID;
    private final List<ProxyListener> proxyListeners;
    private final boolean discardClientForwardedHeaders;
    private final boolean sendLegacyForwardedHeaders;
    private final String viaValue;
    private final Set<String> doNotProxy;
    private Runnable onReadyForAction;
    private InetSocketAddress remoteAddress;
    private boolean isRemoved;
    private final Map<Integer, RequestContext> contextMap = new ConcurrentHashMap<Integer, RequestContext>();
    private final AtomicInteger idMaker = new AtomicInteger(0);

    RouterSocketV3(String route, String componentName, WebSocketFarmV3 webSocketFarmV3, String remotePort, List<ProxyListener> proxyListeners, boolean discardClientForwardedHeaders, boolean sendLegacyForwardedHeaders, String viaValue, Set<String> doNotProxy) {
        this.webSocketFarmV3 = webSocketFarmV3;
        this.route = route;
        this.componentName = componentName;
        this.connectorInstanceID = remotePort;
        this.proxyListeners = proxyListeners;
        this.discardClientForwardedHeaders = discardClientForwardedHeaders;
        this.sendLegacyForwardedHeaders = sendLegacyForwardedHeaders;
        this.viaValue = viaValue;
        this.doNotProxy = doNotProxy;
        this.isRemoved = false;
    }

    public WebsocketSessionState state() {
        return super.state();
    }

    public Map<Integer, RequestContext> getContextMap() {
        return this.contextMap;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendRequestOverWebSocketV3(MuRequest clientRequest, MuResponse clientResponse) {
        final Integer requestId = this.idMaker.incrementAndGet();
        AsyncHandle asyncHandle = clientRequest.handleAsync();
        final RequestContext context = new RequestContext(requestId, clientRequest, clientResponse, asyncHandle);
        this.contextMap.put(requestId, context);
        asyncHandle.addResponseCompleteHandler(info -> {
            if (!info.completedSuccessfully()) {
                log.info("Client request did not complete successfully " + clientRequest);
                if (context.error == null) {
                    context.error = new IllegalStateException("Client request did not complete successfully.");
                }
                this.raiseCompletionEvent(context);
                if (!this.state().endState()) {
                    this.resetStream(context, 1, "Client early closed", DoneCallback.NoOp);
                }
            }
        });
        try {
            HeadersBuilder headers = new HeadersBuilder();
            CrankerMuHandler.setTargetRequestHeaders(clientRequest, headers, this.discardClientForwardedHeaders, this.sendLegacyForwardedHeaders, this.viaValue, this.doNotProxy);
            try {
                if (!this.proxyListeners.isEmpty()) {
                    for (ProxyListener proxyListener : this.proxyListeners) {
                        proxyListener.onBeforeProxyToTarget(context, headers.muHeaders());
                    }
                }
            }
            catch (WebApplicationException e) {
                CrankerMuHandler.handleWebApplicationException(e, clientResponse, asyncHandle);
                this.resetStream(context, 1, "Proxy listener error", DoneCallback.NoOp);
                return;
            }
            String headerText = CrankerMuHandler.createRequestLine(clientRequest) + "\n" + headers + "\n";
            if (clientRequest.headers().hasBody()) {
                for (ByteBuffer headerMessage : RouterSocketV3.headerMessages(requestId, true, false, headerText)) {
                    context.sendingBytes(headerMessage.remaining() - 6);
                    this.sendData(headerMessage, DoneCallback.NoOp);
                    context.fromClientBytes.addAndGet(headerMessage.remaining() - 6);
                }
                if (!this.proxyListeners.isEmpty()) {
                    for (ProxyListener proxyListener : this.proxyListeners) {
                        proxyListener.onAfterProxyToTargetHeadersSent(context, headers.muHeaders());
                    }
                }
                asyncHandle.setReadListener(new RequestBodyListener(){

                    public void onDataReceived(ByteBuffer buffer, DoneCallback callback) {
                        int remaining = buffer.remaining();
                        int position = buffer.position();
                        DoneCallback wrapper = error -> {
                            context2.fromClientBytes.addAndGet(remaining);
                            if (error != null) {
                                this.onError(error);
                                callback.onComplete(error);
                                return;
                            }
                            if (!RouterSocketV3.this.proxyListeners.isEmpty()) {
                                for (ProxyListener proxyListener : RouterSocketV3.this.proxyListeners) {
                                    proxyListener.onRequestBodyChunkSentToTarget(context, buffer.position(position));
                                }
                            }
                            context.flowControl(() -> {
                                try {
                                    callback.onComplete(null);
                                }
                                catch (Exception e) {
                                    this.onError(e);
                                }
                            });
                        };
                        try {
                            ByteBuffer data = RouterSocketV3.dataMessages(requestId, false, buffer);
                            context.sendingBytes(data.remaining() - 6);
                            RouterSocketV3.this.sendData(data, wrapper);
                        }
                        catch (Exception e) {
                            this.onError(e);
                        }
                    }

                    public void onComplete() {
                        try {
                            ByteBuffer endMessage = RouterSocketV3.dataMessages(requestId, true, null);
                            RouterSocketV3.this.sendData(endMessage, DoneCallback.NoOp);
                            if (!RouterSocketV3.this.proxyListeners.isEmpty()) {
                                for (ProxyListener proxyListener : RouterSocketV3.this.proxyListeners) {
                                    proxyListener.onRequestBodySentToTarget(context);
                                }
                            }
                        }
                        catch (Exception e) {
                            this.onError(e);
                        }
                    }

                    public void onError(Throwable t) {
                        try {
                            RouterSocketV3.this.notifyClientRequestError(context, t);
                            RouterSocketV3.this.resetStream(context, 1, "Client request body read error", DoneCallback.NoOp);
                        }
                        catch (Exception exception) {
                            // empty catch block
                        }
                    }
                });
            } else {
                for (ByteBuffer headerMessage : RouterSocketV3.headerMessages(requestId, true, true, headerText)) {
                    context.sendingBytes(headerMessage.remaining() - 6);
                    this.sendData(headerMessage, DoneCallback.NoOp);
                    context.fromClientBytes.addAndGet(headerMessage.remaining() - 6);
                }
                if (!this.proxyListeners.isEmpty()) {
                    for (ProxyListener proxyListener : this.proxyListeners) {
                        proxyListener.onAfterProxyToTargetHeadersSent(context, headers.muHeaders());
                        proxyListener.onRequestBodySentToTarget(context);
                    }
                }
            }
        }
        catch (WebApplicationException e) {
            CrankerMuHandler.handleWebApplicationException(e, clientResponse, asyncHandle);
            this.resetStream(context, 1001, "Going away", DoneCallback.NoOp);
        }
        catch (Throwable e) {
            CrankerMuHandler.handleException(clientRequest, clientResponse, asyncHandle, e);
            try {
                this.resetStream(context, 1001, "Going away", DoneCallback.NoOp);
            }
            catch (Throwable ei) {
                log.error("Fail to close crankedSocket, routerName=" + context.route() + ", routerSocketID=" + this.routerSocketID + " \n" + ei.getMessage());
            }
            finally {
                asyncHandle.complete();
            }
        }
    }

    private void sendData(ByteBuffer byteBuffer, DoneCallback doneCallback) {
        this.session().sendBinary(byteBuffer, doneCallback);
    }

    void socketSessionClose() {
        if (this.contextMap.isEmpty()) {
            try {
                MuWebSocketSession session = this.session();
                if (session != null) {
                    session.close(1001, "Going away");
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
    }

    void resetStream(RequestContext context, Integer errorCode, String message, DoneCallback doneCallback) {
        if (context != null && !context.state.isCompleted() && !context.isRstStreamSent) {
            ByteBuffer buffer = RouterSocketV3.rstMessage(context.requestId, errorCode, message);
            this.sendData(buffer, doneCallback);
            context.isRstStreamSent = true;
        }
        if (context != null) {
            this.contextMap.remove(context.requestId);
        }
    }

    public void onConnect(MuWebSocketSession session) throws Exception {
        super.onConnect(session);
        this.remoteAddress = session.remoteAddress();
        this.onReadyForAction.run();
    }

    public void onClientClosed(int statusCode, String reason) throws Exception {
        super.onClientClosed(statusCode, reason);
        if (!this.isRemoved) {
            this.webSocketFarmV3.removeWebSocket(this);
            this.isRemoved = true;
        }
        if (statusCode != 1000) {
            log.warn("websocket exceptional closed from client: statusCode={}, reason={}", (Object)statusCode, (Object)reason);
        }
        for (RequestContext context : this.contextMap.values()) {
            this.notifyClientRequestClose(context, statusCode);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyClientRequestClose(RequestContext context, int statusCode) {
        block13: {
            try {
                if (!this.proxyListeners.isEmpty()) {
                    for (ProxyListener proxyListener : this.proxyListeners) {
                        proxyListener.onResponseBodyChunkReceived(context);
                    }
                }
                if (context.response != null && !context.response.hasStartedSendingData()) {
                    if (statusCode == 1011) {
                        context.response.status(502);
                    } else if (statusCode == 1008) {
                        context.response.status(400);
                    }
                }
                if (context.asyncHandle == null) break block13;
                try {
                    if (statusCode == 1000) {
                        context.asyncHandle.complete();
                        break block13;
                    }
                    log.info("Closing client request early due to cranker wss connection close with status code {}", (Object)statusCode);
                    context.asyncHandle.complete((Throwable)new RuntimeException("Upstream Server Error"));
                }
                catch (IllegalStateException e) {
                    log.info("Tried to complete a request, but it is probably already closed.  routerName=" + this.route + ", routerSocketID=" + this.routerSocketID, (Throwable)e);
                }
            }
            finally {
                if (statusCode != 1000 && context.error == null) {
                    context.error = new IllegalStateException("Upstream server close with code " + statusCode);
                }
                this.raiseCompletionEvent(context);
                this.contextMap.remove(context.requestId);
            }
        }
    }

    private void raiseCompletionEvent(RequestContext context) {
        if (context != null && context.request != null && !this.proxyListeners.isEmpty()) {
            context.durationMillis = System.currentTimeMillis() - context.request.startTime();
            for (ProxyListener completionListener : this.proxyListeners) {
                try {
                    completionListener.onComplete(context);
                }
                catch (Exception e) {
                    log.warn("Error thrown by " + completionListener, (Throwable)e);
                }
            }
        }
    }

    public void onError(Throwable cause) throws Exception {
        super.onError(cause);
        if (!this.isRemoved) {
            this.webSocketFarmV3.removeWebSocket(this);
            this.isRemoved = true;
        }
        for (RequestContext context : this.contextMap.values()) {
            this.notifyClientRequestError(context, cause);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyClientRequestError(RequestContext context, Throwable cause) throws Exception {
        try {
            context.error = cause;
            if (cause instanceof TimeoutException) {
                if (context.response != null && !context.response.hasStartedSendingData()) {
                    String htmlBody = "The <code>" + Mutils.htmlEncode((String)this.route) + "</code> service did not respond in time.";
                    CrankerMuHandler.sendSimpleResponse(context.response, context.asyncHandle, 504, "504 Gateway Timeout", htmlBody);
                } else if (context.asyncHandle != null) {
                    log.info("Closing client request early due to timeout");
                    context.asyncHandle.complete(cause);
                }
            } else if (context.response != null && !context.response.hasStartedSendingData()) {
                String htmlBody = "The <code>" + Mutils.htmlEncode((String)this.route) + "</code> service error.";
                CrankerMuHandler.sendSimpleResponse(context.response, context.asyncHandle, 502, "502 Bad Gateway", htmlBody);
            } else if (context.asyncHandle != null) {
                log.info("Closing client request early due to cranker wss connection error", cause);
                context.asyncHandle.complete(cause);
            }
            this.raiseCompletionEvent(context);
        }
        catch (Throwable throwable) {
            this.raiseCompletionEvent(context);
            log.warn("stream error: requestId={}, target={}, error={}", new Object[]{context.requestId, context.request.uri(), cause.getMessage()});
            this.contextMap.remove(context.requestId);
            throw throwable;
        }
        log.warn("stream error: requestId={}, target={}, error={}", new Object[]{context.requestId, context.request.uri(), cause.getMessage()});
        this.contextMap.remove(context.requestId);
    }

    public void onText(String message, boolean isLast, DoneCallback doneCallback) {
    }

    private void handleHeaderMessage(RequestContext context, String content) {
        CrankerProtocolResponse protocolResponse = new CrankerProtocolResponse(content);
        context.response.status(protocolResponse.getStatus());
        RouterSocketV3.putHeadersTo(context.response, protocolResponse);
        try {
            if (!this.proxyListeners.isEmpty()) {
                for (ProxyListener proxyListener : this.proxyListeners) {
                    proxyListener.onBeforeRespondingToClient(context);
                    proxyListener.onAfterTargetToProxyHeadersReceived(context, protocolResponse.getStatus(), context.response.headers());
                }
            }
        }
        catch (WebApplicationException e) {
            CrankerMuHandler.handleWebApplicationException(e, context.response, context.asyncHandle);
        }
        context.toClientBytes.getAndAdd(content.length());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onBinary(ByteBuffer byteBuffer, boolean isLast, DoneCallback doneAndPullData, Runnable releaseBuffer) throws Exception {
        byte messageType = byteBuffer.get();
        byte flags = byteBuffer.get();
        Integer requestId = byteBuffer.getInt();
        RequestContext context = this.contextMap.get(requestId);
        if (context == null) {
            releaseBuffer.run();
            doneAndPullData.onComplete(null);
            return;
        }
        switch (messageType) {
            case 0: {
                boolean isEnd = (flags & 1) > 0;
                this.handleData(context, isLast, isEnd, byteBuffer, doneAndPullData, releaseBuffer);
                break;
            }
            case 1: {
                boolean isStreamEnd = (flags & 1) > 0;
                boolean isHeaderEnd = (flags & 4) > 0;
                int byteLength = byteBuffer.remaining();
                String content = StandardCharsets.UTF_8.decode(byteBuffer).toString();
                if (!isHeaderEnd) {
                    if (context.headerLineBuilder == null) {
                        context.headerLineBuilder = new StringBuilder();
                    }
                    context.headerLineBuilder.append(content);
                } else {
                    String fullContent = content;
                    if (context.headerLineBuilder != null) {
                        context.headerLineBuilder.append(content);
                        fullContent = context.headerLineBuilder.toString();
                    }
                    this.handleHeaderMessage(context, fullContent);
                }
                if (isStreamEnd) {
                    this.notifyClientRequestClose(context, 1000);
                }
                this.sendData(RouterSocketV3.windowUpdateMessage(requestId, byteLength), DoneCallback.NoOp);
                releaseBuffer.run();
                doneAndPullData.onComplete(null);
                break;
            }
            case 3: {
                try {
                    int errorCode = RouterSocketV3.getErrorCode(byteBuffer);
                    String message = RouterSocketV3.getErrorMessage(byteBuffer);
                    this.notifyClientRequestError(context, new RuntimeException(String.format("stream closed by connector, errorCode=%s, message=%s", errorCode, message)));
                    break;
                }
                catch (Throwable throwable) {
                    log.warn("exception on handling rst_stream", throwable);
                    break;
                }
                finally {
                    releaseBuffer.run();
                    doneAndPullData.onComplete(null);
                }
            }
            case 8: {
                int windowUpdate = byteBuffer.getInt();
                context.ackedBytes(windowUpdate);
                releaseBuffer.run();
                doneAndPullData.onComplete(null);
                break;
            }
            default: {
                log.info("not supported binary message byte {}", (Object)messageType);
                releaseBuffer.run();
                doneAndPullData.onComplete(null);
            }
        }
    }

    private static String getErrorMessage(ByteBuffer byteBuffer) {
        String message = "";
        if (byteBuffer.remaining() > 0) {
            byte[] bytes = new byte[byteBuffer.remaining()];
            byteBuffer.get(bytes);
            message = new String(bytes, StandardCharsets.UTF_8);
        }
        return message;
    }

    private static int getErrorCode(ByteBuffer byteBuffer) {
        return byteBuffer.remaining() >= 4 ? byteBuffer.getInt() : -1;
    }

    private void handleData(RequestContext context, boolean isLast, boolean isEnd, ByteBuffer byteBuffer, DoneCallback doneAndPullData, Runnable releaseBuffer) throws Exception {
        int len = byteBuffer.remaining();
        if (len == 0) {
            if (isEnd) {
                this.notifyClientRequestClose(context, 1000);
            }
            releaseBuffer.run();
            doneAndPullData.onComplete(null);
            return;
        }
        context.wssOnBinaryCallCount.incrementAndGet();
        WebsocketSessionState websocketState = this.state();
        if (websocketState.endState()) {
            if (isEnd) {
                this.notifyClientRequestClose(context, 1000);
            }
            releaseBuffer.run();
            doneAndPullData.onComplete((Throwable)new IllegalStateException("Received binary message from connector but state=" + websocketState));
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("routerName=" + this.route + ", routerSocketID=" + this.routerSocketID + ", sending " + len + " bytes to client");
        }
        doneAndPullData.onComplete(null);
        context.asyncHandle.write(byteBuffer, errorIfAny -> {
            try {
                if (errorIfAny == null) {
                    if (isEnd) {
                        this.notifyClientRequestClose(context, 1000);
                    }
                    context.toClientBytes.addAndGet(len);
                    this.sendData(RouterSocketV3.windowUpdateMessage(context.requestId, len), DoneCallback.NoOp);
                } else {
                    log.info("routerName=" + this.route + ", routerSocketID=" + this.routerSocketID + ", could not write to client response (maybe the user closed their browser) so will cancel the request. Error message: " + errorIfAny.getMessage());
                    context.error = errorIfAny;
                    context.asyncHandle.complete(errorIfAny);
                }
                if (!this.proxyListeners.isEmpty()) {
                    for (ProxyListener proxyListener : this.proxyListeners) {
                        proxyListener.onResponseBodyChunkReceivedFromTarget(context, byteBuffer);
                    }
                }
            }
            catch (Throwable throwable) {
                log.warn("something wrong after sending bytes to cranker", throwable);
            }
            finally {
                releaseBuffer.run();
            }
        });
    }

    public String connectorInstanceID() {
        return this.connectorInstanceID;
    }

    void setOnReadyForAction(Runnable onReadyForAction) {
        this.onReadyForAction = onReadyForAction;
    }

    public InetSocketAddress serviceAddress() {
        return this.remoteAddress;
    }

    public String getProtocol() {
        return "cranker_3.0";
    }

    private static void putHeadersTo(MuResponse response, CrankerProtocolResponse protocolResponse) {
        response.headers().remove("date");
        for (String line : protocolResponse.headers) {
            String header;
            String lowerHeader;
            int pos = line.indexOf(58);
            if (pos <= 0 || CrankerMuHandler.HOP_BY_HOP.contains(lowerHeader = (header = line.substring(0, pos)).toLowerCase()) || RESPONSE_HEADERS_TO_NOT_SEND_BACK.contains(lowerHeader)) continue;
            String value = line.substring(pos + 1);
            response.headers().add(lowerHeader, (Object)value);
        }
        List<String> customHopByHop = CrankerMuHandler.getCustomHopByHopHeaders(response.headers().get(HeaderNames.CONNECTION));
        for (String header : customHopByHop) {
            response.headers().remove(header);
        }
    }

    static ByteBuffer windowUpdateMessage(Integer requestId, Integer windowUpdate) {
        return ByteBuffer.allocate(10).put((byte)8).put((byte)0).putInt(requestId).putInt(windowUpdate).rewind();
    }

    static ByteBuffer rstMessage(Integer requestId, Integer errorCode, String message) {
        byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(10 + bytes.length).put((byte)3).put((byte)0).putInt(requestId).putInt(errorCode).put(bytes).rewind();
    }

    static ByteBuffer[] headerMessages(Integer requestId, boolean isHeaderEnd, boolean isStreamEnd, String fullHeaderLine) {
        int chunkSize = 16000;
        if (fullHeaderLine.length() < 16000) {
            return new ByteBuffer[]{RouterSocketV3.headerMessage(requestId, isHeaderEnd, isStreamEnd, fullHeaderLine)};
        }
        ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
        for (int i = 0; i < fullHeaderLine.length(); i += 16000) {
            int endIndex = Math.min(fullHeaderLine.length(), i + 16000);
            boolean isLast = endIndex == fullHeaderLine.length();
            buffers.add(RouterSocketV3.headerMessage(requestId, isLast, isStreamEnd, fullHeaderLine.substring(i, endIndex)));
        }
        return buffers.toArray(new ByteBuffer[0]);
    }

    static ByteBuffer headerMessage(Integer requestId, boolean isHeaderEnd, boolean isStreamEnd, String headerLine) {
        int flags = 0;
        if (isStreamEnd) {
            flags |= 1;
        }
        if (isHeaderEnd) {
            flags |= 4;
        }
        byte[] bytes = headerLine.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(6 + bytes.length).put((byte)1).put((byte)flags).putInt(requestId).put(bytes).rewind();
    }

    static ByteBuffer dataMessages(Integer requestId, boolean isEnd, ByteBuffer buffer) {
        ByteBuffer message = ByteBuffer.allocate(6 + (buffer == null ? 0 : buffer.remaining())).put((byte)0).put((byte)(isEnd ? 1 : 0)).putInt(requestId);
        if (buffer != null) {
            message.put(buffer);
        }
        message.rewind();
        return message;
    }

    public class RequestContext
    implements ProxyInfo {
        private static final int WATER_MARK_HIGH = 65536;
        private static final int WATER_MARK_LOW = 16384;
        private final AtomicInteger wssReceivedAckBytes = new AtomicInteger(0);
        private final AtomicInteger isWssSending = new AtomicInteger(0);
        private final AtomicBoolean isWssWritable = new AtomicBoolean(true);
        private final AtomicBoolean isWssWriting = new AtomicBoolean(false);
        private final Queue<Runnable> wssWriteCallbacks = new ConcurrentLinkedQueue<Runnable>();
        final AtomicLong wssOnBinaryCallCount = new AtomicLong();
        public final Integer requestId;
        public final MuRequest request;
        public final MuResponse response;
        public final AsyncHandle asyncHandle;
        final AtomicLong fromClientBytes = new AtomicLong();
        final AtomicLong toClientBytes = new AtomicLong();
        long durationMillis = 0L;
        volatile Throwable error = null;
        volatile boolean isRstStreamSent = false;
        StreamState state = StreamState.OPEN;
        StringBuilder headerLineBuilder;

        public RequestContext(Integer requestId, MuRequest request, MuResponse response, AsyncHandle asyncHandle) {
            this.requestId = requestId;
            this.request = request;
            this.response = response;
            this.asyncHandle = asyncHandle;
        }

        void sendingBytes(int sendingBytes) {
            this.isWssSending.addAndGet(sendingBytes);
            if (this.isWssSending.get() > 65536) {
                this.isWssWritable.compareAndSet(true, false);
            }
        }

        void ackedBytes(int ack) {
            this.wssReceivedAckBytes.addAndGet(ack);
            this.isWssSending.addAndGet(-ack);
            if (this.isWssSending.get() < 16384 && this.isWssWritable.compareAndSet(false, true)) {
                this.writeItMaybe();
            }
        }

        void flowControl(Runnable runnable) {
            if (this.isWssWritable.get() && !this.isWssWriting.get()) {
                runnable.run();
            } else {
                this.wssWriteCallbacks.add(runnable);
                this.writeItMaybe();
            }
        }

        private void writeItMaybe() {
            if (this.isWssWritable.get() && !this.wssWriteCallbacks.isEmpty() && this.isWssWriting.compareAndSet(false, true)) {
                try {
                    Runnable current;
                    while (this.isWssWritable.get() && (current = this.wssWriteCallbacks.poll()) != null) {
                        current.run();
                    }
                }
                finally {
                    this.isWssWriting.set(false);
                    this.writeItMaybe();
                }
            }
        }

        @Override
        public boolean isCatchAll() {
            return "*".equals(RouterSocketV3.this.route);
        }

        @Override
        public String connectorInstanceID() {
            return RouterSocketV3.this.connectorInstanceID;
        }

        @Override
        public InetSocketAddress serviceAddress() {
            return RouterSocketV3.this.remoteAddress;
        }

        @Override
        public String route() {
            return RouterSocketV3.this.route;
        }

        @Override
        public MuRequest request() {
            return this.request;
        }

        @Override
        public MuResponse response() {
            return this.response;
        }

        @Override
        public long durationMillis() {
            return this.durationMillis;
        }

        @Override
        public long bytesReceived() {
            return this.fromClientBytes.get();
        }

        @Override
        public long bytesSent() {
            return this.toClientBytes.get();
        }

        @Override
        public long responseBodyFrames() {
            return this.wssOnBinaryCallCount.get();
        }

        @Override
        public Throwable errorIfAny() {
            return this.error;
        }

        @Override
        public long socketWaitInMillis() {
            return 0L;
        }

        public String toString() {
            return new StringJoiner(", ", RequestContext.class.getSimpleName() + "[", "]").add("wssReceivedAckBytes=" + this.wssReceivedAckBytes).add("isWssSending=" + this.isWssSending).add("isWssWritable=" + this.isWssWritable).add("wssWriteCallbacks=" + this.wssWriteCallbacks.size()).add("isWssWriting=" + this.isWssWriting).add("requestId=" + this.requestId).add("request=" + this.request).add("response=" + this.response).add("fromClientBytes=" + this.fromClientBytes).add("toClientBytes=" + this.toClientBytes).add("wssOnBinaryCallCount=" + this.wssOnBinaryCallCount).add("durationMillis=" + this.durationMillis).add("error=" + this.error).add("state=" + this.state).toString();
        }
    }

    public static enum StreamState {
        OPEN(false),
        HALF_CLOSE(false),
        CLOSED(true),
        ERROR(true);

        private final boolean isCompleted;

        private StreamState(boolean isCompleted) {
            this.isCompleted = isCompleted;
        }

        public boolean isCompleted() {
            return this.isCompleted;
        }
    }
}

