/*
 * Decompiled with CFR 0.152.
 */
package kafka.coordinator.transaction;

import com.yammer.metrics.core.MetricName;
import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import kafka.coordinator.transaction.CompleteCommit$;
import kafka.coordinator.transaction.CoordinatorEpochAndTxnMetadata;
import kafka.coordinator.transaction.PrepareCommit$;
import kafka.coordinator.transaction.TransactionMarkerChannelManager;
import kafka.coordinator.transaction.TransactionMarkerChannelManager$;
import kafka.coordinator.transaction.TransactionMetadata;
import kafka.coordinator.transaction.TransactionState;
import kafka.coordinator.transaction.TransactionStateManager;
import kafka.coordinator.transaction.TxnMarkerQueue;
import kafka.coordinator.transaction.TxnTransitMetadata;
import kafka.server.KafkaConfig$;
import kafka.server.MetadataCache;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.RequestLocal;
import org.apache.kafka.server.common.TransactionVersion;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.verification.VerificationMode;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Set;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;
import scala.util.Try;
import scala.util.Try$;

@ScalaSignature(bytes="\u0006\u0005\t]d\u0001B!C\u0001%CQ\u0001\u0015\u0001\u0005\u0002ECq\u0001\u0016\u0001C\u0002\u0013%Q\u000b\u0003\u0004]\u0001\u0001\u0006IA\u0016\u0005\b;\u0002\u0011\r\u0011\"\u0003_\u0011\u0019Q\u0007\u0001)A\u0005?\"91\u000e\u0001b\u0001\n\u0013a\u0007B\u00029\u0001A\u0003%Q\u000eC\u0004r\u0001\t\u0007I\u0011\u0002:\t\re\u0004\u0001\u0015!\u0003t\u0011\u001dQ\bA1A\u0005\nIDaa\u001f\u0001!\u0002\u0013\u0019\bb\u0002?\u0001\u0005\u0004%I! \u0005\b\u0003\u0007\u0001\u0001\u0015!\u0003\u007f\u0011!\t)\u0001\u0001b\u0001\n\u0013i\bbBA\u0004\u0001\u0001\u0006IA \u0005\n\u0003\u0013\u0001!\u0019!C\u0005\u0003\u0017A\u0001\"!\b\u0001A\u0003%\u0011Q\u0002\u0005\n\u0003?\u0001!\u0019!C\u0005\u0003\u0017A\u0001\"!\t\u0001A\u0003%\u0011Q\u0002\u0005\n\u0003G\u0001!\u0019!C\u0005\u0003KA\u0001\"!\f\u0001A\u0003%\u0011q\u0005\u0005\n\u0003_\u0001!\u0019!C\u0005\u0003KA\u0001\"!\r\u0001A\u0003%\u0011q\u0005\u0005\n\u0003g\u0001!\u0019!C\u0005\u0003kA\u0001\"!\u0010\u0001A\u0003%\u0011q\u0007\u0005\n\u0003\u007f\u0001!\u0019!C\u0005\u0003kA\u0001\"!\u0011\u0001A\u0003%\u0011q\u0007\u0005\n\u0003\u0007\u0002!\u0019!C\u0005\u0003\u000bB\u0001\"!\u0014\u0001A\u0003%\u0011q\t\u0005\n\u0003\u001f\u0002!\u0019!C\u0005\u0003\u000bB\u0001\"!\u0015\u0001A\u0003%\u0011q\t\u0005\n\u0003'\u0002!\u0019!C\u0005\u0003\u000bB\u0001\"!\u0016\u0001A\u0003%\u0011q\t\u0005\n\u0003/\u0002!\u0019!C\u0005\u0003\u000bB\u0001\"!\u0017\u0001A\u0003%\u0011q\t\u0005\n\u00037\u0002!\u0019!C\u0005\u0003\u000bB\u0001\"!\u0018\u0001A\u0003%\u0011q\t\u0005\n\u0003?\u0002!\u0019!C\u0005\u0003CB\u0001\"a\u001c\u0001A\u0003%\u00111\r\u0005\n\u0003c\u0002!\u0019!C\u0005\u0003gB\u0001\"a\u001f\u0001A\u0003%\u0011Q\u000f\u0005\n\u0003{\u0002!\u0019!C\u0005\u0003gB\u0001\"a \u0001A\u0003%\u0011Q\u000f\u0005\n\u0003\u0003\u0003!\u0019!C\u0005\u0003\u0007C\u0001\"!+\u0001A\u0003%\u0011Q\u0011\u0005\n\u0003W\u0003!\u0019!C\u0005\u0003[C\u0001\"a/\u0001A\u0003%\u0011q\u0016\u0005\n\u0003{\u0003!\u0019!C\u0005\u0003\u007fC\u0001\"a2\u0001A\u0003%\u0011\u0011\u0019\u0005\b\u0003\u0013\u0004A\u0011BAf\u0011\u001d\ti\r\u0001C\u0001\u0003\u0017Dq!!:\u0001\t\u0003\t9\u000fC\u0004\u0003\u0018\u0001!\t!a3\t\u000f\tm\u0001\u0001\"\u0001\u0002L\"9!q\u0004\u0001\u0005\u0002\u0005-\u0007b\u0002B\u0012\u0001\u0011\u0005\u00111\u001a\u0005\b\u0005O\u0001A\u0011AAf\u0011\u001d\u0011Y\u0003\u0001C\u0001\u0003\u0017DqAa\f\u0001\t\u0003\u0011\t\u0004C\u0004\u0003<\u0001!\t!a3\t\u000f\t}\u0002\u0001\"\u0001\u0003B!9!1\n\u0001\u0005\n\t5\u0003b\u0002B5\u0001\u0011\u0005\u00111\u001a\u0005\b\u0005[\u0002A\u0011\u0002B8\u0005\r\"&/\u00198tC\u000e$\u0018n\u001c8NCJ\\WM]\"iC:tW\r\\'b]\u0006<WM\u001d+fgRT!a\u0011#\u0002\u0017Q\u0014\u0018M\\:bGRLwN\u001c\u0006\u0003\u000b\u001a\u000b1bY8pe\u0012Lg.\u0019;pe*\tq)A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001Q\u0005CA&O\u001b\u0005a%\"A'\u0002\u000bM\u001c\u0017\r\\1\n\u0005=c%AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002%B\u00111\u000bA\u0007\u0002\u0005\u0006iQ.\u001a;bI\u0006$\u0018mQ1dQ\u0016,\u0012A\u0016\t\u0003/jk\u0011\u0001\u0017\u0006\u00033\u001a\u000baa]3sm\u0016\u0014\u0018BA.Y\u00055iU\r^1eCR\f7)Y2iK\u0006qQ.\u001a;bI\u0006$\u0018mQ1dQ\u0016\u0004\u0013!\u00048fi^|'o[\"mS\u0016tG/F\u0001`!\t\u0001\u0007.D\u0001b\u0015\t\u00117-A\u0004dY&,g\u000e^:\u000b\u0005\u001d#'BA3g\u0003\u0019\t\u0007/Y2iK*\tq-A\u0002pe\u001eL!![1\u0003\u001b9+Go^8sW\u000ec\u0017.\u001a8u\u00039qW\r^<pe.\u001cE.[3oi\u0002\nq\u0002\u001e=o'R\fG/Z'b]\u0006<WM]\u000b\u0002[B\u00111K\\\u0005\u0003_\n\u0013q\u0003\u0016:b]N\f7\r^5p]N#\u0018\r^3NC:\fw-\u001a:\u0002!QDhn\u0015;bi\u0016l\u0015M\\1hKJ\u0004\u0013A\u00039beRLG/[8ocU\t1\u000f\u0005\u0002uo6\tQO\u0003\u0002wG\u000611m\\7n_:L!\u0001_;\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]\u0006Y\u0001/\u0019:uSRLwN\\\u0019!\u0003)\u0001\u0018M\u001d;ji&|gNM\u0001\fa\u0006\u0014H/\u001b;j_:\u0014\u0004%A\u0004ce>\\WM]\u0019\u0016\u0003y\u0004\"\u0001^@\n\u0007\u0005\u0005QO\u0001\u0003O_\u0012,\u0017\u0001\u00032s_.,'/\r\u0011\u0002\u000f\t\u0014xn[3se\u0005A!M]8lKJ\u0014\u0004%\u0001\tue\u0006t7/Y2uS>t\u0017\r\\%ecU\u0011\u0011Q\u0002\t\u0005\u0003\u001f\tI\"\u0004\u0002\u0002\u0012)!\u00111CA\u000b\u0003\u0011a\u0017M\\4\u000b\u0005\u0005]\u0011\u0001\u00026bm\u0006LA!a\u0007\u0002\u0012\t11\u000b\u001e:j]\u001e\f\u0011\u0003\u001e:b]N\f7\r^5p]\u0006d\u0017\nZ\u0019!\u0003A!(/\u00198tC\u000e$\u0018n\u001c8bY&#''A\tue\u0006t7/Y2uS>t\u0017\r\\%ee\u0001\n1\u0002\u001d:pIV\u001cWM]%ecU\u0011\u0011q\u0005\t\u0004\u0017\u0006%\u0012bAA\u0016\u0019\n!Aj\u001c8h\u00031\u0001(o\u001c3vG\u0016\u0014\u0018\nZ\u0019!\u0003-\u0001(o\u001c3vG\u0016\u0014\u0018\n\u001a\u001a\u0002\u0019A\u0014x\u000eZ;dKJLEM\r\u0011\u0002\u001bA\u0014x\u000eZ;dKJ,\u0005o\\2i+\t\t9\u0004E\u0002L\u0003sI1!a\u000fM\u0005\u0015\u0019\u0006n\u001c:u\u00039\u0001(o\u001c3vG\u0016\u0014X\t]8dQ\u0002\n\u0011\u0003\\1tiB\u0013x\u000eZ;dKJ,\u0005o\\2i\u0003Ia\u0017m\u001d;Qe>$WoY3s\u000bB|7\r\u001b\u0011\u0002%QDh\u000eV8qS\u000e\u0004\u0016M\u001d;ji&|g.M\u000b\u0003\u0003\u000f\u00022aSA%\u0013\r\tY\u0005\u0014\u0002\u0004\u0013:$\u0018a\u0005;y]R{\u0007/[2QCJ$\u0018\u000e^5p]F\u0002\u0013A\u0005;y]R{\u0007/[2QCJ$\u0018\u000e^5p]J\n1\u0003\u001e=o)>\u0004\u0018n\u0019)beRLG/[8oe\u0001\n\u0001cY8pe\u0012Lg.\u0019;pe\u0016\u0003xn\u00195\u0002#\r|wN\u001d3j]\u0006$xN]#q_\u000eD\u0007%A\td_>\u0014H-\u001b8bi>\u0014X\t]8dQJ\n!cY8pe\u0012Lg.\u0019;pe\u0016\u0003xn\u001953A\u0005aA\u000f\u001f8US6,w.\u001e;Ng\u0006iA\u000f\u001f8US6,w.\u001e;Ng\u0002\n\u0011\u0002\u001e=o%\u0016\u001cX\u000f\u001c;\u0016\u0005\u0005\r\u0004\u0003BA3\u0003Wj!!a\u001a\u000b\u0007\u0005%T/\u0001\u0005sKF,Xm\u001d;t\u0013\u0011\ti'a\u001a\u0003#Q\u0013\u0018M\\:bGRLwN\u001c*fgVdG/\u0001\u0006uq:\u0014Vm];mi\u0002\nA\u0002\u001e=o\u001b\u0016$\u0018\rZ1uCF*\"!!\u001e\u0011\u0007M\u000b9(C\u0002\u0002z\t\u00131\u0003\u0016:b]N\f7\r^5p]6+G/\u00193bi\u0006\fQ\u0002\u001e=o\u001b\u0016$\u0018\rZ1uCF\u0002\u0013\u0001\u0004;y]6+G/\u00193bi\u0006\u0014\u0014!\u0004;y]6+G/\u00193bi\u0006\u0014\u0004%\u0001\fdCB$XO]3e\u000bJ\u0014xN]:DC2d'-Y2l+\t\t)\t\u0005\u0004\u0002\b\u00065\u0015\u0011S\u0007\u0003\u0003\u0013S1!a#g\u0003\u001diwnY6ji>LA!a$\u0002\n\nq\u0011I]4v[\u0016tGoQ1qi>\u0014\bcB&\u0002\u0014\u0006]\u00151U\u0005\u0004\u0003+c%!\u0003$v]\u000e$\u0018n\u001c82!\u0011\tI*a(\u000e\u0005\u0005m%bAAOk\u0006A\u0001O]8u_\u000e|G.\u0003\u0003\u0002\"\u0006m%AB#se>\u00148\u000fE\u0002L\u0003KK1!a*M\u0005\u0011)f.\u001b;\u0002/\r\f\u0007\u000f^;sK\u0012,%O]8sg\u000e\u000bG\u000e\u001c2bG.\u0004\u0013\u0001\u0002;j[\u0016,\"!a,\u0011\t\u0005E\u0016qW\u0007\u0003\u0003gS1!!.v\u0003\u0015)H/\u001b7t\u0013\u0011\tI,a-\u0003\u00115{7m\u001b+j[\u0016\fQ\u0001^5nK\u0002\nab\u00195b]:,G.T1oC\u001e,'/\u0006\u0002\u0002BB\u00191+a1\n\u0007\u0005\u0015'IA\u0010Ue\u0006t7/Y2uS>tW*\u0019:lKJ\u001c\u0005.\u00198oK2l\u0015M\\1hKJ\fqb\u00195b]:,G.T1oC\u001e,'\u000fI\u0001\n[>\u001c7nQ1dQ\u0016$\"!a)\u00021Q,7\u000f\u001e*f[>4X-T3ue&\u001c7o\u00148DY>\u001cX\rK\u00024\u0003#\u0004B!a5\u0002b6\u0011\u0011Q\u001b\u0006\u0005\u0003/\fI.A\u0002ba&TA!a7\u0002^\u00069!.\u001e9ji\u0016\u0014(bAApM\u0006)!.\u001e8ji&!\u00111]Ak\u0005\u0011!Vm\u001d;\u0002AMDw.\u001e7e\u001f:d\u0017p\u0016:ji\u0016$\u0006P\\\"p[BdW\r^5p]>s7-\u001a\u000b\u0005\u0003G\u000bI\u000fC\u0004\u0002lR\u0002\r!!<\u0002-%\u001cHK]1og\u0006\u001cG/[8o-J*e.\u00192mK\u0012\u00042aSAx\u0013\r\t\t\u0010\u0014\u0002\b\u0005>|G.Z1oQ\r!\u0014Q\u001f\t\u0005\u0003o\fi0\u0004\u0002\u0002z*!\u00111`Am\u0003\u0019\u0001\u0018M]1ng&!\u0011q`A}\u0005E\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f\u001e\u0015\bi\t\r!q\u0002B\t!\u0011\u0011)Aa\u0003\u000e\u0005\t\u001d!\u0002\u0002B\u0005\u0003s\f\u0001\u0002\u001d:pm&$WM]\u0005\u0005\u0005\u001b\u00119AA\u0006WC2,XmU8ve\u000e,\u0017\u0001\u00032p_2,\u0017M\\:-\t\tM!QC\r\u0002\u0003e\t\u0001!A\u0012tQ>,H\u000e\u001a(pi2{7/\u001a+y]\u000e{W\u000e\u001d7fi&|g.\u00114uKJdu.\u00193)\u0007U\n\t.A\u0018tQ>,H\u000eZ$f]\u0016\u0014\u0018\r^3F[B$\u00180T1q/\",gNT8SKF,Xm\u001d;t\u001fV$8\u000f^1oI&tw\rK\u00027\u0003#\f!f\u001d5pk2$w)\u001a8fe\u0006$XMU3rk\u0016\u001cH\u000fU3s!\u0006\u0014H/\u001b;j_:\u0004VM\u001d\"s_.,'\u000fK\u00028\u0003#\fqe\u001d5pk2$7k[5q'\u0016tG-T1sW\u0016\u00148o\u00165f]2+\u0017\rZ3s\u001d>$hi\\;oI\"\u001a\u0001(!5\u0002eMDw.\u001e7e'\u00064XMR8s\u0019\u0006$XM],iK:dU-\u00193feVs7N\\8x]\n+HOT8u\u0003Z\f\u0017\u000e\\1cY\u0016D3!OAi\u0003a\u001a\bn\\;mIJ+Wn\u001c<f\u001b\u0006\u00148.\u001a:t\r>\u0014H\u000b\u001f8QCJ$\u0018\u000e^5p]^CWM\u001c)beRLG/[8o\u000b6LwM]1uK\u0012D3AOAi\u0003]\u001a\bn\\;mI\u000e{W\u000e\u001d7fi\u0016\f\u0005\u000f]3oIR{Gj\\4P]\u0016sG\r\u0016=o/\",gnU3oI6\u000b'o[3sgN+8mY3fIR!\u00111\u0015B\u001a\u0011\u001d\tYo\u000fa\u0001\u0003[D3aOA{Q\u001dY$1\u0001B\b\u0005saCAa\u0005\u0003\u0016\u0005)4\u000f[8vY\u0012\f%m\u001c:u\u0003B\u0004XM\u001c3U_2{wm\u00148F]\u0012$\u0006P\\,iK:tu\u000e^\"p_J$\u0017N\\1u_J,%O]8sQ\ra\u0014\u0011[\u0001?g\"|W\u000f\u001c3SKR\u0014\u00180\u00119qK:$Gk\u001c'pO>sWI\u001c3Uq:<\u0006.\u001a8D_>\u0014H-\u001b8bi>\u0014hj\u001c;Bm\u0006LG.\u00192mK\u0016\u0013(o\u001c:\u0015\t\u0005\r&1\t\u0005\b\u0003Wl\u0004\u0019AAwQ\ri\u0014Q\u001f\u0015\b{\t\r!q\u0002B%Y\u0011\u0011\u0019B!\u0006\u0002#\r\u0014X-\u0019;f!&$WI\u001d:pe6\u000b\u0007\u000f\u0006\u0003\u0003P\t\u0015\u0004\u0003\u0003B)\u0005/\u0012YFa\u0018\u000e\u0005\tM#\u0002\u0002B+\u0003+\tA!\u001e;jY&!!\u0011\fB*\u0005\u001dA\u0015m\u001d5NCB\u0004B!a\u0004\u0003^%!\u00111FA\t!\u001d\u0011\tF!\u0019t\u0003/KAAa\u0019\u0003T\t\u0019Q*\u00199\t\u000f\t\u001dd\b1\u0001\u0002\u0018\u00061QM\u001d:peN\fQd\u001d5pk2$7I]3bi\u0016lU\r\u001e:jGN|en\u0015;beRLgn\u001a\u0015\u0004\u007f\u0005E\u0017aI1eUV\u001cH\u000f\u0016:b]N\f7\r^5p]6+G/\u00193bi\u00064uN\u001d,feNLwN\u001c\u000b\u0007\u0003G\u0013\tHa\u001d\t\u000f\u0005-\b\t1\u0001\u0002n\"9!Q\u000f!A\u0002\u0005U\u0014a\u0003;y]6+G/\u00193bi\u0006\u0004")
public class TransactionMarkerChannelManagerTest {
    private final MetadataCache metadataCache = (MetadataCache)Mockito.mock(MetadataCache.class);
    private final NetworkClient networkClient = (NetworkClient)Mockito.mock(NetworkClient.class);
    private final TransactionStateManager txnStateManager = (TransactionStateManager)Mockito.mock(TransactionStateManager.class);
    private final TopicPartition partition1 = new TopicPartition("topic1", 0);
    private final TopicPartition partition2 = new TopicPartition("topic1", 1);
    private final Node broker1 = new Node(1, "host", 10);
    private final Node broker2 = new Node(2, "otherhost", 10);
    private final String transactionalId1;
    private final String transactionalId2;
    private final long producerId1;
    private final long producerId2;
    private final short producerEpoch = (short)0;
    private final short lastProducerEpoch = (short)-1;
    private final int txnTopicPartition1;
    private final int txnTopicPartition2;
    private final int coordinatorEpoch;
    private final int coordinatorEpoch2;
    private final int txnTimeoutMs;
    private final TransactionResult txnResult = TransactionResult.COMMIT;
    private final TransactionMetadata txnMetadata1 = new TransactionMetadata(this.transactionalId1(), this.producerId1(), this.producerId1(), -1L, this.producerEpoch(), this.lastProducerEpoch(), this.txnTimeoutMs(), (TransactionState)PrepareCommit$.MODULE$, (Set)Set$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{this.partition1(), this.partition2()})), 0L, 0L, TransactionVersion.TV_2);
    private final TransactionMetadata txnMetadata2 = new TransactionMetadata(this.transactionalId2(), this.producerId2(), this.producerId2(), -1L, this.producerEpoch(), this.lastProducerEpoch(), this.txnTimeoutMs(), (TransactionState)PrepareCommit$.MODULE$, (Set)Set$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{this.partition1()})), 0L, 0L, TransactionVersion.TV_2);
    private final ArgumentCaptor<Function1<Errors, BoxedUnit>> capturedErrorsCallback = ArgumentCaptor.forClass(Function1.class);
    private final MockTime time = new MockTime();
    private final TransactionMarkerChannelManager channelManager = new TransactionMarkerChannelManager(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false)), this.metadataCache(), this.networkClient(), this.txnStateManager(), (Time)this.time());

    private MetadataCache metadataCache() {
        return this.metadataCache;
    }

    private NetworkClient networkClient() {
        return this.networkClient;
    }

    private TransactionStateManager txnStateManager() {
        return this.txnStateManager;
    }

    private TopicPartition partition1() {
        return this.partition1;
    }

    private TopicPartition partition2() {
        return this.partition2;
    }

    private Node broker1() {
        return this.broker1;
    }

    private Node broker2() {
        return this.broker2;
    }

    private String transactionalId1() {
        return this.transactionalId1;
    }

    private String transactionalId2() {
        return this.transactionalId2;
    }

    private long producerId1() {
        return this.producerId1;
    }

    private long producerId2() {
        return this.producerId2;
    }

    private short producerEpoch() {
        return this.producerEpoch;
    }

    private short lastProducerEpoch() {
        return this.lastProducerEpoch;
    }

    private int txnTopicPartition1() {
        return this.txnTopicPartition1;
    }

    private int txnTopicPartition2() {
        return this.txnTopicPartition2;
    }

    private int coordinatorEpoch() {
        return this.coordinatorEpoch;
    }

    private int coordinatorEpoch2() {
        return this.coordinatorEpoch2;
    }

    private int txnTimeoutMs() {
        return this.txnTimeoutMs;
    }

    private TransactionResult txnResult() {
        return this.txnResult;
    }

    private TransactionMetadata txnMetadata1() {
        return this.txnMetadata1;
    }

    private TransactionMetadata txnMetadata2() {
        return this.txnMetadata2;
    }

    private ArgumentCaptor<Function1<Errors, BoxedUnit>> capturedErrorsCallback() {
        return this.capturedErrorsCallback;
    }

    private MockTime time() {
        return this.time;
    }

    private TransactionMarkerChannelManager channelManager() {
        return this.channelManager;
    }

    private void mockCache() {
        Mockito.when((Object)BoxesRunTime.boxToInteger((int)this.txnStateManager().partitionFor(this.transactionalId1()))).thenReturn((Object)BoxesRunTime.boxToInteger((int)this.txnTopicPartition1()));
        Mockito.when((Object)BoxesRunTime.boxToInteger((int)this.txnStateManager().partitionFor(this.transactionalId2()))).thenReturn((Object)BoxesRunTime.boxToInteger((int)this.txnTopicPartition2()));
        Mockito.when((Object)this.txnStateManager().getTransactionState((String)ArgumentMatchers.eq((Object)this.transactionalId1()))).thenReturn((Object)new Right((Object)new Some((Object)new CoordinatorEpochAndTxnMetadata(this.coordinatorEpoch(), this.txnMetadata1()))));
        Mockito.when((Object)this.txnStateManager().getTransactionState((String)ArgumentMatchers.eq((Object)this.transactionalId2()))).thenReturn((Object)new Right((Object)new Some((Object)new CoordinatorEpochAndTxnMetadata(this.coordinatorEpoch(), this.txnMetadata2()))));
        Mockito.when((Object)this.metadataCache().metadataVersion()).thenReturn((Object)MetadataVersion.latestProduction());
    }

    @Test
    public void testRemoveMetricsOnClose() {
        try (MockedConstruction mockMetricsGroupCtor = Mockito.mockConstruction(KafkaMetricsGroup.class);){
            new TransactionMarkerChannelManager(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false)), this.metadataCache(), this.networkClient(), this.txnStateManager(), (Time)this.time()).shutdown();
            KafkaMetricsGroup mockMetricsGroup = (KafkaMetricsGroup)mockMetricsGroupCtor.constructed().get(0);
            TransactionMarkerChannelManager$.MODULE$.MetricNames().foreach((Function1 & Serializable)metricName -> ((KafkaMetricsGroup)Mockito.verify((Object)mockMetricsGroup)).newGauge((String)ArgumentMatchers.eq((Object)metricName), (Supplier)ArgumentMatchers.any()));
            TransactionMarkerChannelManager$.MODULE$.MetricNames().foreach((Function1 & Serializable)x$1 -> {
                TransactionMarkerChannelManagerTest.$anonfun$testRemoveMetricsOnClose$2(mockMetricsGroup, x$1);
                return BoxedUnit.UNIT;
            });
            Mockito.verifyNoMoreInteractions((Object[])new Object[]{mockMetricsGroup});
        }
    }

    /*
     * WARNING - void declaration
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void shouldOnlyWriteTxnCompletionOnce(boolean isTransactionV2Enabled) {
        Future<Try> addMarkerFuture;
        this.mockCache();
        this.adjustTransactionMetadataForVersion(isTransactionV2Enabled, this.txnMetadata2());
        TxnTransitMetadata expectedTransition = this.txnMetadata2().prepareComplete(this.time().milliseconds());
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition1().topic()), ArgumentMatchers.eq((int)this.partition1().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)this.broker1()));
        this.txnStateManager().appendTransactionToLog((String)ArgumentMatchers.eq((Object)this.transactionalId2()), ArgumentMatchers.eq((int)this.coordinatorEpoch()), (TxnTransitMetadata)ArgumentMatchers.eq((Object)expectedTransition), (Function1)this.capturedErrorsCallback().capture(), (Function1)ArgumentMatchers.any(), (RequestLocal)ArgumentMatchers.any());
        Mockito.when((Object)BoxedUnit.UNIT).thenAnswer(x$2 -> {
            TransactionMarkerChannelManagerTest.$anonfun$shouldOnlyWriteTxnCompletionOnce$1(this, expectedTransition, x$2);
            return BoxedUnit.UNIT;
        });
        ExecutorService executor = Executors.newFixedThreadPool(1);
        this.txnMetadata2().lock().lock();
        try {
            addMarkerFuture = executor.submit(() -> Try$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable)() -> this.channelManager().addTxnMarkersToSend(this.coordinatorEpoch(), this.txnResult(), this.txnMetadata2(), expectedTransition)));
            RequestHeader header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1);
            WriteTxnMarkersResponse response = new WriteTxnMarkersResponse(Collections.singletonMap(Predef$.MODULE$.long2Long(this.producerId2()), Collections.singletonMap(this.partition1(), Errors.NONE)));
            ClientResponse clientResponse = new ClientResponse(header, null, null, this.time().milliseconds(), this.time().milliseconds(), false, null, null, (AbstractResponse)response);
            long l = 100L;
            long waitUntilTrue_waitTimeMs = 15000L;
            long waitUntilTrue_startTime = System.currentTimeMillis();
            while (!TransactionMarkerChannelManagerTest.$anonfun$shouldOnlyWriteTxnCompletionOnce$4(this, clientResponse)) {
                void waitUntilTrue_pause;
                if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                    Assertions.fail((String)"Timed out waiting for expected WriteTxnMarkers request");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
            }
        }
        finally {
            this.txnMetadata2().lock().unlock();
            executor.shutdown();
        }
        Assertions.assertNotNull(addMarkerFuture);
        Assertions.assertTrue((boolean)addMarkerFuture.get().isSuccess(), (String)("Add marker task failed with exception " + addMarkerFuture.get().get()));
        ((TransactionStateManager)Mockito.verify((Object)this.txnStateManager())).appendTransactionToLog((String)ArgumentMatchers.eq((Object)this.transactionalId2()), ArgumentMatchers.eq((int)this.coordinatorEpoch()), (TxnTransitMetadata)ArgumentMatchers.eq((Object)expectedTransition), (Function1)this.capturedErrorsCallback().capture(), (Function1)ArgumentMatchers.any(), (RequestLocal)ArgumentMatchers.any());
    }

    @Test
    public void shouldNotLoseTxnCompletionAfterLoad() {
        this.mockCache();
        TxnTransitMetadata expectedTransition = this.txnMetadata2().prepareComplete(this.time().milliseconds());
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition1().topic()), ArgumentMatchers.eq((int)this.partition1().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)this.broker1()));
        RequestHeader header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1);
        WriteTxnMarkersResponse successfulResponse = new WriteTxnMarkersResponse(Collections.singletonMap(Predef$.MODULE$.long2Long(this.producerId2()), Collections.singletonMap(this.partition1(), Errors.NONE)));
        ClientResponse successfulClientResponse = new ClientResponse(header, null, null, this.time().milliseconds(), this.time().milliseconds(), false, null, null, (AbstractResponse)successfulResponse);
        ClientResponse disconnectedClientResponse = new ClientResponse(header, null, null, this.time().milliseconds(), this.time().milliseconds(), true, null, null, null);
        .colon.colon clientResponses = new .colon.colon((Object)successfulClientResponse, (List)new .colon.colon((Object)disconnectedClientResponse, (List)Nil$.MODULE$));
        .colon.colon getTransactionStateResponses = new .colon.colon((Object)new Left((Object)Errors.NOT_COORDINATOR), (List)new .colon.colon((Object)new Left((Object)Errors.COORDINATOR_LOAD_IN_PROGRESS), (List)new .colon.colon((Object)new Right((Object)new Some((Object)new CoordinatorEpochAndTxnMetadata(this.coordinatorEpoch2(), this.txnMetadata2()))), (List)Nil$.MODULE$)));
        clientResponses.foreach(arg_0 -> TransactionMarkerChannelManagerTest.$anonfun$shouldNotLoseTxnCompletionAfterLoad$1$adapted(this, (Seq)getTransactionStateResponses, expectedTransition, successfulClientResponse, arg_0));
    }

    @Test
    public void shouldGenerateEmptyMapWhenNoRequestsOutstanding() {
        Assertions.assertTrue((boolean)this.channelManager().generateRequests().isEmpty());
    }

    @Test
    public void shouldGenerateRequestPerPartitionPerBroker() {
        this.mockCache();
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition1().topic()), ArgumentMatchers.eq((int)this.partition1().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)this.broker1()));
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition2().topic()), ArgumentMatchers.eq((int)this.partition2().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)this.broker2()));
        this.channelManager().addTxnMarkersToSend(this.coordinatorEpoch(), this.txnResult(), this.txnMetadata1(), this.txnMetadata1().prepareComplete(this.time().milliseconds()));
        this.channelManager().addTxnMarkersToSend(this.coordinatorEpoch(), this.txnResult(), this.txnMetadata2(), this.txnMetadata2().prepareComplete(this.time().milliseconds()));
        Assertions.assertEquals((int)2, (int)this.channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals((int)2, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker1().id()).get()).totalNumMarkers());
        Assertions.assertEquals((int)1, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker1().id()).get()).totalNumMarkers(this.txnTopicPartition1()));
        Assertions.assertEquals((int)1, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker1().id()).get()).totalNumMarkers(this.txnTopicPartition2()));
        Assertions.assertEquals((int)1, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker2().id()).get()).totalNumMarkers());
        Assertions.assertEquals((int)1, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker2().id()).get()).totalNumMarkers(this.txnTopicPartition1()));
        Assertions.assertEquals((int)0, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker2().id()).get()).totalNumMarkers(this.txnTopicPartition2()));
        WriteTxnMarkersRequest expectedBroker1Request = (WriteTxnMarkersRequest)new WriteTxnMarkersRequest.Builder(Arrays.asList(new WriteTxnMarkersRequest.TxnMarkerEntry(this.producerId1(), this.producerEpoch(), this.coordinatorEpoch(), this.txnResult(), Arrays.asList(this.partition1())), new WriteTxnMarkersRequest.TxnMarkerEntry(this.producerId2(), this.producerEpoch(), this.coordinatorEpoch(), this.txnResult(), Arrays.asList(this.partition1())))).build();
        WriteTxnMarkersRequest expectedBroker2Request = (WriteTxnMarkersRequest)new WriteTxnMarkersRequest.Builder(Arrays.asList(new WriteTxnMarkersRequest.TxnMarkerEntry(this.producerId1(), this.producerEpoch(), this.coordinatorEpoch(), this.txnResult(), Arrays.asList(this.partition2())))).build();
        scala.collection.immutable.Map requests = ((IterableOnceOps)CollectionConverters$.MODULE$.CollectionHasAsScala(this.channelManager().generateRequests()).asScala().map((Function1 & Serializable)handler -> new Tuple2((Object)handler.destination, (Object)((WriteTxnMarkersRequest.Builder)handler.request).build()))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
        Assertions.assertEquals((Object)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.broker1()), (Object)expectedBroker1Request), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.broker2()), (Object)expectedBroker2Request)})), (Object)requests);
        Assertions.assertTrue((boolean)this.channelManager().generateRequests().isEmpty());
    }

    @Test
    public void shouldSkipSendMarkersWhenLeaderNotFound() {
        this.mockCache();
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition1().topic()), ArgumentMatchers.eq((int)this.partition1().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)None$.MODULE$);
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition2().topic()), ArgumentMatchers.eq((int)this.partition2().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)this.broker2()));
        this.channelManager().addTxnMarkersToSend(this.coordinatorEpoch(), this.txnResult(), this.txnMetadata1(), this.txnMetadata1().prepareComplete(this.time().milliseconds()));
        Assertions.assertEquals((int)1, (int)this.channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals((int)1, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker2().id()).get()).totalNumMarkers());
        Assertions.assertTrue((boolean)this.channelManager().queueForBroker(this.broker1().id()).isEmpty());
        Assertions.assertEquals((int)1, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker2().id()).get()).totalNumMarkers(this.txnTopicPartition1()));
        Assertions.assertEquals((int)0, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker2().id()).get()).totalNumMarkers(this.txnTopicPartition2()));
    }

    @Test
    public void shouldSaveForLaterWhenLeaderUnknownButNotAvailable() {
        this.mockCache();
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition1().topic()), ArgumentMatchers.eq((int)this.partition1().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)Node.noNode())).thenReturn((Object)new Some((Object)Node.noNode())).thenReturn((Object)new Some((Object)Node.noNode())).thenReturn((Object)new Some((Object)Node.noNode())).thenReturn((Object)new Some((Object)this.broker1())).thenReturn((Object)new Some((Object)this.broker1()));
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition2().topic()), ArgumentMatchers.eq((int)this.partition2().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)this.broker2()));
        this.channelManager().addTxnMarkersToSend(this.coordinatorEpoch(), this.txnResult(), this.txnMetadata1(), this.txnMetadata1().prepareComplete(this.time().milliseconds()));
        this.channelManager().addTxnMarkersToSend(this.coordinatorEpoch(), this.txnResult(), this.txnMetadata2(), this.txnMetadata2().prepareComplete(this.time().milliseconds()));
        Assertions.assertEquals((int)2, (int)this.channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals((int)1, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker2().id()).get()).totalNumMarkers());
        Assertions.assertTrue((boolean)this.channelManager().queueForBroker(this.broker1().id()).isEmpty());
        Assertions.assertEquals((int)1, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker2().id()).get()).totalNumMarkers(this.txnTopicPartition1()));
        Assertions.assertEquals((int)0, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker2().id()).get()).totalNumMarkers(this.txnTopicPartition2()));
        Assertions.assertEquals((int)2, (int)this.channelManager().queueForUnknownBroker().totalNumMarkers());
        Assertions.assertEquals((int)1, (int)this.channelManager().queueForUnknownBroker().totalNumMarkers(this.txnTopicPartition1()));
        Assertions.assertEquals((int)1, (int)this.channelManager().queueForUnknownBroker().totalNumMarkers(this.txnTopicPartition2()));
        WriteTxnMarkersRequest expectedBroker1Request = (WriteTxnMarkersRequest)new WriteTxnMarkersRequest.Builder(Arrays.asList(new WriteTxnMarkersRequest.TxnMarkerEntry(this.producerId1(), this.producerEpoch(), this.coordinatorEpoch(), this.txnResult(), Arrays.asList(this.partition1())), new WriteTxnMarkersRequest.TxnMarkerEntry(this.producerId2(), this.producerEpoch(), this.coordinatorEpoch(), this.txnResult(), Arrays.asList(this.partition1())))).build();
        WriteTxnMarkersRequest expectedBroker2Request = (WriteTxnMarkersRequest)new WriteTxnMarkersRequest.Builder(Arrays.asList(new WriteTxnMarkersRequest.TxnMarkerEntry(this.producerId1(), this.producerEpoch(), this.coordinatorEpoch(), this.txnResult(), Arrays.asList(this.partition2())))).build();
        scala.collection.immutable.Map firstDrainedRequests = ((IterableOnceOps)CollectionConverters$.MODULE$.CollectionHasAsScala(this.channelManager().generateRequests()).asScala().map((Function1 & Serializable)handler -> new Tuple2((Object)handler.destination, (Object)((WriteTxnMarkersRequest.Builder)handler.request).build()))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
        Assertions.assertEquals((Object)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.broker2()), (Object)expectedBroker2Request)})), (Object)firstDrainedRequests);
        scala.collection.immutable.Map secondDrainedRequests = ((IterableOnceOps)CollectionConverters$.MODULE$.CollectionHasAsScala(this.channelManager().generateRequests()).asScala().map((Function1 & Serializable)handler -> new Tuple2((Object)handler.destination, (Object)((WriteTxnMarkersRequest.Builder)handler.request).build()))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
        Assertions.assertEquals((Object)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.broker1()), (Object)expectedBroker1Request)})), (Object)secondDrainedRequests);
    }

    @Test
    public void shouldRemoveMarkersForTxnPartitionWhenPartitionEmigrated() {
        this.mockCache();
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition1().topic()), ArgumentMatchers.eq((int)this.partition1().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)this.broker1()));
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition2().topic()), ArgumentMatchers.eq((int)this.partition2().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)this.broker2()));
        this.channelManager().addTxnMarkersToSend(this.coordinatorEpoch(), this.txnResult(), this.txnMetadata1(), this.txnMetadata1().prepareComplete(this.time().milliseconds()));
        this.channelManager().addTxnMarkersToSend(this.coordinatorEpoch(), this.txnResult(), this.txnMetadata2(), this.txnMetadata2().prepareComplete(this.time().milliseconds()));
        Assertions.assertEquals((int)2, (int)this.channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals((int)2, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker1().id()).get()).totalNumMarkers());
        Assertions.assertEquals((int)1, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker1().id()).get()).totalNumMarkers(this.txnTopicPartition1()));
        Assertions.assertEquals((int)1, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker1().id()).get()).totalNumMarkers(this.txnTopicPartition2()));
        Assertions.assertEquals((int)1, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker2().id()).get()).totalNumMarkers());
        Assertions.assertEquals((int)1, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker2().id()).get()).totalNumMarkers(this.txnTopicPartition1()));
        Assertions.assertEquals((int)0, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker2().id()).get()).totalNumMarkers(this.txnTopicPartition2()));
        this.channelManager().removeMarkersForTxnTopicPartition(this.txnTopicPartition1());
        Assertions.assertEquals((int)1, (int)this.channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals((int)1, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker1().id()).get()).totalNumMarkers());
        Assertions.assertEquals((int)0, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker1().id()).get()).totalNumMarkers(this.txnTopicPartition1()));
        Assertions.assertEquals((int)1, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker1().id()).get()).totalNumMarkers(this.txnTopicPartition2()));
        Assertions.assertEquals((int)0, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker2().id()).get()).totalNumMarkers());
        Assertions.assertEquals((int)0, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker2().id()).get()).totalNumMarkers(this.txnTopicPartition1()));
        Assertions.assertEquals((int)0, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker2().id()).get()).totalNumMarkers(this.txnTopicPartition2()));
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void shouldCompleteAppendToLogOnEndTxnWhenSendMarkersSucceed(boolean isTransactionV2Enabled) {
        this.mockCache();
        this.adjustTransactionMetadataForVersion(isTransactionV2Enabled, this.txnMetadata2());
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition1().topic()), ArgumentMatchers.eq((int)this.partition1().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)this.broker1()));
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition2().topic()), ArgumentMatchers.eq((int)this.partition2().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)this.broker2()));
        TxnTransitMetadata txnTransitionMetadata2 = this.txnMetadata2().prepareComplete(this.time().milliseconds());
        this.txnStateManager().appendTransactionToLog((String)ArgumentMatchers.eq((Object)this.transactionalId2()), ArgumentMatchers.eq((int)this.coordinatorEpoch()), (TxnTransitMetadata)ArgumentMatchers.eq((Object)txnTransitionMetadata2), (Function1)this.capturedErrorsCallback().capture(), (Function1)ArgumentMatchers.any(), (RequestLocal)ArgumentMatchers.any());
        Mockito.when((Object)BoxedUnit.UNIT).thenAnswer(x$3 -> {
            TransactionMarkerChannelManagerTest.$anonfun$shouldCompleteAppendToLogOnEndTxnWhenSendMarkersSucceed$1(this, txnTransitionMetadata2, x$3);
            return BoxedUnit.UNIT;
        });
        this.channelManager().addTxnMarkersToSend(this.coordinatorEpoch(), this.txnResult(), this.txnMetadata2(), txnTransitionMetadata2);
        Iterable requestAndHandlers = CollectionConverters$.MODULE$.CollectionHasAsScala(this.channelManager().generateRequests()).asScala();
        WriteTxnMarkersResponse response = new WriteTxnMarkersResponse(this.createPidErrorMap(Errors.NONE));
        requestAndHandlers.foreach((Function1 & Serializable)requestAndHandler -> {
            requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1), null, null, 0L, 0L, false, null, null, (AbstractResponse)response));
            return BoxedUnit.UNIT;
        });
        ((TransactionStateManager)Mockito.verify((Object)this.txnStateManager())).appendTransactionToLog((String)ArgumentMatchers.eq((Object)this.transactionalId2()), ArgumentMatchers.eq((int)this.coordinatorEpoch()), (TxnTransitMetadata)ArgumentMatchers.eq((Object)txnTransitionMetadata2), (Function1)this.capturedErrorsCallback().capture(), (Function1)ArgumentMatchers.any(), (RequestLocal)ArgumentMatchers.any());
        Assertions.assertEquals((int)0, (int)this.channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals((int)0, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker1().id()).get()).totalNumMarkers());
        Assertions.assertEquals((Object)None$.MODULE$, (Object)this.txnMetadata2().pendingState());
        Assertions.assertEquals((Object)CompleteCommit$.MODULE$, (Object)this.txnMetadata2().state());
    }

    @Test
    public void shouldAbortAppendToLogOnEndTxnWhenNotCoordinatorError() {
        this.mockCache();
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition1().topic()), ArgumentMatchers.eq((int)this.partition1().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)this.broker1()));
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition2().topic()), ArgumentMatchers.eq((int)this.partition2().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)this.broker2()));
        TxnTransitMetadata txnTransitionMetadata2 = this.txnMetadata2().prepareComplete(this.time().milliseconds());
        this.txnStateManager().appendTransactionToLog((String)ArgumentMatchers.eq((Object)this.transactionalId2()), ArgumentMatchers.eq((int)this.coordinatorEpoch()), (TxnTransitMetadata)ArgumentMatchers.eq((Object)txnTransitionMetadata2), (Function1)this.capturedErrorsCallback().capture(), (Function1)ArgumentMatchers.any(), (RequestLocal)ArgumentMatchers.any());
        Mockito.when((Object)BoxedUnit.UNIT).thenAnswer(x$4 -> {
            TransactionMarkerChannelManagerTest.$anonfun$shouldAbortAppendToLogOnEndTxnWhenNotCoordinatorError$1(this, x$4);
            return BoxedUnit.UNIT;
        });
        this.channelManager().addTxnMarkersToSend(this.coordinatorEpoch(), this.txnResult(), this.txnMetadata2(), txnTransitionMetadata2);
        Iterable requestAndHandlers = CollectionConverters$.MODULE$.CollectionHasAsScala(this.channelManager().generateRequests()).asScala();
        WriteTxnMarkersResponse response = new WriteTxnMarkersResponse(this.createPidErrorMap(Errors.NONE));
        requestAndHandlers.foreach((Function1 & Serializable)requestAndHandler -> {
            requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1), null, null, 0L, 0L, false, null, null, (AbstractResponse)response));
            return BoxedUnit.UNIT;
        });
        ((TransactionStateManager)Mockito.verify((Object)this.txnStateManager())).appendTransactionToLog((String)ArgumentMatchers.eq((Object)this.transactionalId2()), ArgumentMatchers.eq((int)this.coordinatorEpoch()), (TxnTransitMetadata)ArgumentMatchers.eq((Object)txnTransitionMetadata2), (Function1)this.capturedErrorsCallback().capture(), (Function1)ArgumentMatchers.any(), (RequestLocal)ArgumentMatchers.any());
        Assertions.assertEquals((int)0, (int)this.channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals((int)0, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker1().id()).get()).totalNumMarkers());
        Assertions.assertEquals((Object)None$.MODULE$, (Object)this.txnMetadata2().pendingState());
        Assertions.assertEquals((Object)PrepareCommit$.MODULE$, (Object)this.txnMetadata2().state());
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError(boolean isTransactionV2Enabled) {
        this.mockCache();
        this.adjustTransactionMetadataForVersion(isTransactionV2Enabled, this.txnMetadata2());
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition1().topic()), ArgumentMatchers.eq((int)this.partition1().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)this.broker1()));
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint((String)ArgumentMatchers.eq((Object)this.partition2().topic()), ArgumentMatchers.eq((int)this.partition2().partition()), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)this.broker2()));
        TxnTransitMetadata txnTransitionMetadata2 = this.txnMetadata2().prepareComplete(this.time().milliseconds());
        this.txnStateManager().appendTransactionToLog((String)ArgumentMatchers.eq((Object)this.transactionalId2()), ArgumentMatchers.eq((int)this.coordinatorEpoch()), (TxnTransitMetadata)ArgumentMatchers.eq((Object)txnTransitionMetadata2), (Function1)this.capturedErrorsCallback().capture(), (Function1)ArgumentMatchers.any(), (RequestLocal)ArgumentMatchers.any());
        Mockito.when((Object)BoxedUnit.UNIT).thenAnswer(x$5 -> {
            TransactionMarkerChannelManagerTest.$anonfun$shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError$1(this, x$5);
            return BoxedUnit.UNIT;
        }).thenAnswer(x$6 -> {
            TransactionMarkerChannelManagerTest.$anonfun$shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError$2(this, txnTransitionMetadata2, x$6);
            return BoxedUnit.UNIT;
        });
        this.channelManager().addTxnMarkersToSend(this.coordinatorEpoch(), this.txnResult(), this.txnMetadata2(), txnTransitionMetadata2);
        Iterable requestAndHandlers = CollectionConverters$.MODULE$.CollectionHasAsScala(this.channelManager().generateRequests()).asScala();
        WriteTxnMarkersResponse response = new WriteTxnMarkersResponse(this.createPidErrorMap(Errors.NONE));
        requestAndHandlers.foreach((Function1 & Serializable)requestAndHandler -> {
            requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1), null, null, 0L, 0L, false, null, null, (AbstractResponse)response));
            return BoxedUnit.UNIT;
        });
        this.channelManager().generateRequests();
        ((TransactionStateManager)Mockito.verify((Object)this.txnStateManager(), (VerificationMode)Mockito.times((int)2))).appendTransactionToLog((String)ArgumentMatchers.eq((Object)this.transactionalId2()), ArgumentMatchers.eq((int)this.coordinatorEpoch()), (TxnTransitMetadata)ArgumentMatchers.eq((Object)txnTransitionMetadata2), (Function1)this.capturedErrorsCallback().capture(), (Function1)ArgumentMatchers.any(), (RequestLocal)ArgumentMatchers.any());
        Assertions.assertEquals((int)0, (int)this.channelManager().numTxnsWithPendingMarkers());
        Assertions.assertEquals((int)0, (int)((TxnMarkerQueue)this.channelManager().queueForBroker(this.broker1().id()).get()).totalNumMarkers());
        Assertions.assertEquals((Object)None$.MODULE$, (Object)this.txnMetadata2().pendingState());
        Assertions.assertEquals((Object)CompleteCommit$.MODULE$, (Object)this.txnMetadata2().state());
    }

    private HashMap<Long, Map<TopicPartition, Errors>> createPidErrorMap(Errors errors) {
        HashMap<Long, Map<TopicPartition, Errors>> pidMap = new HashMap<Long, Map<TopicPartition, Errors>>();
        HashMap<TopicPartition, Errors> errorsMap = new HashMap<TopicPartition, Errors>();
        errorsMap.put(this.partition1(), errors);
        pidMap.put(Predef$.MODULE$.long2Long(this.producerId2()), errorsMap);
        return pidMap;
    }

    @Test
    public void shouldCreateMetricsOnStarting() {
        scala.collection.mutable.Map metrics = CollectionConverters$.MODULE$.MapHasAsScala(KafkaYammerMetrics.defaultRegistry().allMetrics()).asScala();
        Assertions.assertEquals((int)1, (int)metrics.count((Function1 & Serializable)x0$1 -> BoxesRunTime.boxToBoolean((boolean)TransactionMarkerChannelManagerTest.$anonfun$shouldCreateMetricsOnStarting$1(x0$1))));
        Assertions.assertEquals((int)1, (int)metrics.count((Function1 & Serializable)x0$2 -> BoxesRunTime.boxToBoolean((boolean)TransactionMarkerChannelManagerTest.$anonfun$shouldCreateMetricsOnStarting$2(x0$2))));
    }

    private void adjustTransactionMetadataForVersion(boolean isTransactionV2Enabled, TransactionMetadata txnMetadata) {
        if (isTransactionV2Enabled) {
            txnMetadata.clientTransactionVersion_$eq(TransactionVersion.TV_2);
            txnMetadata.producerEpoch_$eq((short)(this.producerEpoch() + 1));
            txnMetadata.lastProducerEpoch_$eq(this.producerEpoch());
            return;
        }
        txnMetadata.clientTransactionVersion_$eq(TransactionVersion.TV_1);
    }

    public static final /* synthetic */ void $anonfun$testRemoveMetricsOnClose$2(KafkaMetricsGroup mockMetricsGroup$1, String x$1) {
        ((KafkaMetricsGroup)Mockito.verify((Object)mockMetricsGroup$1)).removeMetric(x$1);
    }

    public static final /* synthetic */ void $anonfun$shouldOnlyWriteTxnCompletionOnce$1(TransactionMarkerChannelManagerTest $this, TxnTransitMetadata expectedTransition$1, InvocationOnMock x$2) {
        $this.txnMetadata2().completeTransitionTo(expectedTransition$1);
        ((Function1)$this.capturedErrorsCallback().getValue()).apply((Object)Errors.NONE);
    }

    public static final /* synthetic */ boolean $anonfun$shouldOnlyWriteTxnCompletionOnce$4(TransactionMarkerChannelManagerTest $this, ClientResponse clientResponse$1) {
        Iterable requests = CollectionConverters$.MODULE$.CollectionHasAsScala($this.channelManager().generateRequests()).asScala();
        if (requests.nonEmpty()) {
            Assertions.assertEquals((int)1, (int)requests.size());
            ((RequestAndCompletionHandler)requests.head()).handler.onComplete(clientResponse$1);
            return true;
        }
        return false;
    }

    public static final /* synthetic */ String $anonfun$shouldOnlyWriteTxnCompletionOnce$5() {
        return "Timed out waiting for expected WriteTxnMarkers request";
    }

    public static final /* synthetic */ void $anonfun$shouldNotLoseTxnCompletionAfterLoad$2(TransactionMarkerChannelManagerTest $this, TxnTransitMetadata expectedTransition$2, ClientResponse clientResponse$2, ClientResponse successfulClientResponse$1, Either getTransactionStateResponse) {
        $this.txnMetadata2().topicPartitions().add((Object)$this.partition1());
        Mockito.clearInvocations((Object[])new TransactionStateManager[]{$this.txnStateManager()});
        $this.channelManager().addTxnMarkersToSend($this.coordinatorEpoch(), $this.txnResult(), $this.txnMetadata2(), expectedTransition$2);
        Iterable requests1 = CollectionConverters$.MODULE$.CollectionHasAsScala($this.channelManager().generateRequests()).asScala();
        Assertions.assertEquals((int)1, (int)requests1.size());
        $this.channelManager().removeMarkersForTxnTopicPartition($this.txnTopicPartition2());
        Mockito.when((Object)$this.txnStateManager().getTransactionState((String)ArgumentMatchers.eq((Object)$this.transactionalId2()))).thenReturn((Object)getTransactionStateResponse);
        $this.channelManager().addTxnMarkersToSend($this.coordinatorEpoch2(), $this.txnResult(), $this.txnMetadata2(), expectedTransition$2);
        ((RequestAndCompletionHandler)requests1.head()).handler.onComplete(clientResponse$2);
        Mockito.when((Object)$this.txnStateManager().getTransactionState((String)ArgumentMatchers.eq((Object)$this.transactionalId2()))).thenReturn((Object)new Right((Object)new Some((Object)new CoordinatorEpochAndTxnMetadata($this.coordinatorEpoch2(), $this.txnMetadata2()))));
        Iterable requests2 = CollectionConverters$.MODULE$.CollectionHasAsScala($this.channelManager().generateRequests()).asScala();
        Assertions.assertEquals((int)1, (int)requests2.size());
        ((RequestAndCompletionHandler)requests2.head()).handler.onComplete(successfulClientResponse$1);
        ((TransactionStateManager)Mockito.verify((Object)$this.txnStateManager())).appendTransactionToLog((String)ArgumentMatchers.eq((Object)$this.transactionalId2()), ArgumentMatchers.eq((int)$this.coordinatorEpoch2()), (TxnTransitMetadata)ArgumentMatchers.eq((Object)expectedTransition$2), (Function1)$this.capturedErrorsCallback().capture(), (Function1)ArgumentMatchers.any(), (RequestLocal)ArgumentMatchers.any());
    }

    public static final /* synthetic */ void $anonfun$shouldCompleteAppendToLogOnEndTxnWhenSendMarkersSucceed$1(TransactionMarkerChannelManagerTest $this, TxnTransitMetadata txnTransitionMetadata2$1, InvocationOnMock x$3) {
        $this.txnMetadata2().completeTransitionTo(txnTransitionMetadata2$1);
        ((Function1)$this.capturedErrorsCallback().getValue()).apply((Object)Errors.NONE);
    }

    public static final /* synthetic */ void $anonfun$shouldAbortAppendToLogOnEndTxnWhenNotCoordinatorError$1(TransactionMarkerChannelManagerTest $this, InvocationOnMock x$4) {
        $this.txnMetadata2().pendingState_$eq((Option)None$.MODULE$);
        ((Function1)$this.capturedErrorsCallback().getValue()).apply((Object)Errors.NOT_COORDINATOR);
    }

    public static final /* synthetic */ void $anonfun$shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError$1(TransactionMarkerChannelManagerTest $this, InvocationOnMock x$5) {
        ((Function1)$this.capturedErrorsCallback().getValue()).apply((Object)Errors.COORDINATOR_NOT_AVAILABLE);
    }

    public static final /* synthetic */ void $anonfun$shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError$2(TransactionMarkerChannelManagerTest $this, TxnTransitMetadata txnTransitionMetadata2$2, InvocationOnMock x$6) {
        $this.txnMetadata2().completeTransitionTo(txnTransitionMetadata2$2);
        ((Function1)$this.capturedErrorsCallback().getValue()).apply((Object)Errors.NONE);
    }

    public static final /* synthetic */ boolean $anonfun$shouldCreateMetricsOnStarting$1(Tuple2 x0$1) {
        if (x0$1 != null) {
            String string = ((MetricName)x0$1._1()).getMBeanName();
            String string2 = "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=UnknownDestinationQueueSize";
            return string != null && string.equals(string2);
        }
        throw new MatchError(null);
    }

    public static final /* synthetic */ boolean $anonfun$shouldCreateMetricsOnStarting$2(Tuple2 x0$2) {
        if (x0$2 != null) {
            String string = ((MetricName)x0$2._1()).getMBeanName();
            String string2 = "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=LogAppendRetryQueueSize";
            return string != null && string.equals(string2);
        }
        throw new MatchError(null);
    }

    public TransactionMarkerChannelManagerTest() {
        this.transactionalId1 = "txnId1";
        this.transactionalId2 = "txnId2";
        this.producerId1 = 0;
        this.producerId2 = 1;
        this.txnTopicPartition1 = 0;
        this.txnTopicPartition2 = 1;
        this.coordinatorEpoch = 0;
        this.coordinatorEpoch2 = 1;
        this.txnTimeoutMs = 0;
    }

    public static final /* synthetic */ Object $anonfun$shouldNotLoseTxnCompletionAfterLoad$1$adapted(TransactionMarkerChannelManagerTest $this, Seq getTransactionStateResponses$1, TxnTransitMetadata expectedTransition$2, ClientResponse successfulClientResponse$1, ClientResponse clientResponse) {
        getTransactionStateResponses$1.foreach((Function1 & Serializable)getTransactionStateResponse -> {
            TransactionMarkerChannelManagerTest.$anonfun$shouldNotLoseTxnCompletionAfterLoad$2($this, expectedTransition$2, clientResponse, successfulClientResponse$1, getTransactionStateResponse);
            return BoxedUnit.UNIT;
        });
        return BoxedUnit.UNIT;
    }
}

