/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.reactivestreams.client.internal;

import com.mongodb.assertions.Assertions;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.client.model.changestream.ChangeStreamLevel;
import com.mongodb.internal.operation.AsyncReadOperation;
import com.mongodb.internal.operation.ChangeStreamOperation;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.internal.BatchCursorPublisher;
import com.mongodb.reactivestreams.client.internal.MongoOperationPublisher;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.codecs.Codec;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.conversions.Bson;
import org.reactivestreams.Publisher;

final class ChangeStreamPublisherImpl<T>
extends BatchCursorPublisher<ChangeStreamDocument<T>>
implements ChangeStreamPublisher<T> {
    private final List<? extends Bson> pipeline;
    private final Codec<ChangeStreamDocument<T>> codec;
    private final ChangeStreamLevel changeStreamLevel;
    private FullDocument fullDocument = FullDocument.DEFAULT;
    private BsonDocument resumeToken;
    private BsonDocument startAfter;
    private long maxAwaitTimeMS;
    private Collation collation;
    private BsonTimestamp startAtOperationTime;

    ChangeStreamPublisherImpl(@Nullable ClientSession clientSession, MongoOperationPublisher<?> mongoOperationPublisher, Class<T> innerResultClass, List<? extends Bson> pipeline, ChangeStreamLevel changeStreamLevel) {
        this(clientSession, mongoOperationPublisher, ChangeStreamDocument.createCodec((Class)((Class)Assertions.notNull((String)"innerResultClass", innerResultClass)), (CodecRegistry)mongoOperationPublisher.getCodecRegistry()), (List<? extends Bson>)((List)Assertions.notNull((String)"pipeline", pipeline)), (ChangeStreamLevel)Assertions.notNull((String)"changeStreamLevel", (Object)changeStreamLevel));
    }

    private ChangeStreamPublisherImpl(@Nullable ClientSession clientSession, MongoOperationPublisher<?> mongoOperationPublisher, Codec<ChangeStreamDocument<T>> codec, List<? extends Bson> pipeline, ChangeStreamLevel changeStreamLevel) {
        super(clientSession, mongoOperationPublisher.withDocumentClass(codec.getEncoderClass()));
        this.pipeline = pipeline;
        this.codec = codec;
        this.changeStreamLevel = changeStreamLevel;
    }

    @Override
    public ChangeStreamPublisher<T> fullDocument(FullDocument fullDocument) {
        this.fullDocument = (FullDocument)Assertions.notNull((String)"fullDocument", (Object)fullDocument);
        return this;
    }

    @Override
    public ChangeStreamPublisher<T> resumeAfter(BsonDocument resumeAfter) {
        this.resumeToken = (BsonDocument)Assertions.notNull((String)"resumeAfter", (Object)resumeAfter);
        return this;
    }

    @Override
    public ChangeStreamPublisher<T> batchSize(int batchSize) {
        super.batchSize(batchSize);
        return this;
    }

    @Override
    public ChangeStreamPublisher<T> maxAwaitTime(long maxAwaitTime, TimeUnit timeUnit) {
        Assertions.notNull((String)"timeUnit", (Object)((Object)timeUnit));
        this.maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit);
        return this;
    }

    @Override
    public ChangeStreamPublisher<T> collation(@Nullable Collation collation) {
        this.collation = (Collation)Assertions.notNull((String)"collation", (Object)collation);
        return this;
    }

    @Override
    public <TDocument> Publisher<TDocument> withDocumentClass(final Class<TDocument> clazz) {
        return new BatchCursorPublisher<TDocument>(this.getClientSession(), this.getMongoOperationPublisher().withDocumentClass(clazz)){

            @Override
            AsyncReadOperation<AsyncBatchCursor<TDocument>> asAsyncReadOperation() {
                return ChangeStreamPublisherImpl.this.createChangeStreamOperation(this.getMongoOperationPublisher().getCodecRegistry().get(clazz));
            }
        };
    }

    @Override
    public ChangeStreamPublisher<T> startAtOperationTime(BsonTimestamp startAtOperationTime) {
        this.startAtOperationTime = (BsonTimestamp)Assertions.notNull((String)"startAtOperationTime", (Object)startAtOperationTime);
        return this;
    }

    @Override
    public ChangeStreamPublisherImpl<T> startAfter(BsonDocument startAfter) {
        this.startAfter = (BsonDocument)Assertions.notNull((String)"startAfter", (Object)startAfter);
        return this;
    }

    @Override
    AsyncReadOperation<AsyncBatchCursor<ChangeStreamDocument<T>>> asAsyncReadOperation() {
        return this.createChangeStreamOperation(this.codec);
    }

    private <S> AsyncReadOperation<AsyncBatchCursor<S>> createChangeStreamOperation(Codec<S> codec) {
        return new ChangeStreamOperation(this.getNamespace(), this.fullDocument, this.createBsonDocumentList(this.pipeline), codec, this.changeStreamLevel).batchSize(this.getBatchSize()).collation(this.collation).maxAwaitTime(this.maxAwaitTimeMS, TimeUnit.MILLISECONDS).resumeAfter(this.resumeToken).startAtOperationTime(this.startAtOperationTime).startAfter(this.startAfter).retryReads(this.getRetryReads());
    }

    private List<BsonDocument> createBsonDocumentList(List<? extends Bson> pipeline) {
        ArrayList<BsonDocument> aggregateList = new ArrayList<BsonDocument>(pipeline.size());
        for (Bson bson : pipeline) {
            if (bson == null) {
                throw new IllegalArgumentException("pipeline can not contain a null value");
            }
            aggregateList.add(bson.toBsonDocument(BsonDocument.class, this.getCodecRegistry()));
        }
        return aggregateList;
    }
}

