/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.observer;

import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.config.Config;
import com.hazelcast.config.RingbufferConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.internal.util.UuidUtil;
import com.hazelcast.jet.Observable;
import com.hazelcast.jet.function.Observer;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.jet.impl.observer.WrappedThrowable;
import com.hazelcast.logging.ILogger;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.ringbuffer.StaleSequenceException;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import javax.annotation.Nonnull;

/**
 * {@link Observable} implementation backed by a Hazelcast {@link Ringbuffer}.
 *
 * <p>The ringbuffer is named {@link #JET_OBSERVABLE_NAME_PREFIX} followed by the
 * observable's name. Every registered {@link Observer} gets its own
 * {@code RingbufferListener}, which reads the ringbuffer asynchronously in
 * batches and forwards each item to the observer.
 *
 * @param <T> type of the values delivered to observers
 */
public class ObservableImpl<T>
implements Observable<T> {
    /** Prefix prepended to an observable's name to form its ringbuffer name. */
    public static final String JET_OBSERVABLE_NAME_PREFIX = "__jet.observables.";
    /** Key derived from this class's name; it is not read anywhere in this file. */
    public static final String OWNED_OBSERVABLE = ObservableImpl.class.getName() + ".ownedObservable";
    /** Active listeners, keyed by the registration ID returned from {@link #addObserver}. */
    private final ConcurrentMap<UUID, RingbufferListener<T>> listeners = new ConcurrentHashMap<>();
    private final String name;
    private final HazelcastInstance hzInstance;
    /** Callback invoked with this observable as the last step of {@link #destroy()}. */
    private final Consumer<Observable<T>> onDestroy;
    private final ILogger logger;

    /**
     * Creates an observable handle; no ringbuffer access happens here.
     *
     * @param name       name of the observable
     * @param hzInstance instance used to look up the ringbuffer, config and executor
     * @param onDestroy  callback invoked with this observable at the end of {@link #destroy()}
     * @param logger     logger handed to every listener created by this observable
     */
    public ObservableImpl(String name, HazelcastInstance hzInstance, Consumer<Observable<T>> onDestroy, ILogger logger) {
        this.name = name;
        this.hzInstance = hzInstance;
        this.onDestroy = onDestroy;
        this.logger = logger;
    }

    /** Returns the name this observable was created with. */
    @Override
    @Nonnull
    public String name() {
        return this.name;
    }

    /**
     * Registers an observer and starts reading the ringbuffer on its behalf.
     *
     * @param observer receiver of the values, errors and completion events
     * @return registration ID to pass to {@link #removeObserver(UUID)}
     */
    @Override
    @Nonnull
    public UUID addObserver(@Nonnull Observer<T> observer) {
        UUID id = UuidUtil.newUnsecureUUID();
        RingbufferListener<T> listener = new RingbufferListener<>(this.name, id, observer, this.hzInstance, this.logger);
        // Register before starting the first read so that the listener is
        // already cancellable via removeObserver()/destroy() once it runs.
        this.listeners.put(id, listener);
        listener.next();
        return id;
    }

    /**
     * Unregisters an observer and cancels its listener.
     *
     * @param registrationId ID previously returned by {@link #addObserver(Observer)}
     * @throws IllegalArgumentException if no observer is registered under that ID
     */
    @Override
    public void removeObserver(@Nonnull UUID registrationId) {
        RingbufferListener<T> listener = this.listeners.remove(registrationId);
        if (listener == null) {
            throw new IllegalArgumentException(String.format("No registered observer with registration ID %s", registrationId));
        }
        listener.cancel();
    }

    /**
     * Adds a ringbuffer configuration with the given capacity to the instance's
     * configuration.
     *
     * @param capacity capacity to configure for the backing ringbuffer
     * @return this observable
     * @throws IllegalStateException if the backing ringbuffer already exists
     * @throws RuntimeException      wrapping any exception thrown while adding the configuration
     */
    @Override
    public Observable<T> configureCapacity(int capacity) {
        String ringbufferName = ObservableImpl.ringbufferName(this.name);
        if (this.ringbufferExists(ringbufferName)) {
            throw new IllegalStateException("Underlying buffer for observable '" + this.name + "' is already created.");
        }
        Config config = this.hzInstance.getConfig();
        try {
            config.addRingBufferConfig(new RingbufferConfig(ringbufferName).setCapacity(capacity));
        } catch (Exception e) {
            throw new RuntimeException("Failed configuring capacity: " + e, e);
        }
        return this;
    }

    /**
     * Returns the capacity of the backing ringbuffer, narrowed to {@code int}.
     *
     * @throws IllegalStateException if the backing ringbuffer does not exist yet
     */
    @Override
    public int getConfiguredCapacity() {
        String ringbufferName = ObservableImpl.ringbufferName(this.name);
        if (this.ringbufferExists(ringbufferName)) {
            return (int)this.hzInstance.getRingbuffer(ringbufferName).capacity();
        }
        throw new IllegalStateException("Underlying buffer for observable '" + this.name + "' is not yet created.");
    }

    /**
     * Tells whether the instance's distributed objects include a ringbuffer
     * with the given name.
     */
    private boolean ringbufferExists(String ringbufferName) {
        return this.hzInstance.getDistributedObjects().stream().anyMatch(o -> o.getServiceName().equals("hz:impl:ringbufferService") && o.getName().equals(ringbufferName));
    }

    /**
     * Cancels every registered listener, destroys the backing ringbuffer and
     * then notifies the {@code onDestroy} callback.
     */
    @Override
    public void destroy() {
        // Remove and cancel directly rather than through removeObserver():
        // that method throws when the ID is already gone, so an observer
        // removed concurrently would abort the destroy half-way.
        for (UUID registrationId : this.listeners.keySet()) {
            RingbufferListener<T> listener = this.listeners.remove(registrationId);
            if (listener != null) {
                listener.cancel();
            }
        }
        this.hzInstance.getRingbuffer(ObservableImpl.ringbufferName(this.name)).destroy();
        this.onDestroy.accept(this);
    }

    /**
     * Returns the name of the ringbuffer backing the observable with the given name.
     *
     * @param observableName name of the observable
     * @return {@link #JET_OBSERVABLE_NAME_PREFIX} followed by {@code observableName}
     */
    @Nonnull
    public static String ringbufferName(String observableName) {
        return JET_OBSERVABLE_NAME_PREFIX + observableName;
    }

    /**
     * Reads one observable's ringbuffer in batches and dispatches every item
     * to a single {@link Observer}.
     *
     * <p>Each completed read schedules the next one, until the listener is
     * cancelled or an exception that cannot be handled is encountered.
     */
    private static class RingbufferListener<T> {
        /** Maximum number of items requested per read. */
        private static final int BATCH_SIZE = 1000;
        /** Registration UUID and ringbuffer name joined by '/'; used in log messages. */
        private final String id;
        private final Observer<T> observer;
        private final Ringbuffer<Object> ringbuffer;
        private final ILogger logger;
        /** Executor on which read results are processed. */
        private final Executor executor;
        /**
         * Sequence the next read starts from.
         * NOTE(review): not volatile; presumably only touched by the constructor
         * and by the chained read callbacks, never concurrently. Confirm.
         */
        private long sequence;
        private volatile boolean cancelled;

        /**
         * Looks up the ringbuffer and executor and positions the listener at
         * the ringbuffer's current head sequence.
         */
        RingbufferListener(String observable, UUID uuid, Observer<T> observer, HazelcastInstance hzInstance, ILogger logger) {
            this.observer = observer;
            this.ringbuffer = hzInstance.getRingbuffer(ObservableImpl.ringbufferName(observable));
            this.id = uuid.toString() + "/" + this.ringbuffer.getName();
            this.executor = RingbufferListener.getExecutor(hzInstance);
            this.sequence = this.ringbuffer.headSequence();
            this.logger = logger;
            this.logger.info("Starting message listener '" + this.id + "'");
        }

        /**
         * Picks the executor for processing read results: the "hz:async"
         * executor of a member instance, or the task scheduler of a client
         * instance.
         *
         * @throws RuntimeException if the instance is neither of those two implementations
         */
        private static Executor getExecutor(HazelcastInstance hzInstance) {
            if (hzInstance instanceof HazelcastInstanceImpl) {
                return ((HazelcastInstanceImpl)hzInstance).node.getNodeEngine().getExecutionService().getExecutor("hz:async");
            }
            if (hzInstance instanceof HazelcastClientInstanceImpl) {
                return ((HazelcastClientInstanceImpl)hzInstance).getTaskScheduler();
            }
            throw new RuntimeException(String.format("Unhandled %s type: %s", HazelcastInstance.class.getSimpleName(), hzInstance.getClass().getName()));
        }

        /**
         * Issues the next asynchronous read, unless the listener is cancelled.
         * The result is handed to {@link #accept} on the listener's executor.
         */
        void next() {
            if (this.cancelled) {
                return;
            }
            // Request at least one and at most BATCH_SIZE items, unfiltered.
            this.ringbuffer.readManyAsync(this.sequence, 1, BATCH_SIZE, null).whenCompleteAsync(this::accept, this.executor);
        }

        /** Marks the listener cancelled; later reads and results are skipped. */
        void cancel() {
            this.cancelled = true;
        }

        /**
         * Handles the outcome of one read: dispatches the items and schedules
         * the next read, or reacts to the failure.
         *
         * @param result    items read; only used when {@code throwable} is null
         * @param throwable failure of the read, or null on success
         */
        private void accept(ReadResultSet<Object> result, Throwable throwable) {
            if (this.cancelled) {
                return;
            }
            if (throwable == null) {
                // The batch started at (next sequence - read count); any
                // difference from the requested sequence is reported as loss.
                long lostCount = result.getNextSequenceToReadFrom() - (long)result.readCount() - this.sequence;
                if (lostCount != 0L) {
                    this.logger.warning(String.format("Message loss of %d messages detected in listener '%s'", lostCount, this.id));
                }
                for (int i = 0; i < result.size(); ++i) {
                    try {
                        this.onNewMessage(result.get(i));
                    } catch (Throwable t) {
                        // onNewMessage() swallows observer failures itself, so this
                        // only fires for failures outside the callback, such as
                        // fetching the item from the result set.
                        this.logger.warning("Terminating message listener '" + this.id + "'. Reason: Unhandled exception, message: " + t.getMessage(), t);
                        this.cancel();
                        return;
                    }
                }
                this.sequence = result.getNextSequenceToReadFrom();
                this.next();
            } else if (this.handleInternalException(throwable)) {
                this.next();
            } else {
                this.cancel();
            }
        }

        /**
         * Forwards one ringbuffer item to the matching observer callback:
         * {@link WrappedThrowable} to {@code onError}, {@link DoneItem} to
         * {@code onComplete}, anything else to {@code onNext}. Exceptions
         * thrown by the callback are logged and ignored.
         */
        private void onNewMessage(Object message) {
            try {
                if (message instanceof WrappedThrowable) {
                    this.observer.onError(((WrappedThrowable)message).get());
                } else if (message instanceof DoneItem) {
                    this.observer.onComplete();
                } else {
                    this.observer.onNext(message);
                }
            } catch (Throwable t) {
                this.logger.warning("Exception thrown while calling observer callback for listener '" + this.id + "'. Will be ignored. Reason: " + t.getMessage(), t);
            }
        }

        /**
         * Decides whether reading can continue after a failed read.
         *
         * @param t failure reported by the read
         * @return true if the listener should issue another read, false if it
         *         should terminate
         */
        protected boolean handleInternalException(Throwable t) {
            if (t instanceof OperationTimeoutException) {
                return this.handleOperationTimeoutException();
            }
            if (t instanceof IllegalArgumentException) {
                return this.handleIllegalArgumentException((IllegalArgumentException)t);
            }
            if (t instanceof StaleSequenceException) {
                return this.handleStaleSequenceException((StaleSequenceException)t);
            }
            if (t instanceof HazelcastInstanceNotActiveException) {
                if (this.logger.isFinestEnabled()) {
                    this.logger.finest("Terminating message listener '" + this.id + "'. Reason: HazelcastInstance is shutting down");
                }
            } else if (t instanceof DistributedObjectDestroyedException) {
                if (this.logger.isFinestEnabled()) {
                    this.logger.finest("Terminating message listener '" + this.id + "'. Reason: Ringbuffer is destroyed");
                }
            } else {
                this.logger.warning("Terminating message listener '" + this.id + "'. Reason: Unhandled exception, message: " + t.getMessage(), t);
            }
            return false;
        }

        /** Logs the timeout and continues from the unchanged sequence. */
        private boolean handleOperationTimeoutException() {
            if (this.logger.isFinestEnabled()) {
                this.logger.finest("Message listener '" + this.id + "' timed out. Continuing from last known sequence: " + this.sequence);
            }
            return true;
        }

        /** Repositions the listener at the ringbuffer's current head sequence and continues. */
        private boolean handleIllegalArgumentException(IllegalArgumentException t) {
            long currentHeadSequence = this.ringbuffer.headSequence();
            if (this.logger.isFinestEnabled()) {
                this.logger.finest(String.format("Message listener '%s' requested a too large sequence: %s. Jumping from old sequence %d to sequence %d.", this.id, t.getMessage(), this.sequence, currentHeadSequence));
            }
            this.adjustSequence(currentHeadSequence);
            return true;
        }

        /** Repositions the listener at the ringbuffer's current head sequence and continues. */
        private boolean handleStaleSequenceException(StaleSequenceException staleSequenceException) {
            long headSeq = this.ringbuffer.headSequence();
            if (this.logger.isFinestEnabled()) {
                this.logger.finest("Message listener '" + this.id + "' ran into a stale sequence. Jumping from oldSequence " + this.sequence + " to sequence " + headSeq + ".");
            }
            this.adjustSequence(headSeq);
            return true;
        }

        /**
         * Moves the read position to {@code newSequence}, warning about the
         * skipped items when the position moves forward.
         */
        private void adjustSequence(long newSequence) {
            if (newSequence > this.sequence) {
                this.logger.warning(String.format("Message loss of %d messages detected in listener '%s'", newSequence - this.sequence, this.id));
            }
            this.sequence = newSequence;
        }
    }
}

