/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
import com.hazelcast.client.impl.spi.ClientPartitionService;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.partition.InternalPartitionService;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.JetDataSerializerHook;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.impl.connector.AsyncHazelcastWriterP;
import com.hazelcast.jet.impl.util.ImdgUtil;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.IMap;
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.partition.PartitioningStrategy;
import com.hazelcast.query.impl.QueryableEntry;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.ToIntFunction;
import javax.annotation.Nonnull;

public abstract class AbstractUpdateMapP<T, K, V>
extends AsyncHazelcastWriterP {
    private static final int PENDING_ITEM_COUNT_LIMIT = 1024;
    protected final FunctionEx<? super T, ? extends K> keyFn;
    protected final String mapName;
    protected IMap<K, V> map;
    protected SerializationContext<K> serializationContext;
    protected Map<Data, Object>[] partitionBuffers;
    protected int[] pendingInPartition;
    protected int pendingItemCount;
    protected int currentPartitionId;

    public AbstractUpdateMapP(@Nonnull HazelcastInstance instance, int maxParallelAsyncOps, @Nonnull String mapName, @Nonnull FunctionEx<? super T, ? extends K> keyFn) {
        super(instance, maxParallelAsyncOps);
        this.mapName = Objects.requireNonNull(mapName, "mapName");
        this.keyFn = keyFn;
    }

    @Override
    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
        this.map = this.instance().getMap(this.mapName);
        this.serializationContext = new SerializationContext<K>(this.instance(), this.map);
        int partitionCount = this.serializationContext.partitionCount();
        this.partitionBuffers = new Map[partitionCount];
        this.pendingInPartition = new int[partitionCount];
        for (int i = 0; i < partitionCount; ++i) {
            this.partitionBuffers[i] = new HashMap<Data, Object>();
        }
    }

    @Override
    protected final void processInternal(Inbox inbox) {
        if (this.pendingItemCount < 1024) {
            this.pendingItemCount += inbox.size();
            inbox.drain(this::addToBuffer);
        }
        this.submitPending();
    }

    protected abstract void addToBuffer(T var1);

    @Override
    protected final boolean flushInternal() {
        return this.submitPending();
    }

    private boolean submitPending() {
        if (this.pendingItemCount == 0) {
            return true;
        }
        for (int i = 0; i < this.partitionBuffers.length; ++i) {
            if (!this.partitionBuffers[this.currentPartitionId].isEmpty()) {
                if (!this.tryAcquirePermit()) {
                    return false;
                }
                Map<Data, Object> buffer = this.partitionBuffers[this.currentPartitionId];
                EntryProcessor<K, V, Void> entryProcessor = this.entryProcessor(buffer);
                IMap<Data, V> map = this.map;
                this.setCallback(map.submitToKeys(buffer.keySet(), entryProcessor));
                this.pendingItemCount -= this.pendingInPartition[this.currentPartitionId];
                this.pendingInPartition[this.currentPartitionId] = 0;
                this.partitionBuffers[this.currentPartitionId] = new HashMap<Data, Object>();
            }
            this.currentPartitionId = AbstractUpdateMapP.incrCircular(this.currentPartitionId, this.partitionBuffers.length);
        }
        if (this.currentPartitionId == this.partitionBuffers.length) {
            this.currentPartitionId = 0;
        }
        assert (this.pendingItemCount == 0) : "pending item count should be 0, but was " + this.pendingItemCount;
        return true;
    }

    protected abstract EntryProcessor<K, V, Void> entryProcessor(Map<Data, Object> var1);

    private static int incrCircular(int v, int limit) {
        if (++v == limit) {
            v = 0;
        }
        return v;
    }

    public static class ApplyValuesEntryProcessor<K, V>
    implements EntryProcessor<K, V, Void>,
    IdentifiedDataSerializable {
        private Map<Data, Object> keysToUpdate;

        public ApplyValuesEntryProcessor() {
        }

        public ApplyValuesEntryProcessor(Map<Data, Object> keysToUpdate) {
            this.keysToUpdate = keysToUpdate;
        }

        @Override
        public Void process(Map.Entry<K, V> entry) {
            QueryableEntry e = (QueryableEntry)entry;
            e.setValue(this.keysToUpdate.get(e.getKeyData()));
            return null;
        }

        @Override
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeInt(this.keysToUpdate.size());
            for (Map.Entry<Data, Object> e : this.keysToUpdate.entrySet()) {
                IOUtil.writeData(out, e.getKey());
                IOUtil.writeData(out, (Data)e.getValue());
            }
        }

        @Override
        public void readData(ObjectDataInput in) throws IOException {
            int size = in.readInt();
            this.keysToUpdate = new LinkedHashMap<Data, Object>(size);
            for (int i = 0; i < size; ++i) {
                Data key = IOUtil.readData(in);
                Data value = IOUtil.readData(in);
                this.keysToUpdate.put(key, value);
            }
        }

        @Override
        public int getFactoryId() {
            return JetDataSerializerHook.FACTORY_ID;
        }

        @Override
        public int getClassId() {
            return 4;
        }
    }

    public static class SerializationContext<K> {
        private final int partitionCount;
        private final ToIntFunction<Data> partitionIdFn;
        private final SerializationService serializationService;
        private final PartitioningStrategy<K> partitioningStrategy;

        SerializationContext(HazelcastInstance instance, IMap<K, ?> map) {
            if (ImdgUtil.isMemberInstance(instance)) {
                NodeEngineImpl nodeEngine = ((HazelcastInstanceImpl)instance).node.nodeEngine;
                InternalPartitionService partitionService = nodeEngine.getPartitionService();
                this.partitionCount = partitionService.getPartitionCount();
                this.partitionIdFn = partitionService::getPartitionId;
                this.serializationService = nodeEngine.getSerializationService();
                this.partitioningStrategy = ((MapProxyImpl)map).getPartitionStrategy();
            } else {
                HazelcastClientProxy clientProxy = (HazelcastClientProxy)instance;
                ClientPartitionService clientPartitionService = clientProxy.client.getClientPartitionService();
                this.partitionCount = clientPartitionService.getPartitionCount();
                this.partitionIdFn = clientPartitionService::getPartitionId;
                this.serializationService = clientProxy.getSerializationService();
                this.partitioningStrategy = null;
            }
        }

        public int partitionCount() {
            return this.partitionCount;
        }

        public int partitionId(Data data) {
            return this.partitionIdFn.applyAsInt(data);
        }

        public Data toKeyData(K key) {
            if (this.partitioningStrategy != null) {
                return this.serializationService.toData(key, this.partitioningStrategy);
            }
            return this.serializationService.toData(key);
        }

        public Data toData(Object value) {
            return this.serializationService.toData(value);
        }
    }
}

