/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.File;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.api.IntegrationTestHarness;
import kafka.cluster.Partition;
import kafka.controller.OfflineReplica$;
import kafka.controller.PartitionAndReplica;
import kafka.controller.ReplicaState;
import kafka.server.BrokerServer;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.ReplicaFetcherThread;
import kafka.utils.CoreUtils$;
import kafka.utils.Exit$;
import kafka.utils.Logging;
import kafka.utils.TestUtils;
import kafka.utils.TestUtils$;
import kafka.utils.TestUtils$Checkpoint$;
import kafka.utils.TestUtils$Roll$;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.IterableLike;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.VolatileObjectRef;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005Ee\u0001\u0002\r\u001a\u0001yAQ!\n\u0001\u0005\u0002\u0019Bq!\u000b\u0001C\u0002\u0013\u0005!\u0006\u0003\u00042\u0001\u0001\u0006Ia\u000b\u0005\be\u0001\u0011\r\u0011\"\u0001+\u0011\u0019\u0019\u0004\u0001)A\u0005W!9A\u0007\u0001b\u0001\n\u0003Q\u0003BB\u001b\u0001A\u0003%1\u0006C\u00047\u0001\t\u0007I\u0011B\u001c\t\r\u0001\u0003\u0001\u0015!\u00039\u0011\u001d\t\u0005A1A\u0005\n)BaA\u0011\u0001!\u0002\u0013Y\u0003bB\"\u0001\u0005\u0004%\tE\u000b\u0005\u0007\t\u0002\u0001\u000b\u0011B\u0016\t\u000b\u0015\u0003A\u0011\t$\t\u000bm\u0003A\u0011\u0001/\t\u000f\u0005\u0005\u0001\u0001\"\u0001\u0002\u0004!9\u0011Q\u0002\u0001\u0005\u0002\u0005=\u0001bBA\r\u0001\u0011\u0005\u00111\u0004\u0005\b\u0003K\u0001A\u0011AA\u0014\u0011\u001d\t\t\u0004\u0001C\u0001\u0003gAq!!\u0010\u0001\t\u0003\ty\u0004C\u0004\u0002`\u0001!\t!!\u0019\t\u000f\u0005\u001d\u0004\u0001\"\u0003\u0002j\t\tBj\\4ESJ4\u0015-\u001b7ve\u0016$Vm\u001d;\u000b\u0005iY\u0012AB:feZ,'OC\u0001\u001d\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\u0010\u0011\u0005\u0001\u001aS\"A\u0011\u000b\u0005\tZ\u0012aA1qS&\u0011A%\t\u0002\u0017\u0013:$Xm\u001a:bi&|g\u000eV3ti\"\u000b'O\\3tg\u00061A(\u001b8jiz\"\u0012a\n\t\u0003Q\u0001i\u0011!G\u0001\u000eaJ|G-^2fe\u000e{WO\u001c;\u0016\u0003-\u0002\"\u0001L\u0018\u000e\u00035R\u0011AL\u0001\u0006g\u000e\fG.Y\u0005\u0003a5\u00121!\u00138u\u00039\u0001(o\u001c3vG\u0016\u00148i\\;oi\u0002\nQbY8ogVlWM]\"pk:$\u0018AD2p]N,X.\u001a:D_VtG\u000fI\u0001\fEJ|7.\u001a:D_VtG/\u0001\u0007ce>\\WM]\"pk:$\b%A\u0003u_BL7-F\u00019!\tId(D\u0001;\u0015\tYD(\u0001\u0003mC:<'\"A\u001f\u0002\t)\fg/Y\u0005\u0003\u007fi\u0012aa\u0015;sS:<\u0017A\u0002;pa&\u001c\u0007%\u0001\u0007qCJ$\u0018\u000e^5p]:+X.A\u0007qCJ$\u0018\u000e^5p]:+X\u000eI\u0001\fY><G)\u001b:D_VtG/\u0001\u0007m_\u001e$\u0015N]\"pk:$\b%A\u0003tKR,\u0006\u000f\u0006\u0002H\u0015B\u0011A\u0006S\u0005\u0003\u00136\u0012A!\u00168ji\")1J\u0004a\u0001\u0019\u0006AA/Z:u\u0013:4w\u000e\u0005\u0002N+
6\taJ\u0003\u0002#\u001f*\u0011\u0001+U\u0001\bUV\u0004\u0018\u000e^3s\u0015\t\u00116+A\u0003kk:LGOC\u0001U\u0003\ry'oZ\u0005\u0003-:\u0013\u0001\u0002V3ti&sgm\u001c\u0015\u0003\u001da\u0003\"!T-\n\u0005is%A\u0003\"fM>\u0014X-R1dQ\u0006!C/Z:u!J|G-^2f\u000bJ\u0014xN\u001d$s_64\u0015-\u001b7ve\u0016|e\u000eT8h%>dG\u000e\u0006\u0002H;\")al\u0004a\u0001?\u00061\u0011/^8sk6\u0004\"\u0001Y4\u000f\u0005\u0005,\u0007C\u00012.\u001b\u0005\u0019'B\u00013\u001e\u0003\u0019a$o\\8u}%\u0011a-L\u0001\u0007!J,G-\u001a4\n\u0005}B'B\u00014.Q\u0011y!N]:\u0011\u0005-\u0004X\"\u00017\u000b\u00055t\u0017\u0001\u00039s_ZLG-\u001a:\u000b\u0005=|\u0015A\u00029be\u0006l7/\u0003\u0002rY\nYa+\u00197vKN{WO]2f\u0003\u001d\u0019HO]5oONd#\u0001\u001e<\"\u0003U\f!A_6\"\u0003]\fQa\u001b:bMRDCaD=~}B\u0011!p_\u0007\u0002]&\u0011AP\u001c\u0002\u0012!\u0006\u0014\u0018-\\3uKJL'0\u001a3UKN$\u0018\u0001\u00028b[\u0016\f\u0013a`\u0001#w\u0012L7\u000f\u001d7bs:\u000bW.Z?/w\u0006\u0014x-^7f]R\u001cx+\u001b;i\u001d\u0006lWm]?\u00029Q,7\u000f^%P\u000bb\u001cW\r\u001d;j_:$UO]5oO2{wMU8mYR\u0019q)!\u0002\t\u000by\u0003\u0002\u0019A0)\u000bAQ'/!\u0003-\u0005Q4\b\u0006\u0002\tz{z\fq\b^3tij['I]8lKJ<\u0016\u000e\u001e5PY\u0012Le\u000e^3s\u0005J|7.\u001a:Qe>$xnY8m'\"|W\u000f\u001c3IC2$xJ\u001c'pO\u0012K'OR1jYV\u0014X\rF\u0001HQ\r\t\u00121\u0003\t\u0004\u001b\u0006U\u0011bAA\f\u001d\n!A+Z:u\u0003\u001d\"Xm\u001d;Qe>$WoY3FeJ|'O\u0012:p[\u001a\u000b\u0017\u000e\\;sK>s7\t[3dWB|\u0017N\u001c;\u0015\u0007\u001d\u000bi\u0002C\u0003_%\u0001\u0007q\fK\u0003\u0013UJ\f\t\u0003\f\u0002um\"\"!#_?\u007f\u0003}!Xm\u001d;J\u001f\u0016C8-\u001a9uS>tG)\u001e:j]\u001e\u001c\u0005.Z2la>Lg\u000e\u001e\u000b\u0004\u000f\u0006%\u0002\"\u00020\u0014\u0001\u0004y\u0006&B\nke\u00065BF\u0001;wQ\u0011\u0019\u00120 
@\u0002iQ,7\u000f\u001e*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG-\u00114uKJdun\u001a#je\u001a\u000b\u0017\u000e\\;sK>sgi\u001c7m_^,'\u000fF\u0002H\u0003kAQA\u0018\u000bA\u0002}CS\u0001\u00066s\u0003sa#\u0001\u001e<)\tQIXP`\u0001+i\u0016\u001cH\u000f\u0015:pIV\u001cW-\u0012:s_J\u001chI]8n\u0019><G)\u001b:GC&dWO]3P]2+\u0017\rZ3s)\r9\u0015\u0011\t\u0005\b\u0003\u0007*\u0002\u0019AA#\u0003-1\u0017-\u001b7ve\u0016$\u0016\u0010]3\u0011\t\u0005\u001d\u0013\u0011\f\b\u0005\u0003\u0013\n\u0019F\u0004\u0003\u0002L\u0005=cb\u00012\u0002N%\tA$C\u0002\u0002Rm\tQ!\u001e;jYNLA!!\u0016\u0002X\u0005IA+Z:u+RLGn\u001d\u0006\u0004\u0003#Z\u0012\u0002BA.\u0003;\u0012\u0011\u0003T8h\t&\u0014h)Y5mkJ,G+\u001f9f\u0015\u0011\t)&a\u0016\u0002KQ,7\u000f\u001e)s_\u0012,8-Z!gi\u0016\u0014Hj\\4ESJ4\u0015-\u001b7ve\u0016|e\u000eT3bI\u0016\u0014H#B$\u0002d\u0005\u0015\u0004bBA\"-\u0001\u0007\u0011Q\t\u0005\u0006=Z\u0001\raX\u0001\u001egV\u00147o\u0019:jE\u0016\fe\u000eZ,bSR4uN]!tg&<g.\\3oiR)q)a\u001b\u0002n!)ag\u0006a\u0001?\"9\u0011qN\fA\u0002\u0005E\u0014\u0001C2p]N,X.\u001a:\u0011\u0011\u0005M\u0014\u0011QAC\u0003\u000bk!!!\u001e\u000b\t\u0005=\u0014q\u000f\u0006\u0005\u0003s\nY(A\u0004dY&,g\u000e^:\u000b\u0007q\tiHC\u0002\u0002\u0000M\u000ba!\u00199bG\",\u0017\u0002BAB\u0003k\u0012\u0001bQ8ogVlWM\u001d\t\u0006Y\u0005\u001d\u00151R\u0005\u0004\u0003\u0013k#!B!se\u0006L\bc\u0001\u0017\u0002\u000e&\u0019\u0011qR\u0017\u0003\t\tKH/\u001a")
public class LogDirFailureTest
extends IntegrationTestHarness {
    private final int producerCount;
    private final int consumerCount;
    private final int brokerCount;
    private final String topic;
    private final int partitionNum;
    private final int logDirCount;

    /**
     * Returns the producer count stored in this test's {@code producerCount} field,
     * which the constructor sets to 1.
     *
     * @return the configured number of producers
     */
    public int producerCount() {
        return producerCount;
    }

    /**
     * Returns the consumer count stored in this test's {@code consumerCount} field,
     * which the constructor sets to 1.
     *
     * @return the configured number of consumers
     */
    public int consumerCount() {
        return consumerCount;
    }

    /**
     * Returns the broker count stored in this test's {@code brokerCount} field,
     * which the constructor sets to 2. Overrides the harness accessor.
     *
     * @return the configured number of brokers
     */
    @Override
    public int brokerCount() {
        return brokerCount;
    }

    /**
     * Returns the name of the topic used by every test in this class
     * (the constructor sets it to {@code "topic"}).
     *
     * @return the test topic name
     */
    private String topic() {
        return topic;
    }

    /**
     * Returns the partition count passed to {@code createTopic} in {@code setUp}
     * (the constructor sets it to 12).
     *
     * @return the number of partitions of the test topic
     */
    private int partitionNum() {
        return partitionNum;
    }

    /**
     * Returns the log directory count stored in this test's {@code logDirCount} field,
     * which the constructor sets to 3. Overrides the harness accessor.
     *
     * @return the configured number of log directories
     */
    @Override
    public int logDirCount() {
        return logDirCount;
    }

    /**
     * Runs the harness set-up, then creates the test topic and waits for consistent KRaft metadata.
     *
     * <p>The topic is created with {@code partitionNum()} partitions; {@code brokerCount()} is
     * passed as the third argument (presumably the replication factor -- confirm against
     * {@code createTopic}). The remaining arguments are the Scala default-argument accessors.
     *
     * @param testInfo JUnit test metadata, forwarded to the superclass set-up
     */
    @Override
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        super.setUp(testInfo);
        final String topicName = topic();
        final int partitions = partitionNum();
        final int brokerTotal = brokerCount();
        createTopic(topicName, partitions, brokerTotal, createTopic$default$4(), createTopic$default$5(), createTopic$default$6());
        ensureConsistentKRaftMetadata();
    }

    /**
     * Runs {@link #testProduceErrorsFromLogDirFailureOnLeader} with the {@code Roll} failure type.
     *
     * @param quorum the quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read
     *               by this method body
     */
    @ParameterizedTest(name="{displayName}.{argumentsWithNames}")
    @ValueSource(strings={"zk", "kraft"})
    public void testProduceErrorFromFailureOnLogRoll(String quorum) {
        final TestUtils.LogDirFailureType rollFailure = TestUtils$Roll$.MODULE$;
        testProduceErrorsFromLogDirFailureOnLeader(rollFailure);
    }

    /**
     * Runs {@link #testProduceAfterLogDirFailureOnLeader} with the {@code Roll} failure type.
     *
     * @param quorum the quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"), forwarded
     *               so the helper can pick the matching final verification
     */
    @ParameterizedTest(name="{displayName}.{argumentsWithNames}")
    @ValueSource(strings={"zk", "kraft"})
    public void testIOExceptionDuringLogRoll(String quorum) {
        final TestUtils.LogDirFailureType rollFailure = TestUtils$Roll$.MODULE$;
        testProduceAfterLogDirFailureOnLeader(rollFailure, quorum);
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testZkBrokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure() {
        void setHaltProcedure_haltProcedure;
        VolatileObjectRef statusCodeOption = VolatileObjectRef.create((Object)None$.MODULE$);
        Function2 & Serializable & scala.Serializable intersect = (Function2 & Serializable & scala.Serializable)(statusCode, x$1) -> LogDirFailureTest.$anonfun$testZkBrokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure$1(statusCodeOption, BoxesRunTime.unboxToInt((Object)statusCode), x$1);
        if (Exit$.MODULE$ == null) {
            throw null;
        }
        Exit.setHaltProcedure((Exit.Procedure)new /* Unavailable Anonymous Inner Class!! */);
        KafkaServer server = null;
        try {
            int x$12 = this.brokerCount();
            String x$2 = this.zkConnect();
            int x$3 = 3;
            boolean x$4 = TestUtils$.MODULE$.createBrokerConfig$default$3();
            boolean x$5 = TestUtils$.MODULE$.createBrokerConfig$default$4();
            int x$6 = TestUtils$.MODULE$.createBrokerConfig$default$5();
            Option<SecurityProtocol> x$7 = TestUtils$.MODULE$.createBrokerConfig$default$6();
            Option<File> x$8 = TestUtils$.MODULE$.createBrokerConfig$default$7();
            Option<Properties> x$9 = TestUtils$.MODULE$.createBrokerConfig$default$8();
            boolean x$10 = TestUtils$.MODULE$.createBrokerConfig$default$9();
            boolean x$11 = TestUtils$.MODULE$.createBrokerConfig$default$10();
            int x$122 = TestUtils$.MODULE$.createBrokerConfig$default$11();
            boolean x$13 = TestUtils$.MODULE$.createBrokerConfig$default$12();
            int x$14 = TestUtils$.MODULE$.createBrokerConfig$default$13();
            boolean x$15 = TestUtils$.MODULE$.createBrokerConfig$default$14();
            int x$16 = TestUtils$.MODULE$.createBrokerConfig$default$15();
            Option<String> x$17 = TestUtils$.MODULE$.createBrokerConfig$default$16();
            boolean x$18 = TestUtils$.MODULE$.createBrokerConfig$default$18();
            int x$19 = TestUtils$.MODULE$.createBrokerConfig$default$19();
            short x$20 = TestUtils$.MODULE$.createBrokerConfig$default$20();
            boolean x$21 = TestUtils$.MODULE$.createBrokerConfig$default$21();
            Properties props = TestUtils$.MODULE$.createBrokerConfig(x$12, x$2, x$4, x$5, x$6, x$7, x$8, x$9, x$10, x$11, x$122, x$13, x$14, x$15, x$16, x$17, x$3, x$18, x$19, x$20, x$21);
            props.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");
            props.put(KafkaConfig$.MODULE$.LogMessageFormatVersionProp(), "0.11.0");
            KafkaConfig kafkaConfig = KafkaConfig$.MODULE$.fromProps(props);
            File logDir = new File((String)kafkaConfig.logDirs().head());
            CoreUtils$.MODULE$.swallow((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> Utils.delete((File)logDir), (Logging)this, CoreUtils$.MODULE$.swallow$default$3());
            Files.createFile(logDir.toPath(), new FileAttribute[0]);
            Assertions.assertTrue((boolean)logDir.isFile());
            server = TestUtils$.MODULE$.createServer(kafkaConfig, TestUtils$.MODULE$.createServer$default$2());
            long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
            long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
            if (TestUtils$.MODULE$ == null) {
                throw null;
            }
            long waitUntilTrue_startTime = System.currentTimeMillis();
            while (!LogDirFailureTest.$anonfun$testZkBrokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure$3(statusCodeOption)) {
                void waitUntilTrue_pause;
                void waitUntilTrue_waitTimeMs;
                if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                    Assertions.fail((String)LogDirFailureTest.$anonfun$testZkBrokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure$4());
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
            }
        }
        catch (Throwable throwable) {
            Exit$.MODULE$.resetHaltProcedure();
            if (server != null) {
                TestUtils$.MODULE$.shutdownServers(new .colon.colon(server, (List)Nil$.MODULE$), TestUtils$.MODULE$.shutdownServers$default$2());
            }
            throw throwable;
        }
        Exit$.MODULE$.resetHaltProcedure();
        if (server != null) {
            TestUtils$.MODULE$.shutdownServers(new .colon.colon((Object)server, (List)Nil$.MODULE$), TestUtils$.MODULE$.shutdownServers$default$2());
        }
    }

    /**
     * Runs {@link #testProduceErrorsFromLogDirFailureOnLeader} with the {@code Checkpoint}
     * failure type.
     *
     * @param quorum the quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read
     *               by this method body
     */
    @ParameterizedTest(name="{displayName}.{argumentsWithNames}")
    @ValueSource(strings={"zk", "kraft"})
    public void testProduceErrorFromFailureOnCheckpoint(String quorum) {
        final TestUtils.LogDirFailureType checkpointFailure = TestUtils$Checkpoint$.MODULE$;
        testProduceErrorsFromLogDirFailureOnLeader(checkpointFailure);
    }

    /**
     * Runs {@link #testProduceAfterLogDirFailureOnLeader} with the {@code Checkpoint} failure type.
     *
     * @param quorum the quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"), forwarded
     *               so the helper can pick the matching final verification
     */
    @ParameterizedTest(name="{displayName}.{argumentsWithNames}")
    @ValueSource(strings={"zk", "kraft"})
    public void testIOExceptionDuringCheckpoint(String quorum) {
        final TestUtils.LogDirFailureType checkpointFailure = TestUtils$Checkpoint$.MODULE$;
        testProduceAfterLogDirFailureOnLeader(checkpointFailure, quorum);
    }

    /**
     * Checks that the follower's replica fetcher threads are not shut down after one of the
     * follower's partitions is marked offline.
     *
     * <p>Steps: mark partition 0 of the test topic offline on a non-leader replica, produce one
     * record to another partition that has a local leader log on the same leader broker, assert
     * that this partition's in-sync replica count equals {@code brokerCount()}, then assert that
     * no fetcher thread on the follower reports shutdown complete.
     *
     * @param quorum the quorum mode supplied by {@code @ValueSource} ("zk" or "kraft"); not read
     *               by this method body
     */
    @ParameterizedTest(name="{displayName}.{argumentsWithNames}")
    @ValueSource(strings={"zk", "kraft"})
    public void testReplicaFetcherThreadAfterLogDirFailureOnFollower(String quorum) {
        // Disable retries and idempotence on the producer configuration before creating the producer.
        this.producerConfig().setProperty("retries", "0");
        this.producerConfig().setProperty("enable.idempotence", "false");
        KafkaProducer producer = this.createProducer(this.createProducer$default$1(), this.createProducer$default$2(), this.createProducer$default$3());
        TopicPartition partition = new TopicPartition(this.topic(), 0);
        // Metadata entry for partition 0 of the test topic.
        PartitionInfo partitionInfo = (PartitionInfo)((IterableLike)CollectionConverters$.MODULE$.asScalaBufferConverter(producer.partitionsFor(this.topic())).asScala()).find((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)LogDirFailureTest.$anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$1(x$2))).get();
        int leaderServerId = partitionInfo.leader().id();
        KafkaBroker leaderServer = (KafkaBroker)this.brokers().find((Function1 & Serializable & scala.Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)LogDirFailureTest.$anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$2(leaderServerId, x$3))).get();
        // First replica id of partition 0 that is not the leader's id.
        int followerServerId = BoxesRunTime.unboxToInt((Object)new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps((int[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])partitionInfo.replicas())).map((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToInteger((int)x$4.id()), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Int())))).find((Function1)(JFunction1.mcZI.sp & Serializable & scala.Serializable)x$5 -> x$5 != leaderServerId).get());
        KafkaBroker followerServer = (KafkaBroker)this.brokers().find((Function1 & Serializable & scala.Serializable)x$6 -> BoxesRunTime.boxToBoolean((boolean)LogDirFailureTest.$anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$5(followerServerId, x$6))).get();
        followerServer.replicaManager().markPartitionOffline(partition);
        // First partition index in [1, partitionNum) for which the leader broker has a local leader log.
        int anotherPartitionWithTheSameLeader = BoxesRunTime.unboxToInt((Object)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(1), this.partitionNum()).find((Function1)(JFunction1.mcZI.sp & Serializable & scala.Serializable)i -> leaderServer.replicaManager().onlinePartition(new TopicPartition(this.topic(), i)).flatMap((Function1 & Serializable & scala.Serializable)x$7 -> x$7.leaderLogIfLocal()).isDefined()).get());
        ProducerRecord record = new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(anotherPartitionWithTheSameLeader), (Object)this.topic().getBytes(), (Object)"message".getBytes());
        // Block until the send to the other partition completes.
        producer.send(record).get();
        Assertions.assertEquals((int)this.brokerCount(), (int)((Partition)leaderServer.replicaManager().onlinePartition(new TopicPartition(this.topic(), anotherPartitionWithTheSameLeader)).get()).inSyncReplicaIds().size());
        // Every fetcher thread on the follower must still report that it has not shut down.
        followerServer.replicaManager().replicaFetcherManager().fetcherThreadMap().values().foreach((Function1 & Serializable & scala.Serializable)thread -> {
            LogDirFailureTest.$anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$8(thread);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Causes a log directory failure on the leader of partition 0 and checks that the next
     * produce request fails with a storage or not-leader error.
     *
     * <p>The producer is created with retries and idempotence disabled. The send must fail
     * within 6000 ms with an {@link ExecutionException} whose cause is either
     * {@code KafkaStorageException} or {@code NotLeaderOrFollowerException}.
     *
     * @param failureType the kind of log directory failure passed to
     *                    {@code TestUtils.causeLogDirFailure}
     */
    public void testProduceErrorsFromLogDirFailureOnLeader(TestUtils.LogDirFailureType failureType) {
        producerConfig().setProperty("retries", "0");
        producerConfig().setProperty("enable.idempotence", "false");
        KafkaProducer producer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        TopicPartition partition = new TopicPartition(topic(), 0);
        ProducerRecord record = new ProducerRecord(topic(), Predef$.MODULE$.int2Integer(0), (Object)"key".getBytes(), (Object)"value".getBytes());

        // Locate the broker that currently leads partition 0.
        Option partitionZero = ((IterableLike)CollectionConverters$.MODULE$.asScalaBufferConverter(producer.partitionsFor(topic())).asScala()).find((Function1 & Serializable & scala.Serializable)x$8 -> BoxesRunTime.boxToBoolean((boolean)LogDirFailureTest.$anonfun$testProduceErrorsFromLogDirFailureOnLeader$1(x$8)));
        PartitionInfo partitionZeroInfo = (PartitionInfo)partitionZero.get();
        int leaderServerId = partitionZeroInfo.leader().id();
        Option leaderLookup = brokers().find((Function1 & Serializable & scala.Serializable)x$9 -> BoxesRunTime.boxToBoolean((boolean)LogDirFailureTest.$anonfun$testProduceErrorsFromLogDirFailureOnLeader$2(leaderServerId, x$9)));
        KafkaBroker leaderServer = (KafkaBroker)leaderLookup.get();

        TestUtils$.MODULE$.causeLogDirFailure(failureType, leaderServer, partition);

        ExecutionException failure = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> producer.send(record).get(6000L, TimeUnit.MILLISECONDS));
        Throwable cause = failure.getCause();
        boolean expectedCause = cause instanceof KafkaStorageException || cause instanceof NotLeaderOrFollowerException;
        Assertions.assertTrue(expectedCause);
    }

    /*
     * WARNING - void declaration
     */
    public void testProduceAfterLogDirFailureOnLeader(TestUtils.LogDirFailureType failureType, String quorum) {
        Consumer consumer = this.createConsumer(this.createConsumer$default$1(), this.createConsumer$default$2(), this.createConsumer$default$3(), this.createConsumer$default$4());
        this.subscribeAndWaitForAssignment(this.topic(), consumer);
        KafkaProducer producer = this.createProducer(this.createProducer$default$1(), this.createProducer$default$2(), this.createProducer$default$3());
        TopicPartition partition = new TopicPartition(this.topic(), 0);
        ProducerRecord record = new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), (Object)"key".getBytes(), (Object)"value".getBytes());
        int originalLeaderServerId = ((PartitionInfo)((IterableLike)CollectionConverters$.MODULE$.asScalaBufferConverter(producer.partitionsFor(this.topic())).asScala()).find((Function1 & Serializable & scala.Serializable)x$10 -> BoxesRunTime.boxToBoolean((boolean)LogDirFailureTest.$anonfun$testProduceAfterLogDirFailureOnLeader$1(x$10))).get()).leader().id();
        KafkaBroker originalLeaderServer = (KafkaBroker)this.brokers().find((Function1 & Serializable & scala.Serializable)x$11 -> BoxesRunTime.boxToBoolean((boolean)LogDirFailureTest.$anonfun$testProduceAfterLogDirFailureOnLeader$2(originalLeaderServerId, x$11))).get();
        producer.send(record).get();
        TestUtils$.MODULE$.consumeRecords(consumer, 1, TestUtils$.MODULE$.consumeRecords$default$3());
        File failedLogDir = TestUtils$.MODULE$.causeLogDirFailure(failureType, originalLeaderServer, partition);
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!LogDirFailureTest.$anonfun$testProduceAfterLogDirFailureOnLeader$3(this, producer, record, originalLeaderServerId)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)LogDirFailureTest.$anonfun$testProduceAfterLogDirFailureOnLeader$5());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        producer.send(record).get(6000L, TimeUnit.MILLISECONDS);
        TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, 1, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());
        String string = quorum;
        String string2 = "kraft";
        if (string != null && string.equals(string2)) {
            long l3 = TestUtils$.MODULE$.waitUntilTrue$default$4();
            long l4 = TestUtils$.MODULE$.waitUntilTrue$default$3();
            if (TestUtils$.MODULE$ == null) {
                throw null;
            }
            long waitUntilTrue_startTime2 = System.currentTimeMillis();
            while (!LogDirFailureTest.$anonfun$testProduceAfterLogDirFailureOnLeader$6(this, originalLeaderServerId, failedLogDir)) {
                void waitUntilTrue_pause;
                void waitUntilTrue_waitTimeMs;
                if (System.currentTimeMillis() > waitUntilTrue_startTime2 + waitUntilTrue_waitTimeMs) {
                    Assertions.fail((String)LogDirFailureTest.$anonfun$testProduceAfterLogDirFailureOnLeader$12());
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
            }
            return;
        }
        Assertions.assertTrue((boolean)this.zkClient().getAllLogDirEventNotifications().isEmpty());
        Assertions.assertTrue((boolean)((KafkaServer)this.servers().find((Function1 & Serializable & scala.Serializable)x$17 -> BoxesRunTime.boxToBoolean((boolean)LogDirFailureTest.$anonfun$testProduceAfterLogDirFailureOnLeader$13(x$17))).get()).kafkaController().controllerContext().replicasInState(this.topic(), (ReplicaState)OfflineReplica$.MODULE$).contains((Object)new PartitionAndReplica(new TopicPartition(this.topic(), 0), originalLeaderServerId)));
    }

    /*
     * WARNING - void declaration
     */
    private void subscribeAndWaitForAssignment(String topic, Consumer<byte[], byte[]> consumer) {
        void pollUntilTrue_action;
        consumer.subscribe(Collections.singletonList(topic));
        long l = TestUtils$.MODULE$.pollUntilTrue$default$4();
        JFunction0.mcZ.sp & Serializable & scala.Serializable intersect = (JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> !consumer.assignment().isEmpty();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long pollUntilTrue_x$3 = 0L;
        long pollUntilTrue_waitUntilTrue_startTime = System.currentTimeMillis();
        while (!TestUtils$.$anonfun$pollUntilTrue$1(consumer, (Function0)pollUntilTrue_action)) {
            void pollUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > pollUntilTrue_waitUntilTrue_startTime + pollUntilTrue_waitTimeMs) {
                Assertions.fail((String)LogDirFailureTest.$anonfun$subscribeAndWaitForAssignment$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)pollUntilTrue_waitTimeMs), pollUntilTrue_x$3));
        }
    }

    /**
     * Synthetic body of the halt procedure: stores the status code in the shared reference as
     * {@code Some(statusCode)} and then always throws {@link IllegalArgumentException}.
     *
     * @param statusCodeOption$1 reference that receives the recorded status code
     * @param statusCode         the status code passed to the halt procedure
     * @param x$1                second halt-procedure argument; not read
     * @return never returns normally
     */
    public static final /* synthetic */ Nothing$ $anonfun$testZkBrokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure$1(VolatileObjectRef statusCodeOption$1, int statusCode, Option x$1) {
        final Object boxedStatus = BoxesRunTime.boxToInteger(statusCode);
        statusCodeOption$1.elem = new Some(boxedStatus);
        throw new IllegalArgumentException();
    }

    /**
     * Synthetic wait condition: true once the shared reference holds {@code Some(1)}.
     *
     * @param statusCodeOption$1 reference written by the halt procedure
     * @return whether status code 1 has been recorded
     */
    public static final /* synthetic */ boolean $anonfun$testZkBrokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure$3(VolatileObjectRef statusCodeOption$1) {
        final Option recorded = (Option)statusCodeOption$1.elem;
        return recorded.contains(BoxesRunTime.boxToInteger(1));
    }

    /**
     * Synthetic message supplier used when waiting for the broker halt times out.
     *
     * @return the failure message
     */
    public static final /* synthetic */ String $anonfun$testZkBrokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure$4() {
        final String message = "timed out waiting for broker to halt";
        return message;
    }

    /**
     * Synthetic predicate: selects the metadata entry of partition 0.
     *
     * @param x$2 partition metadata to test
     * @return whether the entry's partition index is 0
     */
    public static final /* synthetic */ boolean $anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$1(PartitionInfo x$2) {
        final int partitionIndex = x$2.partition();
        return partitionIndex == 0;
    }

    /**
     * Synthetic predicate: selects the broker whose configured broker id is the leader's id.
     *
     * @param leaderServerId$1 id of the partition leader
     * @param x$3              broker to test
     * @return whether the broker's id equals {@code leaderServerId$1}
     */
    public static final /* synthetic */ boolean $anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$2(int leaderServerId$1, KafkaBroker x$3) {
        final int brokerId = x$3.config().brokerId();
        return brokerId == leaderServerId$1;
    }

    /**
     * Synthetic predicate: selects the broker whose configured broker id is the follower's id.
     *
     * @param followerServerId$1 id of the follower replica
     * @param x$6                broker to test
     * @return whether the broker's id equals {@code followerServerId$1}
     */
    public static final /* synthetic */ boolean $anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$5(int followerServerId$1, KafkaBroker x$6) {
        final int brokerId = x$6.config().brokerId();
        return brokerId == followerServerId$1;
    }

    /**
     * Synthetic per-thread check: asserts that the fetcher thread has not completed shutdown.
     *
     * @param thread the replica fetcher thread to check
     */
    public static final /* synthetic */ void $anonfun$testReplicaFetcherThreadAfterLogDirFailureOnFollower$8(ReplicaFetcherThread thread) {
        final boolean shutDown = thread.isShutdownComplete();
        Assertions.assertFalse(shutDown, "ReplicaFetcherThread should still be working if its partition count > 0");
    }

    /**
     * Synthetic predicate: selects the metadata entry of partition 0.
     *
     * @param x$8 partition metadata to test
     * @return whether the entry's partition index is 0
     */
    public static final /* synthetic */ boolean $anonfun$testProduceErrorsFromLogDirFailureOnLeader$1(PartitionInfo x$8) {
        final int partitionIndex = x$8.partition();
        return partitionIndex == 0;
    }

    /**
     * Synthetic predicate: selects the broker whose configured broker id is the leader's id.
     *
     * @param leaderServerId$2 id of the partition leader
     * @param x$9              broker to test
     * @return whether the broker's id equals {@code leaderServerId$2}
     */
    public static final /* synthetic */ boolean $anonfun$testProduceErrorsFromLogDirFailureOnLeader$2(int leaderServerId$2, KafkaBroker x$9) {
        final int brokerId = x$9.config().brokerId();
        return brokerId == leaderServerId$2;
    }

    /**
     * Synthetic predicate: selects the metadata entry of partition 0.
     *
     * @param x$10 partition metadata to test
     * @return whether the entry's partition index is 0
     */
    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$1(PartitionInfo x$10) {
        final int partitionIndex = x$10.partition();
        return partitionIndex == 0;
    }

    /**
     * Synthetic predicate: selects the broker whose configured broker id is the original
     * leader's id.
     *
     * @param originalLeaderServerId$1 id of the original partition leader
     * @param x$11                     broker to test
     * @return whether the broker's id equals {@code originalLeaderServerId$1}
     */
    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$2(int originalLeaderServerId$1, KafkaBroker x$11) {
        final int brokerId = x$11.config().brokerId();
        return brokerId == originalLeaderServerId$1;
    }

    /**
     * Synthetic predicate: selects the metadata entry of partition 0.
     *
     * @param x$12 partition metadata to test
     * @return whether the entry's partition index is 0
     */
    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$4(PartitionInfo x$12) {
        final int partitionIndex = x$12.partition();
        return partitionIndex == 0;
    }

    /**
     * Synthetic wait condition: sends the record without waiting for the result, then reports
     * whether the producer's metadata names a leader for partition 0 other than the original one.
     *
     * @param $this                    the enclosing test instance (supplies the topic name)
     * @param producer$2               producer used to send and to look up partition metadata
     * @param record$2                 record sent on every evaluation
     * @param originalLeaderServerId$1 id of the leader before the failure
     * @return whether partition 0 currently has a different leader
     */
    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$3(LogDirFailureTest $this, KafkaProducer producer$2, ProducerRecord record$2, int originalLeaderServerId$1) {
        producer$2.send(record$2);
        Option partitionZero = ((IterableLike)CollectionConverters$.MODULE$.asScalaBufferConverter(producer$2.partitionsFor($this.topic())).asScala()).find((Function1 & Serializable & scala.Serializable)x$12 -> BoxesRunTime.boxToBoolean((boolean)LogDirFailureTest.$anonfun$testProduceAfterLogDirFailureOnLeader$4(x$12)));
        final int currentLeaderId = ((PartitionInfo)partitionZero.get()).leader().id();
        return currentLeaderId != originalLeaderServerId$1;
    }

    /**
     * Synthetic message supplier used when waiting for a leader change times out.
     *
     * @return the failure message
     */
    public static final /* synthetic */ String $anonfun$testProduceAfterLogDirFailureOnLeader$5() {
        final String message = "Expected new leader for the partition";
        return message;
    }

    /**
     * Synthetic predicate: selects the broker whose configured node id is the original
     * leader's id.
     *
     * @param originalLeaderServerId$1 id of the original partition leader
     * @param x$13                     broker to test
     * @return whether the broker's node id equals {@code originalLeaderServerId$1}
     */
    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$7(int originalLeaderServerId$1, KafkaBroker x$13) {
        final int nodeId = x$13.config().nodeId();
        return nodeId == originalLeaderServerId$1;
    }

    /**
     * Synthetic predicate: asks the broker's log-dir failure channel whether the failed
     * directory (identified by its path string) is recorded as offline.
     *
     * @param failedLogDir$1 the log directory that was made to fail
     * @param x$15           broker to query
     * @return the result of {@code hasOfflineLogDir} for the directory's path
     */
    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$9(File failedLogDir$1, BrokerServer x$15) {
        final boolean offline = x$15.logDirFailureChannel().hasOfflineLogDir(failedLogDir$1.toPath().toString());
        return offline;
    }

    /**
     * Synthetic predicate: reads the broker's cluster metadata for partition 0 of the test
     * topic, maps its offline replicas to their ids, and reports whether the original leader's
     * id is among them.
     *
     * @param $this                    the enclosing test instance (supplies the topic name)
     * @param originalLeaderServerId$1 id of the leader before the failure
     * @param broker                   broker whose metadata cache is consulted
     * @return whether the original leader is listed as an offline replica of partition 0
     */
    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$10(LogDirFailureTest $this, int originalLeaderServerId$1, BrokerServer broker) {
        return new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps((int[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])broker.replicaManager().metadataCache().getClusterMetadata(broker.clusterId(), broker.config().interBrokerListenerName()).partition(new TopicPartition($this.topic(), 0)).offlineReplicas())).map((Function1 & Serializable & scala.Serializable)x$16 -> BoxesRunTime.boxToInteger((int)x$16.id()), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Int())))).contains((Object)BoxesRunTime.boxToInteger((int)originalLeaderServerId$1));
    }

    /**
     * Synthetic wait condition for the KRaft path: finds the broker whose node id is the
     * original leader's id (cast to {@code BrokerServer}) and requires both that its failure
     * channel reports the failed directory as offline and that its metadata lists the original
     * leader among the offline replicas of partition 0. Returns false if no such broker is found.
     *
     * @param $this                    the enclosing test instance
     * @param originalLeaderServerId$1 id of the leader before the failure
     * @param failedLogDir$1           the log directory that was made to fail
     * @return whether both offline conditions hold on the original leader
     */
    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$6(LogDirFailureTest $this, int originalLeaderServerId$1, File failedLogDir$1) {
        // Look up the original leader by node id; the result stays an Option so a miss yields false below.
        Option brokerWithDirFail = $this.brokers().find((Function1 & Serializable & scala.Serializable)x$13 -> BoxesRunTime.boxToBoolean((boolean)LogDirFailureTest.$anonfun$testProduceAfterLogDirFailureOnLeader$7(originalLeaderServerId$1, x$13))).map((Function1 & Serializable & scala.Serializable)x$14 -> (BrokerServer)x$14);
        return brokerWithDirFail.exists((Function1 & Serializable & scala.Serializable)x$15 -> BoxesRunTime.boxToBoolean((boolean)LogDirFailureTest.$anonfun$testProduceAfterLogDirFailureOnLeader$9(failedLogDir$1, x$15))) && brokerWithDirFail.exists((Function1 & Serializable & scala.Serializable)broker -> BoxesRunTime.boxToBoolean((boolean)LogDirFailureTest.$anonfun$testProduceAfterLogDirFailureOnLeader$10($this, originalLeaderServerId$1, broker)));
    }

    /**
     * Synthetic message supplier used when waiting for the offline log dir times out.
     *
     * @return the failure message
     */
    public static final /* synthetic */ String $anonfun$testProduceAfterLogDirFailureOnLeader$12() {
        final String message = "Expected to find an offline log dir";
        return message;
    }

    /**
     * Synthetic predicate: selects the server whose controller reports itself as active.
     *
     * @param x$17 server to test
     * @return the result of {@code kafkaController().isActive()}
     */
    public static final /* synthetic */ boolean $anonfun$testProduceAfterLogDirFailureOnLeader$13(KafkaServer x$17) {
        final boolean active = x$17.kafkaController().isActive();
        return active;
    }

    /**
     * Synthetic message supplier used when waiting for a consumer assignment times out.
     *
     * @return the failure message
     */
    public static final /* synthetic */ String $anonfun$subscribeAndWaitForAssignment$2() {
        final String message = "Expected non-empty assignment";
        return message;
    }

    /**
     * Fixes the test topology (1 producer, 1 consumer, 2 brokers, 3 log directories per broker,
     * topic "topic" with 12 partitions) and adjusts two broker settings in the server
     * configuration: the replica high-watermark checkpoint interval is set to "60000" and the
     * number of replica fetchers to "1".
     */
    public LogDirFailureTest() {
        // Client and broker counts.
        this.producerCount = 1;
        this.consumerCount = 1;
        this.brokerCount = 2;
        // Topic layout and log directories per broker.
        this.topic = "topic";
        this.partitionNum = 12;
        this.logDirCount = 3;
        // Broker configuration overrides.
        serverConfig().setProperty(KafkaConfig$.MODULE$.ReplicaHighWatermarkCheckpointIntervalMsProp(), "60000");
        serverConfig().setProperty(KafkaConfig$.MODULE$.NumReplicaFetchersProp(), "1");
    }
}

