/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.kafka.streams.deployment;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.BeanContainerListenerBuildItem;
import io.quarkus.builder.item.BuildItem;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.LaunchModeBuildItem;
import io.quarkus.deployment.builditem.nativeimage.JniRuntimeAccessBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.RuntimeReinitializedClassBuildItem;
import io.quarkus.deployment.pkg.NativeConfig;
import io.quarkus.kafka.streams.deployment.KafkaStreamsBuildTimeConfig;
import io.quarkus.kafka.streams.runtime.HotReplacementInterceptor;
import io.quarkus.kafka.streams.runtime.KafkaStreamsRecorder;
import io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig;
import io.quarkus.kafka.streams.runtime.KafkaStreamsTopologyManager;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.rocksdb.RocksDBException;
import org.rocksdb.Status;
import org.rocksdb.util.Environment;

/**
 * Deployment-time processor of the Kafka Streams extension.
 *
 * <p>Its build steps declare the {@code kafka-streams} feature, emit the native-image build items
 * (reflection, JNI access, bundled RocksDB library, runtime re-initialization), hand the
 * {@code kafka-streams.*} configuration over to the {@link KafkaStreamsRecorder}, register the
 * {@link KafkaStreamsTopologyManager} bean and contribute the health checks.
 */
class KafkaStreamsProcessor {
    /** Prefix of the configuration properties that are passed on to Kafka Streams (prefix stripped). */
    private static final String STREAMS_OPTION_PREFIX = "kafka-streams.";

    KafkaStreamsProcessor() {
    }

    /**
     * Declares the {@code kafka-streams} feature and produces all native-image related build items.
     *
     * @param feature producer for the feature marker
     * @param reflectiveClasses producer for classes registered through {@link ReflectiveClassBuildItem}
     * @param jniRuntimeAccessibleClasses producer for classes registered through {@link JniRuntimeAccessBuildItem}
     * @param reinitialized producer for classes to be re-initialized at runtime
     * @param nativeLibs producer for resources to be included in the native image
     * @param launchMode current launch mode; decides whether the hot replacement interceptor is configured
     * @param config native build configuration; decides which RocksDB library file is bundled
     * @throws IOException declared for interface compatibility; nothing in this class throws it directly
     */
    @BuildStep
    void build(BuildProducer<FeatureBuildItem> feature, BuildProducer<ReflectiveClassBuildItem> reflectiveClasses, BuildProducer<JniRuntimeAccessBuildItem> jniRuntimeAccessibleClasses, BuildProducer<RuntimeReinitializedClassBuildItem> reinitialized, BuildProducer<NativeImageResourceBuildItem> nativeLibs, LaunchModeBuildItem launchMode, NativeConfig config) throws IOException {
        feature.produce(new FeatureBuildItem("kafka-streams"));
        registerClassesThatAreLoadedThroughReflection(reflectiveClasses, launchMode);
        registerClassesThatAreAccessedViaJni(jniRuntimeAccessibleClasses);
        addSupportForRocksDbLib(nativeLibs, config);
        enableLoadOfNativeLibs(reinitialized);
    }

    /** Registers the always-required classes first, then the ones derived from the user's configuration. */
    private void registerClassesThatAreLoadedThroughReflection(BuildProducer<ReflectiveClassBuildItem> reflectiveClasses, LaunchModeBuildItem launchMode) {
        registerCompulsoryClasses(reflectiveClasses);
        registerClassesThatClientMaySpecify(reflectiveClasses, launchMode);
    }

    /** Registers the Kafka Streams classes that are needed regardless of the configuration. */
    private void registerCompulsoryClasses(BuildProducer<ReflectiveClassBuildItem> reflectiveClasses) {
        reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, StreamsPartitionAssignor.class));
        reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, DefaultPartitionGrouper.class));
        reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, DefaultProductionExceptionHandler.class));
        reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, FailOnInvalidTimestamp.class));
    }

    /** Registers the exception handler and serde classes named in (or defaulted from) the configuration. */
    private void registerClassesThatClientMaySpecify(BuildProducer<ReflectiveClassBuildItem> reflectiveClasses, LaunchModeBuildItem launchMode) {
        Properties streamsProperties = buildKafkaStreamsProperties(launchMode.getLaunchMode());
        registerExceptionHandler(reflectiveClasses, streamsProperties);
        registerDefaultSerdes(reflectiveClasses, streamsProperties);
    }

    /**
     * Registers the class configured under {@code default.deserialization.exception.handler}, or
     * {@link LogAndFailExceptionHandler} when that property is not set.
     */
    private void registerExceptionHandler(BuildProducer<ReflectiveClassBuildItem> reflectiveClasses, Properties kafkaStreamsProperties) {
        String handlerClassName = kafkaStreamsProperties.getProperty("default.deserialization.exception.handler");
        if (handlerClassName != null) {
            registerClassName(reflectiveClasses, handlerClassName);
            return;
        }
        registerDefaultExceptionHandler(reflectiveClasses);
    }

    /** Registers {@link LogAndFailExceptionHandler}, used when no handler is configured. */
    private void registerDefaultExceptionHandler(BuildProducer<ReflectiveClassBuildItem> reflectiveClasses) {
        reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, LogAndFailExceptionHandler.class));
    }

    /**
     * Registers the classes configured under {@code default.key.serde} and {@code default.value.serde}.
     * If at least one of the two is not configured, {@link Serdes.ByteArraySerde} is registered as well.
     */
    private void registerDefaultSerdes(BuildProducer<ReflectiveClassBuildItem> reflectiveClasses, Properties kafkaStreamsProperties) {
        String keySerde = kafkaStreamsProperties.getProperty("default.key.serde");
        String valueSerde = kafkaStreamsProperties.getProperty("default.value.serde");
        if (keySerde != null) {
            registerClassName(reflectiveClasses, keySerde);
        }
        if (valueSerde != null) {
            registerClassName(reflectiveClasses, valueSerde);
        }
        if (!allDefaultSerdesAreDefinedInProperties(keySerde, valueSerde)) {
            registerDefaultSerde(reflectiveClasses);
        }
    }

    /** Registers the RocksDB exception and status classes through {@link JniRuntimeAccessBuildItem}. */
    private void registerClassesThatAreAccessedViaJni(BuildProducer<JniRuntimeAccessBuildItem> jniRuntimeAccessibleClasses) {
        jniRuntimeAccessibleClasses.produce(new JniRuntimeAccessBuildItem(true, false, false, RocksDBException.class, Status.class));
    }

    /**
     * Adds the RocksDB JNI library file as a native-image resource.
     *
     * <p>When a container runtime is configured or a container build is requested, the Linux 64-bit
     * library name is used; otherwise the name reported by {@link Environment} for the current
     * platform is used.
     */
    private void addSupportForRocksDbLib(BuildProducer<NativeImageResourceBuildItem> nativeLibs, NativeConfig nativeConfig) {
        // NOTE(review): presumably a container build always targets Linux, whatever the host OS is - confirm.
        boolean buildsInContainer = nativeConfig.containerRuntime.isPresent() || nativeConfig.containerBuild;
        String libraryFileName = buildsInContainer
                ? "librocksdbjni-linux64.so"
                : Environment.getJniLibraryFileName("rocksdb");
        nativeLibs.produce(new NativeImageResourceBuildItem(libraryFileName));
    }

    /** Requests runtime re-initialization of {@code org.rocksdb.RocksDB}. */
    private void enableLoadOfNativeLibs(BuildProducer<RuntimeReinitializedClassBuildItem> reinitialized) {
        reinitialized.produce(new RuntimeReinitializedClassBuildItem("org.rocksdb.RocksDB"));
    }

    /** Registers a class, identified by its name, through {@link ReflectiveClassBuildItem}. */
    private void registerClassName(BuildProducer<ReflectiveClassBuildItem> reflectiveClasses, String defaultKeySerdeClass) {
        reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, defaultKeySerdeClass));
    }

    /** Returns {@code true} only when both the key and the value serde class names are present. */
    private boolean allDefaultSerdesAreDefinedInProperties(String defaultKeySerdeClass, String defaultValueSerdeClass) {
        return defaultKeySerdeClass != null && defaultValueSerdeClass != null;
    }

    /** Registers {@link Serdes.ByteArraySerde}, used when a default serde is not configured. */
    private void registerDefaultSerde(BuildProducer<ReflectiveClassBuildItem> reflectiveClasses) {
        reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, Serdes.ByteArraySerde.class));
    }

    /**
     * Collects the Kafka Streams properties and passes them to the recorder at static-init time.
     *
     * @param recorder recorder whose {@code configure} result is wrapped into the returned build item
     * @param launchMode current launch mode
     * @return a bean container listener built from the recorder's {@code configure} call
     */
    @BuildStep
    @Record(ExecutionTime.STATIC_INIT)
    BeanContainerListenerBuildItem processBuildTimeConfig(KafkaStreamsRecorder recorder, LaunchModeBuildItem launchMode) {
        Properties streamsProperties = buildKafkaStreamsProperties(launchMode.getLaunchMode());
        return new BeanContainerListenerBuildItem(recorder.configure(streamsProperties));
    }

    /**
     * Builds the Kafka Streams properties from every configuration property starting with
     * {@code kafka-streams.}; in development mode the hot replacement interceptor is added too.
     */
    private Properties buildKafkaStreamsProperties(LaunchMode launchMode) {
        Config config = ConfigProvider.getConfig();
        Properties streamsProperties = new Properties();
        for (String propertyName : config.getPropertyNames()) {
            if (isKafkaStreamsProperty(propertyName)) {
                includeKafkaStreamsProperty(config, streamsProperties, propertyName);
            }
        }
        if (launchMode == LaunchMode.DEVELOPMENT) {
            addHotReplacementInterceptor(streamsProperties);
        }
        return streamsProperties;
    }

    /**
     * Puts {@link HotReplacementInterceptor} in front of any consumer interceptors that are already
     * configured, keeping the existing ones as a comma-separated tail.
     */
    private void addHotReplacementInterceptor(Properties kafkaStreamsProperties) {
        String interceptorKey = StreamsConfig.consumerPrefix("interceptor.classes");
        Object existingInterceptors = kafkaStreamsProperties.get(interceptorKey);
        String interceptors = HotReplacementInterceptor.class.getName();
        if (existingInterceptors != null) {
            interceptors = interceptors + "," + existingInterceptors;
        }
        kafkaStreamsProperties.put(interceptorKey, interceptors);
    }

    /** Tells whether the given configuration property name carries the Kafka Streams prefix. */
    private boolean isKafkaStreamsProperty(String property) {
        return property.startsWith(STREAMS_OPTION_PREFIX);
    }

    /**
     * Copies one configuration property into the Kafka Streams properties, with the prefix removed.
     *
     * <p>A property name can be listed by the configuration without having a resolvable value
     * (for instance when it is defined as empty). {@code Config.getValue} throws
     * {@code NoSuchElementException} in that case, so the optional lookup is used and such
     * properties are skipped instead of failing the build.
     */
    private void includeKafkaStreamsProperty(Config config, Properties kafkaStreamsProperties, String property) {
        config.getOptionalValue(property, String.class)
                .ifPresent(value -> kafkaStreamsProperties.setProperty(property.substring(STREAMS_OPTION_PREFIX.length()), value));
    }

    /**
     * Records, for runtime init, the loading of RocksDB followed by the application of the runtime
     * configuration.
     */
    @BuildStep
    @Record(ExecutionTime.RUNTIME_INIT)
    void configureAndLoadRocksDb(KafkaStreamsRecorder recorder, KafkaStreamsRuntimeConfig runtimeConfig) {
        recorder.loadRocksDb();
        recorder.configureRuntimeProperties(runtimeConfig);
    }

    /** Registers {@link KafkaStreamsTopologyManager} as an unremovable bean. */
    @BuildStep
    AdditionalBeanBuildItem registerBean() {
        return AdditionalBeanBuildItem.unremovableOf(KafkaStreamsTopologyManager.class);
    }

    /**
     * Contributes the topics and state health checks; both are passed the
     * {@code healthEnabled} build-time setting.
     */
    @BuildStep
    void addHealthChecks(KafkaStreamsBuildTimeConfig buildTimeConfig, BuildProducer<HealthBuildItem> healthChecks) {
        healthChecks.produce(new HealthBuildItem("io.quarkus.kafka.streams.runtime.health.KafkaStreamsTopicsHealthCheck", buildTimeConfig.healthEnabled, "kafka-streams"));
        healthChecks.produce(new HealthBuildItem("io.quarkus.kafka.streams.runtime.health.KafkaStreamsStateHealthCheck", buildTimeConfig.healthEnabled, "kafka-streams"));
    }
}

