/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.JobStateSnapshot;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.impl.AbstractJetInstance;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.sql.impl.JetPlan;
import com.hazelcast.jet.sql.impl.JetQueryResultProducer;
import com.hazelcast.jet.sql.impl.JetSqlResultImpl;
import com.hazelcast.jet.sql.impl.JetStaticQueryResultProducer;
import com.hazelcast.jet.sql.impl.SimpleExpressionEvalContext;
import com.hazelcast.jet.sql.impl.parse.SqlShowStatement;
import com.hazelcast.jet.sql.impl.schema.MappingCatalog;
import com.hazelcast.map.impl.EntryRemovingProcessor;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.sql.SqlColumnMetadata;
import com.hazelcast.sql.SqlColumnType;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRowMetadata;
import com.hazelcast.sql.impl.ParameterConverter;
import com.hazelcast.sql.impl.QueryException;
import com.hazelcast.sql.impl.QueryId;
import com.hazelcast.sql.impl.QueryParameterMetadata;
import com.hazelcast.sql.impl.SqlResultImpl;
import com.hazelcast.sql.impl.expression.Expression;
import com.hazelcast.sql.impl.row.EmptyRow;
import com.hazelcast.sql.impl.row.HeapRow;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;

/**
 * Executes the individual {@link JetPlan} variants produced for SQL statements.
 *
 * <p>Each {@code execute} overload handles one plan type: DDL-like plans
 * (mappings, jobs, snapshots) are applied directly through the
 * {@link MappingCatalog} or the Jet API of the {@link HazelcastInstance} and
 * return an update-count result of {@code 0}; {@code SELECT} and {@code SHOW}
 * plans return a result backed by a row producer.
 */
public class JetPlanExecutor {
    /** Job-config argument key under which the converted query arguments are stored. */
    private static final String SQL_ARGUMENTS_KEY = "__sql.arguments";

    /** Service name used to look up the {@link JetServiceBackend} on the node engine. */
    private static final String JET_SERVICE_NAME = "hz:impl:jetService";

    private final MappingCatalog catalog;
    private final HazelcastInstance hazelcastInstance;
    private final Map<Long, JetQueryResultProducer> resultConsumerRegistry;

    /**
     * @param catalog                catalog used to create, remove and list mappings
     * @param hazelcastInstance      instance whose Jet API, maps and node engine are used
     * @param resultConsumerRegistry registry of result producers keyed by job id;
     *                               {@code SELECT} execution adds an entry before submitting the job
     */
    public JetPlanExecutor(MappingCatalog catalog, HazelcastInstance hazelcastInstance, Map<Long, JetQueryResultProducer> resultConsumerRegistry) {
        this.catalog = catalog;
        this.hazelcastInstance = hazelcastInstance;
        this.resultConsumerRegistry = resultConsumerRegistry;
    }

    /**
     * Creates a mapping in the catalog, passing through the plan's
     * {@code replace} and {@code ifNotExists} flags.
     *
     * @return an update-count result of {@code 0}
     */
    SqlResult execute(JetPlan.CreateMappingPlan plan) {
        this.catalog.createMapping(plan.mapping(), plan.replace(), plan.ifNotExists());
        return SqlResultImpl.createUpdateCountResult(0L);
    }

    /**
     * Removes a mapping from the catalog, passing through the plan's
     * {@code ifExists} flag.
     *
     * @return an update-count result of {@code 0}
     */
    SqlResult execute(JetPlan.DropMappingPlan plan) {
        this.catalog.removeMapping(plan.name(), plan.ifExists());
        return SqlResultImpl.createUpdateCountResult(0L);
    }

    /**
     * Submits a new Jet job for the plan's DAG. The converted arguments are
     * attached to the plan's job config under {@code "__sql.arguments"}.
     * When the plan is flagged {@code ifNotExists}, the job is submitted with
     * {@code newJobIfAbsent} instead of {@code newJob}.
     *
     * @param arguments query arguments; converted in place (see {@link #prepareArguments})
     * @return an update-count result of {@code 0}
     * @throws QueryException if the argument count does not match the plan's parameter metadata
     */
    SqlResult execute(JetPlan.CreateJobPlan plan, List<Object> arguments) {
        List<Object> args = this.prepareArguments(plan.getParameterMetadata(), arguments);
        JobConfig jobConfig = plan.getJobConfig().setArgument(SQL_ARGUMENTS_KEY, args);
        if (plan.isIfNotExists()) {
            this.hazelcastInstance.getJet().newJobIfAbsent(plan.getExecutionPlan().getDag(), jobConfig);
        } else {
            this.hazelcastInstance.getJet().newJob(plan.getExecutionPlan().getDag(), jobConfig);
        }
        return SqlResultImpl.createUpdateCountResult(0L);
    }

    /**
     * Suspends, resumes or restarts the job named by the plan, depending on
     * the plan's operation. Any other operation value leaves the job untouched.
     *
     * @return an update-count result of {@code 0}
     * @throws QueryException if no job with the given name is found
     */
    SqlResult execute(JetPlan.AlterJobPlan plan) {
        Job job = this.hazelcastInstance.getJet().getJob(plan.getJobName());
        if (job == null) {
            throw QueryException.error("The job '" + plan.getJobName() + "' doesn't exist");
        }
        switch (plan.getOperation()) {
            case SUSPEND:
                job.suspend();
                break;
            case RESUME:
                job.resume();
                break;
            case RESTART:
                job.restart();
                break;
        }
        return SqlResultImpl.createUpdateCountResult(0L);
    }

    /**
     * Cancels the job named by the plan and waits for it to finish. If the
     * plan carries a snapshot name, the job is cancelled with
     * {@code cancelAndExportSnapshot}; otherwise with {@code cancel}.
     *
     * <p>When the job is missing or already in a terminal status, the call is
     * a no-op if the plan is flagged {@code ifExists}, and an error otherwise.
     *
     * @return an update-count result of {@code 0}
     * @throws QueryException if the job does not exist or is already terminated
     *                        and the plan is not flagged {@code ifExists}
     */
    SqlResult execute(JetPlan.DropJobPlan plan) {
        Job job = this.hazelcastInstance.getJet().getJob(plan.getJobName());
        boolean jobTerminated = job != null && job.getStatus().isTerminal();
        if (job == null || jobTerminated) {
            if (plan.isIfExists()) {
                return SqlResultImpl.createUpdateCountResult(0L);
            }
            if (jobTerminated) {
                throw QueryException.error("Job already terminated: " + plan.getJobName());
            }
            throw QueryException.error("Job doesn't exist: " + plan.getJobName());
        }
        if (plan.getWithSnapshotName() != null) {
            job.cancelAndExportSnapshot(plan.getWithSnapshotName());
        } else {
            job.cancel();
        }
        try {
            job.join();
        } catch (CancellationException ignored) {
            // The job was cancelled just above, so a CancellationException from
            // join() is deliberately not treated as a failure.
        }
        return SqlResultImpl.createUpdateCountResult(0L);
    }

    /**
     * Exports a snapshot of the job named by the plan under the plan's snapshot name.
     *
     * @return an update-count result of {@code 0}
     * @throws QueryException if no job with the given name is found
     */
    SqlResult execute(JetPlan.CreateSnapshotPlan plan) {
        Job job = this.hazelcastInstance.getJet().getJob(plan.getJobName());
        if (job == null) {
            throw QueryException.error("The job '" + plan.getJobName() + "' doesn't exist");
        }
        job.exportSnapshot(plan.getSnapshotName());
        return SqlResultImpl.createUpdateCountResult(0L);
    }

    /**
     * Destroys the job state snapshot named by the plan. A missing snapshot is
     * a no-op if the plan is flagged {@code ifExists}, and an error otherwise.
     *
     * @return an update-count result of {@code 0}
     * @throws QueryException if the snapshot does not exist and the plan is not flagged {@code ifExists}
     */
    SqlResult execute(JetPlan.DropSnapshotPlan plan) {
        JobStateSnapshot snapshot = this.hazelcastInstance.getJet().getJobStateSnapshot(plan.getSnapshotName());
        if (snapshot == null) {
            if (plan.isIfExists()) {
                return SqlResultImpl.createUpdateCountResult(0L);
            }
            throw QueryException.error("The snapshot doesn't exist: " + plan.getSnapshotName());
        }
        snapshot.destroy();
        return SqlResultImpl.createUpdateCountResult(0L);
    }

    /**
     * Lists either mapping names (from the catalog) or non-null job names
     * (from the job records of the Jet service's job repository), sorted,
     * as rows with a single {@code VARCHAR} column called {@code "name"}.
     *
     * @return a result over a static, pre-computed row iterator
     */
    SqlResult execute(JetPlan.ShowStatementPlan plan) {
        SqlRowMetadata metadata = new SqlRowMetadata(Collections.singletonList(new SqlColumnMetadata("name", SqlColumnType.VARCHAR, false)));
        // Wildcard element type: the names are only sorted and wrapped into rows
        // below, so the exact element type of either source does not matter here.
        Stream<?> rows;
        if (plan.getShowTarget() == SqlShowStatement.ShowStatementTarget.MAPPINGS) {
            rows = this.catalog.getMappingNames().stream();
        } else {
            assert (plan.getShowTarget() == SqlShowStatement.ShowStatementTarget.JOBS);
            NodeEngineImpl nodeEngine = Util.getNodeEngine(this.hazelcastInstance);
            JetServiceBackend jetServiceBackend = (JetServiceBackend) nodeEngine.getService(JET_SERVICE_NAME);
            rows = jetServiceBackend.getJobRepository().getJobRecords().stream()
                    .map(record -> record.getConfig().getName())
                    .filter(Objects::nonNull);
        }
        JetStaticQueryResultProducer producer = new JetStaticQueryResultProducer(
                rows.sorted().map(name -> new HeapRow(new Object[]{name})).iterator());
        QueryId queryId = QueryId.create(this.hazelcastInstance.getLocalEndpoint().getUuid());
        return new JetSqlResultImpl(queryId, producer, metadata, false);
    }

    /**
     * Runs a {@code SELECT} plan as a light job whose rows are delivered
     * through a fresh {@link JetQueryResultProducer}.
     *
     * <p>The producer is put into the result-consumer registry under a newly
     * allocated job id <em>before</em> the job is submitted. If submission
     * throws, the registry entry is removed and the throwable is rethrown.
     * If the job's future later completes exceptionally, the failure is
     * forwarded to the producer as a {@link QueryException}.
     *
     * @param queryId   id to associate with the returned result
     * @param arguments query arguments; converted in place (see {@link #prepareArguments})
     * @param timeout   value passed to {@code JobConfig.setTimeoutMillis}
     * @return a result backed by the registered producer
     * @throws QueryException if the argument count does not match the plan's parameter metadata
     */
    SqlResult execute(JetPlan.SelectPlan plan, QueryId queryId, List<Object> arguments, long timeout) {
        List<Object> args = this.prepareArguments(plan.getParameterMetadata(), arguments);
        JobConfig jobConfig = new JobConfig().setArgument(SQL_ARGUMENTS_KEY, args).setTimeoutMillis(timeout);
        JetQueryResultProducer queryResultProducer = new JetQueryResultProducer();
        AbstractJetInstance jet = (AbstractJetInstance) this.hazelcastInstance.getJet();
        Long jobId = jet.newJobId();
        JetQueryResultProducer oldValue = this.resultConsumerRegistry.put(jobId, queryResultProducer);
        assert (oldValue == null) : oldValue;
        try {
            Job job = jet.newLightJob(jobId, plan.getDag(), jobConfig);
            job.getFuture().whenComplete((ignoredResult, failure) -> {
                if (failure != null) {
                    int errorCode = findQueryExceptionCode(failure);
                    queryResultProducer.onError(QueryException.error(errorCode, "The Jet SQL job failed: " + failure.getMessage(), failure));
                }
            });
        } catch (Throwable e) {
            // Submission failed: drop the registry entry added above.
            this.resultConsumerRegistry.remove(jobId);
            throw e;
        }
        return new JetSqlResultImpl(queryId, queryResultProducer, plan.getRowMetadata(), plan.isStreaming());
    }

    /**
     * Runs a DML plan as a light job and blocks until the job completes.
     *
     * @param queryId   not used by this method
     * @param arguments query arguments; converted in place (see {@link #prepareArguments})
     * @param timeout   value passed to {@code JobConfig.setTimeoutMillis}
     * @return an update-count result of {@code 0}
     * @throws QueryException if the argument count does not match the plan's parameter metadata
     */
    SqlResult execute(JetPlan.DmlPlan plan, QueryId queryId, List<Object> arguments, long timeout) {
        List<Object> args = this.prepareArguments(plan.getParameterMetadata(), arguments);
        JobConfig jobConfig = new JobConfig().setArgument(SQL_ARGUMENTS_KEY, args).setTimeoutMillis(timeout);
        Job job = this.hazelcastInstance.getJet().newLightJob(plan.getDag(), jobConfig);
        job.join();
        return SqlResultImpl.createUpdateCountResult(0L);
    }

    /**
     * Deletes a single map entry: evaluates the plan's key condition against
     * the converted arguments and submits an entry-removing processor to that
     * key of the plan's map, then waits for completion.
     *
     * @param arguments query arguments; converted in place (see {@link #prepareArguments})
     * @param timeout   maximum wait in milliseconds; a value {@code <= 0} waits without a limit
     * @return an update-count result of {@code 0}
     * @throws QueryException if the argument count is wrong, the wait times out,
     *                        the operation fails, or the waiting thread is interrupted
     *                        (in which case the thread's interrupt flag is restored)
     */
    SqlResult execute(JetPlan.IMapDeletePlan plan, List<Object> arguments, long timeout) {
        List<Object> args = this.prepareArguments(plan.parameterMetadata(), arguments);
        String mapName = plan.mapName();
        Expression<?> keyCondition = plan.keyCondition();
        Object key = keyCondition.eval(EmptyRow.INSTANCE, new SimpleExpressionEvalContext(args, ((HazelcastInstanceImpl) this.hazelcastInstance).getSerializationService()));
        CompletableFuture<Void> future = this.hazelcastInstance.getMap(mapName).submitToKey(key, EntryRemovingProcessor.ENTRY_REMOVING_PROCESSOR).toCompletableFuture();
        try {
            if (timeout > 0L) {
                future.get(timeout, TimeUnit.MILLISECONDS);
            } else {
                future.get();
            }
        } catch (TimeoutException e) {
            future.cancel(true);
            // Keep the original exception as the cause for diagnostics.
            throw QueryException.error("Timeout occurred while deleting an entry", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw QueryException.error(e.getMessage(), e);
        } catch (ExecutionException e) {
            throw QueryException.error(e.getMessage(), e);
        }
        return SqlResultImpl.createUpdateCountResult(0L);
    }

    /**
     * Validates the argument count against the parameter metadata and runs
     * each argument through its {@link ParameterConverter}.
     *
     * <p>Conversion happens <em>in place</em>: an element of {@code arguments}
     * is overwritten whenever the converter returns a different object, and
     * the same list instance is returned. The list must therefore be
     * modifiable if any conversion yields a new object.
     *
     * @param parameterMetadata expected parameter count and per-index converters
     * @param arguments         non-null list of argument values
     * @return {@code arguments}, after conversion
     * @throws QueryException (code {@code 2000}) if the number of arguments
     *                        differs from the expected parameter count
     */
    private List<Object> prepareArguments(QueryParameterMetadata parameterMetadata, List<Object> arguments) {
        assert (arguments != null);
        int parameterCount = parameterMetadata.getParameterCount();
        if (parameterCount != arguments.size()) {
            throw QueryException.error(2000, "Unexpected parameter count: expected " + parameterCount + ", got " + arguments.size());
        }
        for (int i = 0; i < arguments.size(); ++i) {
            Object value = arguments.get(i);
            ParameterConverter parameterConverter = parameterMetadata.getParameterConverter(i);
            Object newValue = parameterConverter.convert(value);
            // Identity comparison is intended: only write back when the
            // converter produced a different object.
            if (newValue != value) {
                arguments.set(i, newValue);
            }
        }
        return arguments;
    }

    /**
     * Walks the cause chain of {@code t} and returns the code of the first
     * {@link QueryException} found.
     *
     * @param t throwable to inspect; may be {@code null}
     * @return the code of the first {@link QueryException} in the chain, or
     *         {@code -1} if there is none
     */
    private static int findQueryExceptionCode(Throwable t) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (current instanceof QueryException) {
                return ((QueryException) current).getCode();
            }
        }
        return -1;
    }
}

