/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.processors;

import com.hazelcast.cluster.Address;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.sql.impl.JetQueryResultProducer;
import com.hazelcast.jet.sql.impl.JetSqlCoreBackendImpl;
import com.hazelcast.jet.sql.impl.SimpleExpressionEvalContext;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.sql.impl.QueryException;
import com.hazelcast.sql.impl.expression.Expression;
import com.hazelcast.sql.impl.expression.ExpressionEvalContext;
import com.hazelcast.sql.impl.row.EmptyRow;
import java.util.concurrent.CancellationException;
import javax.annotation.Nonnull;

public final class RootResultConsumerSink
implements Processor {
    private final Expression<?> limitExpression;
    private final Expression<?> offsetExpression;
    private JetQueryResultProducer rootResultConsumer;

    private RootResultConsumerSink(Expression<?> limitExpression, Expression<?> offsetExpression) {
        this.limitExpression = limitExpression;
        this.offsetExpression = offsetExpression;
    }

    @Override
    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
        NodeEngineImpl nodeEngine = Util.getNodeEngine(context.hazelcastInstance());
        JetSqlCoreBackendImpl jetSqlCoreBackend = (JetSqlCoreBackendImpl)nodeEngine.getService("hz:impl:jetSqlCoreBackend");
        this.rootResultConsumer = jetSqlCoreBackend.getResultConsumerRegistry().remove(context.jobId());
        assert (this.rootResultConsumer != null);
        SimpleExpressionEvalContext evalContext = SimpleExpressionEvalContext.from(context);
        Number limit = RootResultConsumerSink.evaluate(this.limitExpression, evalContext);
        if (limit == null) {
            throw QueryException.error("LIMIT value cannot be null");
        }
        if (limit.longValue() < 0L) {
            throw QueryException.error("LIMIT value cannot be negative: " + limit);
        }
        Number offset = RootResultConsumerSink.evaluate(this.offsetExpression, evalContext);
        if (offset == null) {
            throw QueryException.error("OFFSET value cannot be null");
        }
        if (offset.longValue() < 0L) {
            throw QueryException.error("OFFSET value cannot be negative: " + offset);
        }
        this.rootResultConsumer.init(limit.longValue(), offset.longValue());
    }

    private static Number evaluate(Expression<?> expression, ExpressionEvalContext evalContext) {
        return (Number)expression.eval(EmptyRow.INSTANCE, evalContext);
    }

    @Override
    public boolean tryProcess() {
        try {
            this.rootResultConsumer.ensureNotDone();
        }
        catch (QueryException e) {
            if (e.getCode() == 1003) {
                throw new CancellationException();
            }
            throw e;
        }
        return true;
    }

    @Override
    public void process(int ordinal, @Nonnull Inbox inbox) {
        try {
            this.rootResultConsumer.consume(inbox);
        }
        catch (QueryException e) {
            if (e.getCode() == 1003) {
                throw new CancellationException();
            }
            throw e;
        }
    }

    @Override
    public boolean complete() {
        this.rootResultConsumer.done();
        return true;
    }

    @Override
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        return true;
    }

    public static ProcessorMetaSupplier rootResultConsumerSink(Address initiatorAddress, Expression<?> limitExpression, Expression<?> offsetExpression) {
        ProcessorSupplier pSupplier = ProcessorSupplier.of(() -> new RootResultConsumerSink(limitExpression, offsetExpression));
        return ProcessorMetaSupplier.forceTotalParallelismOne(pSupplier, initiatorAddress);
    }
}

