/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.rx.cassandra.driver;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.internal.operators.BackpressureUtils;
import rx.schedulers.Schedulers;

public class ResultSetToRowsTransformer
implements Observable.Transformer<ResultSet, Row> {
    private final Scheduler scheduler;

    public ResultSetToRowsTransformer() {
        this(Schedulers.computation());
    }

    public ResultSetToRowsTransformer(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public Observable<Row> call(Observable<ResultSet> resultSetObservable) {
        return resultSetObservable.flatMap(resultSet -> Observable.create(subscriber -> subscriber.setProducer(new RowProducer((ResultSet)resultSet, (Subscriber<? super Row>)subscriber, this.scheduler))));
    }

    private static class RowProducer
    implements Producer {
        final ResultSet resultSet;
        final Subscriber<? super Row> subscriber;
        final Scheduler scheduler;
        final AtomicLong requested = new AtomicLong();

        RowProducer(ResultSet resultSet, Subscriber<? super Row> subscriber, Scheduler scheduler) {
            this.resultSet = resultSet;
            this.subscriber = subscriber;
            this.scheduler = scheduler;
        }

        @Override
        public void request(long n) {
            if (n < 0L) {
                throw new IllegalArgumentException();
            }
            if (n == 0L) {
                return;
            }
            if (BackpressureUtils.getAndAddRequest(this.requested, n) != 0L) {
                return;
            }
            this.execute(this::produce);
        }

        void produce() {
            long r = this.requested.get();
            long a = this.resultSet.getAvailableWithoutFetching();
            do {
                long e = Math.min(r, a);
                for (long i = 0L; i < e; ++i) {
                    this.subscriber.onNext(this.resultSet.one());
                    if (!this.subscriber.isUnsubscribed()) continue;
                    return;
                }
                r = this.requested.addAndGet(-e);
                if (r != 0L) continue;
                return;
            } while ((a = (long)this.resultSet.getAvailableWithoutFetching()) != 0L);
            if (this.resultSet.isFullyFetched()) {
                this.subscriber.onCompleted();
                return;
            }
            Futures.addCallback(this.resultSet.fetchMoreResults(), new FutureCallback<ResultSet>(){

                @Override
                public void onSuccess(ResultSet result) {
                    if (!subscriber.isUnsubscribed()) {
                        this.produce();
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onError(t);
                    }
                }
            }, this::execute);
        }

        void execute(Runnable command) {
            Scheduler.Worker worker = this.scheduler.createWorker();
            worker.schedule(() -> {
                try {
                    command.run();
                }
                finally {
                    worker.unsubscribe();
                }
            });
        }
    }
}

