/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.streaming;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.streaming.Cursor;
import org.mule.runtime.api.streaming.CursorProvider;
import org.mule.runtime.api.streaming.bytes.CursorStreamProvider;
import org.mule.runtime.api.streaming.object.CursorIteratorProvider;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.internal.streaming.CursorContext;
import org.mule.runtime.core.internal.streaming.MutableStreamingStatistics;
import org.mule.runtime.core.internal.streaming.bytes.ManagedCursorStreamProvider;
import org.mule.runtime.core.internal.streaming.object.ManagedCursorIteratorProvider;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CursorManager {
    private static Logger LOGGER = LoggerFactory.getLogger(CursorManager.class);
    private final LoadingCache<String, EventStreamingState> registry = CacheBuilder.newBuilder().removalListener(notification -> EventStreamingState.access$700((EventStreamingState)notification.getValue())).build((CacheLoader)new CacheLoader<String, EventStreamingState>(){

        public EventStreamingState load(String key) throws Exception {
            return new EventStreamingState();
        }
    });
    private final MutableStreamingStatistics statistics;
    private final Scheduler disposalScheduler;

    public CursorManager(MutableStreamingStatistics statistics, Scheduler disposalScheduler) {
        this.statistics = statistics;
        this.disposalScheduler = disposalScheduler;
    }

    public CursorProvider manage(CursorProvider provider, CoreEvent creatorEvent) {
        BaseEventContext ownerContext = ((BaseEventContext)creatorEvent.getContext()).getRootContext();
        this.registerEventContext(ownerContext);
        ((EventStreamingState)this.registry.getUnchecked((Object)ownerContext.getId())).addProvider(provider);
        CursorContext context = new CursorContext(provider, ownerContext);
        if (provider instanceof CursorStreamProvider) {
            return new ManagedCursorStreamProvider(context, this);
        }
        if (provider instanceof CursorIteratorProvider) {
            return new ManagedCursorIteratorProvider(context, this);
        }
        throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage("Unknown cursor provider type: " + context.getClass().getName()));
    }

    public void onOpen(Cursor cursor, CursorContext providerHandle) {
        ((EventStreamingState)this.registry.getUnchecked((Object)providerHandle.getOwnerContext().getId())).addCursor(providerHandle.getCursorProvider(), cursor);
        this.statistics.incrementOpenCursors();
    }

    public void onClose(Cursor cursor, CursorContext handle) {
        String eventId = handle.getOwnerContext().getId();
        EventStreamingState state = (EventStreamingState)this.registry.getIfPresent((Object)eventId);
        if (state != null && state.removeCursor(handle.getCursorProvider(), cursor)) {
            this.registry.invalidate((Object)eventId);
        }
    }

    private void terminated(BaseEventContext rootContext) {
        EventStreamingState state = (EventStreamingState)this.registry.getIfPresent((Object)rootContext.getId());
        if (state != null) {
            this.registry.invalidate((Object)rootContext.getId());
        }
    }

    private void registerEventContext(BaseEventContext eventContext) {
        eventContext.onTerminated((response, throwable) -> this.terminated(eventContext));
    }

    private class EventStreamingState {
        private AtomicBoolean disposed = new AtomicBoolean(false);
        private AtomicInteger cursorCount = new AtomicInteger(0);
        private final LoadingCache<CursorProvider, Set<Cursor>> cursors = CacheBuilder.newBuilder().removalListener(notification -> {
            try {
                this.closeProvider((CursorProvider)notification.getKey());
                this.releaseAll((Collection)notification.getValue());
            }
            finally {
                ((CursorProvider)notification.getKey()).releaseResources();
            }
        }).build((CacheLoader)new CacheLoader<CursorProvider, Set<Cursor>>(){

            public Set<Cursor> load(CursorProvider key) throws Exception {
                CursorManager.this.statistics.incrementOpenProviders();
                return Collections.newSetFromMap(new ConcurrentHashMap());
            }
        });

        private EventStreamingState() {
        }

        private synchronized void addProvider(CursorProvider adapter) {
            this.cursors.getUnchecked((Object)adapter);
        }

        private void addCursor(CursorProvider provider, Cursor cursor) {
            ((Set)this.cursors.getUnchecked((Object)provider)).add(cursor);
            this.cursorCount.incrementAndGet();
        }

        private boolean removeCursor(CursorProvider provider, Cursor cursor) {
            Set openCursors = (Set)this.cursors.getUnchecked((Object)provider);
            if (openCursors.remove(cursor)) {
                CursorManager.this.statistics.decrementOpenCursors();
                if (this.cursorCount.decrementAndGet() <= 0 && provider.isClosed()) {
                    this.dispose();
                    return true;
                }
            }
            return false;
        }

        private void dispose() {
            if (this.disposed.compareAndSet(false, true)) {
                try {
                    CursorManager.this.disposalScheduler.execute(() -> this.cursors.invalidateAll());
                }
                catch (RejectedExecutionException e) {
                    this.cursors.invalidateAll();
                }
            }
        }

        private void releaseAll(Collection<Cursor> cursors) {
            cursors.forEach(cursor -> {
                try {
                    cursor.release();
                    CursorManager.this.statistics.decrementOpenCursors();
                }
                catch (Exception e) {
                    LOGGER.warn("Exception was found trying to close cursor. Execution will continue", (Throwable)e);
                }
            });
        }

        private void closeProvider(CursorProvider provider) {
            if (!provider.isClosed()) {
                provider.close();
                CursorManager.this.statistics.decrementOpenProviders();
            }
        }
    }
}

