/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.mongodb.impl;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.EventTimeMapper;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.mongodb.impl.CursorTraverser;
import com.hazelcast.jet.mongodb.impl.MongoConnection;
import com.hazelcast.jet.mongodb.impl.MongoUtilities;
import com.hazelcast.jet.mongodb.impl.ReadMongoParams;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.mongodb.MongoException;
import com.mongodb.MongoServerException;
import com.mongodb.MongoSocketException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoIterable;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.bson.BsonDateTime;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;

public class ReadMongoP<I>
extends AbstractProcessor {
    /** Cursor batch size; the same value is passed as a literal to {@code batchSize(...)} by the readers. */
    private static final int BATCH_SIZE = 1000;
    /** Taken from the params; when set, {@code connect} verifies that the database/collection exist. */
    private final boolean checkExistenceOnEachConnect;
    /** Logger obtained from the processor context in {@code init}. */
    private ILogger logger;
    /** Total parallelism reported by the processor context in {@code init}. */
    private int totalParallelism;
    /** Global index of this processor, reported by the processor context in {@code init}. */
    private int processorIndex;
    /** Whether the processor context reports snapshotting as enabled. */
    private boolean snapshotsEnabled;
    /**
     * Set in {@code saveToSnapshot} and cleared in {@code snapshotCommitFinish};
     * while it is set, {@code complete} keeps returning {@code false}.
     */
    private boolean snapshotInProgress;
    /** Streaming or batch reader, chosen in the constructor from {@code params.isStream()}. */
    private final MongoChunkedReader reader;
    /** Connection wrapper; it calls {@code reader.connect} through the callback passed in the constructor. */
    private final MongoConnection connection;
    /** Taken from {@code params.isNonDistributed()}; when set, the partitioning stage is never added. */
    private final boolean nonDistributed;
    /** Chunk currently being emitted; reset to {@code null} the first time it returns {@code null}. */
    private Traverser<?> traverser;
    /** Snapshot entries still to be written; {@code null} when no snapshot write is pending. */
    private Traverser<Map.Entry<BroadcastKey<Integer>, Object>> snapshotTraverser;
    /** Computed in {@code init} as {@code !nonDistributed && totalParallelism > 1}. */
    private boolean canParallelize;

    /**
     * Builds the processor from the given source parameters.
     *
     * <p>A batch reader is created unless {@code params.isStream()} is set, in which
     * case a streaming reader with a single-partition {@link EventTimeMapper} is used.
     * The connection is only configured here; it invokes {@code reader.connect}
     * through the callback passed to {@link MongoConnection}.
     *
     * @param params source configuration (names, mapping functions, aggregates, client supplier)
     */
    public ReadMongoP(ReadMongoParams<I> params) {
        if (!params.isStream()) {
            this.reader = new BatchMongoReader(
                    params.databaseName,
                    params.collectionName,
                    params.mapItemFn,
                    params.getAggregates());
        } else {
            EventTimeMapper<I> mapper = new EventTimeMapper<>(params.eventTimePolicy);
            // The change stream is handled as one logical partition (index 0).
            mapper.addPartitions(1);
            this.reader = new StreamMongoReader(
                    params.databaseName,
                    params.collectionName,
                    params.mapStreamFn,
                    params.getStartAtTimestamp(),
                    params.getAggregates(),
                    mapper);
        }
        this.connection = new MongoConnection(
                params.clientSupplier,
                params.dataConnectionRef,
                client -> this.reader.connect((MongoClient) client, this.snapshotsEnabled));
        this.nonDistributed = params.isNonDistributed();
        this.checkExistenceOnEachConnect = params.isCheckExistenceOnEachConnect();
    }

    /**
     * Captures the values this processor needs from the context and makes a first
     * connection attempt.
     *
     * @param context processor context supplying the logger, parallelism, processor
     *                index, snapshotting flag and Hazelcast instance
     * @throws JetException wrapping any {@link MongoException} raised by the first connection attempt
     */
    protected void init(@Nonnull Processor.Context context) {
        this.logger = context.logger();
        this.processorIndex = context.globalProcessorIndex();
        this.totalParallelism = context.totalParallelism();
        this.snapshotsEnabled = context.snapshottingEnabled();
        // Partitioning the query only makes sense with more than one processor and
        // when the user has not asked for non-distributed execution.
        this.canParallelize = this.totalParallelism > 1 && !this.nonDistributed;

        NodeEngineImpl engine = com.hazelcast.jet.impl.util.Util.getNodeEngine(context.hazelcastInstance());
        this.connection.assembleSupplier(engine);
        try {
            // The result is deliberately ignored here; complete() checks the connection again.
            this.connection.reconnectIfNecessary();
        } catch (MongoException e) {
            throw new JetException(e);
        }
    }

    /**
     * Declares this processor as non-cooperative.
     *
     * @return always {@code false}
     */
    public boolean isCooperative() {
        return false;
    }

    /**
     * Emits the next chunk of items read from MongoDB.
     *
     * @return {@code true} only when the connection is available, the current chunk
     *         has been fully emitted, no snapshot is in progress and the reader is
     *         one that ever completes; {@code false} otherwise
     */
    public boolean complete() {
        boolean connected = this.connection.reconnectIfNecessary();
        if (!connected) {
            return false;
        }
        if (this.traverser == null) {
            // Ask the reader for a new chunk; the field is cleared again once the chunk is exhausted.
            this.traverser = this.reader.nextChunkTraverser().onFirstNull(() -> this.traverser = null);
        }
        boolean chunkEmitted = this.emitFromTraverser(this.traverser);
        return chunkEmitted && !this.snapshotInProgress && this.reader.everCompletes();
    }

    /**
     * Releases the reader first and then the connection, each through
     * {@code IOUtil.closeResource}.
     */
    public void close() {
        IOUtil.closeResource((AutoCloseable)this.reader);
        IOUtil.closeResource((AutoCloseable)this.connection);
    }

    /**
     * Writes the reader's state to the snapshot and, if the reader supports
     * watermarks, its current watermark as well.
     *
     * <p>Entries are written under broadcast keys: the reader state under
     * {@code partition} and the watermark under {@code -partition}, where
     * {@code partition = processorIndex % totalParallelism}. Note that for
     * partition 0 both keys are equal because {@code -0 == 0}.
     *
     * <p>The traverser is reset (and completion logged) only after <em>all</em>
     * entries have been handed to the snapshot outbox. If the reset were attached
     * before the watermark entry is appended, it would fire while the watermark
     * entry is still pending; a rejected watermark entry would then cause the next
     * call to rebuild the traverser and write the snapshot entries a second time.
     *
     * @return {@code true} if snapshotting is disabled, there is nothing to save,
     *         or every entry was accepted; {@code false} if this method has to be
     *         called again
     */
    public boolean saveToSnapshot() {
        if (!this.snapshotsEnabled) {
            return true;
        }
        // Finish emitting the chunk in progress before capturing the reader state.
        if (this.traverser != null && !this.emitFromTraverser(this.traverser)) {
            return false;
        }
        this.snapshotInProgress = true;
        if (this.snapshotTraverser == null) {
            Object snapshot = this.reader.snapshot();
            if (snapshot == null) {
                // Nothing has been read yet, so there is no state to store.
                return true;
            }
            int partition = this.processorIndex % this.totalParallelism;
            Map.Entry<BroadcastKey<Integer>, Object> stateEntry =
                    Util.entry(BroadcastKey.broadcastKey(partition), snapshot);
            Traverser<Map.Entry<BroadcastKey<Integer>, Object>> entries = Traversers.singleton(stateEntry);
            if (this.reader.supportsWatermarks()) {
                Map.Entry<BroadcastKey<Integer>, Object> watermarkEntry =
                        Util.entry(BroadcastKey.broadcastKey(-partition), this.reader.watermark());
                entries = entries.append(watermarkEntry);
            }
            // Attach the reset last so that it runs only once every entry has been emitted.
            this.snapshotTraverser = entries.onFirstNull(() -> {
                this.snapshotTraverser = null;
                this.getLogger().finest("Finished saving snapshot.");
            });
        }
        return this.emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    /**
     * Marks the end of a snapshot by clearing {@code snapshotInProgress}, which
     * allows {@code complete()} to return the reader's completion status again.
     *
     * @param success not used by this implementation
     * @return always {@code true}
     */
    public boolean snapshotCommitFinish(boolean success) {
        if (this.logger.isFineEnabled()) {
            this.logger.fine("Snapshot commit finished");
        }
        this.snapshotInProgress = false;
        return true;
    }

    /**
     * Restores one snapshot entry if it belongs to this processor.
     *
     * <p>{@code saveToSnapshot} stores the reader state under the key
     * {@code partition} and the watermark under {@code -partition}. For partition 0
     * the two keys are identical ({@code -0 == 0}), so the sign alone cannot tell
     * them apart. For key 0 on a watermark-capable reader the entry is therefore
     * classified by its value: a {@link Long} is the watermark, anything else is the
     * reader state. This keeps the snapshot format unchanged. Without it, processor 0
     * never gets its watermark back and reconnects a second time for no reason.
     *
     * @param key   a {@link BroadcastKey} wrapping the integer key described above
     * @param value the reader state, or the watermark as a {@link Long}
     */
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        int keyInteger = (Integer) ((BroadcastKey<?>) key).key();
        if (Math.abs(keyInteger) % this.totalParallelism != this.processorIndex) {
            // Broadcast entries reach every processor; skip those owned by another one.
            return;
        }
        boolean watermarksSupported = this.reader.supportsWatermarks();
        boolean ambiguousZeroKeyWatermark = keyInteger == 0 && watermarksSupported && value instanceof Long;
        boolean watermarkEntry = keyInteger < 0 || ambiguousZeroKeyWatermark;
        if (!watermarkEntry) {
            this.reader.restore(value);
            // Reconnect so that the restored position is applied to the query.
            this.reader.connect(this.connection.client(), true);
        } else if (watermarksSupported) {
            this.reader.restoreWatermark((Long) value);
        }
    }

    /**
     * Reader for streaming mode: it watches a MongoDB change stream on the
     * collection, the database or the whole client (whichever is most specific) and
     * never completes. Its snapshot state is the resume token of the last document
     * handled; the watermark is tracked for partition 0 of the event time mapper.
     */
    private final class StreamMongoReader
    extends MongoChunkedReader {
        private final BiFunctionEx<ChangeStreamDocument<Document>, Long, I> mapFn;
        private final BsonTimestamp startTimestamp;
        private final ReadMongoParams.Aggregates aggregates;
        private final EventTimeMapper<I> eventTimeMapper;
        /** Cursor over the currently open change stream; {@code null} until connected or after close. */
        private MongoCursor<ChangeStreamDocument<Document>> cursor;
        /** Resume token of the last change document handled; {@code null} until one has been seen or restored. */
        private BsonDocument resumeToken;

        private StreamMongoReader(String databaseName, String collectionName, BiFunctionEx<ChangeStreamDocument<Document>, Long, I> mapFn, BsonTimestamp startTimestamp, ReadMongoParams.Aggregates aggregates, EventTimeMapper<I> eventTimeMapper) {
            super(databaseName, collectionName);
            this.mapFn = mapFn;
            this.startTimestamp = startTimestamp;
            this.aggregates = aggregates;
            this.eventTimeMapper = eventTimeMapper;
        }

        /**
         * Opens a new change stream cursor, resuming after the known resume token if
         * there is one, otherwise starting at the configured operation time (if any).
         *
         * <p>This method runs on every (re)connect, including the one triggered by a
         * snapshot restore. The cursor from the previous connect is closed once the
         * new one is open, so it is not left behind.
         *
         * @param mongoClient      client used when neither a collection nor a database is set
         * @param snapshotsEnabled not used by this reader
         */
        @Override
        public void onConnect(MongoClient mongoClient, boolean snapshotsEnabled) {
            List<Bson> pipeline = new ArrayList<>(this.aggregates.nonNulls());
            if (ReadMongoP.this.canParallelize) {
                pipeline.addAll(this.aggregates.indexAfterFilter(), MongoUtilities.partitionAggregate(ReadMongoP.this.totalParallelism, ReadMongoP.this.processorIndex, true));
            }

            ChangeStreamIterable<Document> changeStream;
            if (this.collection != null) {
                changeStream = this.collection.watch(pipeline);
            } else if (this.database != null) {
                changeStream = this.database.watch(pipeline);
            } else {
                changeStream = mongoClient.watch(pipeline);
            }

            if (this.resumeToken != null) {
                changeStream.resumeAfter(this.resumeToken);
            } else if (this.startTimestamp != null) {
                changeStream.startAtOperationTime(this.startTimestamp);
            }

            MongoCursor<ChangeStreamDocument<Document>> previous = this.cursor;
            this.cursor = changeStream.batchSize(1000).fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
            if (previous != null) {
                // Best-effort close of the superseded cursor, done only after the new one
                // exists so that a failed re-open leaves the old cursor in place as before.
                IOUtil.closeResource(previous);
            }
        }

        @Override
        boolean everCompletes() {
            return false;
        }

        /**
         * Returns a traverser over the current cursor. Empty polls are turned into
         * idle handling by the event time mapper; real documents update the resume
         * token, are mapped by the user function and passed through the mapper.
         *
         * @throws JetException if a {@link MongoException} is raised while building the traverser
         */
        @Override
        @Nonnull
        public Traverser<?> nextChunkTraverser() {
            try {
                MongoCursor<ChangeStreamDocument<Document>> localCursor = this.cursor;
                Preconditions.checkNotNull(localCursor, "unable to connect to Mongo");
                return new CursorTraverser(localCursor).flatMap(input -> {
                    if (input instanceof CursorTraverser.EmptyItem) {
                        return this.eventTimeMapper.flatMapIdle();
                    }
                    ChangeStreamDocument<Document> doc = (ChangeStreamDocument<Document>) input;
                    this.resumeToken = doc.getResumeToken();
                    long eventTime = this.clusterTime(doc);
                    I item = this.mapFn.apply(doc, eventTime);
                    return this.eventTimeMapper.flatMapEvent(item, 0, eventTime);
                });
            } catch (MongoException e) {
                throw new JetException("error while reading from mongodb", e);
            }
        }

        /** Returns the document's wall time in milliseconds, or the current system time if it has none. */
        private long clusterTime(ChangeStreamDocument<Document> changeStreamDocument) {
            BsonDateTime time = changeStreamDocument.getWallTime();
            return time == null ? System.currentTimeMillis() : time.getValue();
        }

        /** @return the last seen resume token, or {@code null} if no document has been handled yet */
        @Override
        @Nullable
        public Object snapshot() {
            return this.resumeToken;
        }

        /** @return the event time mapper's watermark for partition 0 */
        @Override
        @Nonnull
        public Object watermark() {
            return this.eventTimeMapper.getWatermark(0);
        }

        /** Adopts {@code value} as the resume token if it is a {@link BsonDocument}; any other value is ignored. */
        @Override
        public void restore(Object value) {
            // instanceof is already false for null, so no separate null check is needed.
            if (value instanceof BsonDocument) {
                this.resumeToken = (BsonDocument) value;
            }
        }

        @Override
        public void restoreWatermark(Long value) {
            this.eventTimeMapper.restoreWatermark(0, value.longValue());
        }

        /** Closes the change stream cursor, if one is open. */
        @Override
        public void close() {
            if (this.cursor != null) {
                this.cursor.close();
                this.cursor = null;
            }
        }
    }

    /**
     * Common base of the batch and streaming readers. It resolves the configured
     * database and collection on each connect and then lets the subclass prepare
     * its query through {@link #onConnect}.
     */
    private abstract class MongoChunkedReader
    implements Closeable {
        protected MongoDatabase database;
        protected MongoCollection<Document> collection;
        private final String databaseName;
        private final String collectionName;

        protected MongoChunkedReader(String databaseName, String collectionName) {
            this.databaseName = databaseName;
            this.collectionName = collectionName;
        }

        /** Hook called at the end of {@link #connect}; does nothing by default. */
        void onConnect(MongoClient mongoClient, boolean snapshotsEnabled) {
        }

        /**
         * Resolves the database and collection against {@code newClient} and calls
         * {@link #onConnect}. Server and socket exceptions are logged as warnings
         * rather than propagated.
         */
        void connect(MongoClient newClient, boolean snapshotsEnabled) {
            try {
                ReadMongoP.this.logger.fine("(Re)connecting to MongoDB");
                this.resolveDatabase(newClient);
                this.resolveCollection();
                this.onConnect(newClient, snapshotsEnabled);
            } catch (MongoServerException | MongoSocketException e) {
                ReadMongoP.this.logger.warning("Could not connect to MongoDB", e);
            }
        }

        /** Looks up the configured database, if any, optionally verifying that it exists. */
        private void resolveDatabase(MongoClient client) {
            if (this.databaseName == null) {
                return;
            }
            this.database = client.getDatabase(this.databaseName);
            if (ReadMongoP.this.checkExistenceOnEachConnect) {
                MongoUtilities.checkDatabaseExists(client, this.databaseName);
            }
        }

        /** Looks up the configured collection, if any; a database must have been resolved first. */
        private void resolveCollection() {
            if (this.collectionName == null) {
                return;
            }
            Preconditions.checkState(this.databaseName != null, "you have to provide database name if collection name is specified");
            Preconditions.checkState(this.database != null, "database " + this.databaseName + " does not exists");
            if (ReadMongoP.this.checkExistenceOnEachConnect) {
                MongoUtilities.checkCollectionExists(this.database, this.collectionName);
            }
            this.collection = this.database.getCollection(this.collectionName);
        }

        /** Returns a traverser over the next chunk of items to emit. */
        @Nonnull
        abstract Traverser<?> nextChunkTraverser();

        /** Returns the state to store in a snapshot, or {@code null} if there is none. */
        @Nullable
        abstract Object snapshot();

        /** Restores state previously returned by {@link #snapshot()}. */
        abstract void restore(Object var1);

        /** Tells whether this reader can ever finish. */
        abstract boolean everCompletes();

        /** Watermarks are supported exactly by the readers that never complete. */
        boolean supportsWatermarks() {
            boolean finite = this.everCompletes();
            return !finite;
        }

        /** Restores a watermark previously returned by {@link #watermark()}. */
        public abstract void restoreWatermark(Long var1);

        /** Returns the current watermark to store in a snapshot. */
        public abstract Object watermark();
    }

    /**
     * Reader for batch mode: it runs an aggregation over one collection, over all
     * collections of a database, or over all collections of all databases, and
     * completes when the results are exhausted. Its snapshot state is the
     * {@code _id} of the last document handed out.
     */
    private final class BatchMongoReader
    extends MongoChunkedReader {
        private final FunctionEx<Document, I> mapItemFn;
        private final ReadMongoParams.Aggregates aggregates;
        /** Source of raw documents, rebuilt on every connect. */
        private Traverser<Document> delegate;
        /** {@code _id} of the last document passed through the mapping; {@code null} before the first one. */
        private Object lastKey;

        private BatchMongoReader(String databaseName, String collectionName, FunctionEx<Document, I> mapItemFn, ReadMongoParams.Aggregates aggregates) {
            super(databaseName, collectionName);
            this.mapItemFn = mapItemFn;
            this.aggregates = aggregates;
        }

        /**
         * Builds the aggregation pipeline and the document traverser.
         *
         * <p>With snapshots enabled, a sort by {@code _id} is appended unless the
         * pipeline already sorts, and documents up to the last seen {@code _id} are
         * filtered out. When the query can be parallelized, the partitioning stages
         * are inserted at the position given by {@code aggregates.indexAfterFilter()}.
         *
         * @param mongoClient       client used when neither a collection nor a database is set
         * @param supportsSnapshots whether snapshot-related stages should be added
         */
        @Override
        void onConnect(MongoClient mongoClient, boolean supportsSnapshots) {
            // The pipeline mixes user-supplied stages with generated BsonDocument stages,
            // so it is typed by the common Bson interface.
            List<Bson> pipeline = new ArrayList<>(this.aggregates.nonNulls());
            if (supportsSnapshots && !this.hasSorts(pipeline)) {
                pipeline.add(Aggregates.sort(Sorts.ascending("_id")).toBsonDocument());
            }
            if (supportsSnapshots && this.lastKey != null) {
                pipeline.add(Aggregates.match(Filters.gt("_id", this.lastKey)).toBsonDocument());
            }
            if (ReadMongoP.this.canParallelize) {
                pipeline.addAll(this.aggregates.indexAfterFilter(), MongoUtilities.partitionAggregate(ReadMongoP.this.totalParallelism, ReadMongoP.this.processorIndex, false));
            }

            if (this.collection != null) {
                this.delegate = this.delegateForCollection(this.collection, pipeline);
            } else if (this.database != null) {
                this.delegate = this.delegateForDb(this.database, pipeline);
            } else {
                this.delegate = Traversers.traverseIterable(mongoClient.listDatabaseNames())
                        .flatMap(name -> this.delegateForDb(mongoClient.getDatabase(name), pipeline));
            }
            Preconditions.checkNotNull(this.delegate, "unable to connect to Mongo");
        }

        /** Tells whether any stage of the pipeline has a {@code $sort} entry. */
        private boolean hasSorts(List<Bson> aggregateList) {
            return aggregateList.stream().anyMatch(agg -> agg.toBsonDocument().get("$sort") != null);
        }

        /** Traverses the aggregation result of a single collection. */
        private Traverser<Document> delegateForCollection(MongoCollection<Document> collection, List<Bson> aggregateList) {
            return Traversers.traverseIterable(collection.aggregate(aggregateList).batchSize(1000));
        }

        /** Traverses the aggregation results of every collection in the database, one after another. */
        private Traverser<Document> delegateForDb(MongoDatabase database, List<Bson> aggregateList) {
            MongoIterable<String> collectionNames = database.listCollectionNames();
            return Traversers.traverseIterable(collectionNames)
                    .flatMap(colName -> this.delegateForCollection(database.getCollection(colName), aggregateList));
        }

        /**
         * Returns the document traverser with the mapping function applied; each
         * document's {@code _id} is recorded as the last seen key on the way through.
         */
        @Nonnull
        public Traverser<I> nextChunkTraverser() {
            Traverser<Document> localDelegate = this.delegate;
            Preconditions.checkNotNull(localDelegate, "unable to connect to Mongo");
            return localDelegate.map(item -> {
                this.lastKey = item.get("_id");
                return this.mapItemFn.apply(item);
            });
        }

        @Override
        boolean everCompletes() {
            return true;
        }

        @Override
        public void restoreWatermark(Long value) {
            throw new UnsupportedOperationException("watermarks are only in streaming case");
        }

        @Override
        public Object watermark() {
            throw new UnsupportedOperationException("watermarks are only in streaming case");
        }

        /** @return the last seen {@code _id}, or {@code null} if no document has been read yet */
        @Override
        @Nullable
        public Object snapshot() {
            return this.lastKey;
        }

        /** Adopts {@code value} as the last seen {@code _id}. */
        @Override
        public void restore(Object value) {
            this.lastKey = value;
        }

        /** Nothing to release: this reader keeps no closeable resource of its own. */
        @Override
        public void close() {
        }
    }
}

