/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connector.amazon.kinesis.internal.connection;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.mulesoft.connector.amazon.kinesis.api.Role;
import com.mulesoft.connector.amazon.kinesis.api.StreamInitialPosition;
import com.mulesoft.connector.amazon.kinesis.internal.config.parameter.AdvancedProducerParameters;
import com.mulesoft.connector.amazon.kinesis.internal.connection.KinesisConnection;
import com.mulesoft.connector.amazon.kinesis.internal.connection.adapter.KinesisConsumerAsyncAdapter;
import com.mulesoft.connector.amazon.kinesis.internal.connection.adapter.KinesisConsumerSyncAdapter;
import com.mulesoft.connector.amazon.kinesis.internal.connection.client.CloudWatchAsyncClientDelegate;
import com.mulesoft.connector.amazon.kinesis.internal.connection.client.DynamoDbAsyncClientDelegate;
import com.mulesoft.connector.amazon.kinesis.internal.connection.client.KinesisAsyncClientDelegate;
import com.mulesoft.connector.amazon.kinesis.internal.connection.client.KinesisClientDelegate;
import com.mulesoft.connector.amazon.kinesis.internal.connection.provider.parameter.CommonListenerParameters;
import com.mulesoft.connector.amazon.kinesis.internal.connection.sts.ConsumerRoleCredentialsProvider;
import com.mulesoft.connector.amazon.kinesis.internal.connection.sts.ProducerRoleCredentialsProvider;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidArgumentException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidRegionException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidStreamException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.KinesisRuntimeException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.ProcessingException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.UnauthorizedException;
import com.mulesoft.connector.amazon.kinesis.internal.source.consumer.Consumer;
import com.mulesoft.connector.amazon.kinesis.internal.source.consumer.ConsumerImpl;
import com.mulesoft.connector.amazon.kinesis.internal.util.KinesisUtil;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.process.CompletionCallback;
import org.mule.runtime.http.api.client.HttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.StsClientBuilder;
import software.amazon.kinesis.producer.Attempt;
import software.amazon.kinesis.producer.KinesisProducer;
import software.amazon.kinesis.producer.KinesisProducerConfiguration;
import software.amazon.kinesis.producer.UserRecord;
import software.amazon.kinesis.producer.UserRecordFailedException;
import software.amazon.kinesis.producer.UserRecordResult;

/**
 * {@link KinesisConnection} implementation.
 *
 * <p>The constructor builds an STS client plus Kinesis (sync and async), DynamoDB and CloudWatch
 * client delegates. A {@link KinesisProducer} is created lazily, on the first call to
 * {@link #putRecord}, and destroyed by {@link #disconnect()}.
 *
 * <p>The lazily created producer is stored in a {@code volatile} field and its creation is guarded
 * by a lock on this instance.
 */
public class KinesisConnectionImpl implements KinesisConnection {
    private static final Logger logger = LoggerFactory.getLogger(KinesisConnectionImpl.class);

    // Message fragments used by wrapException to classify failures.
    private static final String INVALID_TOKEN_MESSAGE = "The security token included in the request is invalid.";
    private static final String SIGNATURE_MISMATCH_MESSAGE = "The request signature we calculated does not match the signature you provided.";
    private static final String RESOURCE_NOT_FOUND_ERROR_CODE = "ResourceNotFoundException";

    private final String accessKey;
    private final String secretKey;
    private final String region;
    private final Role role;
    private final KinesisAsyncClient kinesisAsyncClient;
    private final KinesisClient kinesisSyncClient;
    private final DynamoDbAsyncClient dynamoClient;
    private final CloudWatchAsyncClient cloudWatchClient;
    private final Scheduler kplCallbackScheduler;
    private final ObjectMapper objectMapper;
    private final int responseTimeout;
    private volatile KinesisProducer kinesisProducer;
    private final StsClient stsClient;
    private final String customKinesisEndpoint;
    private final String customCloudWatchEndpoint;
    private final String customStsEndpoint;

    /**
     * Creates the connection and all AWS clients used by the consumer side.
     *
     * @param httpClient               HTTP client handed to the sync/async adapters
     * @param accessKey                AWS access key
     * @param secretKey                AWS secret key
     * @param region                   AWS region identifier, resolved with {@link Region#of(String)}
     * @param role                     role to assume; when {@code null} the static access/secret key pair is used
     * @param responseTimeout          timeout in milliseconds, passed to the adapters and used by {@link #validate()};
     *                                 must not be {@code null} (it is unboxed)
     * @param kplCallbackScheduler     executor on which producer result callbacks run
     * @param objectMapper             mapper used to serialize a successful producer result
     * @param customKinesisEndpoint    optional Kinesis endpoint override
     * @param customStsEndpoint        optional STS endpoint override
     * @param customDynamoDBEndpoint   optional DynamoDB endpoint override
     * @param customCloudWatchEndpoint optional CloudWatch endpoint override
     * @throws MuleRuntimeException if any client cannot be built (see {@link #wrapException(Throwable)})
     */
    public KinesisConnectionImpl(HttpClient httpClient, String accessKey, String secretKey, String region, Role role, Integer responseTimeout, Scheduler kplCallbackScheduler, ObjectMapper objectMapper, String customKinesisEndpoint, String customStsEndpoint, String customDynamoDBEndpoint, String customCloudWatchEndpoint) {
        this.accessKey = accessKey;
        this.secretKey = secretKey;
        this.region = region;
        this.role = role;
        this.responseTimeout = responseTimeout;
        this.customKinesisEndpoint = customKinesisEndpoint;
        this.customCloudWatchEndpoint = customCloudWatchEndpoint;
        this.customStsEndpoint = customStsEndpoint;
        Region awsRegion = Region.of(region);
        // NOTE(review): both adapters are closed when this try block exits although the clients
        // built from them are kept in fields - presumably the adapters' close() does not release
        // the shared HttpClient; confirm against the adapter implementations.
        try (KinesisConsumerAsyncAdapter customAsyncClient = new KinesisConsumerAsyncAdapter(httpClient, responseTimeout);
             KinesisConsumerSyncAdapter customSyncClient = new KinesisConsumerSyncAdapter(httpClient, responseTimeout)) {
            StsClientBuilder stsClientBuilder = StsClient.builder()
                    .httpClient(customSyncClient)
                    .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)))
                    .region(awsRegion);
            if (!StringUtils.isBlank(customStsEndpoint)) {
                stsClientBuilder.endpointOverride(URI.create(customStsEndpoint));
            }
            this.stsClient = stsClientBuilder.build();
            AwsCredentialsProvider credentialsProvider = this.getConsumerConnectionProvider(this.stsClient, this.accessKey, this.secretKey, this.role);
            this.kinesisAsyncClient = new KinesisAsyncClientDelegate(customAsyncClient, credentialsProvider, awsRegion, customKinesisEndpoint);
            this.kinesisSyncClient = new KinesisClientDelegate(customSyncClient, credentialsProvider, awsRegion, customKinesisEndpoint);
            this.dynamoClient = new DynamoDbAsyncClientDelegate(customAsyncClient, credentialsProvider, awsRegion, customDynamoDBEndpoint);
            this.cloudWatchClient = new CloudWatchAsyncClientDelegate(customAsyncClient, credentialsProvider, awsRegion, customCloudWatchEndpoint);
        }
        catch (Exception e) {
            throw this.wrapException(e);
        }
        this.kplCallbackScheduler = kplCallbackScheduler;
        this.objectMapper = objectMapper;
        logger.debug("Consumer connection has been created.");
    }

    /**
     * Destroys the lazily created producer, if one exists.
     */
    public void disconnect() {
        KinesisProducer producer = this.kinesisProducer;
        if (producer != null) {
            producer.destroy();
        }
    }

    /**
     * Tests connectivity by listing one Kinesis stream and one DynamoDB table; a failure of either
     * is rethrown as a {@link MuleRuntimeException}. CloudWatch is then probed as well, but a
     * CloudWatch failure is only logged as a warning.
     *
     * @throws MuleRuntimeException if the Kinesis or DynamoDB check fails, times out or is interrupted
     */
    public void validate() {
        try {
            this.kinesisSyncClient.listStreams(builder -> builder.limit(1));
            logger.debug("Kinesis connection test passed.");
            this.dynamoClient.listTables(builder -> builder.limit(1)).get(this.responseTimeout, TimeUnit.MILLISECONDS);
            logger.debug("DynamoDB connection test passed.");
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new KinesisRuntimeException(e);
        }
        catch (ExecutionException e) {
            // The cause may be absent; fall back to the ExecutionException itself.
            Throwable cause = e.getCause();
            throw this.wrapException(cause != null ? cause : e);
        }
        catch (Exception e) {
            throw this.wrapException(e);
        }
        try {
            this.cloudWatchClient.listMetrics().get(this.responseTimeout, TimeUnit.MILLISECONDS);
            logger.debug("CloudWatch connection test passed.");
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("CloudWatch connection test failed. CloudWatch metrics may not be published.");
        }
        catch (ExecutionException | TimeoutException e) {
            logger.warn("CloudWatch connection test failed. CloudWatch metrics may not be published.");
        }
    }

    /**
     * Maps an arbitrary failure to a {@link MuleRuntimeException}, classifying it by fragments of
     * its message. A {@link MuleRuntimeException} is returned unchanged; a failure whose message is
     * {@code null} or matches no known fragment becomes a {@link KinesisRuntimeException}.
     *
     * @param t the failure to wrap; must not be {@code null}
     * @return the wrapped (or original) exception, never {@code null}
     */
    private MuleRuntimeException wrapException(Throwable t) {
        if (t instanceof MuleRuntimeException) {
            return (MuleRuntimeException)t;
        }
        String message = t.getMessage();
        if (message == null) {
            // Exceptions such as TimeoutException or NullPointerException often carry no message.
            return new KinesisRuntimeException(t);
        }
        if (message.contains(INVALID_TOKEN_MESSAGE) || message.contains(SIGNATURE_MISMATCH_MESSAGE)) {
            return new UnauthorizedException(t);
        }
        if (message.contains("Configured region") && message.contains("resulted in an invalid URI:")) {
            return new InvalidRegionException(t);
        }
        if (message.contains("validation error") && message.contains("detected:")) {
            return new InvalidArgumentException(t);
        }
        return new KinesisRuntimeException(t);
    }

    /**
     * Creates a consumer that shares this connection's Kinesis, DynamoDB and CloudWatch clients.
     */
    @Override
    public Consumer createConsumer(String consumerId, CommonListenerParameters commonParams, StreamInitialPosition initialPosition) {
        return new ConsumerImpl(consumerId, commonParams, initialPosition, this.kinesisAsyncClient, this.kinesisSyncClient, this.dynamoClient, this.cloudWatchClient);
    }

    /**
     * Hands a record to the producer and reports the outcome through {@code muleCallback}.
     *
     * <p>The producer is created on first use from {@code advancedProducerParameters}; later calls
     * reuse it, so the parameters of later calls do not reconfigure it. When {@code partitionKey}
     * is {@code null} or empty a random UUID is used instead.
     *
     * @param data                       record payload; read fully into memory
     * @param partitionKey               partition key, may be {@code null} or empty
     * @param explicitHashKey            explicit hash key passed through to the {@link UserRecord}
     * @param streamName                 target stream
     * @param advancedProducerParameters producer settings used when the producer is first created
     * @param muleCallback               completed with the serialized result or with the failure
     */
    @Override
    public void putRecord(InputStream data, String partitionKey, String explicitHashKey, String streamName, AdvancedProducerParameters advancedProducerParameters, CompletionCallback<InputStream, Void> muleCallback) {
        if (this.kinesisProducer == null) {
            synchronized (this) {
                if (this.kinesisProducer == null) {
                    KinesisProducerConfiguration config;
                    try {
                        config = this.buildProducerConfiguration(advancedProducerParameters);
                    }
                    catch (IllegalArgumentException e) {
                        muleCallback.error(new InvalidArgumentException(e));
                        return;
                    }
                    this.kinesisProducer = new KinesisProducer(config);
                }
            }
        }
        String finalPartitionKey = partitionKey != null && !partitionKey.isEmpty() ? partitionKey : UUID.randomUUID().toString();
        ByteBuffer sdkData;
        ListenableFuture<UserRecordResult> pendingResult;
        try {
            sdkData = ByteBuffer.wrap(IOUtils.toByteArray(data));
            UserRecord rec = new UserRecord(streamName, finalPartitionKey, explicitHashKey, sdkData);
            pendingResult = this.kinesisProducer.addUserRecord(rec);
        }
        catch (IllegalArgumentException e) {
            muleCallback.error(new InvalidArgumentException(e));
            return;
        }
        catch (Exception e) {
            muleCallback.error(this.wrapException(e));
            return;
        }
        FutureCallback<UserRecordResult> callback = this.getUserRecordResultFutureCallback(muleCallback, finalPartitionKey, sdkData);
        Futures.addCallback(pendingResult, callback, this.kplCallbackScheduler);
    }

    /**
     * Builds the producer configuration from this connection's credentials, region and endpoint
     * overrides plus the supplied advanced parameters.
     *
     * @throws IllegalArgumentException propagated from the configuration setters
     */
    private KinesisProducerConfiguration buildProducerConfiguration(AdvancedProducerParameters advancedProducerParameters) {
        KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                .setCredentialsProvider(this.getProducerCredentialsProvider(this.stsClient, this.accessKey, this.secretKey, this.role))
                .setRegion(this.region)
                .setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED)
                .setAggregationEnabled(advancedProducerParameters.isAggregationEnabled())
                .setAggregationMaxCount(advancedProducerParameters.getAggregationMaxCount())
                .setAggregationMaxSize((long)advancedProducerParameters.getAggregationMaxSizeInBytes())
                .setCollectionMaxCount((long)advancedProducerParameters.getCollectionMaxCount())
                .setCollectionMaxSize((long)advancedProducerParameters.getCollectionMaxSizeInBytes())
                .setRateLimit(advancedProducerParameters.getRateLimit())
                .setMaxConnections((long)advancedProducerParameters.getMaxDaemonConnections())
                .setMinConnections((long)advancedProducerParameters.getMinConnections())
                .setThreadPoolSize(advancedProducerParameters.getThreadPoolSize())
                .setConnectTimeout(advancedProducerParameters.getConnectionTimeoutInMillis())
                .setRecordTtl(advancedProducerParameters.getRecordTtlInMillis())
                .setRequestTimeout(advancedProducerParameters.getResponseTimeoutInMillis())
                .setRecordMaxBufferedTime(advancedProducerParameters.getRecordMaxBufferedTimeInMillis());
        this.setKinesisEndpoints(config);
        return config;
    }

    /**
     * Applies the custom Kinesis, CloudWatch and STS endpoints to the producer configuration.
     * Blank endpoints are skipped; each value is passed through
     * {@code KinesisUtil.extractUrlAfterScheme} first.
     */
    private void setKinesisEndpoints(KinesisProducerConfiguration config) {
        if (!StringUtils.isBlank(this.customKinesisEndpoint)) {
            config.setKinesisEndpoint(KinesisUtil.extractUrlAfterScheme(this.customKinesisEndpoint));
        }
        if (!StringUtils.isBlank(this.customCloudWatchEndpoint)) {
            config.setCloudwatchEndpoint(KinesisUtil.extractUrlAfterScheme(this.customCloudWatchEndpoint));
        }
        if (!StringUtils.isBlank(this.customStsEndpoint)) {
            config.setStsEndpoint(KinesisUtil.extractUrlAfterScheme(this.customStsEndpoint));
        }
    }

    /**
     * Builds the callback that translates the producer's result into exactly one completion of
     * {@code muleCallback}.
     */
    @NotNull
    private FutureCallback<UserRecordResult> getUserRecordResultFutureCallback(final CompletionCallback<InputStream, Void> muleCallback, final String finalPartitionKey, final ByteBuffer sdkData) {
        return new FutureCallback<UserRecordResult>(){

            @Override
            public void onFailure(Throwable t) {
                if (!(t instanceof UserRecordFailedException)) {
                    // Any other failure must still complete the callback, otherwise the caller never hears back.
                    muleCallback.error(KinesisConnectionImpl.this.wrapException(t));
                    return;
                }
                UserRecordResult result = ((UserRecordFailedException)t).getResult();
                // Only the first attempt's error code decides whether the stream is reported as invalid.
                if (!result.getAttempts().isEmpty() && RESOURCE_NOT_FOUND_ERROR_CODE.equals(result.getAttempts().get(0).getErrorCode())) {
                    muleCallback.error(new InvalidStreamException(t));
                    return;
                }
                muleCallback.error(new ProcessingException(String.format("Failed to put a record, partitionKey=%s, payload=%s, %d attempt(s):%n%s", finalPartitionKey, sdkData, result.getAttempts().size(), KinesisConnectionImpl.this.extractAttempts(result)), t));
            }

            @Override
            public void onSuccess(UserRecordResult result) {
                if (result.getAttempts() != null && !result.getAttempts().isEmpty()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Record added, partitionKey={}, payload={}, {} attempt(s):\n{}", finalPartitionKey, sdkData, result.getAttempts().size(), KinesisConnectionImpl.this.extractAttempts(result));
                    }
                } else {
                    logger.debug("Record added, partitionKey={}, payload={}", finalPartitionKey, sdkData);
                }
                byte[] bytes;
                try {
                    bytes = KinesisConnectionImpl.this.objectMapper.writeValueAsBytes(result);
                }
                catch (JsonProcessingException e) {
                    // Report through the callback (throwing here would leave it uncompleted) and keep the real cause.
                    muleCallback.error(new ProcessingException("Unable to process the response from the server.", e));
                    return;
                }
                muleCallback.success(Result.<InputStream, Void>builder().output(new ByteArrayInputStream(bytes)).build());
            }
        };
    }

    /**
     * Returns the credentials provider for the consumer-side clients: role based when a role is
     * configured, otherwise static access/secret key credentials.
     */
    private AwsCredentialsProvider getConsumerConnectionProvider(StsClient stsClient, String accessKey, String secretKey, Role role) {
        if (role != null) {
            return new ConsumerRoleCredentialsProvider(stsClient, role);
        }
        return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
    }

    /**
     * Returns the credentials provider for the producer: role based when a role is configured,
     * otherwise static access/secret key credentials.
     */
    private AwsCredentialsProvider getProducerCredentialsProvider(StsClient stsClient, String accessKey, String secretKey, Role role) {
        if (role != null) {
            return new ProducerRoleCredentialsProvider(stsClient, role);
        }
        return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
    }

    /**
     * Formats every attempt of the result on its own line (delay, duration, error code, message).
     */
    private String extractAttempts(UserRecordResult result) {
        return result.getAttempts().stream()
                .map(attempt -> String.format("Delay after prev attempt: %d ms, Duration: %d ms, Error code: %s, Message: %s", attempt.getDelay(), attempt.getDuration(), attempt.getErrorCode(), attempt.getErrorMessage()))
                .collect(Collectors.joining("\n"));
    }
}

