/*
 * Decompiled with CFR 0.152.
 */
package com.telamin.mongoose.dispatch;

import com.fluxtion.agrona.concurrent.OneToOneConcurrentArrayQueue;
import com.fluxtion.runtime.event.NamedFeedEvent;
import com.fluxtion.runtime.event.NamedFeedEventImpl;
import com.fluxtion.runtime.event.ReplayRecord;
import com.telamin.mongoose.exception.QueuePublishException;
import com.telamin.mongoose.service.EventSource;
import com.telamin.mongoose.service.error.ErrorEvent;
import com.telamin.mongoose.service.error.ErrorReporting;
import com.telamin.mongoose.service.pool.PoolAware;
import com.telamin.mongoose.service.pool.impl.PoolTracker;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import lombok.Generated;

/**
 * Publishes items to a set of named target queues, optionally recording every item in an
 * in-memory event log so that cached items can be dispatched later.
 *
 * <p>Each incoming item is first passed through the configured {@code dataMapper}. Depending on
 * the configured {@link EventSource.EventWrapStrategy} the mapped item is written to every target
 * queue either as-is or wrapped in a {@link NamedFeedEventImpl} carrying this publisher's name
 * and the current sequence number.
 *
 * <p>NOTE(review): {@code sequenceNumber}, {@code eventLog} and {@code cacheReadPointer} are plain
 * unsynchronized fields, so {@code publish}/{@code cache} presumably run on a single thread —
 * confirm against callers. Only the target-queue list is a concurrent collection.
 *
 * @param <T> type of the items handed to {@link #publish(Object)} and {@link #cache(Object)}
 */
public class EventToQueuePublisher<T> {
    @Generated
    private static final Logger log = Logger.getLogger(EventToQueuePublisher.class.getName());
    /** Longest time {@code writeToQueue} keeps retrying a full queue before dropping the item. */
    private static final long MAX_SPIN_NS = TimeUnit.MILLISECONDS.toNanos(10L);

    // Declared with the concrete type so that addIfAbsent (atomic check-and-add) is available.
    private final CopyOnWriteArrayList<NamedQueue> targetQueues = new CopyOnWriteArrayList<>();
    private final List<NamedFeedEvent<?>> eventLog = new ArrayList<>();
    private final String name;
    private boolean cacheEventLog;
    private long sequenceNumber = 0L;
    private EventSource.EventWrapStrategy eventWrapStrategy = EventSource.EventWrapStrategy.SUBSCRIPTION_NOWRAP;
    private Function<T, ?> dataMapper = Function.identity();
    /** Index into {@code eventLog} of the first entry that has not been dispatched yet. */
    private int cacheReadPointer = 0;
    // Log-level flags are sampled once at construction time.
    private final boolean logWarning = log.isLoggable(Level.WARNING);
    private final boolean logInfo = log.isLoggable(Level.INFO);
    private final boolean logFine = log.isLoggable(Level.FINE);

    /**
     * Registers a target queue under the given name. A queue that is already registered (same
     * name and same queue, per {@link NamedQueue} record equality) is not added twice.
     *
     * @param targetQueue queue that will receive published items
     * @param name        name used for the queue in log output and in {@link #removeTargetQueueByName}
     */
    public void addTargetQueue(OneToOneConcurrentArrayQueue<Object> targetQueue, String name) {
        NamedQueue namedQueue = new NamedQueue(name, targetQueue);
        if (log.isLoggable(Level.FINE)) {
            log.fine("adding a publisher queue:" + namedQueue);
        }
        // Atomic replacement for the former contains()+add() pair, which could register the
        // same queue twice when two threads added it concurrently.
        this.targetQueues.addIfAbsent(namedQueue);
    }

    /**
     * Maps the item, bumps the sequence number and writes the mapped item to every target queue.
     *
     * <p>When the event log is enabled, any not-yet-dispatched cached events are dispatched first
     * and the new item is appended to the log. A {@link PoolAware} item is logged as its
     * {@code String.valueOf} representation rather than as the object itself. When the event log
     * is disabled, one reference is released on the item's pool tracker (if it has one) before
     * dispatching.
     *
     * <p>A {@code null} item, or an item whose mapping yields {@code null} (including a mapper
     * failure), is ignored and does not consume a sequence number.
     *
     * @param itemToPublish item to publish; may be {@code null}, in which case nothing happens
     * @throws QueuePublishException if writing to a target queue fails with an exception
     */
    public void publish(T itemToPublish) {
        if (itemToPublish == null) {
            log.info("itemToPublish is null");
            return;
        }
        Object mappedItem = this.mapItemSafely(itemToPublish, "publish");
        if (mappedItem == null) {
            log.fine("mapped itemToPublish is null");
            return;
        }
        ++this.sequenceNumber;
        if (log.isLoggable(Level.FINE)) {
            log.fine("listenerCount:" + this.targetQueues.size() + " sequenceNumber:" + this.sequenceNumber + " publish:" + itemToPublish);
        }
        if (this.cacheEventLog) {
            this.dispatchCachedEventLog();
            Object cachedData = mappedItem instanceof PoolAware ? String.valueOf(mappedItem) : mappedItem;
            this.eventLog.add(this.newFeedEvent(cachedData));
            // The entry just logged is dispatched live below, so mark the whole log as read.
            // The pointer is advanced only here: bumping it when nothing was logged let it run
            // ahead of the log (hiding later cache() entries) and eventually overflow.
            this.cacheReadPointer = this.eventLog.size();
        } else {
            // NOTE(review): the reference is released before dispatch; writeToQueue acquires one
            // per queue. Order kept as in the original — confirm against PoolTracker semantics.
            PoolTracker<?> tracker = this.trackerOf(mappedItem);
            if (tracker != null) {
                tracker.releaseReference();
            }
        }
        this.dispatch(mappedItem);
    }

    /**
     * Maps the item and, when the event log is enabled, appends it to the log without
     * dispatching it. Logged items are dispatched by a later {@link #dispatchCachedEventLog()}
     * or {@link #publish(Object)} call.
     *
     * <p>The sequence number is incremented for every successfully mapped item, whether or not
     * the event log is enabled. If the mapped item has a pool tracker, {@code removeFromPool()}
     * is called on it before the item is logged.
     *
     * @param itemToCache item to cache; {@code null} is ignored
     */
    public void cache(T itemToCache) {
        if (itemToCache == null) {
            log.fine("itemToCache is null");
            return;
        }
        Object mappedItem = this.mapItemSafely(itemToCache, "cache");
        if (mappedItem == null) {
            return;
        }
        if (log.isLoggable(Level.FINE)) {
            // Logged before the increment, so this shows the previous sequence number.
            log.fine("listenerCount:" + this.targetQueues.size() + " sequenceNumber:" + this.sequenceNumber + " publish:" + itemToCache);
        }
        ++this.sequenceNumber;
        if (!this.cacheEventLog) {
            return;
        }
        PoolTracker<?> tracker = this.trackerOf(mappedItem);
        if (tracker != null) {
            tracker.removeFromPool();
        }
        this.eventLog.add(this.newFeedEvent(mappedItem));
    }

    /**
     * Offers a replay record to every target queue, unwrapped and without touching the sequence
     * number or the event log. Each queue is offered the record once; if a queue rejects it the
     * record is dropped for that queue and a warning is logged.
     *
     * @param record replay record to publish; {@code null} is ignored
     */
    public void publishReplay(ReplayRecord record) {
        if (record == null) {
            log.fine("itemToPublish is null");
            return;
        }
        boolean fine = log.isLoggable(Level.FINE);
        if (fine) {
            log.fine("listenerCount:" + this.targetQueues.size() + " publish:" + record);
        }
        // Iterating the copy-on-write list works on a snapshot, so a concurrent
        // removeTargetQueueByName cannot cause an out-of-range get(i).
        for (NamedQueue namedQueue : this.targetQueues) {
            OneToOneConcurrentArrayQueue<Object> targetQueue = namedQueue.targetQueue();
            boolean offered = targetQueue.offer(record);
            if (!offered && log.isLoggable(Level.WARNING)) {
                // Previously the rejected offer was ignored silently.
                log.warning("dropping replay record, queue rejected offer: queue=" + namedQueue.name() + " publisher=" + this.name + " queueSize:" + targetQueue.size());
            }
            if (fine) {
                log.fine("queue:" + namedQueue.name() + " size:" + targetQueue.size());
            }
        }
    }

    /**
     * Dispatches the data of every event-log entry from {@code cacheReadPointer} to the end of
     * the log, then moves the pointer to the end of the log.
     *
     * <p>NOTE(review): under the {@code *_NAMED_EVENT} strategies the replayed data is re-wrapped
     * by {@code dispatch} with the publisher's current sequence number, not the one stored in the
     * logged event — confirm this is intended.
     *
     * @throws QueuePublishException if writing to a target queue fails with an exception
     */
    public void dispatchCachedEventLog() {
        int eventLogSize = this.eventLog.size();
        if (this.cacheReadPointer < eventLogSize) {
            if (log.isLoggable(Level.FINE)) {
                log.fine("publishing cached items cacheReadPointer:" + this.cacheReadPointer + " eventLog.size():" + eventLogSize);
            }
            for (int i = this.cacheReadPointer; i < eventLogSize; ++i) {
                this.dispatch(this.eventLog.get(i).data());
            }
        }
        this.cacheReadPointer = this.eventLog.size();
    }

    /**
     * Returns the logged events.
     *
     * @return an unmodifiable copy of the event log, or an empty list when the event log is
     *         disabled (even if entries were logged while it was enabled)
     */
    public List<NamedFeedEvent<?>> getEventLog() {
        if (!this.cacheEventLog) {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(new ArrayList<>(this.eventLog));
    }

    /**
     * Applies the data mapper, converting any failure into a log entry and an error report.
     *
     * <p>If the mapper returns a different object and the original item is {@link PoolAware},
     * the original's tracker has {@code releaseReference()} and {@code returnToPool()} called on it.
     *
     * @param item    item to map, never {@code null} when called from this class
     * @param context short label of the calling operation, used in error messages
     * @return the mapped item, or {@code null} if the mapper returned {@code null} or threw
     */
    private Object mapItemSafely(T item, String context) {
        try {
            Object mapped = this.dataMapper.apply(item);
            if (mapped == null) {
                log.fine("mappedItem is null");
                return null;
            }
            if (item != mapped && item instanceof PoolAware pooledSource) {
                pooledSource.getPoolTracker().releaseReference();
                pooledSource.getPoolTracker().returnToPool();
            }
            return mapped;
        }
        catch (Throwable t) {
            long nextSeq = this.sequenceNumber + 1L;
            log.severe("data mapping (" + context + ") failed: publisher=" + this.name + ", nextSequenceNumber=" + nextSeq + ", item=" + item + ", error=" + t);
            ErrorReporting.report("EventToQueuePublisher:" + this.name, "data mapping failed for " + context + ": nextSeq=" + nextSeq + ", item=" + item, t, ErrorEvent.Severity.ERROR);
            return null;
        }
    }

    /**
     * Writes the item to every target queue, wrapping it per queue in a new named feed event when
     * the wrap strategy is one of the {@code *_NAMED_EVENT} values.
     */
    private void dispatch(Object mappedItem) {
        boolean fine = log.isLoggable(Level.FINE);
        // Snapshot iteration: safe against concurrent add/remove of target queues.
        for (NamedQueue namedQueue : this.targetQueues) {
            switch (this.eventWrapStrategy) {
                case SUBSCRIPTION_NOWRAP, BROADCAST_NOWRAP -> this.writeToQueue(namedQueue, mappedItem);
                case SUBSCRIPTION_NAMED_EVENT, BROADCAST_NAMED_EVENT -> this.writeToQueue(namedQueue, this.newFeedEvent(mappedItem));
                default -> {
                    // Any other strategy writes nothing, as in the original switch.
                }
            }
            if (fine) {
                log.fine("queue:" + namedQueue.name() + " size:" + namedQueue.targetQueue().size());
            }
        }
    }

    /**
     * Offers the item to one queue, retrying while the queue rejects it for up to
     * {@link #MAX_SPIN_NS}; after that the item is dropped for this queue with a warning.
     *
     * <p>If the item has a pool tracker, a reference is acquired before each offer and released
     * again when that offer is rejected.
     *
     * @throws QueuePublishException if the tracker or the queue throws; the tracker (if any) has
     *                               {@code returnToPool()} called and the error is reported first
     */
    private void writeToQueue(NamedQueue namedQueue, Object itemToPublish) {
        OneToOneConcurrentArrayQueue<Object> targetQueue = namedQueue.targetQueue();
        PoolTracker<?> tracker = this.trackerOf(itemToPublish);
        // An explicit flag marks "timer started": System.nanoTime() may legitimately be negative,
        // so a negative-value sentinel would restart the timer forever and never time out.
        boolean spinning = false;
        long startNs = 0L;
        try {
            while (true) {
                if (tracker != null) {
                    tracker.acquireReference();
                }
                if (targetQueue.offer(itemToPublish)) {
                    break;
                }
                if (tracker != null) {
                    tracker.releaseReference();
                }
                if (!spinning) {
                    spinning = true;
                    startNs = System.nanoTime();
                } else if (System.nanoTime() - startNs > MAX_SPIN_NS) {
                    if (this.logWarning) {
                        log.warning("dropping publish to slow/contended queue: " + namedQueue.name() + " after ~" + (System.nanoTime() - startNs) / 1000000L + "ms seq:" + this.sequenceNumber + " queueSize:" + targetQueue.size());
                    }
                    return;
                }
                Thread.onSpinWait();
            }
        }
        catch (Throwable t) {
            if (tracker != null) {
                tracker.returnToPool();
            }
            log.severe("queue write failed: publisher=" + this.name + ", queue=" + namedQueue.name() + ", seq=" + this.sequenceNumber + ", item=" + itemToPublish + ", error=" + t);
            ErrorReporting.report("EventToQueuePublisher:" + this.name, "queue write failed: queue=" + namedQueue.name() + ", seq=" + this.sequenceNumber + ", item=" + itemToPublish, t, ErrorEvent.Severity.CRITICAL);
            throw new QueuePublishException("Failed to write to queue '" + namedQueue.name() + "' for publisher '" + this.name + "'", t);
        }
        if (this.logFine && spinning) {
            long delta = System.nanoTime() - startNs;
            log.fine("spin wait took " + delta / 1000000L + "ms queue:" + namedQueue.name() + " size:" + targetQueue.size());
        }
    }

    /**
     * Finds the pool tracker of an item: either the item itself is {@link PoolAware}, or it is a
     * {@link NamedFeedEvent} whose data is.
     *
     * @return the tracker, or {@code null} when neither the item nor its wrapped data is pool aware
     */
    private PoolTracker<?> trackerOf(Object item) {
        if (item instanceof PoolAware pooledItem) {
            return pooledItem.getPoolTracker();
        }
        if (item instanceof NamedFeedEvent<?> feedEvent && feedEvent.data() instanceof PoolAware pooledData) {
            return pooledData.getPoolTracker();
        }
        return null;
    }

    /**
     * Builds a feed event carrying this publisher's name, the given data and the current
     * sequence number. Keeps the raw-typed construction in a single place.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private NamedFeedEvent<?> newFeedEvent(Object data) {
        NamedFeedEventImpl feedEvent = new NamedFeedEventImpl(this.name).data(data).sequenceNumber(this.sequenceNumber);
        return (NamedFeedEvent<?>) feedEvent;
    }

    /**
     * Removes every target queue registered under the given name.
     *
     * @param queueName name to remove; {@code null} is ignored
     */
    public void removeTargetQueueByName(String queueName) {
        if (queueName == null) {
            return;
        }
        this.targetQueues.removeIf(q -> queueName.equals(q.name()));
    }

    /**
     * Creates a publisher with no target queues, the event log disabled, an identity data mapper
     * and the {@code SUBSCRIPTION_NOWRAP} wrap strategy.
     *
     * @param name publisher name, used in wrapped feed events and in log/error messages
     */
    @Generated
    public EventToQueuePublisher(String name) {
        this.name = name;
    }

    @Generated
    @Override
    public String toString() {
        return "EventToQueuePublisher(targetQueues=" + this.getTargetQueues() + ", eventLog=" + this.getEventLog() + ", name=" + this.getName() + ", cacheEventLog=" + this.isCacheEventLog() + ", sequenceNumber=" + this.getSequenceNumber() + ", eventWrapStrategy=" + this.getEventWrapStrategy() + ", dataMapper=" + this.getDataMapper() + ", cacheReadPointer=" + this.getCacheReadPointer() + ", logWarning=" + this.isLogWarning() + ", logInfo=" + this.isLogInfo() + ", logFine=" + this.isLogFine() + ")";
    }

    /** Returns the live (not copied) list of registered target queues. */
    @Generated
    public List<NamedQueue> getTargetQueues() {
        return this.targetQueues;
    }

    @Generated
    public String getName() {
        return this.name;
    }

    @Generated
    public boolean isCacheEventLog() {
        return this.cacheEventLog;
    }

    @Generated
    public long getSequenceNumber() {
        return this.sequenceNumber;
    }

    @Generated
    public EventSource.EventWrapStrategy getEventWrapStrategy() {
        return this.eventWrapStrategy;
    }

    @Generated
    public Function<T, ?> getDataMapper() {
        return this.dataMapper;
    }

    @Generated
    public int getCacheReadPointer() {
        return this.cacheReadPointer;
    }

    @Generated
    public boolean isLogWarning() {
        return this.logWarning;
    }

    @Generated
    public boolean isLogInfo() {
        return this.logInfo;
    }

    @Generated
    public boolean isLogFine() {
        return this.logFine;
    }

    @Generated
    public void setCacheEventLog(boolean cacheEventLog) {
        this.cacheEventLog = cacheEventLog;
    }

    @Generated
    public void setEventWrapStrategy(EventSource.EventWrapStrategy eventWrapStrategy) {
        this.eventWrapStrategy = eventWrapStrategy;
    }

    @Generated
    public void setDataMapper(Function<T, ?> dataMapper) {
        this.dataMapper = dataMapper;
    }

    /**
     * A target queue paired with the name it was registered under.
     *
     * @param name        name of the queue
     * @param targetQueue queue that receives published items
     */
    public record NamedQueue(String name, OneToOneConcurrentArrayQueue<Object> targetQueue) {
    }
}

