/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.kafka.internal.operation;

import com.mulesoft.connectors.kafka.api.KafkaRecordAttributes;
import com.mulesoft.connectors.kafka.api.source.AckMode;
import com.mulesoft.connectors.kafka.internal.config.ConsumerConfiguration;
import com.mulesoft.connectors.kafka.internal.connection.ConsumerConnection;
import com.mulesoft.connectors.kafka.internal.error.provider.ConsumeErrorTypeProvider;
import java.io.InputStream;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.extension.api.annotation.error.Throws;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.ConfigOverride;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Placement;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.runtime.operation.FlowListener;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operation container exposing a single {@code consume} operation that delegates
 * to {@link ConsumerConnection#consume}.
 *
 * <p>Error types are declared through {@link ConsumeErrorTypeProvider}.
 */
@Throws(ConsumeErrorTypeProvider.class)
public class ConsumeOperation {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumeOperation.class);

    /**
     * Delegates to the given connection's {@code consume} call and returns its result.
     *
     * <p>The poll timeout is always converted with {@code config.asDuration}. The
     * operation timeout depends on the acknowledgement mode: for
     * {@link AckMode#MANUAL} a fixed {@code Duration.ofMillis(-1)} is passed,
     * otherwise {@code operationTimeout}/{@code operationTimeoutTimeUnit} are
     * converted with {@code config.asDuration}.
     *
     * @param config configuration supplying {@code parseRecord} and {@code asDuration}
     * @param connection connection the consume call is delegated to
     * @param pollTimeout amount of the poll ("Consumption") timeout
     * @param pollTimeoutTimeUnit unit of {@code pollTimeout}
     * @param operationTimeout amount of the operation timeout; not used when
     *        {@code ackMode} is {@code MANUAL}
     * @param operationTimeoutTimeUnit unit of {@code operationTimeout}
     * @param ackMode acknowledgement mode, forwarded to the connection
     * @param flowListener flow listener, forwarded to the connection
     * @return whatever {@code connection.consume(...)} returns
     * @throws ConnectionException declared by this operation; presumably propagated
     *         from the connection - confirm against {@code ConsumerConnection}
     */
    @MediaType("application/octet-stream")
    public Result<InputStream, KafkaRecordAttributes> consume(
            @Config ConsumerConfiguration config,
            @Connection ConsumerConnection connection,
            @ConfigOverride @Optional @Placement(order=1) @DisplayName("Consumption timeout") int pollTimeout,
            @ConfigOverride @Optional @Placement(order=2) @DisplayName("Timeout time unit") TimeUnit pollTimeoutTimeUnit,
            @Optional @ConfigOverride @Placement(tab="Advanced", order=1) int operationTimeout,
            @Optional @ConfigOverride @Placement(tab="Advanced", order=2) TimeUnit operationTimeoutTimeUnit,
            @Summary("Declares the kind of Acknowledgement mode supported.") @Optional(defaultValue="IMMEDIATE") @DisplayName("Acknowledgement mode") AckMode ackMode,
            FlowListener flowListener) throws ConnectionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Consuming a single record from the Kafka Consumer.");
            LOGGER.debug("Consume operation invoked using connection {}.", (Object)connection);
        }

        final Duration pollDuration = config.asDuration(pollTimeout, pollTimeoutTimeUnit);

        final Duration operationDuration;
        if (AckMode.MANUAL == ackMode) {
            // NOTE(review): -1 ms looks like a "no operation timeout" sentinel for
            // manual acknowledgement - confirm against ConsumerConnection.consume.
            operationDuration = Duration.ofMillis(-1L);
        } else {
            operationDuration = config.asDuration(operationTimeout, operationTimeoutTimeUnit);
        }

        return connection.consume(config::parseRecord, pollDuration, operationDuration, ackMode, flowListener);
    }
}

