/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.python;

import com.google.protobuf.ByteString;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.time.Duration;
import java.util.HashMap;
import org.apache.spark.internal.LogEntry;
import org.apache.spark.internal.LogEntry$;
import org.apache.spark.internal.LogKey;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.MDC;
import org.apache.spark.sql.Encoders$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.python.PythonSQLUtils$;
import org.apache.spark.sql.catalyst.analysis.Analyzer;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$;
import org.apache.spark.sql.execution.python.TransformWithStateInPandasStateServer$;
import org.apache.spark.sql.execution.streaming.ImplicitGroupingKeyTracker$;
import org.apache.spark.sql.execution.streaming.StatefulProcessorHandleImpl;
import org.apache.spark.sql.execution.streaming.StatefulProcessorHandleState$;
import org.apache.spark.sql.execution.streaming.state.StateMessage;
import org.apache.spark.sql.streaming.TTLConfig;
import org.apache.spark.sql.streaming.ValueState;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.slf4j.Logger;
import scala.Enumeration;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple3;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

@ScalaSignature(bytes="\u0006\u0005\t\rc\u0001B\u0011#\u0001=B\u0001\"\u0011\u0001\u0003\u0002\u0003\u0006IA\u0011\u0005\t\u0011\u0002\u0011\t\u0011)A\u0005\u0013\"Aq\n\u0001B\u0001B\u0003%\u0001\u000b\u0003\u0005W\u0001\t\u0005\t\u0015!\u0003X\u0011!i\u0006A!A!\u0002\u0013q\u0006bBA\r\u0001\u0011\u0005\u00111\u0004\u0005\n\u0003W\u0001!\u0019!C\u0005\u0003[A\u0001\"a\f\u0001A\u0003%\u0011\u0011\u0001\u0005\f\u0003c\u0001\u0001\u0019!a\u0001\n\u0013\t\u0019\u0004C\u0006\u0002<\u0001\u0001\r\u00111A\u0005\n\u0005u\u0002bCA%\u0001\u0001\u0007\t\u0011)Q\u0005\u0003kA\u0011\"a\u0013\u0001\u0001\u0004%I!!\u0014\t\u0013\u0005=\u0003\u00011A\u0005\n\u0005E\u0003bBA+\u0001\u0001\u0006Ka\u0016\u0005\n\u0003/\u0002!\u0019!C\u0005\u00033Bq!a\u0017\u0001A\u0003%a\fC\u0004\u0002^\u0001!\t!a\u0018\t\u000f\u0005\u0005\u0004\u0001\"\u0003\u0002d!9\u00111\u0013\u0001\u0005\n\u0005U\u0005bBAN\u0001\u0011%\u0011Q\u0014\u0005\t\u0003O\u0003A\u0011\u0001\u0014\u0002*\"9\u00111\u0017\u0001\u0005\n\u0005U\u0006\u0002CA`\u0001\u0011\u0005a%!1\t\u000f\u0005-\u0007\u0001\"\u0003\u0002N\"I\u0011Q\u001f\u0001\u0012\u0002\u0013%\u0011q\u001f\u0005\n\u0005\u001b\u0001\u0011\u0013!C\u0005\u0005\u001fAqAa\u0005\u0001\t\u0013\u0011)bB\u0005\u0003*\t\n\t\u0011#\u0001\u0003,\u0019A\u0011EIA\u0001\u0012\u0003\u0011i\u0003C\u0004\u0002\u001au!\tA!\u000e\t\u0013\t]R$%A\u0005\u0002\te\u0002\"\u0003B\u001f;E\u0005I\u0011\u0001B \u0005\u0015\"&/\u00198tM>\u0014XnV5uQN#\u0018\r^3J]B\u000bg\u000eZ1t'R\fG/Z*feZ,'O\u0003\u0002$I\u00051\u0001/\u001f;i_:T!!\n\u0014\u0002\u0013\u0015DXmY;uS>t'BA\u0014)\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003S)\nQa\u001d9be.T!a\u000b\u0017\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005i\u0013aA8sO\u000e\u00011\u0003\u0002\u00011qm\u0002\"!\r\u001c\u000e\u0003IR!a\r\u001b\u0002\t1\fgn\u001a\u0006\u0002k\u0005!!.\u0019<b\u0013\t9$G\u0001\u0004PE*,7\r\u001e\t\u0003ceJ!A\u000f\u001a\u0003\u0011I+hN\\1cY\u0016\u0004\"\u0001P \u000e\u0003uR!A\u0010\u0015\u0002\u0011%tG/\u001a:oC2L!\u0001Q\u001f\u0003\u000f1{wmZ5oO\u0006\t2\u000f^1uKN+'O^3s'>\u001c7.\u001a;\u0011\u0005\r3U\"\u0001#\u000b\u0005\u0015#\u0014a\u00018fi&\u0011q\t\u0012\u0002\r'\u0016\u0014h/\u001a:T_\u000e\\W\r^\u0001\u0018gR\fG/\u001a4vYB\u0013xnY3tg>\u0014\b*\u00198eY\u0016\u0004\"AS'\u000e\u0003-S!\u0001\u0014\u0013\u0002\u0013M$(/Z1nS:<\u0017B\u0001(L\u0005m\u0019F/\u0019;fMVd\u0007K]8dKN\u001cxN\u001d%b]\u0012dW-S7qY\u0006\trM]8va&twmS3z'\u000eDW-\\1\u0011\u0005E#V\"\u0001*\u000b\u0005M3\u0013!\u0002;za\u0016\u001c\u0018BA+S\u0005)\u0019FO];diRK\b/Z\u0001\u0014_V$\b/\u001e;TiJ,\u0017-\u001c$peR+7\u000f\u001e\t\u00031nk\u0011!\u0017\u0006\u00035R\n!![8\n\u0005qK&\u0001\u0005#bi\u0006|U\u000f\u001e9viN#(/Z1n\u0003Q1\u0018\r\\;f'R\fG/Z'ba\u001a{'\u000fV3tiB!qL\u001a5t\u001b\u0005\u0001'BA1c\u0003\u001diW\u000f^1cY\u0016T!a\u00193\u0002\u0015\r|G\u000e\\3di&|gNC\u0001f\u0003\u0015\u00198-\u00197b\u0013\t9\u0007MA\u0004ICNDW*\u00199\u0011\u0005%\u0004hB\u00016o!\tYG-D\u0001m\u0015\tig&\u0001\u0004=e>|GOP\u0005\u0003_\u0012\fa\u0001\u0015:fI\u00164\u0017BA9s\u0005\u0019\u0019FO]5oO*\u0011q\u000e\u001a\t\u0007iV<\b+!\u0001\u000e\u0003\u0011L!A\u001e3\u0003\rQ+\b\u000f\\34!\rA(\u0010`\u0007\u0002s*\u0011AJJ\u0005\u0003wf\u0014!BV1mk\u0016\u001cF/\u0019;f!\tih0D\u0001'\u0013\tyhEA\u0002S_^\u0004R!a\u0001\u0002\u0014qtA!!\u0002\u0002\u00105\u0011\u0011q\u0001\u0006\u0005\u0003\u0013\tY!\u0001\u0005f]\u000e|G-\u001a:t\u0015\r\tiAJ\u0001\tG\u0006$\u0018\r\\=ti&!\u0011\u0011CA\u0004\u0003E)\u0005\u0010\u001d:fgNLwN\\#oG>$WM]\u0005\u0005\u0003+\t9B\u0001\u0007EKN,'/[1mSj,'O\u0003\u0003\u0002\u0012\u0005\u001d\u0011A\u0002\u001fj]&$h\b\u0006\u0007\u0002\u001e\u0005\u0005\u00121EA\u0013\u0003O\tI\u0003E\u0002\u0002 \u0001i\u0011A\t\u0005\u0006\u0003\u001a\u0001\rA\u0011\u0005\u0006\u0011\u001a\u0001\r!\u0013\u0005\u0006\u001f\u001a\u0001\r\u0001\u0015\u0005\b-\u001a\u0001\n\u00111\u0001X\u0011\u001dif\u0001%AA\u0002y\u000b!c[3z%><H)Z:fe&\fG.\u001b>feV\u0011\u0011\u0011A\u0001\u0014W\u0016L(k\\<EKN,'/[1mSj,'\u000fI\u0001\fS:\u0004X\u000f^*ue\u0016\fW.\u0006\u0002\u00026A\u0019\u0001,a\u000e\n\u0007\u0005e\u0012LA\bECR\f\u0017J\u001c9viN#(/Z1n\u0003=Ig\u000e];u'R\u0014X-Y7`I\u0015\fH\u0003BA \u0003\u000b\u00022\u0001^A!\u0013\r\t\u0019\u0005\u001a\u0002\u0005+:LG\u000fC\u0005\u0002H)\t\t\u00111\u0001\u00026\u0005\u0019\u0001\u0010J\u0019\u0002\u0019%t\u0007/\u001e;TiJ,\u0017-\u001c\u0011\u0002\u0019=,H\u000f];u'R\u0014X-Y7\u0016\u0003]\u000b\u0001c\\;uaV$8\u000b\u001e:fC6|F%Z9\u0015\t\u0005}\u00121\u000b\u0005\t\u0003\u000fj\u0011\u0011!a\u0001/\u0006iq.\u001e;qkR\u001cFO]3b[\u0002\n1B^1mk\u0016\u001cF/\u0019;fgV\ta,\u0001\u0007wC2,Xm\u0015;bi\u0016\u001c\b%A\u0002sk:$\"!a\u0010\u0002#A\f'o]3Qe>$x.T3tg\u0006<W\r\u0006\u0002\u0002fA!\u0011qMAG\u001d\u0011\tI'a\"\u000f\t\u0005-\u00141\u0011\b\u0005\u0003[\n\tI\u0004\u0003\u0002p\u0005}d\u0002BA9\u0003{rA!a\u001d\u0002|9!\u0011QOA=\u001d\rY\u0017qO\u0005\u0002[%\u00111\u0006L\u0005\u0003S)J!a\n\u0015\n\u0005\u00152\u0013B\u0001'%\u0013\r\t)iS\u0001\u0006gR\fG/Z\u0005\u0005\u0003\u0013\u000bY)\u0001\u0007Ti\u0006$X-T3tg\u0006<WMC\u0002\u0002\u0006.KA!a$\u0002\u0012\na1\u000b^1uKJ+\u0017/^3ti*!\u0011\u0011RAF\u00035A\u0017M\u001c3mKJ+\u0017/^3tiR!\u0011qHAL\u0011\u001d\tIj\u0005a\u0001\u0003K\nq!\\3tg\u0006<W-\u0001\u0011iC:$G.Z%na2L7-\u001b;He>,\b/\u001b8h\u0017\u0016L(+Z9vKN$H\u0003BA \u0003?Cq!!'\u0015\u0001\u0004\t\t\u000b\u0005\u0003\u0002h\u0005\r\u0016\u0002BAS\u0003#\u0013!$S7qY&\u001c\u0017\u000e^$s_V\u0004\u0018N\\4LKf\u0014V-];fgR\f1\u0004[1oI2,7\u000b^1uK\u001a,H\u000e\u0015:pG\u0016\u001c8o\u001c:DC2dG\u0003BA \u0003WCq!!'\u0016\u0001\u0004\ti\u000b\u0005\u0003\u0002h\u0005=\u0016\u0002BAY\u0003#\u0013Qc\u0015;bi\u00164W\u000f\u001c)s_\u000e,7o]8s\u0007\u0006dG.\u0001\u000eiC:$G.Z*uCR,g+\u0019:jC\ndWMU3rk\u0016\u001cH\u000f\u0006\u0003\u0002@\u0005]\u0006bBAM-\u0001\u0007\u0011\u0011\u0018\t\u0005\u0003O\nY,\u0003\u0003\u0002>\u0006E%\u0001F*uCR,g+\u0019:jC\ndWMU3rk\u0016\u001cH/A\fiC:$G.\u001a,bYV,7\u000b^1uKJ+\u0017/^3tiR!\u0011qHAb\u0011\u001d\tIj\u0006a\u0001\u0003\u000b\u0004B!a\u001a\u0002H&!\u0011\u0011ZAI\u000591\u0016\r\\;f'R\fG/Z\"bY2\fAb]3oIJ+7\u000f]8og\u0016$\u0002\"a\u0010\u0002P\u0006e\u0017Q\u001c\u0005\b\u0003#D\u0002\u0019AAj\u0003\u0019\u0019H/\u0019;vgB\u0019A/!6\n\u0007\u0005]GMA\u0002J]RD\u0001\"a7\u0019!\u0003\u0005\r\u0001[\u0001\rKJ\u0014xN]'fgN\fw-\u001a\u0005\n\u0003?D\u0002\u0013!a\u0001\u0003C\f!BY=uKN#(/\u001b8h!\u0011\t\u0019/!=\u000e\u0005\u0005\u0015(\u0002BAt\u0003S\f\u0001\u0002\u001d:pi>\u0014WO\u001a\u0006\u0005\u0003W\fi/\u0001\u0004h_><G.\u001a\u0006\u0003\u0003_\f1aY8n\u0013\u0011\t\u00190!:\u0003\u0015\tKH/Z*ue&tw-\u0001\ftK:$'+Z:q_:\u001cX\r\n3fM\u0006,H\u000e\u001e\u00133+\t\tIPK\u0002i\u0003w\\#!!@\u0011\t\u0005}(\u0011B\u0007\u0003\u0005\u0003QAAa\u0001\u0003\u0006\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0005\u000f!\u0017AC1o]>$\u0018\r^5p]&!!1\u0002B\u0001\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0001\u0017g\u0016tGMU3ta>t7/\u001a\u0013eK\u001a\fW\u000f\u001c;%gU\u0011!\u0011\u0003\u0016\u0005\u0003C\fY0\u0001\u000bj]&$\u0018.\u00197ju\u00164\u0016\r\\;f'R\fG/\u001a\u000b\t\u0003\u007f\u00119Ba\u0007\u0003 !1!\u0011D\u000eA\u0002!\f\u0011b\u001d;bi\u0016t\u0015-\\3\t\r\tu1\u00041\u0001i\u00031\u00198\r[3nCN#(/\u001b8h\u0011\u001d\u0011\tc\u0007a\u0001\u0005G\tQ\u0002\u001e;m\tV\u0014\u0018\r^5p]6\u001b\b#\u0002;\u0003&\u0005M\u0017b\u0001B\u0014I\n1q\n\u001d;j_:\fQ\u0005\u0016:b]N4wN]7XSRD7\u000b^1uK&s\u0007+\u00198eCN\u001cF/\u0019;f'\u0016\u0014h/\u001a:\u0011\u0007\u0005}QdE\u0002\u001e\u0005_\u00012\u0001\u001eB\u0019\u0013\r\u0011\u0019\u0004\u001a\u0002\u0007\u0003:L(+\u001a4\u0015\u0005\t-\u0012a\u0007\u0013mKN\u001c\u0018N\\5uI\u001d\u0014X-\u0019;fe\u0012\"WMZ1vYR$C'\u0006\u0002\u0003<)\u001aq+a?\u00027\u0011bWm]:j]&$He\u001a:fCR,'\u000f\n3fM\u0006,H\u000e\u001e\u00136+\t\u0011\tEK\u0002_\u0003w\u0004")
public class TransformWithStateInPandasStateServer
implements Runnable,
Logging {
    private final ServerSocket stateServerSocket;
    private final StatefulProcessorHandleImpl statefulProcessorHandle;
    private final StructType groupingKeySchema;
    private final ExpressionEncoder.Deserializer<Row> keyRowDeserializer;
    private DataInputStream inputStream;
    private DataOutputStream outputStream;
    private final scala.collection.mutable.HashMap<String, Tuple3<ValueState<Row>, StructType, ExpressionEncoder.Deserializer<Row>>> valueStates;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public static scala.collection.mutable.HashMap<String, Tuple3<ValueState<Row>, StructType, ExpressionEncoder.Deserializer<Row>>> $lessinit$greater$default$5() {
        return TransformWithStateInPandasStateServer$.MODULE$.$lessinit$greater$default$5();
    }

    public static DataOutputStream $lessinit$greater$default$4() {
        return TransformWithStateInPandasStateServer$.MODULE$.$lessinit$greater$default$4();
    }

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public Logging.LogStringContext LogStringContext(StringContext sc) {
        return Logging.LogStringContext$((Logging)this, (StringContext)sc);
    }

    public void withLogContext(HashMap<String, String> context, Function0<BoxedUnit> body) {
        Logging.withLogContext$((Logging)this, context, body);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logInfo(LogEntry entry) {
        Logging.logInfo$((Logging)this, (LogEntry)entry);
    }

    public void logInfo(LogEntry entry, Throwable throwable) {
        Logging.logInfo$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logDebug(LogEntry entry) {
        Logging.logDebug$((Logging)this, (LogEntry)entry);
    }

    public void logDebug(LogEntry entry, Throwable throwable) {
        Logging.logDebug$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logTrace(LogEntry entry) {
        Logging.logTrace$((Logging)this, (LogEntry)entry);
    }

    public void logTrace(LogEntry entry, Throwable throwable) {
        Logging.logTrace$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logWarning(LogEntry entry) {
        Logging.logWarning$((Logging)this, (LogEntry)entry);
    }

    public void logWarning(LogEntry entry, Throwable throwable) {
        Logging.logWarning$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logError(LogEntry entry) {
        Logging.logError$((Logging)this, (LogEntry)entry);
    }

    public void logError(LogEntry entry, Throwable throwable) {
        Logging.logError$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private ExpressionEncoder.Deserializer<Row> keyRowDeserializer() {
        return this.keyRowDeserializer;
    }

    private DataInputStream inputStream() {
        return this.inputStream;
    }

    private void inputStream_$eq(DataInputStream x$1) {
        this.inputStream = x$1;
    }

    private DataOutputStream outputStream() {
        return this.outputStream;
    }

    private void outputStream_$eq(DataOutputStream x$1) {
        this.outputStream = x$1;
    }

    private scala.collection.mutable.HashMap<String, Tuple3<ValueState<Row>, StructType, ExpressionEncoder.Deserializer<Row>>> valueStates() {
        return this.valueStates;
    }

    @Override
    public void run() {
        Socket listeningSocket = this.stateServerSocket.accept();
        this.inputStream_$eq(new DataInputStream(new BufferedInputStream(listeningSocket.getInputStream())));
        this.outputStream_$eq(new DataOutputStream(new BufferedOutputStream(listeningSocket.getOutputStream())));
        while (listeningSocket.isConnected()) {
            Enumeration.Value value = this.statefulProcessorHandle.getHandleState();
            Enumeration.Value value2 = StatefulProcessorHandleState$.MODULE$.CLOSED();
            if (!(value == null ? value2 != null : !value.equals(value2))) break;
            try {
                int version = this.inputStream().readInt();
                if (version == -1) continue;
                Predef$.MODULE$.assert(version == 0);
                StateMessage.StateRequest message = this.parseProtoMessage();
                this.handleRequest(message);
                this.outputStream().flush();
            }
            catch (EOFException eOFException) {
                this.logWarning(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"No more data to read from the socket"}))).log((Seq)Nil$.MODULE$)));
                this.statefulProcessorHandle.setHandleState(StatefulProcessorHandleState$.MODULE$.CLOSED());
                return;
            }
            catch (Exception e) {
                this.logError(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Error reading message: ", ""}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.ERROR$.MODULE$, (Object)e.getMessage())}))), (Throwable)e);
                this.sendResponse(1, e.getMessage(), this.sendResponse$default$3());
                this.outputStream().flush();
                this.statefulProcessorHandle.setHandleState(StatefulProcessorHandleState$.MODULE$.CLOSED());
                return;
            }
        }
        this.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Done from the state server thread"}))).log((Seq)Nil$.MODULE$)));
    }

    private StateMessage.StateRequest parseProtoMessage() {
        int messageLen = this.inputStream().readInt();
        byte[] messageBytes = new byte[messageLen];
        this.inputStream().read(messageBytes);
        return StateMessage.StateRequest.parseFrom(ByteString.copyFrom(messageBytes));
    }

    private void handleRequest(StateMessage.StateRequest message) {
        StateMessage.StateRequest.MethodCase methodCase = message.getMethodCase();
        if (StateMessage.StateRequest.MethodCase.IMPLICITGROUPINGKEYREQUEST.equals(methodCase)) {
            this.handleImplicitGroupingKeyRequest(message.getImplicitGroupingKeyRequest());
            return;
        }
        if (StateMessage.StateRequest.MethodCase.STATEFULPROCESSORCALL.equals(methodCase)) {
            this.handleStatefulProcessorCall(message.getStatefulProcessorCall());
            return;
        }
        if (StateMessage.StateRequest.MethodCase.STATEVARIABLEREQUEST.equals(methodCase)) {
            this.handleStateVariableRequest(message.getStateVariableRequest());
            return;
        }
        throw new IllegalArgumentException("Invalid method call");
    }

    private void handleImplicitGroupingKeyRequest(StateMessage.ImplicitGroupingKeyRequest message) {
        StateMessage.ImplicitGroupingKeyRequest.MethodCase methodCase = message.getMethodCase();
        if (StateMessage.ImplicitGroupingKeyRequest.MethodCase.SETIMPLICITKEY.equals(methodCase)) {
            byte[] keyBytes = message.getSetImplicitKey().getKey().toByteArray();
            Row keyRow = PythonSQLUtils$.MODULE$.toJVMRow(keyBytes, this.groupingKeySchema, this.keyRowDeserializer());
            ImplicitGroupingKeyTracker$.MODULE$.setImplicitKey(keyRow);
            this.sendResponse(0, this.sendResponse$default$2(), this.sendResponse$default$3());
            return;
        }
        if (StateMessage.ImplicitGroupingKeyRequest.MethodCase.REMOVEIMPLICITKEY.equals(methodCase)) {
            ImplicitGroupingKeyTracker$.MODULE$.removeImplicitKey();
            this.sendResponse(0, this.sendResponse$default$2(), this.sendResponse$default$3());
            return;
        }
        throw new IllegalArgumentException("Invalid method call");
    }

    public void handleStatefulProcessorCall(StateMessage.StatefulProcessorCall message) {
        StateMessage.StatefulProcessorCall.MethodCase methodCase = message.getMethodCase();
        if (StateMessage.StatefulProcessorCall.MethodCase.SETHANDLESTATE.equals(methodCase)) {
            StateMessage.HandleState requestedState = message.getSetHandleState().getState();
            StateMessage.HandleState handleState = requestedState;
            if (StateMessage.HandleState.CREATED.equals(handleState)) {
                this.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"set handle state to Created"}))).log((Seq)Nil$.MODULE$)));
                this.statefulProcessorHandle.setHandleState(StatefulProcessorHandleState$.MODULE$.CREATED());
            } else if (StateMessage.HandleState.INITIALIZED.equals(handleState)) {
                this.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"set handle state to Initialized"}))).log((Seq)Nil$.MODULE$)));
                this.statefulProcessorHandle.setHandleState(StatefulProcessorHandleState$.MODULE$.INITIALIZED());
            } else if (StateMessage.HandleState.CLOSED.equals(handleState)) {
                this.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"set handle state to Closed"}))).log((Seq)Nil$.MODULE$)));
                this.statefulProcessorHandle.setHandleState(StatefulProcessorHandleState$.MODULE$.CLOSED());
            }
            this.sendResponse(0, this.sendResponse$default$2(), this.sendResponse$default$3());
            return;
        }
        if (StateMessage.StatefulProcessorCall.MethodCase.GETVALUESTATE.equals(methodCase)) {
            String stateName = message.getGetValueState().getStateName();
            String schema = message.getGetValueState().getSchema();
            None$ ttlDurationMs = message.getGetValueState().hasTtl() ? new Some((Object)BoxesRunTime.boxToInteger((int)message.getGetValueState().getTtl().getDurationMs())) : None$.MODULE$;
            this.initializeValueState(stateName, schema, (Option<Object>)ttlDurationMs);
            return;
        }
        throw new IllegalArgumentException("Invalid method call");
    }

    private void handleStateVariableRequest(StateMessage.StateVariableRequest message) {
        StateMessage.StateVariableRequest.MethodCase methodCase = message.getMethodCase();
        if (StateMessage.StateVariableRequest.MethodCase.VALUESTATECALL.equals(methodCase)) {
            this.handleValueStateRequest(message.getValueStateCall());
            return;
        }
        throw new IllegalArgumentException("Invalid method call");
    }

    public void handleValueStateRequest(StateMessage.ValueStateCall message) {
        String stateName = message.getStateName();
        if (!this.valueStates().contains((Object)stateName)) {
            this.logWarning(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Value state ", " is not initialized."}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.STATE_NAME$.MODULE$, (Object)stateName)}))));
            this.sendResponse(1, "Value state " + stateName + " is not initialized.", this.sendResponse$default$3());
            return;
        }
        StateMessage.ValueStateCall.MethodCase methodCase = message.getMethodCase();
        if (StateMessage.ValueStateCall.MethodCase.EXISTS.equals(methodCase)) {
            if (((ValueState)((Tuple3)this.valueStates().apply((Object)stateName))._1()).exists()) {
                this.sendResponse(0, this.sendResponse$default$2(), this.sendResponse$default$3());
                return;
            }
            this.sendResponse(2, "state " + stateName + " doesn't exist", this.sendResponse$default$3());
            return;
        }
        if (StateMessage.ValueStateCall.MethodCase.GET.equals(methodCase)) {
            Option valueOption = ((ValueState)((Tuple3)this.valueStates().apply((Object)stateName))._1()).getOption();
            if (valueOption.isDefined()) {
                byte[] valueBytes = PythonSQLUtils$.MODULE$.toPyRow((Row)valueOption.get());
                ByteString byteString = ByteString.copyFrom(valueBytes);
                this.sendResponse(0, null, byteString);
                return;
            }
            this.logWarning(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Value state ", " doesn't contain"}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.STATE_NAME$.MODULE$, (Object)stateName)})).$plus(this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{" a value."}))).log((Seq)Nil$.MODULE$))));
            this.sendResponse(0, this.sendResponse$default$2(), this.sendResponse$default$3());
            return;
        }
        if (StateMessage.ValueStateCall.MethodCase.VALUESTATEUPDATE.equals(methodCase)) {
            byte[] byteArray = message.getValueStateUpdate().getValue().toByteArray();
            Tuple3 valueStateTuple = (Tuple3)this.valueStates().apply((Object)stateName);
            Row valueRow = PythonSQLUtils$.MODULE$.toJVMRow(byteArray, (StructType)valueStateTuple._2(), (ExpressionEncoder.Deserializer<Row>)((ExpressionEncoder.Deserializer)valueStateTuple._3()));
            ((ValueState)valueStateTuple._1()).update((Object)valueRow);
            this.sendResponse(0, this.sendResponse$default$2(), this.sendResponse$default$3());
            return;
        }
        if (StateMessage.ValueStateCall.MethodCase.CLEAR.equals(methodCase)) {
            ((ValueState)((Tuple3)this.valueStates().apply((Object)stateName))._1()).clear();
            this.sendResponse(0, this.sendResponse$default$2(), this.sendResponse$default$3());
            return;
        }
        throw new IllegalArgumentException("Invalid method call");
    }

    private void sendResponse(int status, String errorMessage, ByteString byteString) {
        StateMessage.StateResponse.Builder responseMessageBuilder = StateMessage.StateResponse.newBuilder().setStatusCode(status);
        Object object = status != 0 && errorMessage != null ? responseMessageBuilder.setErrorMessage(errorMessage) : BoxedUnit.UNIT;
        Object object2 = byteString != null ? responseMessageBuilder.setValue(byteString) : BoxedUnit.UNIT;
        StateMessage.StateResponse responseMessage = responseMessageBuilder.build();
        byte[] responseMessageBytes = responseMessage.toByteArray();
        int byteLength = responseMessageBytes.length;
        this.outputStream().writeInt(byteLength);
        this.outputStream().write(responseMessageBytes);
    }

    private String sendResponse$default$2() {
        return null;
    }

    private ByteString sendResponse$default$3() {
        return null;
    }

    private void initializeValueState(String stateName, String schemaString, Option<Object> ttlDurationMs) {
        if (!this.valueStates().contains((Object)stateName)) {
            StructType schema = StructType$.MODULE$.fromString(schemaString);
            ValueState state = ttlDurationMs.isEmpty() ? this.statefulProcessorHandle.getValueState(stateName, Encoders$.MODULE$.row(schema)) : this.statefulProcessorHandle.getValueState(stateName, Encoders$.MODULE$.row(schema), new TTLConfig(Duration.ofMillis(BoxesRunTime.unboxToInt((Object)ttlDurationMs.get()))));
            ExpressionEncoder qual$1 = ExpressionEncoder$.MODULE$.apply(schema);
            Seq x$1 = qual$1.resolveAndBind$default$1();
            Analyzer x$2 = qual$1.resolveAndBind$default$2();
            ExpressionEncoder.Deserializer valueRowDeserializer = qual$1.resolveAndBind(x$1, x$2).createDeserializer();
            this.valueStates().put((Object)stateName, (Object)new Tuple3(state, (Object)schema, (Object)valueRowDeserializer));
            this.sendResponse(0, this.sendResponse$default$2(), this.sendResponse$default$3());
            return;
        }
        this.sendResponse(1, "state " + stateName + " already exists", this.sendResponse$default$3());
    }

    public TransformWithStateInPandasStateServer(ServerSocket stateServerSocket, StatefulProcessorHandleImpl statefulProcessorHandle, StructType groupingKeySchema, DataOutputStream outputStreamForTest, scala.collection.mutable.HashMap<String, Tuple3<ValueState<Row>, StructType, ExpressionEncoder.Deserializer<Row>>> valueStateMapForTest) {
        this.stateServerSocket = stateServerSocket;
        this.statefulProcessorHandle = statefulProcessorHandle;
        this.groupingKeySchema = groupingKeySchema;
        Logging.$init$((Logging)this);
        ExpressionEncoder qual$1 = ExpressionEncoder$.MODULE$.apply(groupingKeySchema);
        Seq x$1 = qual$1.resolveAndBind$default$1();
        Analyzer x$2 = qual$1.resolveAndBind$default$2();
        this.keyRowDeserializer = qual$1.resolveAndBind(x$1, x$2).createDeserializer();
        this.outputStream = outputStreamForTest;
        this.valueStates = valueStateMapForTest != null ? valueStateMapForTest : new scala.collection.mutable.HashMap();
    }
}

