/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.publisher;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.format.StructuredRecordStringConverter;
import io.cdap.plugin.format.avro.StructuredToAvroTransformer;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.publisher.GooglePublisher;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

public class PubSubOutputFormat
extends OutputFormat<NullWritable, StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubOutputFormat.class);
    private static final String SERVICE_ACCOUNT = "service.account";
    private static final String SERVICE_ACCOUNT_TYPE = "service.account.type";
    private static final String SERVICE_ACCOUNT_TYPE_JSON = "json";
    private static final String SERVICE_ACCOUNT_TYPE_FILE_PATH = "filePath";
    private static final String PROJECT = "project";
    private static final String TOPIC = "topic";
    private static final String COUNT_BATCH_SIZE = "message.count.batch.size";
    private static final String REQUEST_BYTES_THRESHOLD = "request.bytes.threshold";
    private static final String DELAY_THRESHOLD = "delay.threshold";
    private static final String ERROR_THRESHOLD = "error.threshold";
    private static final String RETRY_TIMEOUT_SECONDS = "retry.timeout";

    public static void configure(Configuration configuration, GooglePublisher.Config config) {
        String serviceAccount = config.getServiceAccount();
        String format = config.getFormat();
        String delimiter = config.getDelimiter();
        if (serviceAccount != null) {
            configuration.set(SERVICE_ACCOUNT_TYPE, config.getServiceAccountType());
            configuration.set(SERVICE_ACCOUNT, config.getServiceAccount());
        }
        String projectId = config.getProject();
        configuration.set(PROJECT, projectId);
        configuration.set(TOPIC, config.getTopic());
        configuration.set(COUNT_BATCH_SIZE, String.valueOf(config.getMessageCountBatchSize()));
        configuration.set(REQUEST_BYTES_THRESHOLD, String.valueOf(config.getRequestBytesThreshold()));
        configuration.set(DELAY_THRESHOLD, String.valueOf(config.getPublishDelayThresholdMillis()));
        configuration.set(ERROR_THRESHOLD, String.valueOf(config.getErrorThreshold()));
        configuration.set(RETRY_TIMEOUT_SECONDS, String.valueOf(config.getRetryTimeoutSeconds()));
        configuration.set("format", format);
        if (delimiter != null) {
            configuration.set("delimiter", config.getDelimiter());
        }
    }

    public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
        Configuration config = taskAttemptContext.getConfiguration();
        String serviceAccount = config.get(SERVICE_ACCOUNT);
        boolean isServiceAccountFilePath = SERVICE_ACCOUNT_TYPE_FILE_PATH.equals(config.get(SERVICE_ACCOUNT_TYPE));
        String projectId = config.get(PROJECT);
        String topic = config.get(TOPIC);
        String format = config.get("format");
        String delimiter = config.get("delimiter");
        long countSize = Long.parseLong(config.get(COUNT_BATCH_SIZE));
        long bytesThreshold = Long.parseLong(config.get(REQUEST_BYTES_THRESHOLD));
        long delayThreshold = Long.parseLong(config.get(DELAY_THRESHOLD));
        long errorThreshold = Long.parseLong(config.get(ERROR_THRESHOLD));
        int retryTimeout = Integer.parseInt(config.get(RETRY_TIMEOUT_SECONDS));
        Publisher.Builder publisher = Publisher.newBuilder((TopicName)ProjectTopicName.of((String)projectId, (String)topic)).setBatchingSettings(this.getBatchingSettings(countSize, bytesThreshold, delayThreshold)).setRetrySettings(this.getRetrySettings(retryTimeout));
        if (serviceAccount != null) {
            publisher.setCredentialsProvider(() -> GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath));
        }
        return new PubSubRecordWriter(publisher.build(), format, delimiter, errorThreshold);
    }

    private RetrySettings getRetrySettings(int maxRetryTimeout) {
        return RetrySettings.newBuilder().setInitialRetryDelay(Duration.ofMillis((long)100L)).setRetryDelayMultiplier(2.0).setMaxRetryDelay(Duration.ofSeconds((long)2L)).setTotalTimeout(Duration.ofSeconds((long)maxRetryTimeout)).setInitialRpcTimeout(Duration.ofSeconds((long)1L)).setMaxRpcTimeout(Duration.ofSeconds((long)5L)).build();
    }

    private BatchingSettings getBatchingSettings(long countSize, long bytesThreshold, long delayThreshold) {
        return BatchingSettings.newBuilder().setElementCountThreshold(Long.valueOf(countSize)).setRequestByteThreshold(Long.valueOf(bytesThreshold)).setDelayThreshold(Duration.ofMillis((long)delayThreshold)).build();
    }

    public void checkOutputSpecs(JobContext jobContext) {
    }

    public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) {
        return new OutputCommitter(){

            public void setupJob(JobContext jobContext) {
            }

            public void setupTask(TaskAttemptContext taskAttemptContext) {
            }

            public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
                return false;
            }

            public void commitTask(TaskAttemptContext taskAttemptContext) {
            }

            public void abortTask(TaskAttemptContext taskAttemptContext) {
            }
        };
    }

    public class PubSubRecordWriter
    extends RecordWriter<NullWritable, StructuredRecord> {
        private final Publisher publisher;
        private final AtomicLong failures;
        private final AtomicReference<Throwable> error;
        private final long errorThreshold;
        private final Set<ApiFuture> futures;
        private final String format;
        private final String delimiter;

        public PubSubRecordWriter(Publisher publisher, String format, String delimiter, long errorThreshold) {
            this.publisher = publisher;
            this.error = new AtomicReference();
            this.errorThreshold = errorThreshold;
            this.failures = new AtomicLong(0L);
            this.futures = ConcurrentHashMap.newKeySet();
            this.format = format;
            this.delimiter = delimiter;
        }

        public void write(NullWritable key, StructuredRecord value) throws IOException {
            this.handleErrorIfAny();
            PubsubMessage message = this.getPubSubMessage(value);
            final ApiFuture future = this.publisher.publish(message);
            this.futures.add(future);
            ApiFutures.addCallback((ApiFuture)future, (ApiFutureCallback)new ApiFutureCallback<String>(){

                public void onFailure(Throwable throwable) {
                    PubSubRecordWriter.this.error.set(throwable);
                    PubSubRecordWriter.this.failures.incrementAndGet();
                    PubSubRecordWriter.this.futures.remove(future);
                }

                public void onSuccess(String s) {
                    PubSubRecordWriter.this.futures.remove(future);
                }
            });
        }

        private PubsubMessage getPubSubMessage(StructuredRecord value) throws IOException {
            PubsubMessage message = null;
            switch (this.format) {
                case "avro": 
                case "parquet": {
                    StructuredToAvroTransformer structuredToAvroTransformer = new StructuredToAvroTransformer(value.getSchema());
                    GenericRecord transform = structuredToAvroTransformer.transform(value);
                    Schema avroSchema = new Schema.Parser().parse(String.valueOf(value.getSchema()));
                    GenericDatumWriter datumWriter = new GenericDatumWriter(avroSchema);
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder((OutputStream)out, null);
                    datumWriter.write((Object)transform, (Encoder)encoder);
                    encoder.flush();
                    out.close();
                    byte[] serializedBytes = out.toByteArray();
                    ByteString data = ByteString.copyFrom((byte[])serializedBytes);
                    message = PubsubMessage.newBuilder().setData(data).build();
                    break;
                }
                case "text": 
                case "blob": 
                case "json": {
                    String payload = StructuredRecordStringConverter.toJsonString((StructuredRecord)value);
                    ByteString data = ByteString.copyFromUtf8((String)payload);
                    message = PubsubMessage.newBuilder().setData(data).build();
                    break;
                }
                case "csv": {
                    String payload = StructuredRecordStringConverter.toDelimitedString((StructuredRecord)value, (String)",");
                    ByteString data = ByteString.copyFromUtf8((String)payload);
                    message = PubsubMessage.newBuilder().setData(data).build();
                    break;
                }
                case "delimited": {
                    String payload = StructuredRecordStringConverter.toDelimitedString((StructuredRecord)value, (String)this.delimiter);
                    ByteString data = ByteString.copyFromUtf8((String)payload);
                    message = PubsubMessage.newBuilder().setData(data).build();
                    break;
                }
                case "tsv": {
                    String payload = StructuredRecordStringConverter.toDelimitedString((StructuredRecord)value, (String)"\t");
                    ByteString data = ByteString.copyFromUtf8((String)payload);
                    message = PubsubMessage.newBuilder().setData(data).build();
                }
            }
            return message;
        }

        private void handleErrorIfAny() throws IOException {
            if (this.failures.get() > this.errorThreshold) {
                throw new IOException(String.format("Failed to publish %s records", this.failures.get()), this.error.get());
            }
        }

        public void close(TaskAttemptContext taskAttemptContext) throws IOException {
            try {
                this.publisher.publishAllOutstanding();
                for (ApiFuture future : this.futures) {
                    future.get();
                    this.handleErrorIfAny();
                }
            }
            catch (InterruptedException | ExecutionException e) {
                throw new IOException("Error publishing records to PubSub", e);
            }
            finally {
                try {
                    this.publisher.shutdown();
                }
                catch (Exception e) {
                    LOG.debug("Exception while shutting down publisher ", (Throwable)e);
                }
            }
        }

        public String getFormat() {
            return this.format;
        }

        public String getDelimiter() {
            return this.delimiter;
        }
    }
}

