/*
 * Decompiled with CFR 0.152.
 */
package io.openlineage.client.transports;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecord;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import io.openlineage.client.transports.KinesisConfig;
import io.openlineage.client.transports.Transport;
import io.openlineage.spark.shaded.org.apache.commons.lang3.StringUtils;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Transport} that publishes OpenLineage events to a Kinesis stream through a
 * {@link KinesisProducer}.
 *
 * <p>Each event is serialized to JSON with {@link OpenLineageClientUtils#toJson}, encoded as
 * UTF-8, and handed to the producer as a single {@link UserRecord}. The outcome of every
 * submission is only logged; {@code emit} returns as soon as the record has been handed to the
 * producer.
 */
public class KinesisTransport
extends Transport {
    private static final Logger log = LoggerFactory.getLogger(KinesisTransport.class);
    private final String streamName;
    private final String region;
    private final String roleArn;
    private final KinesisProducer producer;
    // Runs the success/failure logging callbacks attached to each submitted record.
    // NOTE(review): this executor is never shut down within this class.
    private final Executor listeningExecutor;

    /**
     * Creates a transport that sends records through the supplied producer.
     *
     * @param kinesisProducer producer used to submit records; must not be {@code null}
     * @param kinesisConfig source of the stream name, region and role ARN; must not be
     *     {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public KinesisTransport(@NonNull KinesisProducer kinesisProducer, @NonNull KinesisConfig kinesisConfig) {
        super(Transport.Type.KINESIS);
        if (kinesisProducer == null) {
            throw new NullPointerException("kinesisProducer is marked non-null but is null");
        }
        if (kinesisConfig == null) {
            throw new NullPointerException("kinesisConfig is marked non-null but is null");
        }
        this.streamName = kinesisConfig.getStreamName();
        this.region = kinesisConfig.getRegion();
        this.roleArn = kinesisConfig.getRoleArn();
        this.producer = kinesisProducer;
        this.listeningExecutor = Executors.newSingleThreadExecutor();
    }

    /**
     * Creates a transport with a producer built from the given configuration.
     *
     * @param kinesisConfig source of the producer properties, stream name, region and role ARN;
     *     must not be {@code null}
     * @throws NullPointerException if {@code kinesisConfig} is {@code null}
     */
    public KinesisTransport(@NonNull KinesisConfig kinesisConfig) {
        this(KinesisTransport.createProducer(kinesisConfig), kinesisConfig);
    }

    /**
     * Builds a {@link KinesisProducer} from the configuration's properties, applying the
     * configured region and, when a non-blank role ARN is present, an STS assume-role
     * credentials provider with the session name {@code "OLProducer"}.
     *
     * @param kinesisConfig configuration to read from; must not be {@code null}
     * @return a newly constructed producer
     * @throws NullPointerException if {@code kinesisConfig} is {@code null}
     */
    private static KinesisProducer createProducer(KinesisConfig kinesisConfig) {
        // Checked here because this runs before the delegated constructor's own validation.
        if (kinesisConfig == null) {
            throw new NullPointerException("kinesisConfig is marked non-null but is null");
        }
        KinesisProducerConfiguration config = KinesisProducerConfiguration.fromProperties((Properties)kinesisConfig.getProperties());
        config.setRegion(kinesisConfig.getRegion());
        String roleArn = kinesisConfig.getRoleArn();
        if (StringUtils.isNotBlank(roleArn)) {
            config.setCredentialsProvider(new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, "OLProducer").build());
        }
        return new KinesisProducer(config);
    }

    /**
     * Sends a run event, partitioned by {@code "run:" + job namespace + "/" + job name}.
     *
     * @param runEvent event to send; must not be {@code null}
     * @throws NullPointerException if {@code runEvent} is {@code null}
     */
    @Override
    public void emit(@NonNull OpenLineage.RunEvent runEvent) {
        if (runEvent == null) {
            throw new NullPointerException("runEvent is marked non-null but is null");
        }
        String eventAsJson = OpenLineageClientUtils.toJson(runEvent);
        OpenLineage.Job job = runEvent.getJob();
        String partitionKey = "run:" + job.getNamespace() + "/" + job.getName();
        this.emit(eventAsJson, partitionKey);
    }

    /**
     * Sends a dataset event, partitioned by
     * {@code "dataset:" + dataset namespace + "/" + dataset name}.
     *
     * @param datasetEvent event to send; must not be {@code null}
     * @throws NullPointerException if {@code datasetEvent} is {@code null}
     */
    @Override
    public void emit(@NonNull OpenLineage.DatasetEvent datasetEvent) {
        if (datasetEvent == null) {
            throw new NullPointerException("datasetEvent is marked non-null but is null");
        }
        String eventAsJson = OpenLineageClientUtils.toJson(datasetEvent);
        OpenLineage.StaticDataset dataset = datasetEvent.getDataset();
        String partitionKey = "dataset:" + dataset.getNamespace() + "/" + dataset.getName();
        this.emit(eventAsJson, partitionKey);
    }

    /**
     * Sends a job event, partitioned by {@code "job:" + job namespace + "/" + job name}.
     *
     * @param jobEvent event to send; must not be {@code null}
     * @throws NullPointerException if {@code jobEvent} is {@code null}
     */
    @Override
    public void emit(@NonNull OpenLineage.JobEvent jobEvent) {
        if (jobEvent == null) {
            throw new NullPointerException("jobEvent is marked non-null but is null");
        }
        String eventAsJson = OpenLineageClientUtils.toJson(jobEvent);
        OpenLineage.Job job = jobEvent.getJob();
        String partitionKey = "job:" + job.getNamespace() + "/" + job.getName();
        this.emit(eventAsJson, partitionKey);
    }

    /**
     * Hands one serialized event to the producer and registers a callback that logs the result.
     *
     * @param eventAsJson JSON payload of the event
     * @param partitionKey partition key for the record
     */
    private void emit(final String eventAsJson, String partitionKey) {
        // Encode explicitly as UTF-8 so the payload bytes do not depend on the JVM's default charset.
        ByteBuffer payload = ByteBuffer.wrap(eventAsJson.getBytes(StandardCharsets.UTF_8));
        ListenableFuture<UserRecordResult> future = this.producer.addUserRecord(new UserRecord(this.streamName, partitionKey, payload));
        FutureCallback<UserRecordResult> callback = new FutureCallback<UserRecordResult>(){

            @Override
            public void onSuccess(UserRecordResult result) {
                log.debug("Success to send to Kinesis lineage event: {}", eventAsJson);
            }

            @Override
            public void onFailure(Throwable t) {
                // The trailing Throwable has no placeholder, so SLF4J logs it with its stack trace.
                log.error("Failed to send to Kinesis lineage event: {}", eventAsJson, t);
            }
        };
        Futures.addCallback(future, callback, this.listeningExecutor);
    }
}

