/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.kafka.connect.source;

import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCommandException;
import com.mongodb.MongoException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.kafka.connect.source.MongoCopyDataManager;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.mongodb.kafka.connect.source.MongoSourceTask;
import com.mongodb.kafka.connect.source.heartbeat.HeartbeatManager;
import com.mongodb.kafka.connect.source.producer.SchemaAndValueProducer;
import com.mongodb.kafka.connect.source.producer.SchemaAndValueProducers;
import com.mongodb.kafka.connect.source.statistics.StatisticsManager;
import com.mongodb.kafka.connect.source.topic.mapping.TopicMapper;
import com.mongodb.kafka.connect.util.Assertions;
import com.mongodb.kafka.connect.util.jmx.SourceTaskStatistics;
import com.mongodb.kafka.connect.util.time.InnerOuterTimer;
import com.mongodb.lang.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.bson.BsonDocument;
import org.bson.BsonDocumentWrapper;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.RawBsonDocument;
import org.bson.codecs.Encoder;
import org.bson.codecs.configuration.CodecRegistry;

final class StartedMongoSourceTask
implements AutoCloseable {
    // Key looked up on each change stream document when only the full document is published.
    private static final String FULL_DOCUMENT = "fullDocument";
    // Command error codes inspected when creating a change stream cursor fails.
    // 26: logged as "Namespace not found"; treated as non-fatal (no cursor is returned).
    private static final int NAMESPACE_NOT_FOUND_ERROR = 26;
    // 20: reported as an illegal $changeStream operation; fatal.
    private static final int ILLEGAL_OPERATION_ERROR = 20;
    // 40415: reported as an operation likely unsupported by the server version; fatal.
    private static final int UNKNOWN_FIELD_ERROR = 40415;
    // 260: the resume token was invalidated; the cursor is recreated without a token.
    private static final int INVALIDATED_RESUME_TOKEN_ERROR = 260;
    private static final int CHANGE_STREAM_FATAL_ERROR = 280;
    private static final int CHANGE_STREAM_HISTORY_LOST = 286;
    private static final int BSON_OBJECT_TOO_LARGE = 10334;
    // Error codes for which changeStreamNotValid() reports the change stream as no longer usable.
    // NOTE(review): the literals mirror the four constants declared just above.
    private static final Set<Integer> INVALID_CHANGE_STREAM_ERRORS = new HashSet<Integer>(Arrays.asList(260, 280, 286, 10334));
    // Lower-case message fragments matched by changeStreamNotValid(): the message must mention
    // one of the first two (what is missing) AND one of the remaining four (why it is missing).
    private static final String RESUME_TOKEN = "resume token";
    private static final String RESUME_POINT = "resume point";
    private static final String NOT_FOUND = "not found";
    private static final String DOES_NOT_EXIST = "does not exist";
    private static final String INVALID_RESUME_TOKEN = "invalid resume token";
    private static final String NO_LONGER_IN_THE_OPLOG = "no longer be in the oplog";
    // Value producer used for tombstone events: ignores its input and yields a null schema/value.
    private static final SchemaAndValueProducer TOMBSTONE_SCHEMA_AND_VALUE_PRODUCER = i -> SchemaAndValue.NULL;
    // Supplies the Connect task context from which the stored offset is read in getResumeToken().
    private final Supplier<SourceTaskContext> sourceTaskContextAccessor;
    // Clock handed to the HeartbeatManager.
    private final Time time;
    // Set to true in the constructor and to false by close(); getNextBatch() only rethrows
    // cursor errors while this is true.
    private volatile boolean isRunning;
    // True while existing data is still being copied via copyDataManager.
    private boolean isCopying;
    private final MongoSourceConfig sourceConfig;
    // Source partition attached to every emitted SourceRecord.
    private final Map<String, Object> partitionMap;
    private final MongoClient mongoClient;
    // Created together with the cursor; may still be null while copying.
    private HeartbeatManager heartbeatManager;
    // Cleared when the server rejects startAfter, after which resumeAfter is used instead.
    private boolean supportsStartAfter = true;
    // Set when the cursor/resume token was invalidated; getResumeToken() then skips the stored
    // offset once and resets the flag.
    private boolean invalidatedCursor = false;
    // Null unless the task was started with a copy data manager (copy-existing startup).
    @Nullable
    private final MongoCopyDataManager copyDataManager;
    // First change stream event seen while preparing the copy phase; emitted once copying ends.
    private BsonDocument cachedResult;
    // Resume token captured before copying started; used to open the stream after the copy.
    private BsonDocument cachedResumeToken;
    // Active change stream cursor; null when it has not been created yet or could not be created.
    @Nullable
    private MongoChangeStreamCursor<? extends BsonDocument> cursor;
    private final StatisticsManager statisticsManager;
    // Samples time spent inside poll() versus time spent between polls.
    private final InnerOuterTimer inTaskPollInConnectFrameworkTimer;

    /**
     * Builds a running source task.
     *
     * <p>When a copy data manager is supplied the task starts in copy mode and captures a resume
     * token (and possibly a first event) for later; otherwise the change stream cursor and the
     * heartbeat manager are created immediately.
     *
     * @param sourceTaskContextAccessor supplies the task context used to look up stored offsets
     * @param sourceConfig the connector configuration
     * @param mongoClient the client used to open change streams
     * @param copyDataManager the copy manager, or {@code null} when no data is to be copied
     * @param statisticsManager receives timing and record statistics
     */
    StartedMongoSourceTask(Supplier<SourceTaskContext> sourceTaskContextAccessor, MongoSourceConfig sourceConfig, MongoClient mongoClient, @Nullable MongoCopyDataManager copyDataManager, StatisticsManager statisticsManager) {
        this.sourceTaskContextAccessor = sourceTaskContextAccessor;
        this.sourceConfig = sourceConfig;
        this.mongoClient = mongoClient;
        this.isRunning = true;

        final boolean copyRequested = copyDataManager != null;
        if (copyRequested) {
            // A copy manager is only expected together with the COPY_EXISTING startup mode.
            Assertions.assertTrue(sourceConfig.getStartupConfig().startupMode() == MongoSourceConfig.StartupConfig.StartupMode.COPY_EXISTING);
        }
        this.isCopying = copyRequested;
        this.time = Time.SYSTEM;
        this.partitionMap = MongoSourceTask.createPartitionMap(sourceConfig);
        this.copyDataManager = copyDataManager;

        if (!copyRequested) {
            this.initializeCursorAndHeartbeatManager();
        } else {
            this.setCachedResultAndResumeToken();
        }

        this.statisticsManager = statisticsManager;
        this.inTaskPollInConnectFrameworkTimer = InnerOuterTimer.start(
                inTaskPollSample -> {
                    SourceTaskStatistics current = statisticsManager.currentStatistics();
                    current.getInTaskPoll().sample(inTaskPollSample.toMillis());
                    if (MongoSourceTask.LOGGER.isDebugEnabled()) {
                        MongoSourceTask.LOGGER.debug(current.getName() + ": " + current.toJSON());
                    }
                },
                inFrameworkSample -> statisticsManager.currentStatistics().getInConnectFramework().sample(inFrameworkSample.toMillis()));
    }

    /**
     * Polls for the next batch of records while timing the call.
     *
     * @return the polled records, or {@code null} when there is nothing to publish
     */
    List<SourceRecord> poll() {
        if (!this.isCopying) {
            this.statisticsManager.switchToStreamStatistics();
        }
        try (InnerOuterTimer.InnerTimer ignored = this.inTaskPollInConnectFrameworkTimer.sampleOuter()) {
            final List<SourceRecord> polled = this.pollInternal();
            if (polled == null) {
                return null;
            }
            this.statisticsManager.currentStatistics().getRecords().sample(polled.size());
            return polled;
        }
    }

    /**
     * Converts the next batch of change stream (or copied) documents into source records.
     *
     * <p>Documents without a topic are skipped with a warning. When only the full document is
     * published, documents lacking one are skipped unless tombstones are enabled. If no records
     * were produced, a heartbeat record is returned when one is due.
     *
     * @return the records to publish, a single heartbeat record, or {@code null} if there is
     *     nothing to publish
     */
    @Nullable
    private List<SourceRecord> pollInternal() {
        final TopicMapper topicMapper = this.sourceConfig.getTopicMapper();
        final boolean fullDocumentOnly = this.sourceConfig.getBoolean("publish.full.document.only");
        final boolean tombstoneOnDelete = fullDocumentOnly && this.sourceConfig.getBoolean("publish.full.document.only.tombstone.on.delete");
        final SchemaAndValueProducer keyProducer = SchemaAndValueProducers.createKeySchemaAndValueProvider(this.sourceConfig);
        final SchemaAndValueProducer valueProducer = SchemaAndValueProducers.createValueSchemaAndValueProvider(this.sourceConfig);
        final List<SourceRecord> sourceRecords = new ArrayList<>();

        final List<BsonDocument> batch = this.getNextBatch();
        final int lastIndex = batch.size() - 1;
        for (int index = 0; index <= lastIndex; index++) {
            final BsonDocument changeStreamDocument = batch.get(index);

            final Map<String, String> sourceOffset = new HashMap<>();
            sourceOffset.put("_id", changeStreamDocument.getDocument("_id").toJson());
            if (this.isCopying) {
                sourceOffset.put("copy", "true");
            }

            // The final copied document carries the cached resume token instead of the copy
            // marker, so the stored offset points at the change stream once copying is done.
            final boolean lastDocument = index == lastIndex;
            final boolean noMoreDataToCopy = this.copyDataManager != null && !this.copyDataManager.isCopying();
            if (this.isCopying && lastDocument && noMoreDataToCopy && this.cachedResumeToken != null) {
                sourceOffset.put("_id", this.cachedResumeToken.toJson());
                sourceOffset.remove("copy");
            }

            final String topicName = topicMapper.getTopic(changeStreamDocument);
            if (topicName.isEmpty()) {
                MongoSourceTask.LOGGER.warn("No topic set. Could not publish the message: {}", (Object) changeStreamDocument.toJson());
                continue;
            }

            final boolean isTombstoneEvent = tombstoneOnDelete && !changeStreamDocument.containsKey(FULL_DOCUMENT);
            Optional<BsonDocument> valueDocument = Optional.empty();
            if (!fullDocumentOnly) {
                valueDocument = Optional.of(changeStreamDocument);
            } else if (changeStreamDocument.containsKey(FULL_DOCUMENT) && changeStreamDocument.get(FULL_DOCUMENT).isDocument()) {
                valueDocument = Optional.of(changeStreamDocument.getDocument(FULL_DOCUMENT));
            }
            if (!valueDocument.isPresent() && !isTombstoneEvent) {
                continue;
            }

            final BsonDocument valueDoc = valueDocument.orElse(new BsonDocument());
            MongoSourceTask.LOGGER.trace("Adding {} to {}: {}", new Object[]{valueDoc, topicName, sourceOffset});
            if (valueDoc instanceof RawBsonDocument) {
                final int sizeBytes = ((RawBsonDocument) valueDoc).getByteBuffer().limit();
                this.statisticsManager.currentStatistics().getMongodbBytesRead().sample(sizeBytes);
            }

            final BsonDocument keyDocument;
            if (this.sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA) {
                keyDocument = changeStreamDocument;
            } else if (this.sourceConfig.getBoolean("change.stream.document.key.as.key") && changeStreamDocument.containsKey("documentKey")) {
                keyDocument = changeStreamDocument.getDocument("documentKey");
            } else {
                keyDocument = new BsonDocument("_id", changeStreamDocument.get("_id"));
            }

            final SchemaAndValueProducer effectiveValueProducer = isTombstoneEvent ? TOMBSTONE_SCHEMA_AND_VALUE_PRODUCER : valueProducer;
            this.createSourceRecord(keyProducer, effectiveValueProducer, sourceOffset, topicName, keyDocument, valueDoc)
                    .ifPresent(sourceRecords::add);
        }

        MongoSourceTask.LOGGER.debug("Return batch of {}", (Object) sourceRecords.size());
        if (!sourceRecords.isEmpty()) {
            return sourceRecords;
        }
        if (this.heartbeatManager != null) {
            final Optional<SourceRecord> heartbeat = this.heartbeatManager.heartbeat();
            if (heartbeat.isPresent()) {
                MongoSourceTask.LOGGER.debug("Returning single heartbeat record");
                return Collections.singletonList(heartbeat.get());
            }
        }
        MongoSourceTask.LOGGER.debug("Returning null because there are no source records and no heartbeat.");
        return null;
    }

    /**
     * Builds a source record from the key and value documents.
     *
     * <p>On failure the error is optionally logged. When errors are tolerated the record is
     * either dropped (no dead letter topic configured) or redirected to the dead letter topic as
     * JSON strings; otherwise a {@link DataException} is thrown.
     *
     * @param keySchemaAndValueProducer converts the key document
     * @param valueSchemaAndValueProducer converts the value document
     * @param sourceOffset the offset stored with the record
     * @param topicName the destination topic
     * @param keyDocument the key document, may be {@code null}
     * @param valueDocument the value document, may be {@code null}
     * @return the record to publish, or empty if the failing record is dropped
     * @throws DataException if conversion fails and errors are not tolerated
     */
    private Optional<SourceRecord> createSourceRecord(SchemaAndValueProducer keySchemaAndValueProducer, SchemaAndValueProducer valueSchemaAndValueProducer, Map<String, String> sourceOffset, String topicName, @Nullable BsonDocument keyDocument, @Nullable BsonDocument valueDocument) {
        try {
            final SchemaAndValue key = keySchemaAndValueProducer.get(keyDocument);
            final SchemaAndValue value = valueSchemaAndValueProducer.get(valueDocument);
            final SourceRecord record = new SourceRecord(this.partitionMap, sourceOffset, topicName, key.schema(), key.value(), value.schema(), value.value());
            return Optional.of(record);
        }
        catch (Exception e) {
            // Built lazily: the documents are only rendered to JSON when the message is needed.
            final Supplier<String> errorMessage = () -> String.format("%s : Exception creating Source record for: Key=%s Value=%s", e.getMessage(), keyDocument == null ? "" : keyDocument.toJson(), valueDocument == null ? "" : valueDocument.toJson());
            if (this.sourceConfig.logErrors()) {
                MongoSourceTask.LOGGER.error(errorMessage.get(), (Throwable) e);
            }
            if (!this.sourceConfig.tolerateErrors()) {
                throw new DataException(errorMessage.get(), (Throwable) e);
            }
            if (this.sourceConfig.getDlqTopic().isEmpty()) {
                return Optional.empty();
            }
            final Object dlqKey = keyDocument == null ? "" : keyDocument.toJson();
            final Object dlqValue = valueDocument == null ? "" : valueDocument.toJson();
            return Optional.of(new SourceRecord(this.partitionMap, sourceOffset, this.sourceConfig.getDlqTopic(), Schema.STRING_SCHEMA, dlqKey, Schema.STRING_SCHEMA, dlqValue));
        }
    }

    /**
     * Stops the task and releases the copy manager, cursor, client and statistics manager.
     *
     * <p>try-with-resources is used purely so that every resource is closed even when closing
     * an earlier one throws.
     */
    @Override
    public void close() {
        MongoSourceTask.LOGGER.info("Stopping MongoDB source task");
        this.isRunning = false;
        try (StatisticsManager statisticsToClose = this.statisticsManager;
             MongoClient clientToClose = this.mongoClient;
             MongoChangeStreamCursor<? extends BsonDocument> cursorToClose = this.cursor) {
            final MongoCopyDataManager copyManagerToClose = this.copyDataManager;
            if (copyManagerToClose != null) {
                copyManagerToClose.close();
            }
        }
    }

    /**
     * Opens the change stream cursor and creates a heartbeat manager bound to it.
     */
    private void initializeCursorAndHeartbeatManager() {
        this.cursor = this.createCursor(this.sourceConfig, this.mongoClient);
        final Long heartbeatIntervalMs = this.sourceConfig.getLong("heartbeat.interval.ms");
        final String heartbeatTopicName = this.sourceConfig.getString("heartbeat.topic.name");
        this.heartbeatManager = new HeartbeatManager(this.time, this.cursor, heartbeatIntervalMs, heartbeatTopicName, this.partitionMap);
    }

    /**
     * Creates a change stream cursor, resuming from the current resume token when one exists.
     *
     * @param sourceConfig the connector configuration
     * @param mongoClient the client used to open the change stream
     * @return the cursor, or {@code null} if it could not be created
     */
    @Nullable
    MongoChangeStreamCursor<? extends BsonDocument> createCursor(MongoSourceConfig sourceConfig, MongoClient mongoClient) {
        MongoSourceTask.LOGGER.debug("Creating a MongoCursor");
        final BsonDocument resumeToken = this.getResumeToken(sourceConfig);
        return this.tryCreateCursor(sourceConfig, mongoClient, resumeToken);
    }

    /**
     * Logs the failure and reopens the change stream without a resume token.
     *
     * @param e the exception that made the current change stream unusable
     * @return the new cursor, or {@code null} if it could not be created
     */
    @Nullable
    private MongoChangeStreamCursor<? extends BsonDocument> tryRecreateCursor(MongoException e) {
        final int errorCode;
        final String errorMessage;
        if (e instanceof MongoCommandException) {
            final MongoCommandException commandException = (MongoCommandException) e;
            errorCode = commandException.getErrorCode();
            errorMessage = commandException.getErrorMessage();
        } else {
            errorCode = e.getCode();
            errorMessage = e.getMessage();
        }
        MongoSourceTask.LOGGER.warn("Failed to resume change stream: {} {}\n===================================================================================\nWhen the resume token is no longer available there is the potential for data loss.\n\nRestarting the change stream with no resume token because `errors.tolerance=all`.\n===================================================================================\n", (Object) errorMessage, (Object) errorCode);
        this.invalidatedCursor = true;
        return this.tryCreateCursor(this.sourceConfig, this.mongoClient, null);
    }

    /**
     * Attempts to open a change stream cursor, optionally resuming from a token.
     *
     * <p>With a token, {@code startAfter} is preferred and {@code resumeAfter} is the fallback
     * when the server does not support it. Without a usable token the stream starts fresh,
     * optionally at the configured operation time. Command failures are retried without the
     * token, retried with {@code resumeAfter}, tolerated, or escalated depending on the error.
     *
     * @param sourceConfig the connector configuration
     * @param mongoClient the client used to open the change stream
     * @param resumeToken the token to resume from, or {@code null} to start without one
     * @return the cursor, or {@code null} if it could not be created and the error is not fatal
     * @throws ConnectException for illegal or unsupported operations, or when the change stream
     *     is no longer valid
     */
    @Nullable
    private MongoChangeStreamCursor<? extends BsonDocument> tryCreateCursor(MongoSourceConfig sourceConfig, MongoClient mongoClient, BsonDocument resumeToken) {
        final boolean hasResumeToken = resumeToken != null;
        try {
            final ChangeStreamIterable<Document> changeStream = StartedMongoSourceTask.getChangeStreamIterable(sourceConfig, mongoClient);
            if (hasResumeToken && this.supportsStartAfter) {
                MongoSourceTask.LOGGER.info("Resuming the change stream after the previous offset: {}", (Object) resumeToken);
                changeStream.startAfter(resumeToken);
            } else if (hasResumeToken && !this.invalidatedCursor) {
                MongoSourceTask.LOGGER.info("Resuming the change stream after the previous offset using resumeAfter: {}", (Object) resumeToken);
                changeStream.resumeAfter(resumeToken);
            } else {
                // Only the TIMESTAMP startup mode can supply an operation time to start from.
                Optional<BsonTimestamp> startAtOperationTime = Optional.empty();
                final MongoSourceConfig.StartupConfig startupConfig = sourceConfig.getStartupConfig();
                if (startupConfig.startupMode() == MongoSourceConfig.StartupConfig.StartupMode.TIMESTAMP) {
                    startAtOperationTime = startupConfig.timestampConfig().startAtOperationTime();
                }
                if (startAtOperationTime.isPresent()) {
                    MongoSourceTask.LOGGER.info("New change stream cursor created without offset but at the configured operation time.");
                    changeStream.startAtOperationTime(startAtOperationTime.get());
                } else {
                    MongoSourceTask.LOGGER.info("New change stream cursor created without offset.");
                }
            }
            return (MongoChangeStreamCursor) changeStream.withDocumentClass(RawBsonDocument.class).cursor();
        }
        catch (MongoCommandException e) {
            if (hasResumeToken) {
                if (StartedMongoSourceTask.invalidatedResumeToken(e)) {
                    this.invalidatedCursor = true;
                    return this.tryCreateCursor(sourceConfig, mongoClient, null);
                }
                if (MongoSourceTask.doesNotSupportsStartAfter(e)) {
                    this.supportsStartAfter = false;
                    return this.tryCreateCursor(sourceConfig, mongoClient, resumeToken);
                }
                if (sourceConfig.tolerateErrors() && StartedMongoSourceTask.changeStreamNotValid(e)) {
                    return this.tryRecreateCursor(e);
                }
            }

            if (e.getErrorCode() == NAMESPACE_NOT_FOUND_ERROR) {
                MongoSourceTask.LOGGER.info("Namespace not found cursor closed.");
                return null;
            }
            if (e.getErrorCode() == ILLEGAL_OPERATION_ERROR) {
                MongoSourceTask.LOGGER.error("Illegal $changeStream operation: {} {}\n\n=====================================================================================\n{}\n\nPlease Note: Not all aggregation pipeline operations are suitable for modifying the\nchange stream output. For more information, please see the official documentation:\n   https://docs.mongodb.com/manual/changeStreams/\n=====================================================================================\n", new Object[]{e.getErrorMessage(), e.getErrorCode(), e.getErrorMessage()});
                throw new ConnectException("Illegal $changeStream operation", (Throwable) e);
            }
            if (e.getErrorCode() == UNKNOWN_FIELD_ERROR) {
                final String msg = String.format("Invalid operation: %s %s. It is likely that you are trying to use functionality unsupported by your version of MongoDB.", e.getErrorMessage(), e.getErrorCode());
                MongoSourceTask.LOGGER.error(msg);
                throw new ConnectException(msg, (Throwable) e);
            }

            MongoSourceTask.LOGGER.warn("Failed to resume change stream: {} {}\n\n=====================================================================================\nIf the resume token is no longer available then there is the potential for data loss.\nSaved resume tokens are managed by Kafka and stored with the offset data.\n\nTo restart the change stream with no resume token either: \n  * Create a new partition name using the `offset.partition.name` configuration.\n  * Set `errors.tolerance=all` and ignore the erroring resume token. \n  * Manually remove the old offset from its configured storage.\n\nResetting the offset will allow for the connector to be resume from the latest resume\ntoken. Using `startup.mode = copy_existing` ensures that all data will be outputted by the\nconnector but it will duplicate existing data.\n=====================================================================================\n", (Object) e.getErrorMessage(), (Object) e.getErrorCode());
            if (StartedMongoSourceTask.changeStreamNotValid(e)) {
                throw new ConnectException("ResumeToken not found. Cannot create a change stream cursor", (Throwable) e);
            }
            return null;
        }
    }

    /**
     * Tells whether the command failed because the resume token was invalidated.
     *
     * @param e the command failure to inspect
     * @return {@code true} if the error code is the invalidated-resume-token code
     */
    private static boolean invalidatedResumeToken(MongoCommandException e) {
        final int errorCode = e.getErrorCode();
        return errorCode == INVALIDATED_RESUME_TOKEN_ERROR;
    }

    /**
     * Tells whether the exception indicates the change stream can no longer be resumed.
     *
     * <p>Either the error code is one of the known invalid-change-stream codes, or the
     * lower-cased error message mentions a resume token/point together with a phrase saying it
     * is missing or invalid.
     *
     * @param e the exception to inspect
     * @return {@code true} if the change stream is considered invalid
     */
    private static boolean changeStreamNotValid(MongoException e) {
        if (INVALID_CHANGE_STREAM_ERRORS.contains(e.getCode())) {
            return true;
        }
        final String message;
        if (e instanceof MongoCommandException) {
            message = ((MongoCommandException) e).getErrorMessage().toLowerCase(Locale.ROOT);
        } else {
            message = e.getMessage().toLowerCase(Locale.ROOT);
        }
        final boolean mentionsResumePosition = message.contains(RESUME_TOKEN) || message.contains(RESUME_POINT);
        if (!mentionsResumePosition) {
            return false;
        }
        return message.contains(NOT_FOUND)
                || message.contains(DOES_NOT_EXIST)
                || message.contains(INVALID_RESUME_TOKEN)
                || message.contains(NO_LONGER_IN_THE_OPLOG);
    }

    /**
     * Opens a temporary change stream to capture a resume token before copying begins.
     *
     * <p>If an event is immediately available it is cached (wrapped as a BSON document) together
     * with its resume token; otherwise the cursor's own resume token is cached. Nothing is cached
     * when the namespace does not exist yet.
     *
     * <p>The temporary cursor is closed even if reading from it fails, so a failure here does
     * not leak a server-side cursor.
     *
     * @throws ConnectException if the change stream cannot be opened for any reason other than
     *     a missing namespace
     */
    private void setCachedResultAndResumeToken() {
        final MongoChangeStreamCursor<ChangeStreamDocument<Document>> changeStreamCursor;
        try {
            changeStreamCursor = StartedMongoSourceTask.getChangeStreamIterable(this.sourceConfig, this.mongoClient).cursor();
        }
        catch (MongoCommandException e) {
            if (e.getErrorCode() == NAMESPACE_NOT_FOUND_ERROR) {
                return;
            }
            throw new ConnectException((Throwable) e);
        }
        // Kept separate from the try/catch above so that failures while reading propagate
        // unchanged; try-with-resources only guarantees the cursor is released.
        try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> openCursor = changeStreamCursor) {
            final ChangeStreamDocument<Document> firstResult = openCursor.tryNext();
            if (firstResult != null) {
                this.cachedResult = new BsonDocumentWrapper<>(firstResult, ChangeStreamDocument.createCodec(Document.class, MongoClientSettings.getDefaultCodecRegistry()));
                this.cachedResumeToken = firstResult.getResumeToken();
            } else {
                this.cachedResumeToken = openCursor.getResumeToken();
            }
        }
    }

    /**
     * Fetches the next batch of documents, from the copy manager while copying and from the
     * change stream cursor afterwards.
     *
     * <p>When copying finishes, the cached first change stream event (if any) is placed at the
     * head of the batch. A missing cursor is created and a server-closed cursor is recreated
     * before reading.
     *
     * <p>If reading from the cursor fails while the task is running:
     * <ul>
     *   <li>with error tolerance enabled, the cursor is recreated without a resume token when
     *       the change stream is no longer valid, otherwise the error is logged; in both cases
     *       whatever was read so far is returned and the task keeps running;</li>
     *   <li>without error tolerance, a {@link ConnectException} is thrown.</li>
     * </ul>
     * Failures that happen after the task was stopped are ignored.
     *
     * @return the documents read, possibly empty, never {@code null}
     * @throws ConnectException if reading fails and the error is not tolerated
     */
    private List<BsonDocument> getNextBatch() {
        final List<BsonDocument> batch = new ArrayList<>();
        final int maxBatchSize = this.sourceConfig.getInt("poll.max.batch.size");

        if (this.isCopying) {
            Assertions.assertNotNull(this.copyDataManager);
            if (this.copyDataManager.isCopying()) {
                Optional<BsonDocument> result;
                do {
                    result = this.copyDataManager.poll();
                    result.ifPresent(batch::add);
                } while (result.isPresent() && batch.size() < maxBatchSize);
                return batch;
            }
            // Copy phase is over: switch to streaming and replay the cached first event.
            this.isCopying = false;
            MongoSourceTask.LOGGER.info("Finished copying existing data from the collection(s).");
            if (this.cachedResult != null) {
                batch.add(this.cachedResult);
                this.cachedResult = null;
            }
        }

        if (this.cursor == null) {
            this.initializeCursorAndHeartbeatManager();
        } else if (this.cursor.getServerCursor() == null) {
            MongoSourceTask.LOGGER.info("Cursor has been closed by the server - reinitializing");
            this.invalidateCursorAndReinitialize();
        }
        if (this.cursor == null) {
            MongoSourceTask.LOGGER.info("Unable to recreate the cursor");
            return batch;
        }

        try {
            BsonDocument next;
            do {
                next = this.cursor.tryNext();
                if (next != null) {
                    batch.add(next);
                }
            } while (next != null && batch.size() < maxBatchSize && this.cursor.available() > 0);
        }
        catch (MongoException e) {
            this.closeCursor();
            if (this.isRunning) {
                if (this.sourceConfig.tolerateErrors()) {
                    if (StartedMongoSourceTask.changeStreamNotValid(e)) {
                        this.cursor = this.tryRecreateCursor(e);
                    } else {
                        MongoSourceTask.LOGGER.error("An exception occurred when trying to get the next item from the Change Stream", (Throwable) e);
                    }
                } else {
                    // Only fail the task when errors are NOT tolerated; throwing after the
                    // tolerance branch would defeat `errors.tolerance=all`.
                    throw new ConnectException("An exception occurred when trying to get the next item from the Change Stream: " + e.getMessage(), (Throwable) e);
                }
            }
        }
        catch (Exception e) {
            this.closeCursor();
            if (this.isRunning) {
                throw new ConnectException("Unexpected error: " + e.getMessage(), (Throwable) e);
            }
        }
        return batch;
    }

    /**
     * Closes the current cursor, if any, and forgets it.
     *
     * <p>Closing is best effort: any exception raised while closing is ignored.
     */
    private void closeCursor() {
        if (this.cursor == null) {
            return;
        }
        try {
            this.cursor.close();
        }
        catch (Exception ignored) {
            // Deliberately ignored: the cursor is being discarded anyway.
        }
        this.cursor = null;
    }

    /**
     * Marks the cursor as invalidated, closes it, and creates a fresh cursor and heartbeat
     * manager.
     */
    private void invalidateCursorAndReinitialize() {
        this.invalidatedCursor = true;
        final MongoChangeStreamCursor<? extends BsonDocument> staleCursor = this.cursor;
        if (staleCursor != null) {
            staleCursor.close();
            this.cursor = null;
        }
        this.initializeCursorAndHeartbeatManager();
    }

    /**
     * Builds the change stream for the configured scope and applies the configured options.
     *
     * <p>The scope is the whole cluster when no database is configured, the database when no
     * collection is configured, and the collection otherwise. The configured pipeline, if any,
     * is applied to the watch call.
     *
     * @param sourceConfig the connector configuration
     * @param mongoClient the client used to open the change stream
     * @return the configured, not yet opened, change stream
     */
    private static ChangeStreamIterable<Document> getChangeStreamIterable(MongoSourceConfig sourceConfig, MongoClient mongoClient) {
        final String databaseName = sourceConfig.getString("database");
        final String collectionName = sourceConfig.getString("collection");
        final Optional<List<Document>> pipeline = sourceConfig.getPipeline();

        final ChangeStreamIterable<Document> stream;
        if (databaseName.isEmpty()) {
            MongoSourceTask.LOGGER.info("Watching all changes on the cluster");
            stream = pipeline.map(stages -> mongoClient.watch(stages)).orElse(mongoClient.watch());
        } else if (!collectionName.isEmpty()) {
            MongoSourceTask.LOGGER.info("Watching for collection changes on '{}.{}'", (Object) databaseName, (Object) collectionName);
            final MongoCollection<Document> watchedCollection = mongoClient.getDatabase(databaseName).getCollection(collectionName);
            stream = pipeline.map(stages -> watchedCollection.watch(stages)).orElse(watchedCollection.watch());
        } else {
            MongoSourceTask.LOGGER.info("Watching for database changes on '{}'", (Object) databaseName);
            final MongoDatabase watchedDatabase = mongoClient.getDatabase(databaseName);
            stream = pipeline.map(stages -> watchedDatabase.watch(stages)).orElse(watchedDatabase.watch());
        }

        stream.maxAwaitTime(sourceConfig.getLong("poll.await.time.ms"), TimeUnit.MILLISECONDS);
        final int configuredBatchSize = sourceConfig.getInt("batch.size");
        if (configuredBatchSize > 0) {
            // Non-positive values leave the batch size unset on the stream.
            stream.batchSize(configuredBatchSize);
        }
        stream.showExpandedEvents(sourceConfig.getShowExpandedEvents());
        sourceConfig.getFullDocumentBeforeChange().ifPresent(stream::fullDocumentBeforeChange);
        sourceConfig.getFullDocument().ifPresent(stream::fullDocument);
        sourceConfig.getCollation().ifPresent(stream::collation);
        return stream;
    }

    /**
     * Determines the resume token to use for the next cursor.
     *
     * <p>Precedence: the cached token (consumed on use); then nothing if the cursor was
     * invalidated (the flag is reset); then the stored offset, unless it is absent or belongs
     * to an unfinished copy.
     *
     * @param sourceConfig the connector configuration used to look up the stored offset
     * @return the resume token, or {@code null} when the stream should start without one
     */
    private BsonDocument getResumeToken(MongoSourceConfig sourceConfig) {
        if (this.cachedResumeToken != null) {
            final BsonDocument cached = this.cachedResumeToken;
            this.cachedResumeToken = null;
            return cached;
        }
        if (this.invalidatedCursor) {
            this.invalidatedCursor = false;
            return null;
        }
        final Map<String, Object> offset = MongoSourceTask.getOffset(this.sourceTaskContextAccessor.get(), sourceConfig);
        if (offset == null || !offset.containsKey("_id") || offset.containsKey("copy")) {
            return null;
        }
        final BsonDocument storedToken = BsonDocument.parse((String) offset.get("_id"));
        if (offset.containsKey("HEARTBEAT")) {
            MongoSourceTask.LOGGER.info("Resume token from heartbeat: {}", (Object) storedToken);
        }
        return storedToken;
    }

    /**
     * Records whether a committed record was acknowledged or filtered out.
     *
     * @param ignored the committed record (unused)
     * @param metadata the producer metadata; {@code null} is counted as a filtered record
     */
    void commitRecord(SourceRecord ignored, RecordMetadata metadata) {
        final SourceTaskStatistics statistics = this.statisticsManager.currentStatistics();
        if (metadata != null) {
            statistics.getRecordsAcknowledged().sample(1L);
            return;
        }
        statistics.getRecordsFiltered().sample(1L);
    }
}

