/*
 * Decompiled with CFR 0.152.
 */
package com.telamin.mongoose.dutycycle;

import com.fluxtion.agrona.concurrent.OneToOneConcurrentArrayQueue;
import com.fluxtion.runtime.StaticEventProcessor;
import com.fluxtion.runtime.annotations.feature.Experimental;
import com.fluxtion.runtime.event.BroadcastEvent;
import com.fluxtion.runtime.event.NamedFeedEvent;
import com.fluxtion.runtime.event.ReplayRecord;
import com.telamin.mongoose.dispatch.RetryPolicy;
import com.telamin.mongoose.dutycycle.EventQueueToEventProcessor;
import com.telamin.mongoose.service.EventToInvokeStrategy;
import com.telamin.mongoose.service.error.ErrorEvent;
import com.telamin.mongoose.service.error.ErrorReporting;
import com.telamin.mongoose.service.pool.PoolAware;
import com.telamin.mongoose.service.pool.impl.PoolTracker;
import java.util.logging.Logger;
import lombok.Generated;

@Experimental
public class EventQueueToEventProcessorAgent
implements EventQueueToEventProcessor {
    @Generated
    private static final Logger log = Logger.getLogger(EventQueueToEventProcessorAgent.class.getName());
    private final OneToOneConcurrentArrayQueue<?> inputQueue;
    private final EventToInvokeStrategy eventToInvokeStrategy;
    private final String name;
    private final Logger logger;
    private RetryPolicy retryPolicy = RetryPolicy.defaultProcessingPolicy();
    private Runnable unsubscribeAction;

    public EventQueueToEventProcessorAgent(OneToOneConcurrentArrayQueue<?> inputQueue, EventToInvokeStrategy eventToInvokeStrategy, String name) {
        this.inputQueue = inputQueue;
        this.eventToInvokeStrategy = eventToInvokeStrategy;
        this.name = name;
        this.logger = Logger.getLogger("EventQueueToEventProcessorAgent." + name);
    }

    public void onStart() {
        this.logger.info("start");
    }

    public int doWork() {
        Object event;
        int processed;
        int batchLimit = 64;
        for (processed = 0; processed < 64 && (event = this.inputQueue.poll()) != null; ++processed) {
            PoolTracker<?> tracker = this.trackerOf(event);
            if (tracker != null) {
                try {
                    tracker.releaseReference();
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
            }
            int attempt = 0;
            boolean done = false;
            Throwable lastError = null;
            while (!done) {
                try {
                    if (event instanceof ReplayRecord) {
                        ReplayRecord replayRecord = (ReplayRecord)event;
                        this.eventToInvokeStrategy.processEvent(replayRecord.getEvent(), replayRecord.getWallClockTime());
                    } else if (event instanceof BroadcastEvent) {
                        BroadcastEvent broadcastEvent = (BroadcastEvent)event;
                        this.eventToInvokeStrategy.processEvent(broadcastEvent.getEvent());
                    } else {
                        this.eventToInvokeStrategy.processEvent(event);
                    }
                    done = true;
                }
                catch (Throwable t) {
                    lastError = t;
                    String warnMsg = "event processing failed: agent=" + this.name + ", attempt=" + ++attempt + ", eventClass=" + (event == null ? "null" : event.getClass().getName()) + ", event=" + String.valueOf(event) + ", error=" + String.valueOf(t);
                    this.logger.warning(warnMsg);
                    ErrorReporting.report("EventQueueToEventProcessorAgent:" + this.name, warnMsg, t, ErrorEvent.Severity.WARNING);
                    if (!this.retryPolicy.shouldRetry(t, attempt)) {
                        String errMsg = "dropping event after retries: agent=" + this.name + ", attempts=" + attempt + ", eventClass=" + (event == null ? "null" : event.getClass().getName()) + ", event=" + String.valueOf(event) + ", lastError=" + String.valueOf(t);
                        this.logger.severe(errMsg);
                        ErrorReporting.report("EventQueueToEventProcessorAgent:" + this.name, errMsg, t, ErrorEvent.Severity.ERROR);
                        break;
                    }
                    this.retryPolicy.backoff(attempt);
                }
            }
            if (tracker == null) continue;
            try {
                tracker.returnToPool();
                continue;
            }
            catch (Throwable ignored) {
                this.logger.warning("unable to return to pool: " + String.valueOf(tracker));
            }
        }
        return processed;
    }

    public void onClose() {
        this.logger.info("onClose");
    }

    public String roleName() {
        return this.name;
    }

    public EventQueueToEventProcessorAgent withRetryPolicy(RetryPolicy retryPolicy) {
        if (retryPolicy != null) {
            this.retryPolicy = retryPolicy;
        }
        return this;
    }

    public EventQueueToEventProcessorAgent withUnsubscribeAction(Runnable unsubscribeAction) {
        this.unsubscribeAction = unsubscribeAction;
        return this;
    }

    @Override
    public int registerProcessor(StaticEventProcessor eventProcessor) {
        this.logger.info("registerProcessor: " + String.valueOf(eventProcessor));
        this.eventToInvokeStrategy.registerProcessor(eventProcessor);
        this.logger.info("listener count:" + this.listenerCount());
        return this.listenerCount();
    }

    @Override
    public int deregisterProcessor(StaticEventProcessor eventProcessor) {
        this.logger.info("deregisterProcessor: " + String.valueOf(eventProcessor));
        this.eventToInvokeStrategy.deregisterProcessor(eventProcessor);
        int listeners = this.listenerCount();
        if (listeners < 1 && this.unsubscribeAction != null) {
            try {
                this.unsubscribeAction.run();
            }
            catch (Throwable t) {
                this.logger.severe("error running unsubscribe action for agent=" + this.name + ": " + String.valueOf(t));
            }
        }
        return listeners;
    }

    @Override
    public int listenerCount() {
        return this.eventToInvokeStrategy.listenerCount();
    }

    private PoolTracker<?> trackerOf(Object event) {
        NamedFeedEvent nfe;
        Object data;
        if (event == null) {
            return null;
        }
        Object candidate = event;
        if (candidate instanceof ReplayRecord) {
            ReplayRecord rr = (ReplayRecord)candidate;
            candidate = rr.getEvent();
        }
        if (candidate instanceof BroadcastEvent) {
            BroadcastEvent be = (BroadcastEvent)candidate;
            candidate = be.getEvent();
        }
        if (candidate instanceof PoolAware) {
            PoolAware paDirect = (PoolAware)candidate;
            return paDirect.getPoolTracker();
        }
        if (candidate instanceof NamedFeedEvent && (data = (nfe = (NamedFeedEvent)candidate).data()) instanceof PoolAware) {
            PoolAware pa = (PoolAware)data;
            return pa.getPoolTracker();
        }
        return null;
    }
}

