/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.map;

import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.memory.AccumulationLimitExceededException;
import com.hazelcast.jet.sql.impl.connector.keyvalue.KvProjector;
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.DataSerializable;
import com.hazelcast.sql.impl.QueryException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;

/**
 * {@link ProcessorSupplier} that hands out a single {@code InsertP} processor
 * writing projected rows into the map named {@code mapName}.
 *
 * <p>Only the map name and the projector supplier take part in
 * {@link DataSerializable} serialization; the serialization service is
 * transient and is obtained from the context in {@link #init}.
 */
final class InsertProcessorSupplier implements ProcessorSupplier, DataSerializable {

    private String mapName;
    private KvProjector.Supplier projectorSupplier;

    /** Not serialized; assigned in {@link #init} from the supplier context. */
    private transient InternalSerializationService serializationService;

    /**
     * No-arg constructor leaving all fields unset; they are populated by
     * {@link #readData}. (Presumably invoked by the deserialization machinery.)
     */
    private InsertProcessorSupplier() {
    }

    /**
     * @param mapName           name of the target map
     * @param projectorSupplier supplier of the projector that turns a row into a key/value entry
     */
    InsertProcessorSupplier(String mapName, KvProjector.Supplier projectorSupplier) {
        this.mapName = mapName;
        this.projectorSupplier = projectorSupplier;
    }

    /**
     * Captures the serialization service from the context.
     * The context is cast to {@link Contexts.ProcSupplierCtx}, so any other
     * implementation fails with a {@link ClassCastException}.
     */
    @Override
    public void init(@Nonnull ProcessorSupplier.Context context) throws Exception {
        Contexts.ProcSupplierCtx supplierContext = (Contexts.ProcSupplierCtx) context;
        serializationService = supplierContext.serializationService();
    }

    /**
     * Creates the insert processor.
     *
     * @param count requested number of processors; asserted to be exactly 1
     * @return a singleton list holding one new {@code InsertP}
     */
    @Override
    @Nonnull
    public Collection<? extends Processor> get(int count) {
        assert count == 1;
        KvProjector projector = projectorSupplier.get(serializationService);
        return Collections.singletonList(new InsertP(mapName, projector));
    }

    /** Writes the map name followed by the projector supplier. */
    @Override
    public void writeData(ObjectDataOutput out) throws IOException {
        out.writeString(mapName);
        out.writeObject(projectorSupplier);
    }

    /** Reads the fields in the same order {@link #writeData} wrote them. */
    @Override
    public void readData(ObjectDataInput in) throws IOException {
        mapName = in.readString();
        projectorSupplier = (KvProjector.Supplier) in.readObject();
    }

    /**
     * Processor that projects each incoming row to a key/value entry and
     * issues an asynchronous {@code putIfAbsent} for it, keeping a bounded
     * number of inserts in flight. On completion it emits a one-element
     * {@code Object[]} carrying the number of entries inserted.
     */
    private static final class InsertP extends AbstractProcessor {

        /** Upper bound on the number of not-yet-drained insert futures. */
        private static final int MAX_IN_FLIGHT_INSERTS = 16;

        private final String mapName;
        private final KvProjector projector;

        /** Keys this processor has already submitted; used to reject duplicates within the input. */
        private final Set<Object> seenKeys = new HashSet<>();

        /** Pending insert futures, drained from the head in submission order. */
        private final Deque<CompletableFuture<Object>> inFlightInserts = new ArrayDeque<>(MAX_IN_FLIGHT_INSERTS);

        private MapProxyImpl<Object, Object> map;

        /** Limit on {@code seenKeys} size, taken from {@code context.maxProcessorAccumulatedRecords()}. */
        private long maxAccumulatedKeys;

        /** Number of drained inserts whose previous value was {@code null}. */
        private long numberOfInsertedEntries;

        private InsertP(String mapName, KvProjector projector) {
            this.mapName = mapName;
            this.projector = projector;
        }

        /** Looks up the target map and reads the accumulation limit from the context. */
        @Override
        protected void init(@Nonnull Processor.Context context) throws Exception {
            map = (MapProxyImpl<Object, Object>) context.hazelcastInstance().getMap(mapName);
            maxAccumulatedKeys = context.maxProcessorAccumulatedRecords();
        }

        /**
         * Projects {@code row} (an {@code Object[]}) and submits an asynchronous insert for it.
         *
         * @return {@code false} without consuming the row when the in-flight queue
         *         could not be fully drained and is at capacity; {@code true} once
         *         the insert has been submitted
         * @throws QueryException if the projected key is {@code null} or was already
         *         seen by this processor
         * @throws AccumulationLimitExceededException if the number of remembered keys
         *         exceeds {@code maxAccumulatedKeys}
         */
        @Override
        protected boolean tryProcess(int ordinal, @Nonnull Object row) {
            // Always attempt a drain first; back off only if it was incomplete AND there is no room left.
            boolean drained = tryFlushQueue();
            if (!drained && isQueueFull()) {
                return false;
            }

            Map.Entry<Object, Object> entry = projector.project((Object[]) row);
            Object key = entry.getKey();
            if (key == null) {
                throw QueryException.error("Key cannot be null");
            }
            if (!seenKeys.add(key)) {
                throw QueryException.error("Duplicate key");
            }
            // The key is added before the check, so the limit trips on the first key beyond it.
            if (seenKeys.size() > maxAccumulatedKeys) {
                throw new AccumulationLimitExceededException();
            }

            inFlightInserts.add(map.putIfAbsentAsync(key, entry.getValue()));
            return true;
        }

        /**
         * Finishes once every pending insert has been drained and the
         * inserted-entries count has been emitted.
         */
        @Override
        public boolean complete() {
            if (!tryFlushQueue()) {
                return false;
            }
            return tryEmit(new Object[]{numberOfInsertedEntries});
        }

        /** @return {@code true} when exactly {@link #MAX_IN_FLIGHT_INSERTS} futures are pending */
        private boolean isQueueFull() {
            return inFlightInserts.size() == MAX_IN_FLIGHT_INSERTS;
        }

        /**
         * Drains completed futures from the head of the queue, stopping at the
         * first one that is not done yet.
         *
         * @return {@code true} if the queue is empty afterwards, {@code false} if
         *         an unfinished future remains at the head
         * @throws JetException if a completed future yields a failure; the future
         *         stays at the head of the queue in that case
         * @throws QueryException if an insert returned a non-null previous value,
         *         i.e. the key was already present in the map
         */
        private boolean tryFlushQueue() {
            for (CompletableFuture<Object> head = inFlightInserts.peek();
                 head != null;
                 head = inFlightInserts.peek()) {
                if (!head.isDone()) {
                    return false;
                }

                Object previousValue;
                try {
                    previousValue = head.get();
                } catch (Throwable e) {
                    // Intentionally broad: every failure of the insert is surfaced as a JetException.
                    throw new JetException("INSERT operation completed exceptionally: " + e, e);
                }

                // Remove before inspecting the result so a duplicate is not re-examined.
                inFlightInserts.remove();
                if (previousValue != null) {
                    throw QueryException.error("Duplicate key");
                }
                numberOfInsertedEntries++;
            }
            return true;
        }
    }
}

