/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.helpers.gcs.MessageBatch;
import org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity;
import org.apache.hudi.utilities.sources.helpers.gcs.MetadataMessage;
import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * A {@link RowSource} that pulls messages through a {@link PubsubMessagesFetcher} and exposes
 * the ones that pass {@link MetadataMessage#shouldBeProcessed()} as a {@code Dataset<Row>},
 * obtained by parsing each message's UTF-8 string form as JSON.
 *
 * <p>The checkpoint returned by this source is always the constant {@code "0"}; the incoming
 * checkpoint is only logged. Ack ids of every received message (including skipped ones) are
 * buffered in this instance and sent back in {@link #onCommit(String)} when acknowledgement is
 * enabled via {@code hoodie.deltastreamer.source.cloud.meta.ack} (default {@code true}).
 */
public class GcsEventsSource extends RowSource {
    private static final String CHECKPOINT_VALUE_ZERO = "0";
    private static final Logger LOG = LogManager.getLogger(GcsEventsSource.class);

    private final PubsubMessagesFetcher pubsubMessagesFetcher;
    private final boolean ackMessages;
    /** Ack ids collected by {@link #processMessages(List)} and not yet successfully acknowledged. */
    private final List<String> messagesToAck = new ArrayList<>();

    /**
     * Creates the source with a {@link PubsubMessagesFetcher} built from {@code props}:
     * {@code hoodie.deltastreamer.source.gcs.project.id},
     * {@code hoodie.deltastreamer.source.gcs.subscription.id} and
     * {@code hoodie.deltastreamer.source.cloud.meta.batch.size} (default {@code 10}).
     *
     * @param props          configuration properties
     * @param jsc            Spark context handed to the parent source
     * @param spark          Spark session handed to the parent source
     * @param schemaProvider schema provider handed to the parent source
     */
    public GcsEventsSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark, SchemaProvider schemaProvider) {
        this(props, jsc, spark, schemaProvider,
            new PubsubMessagesFetcher(
                props.getString("hoodie.deltastreamer.source.gcs.project.id"),
                props.getString("hoodie.deltastreamer.source.gcs.subscription.id"),
                props.getInteger("hoodie.deltastreamer.source.cloud.meta.batch.size", 10)));
    }

    /**
     * Creates the source with an explicitly supplied fetcher.
     *
     * @param props                 configuration properties; {@code hoodie.deltastreamer.source.cloud.meta.ack}
     *                              (default {@code true}) controls whether messages are acknowledged on commit
     * @param jsc                   Spark context handed to the parent source
     * @param spark                 Spark session handed to the parent source
     * @param schemaProvider        schema provider handed to the parent source
     * @param pubsubMessagesFetcher fetcher used to pull messages and send acknowledgements
     */
    public GcsEventsSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark, SchemaProvider schemaProvider, PubsubMessagesFetcher pubsubMessagesFetcher) {
        super(props, jsc, spark, schemaProvider);
        this.pubsubMessagesFetcher = pubsubMessagesFetcher;
        this.ackMessages = props.getBoolean("hoodie.deltastreamer.source.cloud.meta.ack", true);
        LOG.info("Created GcsEventsSource");
    }

    /**
     * Fetches one batch of messages and converts the accepted ones into rows.
     *
     * @param lastCkptStr previous checkpoint; only logged, never used to position the read
     * @param sourceLimit not used by this source
     * @return a pair of (rows parsed from the accepted messages, or empty when none were accepted)
     *         and the constant checkpoint {@code "0"}
     */
    @Override
    protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
        LOG.info("fetchNextBatch(): Input checkpoint: " + lastCkptStr);
        MessageBatch messageBatch = fetchFileMetadata();
        if (messageBatch.isEmpty()) {
            LOG.info("No new data. Returning empty batch with checkpoint value: " + CHECKPOINT_VALUE_ZERO);
            return Pair.of(Option.empty(), CHECKPOINT_VALUE_ZERO);
        }
        // Typed as Dataset<String> so that json(Dataset<String>) is selected without an unchecked conversion.
        Dataset<String> eventRecords = this.sparkSession.createDataset(messageBatch.getMessages(), Encoders.STRING());
        LOG.info("Returning checkpoint value: " + CHECKPOINT_VALUE_ZERO);
        return Pair.of(Option.of(this.sparkSession.read().json(eventRecords)), CHECKPOINT_VALUE_ZERO);
    }

    /**
     * Acknowledges all buffered messages when acknowledgement is enabled; otherwise only logs a warning.
     *
     * @param lastCkptStr checkpoint of the commit; only logged
     * @throws HoodieException if sending the acknowledgements fails with an {@link IOException}
     */
    @Override
    public void onCommit(String lastCkptStr) {
        LOG.info("onCommit(): Checkpoint: " + lastCkptStr);
        if (this.ackMessages) {
            ackOutstandingMessages();
        } else {
            LOG.warn("Not acknowledging messages. Can result in repeated redeliveries.");
        }
    }

    /**
     * Pulls messages from the fetcher and filters them into a {@link MessageBatch}.
     *
     * @return batch holding the string form of every message that was not skipped
     */
    MessageBatch fetchFileMetadata() {
        List<ReceivedMessage> receivedMessages = this.pubsubMessagesFetcher.fetchMessages();
        return processMessages(receivedMessages);
    }

    /**
     * Records the ack id of every received message and keeps the string form of those whose
     * validity decision is not {@code DO_SKIP}.
     *
     * @param receivedMessages messages as returned by the fetcher
     * @return batch of accepted message strings, in the order received
     */
    private MessageBatch processMessages(List<ReceivedMessage> receivedMessages) {
        List<String> messages = new ArrayList<>();
        for (ReceivedMessage received : receivedMessages) {
            MetadataMessage message = new MetadataMessage(received.getMessage());
            String msgStr = message.toStringUtf8();
            logDetails(message, msgStr);
            // The ack id is recorded before the validity check, so skipped messages are acknowledged too.
            this.messagesToAck.add(received.getAckId());
            MessageValidity messageValidity = message.shouldBeProcessed();
            if (messageValidity.getDecision() == MessageValidity.ProcessingDecision.DO_SKIP) {
                LOG.info("Skipping message: " + messageValidity.getDescription());
                continue;
            }
            messages.add(msgStr);
        }
        return new MessageBatch(messages);
    }

    /**
     * Sends all buffered ack ids to the fetcher. The buffer is cleared only after the send
     * returns normally, so ids remain buffered if the send throws.
     *
     * @throws HoodieException wrapping the {@link IOException} raised by the fetcher
     */
    private void ackOutstandingMessages() {
        if (this.messagesToAck.isEmpty()) {
            return;
        }
        try {
            this.pubsubMessagesFetcher.sendAcks(this.messagesToAck);
            this.messagesToAck.clear();
        } catch (IOException e) {
            throw new HoodieException("Error when acknowledging messages from Pubsub", e);
        }
    }

    /**
     * Logs the event type and object id at INFO, and the full message string at DEBUG.
     *
     * @param message parsed message wrapper
     * @param msgStr  UTF-8 string form of {@code message}
     */
    private void logDetails(MetadataMessage message, String msgStr) {
        LOG.info("eventType: " + message.getEventType() + ", objectId: " + message.getObjectId());
        if (LOG.isDebugEnabled()) {
            LOG.debug("msg: " + msgStr);
        }
    }
}

