/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.security.authenticator;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.security.auth.login.Configuration;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkTestUtils;
import org.apache.kafka.common.network.NioEchoServer;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.authenticator.TestJaasConfig;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.ScramFormatter;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.ScramMechanism;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SaslAuthenticatorTest {
    private static final int BUFFER_SIZE = 4096;
    private NioEchoServer server;
    private Selector selector;
    private ChannelBuilder channelBuilder;
    private CertStores serverCertStores;
    private CertStores clientCertStores;
    private Map<String, Object> saslClientConfigs;
    private Map<String, Object> saslServerConfigs;

    @Before
    public void setup() throws Exception {
        this.serverCertStores = new CertStores(true, "localhost");
        this.clientCertStores = new CertStores(false, "localhost");
        this.saslServerConfigs = this.serverCertStores.getTrustingConfig(this.clientCertStores);
        this.saslClientConfigs = this.clientCertStores.getTrustingConfig(this.serverCertStores);
    }

    @After
    public void teardown() throws Exception {
        if (this.server != null) {
            this.server.close();
        }
        if (this.selector != null) {
            this.selector.close();
        }
    }

    @Test
    public void testValidSaslPlainOverSsl() throws Exception {
        String node = "0";
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientConnection(securityProtocol, node);
    }

    @Test
    public void testValidSaslPlainOverPlaintext() throws Exception {
        String node = "0";
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientConnection(securityProtocol, node);
    }

    @Test
    public void testInvalidPasswordSaslPlain() throws Exception {
        String node = "0";
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        TestJaasConfig jaasConfig = this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        jaasConfig.setPlainClientOptions("myuser", "invalidpassword");
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientConnectionFailure(securityProtocol, node);
    }

    @Test
    public void testInvalidUsernameSaslPlain() throws Exception {
        String node = "0";
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        TestJaasConfig jaasConfig = this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        jaasConfig.setPlainClientOptions("invaliduser", "mypassword");
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientConnectionFailure(securityProtocol, node);
    }

    @Test
    public void testMissingUsernameSaslPlain() throws Exception {
        String node = "0";
        TestJaasConfig jaasConfig = this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        jaasConfig.setPlainClientOptions(null, "mypassword");
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.server = this.createEchoServer(securityProtocol);
        this.createSelector(securityProtocol, this.saslClientConfigs);
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", this.server.port());
        try {
            this.selector.connect(node, addr, 4096, 4096);
            Assert.fail((String)"SASL/PLAIN channel created without username");
        }
        catch (IOException e) {
            Assert.assertTrue((String)"Channels not closed", (boolean)this.selector.channels().isEmpty());
            for (SelectionKey key : this.selector.keys()) {
                Assert.assertFalse((String)"Key not cancelled", (boolean)key.isValid());
            }
        }
    }

    @Test
    public void testMissingPasswordSaslPlain() throws Exception {
        String node = "0";
        TestJaasConfig jaasConfig = this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        jaasConfig.setPlainClientOptions("myuser", null);
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.server = this.createEchoServer(securityProtocol);
        this.createSelector(securityProtocol, this.saslClientConfigs);
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", this.server.port());
        try {
            this.selector.connect(node, addr, 4096, 4096);
            Assert.fail((String)"SASL/PLAIN channel created without password");
        }
        catch (IOException e) {
            // empty catch block
        }
    }

    @Test
    public void testMechanismPluggability() throws Exception {
        String node = "0";
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5"));
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientConnection(securityProtocol, node);
    }

    @Test
    public void testMultipleServerMechanisms() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5", "PLAIN", "SCRAM-SHA-256"));
        this.server = this.createEchoServer(securityProtocol);
        this.updateScramCredentialCache("myuser", "mypassword");
        String node1 = "1";
        this.saslClientConfigs.put("sasl.mechanism", "PLAIN");
        this.createAndCheckClientConnection(securityProtocol, node1);
        String node2 = "2";
        this.saslClientConfigs.put("sasl.mechanism", "DIGEST-MD5");
        this.createSelector(securityProtocol, this.saslClientConfigs);
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", this.server.port());
        this.selector.connect(node2, addr, 4096, 4096);
        NetworkTestUtils.checkClientConnection(this.selector, node2, 100, 10);
        String node3 = "3";
        this.saslClientConfigs.put("sasl.mechanism", "SCRAM-SHA-256");
        this.createSelector(securityProtocol, this.saslClientConfigs);
        this.selector.connect(node3, new InetSocketAddress("127.0.0.1", this.server.port()), 4096, 4096);
        NetworkTestUtils.checkClientConnection(this.selector, node3, 100, 10);
    }

    @Test
    public void testValidSaslScramSha256() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
        this.server = this.createEchoServer(securityProtocol);
        this.updateScramCredentialCache("myuser", "mypassword");
        this.createAndCheckClientConnection(securityProtocol, "0");
    }

    @Test
    public void testValidSaslScramMechanisms() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("SCRAM-SHA-256", new ArrayList<String>(ScramMechanism.mechanismNames()));
        this.server = this.createEchoServer(securityProtocol);
        this.updateScramCredentialCache("myuser", "mypassword");
        for (String mechanism : ScramMechanism.mechanismNames()) {
            this.saslClientConfigs.put("sasl.mechanism", mechanism);
            this.createAndCheckClientConnection(securityProtocol, "node-" + mechanism);
        }
    }

    @Test
    public void testInvalidPasswordSaslScram() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        TestJaasConfig jaasConfig = this.configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
        HashMap<String, Object> options = new HashMap<String, Object>();
        options.put("username", "myuser");
        options.put("password", "invalidpassword");
        jaasConfig.createOrUpdateEntry("KafkaClient", ScramLoginModule.class.getName(), options);
        String node = "0";
        this.server = this.createEchoServer(securityProtocol);
        this.updateScramCredentialCache("myuser", "mypassword");
        this.createAndCheckClientConnectionFailure(securityProtocol, node);
    }

    @Test
    public void testUnknownUserSaslScram() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        TestJaasConfig jaasConfig = this.configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
        HashMap<String, Object> options = new HashMap<String, Object>();
        options.put("username", "unknownUser");
        options.put("password", "mypassword");
        jaasConfig.createOrUpdateEntry("KafkaClient", ScramLoginModule.class.getName(), options);
        String node = "0";
        this.server = this.createEchoServer(securityProtocol);
        this.updateScramCredentialCache("myuser", "mypassword");
        this.createAndCheckClientConnectionFailure(securityProtocol, node);
    }

    @Test
    public void testUserCredentialsUnavailableForScramMechanism() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("SCRAM-SHA-256", new ArrayList<String>(ScramMechanism.mechanismNames()));
        this.server = this.createEchoServer(securityProtocol);
        this.updateScramCredentialCache("myuser", "mypassword");
        this.server.credentialCache().cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), ScramCredential.class).remove("myuser");
        String node = "1";
        this.saslClientConfigs.put("sasl.mechanism", "SCRAM-SHA-256");
        this.createAndCheckClientConnectionFailure(securityProtocol, node);
        this.saslClientConfigs.put("sasl.mechanism", "SCRAM-SHA-512");
        this.createAndCheckClientConnection(securityProtocol, "2");
    }

    @Test
    public void testScramUsernameWithSpecialCharacters() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        String username = "special user= test,scram";
        String password = username + "-password";
        TestJaasConfig jaasConfig = this.configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
        HashMap<String, Object> options = new HashMap<String, Object>();
        options.put("username", username);
        options.put("password", password);
        jaasConfig.createOrUpdateEntry("KafkaClient", ScramLoginModule.class.getName(), options);
        this.server = this.createEchoServer(securityProtocol);
        this.updateScramCredentialCache(username, password);
        this.createAndCheckClientConnection(securityProtocol, "0");
    }

    @Test
    public void testUnauthenticatedApiVersionsRequestOverPlaintext() throws Exception {
        this.testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT);
    }

    @Test
    public void testUnauthenticatedApiVersionsRequestOverSsl() throws Exception {
        this.testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL);
    }

    @Test
    public void testApiVersionsRequestWithUnsupportedVersion() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        String node = "1";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node);
        RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS.id, Short.MAX_VALUE, "someclient", 1);
        ApiVersionsRequest request = (ApiVersionsRequest)new ApiVersionsRequest.Builder().build();
        this.selector.send(request.toSend(node, header));
        ByteBuffer responseBuffer = this.waitForResponse();
        ResponseHeader.parse((ByteBuffer)responseBuffer);
        ApiVersionsResponse response = ApiVersionsResponse.parse((ByteBuffer)responseBuffer, (short)0);
        Assert.assertEquals((Object)Errors.UNSUPPORTED_VERSION, (Object)response.error());
        this.sendVersionRequestReceiveResponse(node);
        this.sendHandshakeRequestReceiveResponse(node);
        this.authenticateUsingSaslPlainAndCheckConnection(node);
    }

    @Test
    public void testSaslHandshakeRequestWithUnsupportedVersion() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        String node1 = "invalid1";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node1);
        SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN");
        RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, Short.MAX_VALUE, "someclient", 2);
        this.selector.send(request.toSend(node1, header));
        NetworkTestUtils.waitForChannelClose(this.selector, node1, ChannelState.READY);
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good1");
    }

    @Test
    public void testInvalidSaslPacket() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        String node1 = "invalid1";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node1);
        this.sendHandshakeRequestReceiveResponse(node1);
        Random random = new Random();
        byte[] bytes = new byte[1024];
        random.nextBytes(bytes);
        this.selector.send((Send)new NetworkSend(node1, ByteBuffer.wrap(bytes)));
        NetworkTestUtils.waitForChannelClose(this.selector, node1, ChannelState.READY);
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good1");
        String node2 = "invalid2";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node2);
        random.nextBytes(bytes);
        this.selector.send((Send)new NetworkSend(node2, ByteBuffer.wrap(bytes)));
        NetworkTestUtils.waitForChannelClose(this.selector, node2, ChannelState.READY);
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good2");
    }

    @Test
    public void testInvalidApiVersionsRequestSequence() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        String node1 = "invalid1";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node1);
        this.sendHandshakeRequestReceiveResponse(node1);
        ApiVersionsRequest request = (ApiVersionsRequest)new ApiVersionsRequest.Builder().build();
        RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS.id, request.version(), "someclient", 2);
        this.selector.send(request.toSend(node1, versionsHeader));
        NetworkTestUtils.waitForChannelClose(this.selector, node1, ChannelState.READY);
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good1");
    }

    @Test
    public void testPacketSizeTooBig() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        String node1 = "invalid1";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node1);
        this.sendHandshakeRequestReceiveResponse(node1);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.putInt(Integer.MAX_VALUE);
        buffer.put(new byte[buffer.capacity() - 4]);
        buffer.rewind();
        this.selector.send((Send)new NetworkSend(node1, buffer));
        NetworkTestUtils.waitForChannelClose(this.selector, node1, ChannelState.READY);
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good1");
        String node2 = "invalid2";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node2);
        buffer.clear();
        buffer.putInt(Integer.MAX_VALUE);
        buffer.put(new byte[buffer.capacity() - 4]);
        buffer.rewind();
        this.selector.send((Send)new NetworkSend(node2, buffer));
        NetworkTestUtils.waitForChannelClose(this.selector, node2, ChannelState.READY);
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good2");
    }

    @Test
    public void testDisallowedKafkaRequestsBeforeAuthentication() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        String node1 = "invalid1";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node1);
        MetadataRequest metadataRequest1 = (MetadataRequest)new MetadataRequest.Builder(Collections.singletonList("sometopic"), true).build();
        RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id, metadataRequest1.version(), "someclient", 1);
        this.selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1));
        NetworkTestUtils.waitForChannelClose(this.selector, node1, ChannelState.READY);
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good1");
        String node2 = "invalid2";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node2);
        this.sendHandshakeRequestReceiveResponse(node2);
        MetadataRequest metadataRequest2 = (MetadataRequest)new MetadataRequest.Builder(Collections.singletonList("sometopic"), true).build();
        RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id, metadataRequest2.version(), "someclient", 2);
        this.selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2));
        NetworkTestUtils.waitForChannelClose(this.selector, node2, ChannelState.READY);
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good2");
    }

    @Test
    public void testInvalidLoginModule() throws Exception {
        TestJaasConfig jaasConfig = this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        jaasConfig.createOrUpdateEntry("KafkaClient", "InvalidLoginModule", TestJaasConfig.defaultClientOptions());
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.server = this.createEchoServer(securityProtocol);
        try {
            this.createSelector(securityProtocol, this.saslClientConfigs);
            Assert.fail((String)"SASL/PLAIN channel created without valid login module");
        }
        catch (KafkaException kafkaException) {
            // empty catch block
        }
    }

    @Test
    public void testDisabledMechanism() throws Exception {
        String node = "0";
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("PLAIN", Arrays.asList("DIGEST-MD5"));
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientConnectionFailure(securityProtocol, node);
    }

    @Test
    public void testInvalidMechanism() throws Exception {
        String node = "0";
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.saslClientConfigs.put("sasl.mechanism", "INVALID");
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientConnectionFailure(securityProtocol, node);
    }

    @Test
    public void testDynamicJaasConfiguration() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.saslClientConfigs.put("sasl.mechanism", "PLAIN");
        this.saslServerConfigs.put("sasl.enabled.mechanisms", Arrays.asList("PLAIN"));
        HashMap<String, Object> serverOptions = new HashMap<String, Object>();
        serverOptions.put("user_user1", "user1-secret");
        serverOptions.put("user_user2", "user2-secret");
        TestJaasConfig staticJaasConfig = new TestJaasConfig();
        staticJaasConfig.createOrUpdateEntry("KafkaServer", PlainLoginModule.class.getName(), serverOptions);
        staticJaasConfig.setPlainClientOptions("user1", "invalidpassword");
        Configuration.setConfiguration(staticJaasConfig);
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientConnectionFailure(securityProtocol, "1");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "user1", "user1-secret"));
        this.createAndCheckClientConnection(securityProtocol, "2");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "user1", "user2-secret"));
        this.createAndCheckClientConnectionFailure(securityProtocol, "3");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "user2", "user2-secret"));
        this.createAndCheckClientConnection(securityProtocol, "4");
        String module1 = TestJaasConfig.jaasConfigProperty("PLAIN", "user1", "user1-secret").value();
        String module2 = TestJaasConfig.jaasConfigProperty("PLAIN", "user2", "user2-secret").value();
        this.saslClientConfigs.put("sasl.jaas.config", new Password(module1 + " " + module2));
        try {
            this.createClientConnection(securityProtocol, "1");
            Assert.fail((String)"Connection created with multiple login modules in sasl.jaas.config");
        }
        catch (IllegalArgumentException e) {
            // empty catch block
        }
    }

    @Test
    public void testJaasConfigurationForListener() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.saslClientConfigs.put("sasl.mechanism", "PLAIN");
        this.saslServerConfigs.put("sasl.enabled.mechanisms", Arrays.asList("PLAIN"));
        TestJaasConfig staticJaasConfig = new TestJaasConfig();
        HashMap<String, Object> globalServerOptions = new HashMap<String, Object>();
        globalServerOptions.put("user_global1", "gsecret1");
        globalServerOptions.put("user_global2", "gsecret2");
        staticJaasConfig.createOrUpdateEntry("KafkaServer", PlainLoginModule.class.getName(), globalServerOptions);
        HashMap<String, Object> clientListenerServerOptions = new HashMap<String, Object>();
        clientListenerServerOptions.put("user_client1", "csecret1");
        clientListenerServerOptions.put("user_client2", "csecret2");
        String clientJaasEntryName = "client.KafkaServer";
        staticJaasConfig.createOrUpdateEntry(clientJaasEntryName, PlainLoginModule.class.getName(), clientListenerServerOptions);
        Configuration.setConfiguration(staticJaasConfig);
        this.server = this.createEchoServer(new ListenerName("client"), securityProtocol);
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "client1", "csecret1"));
        this.createAndCheckClientConnection(securityProtocol, "1");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "global1", "gsecret1"));
        this.createAndCheckClientConnectionFailure(securityProtocol, "2");
        this.server.close();
        this.server = this.createEchoServer(new ListenerName("other"), securityProtocol);
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "global1", "gsecret1"));
        this.createAndCheckClientConnection(securityProtocol, "3");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "client1", "csecret1"));
        this.createAndCheckClientConnectionFailure(securityProtocol, "4");
    }

    private void testUnauthenticatedApiVersionsRequest(SecurityProtocol securityProtocol) throws Exception {
        SecurityProtocol clientProtocol;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        String node = "1";
        switch (securityProtocol) {
            case SASL_PLAINTEXT: {
                clientProtocol = SecurityProtocol.PLAINTEXT;
                break;
            }
            case SASL_SSL: {
                clientProtocol = SecurityProtocol.SSL;
                break;
            }
            default: {
                throw new IllegalArgumentException("Server protocol " + securityProtocol + " is not SASL");
            }
        }
        this.createClientConnection(clientProtocol, node);
        NetworkTestUtils.waitForChannelReady(this.selector, node);
        ApiVersionsResponse versionsResponse = this.sendVersionRequestReceiveResponse(node);
        Assert.assertEquals((long)ApiKeys.SASL_HANDSHAKE.oldestVersion(), (long)versionsResponse.apiVersion((short)ApiKeys.SASL_HANDSHAKE.id).minVersion);
        Assert.assertEquals((long)ApiKeys.SASL_HANDSHAKE.latestVersion(), (long)versionsResponse.apiVersion((short)ApiKeys.SASL_HANDSHAKE.id).maxVersion);
        SaslHandshakeResponse handshakeResponse = this.sendHandshakeRequestReceiveResponse(node);
        Assert.assertEquals(Collections.singletonList("PLAIN"), (Object)handshakeResponse.enabledMechanisms());
        this.authenticateUsingSaslPlainAndCheckConnection(node);
    }

    private void authenticateUsingSaslPlainAndCheckConnection(String node) throws Exception {
        String authString = "\u0000myuser\u0000mypassword";
        this.selector.send((Send)new NetworkSend(node, ByteBuffer.wrap(authString.getBytes("UTF-8"))));
        this.waitForResponse();
        NetworkTestUtils.checkClientConnection(this.selector, node, 100, 10);
    }

    private TestJaasConfig configureMechanisms(String clientMechanism, List<String> serverMechanisms) {
        this.saslClientConfigs.put("sasl.mechanism", clientMechanism);
        this.saslServerConfigs.put("sasl.enabled.mechanisms", serverMechanisms);
        return TestJaasConfig.createConfiguration(clientMechanism, serverMechanisms);
    }

    private void createSelector(SecurityProtocol securityProtocol, Map<String, Object> clientConfigs) {
        if (this.selector != null) {
            this.selector.close();
            this.selector = null;
        }
        String saslMechanism = (String)this.saslClientConfigs.get("sasl.mechanism");
        this.channelBuilder = ChannelBuilders.clientChannelBuilder((SecurityProtocol)securityProtocol, (JaasContext.Type)JaasContext.Type.CLIENT, (AbstractConfig)new TestSecurityConfig(clientConfigs), null, (String)saslMechanism, (boolean)true);
        this.selector = NetworkTestUtils.createSelector(this.channelBuilder);
    }

    private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception {
        return this.createEchoServer(ListenerName.forSecurityProtocol((SecurityProtocol)securityProtocol), securityProtocol);
    }

    private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
        return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, new TestSecurityConfig(this.saslServerConfigs));
    }

    private void createClientConnection(SecurityProtocol securityProtocol, String node) throws Exception {
        this.createSelector(securityProtocol, this.saslClientConfigs);
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", this.server.port());
        this.selector.connect(node, addr, 4096, 4096);
    }

    private void createAndCheckClientConnection(SecurityProtocol securityProtocol, String node) throws Exception {
        this.createClientConnection(securityProtocol, node);
        NetworkTestUtils.checkClientConnection(this.selector, node, 100, 10);
        this.selector.close();
        this.selector = null;
    }

    private void createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node) throws Exception {
        this.createClientConnection(securityProtocol, node);
        NetworkTestUtils.waitForChannelClose(this.selector, node, ChannelState.AUTHENTICATE);
        this.selector.close();
        this.selector = null;
    }

    private AbstractResponse sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequest request) throws IOException {
        RequestHeader header = new RequestHeader(apiKey.id, request.version(), "someclient", 1);
        Send send = request.toSend(node, header);
        this.selector.send(send);
        ByteBuffer responseBuffer = this.waitForResponse();
        return NetworkClient.parseResponse((ByteBuffer)responseBuffer, (RequestHeader)header);
    }

    private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node) throws Exception {
        SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest("PLAIN");
        SaslHandshakeResponse response = (SaslHandshakeResponse)this.sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, (AbstractRequest)handshakeRequest);
        Assert.assertEquals((Object)Errors.NONE, (Object)response.error());
        return response;
    }

    private ApiVersionsResponse sendVersionRequestReceiveResponse(String node) throws Exception {
        ApiVersionsRequest handshakeRequest = (ApiVersionsRequest)new ApiVersionsRequest.Builder().build();
        ApiVersionsResponse response = (ApiVersionsResponse)this.sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, (AbstractRequest)handshakeRequest);
        Assert.assertEquals((Object)Errors.NONE, (Object)response.error());
        return response;
    }

    private ByteBuffer waitForResponse() throws IOException {
        int waitSeconds = 10;
        do {
            this.selector.poll(1000L);
        } while (this.selector.completedReceives().isEmpty() && waitSeconds-- > 0);
        Assert.assertEquals((long)1L, (long)this.selector.completedReceives().size());
        return ((NetworkReceive)this.selector.completedReceives().get(0)).payload();
    }

    private void updateScramCredentialCache(String username, String password) throws NoSuchAlgorithmException {
        for (String mechanism : (List)this.saslServerConfigs.get("sasl.enabled.mechanisms")) {
            ScramMechanism scramMechanism = ScramMechanism.forMechanismName((String)mechanism);
            if (scramMechanism == null) continue;
            ScramFormatter formatter = new ScramFormatter(scramMechanism);
            ScramCredential credential = formatter.generateCredential(password, 4096);
            this.server.credentialCache().cache(scramMechanism.mechanismName(), ScramCredential.class).put(username, (Object)credential);
        }
    }
}

