/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.companion.test;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.storage.Formatter;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.jboss.logging.Logger;
import scala.collection.IterableOnce;
import scala.jdk.javaapi.StreamConverters;

/**
 * Embedded single-node Kafka broker for tests, started as a {@link KafkaRaftServer} with the
 * combined {@code broker,controller} process roles.
 *
 * <p>Configuration is done through the fluent {@code with...} methods, which may only be called
 * while the broker is not running. {@link #start()} builds the broker properties, formats the
 * storage directories and starts the server; {@link #close()} shuts it down and, unless disabled,
 * deletes the log directories.
 */
public class EmbeddedKafkaBroker implements Closeable {
    private static final Logger LOGGER = Logger.getLogger(EmbeddedKafkaBroker.class.getName());
    private static final String COMPANION_BROKER_PREFIX = "companion-embedded-kafka";

    private KafkaRaftServer kafkaServer;
    private KafkaConfig config;
    private int nodeId = 1;
    private String host = "localhost";
    private int kafkaPort = 0;
    private int controllerPort = 0;
    private boolean deleteDirsOnClose = true;
    private String clusterId = Uuid.randomUuid().toString();
    private final List<Endpoint> advertisedListeners = new ArrayList<>();
    private Consumer<Properties> brokerConfigModifier;

    /**
     * Sets the node id used for {@code broker.id} and the controller quorum voter entry.
     *
     * @param nodeId the node id
     * @return this broker
     * @throws IllegalStateException if the broker is running
     */
    public EmbeddedKafkaBroker withNodeId(int nodeId) {
        assertNotRunning();
        this.nodeId = nodeId;
        return this;
    }

    /**
     * Registers a callback that can modify the generated broker properties before the broker
     * is started. A later call replaces any previously registered callback.
     *
     * @param function callback receiving the mutable broker properties
     * @return this broker
     * @throws IllegalStateException if the broker is running
     */
    public EmbeddedKafkaBroker withAdditionalProperties(Consumer<Properties> function) {
        assertNotRunning();
        this.brokerConfigModifier = function;
        return this;
    }

    /**
     * Sets the port of the internal PLAINTEXT listener; {@code 0} (the default) selects a free port
     * at start.
     *
     * @param port the broker port
     * @return this broker
     * @throws IllegalStateException if the broker is running
     */
    public EmbeddedKafkaBroker withKafkaPort(int port) {
        assertNotRunning();
        this.kafkaPort = port;
        return this;
    }

    /**
     * Sets the port of the controller listener; {@code 0} (the default) selects a free port at start.
     *
     * @param port the controller port
     * @return this broker
     * @throws IllegalStateException if the broker is running
     */
    public EmbeddedKafkaBroker withControllerPort(int port) {
        assertNotRunning();
        this.controllerPort = port;
        return this;
    }

    /**
     * Sets the host used for the internal and controller listeners.
     *
     * @param host the host name
     * @return this broker
     * @throws IllegalStateException if the broker is running
     */
    public EmbeddedKafkaBroker withKafkaHost(String host) {
        assertNotRunning();
        this.host = host;
        return this;
    }

    /**
     * Sets the cluster id used when formatting the storage directories.
     *
     * @param clusterId the cluster id
     * @return this broker
     * @throws IllegalStateException if the broker is running
     */
    public EmbeddedKafkaBroker withClusterId(String clusterId) {
        assertNotRunning();
        this.clusterId = clusterId;
        return this;
    }

    /**
     * Controls whether {@link #close()} deletes the broker log directories (default {@code true}).
     *
     * @param deleteDirsOnClose {@code true} to delete the log directories on close
     * @return this broker
     * @throws IllegalStateException if the broker is running
     */
    public EmbeddedKafkaBroker withDeleteLogDirsOnClose(boolean deleteDirsOnClose) {
        assertNotRunning();
        this.deleteDirsOnClose = deleteDirsOnClose;
        return this;
    }

    /**
     * Adds the given endpoints to the advertised listeners.
     *
     * @param endpoints endpoints to advertise
     * @return this broker
     * @throws IllegalStateException if the broker is running
     */
    public EmbeddedKafkaBroker withAdvertisedListeners(Endpoint... endpoints) {
        assertNotRunning();
        this.advertisedListeners.addAll(Arrays.asList(endpoints));
        return this;
    }

    /**
     * Adds advertised listeners parsed from a comma-separated list; each entry is parsed with
     * {@link #parseEndpoint(String)}. Whitespace around entries is ignored.
     *
     * @param advertisedListeners comma-separated listeners, e.g. {@code PLAINTEXT://localhost:9092}
     * @return this broker
     * @throws IllegalStateException if the broker is running
     * @throws IllegalArgumentException if an entry cannot be parsed
     */
    public EmbeddedKafkaBroker withAdvertisedListeners(String advertisedListeners) {
        assertNotRunning();
        for (String listener : advertisedListeners.split(",")) {
            // Tolerate "A://h:1, B://h:2": untrimmed names would fail the protocol lookup.
            this.advertisedListeners.add(parseEndpoint(listener.trim()));
        }
        return this;
    }

    /**
     * Starts the broker if it is not already running.
     *
     * <p>Builds the default broker properties, applies the additional-properties callback, creates
     * a temporary {@code log.dir} when none is configured, formats the storage and starts the server.
     *
     * @return this broker
     */
    public synchronized EmbeddedKafkaBroker start() {
        if (isRunning()) {
            return this;
        }
        Endpoint internalEndpoint = endpoint(SecurityProtocol.PLAINTEXT, host, kafkaPort);
        Endpoint controllerEndpoint = controller(host, controllerPort);
        Properties properties = createDefaultBrokerConfig(nodeId, controllerEndpoint, internalEndpoint,
                advertisedListeners);
        if (brokerConfigModifier != null) {
            brokerConfigModifier.accept(properties);
        }
        if (properties.get("log.dir") == null) {
            createAndSetlogDir(properties);
        }
        long start = System.currentTimeMillis();
        this.config = formatStorageFromConfig(properties, clusterId, true);
        this.kafkaServer = createServer(config);
        LOGGER.infof("Kafka broker started in %d ms with advertised listeners: %s",
                System.currentTimeMillis() - start, getAdvertisedListeners());
        return this;
    }

    /**
     * Shuts the broker down (if running) and, when enabled, deletes its log directories.
     * Failures are logged rather than thrown. Calling this on a broker that was never started
     * is a no-op.
     */
    @Override
    public synchronized void close() {
        try {
            if (isRunning()) {
                kafkaServer.shutdown();
                kafkaServer.awaitShutdown();
            }
        } catch (Exception e) {
            LOGGER.error("Error shutting down broker", e);
        } finally {
            // config is only set by start(); without it there are no log dirs to clean up.
            if (deleteDirsOnClose && config != null) {
                try {
                    for (String logDir : getLogDirs()) {
                        Utils.delete(new File(logDir));
                    }
                } catch (Exception e) {
                    LOGGER.error("Error deleting logdirs", e);
                }
            }
            kafkaServer = null;
        }
    }

    /**
     * @return {@code true} if a server instance is currently held, i.e. between a successful
     *         {@link #start()} and {@link #close()}
     */
    public boolean isRunning() {
        return kafkaServer != null;
    }

    private void assertNotRunning() {
        if (isRunning()) {
            throw new IllegalStateException("Configuration of the running broker is not permitted.");
        }
    }

    /**
     * @return the Kafka config created by {@link #start()}, or {@code null} if never started
     */
    public KafkaConfig getKafkaConfig() {
        return config;
    }

    /**
     * @return the effective advertised broker listeners as a comma-separated connection string;
     *         requires the broker to have been started
     */
    public String getAdvertisedListeners() {
        return getAdvertisedListeners(config);
    }

    /**
     * @return the configured log directories; requires the broker to have been started
     */
    public List<String> getLogDirs() {
        return getLogDirs(config);
    }

    /**
     * @return the configured node id
     */
    public int getNodeId() {
        return nodeId;
    }

    /**
     * @return the configured cluster id
     */
    public String getClusterId() {
        return clusterId;
    }

    /**
     * Creates an endpoint named after the protocol, with an empty host.
     *
     * @param protocol security protocol, also used as listener name
     * @param port port, or {@code 0} to pick a currently free port
     * @return the endpoint
     */
    public static Endpoint endpoint(SecurityProtocol protocol, int port) {
        return endpoint(protocol.name, protocol, "", port);
    }

    /**
     * Creates an endpoint named after the protocol.
     *
     * @param protocol security protocol, also used as listener name
     * @param host host name
     * @param port port, or {@code 0} to pick a currently free port
     * @return the endpoint
     */
    public static Endpoint endpoint(SecurityProtocol protocol, String host, int port) {
        return endpoint(protocol.name, protocol, host, port);
    }

    /**
     * Creates a named endpoint with an empty host.
     *
     * @param listener listener name
     * @param protocol security protocol
     * @param port port, or {@code 0} to pick a currently free port
     * @return the endpoint
     */
    public static Endpoint endpoint(String listener, SecurityProtocol protocol, int port) {
        return endpoint(listener, protocol, "", port);
    }

    /**
     * Creates a named endpoint.
     *
     * @param listener listener name
     * @param protocol security protocol
     * @param host host name
     * @param port port, or {@code 0} to pick a currently free port
     * @return the endpoint
     */
    public static Endpoint endpoint(String listener, SecurityProtocol protocol, String host, int port) {
        return new Endpoint(listener, protocol, host, getUnusedPort(port));
    }

    /**
     * Parses {@code host:port} or {@code NAME://host:port} using the given security protocol.
     * The listener name from the string is kept when present (it does not have to be a security
     * protocol name); otherwise the protocol name is used.
     *
     * @param protocol security protocol of the resulting endpoint
     * @param listenerStr listener string
     * @return the parsed endpoint
     * @throws IllegalArgumentException if the string cannot be parsed
     */
    public static Endpoint parseEndpoint(SecurityProtocol protocol, String listenerStr) {
        return doParseEndpoint(listenerStr, protocol);
    }

    /**
     * Parses {@code host:port} (PLAINTEXT, no listener name) or {@code NAME://host:port}, where
     * {@code NAME} is resolved with {@link SecurityProtocol#forName(String)}.
     *
     * @param listenerStr listener string
     * @return the parsed endpoint
     * @throws IllegalArgumentException if the string cannot be parsed
     */
    public static Endpoint parseEndpoint(String listenerStr) {
        return doParseEndpoint(listenerStr, null);
    }

    /**
     * Shared parser for both {@code parseEndpoint} overloads.
     *
     * @param listenerStr listener string
     * @param explicitProtocol protocol to use, or {@code null} to derive it from the listener name
     *        (PLAINTEXT when the string has no listener name)
     */
    private static Endpoint doParseEndpoint(String listenerStr, SecurityProtocol explicitProtocol) {
        String[] parts = listenerStr.split(":");
        if (parts.length == 2) {
            int port = Integer.parseInt(parts[1]);
            if (explicitProtocol == null) {
                return new Endpoint(null, SecurityProtocol.PLAINTEXT, parts[0], port);
            }
            return new Endpoint(explicitProtocol.name, explicitProtocol, parts[0], port);
        }
        if (parts.length == 3) {
            String listenerName = parts[0];
            String host = parts[1].replace("//", "");
            int port = Integer.parseInt(parts[2]);
            // Only look the protocol up by name when the caller did not supply one, so that
            // custom listener names are accepted together with an explicit protocol.
            SecurityProtocol protocol = explicitProtocol != null
                    ? explicitProtocol
                    : SecurityProtocol.forName(listenerName);
            return new Endpoint(listenerName, protocol, host, port);
        }
        throw new IllegalArgumentException("Cannot parse listener: " + listenerStr);
    }

    /**
     * Builds the default properties for a combined broker/controller node.
     *
     * <p>{@code listeners} contains the controller, the internal endpoint and one listener (with
     * empty host) per advertised listener. {@code inter.broker.listener.name} is the first
     * PLAINTEXT advertised listener, falling back to the internal endpoint.
     * {@code advertised.listeners} is only set when advertised listeners were given.
     *
     * @param nodeId node id
     * @param controller controller endpoint
     * @param internalEndpoint internal broker endpoint
     * @param advertisedListeners advertised listeners, may be empty
     * @return the broker properties
     */
    public static Properties createDefaultBrokerConfig(int nodeId, Endpoint controller, Endpoint internalEndpoint,
            List<Endpoint> advertisedListeners) {
        Properties props = new Properties();
        props.put("broker.id", Integer.toString(nodeId));
        props.put("process.roles", "broker,controller");
        props.put("controller.listener.names", listenerName(controller));
        props.put("controller.quorum.voters", nodeId + "@" + controller.host() + ":" + controller.port());

        // Bind each advertised listener on all interfaces (empty host), keyed by listener name.
        Map<String, Endpoint> listeners = advertisedListeners.stream()
                .map(l -> new Endpoint(l.listenerName().orElse(null), l.securityProtocol(), "", l.port()))
                .collect(Collectors.toMap(EmbeddedKafkaBroker::listenerName, Function.identity()));
        listeners.put(listenerName(controller), controller);
        listeners.put(listenerName(internalEndpoint), internalEndpoint);

        String listenersString = listeners.values().stream()
                .map(EmbeddedKafkaBroker::toListenerString)
                .distinct()
                .collect(Collectors.joining(","));
        props.put("listeners", listenersString);

        Endpoint plaintextEndpoint = advertisedListeners.stream()
                .filter(e -> e.securityProtocol() == SecurityProtocol.PLAINTEXT)
                .findFirst()
                .orElse(internalEndpoint);

        String advertisedListenersString = advertisedListeners.stream()
                .map(EmbeddedKafkaBroker::toListenerString)
                .distinct()
                .collect(Collectors.joining(","));
        if (!Utils.isBlank(advertisedListenersString)) {
            props.put("advertised.listeners", advertisedListenersString);
        }

        String securityProtocolMap = listeners.values().stream()
                .map(EmbeddedKafkaBroker::toProtocolMap)
                .distinct()
                .collect(Collectors.joining(","));
        props.put("listener.security.protocol.map", securityProtocolMap);
        props.put("inter.broker.listener.name", listenerName(plaintextEndpoint));

        props.put("replica.socket.timeout.ms", "1000");
        props.put("replica.high.watermark.checkpoint.interval.ms", String.valueOf(Long.MAX_VALUE));
        props.put("controller.socket.timeout.ms", "1000");
        props.put("controlled.shutdown.enable", Boolean.toString(false));
        props.put("controlled.shutdown.retry.backoff.ms", "100");
        props.put("delete.topic.enable", Boolean.toString(true));
        props.put(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000");
        props.put("log.cleaner.dedupe.buffer.size", "2097152");
        // The key must be the config name; previously the config's documentation text was used
        // as the key, so the setting never reached the broker.
        props.put("log.message.timestamp.difference.max.ms", String.valueOf(Long.MAX_VALUE));
        props.put("offsets.topic.replication.factor", "1");
        props.put("offsets.topic.num.partitions", "5");
        props.put("group.initial.rebalance.delay.ms", "0");
        props.put("num.partitions", "1");
        props.put("default.replication.factor", "1");
        return props;
    }

    /**
     * Creates a {@link KafkaConfig} from the properties and formats its log and metadata
     * directories for the given cluster id.
     *
     * @param properties broker properties
     * @param clusterId cluster id to format with
     * @param ignoreFormatted passed to the formatter's {@code setIgnoreFormatted}
     * @return the created config
     * @throws RuntimeException if formatting fails
     */
    public static KafkaConfig formatStorageFromConfig(Properties properties, String clusterId,
            boolean ignoreFormatted) {
        KafkaConfig config = KafkaConfig.fromProps(properties, false);
        Formatter formatter = new Formatter();
        formatter.setClusterId(clusterId)
                .setNodeId(config.nodeId())
                .setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled())
                .setIgnoreFormatted(ignoreFormatted)
                .setControllerListenerName((String) config.controllerListenerNames().head())
                .setMetadataLogDirectory(config.metadataLogDir());
        configToLogDirectories(config).forEach(formatter::addDirectory);
        try {
            formatter.run();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return config;
    }

    /**
     * Collects the log directories and the metadata log directory (when set) of the config into
     * a sorted, de-duplicated set.
     */
    static Set<String> configToLogDirectories(KafkaConfig config) {
        Set<String> dirs = new TreeSet<>();
        config.logDirs().foreach(dirs::add);
        String metadataLogDir = config.metadataLogDir();
        if (metadataLogDir != null) {
            dirs.add(metadataLogDir);
        }
        return dirs;
    }

    /**
     * Formats the given storage directories.
     *
     * @param directories directories to format
     * @param controllerListenerName controller listener name
     * @param metadataLogDirectory metadata log directory
     * @param clusterId cluster id
     * @param nodeId node id
     * @param ignoreFormatted passed to the formatter's {@code setIgnoreFormatted}
     * @throws RuntimeException if formatting fails
     */
    public static void formatStorage(List<String> directories, String controllerListenerName,
            String metadataLogDirectory, String clusterId, int nodeId, boolean ignoreFormatted) {
        Formatter formatter = new Formatter();
        formatter.setClusterId(clusterId)
                .setNodeId(nodeId)
                .setIgnoreFormatted(ignoreFormatted)
                .setControllerListenerName(controllerListenerName)
                .setMetadataLogDirectory(metadataLogDirectory);
        directories.forEach(formatter::addDirectory);
        try {
            formatter.run();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Formats the given storage directories using the {@code CONTROLLER} listener name and the
     * first directory as metadata log directory.
     *
     * @param directories directories to format; must not be empty
     * @param clusterId cluster id
     * @param nodeId node id
     * @param ignoreFormatted passed to the formatter's {@code setIgnoreFormatted}
     */
    public static void formatStorage(List<String> directories, String clusterId, int nodeId,
            boolean ignoreFormatted) {
        formatStorage(directories, "CONTROLLER", directories.get(0), clusterId, nodeId, ignoreFormatted);
    }

    /**
     * Creates a {@link KafkaRaftServer} for the config and starts it.
     *
     * @param config broker config (storage must already be formatted)
     * @return the started server
     */
    public static KafkaRaftServer createServer(KafkaConfig config) {
        KafkaRaftServer server = new KafkaRaftServer(config, Time.SYSTEM);
        server.startup();
        return server;
    }

    private static String getAdvertisedListeners(KafkaConfig config) {
        return StreamConverters.asJavaParStream(config.effectiveAdvertisedBrokerListeners())
                .map(EndPoint::connectionString)
                .collect(Collectors.joining(","));
    }

    private static List<String> getLogDirs(KafkaConfig config) {
        return StreamConverters.asJavaParStream(config.logDirs())
                .collect(Collectors.toList());
    }

    /**
     * Returns {@code port} unchanged when non-zero; otherwise binds an ephemeral server socket,
     * reads its port and closes it again. The port is released before returning, so another
     * process may take it before the broker binds to it.
     */
    private static int getUnusedPort(int port) {
        if (port != 0) {
            return port;
        }
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates a fresh temporary directory and stores its path under {@code log.dir}. */
    private static void createAndSetlogDir(Properties properties) {
        try {
            properties.put("log.dir",
                    Files.createTempDirectory(COMPANION_BROKER_PREFIX + "-" + UUID.randomUUID()).toString());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static Endpoint controller(String host, int port) {
        return endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, host, port);
    }

    /**
     * Formats an endpoint as {@code NAME://host:port}, using the security protocol name when the
     * endpoint has no listener name.
     *
     * @param endpoint the endpoint
     * @return the listener string
     */
    public static String toListenerString(Endpoint endpoint) {
        return String.format("%s://%s:%d", listenerName(endpoint), endpoint.host(), endpoint.port());
    }

    /** Formats an endpoint as a {@code NAME:PROTOCOL} entry of the listener security protocol map. */
    private static String toProtocolMap(Endpoint endpoint) {
        return String.format("%s:%s", listenerName(endpoint), endpoint.securityProtocol().name);
    }

    /** Returns the endpoint's listener name, defaulting to its security protocol name. */
    private static String listenerName(Endpoint endpoint) {
        return endpoint.listenerName().orElse(endpoint.securityProtocol().name);
    }

    /**
     * {@link OutputStream} that buffers written bytes and forwards each completed line to a
     * {@link Logger} at INFO level.
     */
    public static class LoggingOutputStream extends OutputStream {
        private final ByteArrayOutputStream os = new ByteArrayOutputStream(1000);
        private final Logger logger;

        /**
         * Creates a {@link PrintStream} whose lines are logged through the given logger.
         *
         * @param logger target logger
         * @return the print stream
         */
        public static PrintStream loggerPrintStream(Logger logger) {
            return new PrintStream(new LoggingOutputStream(logger));
        }

        LoggingOutputStream(Logger logger) {
            this.logger = logger;
        }

        /**
         * Buffers the byte, or on {@code '\n'} / {@code '\r'} logs the buffered line and clears the
         * buffer. Empty lines (including the second half of a CRLF pair) are not logged.
         */
        @Override
        public void write(int b) throws IOException {
            if (b == '\n' || b == '\r') {
                if (os.size() > 0) {
                    logger.info(os.toString());
                    // Without the reset every record would repeat all previously written output.
                    os.reset();
                }
            } else {
                os.write(b);
            }
        }
    }
}

