/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.config.GCSEventsSourceConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.helpers.gcs.MessageBatch;
import org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity;
import org.apache.hudi.utilities.sources.helpers.gcs.MetadataMessage;
import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GcsEventsSource
extends RowSource {
    private final PubsubMessagesFetcher pubsubMessagesFetcher;
    private final SchemaProvider schemaProvider;
    private final boolean ackMessages;
    private final List<String> messagesToAck = new ArrayList<String>();
    private static final String CHECKPOINT_VALUE_ZERO = "0";
    private static final Logger LOG = LoggerFactory.getLogger(GcsEventsSource.class);

    public GcsEventsSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark, SchemaProvider schemaProvider) {
        this(props, jsc, spark, schemaProvider, new PubsubMessagesFetcher(ConfigUtils.getStringWithAltKeys(props, GCSEventsSourceConfig.GOOGLE_PROJECT_ID), ConfigUtils.getStringWithAltKeys(props, GCSEventsSourceConfig.PUBSUB_SUBSCRIPTION_ID), ConfigUtils.getIntWithAltKeys(props, CloudSourceConfig.BATCH_SIZE_CONF), ConfigUtils.getIntWithAltKeys(props, CloudSourceConfig.MAX_NUM_MESSAGES_PER_SYNC), ConfigUtils.getIntWithAltKeys(props, CloudSourceConfig.MAX_FETCH_TIME_PER_SYNC_SECS)));
    }

    public GcsEventsSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark, SchemaProvider schemaProvider, PubsubMessagesFetcher pubsubMessagesFetcher) {
        super(props, jsc, spark, schemaProvider);
        this.pubsubMessagesFetcher = pubsubMessagesFetcher;
        this.ackMessages = ConfigUtils.getBooleanWithAltKeys(props, CloudSourceConfig.ACK_MESSAGES);
        this.schemaProvider = schemaProvider;
        LOG.info("Created GcsEventsSource");
    }

    @Override
    protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
        MessageBatch messageBatch;
        LOG.info("fetchNextBatch(): Input checkpoint: " + lastCkptStr);
        try {
            messageBatch = this.fetchFileMetadata();
        }
        catch (HoodieException e) {
            throw e;
        }
        catch (Exception e) {
            throw new HoodieReadFromSourceException("Failed to fetch file metadata from GCS events source", e);
        }
        if (messageBatch.isEmpty()) {
            LOG.info("No new data. Returning empty batch with checkpoint value: 0");
            return Pair.of(Option.empty(), CHECKPOINT_VALUE_ZERO);
        }
        Dataset eventRecords = this.sparkSession.createDataset(messageBatch.getMessages(), Encoders.STRING());
        LOG.info("Returning checkpoint value: 0");
        StructType sourceSchema = UtilHelpers.getSourceSchema(this.schemaProvider);
        if (sourceSchema != null) {
            return Pair.of(Option.of(this.sparkSession.read().schema(sourceSchema).json(eventRecords)), CHECKPOINT_VALUE_ZERO);
        }
        return Pair.of(Option.of(this.sparkSession.read().json(eventRecords)), CHECKPOINT_VALUE_ZERO);
    }

    @Override
    public void onCommit(String lastCkptStr) {
        LOG.info("onCommit(): Checkpoint: " + lastCkptStr);
        if (this.ackMessages) {
            this.ackOutstandingMessages();
        } else {
            LOG.warn("Not acknowledging messages. Can result in repeated redeliveries.");
        }
    }

    MessageBatch fetchFileMetadata() {
        List<ReceivedMessage> receivedMessages = this.pubsubMessagesFetcher.fetchMessages();
        return this.processMessages(receivedMessages);
    }

    private MessageBatch processMessages(List<ReceivedMessage> receivedMessages) {
        ArrayList<String> messages = new ArrayList<String>();
        for (ReceivedMessage received : receivedMessages) {
            MetadataMessage message = new MetadataMessage(received.getMessage());
            String msgStr = message.toStringUtf8();
            this.logDetails(message, msgStr);
            this.messagesToAck.add(received.getAckId());
            MessageValidity messageValidity = message.shouldBeProcessed();
            if (messageValidity.getDecision() == MessageValidity.ProcessingDecision.DO_SKIP) {
                LOG.info("Skipping message: " + messageValidity.getDescription());
                continue;
            }
            messages.add(msgStr);
        }
        return new MessageBatch(messages);
    }

    private void ackOutstandingMessages() {
        if (this.messagesToAck.isEmpty()) {
            return;
        }
        try {
            this.pubsubMessagesFetcher.sendAcks(this.messagesToAck);
            this.messagesToAck.clear();
        }
        catch (IOException e) {
            throw new HoodieReadFromSourceException("Error when acknowledging messages from Pubsub", e);
        }
    }

    private void logDetails(MetadataMessage message, String msgStr) {
        LOG.info("eventType: " + message.getEventType() + ", objectId: " + message.getObjectId());
        if (LOG.isDebugEnabled()) {
            LOG.debug("msg: " + msgStr);
        }
    }
}

