/*
 * Decompiled with CFR 0.152.
 */
package io.reactiverse.pgclient.impl;

import io.reactiverse.pgclient.PgCursor;
import io.reactiverse.pgclient.PgRowSet;
import io.reactiverse.pgclient.PgStream;
import io.reactiverse.pgclient.Row;
import io.reactiverse.pgclient.Tuple;
import io.reactiverse.pgclient.impl.PgPreparedQueryImpl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import java.util.Iterator;

/**
 * {@link PgStream} of {@link Row}s backed by a {@link PgCursor}.
 *
 * <p>Setting a row handler opens a cursor on the prepared query and issues a
 * first {@code read}; each completed read is delivered to {@link #handle} and
 * its rows are dispatched one by one to the row handler for as long as there
 * is demand. When a batch is drained and the cursor reports more rows, another
 * read is issued; otherwise the end handler is invoked.
 *
 * <p>Mutable state is guarded by the monitor of this instance. User callbacks
 * and cursor reads are always invoked <em>outside</em> that monitor so that
 * foreign code never runs while the lock is held.
 */
public class PgStreamImpl implements PgStream<Row>, Handler<AsyncResult<PgRowSet>> {
    private final PgPreparedQueryImpl ps;
    /** Value passed as first argument to every {@code PgCursor.read} call. */
    private final int fetch;
    private final Tuple params;
    private Handler<Void> endHandler;
    private Handler<Row> rowHandler;
    private Handler<Throwable> exceptionHandler;
    /** Number of rows that may still be emitted; {@code Long.MAX_VALUE} means unbounded. */
    private long demand;
    /** True while a thread is inside the dispatch loop of {@link #checkPending()}. */
    private boolean emitting;
    /** The open cursor, or {@code null} when none is open (not started, ended, failed or closed). */
    private PgCursor cursor;
    /** Rows of the last completed read that are not yet dispatched, or {@code null}. */
    private Iterator<Row> result;

    /**
     * Creates a stream that starts with unbounded demand.
     *
     * @param ps     the prepared query the cursor is created from
     * @param fetch  the value passed to each cursor read
     * @param params the parameters used to create the cursor
     */
    PgStreamImpl(PgPreparedQueryImpl ps, int fetch, Tuple params) {
        this.ps = ps;
        this.fetch = fetch;
        this.params = params;
        this.demand = Long.MAX_VALUE;
    }

    /** Sets the handler notified with the cause of a failed cursor read. */
    @Override
    public synchronized PgStream<Row> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    /**
     * Sets the row handler.
     *
     * <p>A non-null handler opens a cursor and issues the first read. A
     * {@code null} handler drops the reference to the open cursor if there is
     * one, otherwise it clears the row handler.
     *
     * @throws UnsupportedOperationException if {@code handler} is non-null and
     *         a cursor is already open
     */
    @Override
    public PgStream<Row> handler(Handler<Row> handler) {
        PgCursor c;
        synchronized (this) {
            if (handler == null) {
                if (cursor != null) {
                    // NOTE(review): the cursor reference is dropped without calling
                    // close on it; confirm whether it should be closed here.
                    cursor = null;
                } else {
                    rowHandler = null;
                }
                return this;
            }
            if (cursor != null) {
                throw new UnsupportedOperationException("Handle me gracefully");
            }
            rowHandler = handler;
            c = cursor = ps.cursor(params);
        }
        // Issue the read outside the monitor.
        c.read(fetch, this);
        return this;
    }

    /** Stops row emission by resetting the demand to zero. */
    @Override
    public synchronized PgStream<Row> pause() {
        this.demand = 0L;
        return this;
    }

    /**
     * Adds {@code amount} to the current demand and, when a cursor is open,
     * resumes dispatching of buffered rows.
     *
     * @param amount the number of additional rows that may be emitted
     * @return this stream
     * @throws IllegalArgumentException if {@code amount} is negative
     */
    public PgStream<Row> fetch(long amount) {
        if (amount < 0L) {
            throw new IllegalArgumentException("Invalid fetch amount " + amount);
        }
        synchronized (this) {
            demand += amount;
            if (demand < 0L) {
                // The addition overflowed: saturate to "unbounded".
                demand = Long.MAX_VALUE;
            }
            if (cursor == null) {
                return this;
            }
        }
        checkPending();
        return this;
    }

    /** Switches back to unbounded demand; equivalent to {@code fetch(Long.MAX_VALUE)}. */
    @Override
    public PgStream<Row> resume() {
        return fetch(Long.MAX_VALUE);
    }

    /** Sets the handler invoked once the cursor reports that there are no more rows. */
    @Override
    public synchronized PgStream<Row> endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    /**
     * Receives the outcome of a cursor read.
     *
     * <p>On failure the cursor reference is dropped and the exception handler,
     * if any, is notified. On success the rows become the current batch and
     * dispatching is attempted.
     */
    @Override
    public void handle(AsyncResult<PgRowSet> ar) {
        if (ar.failed()) {
            Handler<Throwable> handler;
            synchronized (this) {
                cursor = null;
                handler = exceptionHandler;
            }
            if (handler != null) {
                handler.handle(ar.cause());
            }
        } else {
            Iterator<Row> rows = ar.result().iterator();
            // Publish the batch under the monitor: checkPending() reads it under the same lock.
            synchronized (this) {
                result = rows;
            }
            checkPending();
        }
    }

    /** Closes the stream, ignoring the outcome. */
    @Override
    public void close() {
        close(ar -> {});
    }

    /**
     * Closes the open cursor, if any.
     *
     * @param completionHandler notified with the outcome of closing the cursor;
     *        when no cursor is open it is completed successfully right away so
     *        that callers waiting on it are never left hanging
     */
    @Override
    public void close(Handler<AsyncResult<Void>> completionHandler) {
        PgCursor c;
        synchronized (this) {
            c = cursor;
            cursor = null;
        }
        if (c != null) {
            c.close(completionHandler);
        } else if (completionHandler != null) {
            completionHandler.handle(Future.succeededFuture());
        }
    }

    /**
     * Dispatches buffered rows while there is demand.
     *
     * <p>Only one thread at a time runs the loop (guarded by {@code emitting}).
     * When the current batch is drained, either another read is issued (cursor
     * has more rows) or the end handler is invoked. If the cursor reference
     * was dropped in the meantime (stream closed or handler unset), the loop
     * simply stops.
     */
    private void checkPending() {
        synchronized (this) {
            if (emitting) {
                return;
            }
            emitting = true;
        }
        PgCursor toRead = null;
        Handler<Void> end = null;
        while (true) {
            Handler<Row> target;
            Row row;
            synchronized (this) {
                if (demand == 0L || result == null) {
                    emitting = false;
                    return;
                }
                if (!result.hasNext()) {
                    // Current batch is drained.
                    result = null;
                    emitting = false;
                    PgCursor c = cursor;
                    if (c == null) {
                        // Cursor was dropped while rows were still buffered:
                        // nothing left to read and no end to signal.
                        return;
                    }
                    if (c.hasMore()) {
                        toRead = c;
                    } else {
                        cursor = null;
                        end = endHandler;
                    }
                    break;
                }
                target = rowHandler;
                row = result.next();
                if (demand != Long.MAX_VALUE) {
                    --demand;
                }
            }
            // Deliver the row outside the monitor.
            if (target != null) {
                target.handle(row);
            }
        }
        // Follow-up actions also run outside the monitor.
        if (toRead != null) {
            toRead.read(fetch, this);
        } else if (end != null) {
            end.handle(null);
        }
    }
}

