/*
 * Decompiled with CFR 0.152.
 */
package com.caucho.env.actor2;

import com.caucho.env.actor.ActorProcessor;
import com.caucho.env.actor.ActorQueueApi;
import com.caucho.env.actor2.QueueRing;
import com.caucho.env.actor2.QueueRingFixed;
import com.caucho.env.thread.AbstractTaskWorker;
import com.caucho.util.L10N;
import java.util.concurrent.TimeUnit;

public class ActorQueue2MultiWorker<T>
implements ActorQueueApi<T> {
    private static final L10N L = new L10N(ActorQueue2MultiWorker.class);
    private final QueueRing<T> _actorQueue;
    private final ActorWorker<T>[] _workers;

    public ActorQueue2MultiWorker(int capacity, int offset, ActorProcessor<? super T> ... processors) {
        if (processors == null || processors.length == 0) {
            throw new NullPointerException();
        }
        this._actorQueue = new QueueRingFixed<T>(capacity);
        this._workers = new ActorWorker[processors.length];
        for (int i = 0; i < processors.length; ++i) {
            this._workers[i] = new ActorWorker<T>(this._actorQueue, processors[i]);
        }
    }

    @Override
    public int getAvailable() {
        return this._actorQueue.remainingCapacity();
    }

    @Override
    public boolean isEmpty() {
        return this._actorQueue.isEmpty();
    }

    @Override
    public int getSize() {
        return this._actorQueue.size();
    }

    @Override
    public final void offer(T value) {
        this.offer(value, true);
    }

    @Override
    public final boolean offer(T value, boolean isWait) {
        QueueRing<T> queue = this._actorQueue;
        if (!queue.offer(value, 0L, TimeUnit.SECONDS)) {
            if (!isWait) {
                return false;
            }
            this.wake();
            if (!queue.offer(value, 5L, TimeUnit.MINUTES)) {
                throw new IllegalStateException(L.l("offer timeout {0} {1}", (Object)this, (Object)value));
            }
        }
        this.wake();
        return true;
    }

    public String getWorkerState() {
        return this._workers[0].getState();
    }

    @Override
    public void wake() {
        int len = Math.min(this._workers.length, this._actorQueue.size());
        for (int i = 0; i < len; ++i) {
            this._workers[i].wake();
        }
    }

    public void close() {
    }

    public String toString() {
        return this.getClass().getSimpleName() + "[" + this._actorQueue + "]";
    }

    private static class ActorWorker<T>
    extends AbstractTaskWorker {
        private final ActorProcessor<? super T> _processor;
        private final QueueRing<T> _queue;

        ActorWorker(QueueRing<T> queue, ActorProcessor<? super T> processor) {
            this._queue = queue;
            this._processor = processor;
        }

        @Override
        protected String getThreadName() {
            return this._processor.getThreadName();
        }

        @Override
        protected boolean isRetry() {
            return !this._queue.isEmpty();
        }

        @Override
        public final long runTask() {
            try {
                try {
                    Object item;
                    this._processor.onProcessStart();
                    while ((item = this._queue.poll()) != null) {
                        this._processor.process(item);
                    }
                }
                finally {
                    this._processor.onProcessComplete();
                }
            }
            catch (RuntimeException e) {
                throw e;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return 0L;
        }

        @Override
        public String toString() {
            return this.getClass().getSimpleName() + "[" + this._processor + "]";
        }
    }
}

