/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.kafka.internal.config;

import com.mulesoft.connectors.commons.template.config.ConnectorConfig;
import com.mulesoft.connectors.kafka.api.KafkaRecordAttributes;
import com.mulesoft.connectors.kafka.api.source.AckMode;
import com.mulesoft.connectors.kafka.api.source.ConsumerContext;
import com.mulesoft.connectors.kafka.api.source.Record;
import com.mulesoft.connectors.kafka.internal.connection.provider.plaintext.PlaintextConsumerConnectionProvider;
import com.mulesoft.connectors.kafka.internal.connection.provider.sasl.kerberos.KerberosConsumerConnectionProvider;
import com.mulesoft.connectors.kafka.internal.connection.provider.sasl.plain.PlainConsumerConnectionProvider;
import com.mulesoft.connectors.kafka.internal.connection.provider.sasl.scram.ScramConsumerConnectionProvider;
import com.mulesoft.connectors.kafka.internal.connection.provider.sasl.token.TokenConsumerConnectionProvider;
import com.mulesoft.connectors.kafka.internal.model.serializer.SerByteArray;
import com.mulesoft.connectors.kafka.internal.operation.CommitOperation;
import com.mulesoft.connectors.kafka.internal.operation.ConsumeOperation;
import com.mulesoft.connectors.kafka.internal.operation.SeekOperation;
import com.mulesoft.connectors.kafka.internal.source.BatchMessageListenerSource;
import com.mulesoft.connectors.kafka.internal.source.SingleMessageListenerSource;
import java.io.InputStream;
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.TimestampType;
import org.mule.runtime.api.meta.ExpressionSupport;
import org.mule.runtime.api.util.MultiMap;
import org.mule.runtime.extension.api.annotation.Configuration;
import org.mule.runtime.extension.api.annotation.Expression;
import org.mule.runtime.extension.api.annotation.Operations;
import org.mule.runtime.extension.api.annotation.Sources;
import org.mule.runtime.extension.api.annotation.connectivity.ConnectionProviders;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Placement;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.runtime.operation.Result;

/**
 * Mule configuration element ({@code consumer-config}) for the Kafka consumer side of the connector.
 *
 * <p>It declares the default acknowledgement mode and poll timeouts as non-expression parameters and
 * offers helpers that turn Kafka {@link ConsumerRecord}s into Mule {@link Result}s and that convert a
 * {@link TimeUnit}-based amount into a {@link Duration}.
 */
@Operations(value={CommitOperation.class, ConsumeOperation.class, SeekOperation.class})
@Sources(value={BatchMessageListenerSource.class, SingleMessageListenerSource.class})
@ConnectionProviders(value={PlaintextConsumerConnectionProvider.class, ScramConsumerConnectionProvider.class, KerberosConsumerConnectionProvider.class, PlainConsumerConnectionProvider.class, TokenConsumerConnectionProvider.class})
@Configuration(name="consumer-config")
@DisplayName(value="Consumer configuration")
public class ConsumerConfiguration
implements ConnectorConfig {
    /** Default acknowledgement mode; declared default is {@code AUTO}. */
    @Parameter
    @Placement(order=1)
    @Summary(value="Declares the kind of Acknowledgement mode supported.")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="AUTO")
    @DisplayName(value="Default acknowledgement mode")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private AckMode ackMode;

    /** Default listener poll timeout amount; declared default is {@code 100}. */
    @Parameter
    @Placement(order=2)
    @Summary(value="The amount of time units to block on each poll, for Message Listeners (Sources)")
    @DisplayName(value="Default listener poll timeout")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="100")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private Integer pollTimeout;

    /** Time unit of {@link #pollTimeout}; declared default is {@code MILLISECONDS}. */
    @Parameter
    @Placement(order=3)
    @DisplayName(value="Default listener poll timeout time unit")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="MILLISECONDS")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private TimeUnit pollTimeoutTimeUnit;

    /** Default operation poll timeout amount; declared default is {@code -1}. */
    @Parameter
    @Placement(order=4)
    @Summary(value="The amount of time units to block on each poll, for consumer operations")
    @DisplayName(value="Default operation poll timeout")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="-1")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private int operationTimeout;

    /** Time unit of {@link #operationTimeout}; declared default is {@code SECONDS}. */
    @Parameter
    @Placement(order=5)
    @DisplayName(value="Default operation poll timeout time unit")
    @org.mule.runtime.extension.api.annotation.param.Optional(defaultValue="SECONDS")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private TimeUnit operationTimeoutTimeUnit;

    /** Optional zone ID used when building record timestamps; the system default zone is used when unset. */
    @Parameter
    @Placement(order=6)
    @org.mule.runtime.extension.api.annotation.param.Optional
    @DisplayName(value="Zone ID")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    private String zoneId;

    /**
     * Converts a list of Kafka records into a single result carrying a list of {@link Record}s.
     *
     * @param sessionId identifier passed to {@link #parseRecord} for every record and used to build the
     *                  {@link ConsumerContext} attributes
     * @param records   the Kafka records to convert, in order
     * @return a result whose output is the converted records and whose attributes are a
     *         {@link ConsumerContext} created from {@code sessionId}
     */
    public Result<List<Record>, ConsumerContext> parseRecords(String sessionId, List<ConsumerRecord<InputStream, InputStream>> records) {
        List<Record> converted = records.stream()
                .map(rawRecord -> this.toRecord(sessionId, rawRecord))
                .collect(Collectors.toList());
        return Result.<List<Record>, ConsumerContext>builder()
                .output(converted)
                .attributes(new ConsumerContext(sessionId))
                .build();
    }

    /**
     * Converts one Kafka record into a result whose output is the record value and whose attributes
     * describe the record (topic, partition, headers, key, offset, timestamps, sizes, leader epoch).
     *
     * @param consumerCommitKey value forwarded unchanged as the first attribute argument; may be {@code null}
     * @param record            the Kafka record to convert
     * @return the result wrapping {@code record.value()} and the freshly built {@link KafkaRecordAttributes}
     */
    public Result<InputStream, KafkaRecordAttributes> parseRecord(String consumerCommitKey, ConsumerRecord<InputStream, InputStream> record) {
        Map<String, byte[]> headers = ConsumerConfiguration.collectHeaders(record);
        // Only the timestamp matching the record's timestamp type is populated; the other stays null.
        ZonedDateTime creationTimestamp = this.timestampWhen(TimestampType.CREATE_TIME, record);
        ZonedDateTime logAppendTimestamp = this.timestampWhen(TimestampType.LOG_APPEND_TIME, record);

        KafkaRecordAttributes attributes = new KafkaRecordAttributes(
                consumerCommitKey,
                record.topic(),
                record.partition(),
                headers,
                record.key(),
                record.offset(),
                creationTimestamp,
                logAppendTimestamp,
                record.serializedKeySize(),
                record.serializedValueSize(),
                record.leaderEpoch().orElse(null));

        return Result.<InputStream, KafkaRecordAttributes>builder()
                .output(record.value())
                .attributes(attributes)
                .build();
    }

    /**
     * Builds a {@link Duration} from an amount expressed in a {@link TimeUnit}.
     *
     * @param amount   the number of units
     * @param timeUnit the unit of {@code amount}
     * @return the equivalent duration
     */
    public Duration asDuration(long amount, TimeUnit timeUnit) {
        return Duration.of(amount, ConsumerConfiguration.chronoUnitFor(timeUnit));
    }

    /** Parses a single record and wraps its payload and attributes into a {@link Record}. */
    private Record toRecord(String sessionId, ConsumerRecord<InputStream, InputStream> rawRecord) {
        Result<InputStream, KafkaRecordAttributes> parsed = this.parseRecord(sessionId, rawRecord);
        InputStream payload = parsed.getOutput();
        KafkaRecordAttributes attributes = parsed.getAttributes().get();
        return new Record(payload, attributes, new SerByteArray(payload));
    }

    /** Collects the record headers into a {@link MultiMap}; on duplicate keys the later value is kept. */
    private static Map<String, byte[]> collectHeaders(ConsumerRecord<InputStream, InputStream> record) {
        return StreamSupport.stream(record.headers().spliterator(), false)
                .collect(Collectors.toMap(
                        Header::key,
                        ConsumerConfiguration::headerValueOrEmpty,
                        (previous, current) -> current,
                        MultiMap::new));
    }

    /** Returns the header value, or the bytes of the empty string when the header has no value. */
    private static byte[] headerValueOrEmpty(Header header) {
        byte[] value = header.value();
        return value != null ? value : "".getBytes();
    }

    /**
     * Returns the record timestamp as a {@link ZonedDateTime} when the record's timestamp type equals
     * {@code expectedType}, otherwise {@code null}. The configured zone ID is used when set, else the
     * system default zone.
     */
    private ZonedDateTime timestampWhen(TimestampType expectedType, ConsumerRecord<InputStream, InputStream> record) {
        if (!expectedType.equals(record.timestampType())) {
            return null;
        }
        Date recordDate = new Date(record.timestamp());
        ZoneId zone = this.zoneId != null ? ZoneId.of(this.zoneId) : ZoneId.systemDefault();
        return ZonedDateTime.ofInstant(recordDate.toInstant(), zone);
    }

    /** Maps a {@link TimeUnit} constant to the corresponding {@link ChronoUnit}. */
    private static ChronoUnit chronoUnitFor(TimeUnit timeUnit) {
        switch (timeUnit) {
            case NANOSECONDS:
                return ChronoUnit.NANOS;
            case MICROSECONDS:
                return ChronoUnit.MICROS;
            case MILLISECONDS:
                return ChronoUnit.MILLIS;
            case SECONDS:
                return ChronoUnit.SECONDS;
            case MINUTES:
                return ChronoUnit.MINUTES;
            case HOURS:
                return ChronoUnit.HOURS;
            case DAYS:
                return ChronoUnit.DAYS;
            default:
                // No mapping for this constant: hand null through so Duration.of fails as before.
                return null;
        }
    }
}

