/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.client.impl.proxy.ClientMapProxy;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.connector.AbstractHazelcastConnectorSupplier;
import com.hazelcast.jet.impl.connector.AsyncHazelcastWriterP;
import com.hazelcast.jet.impl.connector.HazelcastWriters;
import com.hazelcast.jet.impl.serialization.DelegatingSerializationService;
import com.hazelcast.map.IMap;
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.map.impl.proxy.NearCachedMapProxyImpl;
import com.hazelcast.partition.PartitioningStrategy;
import com.hazelcast.security.PermissionsUtil;
import java.security.Permission;
import java.util.AbstractMap;
import java.util.function.Consumer;
import javax.annotation.Nonnull;

/**
 * Processor that converts each incoming item into a key/value pair, collects
 * the pairs in a local buffer and writes the buffer to the {@link IMap} named
 * {@code mapName} using {@code putAllAsync}.
 *
 * @param <T> type of the items received in the inbox
 * @param <K> type of the key produced by {@code toKeyFn}
 * @param <V> type of the value produced by {@code toValueFn}
 */
public final class WriteMapP<T, K, V> extends AsyncHazelcastWriterP {
    /**
     * Once the buffer holds at least this many entries the inbox is no longer
     * drained; the same number is passed to every newly created buffer.
     */
    private static final int BUFFER_LIMIT = 1024;

    private final String mapName;
    private final SerializationService serializationService;
    private final FunctionEx<? super T, ? extends K> toKeyFn;
    private final FunctionEx<? super T, ? extends V> toValueFn;

    /** Entries collected since the last submitted batch; replaced after every submit. */
    private HazelcastWriters.ArrayMap<Object, Object> buffer;
    /** Target map, resolved in {@link #init}. */
    private IMap<Object, Object> map;
    /** Strategy chosen in {@link #init} for turning an item into a buffered entry. */
    private Consumer<T> addToBuffer;

    private WriteMapP(
            @Nonnull HazelcastInstance instance,
            int maxParallelAsyncOps,
            String mapName,
            @Nonnull SerializationService serializationService,
            @Nonnull FunctionEx<? super T, ? extends K> toKeyFn,
            @Nonnull FunctionEx<? super T, ? extends V> toValueFn
    ) {
        super(instance, maxParallelAsyncOps);
        this.mapName = mapName;
        this.serializationService = serializationService;
        this.toKeyFn = toKeyFn;
        this.toValueFn = toValueFn;
        this.resetBuffer();
    }

    /**
     * Looks up the target map and selects how items are added to the buffer:
     * <ul>
     *   <li>without added custom serializers, the key and value objects are
     *       buffered as they are;</li>
     *   <li>with added custom serializers, key and value are first converted
     *       with {@code serializationService.toData(...)}; for a
     *       {@link MapProxyImpl} the map's partitioning strategy is passed
     *       along when converting the key.</li>
     * </ul>
     *
     * @throws JetException if the map is a {@link NearCachedMapProxyImpl} and
     *         custom serializers were added
     * @throws RuntimeException if custom serializers were added and the map is
     *         neither a {@link MapProxyImpl} nor a {@link ClientMapProxy}
     */
    @Override
    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
        this.map = this.instance().getMap(this.mapName);

        boolean hasCustomSerializers = this.serializationService instanceof DelegatingSerializationService
                && ((DelegatingSerializationService) this.serializationService).hasAddedSerializers();
        boolean hasNearCache = this.map instanceof NearCachedMapProxyImpl;
        if (hasNearCache && hasCustomSerializers) {
            throw new JetException("Writing into IMap with both near cache and custom serializers not supported");
        }

        if (!hasCustomSerializers) {
            this.addToBuffer = item -> this.addEntry(this.key(item), this.value(item));
        } else if (this.map instanceof MapProxyImpl) {
            PartitioningStrategy<?> partitionStrategy = ((MapProxyImpl<?, ?>) this.map).getPartitionStrategy();
            this.addToBuffer = item -> {
                Object key = this.serializationService.toData(this.key(item), partitionStrategy);
                Object value = this.serializationService.toData(this.value(item));
                this.addEntry(key, value);
            };
        } else if (this.map instanceof ClientMapProxy) {
            this.addToBuffer = item -> {
                Object key = this.serializationService.toData(this.key(item));
                Object value = this.serializationService.toData(this.value(item));
                this.addEntry(key, value);
            };
        } else {
            throw new RuntimeException("Unexpected map class: " + this.map.getClass().getName());
        }
    }

    /** Appends a single key/value pair to the current buffer. */
    private void addEntry(Object key, Object value) {
        // The diamond lets the compiler infer the entry type expected by the buffer.
        this.buffer.add(new AbstractMap.SimpleEntry<>(key, value));
    }

    /** Extracts the key of {@code item} using {@code toKeyFn}. */
    private K key(T item) {
        return this.toKeyFn.apply(item);
    }

    /** Extracts the value of {@code item} using {@code toValueFn}. */
    private V value(T item) {
        return this.toValueFn.apply(item);
    }

    /**
     * Drains the inbox into the buffer unless the buffer already holds
     * {@link #BUFFER_LIMIT} or more entries, then attempts to submit the
     * buffer. The outcome of the submit attempt is not inspected here.
     */
    @Override
    protected void processInternal(Inbox inbox) {
        if (this.buffer.size() < BUFFER_LIMIT) {
            inbox.drain(this.addToBuffer);
        }
        this.submitPending();
    }

    /**
     * Attempts to submit whatever is currently buffered.
     *
     * @return the result of {@link #submitPending()}
     */
    @Override
    protected boolean flushInternal() {
        return this.submitPending();
    }

    /**
     * Submits the buffered entries with a single {@code putAllAsync} call and
     * starts a fresh buffer.
     *
     * @return {@code true} if the buffer was empty or has been submitted,
     *         {@code false} if {@code tryAcquirePermit()} refused a permit, in
     *         which case the buffer is left untouched
     */
    private boolean submitPending() {
        if (this.buffer.isEmpty()) {
            return true;
        }
        if (!this.tryAcquirePermit()) {
            return false;
        }
        this.setCallback(this.map.putAllAsync(this.buffer));
        // The submitted buffer was handed to putAllAsync; continue with a new instance.
        this.resetBuffer();
        return true;
    }

    /** Replaces the buffer with a new, empty one created with {@link #BUFFER_LIMIT}. */
    private void resetBuffer() {
        this.buffer = new HazelcastWriters.ArrayMap<>(BUFFER_LIMIT);
    }

    /**
     * Supplier of {@link WriteMapP} processors.
     *
     * @param <T> type of the items received by the processors
     * @param <K> type of the key produced by {@code toKeyFn}
     * @param <V> type of the value produced by {@code toValueFn}
     */
    public static class Supplier<T, K, V> extends AbstractHazelcastConnectorSupplier {
        private static final long serialVersionUID = 1L;

        /**
         * Budget of parallel async operations that is divided by the local
         * parallelism to obtain the per-processor limit.
         */
        private static final int MAX_PARALLELISM = 16;

        private final String mapName;
        private final FunctionEx<? super T, ? extends K> toKeyFn;
        private final FunctionEx<? super T, ? extends V> toValueFn;
        /** Per-processor limit, computed in {@link #init}. */
        private int maxParallelAsyncOps;

        /**
         * @param clientXml passed to the superclass constructor and used for the permission check
         * @param mapName name of the map to write to
         * @param toKeyFn function extracting the key from an item
         * @param toValueFn function extracting the value from an item
         */
        public Supplier(
                String clientXml,
                String mapName,
                @Nonnull FunctionEx<? super T, ? extends K> toKeyFn,
                @Nonnull FunctionEx<? super T, ? extends V> toValueFn
        ) {
            super(clientXml);
            this.mapName = mapName;
            this.toKeyFn = toKeyFn;
            this.toValueFn = toValueFn;
        }

        /**
         * Initializes the superclass and computes the per-processor limit as
         * {@link #MAX_PARALLELISM} divided by the local parallelism, but never
         * less than 1.
         */
        @Override
        public void init(@Nonnull ProcessorSupplier.Context context) {
            super.init(context);
            this.maxParallelAsyncOps = Integer.max(1, MAX_PARALLELISM / context.localParallelism());
        }

        /** Creates a {@link WriteMapP} configured with this supplier's map name and functions. */
        @Override
        protected Processor createProcessor(HazelcastInstance instance, SerializationService serializationService) {
            return new WriteMapP<>(
                    instance, this.maxParallelAsyncOps, this.mapName, serializationService, this.toKeyFn, this.toValueFn);
        }

        /** Returns the map-put permission obtained from {@link PermissionsUtil} for the target map. */
        @Override
        public Permission permission() {
            return PermissionsUtil.mapPutPermission(this.clientXml, this.mapName);
        }
    }
}

