/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.map;

import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.internal.serialization.SerializationServiceAware;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.processor.AsyncTransformUsingServiceBatchedP;
import com.hazelcast.jet.pipeline.ServiceFactories;
import com.hazelcast.jet.sql.impl.SimpleExpressionEvalContext;
import com.hazelcast.jet.sql.impl.connector.keyvalue.KvRowProjector;
import com.hazelcast.jet.sql.impl.connector.map.ValueProjector;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.IMap;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.DataSerializable;
import com.hazelcast.query.impl.getters.Extractors;
import com.hazelcast.sql.impl.QueryException;
import com.hazelcast.sql.impl.expression.ExpressionEvalContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;

/**
 * {@link ProcessorSupplier} whose processors update the values of entries in
 * the map named {@code mapName}.
 * <p>
 * Each incoming item is expected to be a one-element {@code Object[]} holding
 * an entry key. Keys are collected per batch and submitted to the map together
 * with a {@link ValueUpdater} entry processor that computes the new value.
 * The processors emit no output items.
 */
final class UpdateProcessorSupplier implements ProcessorSupplier, DataSerializable {

    /** Maximum number of concurrent async operations passed to each processor. */
    private static final int MAX_CONCURRENT_OPS = 8;
    /** Maximum number of input items passed to a single {@link #update} call. */
    private static final int MAX_BATCH_SIZE = 1024;

    private String mapName;
    private KvRowProjector.Supplier rowProjectorSupplier;
    private ValueProjector.Supplier valueProjectorSupplier;

    /** Not serialized; assigned in {@link #init(ProcessorSupplier.Context)}. */
    private transient ExpressionEvalContext evalContext;

    /** No-arg constructor; the fields are populated by {@link #readData(ObjectDataInput)}. */
    private UpdateProcessorSupplier() {
    }

    /**
     * @param mapName                name of the map whose entries are updated
     * @param rowProjectorSupplier   supplier of the projector turning an entry's key and value into a row
     * @param valueProjectorSupplier supplier of the projector turning that row into the new entry value
     */
    UpdateProcessorSupplier(
            String mapName,
            KvRowProjector.Supplier rowProjectorSupplier,
            ValueProjector.Supplier valueProjectorSupplier
    ) {
        this.mapName = mapName;
        this.rowProjectorSupplier = rowProjectorSupplier;
        this.valueProjectorSupplier = valueProjectorSupplier;
    }

    /**
     * Derives the expression evaluation context from the supplier context.
     * {@link #update} reads the query arguments from it.
     */
    @Override
    public void init(@Nonnull ProcessorSupplier.Context context) {
        this.evalContext = SimpleExpressionEvalContext.from(context);
    }

    /**
     * Creates {@code count} batching async processors, each using its own
     * (non-shared) {@link IMap} handle as the service.
     *
     * @param count number of processors to create
     * @return the created processors
     */
    @Override
    @Nonnull
    public Collection<? extends Processor> get(int count) {
        // Local copy so the service-factory lambda references only the name
        // and not this supplier's field.
        String mapName = this.mapName;
        List<Processor> processors = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            Processor processor = new AsyncTransformUsingServiceBatchedP<>(
                    ServiceFactories.nonSharedService(ctx -> ctx.hazelcastInstance().getMap(mapName)),
                    null,
                    MAX_CONCURRENT_OPS,
                    MAX_BATCH_SIZE,
                    (IMap<Object, Object> map, List<Object[]> rows) -> update(rows, map)
            );
            processors.add(processor);
        }
        return processors;
    }

    /**
     * Submits a {@link ValueUpdater} for the keys contained in {@code rows}.
     *
     * @param rows batch of one-element rows; element 0 is the entry key
     * @param map  the map to update
     * @return a future that completes with an empty traverser once the
     *         submission completes; the per-key results are discarded
     */
    private CompletableFuture<Traverser<Integer>> update(List<Object[]> rows, IMap<Object, Object> map) {
        // A set, so a key occurring several times in the batch is submitted once.
        HashSet<Object> keys = new HashSet<>();
        for (Object[] row : rows) {
            assert row.length == 1;
            keys.add(row[0]);
        }
        ValueUpdater updater =
                new ValueUpdater(rowProjectorSupplier, valueProjectorSupplier, evalContext.getArguments());
        return map.submitToKeys(keys, updater)
                .toCompletableFuture()
                .thenApply(m -> Traversers.empty());
    }

    @Override
    public void writeData(ObjectDataOutput out) throws IOException {
        out.writeString(mapName);
        out.writeObject(rowProjectorSupplier);
        out.writeObject(valueProjectorSupplier);
    }

    /** Reads the fields in the same order as {@link #writeData(ObjectDataOutput)} writes them. */
    @Override
    public void readData(ObjectDataInput in) throws IOException {
        mapName = in.readString();
        rowProjectorSupplier = in.readObject();
        valueProjectorSupplier = in.readObject();
    }

    /**
     * Entry processor that projects the entry into a row, computes the new
     * value from that row and stores it in the entry.
     */
    private static final class ValueUpdater
            implements EntryProcessor<Object, Object, Object>, SerializationServiceAware, DataSerializable {

        private KvRowProjector.Supplier rowProjector;
        private ValueProjector.Supplier valueProjector;
        private List<Object> arguments;

        /** Not serialized; both are built in {@link #setSerializationService(SerializationService)}. */
        private transient ExpressionEvalContext evalContext;
        private transient Extractors extractors;

        /** No-arg constructor; the fields are populated by {@link #readData(ObjectDataInput)}. */
        private ValueUpdater() {
        }

        private ValueUpdater(
                KvRowProjector.Supplier rowProjector,
                ValueProjector.Supplier valueProjector,
                List<Object> arguments
        ) {
            this.rowProjector = rowProjector;
            this.valueProjector = valueProjector;
            this.arguments = arguments;
        }

        /**
         * Replaces the entry's value with the projected one.
         *
         * @return always {@code 1}
         * @throws QueryException if the projected value is {@code null}
         */
        @Override
        public Object process(Map.Entry<Object, Object> entry) {
            Object[] row = rowProjector.get(evalContext, extractors).project(entry.getKey(), entry.getValue());
            Object value = valueProjector.get(evalContext).project(row);
            if (value == null) {
                throw QueryException.error("Cannot assign null to value");
            }
            entry.setValue(value);
            return 1;
        }

        /**
         * Builds the evaluation context from the query arguments and the given
         * serialization service, and the extractors from that context.
         */
        @Override
        public void setSerializationService(SerializationService serializationService) {
            this.evalContext =
                    new SimpleExpressionEvalContext(arguments, (InternalSerializationService) serializationService);
            this.extractors = Extractors.newBuilder(evalContext.getSerializationService()).build();
        }

        @Override
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeObject(rowProjector);
            out.writeObject(valueProjector);
            out.writeObject(arguments);
        }

        /** Reads the fields in the same order as {@link #writeData(ObjectDataOutput)} writes them. */
        @Override
        public void readData(ObjectDataInput in) throws IOException {
            rowProjector = in.readObject();
            valueProjector = in.readObject();
            arguments = in.readObject();
        }
    }
}

