/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.kafka.internal.connection.provider;

import com.mulesoft.connectors.kafka.api.connection.provider.AutoOffsetReset;
import com.mulesoft.connectors.kafka.api.connection.provider.ClientDNSLookup;
import com.mulesoft.connectors.kafka.api.connection.provider.IsolationLevel;
import com.mulesoft.connectors.kafka.api.connection.provider.KafkaFetchParamGroup;
import com.mulesoft.connectors.kafka.api.params.SubscriptionParamGroup;
import com.mulesoft.connectors.kafka.api.source.TopicPartition;
import com.mulesoft.connectors.kafka.internal.connection.ConsumerConnection;
import com.mulesoft.connectors.kafka.internal.connection.provider.KafkaConnectionProvider;
import com.mulesoft.connectors.kafka.internal.error.exception.InvalidConfigurationException;
import com.mulesoft.connectors.kafka.internal.error.exception.InvalidInputException;
import com.mulesoft.connectors.kafka.internal.model.consumer.DefaultConsumerPool;
import com.mulesoft.connectors.kafka.internal.model.consumer.DefaultMuleConsumer;
import com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer;
import com.mulesoft.connectors.kafka.internal.model.serializer.InputStreamDeserializer;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionValidationResult;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.meta.ExpressionSupport;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.extension.api.annotation.Expression;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.RefName;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Example;
import org.mule.runtime.extension.api.annotation.param.display.Placement;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ConsumerConnectionProvider
extends KafkaConnectionProvider<ConsumerConnection> {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerConnectionProvider.class);
    @Parameter
    @org.mule.runtime.extension.api.annotation.param.Optional
    @Summary(value="Default Group ID for all the Kafka Consumers that use this configuration.")
    @Example(value="test-consumer-group")
    @DisplayName(value="Group ID")
    @Placement(order=20)
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private String groupId;
    @ParameterGroup(name="Topics")
    @Summary(value="The topics configuration to consume messages.")
    @DisplayName(value="topics")
    @Placement(order=30)
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private SubscriptionParamGroup topics;
    @Parameter
    @Summary(value="Defines how many consumers will be available in the consumer pool.")
    @Placement(order=1, tab="Advanced")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="1")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private int consumerAmount;
    @Parameter
    @Placement(order=2, tab="Advanced")
    @Summary(value="The maximum delay between invocations of poll() when using consumer group management.")
    @DisplayName(value="Maximum polling interval")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="60")
    @Example(value="60")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private long maximumPollingInterval;
    @Parameter
    @Placement(order=3, tab="Advanced")
    @Summary(value="Time unit for max poll interval timeout.")
    @Example(value="SECONDS")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="SECONDS")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private TimeUnit maximumPollingIntervalTimeUnit;
    @Parameter
    @Placement(order=4, tab="Advanced")
    @Summary(value="Controls how to read messages written transactionally")
    @Example(value="read_committed")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="READ_UNCOMMITTED")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private IsolationLevel isolationLevel;
    @Parameter
    @Placement(order=5, tab="Advanced")
    @Summary(value="Exclude or include internal topics that match a pattern used to subscribe to multiple topics.")
    @Example(value="true")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="true")
    @DisplayName(value="Exclude internal topics")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private boolean excludeInternalTopics;
    @Parameter
    @Placement(order=6, tab="Advanced")
    @Summary(value="Sets behavior for when the current offset to be committed doesn't exist in kafka.")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="LATEST")
    @DisplayName(value="Auto offset reset")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private AutoOffsetReset autoOffsetReset;
    @Parameter
    @Placement(order=7, tab="Advanced")
    @Summary(value="The amount of time to wait before attempting to retry a failed request to a given topic partition.")
    @Example(value="100")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="100")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private long retryBackoffTimeout;
    @Parameter
    @Placement(order=8, tab="Advanced")
    @Summary(value="Time unit for retry backoff timeout.")
    @Example(value="MILLISECONDS")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="MILLISECONDS")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private TimeUnit retryBackoffTimeoutTimeUnit;
    @Parameter
    @Placement(order=9, tab="Advanced")
    @Summary(value="Enable CRC32 verification.")
    @Example(value="true")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="true")
    @DisplayName(value="Check CRC")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private boolean enableCRCCheck;
    @ParameterGroup(name="Fetch configuration")
    @DisplayName(value="Fetching configuration")
    private KafkaFetchParamGroup fetchParamGroup;
    @Parameter
    @Placement(order=10, tab="Advanced")
    @Summary(value="TCP buffer size to receive data.")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="64")
    @DisplayName(value="Default receive buffer size")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private Integer receiveBufferSize;
    @Parameter
    @Placement(order=11, tab="Advanced")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="KB")
    @DisplayName(value="Default receive buffer size unit")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private DataUnit receiveBufferSizeUnit;
    @Parameter
    @Placement(order=12, tab="Advanced")
    @Summary(value="TCP buffer size for sending data.\n")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="128")
    @DisplayName(value="Default send buffer size")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private Integer sendBufferSize;
    @Parameter
    @Placement(order=13, tab="Advanced")
    @Summary(value="The send buffer size unit of measure.")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="KB")
    @Example(value="KB")
    @DisplayName(value="Default send buffer size unit")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private DataUnit sendBufferSizeUnit;
    @Parameter
    @Placement(order=14, tab="Advanced")
    @Summary(value="The amount of time to wait before attempting to retry a failed request to a given topic partition.")
    @Example(value="30")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="30")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private long requestTimeout;
    @Parameter
    @Placement(order=15, tab="Advanced")
    @Summary(value="Time unit for request timeout.")
    @Example(value="SECONDS")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="SECONDS")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private TimeUnit requestTimeoutTimeUnit;
    @Parameter
    @Placement(order=16, tab="Advanced")
    @Summary(value="Maximum number of returned records on a single poll.")
    @Example(value="500")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="500")
    @DisplayName(value="Default record limit")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private Integer recordLimit;
    @Parameter
    @Placement(order=17, tab="Advanced")
    @Summary(value="Controls how the client uses DNS lookups")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="USE_ALL_DNS_IPS")
    @DisplayName(value="DNS Lookups")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private ClientDNSLookup clientDNSLookup;
    @Parameter
    @Placement(order=18, tab="Advanced")
    @Summary(value="Time between heartbeat calls")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="3")
    @DisplayName(value="Heartbeat interval")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private int heartbeatInterval;
    @Parameter
    @Placement(order=19, tab="Advanced")
    @Summary(value="Time unit for heartbeat interval")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="SECONDS")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private TimeUnit heartbeatIntervalTimeUnit;
    @Parameter
    @Placement(order=20, tab="Advanced")
    @Summary(value="Time to pass before consumer is removed from the group.")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="10")
    @DisplayName(value="Session timeout")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private int sessionTimeout;
    @Parameter
    @Placement(order=21, tab="Advanced")
    @Summary(value="Time unit for session timeout")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="SECONDS")
    @DisplayName(value="Session timeout time unit")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private TimeUnit sessionTimeoutTimeUnit;
    @Parameter
    @Placement(order=22, tab="Advanced")
    @Summary(value="Time to wait before closing idle connection.")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="540")
    @DisplayName(value="Connection maximum idle time")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private int connectionsMaximumIdleTime;
    @Parameter
    @Placement(order=23, tab="Advanced")
    @Summary(value="{@link TimeUnit} which qualifies the {@link connectionsMaximumIdleTime} parameter.")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="SECONDS")
    @DisplayName(value="Connection maximum idle time time unit")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private TimeUnit connectionsMaximumIdleTimeUnit;
    @Inject
    private SchedulerService schedulerService;
    @RefName
    private String configName;
    private Scheduler workerScheduler;

    protected ConsumerConnectionProvider(SecurityProtocol plainProtocol, SecurityProtocol sslProtocol) {
        super(plainProtocol, sslProtocol);
    }

    @Override
    public ConsumerConnection connect(Properties properties) throws ConnectionException {
        HashSet<MuleConsumer> consumers = new HashSet<MuleConsumer>();
        logger.debug("Consumer connection provider is {}.", (Object)this);
        if (this.consumerAmount < 1) {
            throw new InvalidConfigurationException("Invalid consumer amount.", (Throwable)((Object)new InvalidInputException("Consumer amount must be greater than zero.")));
        }
        for (int i = 0; i < this.consumerAmount; ++i) {
            try {
                consumers.add(new DefaultMuleConsumer(this.getKafkaConsumerFunction(), properties));
                continue;
            }
            catch (KafkaException e) {
                consumers.forEach(IOUtils::closeQuietly);
                this.handleConnectionException(e);
            }
        }
        logger.debug("Created {} consumers.", (Object)consumers.size());
        ConsumerConnection consumerConnection = new ConsumerConnection(new DefaultConsumerPool(consumers), this.workerScheduler);
        try {
            logger.debug("Created ConsumerConnection");
            List<TopicPartition> assignments = this.topics.getAssignments();
            List<String> topicPatterns = this.topics.getTopicPatterns();
            if (assignments != null && !assignments.isEmpty()) {
                logger.info("Consumers will use assignments: {}", assignments);
                consumerConnection.assign(Duration.ofMillis(-1L), assignments);
            } else if (topicPatterns != null && !topicPatterns.isEmpty()) {
                logger.info("Consumers will use subscriptions: {}", topicPatterns);
                consumerConnection.subscribe(Duration.ofMillis(-1L), topicPatterns);
            }
        }
        catch (RuntimeException e) {
            consumerConnection.disconnect();
            logger.error("There was an error when trying to establish the connection.", (Throwable)e);
            throw new ConnectionException((Throwable)e);
        }
        return consumerConnection;
    }

    @Override
    protected void initialise(Properties properties) throws InitialisationException {
        this.workerScheduler = this.schedulerService.ioScheduler(SchedulerConfig.config().withName(this.configName + "-worker"));
        this.setPropertyAsString("group.id", Optional.ofNullable(this.groupId).orElse(this.configName + "-" + UUID.randomUUID()));
        this.setPropertyAsString("exclude.internal.topics", this.excludeInternalTopics);
        this.setPropertyAsString("auto.offset.reset", Optional.ofNullable(this.autoOffsetReset).map(Enum::name).map(String::toLowerCase).orElse(null));
        this.setPropertyAsString("max.poll.records", this.recordLimit);
        this.setPropertyAsString("check.crcs", this.enableCRCCheck);
        this.setPropertyAsString("receive.buffer.bytes", this.receiveBufferSizeUnit.toBytes(this.receiveBufferSize.intValue()));
        this.setPropertyAsString("send.buffer.bytes", this.sendBufferSizeUnit.toBytes(this.sendBufferSize.intValue()));
        this.setPropertyAsString("max.poll.interval.ms", (int)this.maximumPollingIntervalTimeUnit.toMillis(this.maximumPollingInterval));
        this.setPropertyAsString("retry.backoff.ms", (int)this.retryBackoffTimeoutTimeUnit.toMillis(this.retryBackoffTimeout));
        this.setPropertyAsString("request.timeout.ms", (int)this.requestTimeoutTimeUnit.toMillis(this.requestTimeout));
        this.setPropertyAsString("fetch.min.bytes", this.fetchParamGroup.getFetchMinimumSizeUnit().toBytes(this.fetchParamGroup.getFetchMinimumSize().intValue()));
        this.setPropertyAsString("fetch.max.bytes", this.fetchParamGroup.getFetchMaximumSizeUnit().toBytes(this.fetchParamGroup.getFetchMaximumSize().intValue()));
        this.setPropertyAsString("max.partition.fetch.bytes", this.fetchParamGroup.getMaximumPartitionFetchSizeUnit().toBytes(this.fetchParamGroup.getMaximumPartitionFetchSize().intValue()));
        this.setPropertyAsString("fetch.max.wait.ms", (int)this.fetchParamGroup.getFetchMaximumWaitTimeoutUnit().toMillis(this.fetchParamGroup.getFetchMaximumWaitTimeout()));
        this.setPropertyAsString("key.deserializer", InputStreamDeserializer.class.getName());
        this.setPropertyAsString("value.deserializer", InputStreamDeserializer.class.getName());
        this.setPropertyAsString("enable.auto.commit", "false");
        this.setPropertyAsString("heartbeat.interval.ms", Math.toIntExact(this.heartbeatIntervalTimeUnit.toMillis(this.heartbeatInterval)));
        this.setPropertyAsString("session.timeout.ms", Math.toIntExact(this.sessionTimeoutTimeUnit.toMillis(this.sessionTimeout)));
        this.setPropertyAsString("connections.max.idle.ms", this.connectionsMaximumIdleTimeUnit.toMillis(this.connectionsMaximumIdleTime));
        this.setPropertyAsString("metadata.max.age.ms", Math.toIntExact(this.heartbeatIntervalTimeUnit.toMillis(this.heartbeatInterval)));
        this.setPropertyAsString("isolation.level", this.isolationLevel.getValue());
    }

    public ConnectionValidationResult validate(ConsumerConnection connection) {
        return connection.validateWithResult();
    }

    public void dispose() {
        this.workerScheduler.stop();
    }
}

