/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.map;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.util.ThrowingConsumer;
import net.openhft.chronicle.engine.api.EngineReplication;
import net.openhft.chronicle.engine.api.map.KeyValueStore;
import net.openhft.chronicle.engine.api.map.MapEvent;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.SubscriptionConsumer;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.AssetNotFoundException;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.fs.Clusters;
import net.openhft.chronicle.engine.fs.EngineCluster;
import net.openhft.chronicle.engine.fs.EngineHostDetails;
import net.openhft.chronicle.engine.map.InsertedEvent;
import net.openhft.chronicle.engine.map.KVSSubscription;
import net.openhft.chronicle.engine.map.ObjectKeyValueStore;
import net.openhft.chronicle.engine.map.ObjectSubscription;
import net.openhft.chronicle.engine.map.RemovedEvent;
import net.openhft.chronicle.engine.map.UpdatedEvent;
import net.openhft.chronicle.engine.server.internal.MapReplicationHandler;
import net.openhft.chronicle.engine.tree.HostIdentifier;
import net.openhft.chronicle.hash.replication.EngineReplicationLangBytesConsumer;
import net.openhft.chronicle.hash.replication.SingleChronicleHashReplication;
import net.openhft.chronicle.map.BytesMapEventListener;
import net.openhft.chronicle.map.ChronicleMap;
import net.openhft.chronicle.map.ChronicleMapBuilder;
import net.openhft.chronicle.map.MapEventListener;
import net.openhft.chronicle.map.Replica;
import net.openhft.chronicle.map.SharedSegment;
import net.openhft.chronicle.map.UpdateResult;
import net.openhft.chronicle.network.api.session.SessionDetails;
import net.openhft.chronicle.network.api.session.SessionProvider;
import net.openhft.chronicle.network.cluster.ConnectionManager;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.network.connection.WireOutPublisher;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.lang.io.Bytes;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChronicleMapKeyValueStore<K, V>
implements ObjectKeyValueStore<K, V>,
Closeable,
Supplier<EngineReplication> {
    private static final ScheduledExecutorService DELAYED_CLOSER = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new NamedThreadFactory("ChronicleMapKeyValueStore Closer", Boolean.valueOf(true)));
    private static final Logger LOG = LoggerFactory.getLogger(ChronicleMapKeyValueStore.class);
    private final ChronicleMap<K, V> chronicleMap;
    @NotNull
    private final ObjectSubscription<K, V> subscriptions;
    @Nullable
    private final EngineReplication engineReplicator;
    @NotNull
    private final Asset asset;
    @NotNull
    private final String assetFullName;
    @Nullable
    private final EventLoop eventLoop;
    private final AtomicBoolean isClosed;
    @Nullable
    private final SessionProvider sessionProvider;
    private Class keyType;
    private Class valueType;
    @Nullable
    private SessionDetails replicationSessionDetails;

    public ChronicleMapKeyValueStore(@NotNull RequestContext context, @NotNull Asset asset) {
        EngineReplication engineReplicator1;
        HostIdentifier hostIdentifier;
        ChronicleMapBuilder builder;
        long maxEntries;
        double averageValueSize;
        String basePath;
        block22: {
            this.isClosed = new AtomicBoolean();
            basePath = context.basePath();
            this.keyType = context.keyType();
            this.valueType = context.valueType();
            averageValueSize = context.getAverageValueSize();
            maxEntries = context.getEntries();
            this.asset = asset;
            this.assetFullName = asset.fullName();
            this.subscriptions = asset.acquireView(ObjectSubscription.class, context);
            this.subscriptions.setKvStore(this);
            this.eventLoop = asset.findOrCreateView(EventLoop.class);
            assert (this.eventLoop != null);
            this.sessionProvider = asset.findView(SessionProvider.class);
            this.eventLoop.start();
            this.replicationSessionDetails = asset.root().findView(SessionDetails.class);
            builder = ChronicleMapBuilder.of((Class)context.keyType(), (Class)context.valueType());
            hostIdentifier = null;
            engineReplicator1 = null;
            try {
                engineReplicator1 = asset.acquireView(EngineReplication.class);
                EngineReplicationLangBytesConsumer langBytesConsumer = asset.findView(EngineReplicationLangBytesConsumer.class);
                hostIdentifier = asset.findOrCreateView(HostIdentifier.class);
                assert (hostIdentifier != null);
                builder.putReturnsNull(context.putReturnsNull() != Boolean.FALSE).removeReturnsNull(context.removeReturnsNull() != Boolean.FALSE);
                builder.replication(((SingleChronicleHashReplication.Builder)SingleChronicleHashReplication.builder().engineReplication(langBytesConsumer)).createWithId(hostIdentifier.hostId()));
            }
            catch (AssetNotFoundException anfe) {
                if (!LOG.isDebugEnabled()) break block22;
                Jvm.debug().on(this.getClass(), "replication not enabled ", (Throwable)anfe);
            }
        }
        this.engineReplicator = engineReplicator1;
        Boolean nullOldValueOnUpdateEvent = context.nullOldValueOnUpdateEvent();
        if (nullOldValueOnUpdateEvent != null && nullOldValueOnUpdateEvent.booleanValue()) {
            builder.bytesEventListener((BytesMapEventListener)new NullOldValuePublishingOperations());
        } else {
            builder.eventListener((MapEventListener)new PublishingOperations());
        }
        if (context.putReturnsNull() != Boolean.FALSE) {
            builder.putReturnsNull(true);
        }
        if (context.removeReturnsNull() != Boolean.FALSE) {
            builder.removeReturnsNull(true);
        }
        if (averageValueSize > 0.0) {
            builder.averageValueSize(averageValueSize);
        }
        if (maxEntries > 0L) {
            builder.entries(maxEntries + 1L);
        }
        if (basePath == null) {
            this.chronicleMap = builder.create();
        } else {
            String pathname = basePath + "/" + context.name();
            new File(basePath).mkdirs();
            try {
                this.chronicleMap = builder.createPersistedTo(new File(pathname));
            }
            catch (IOException e) {
                IORuntimeException iore = new IORuntimeException("Could not access " + pathname);
                iore.initCause((Throwable)e);
                throw iore;
            }
        }
        if (hostIdentifier == null) {
            return;
        }
        Clusters clusters = asset.findView(Clusters.class);
        if (clusters == null) {
            Jvm.warn().on(this.getClass(), "no clusters found.");
            return;
        }
        EngineCluster engineCluster = clusters.get(context.cluster());
        if (engineCluster == null) {
            Jvm.warn().on(this.getClass(), "no cluster found, name=" + context.cluster());
            return;
        }
        byte localIdentifier = hostIdentifier.hostId();
        if (LOG.isDebugEnabled()) {
            Jvm.debug().on(this.getClass(), "hostDetails : localIdentifier=" + localIdentifier + ",cluster=" + engineCluster.hostDetails());
        }
        for (EngineHostDetails hostDetails : engineCluster.hostDetails()) {
            try {
                byte remoteIdentifier = (byte)hostDetails.hostId();
                if (remoteIdentifier == localIdentifier) continue;
                ConnectionManager connectionManager = engineCluster.findConnectionManager(remoteIdentifier);
                if (connectionManager == null) {
                    Jvm.warn().on(this.getClass(), "connectionManager==null for remoteIdentifier=" + remoteIdentifier);
                    engineCluster.findConnectionManager(remoteIdentifier);
                    continue;
                }
                connectionManager.addListener((nc, isConnected) -> {
                    if (!isConnected) {
                        return;
                    }
                    if (nc.isAcceptor()) {
                        return;
                    }
                    String csp = context.fullName();
                    long lastUpdateTime = ((Replica)this.chronicleMap).lastModificationTime(remoteIdentifier);
                    WireOutPublisher publisher = nc.wireOutPublisher();
                    publisher.publish(MapReplicationHandler.newMapReplicationHandler(lastUpdateTime, this.keyType, this.valueType, csp, nc.newCid()));
                });
            }
            catch (Exception e) {
                Jvm.warn().on(this.getClass(), "hostDetails=" + (Object)((Object)hostDetails), (Throwable)e);
            }
        }
    }

    @Override
    @NotNull
    public KVSSubscription<K, V> subscription(boolean createIfAbsent) {
        return this.subscriptions;
    }

    @Override
    public boolean put(K key, V value) {
        try {
            return this.chronicleMap.update(key, value) != UpdateResult.INSERT;
        }
        catch (RuntimeException e) {
            if (LOG.isDebugEnabled()) {
                Jvm.debug().on(this.getClass(), "Failed to write " + key + ", " + value, (Throwable)e);
            }
            throw e;
        }
    }

    @Override
    @Nullable
    public V getAndPut(K key, V value) {
        if (!this.isClosed.get()) {
            return (V)this.chronicleMap.put(key, value);
        }
        return null;
    }

    @Override
    public boolean remove(K key) {
        return this.chronicleMap.remove(key) != null;
    }

    @Override
    @Nullable
    public V getAndRemove(K key) {
        if (!this.isClosed.get()) {
            return (V)this.chronicleMap.remove(key);
        }
        return null;
    }

    @Override
    public V getUsing(K key, @Nullable Object value) {
        if (value != null) {
            throw new UnsupportedOperationException("Mutable values not supported");
        }
        return (V)this.chronicleMap.getUsing(key, value);
    }

    @Override
    public long longSize() {
        return this.chronicleMap.size();
    }

    @Override
    public void keysFor(int segment, @NotNull SubscriptionConsumer<K> kConsumer) throws InvalidSubscriberException {
        SubscriptionConsumer.notifyEachEvent(this.chronicleMap.keySet(), kConsumer);
    }

    @Override
    public void entriesFor(int segment, @NotNull SubscriptionConsumer<MapEvent<K, V>> kvConsumer) throws InvalidSubscriberException {
        this.chronicleMap.entrySet().stream().map(e -> InsertedEvent.of(this.assetFullName, e.getKey(), e.getValue(), false)).forEach(ThrowingConsumer.asConsumer(kvConsumer::accept));
    }

    @Override
    @NotNull
    public Iterator<Map.Entry<K, V>> entrySetIterator() {
        return this.chronicleMap.entrySet().iterator();
    }

    @Override
    @NotNull
    public Iterator<K> keySetIterator() {
        return this.chronicleMap.keySet().iterator();
    }

    @Override
    public void clear() {
        this.chronicleMap.clear();
    }

    @Override
    public boolean containsValue(V value) {
        throw new UnsupportedOperationException("todo");
    }

    @Override
    @NotNull
    public Asset asset() {
        return this.asset;
    }

    @Override
    @Nullable
    public KeyValueStore<K, V> underlying() {
        return null;
    }

    public void close() {
        this.isClosed.set(true);
        assert (this.eventLoop != null);
        this.eventLoop.stop();
        Closeable.closeQuietly((Object)this.asset.findView(TcpChannelHub.class));
        DELAYED_CLOSER.schedule(() -> Closeable.closeQuietly(this.chronicleMap), 1L, TimeUnit.SECONDS);
    }

    @Override
    public void accept(@NotNull EngineReplication.ReplicationEntry replicationEntry) {
        if (!this.isClosed.get() && this.engineReplicator != null) {
            this.engineReplicator.applyReplication(replicationEntry);
        } else {
            Jvm.warn().on(this.getClass(), "message skipped as closed replicationEntry=" + replicationEntry);
        }
    }

    @Override
    @Nullable
    public EngineReplication get() {
        return this.engineReplicator;
    }

    @Override
    public Class<K> keyType() {
        return this.keyType;
    }

    @Override
    public Class<V> valueType() {
        return this.valueType;
    }

    static {
        ClassAliasPool.CLASS_ALIASES.addAlias(new Class[]{MapReplicationHandler.class});
    }

    private class NullOldValuePublishingOperations
    extends BytesMapEventListener {
        private NullOldValuePublishingOperations() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onPut(Bytes entry, long metaDataPos, long keyPos, long valuePos, boolean added, boolean replicationEvent, boolean hasValueChanged, byte identifier, byte replacedIdentifier, long timeStamp, long replacedTimeStamp, @NotNull SharedSegment segment) {
            if (identifier == replacedIdentifier && timeStamp == replacedTimeStamp && !hasValueChanged) {
                return;
            }
            Object key = ChronicleMapKeyValueStore.this.chronicleMap.readKey(entry, keyPos);
            Object value = ChronicleMapKeyValueStore.this.chronicleMap.readValue(entry, valuePos);
            segment.writeUnlock();
            try {
                if (added) {
                    ChronicleMapKeyValueStore.this.subscriptions.notifyEvent(InsertedEvent.of(ChronicleMapKeyValueStore.this.assetFullName, key, value, replicationEvent));
                } else {
                    ChronicleMapKeyValueStore.this.subscriptions.notifyEvent(UpdatedEvent.of(ChronicleMapKeyValueStore.this.assetFullName, key, null, value, replicationEvent, hasValueChanged));
                }
            }
            finally {
                segment.writeLock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onRemove(Bytes entry, long metaDataPos, long keyPos, long valuePos, boolean replicationEvent, byte identifier, byte replacedIdentifier, long timeStamp, long replacedTimeStamp, @NotNull SharedSegment segment) {
            if (identifier == replacedIdentifier && timeStamp == replacedTimeStamp) {
                return;
            }
            Object key = ChronicleMapKeyValueStore.this.chronicleMap.readKey(entry, keyPos);
            Object value = ChronicleMapKeyValueStore.this.chronicleMap.readValue(entry, valuePos);
            segment.writeUnlock();
            try {
                ChronicleMapKeyValueStore.this.subscriptions.notifyEvent(RemovedEvent.of(ChronicleMapKeyValueStore.this.assetFullName, key, value, replicationEvent));
            }
            finally {
                segment.writeLock();
            }
        }
    }

    private class PublishingOperations
    extends MapEventListener<K, V> {
        private PublishingOperations() {
        }

        public boolean isActive() {
            return ChronicleMapKeyValueStore.this.subscriptions.hasSubscribers();
        }

        public boolean usesValue() {
            return ChronicleMapKeyValueStore.this.subscriptions.hasValueSubscribers();
        }

        public void onRemove(@NotNull K key, V value, boolean replicationEvent, byte identifier, byte replacedIdentifier, long timestamp, long replacedTimeStamp) {
            if (replicationEvent && ChronicleMapKeyValueStore.this.replicationSessionDetails != null && ChronicleMapKeyValueStore.this.sessionProvider.get() == null) {
                ChronicleMapKeyValueStore.this.sessionProvider.set(ChronicleMapKeyValueStore.this.replicationSessionDetails);
            }
            this.onRemove0(key, value, replicationEvent);
        }

        public void onRemove0(@NotNull K key, V value, boolean replicationEven) {
            ChronicleMapKeyValueStore.this.subscriptions.notifyEvent(RemovedEvent.of(ChronicleMapKeyValueStore.this.assetFullName, key, value, replicationEven));
        }

        private void onPut0(@NotNull K key, V newValue, @Nullable V replacedValue, boolean replicationEvent, boolean added, boolean hasValueChanged) {
            if (added) {
                ChronicleMapKeyValueStore.this.subscriptions.notifyEvent(InsertedEvent.of(ChronicleMapKeyValueStore.this.assetFullName, key, newValue, replicationEvent));
            } else if (hasValueChanged) {
                ChronicleMapKeyValueStore.this.subscriptions.notifyEvent(UpdatedEvent.of(ChronicleMapKeyValueStore.this.assetFullName, key, replacedValue, newValue, replicationEvent, hasValueChanged));
            }
        }

        public void onPut(@NotNull K key, V newValue, @Nullable V replacedValue, boolean replicationEvent, boolean added, boolean hasValueChanged, byte identifier, byte replacedIdentifier, long timestamp, long replacedTimestamp) {
            if (!added && !hasValueChanged && replacedTimestamp == timestamp && identifier == replacedIdentifier) {
                Jvm.debug().on(((Object)((Object)this)).getClass(), "ignore update as nothing has changed");
                return;
            }
            if (replicationEvent && ChronicleMapKeyValueStore.this.replicationSessionDetails != null && ChronicleMapKeyValueStore.this.sessionProvider.get() == null) {
                ChronicleMapKeyValueStore.this.sessionProvider.set(ChronicleMapKeyValueStore.this.replicationSessionDetails);
            }
            this.onPut0(key, newValue, replacedValue, replicationEvent, added, hasValueChanged);
        }
    }
}

