/*
 * Decompiled with CFR 0.152.
 */
package reactor.bus;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.UUID;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.Environment;
import reactor.bus.Bus;
import reactor.bus.Event;
import reactor.bus.filter.PassThroughFilter;
import reactor.bus.publisher.BusPublisher;
import reactor.bus.registry.Registration;
import reactor.bus.registry.Registries;
import reactor.bus.registry.Registry;
import reactor.bus.routing.ConsumerFilteringRouter;
import reactor.bus.routing.Router;
import reactor.bus.selector.ClassSelector;
import reactor.bus.selector.Selector;
import reactor.bus.selector.Selectors;
import reactor.bus.spec.EventBusSpec;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.support.Assert;
import reactor.core.support.UUIDUtils;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.fn.Supplier;
import reactor.fn.support.SingleUseConsumer;

/**
 * Event bus that registers {@link Consumer}s against {@link Selector}s and hands
 * {@link Event}s, published under a key, to a {@link Router} together with the
 * registrations selected for that key.
 *
 * <p>Publishing ({@link #notify(Object, Event)}) goes through the configured
 * {@link Dispatcher}; the bus itself is the dispatch target and performs the routing in
 * {@link #accept(Event)}.
 */
public class EventBus
implements Bus<Event<?>>,
Consumer<Event<?>> {
    /** Router used when the caller does not supply one. */
    private static final Router DEFAULT_EVENT_ROUTER = new ConsumerFilteringRouter(new PassThroughFilter());
    private final Dispatcher dispatcher;
    private final Registry<Object, Consumer<? extends Event<?>>> consumerRegistry;
    private final Router router;
    /** Never {@code null}: defaults to a handler that routes the error by its class. */
    private final Consumer<Throwable> dispatchErrorHandler;
    /** May be {@code null}; in that case uncaught errors are logged instead. */
    private final Consumer<Throwable> uncaughtErrorHandler;
    /** Lazily created by {@link #getId()}. */
    private volatile UUID id;

    /**
     * @return a fresh {@link EventBusSpec} for configuring a bus
     */
    public static EventBusSpec config() {
        return new EventBusSpec();
    }

    /**
     * @return a bus that uses {@link SynchronousDispatcher#INSTANCE}
     */
    public static EventBus create() {
        return new EventBus(SynchronousDispatcher.INSTANCE);
    }

    /**
     * @param env environment whose default dispatcher is used
     * @return a bus built through {@link EventBusSpec} with {@code env} and its default dispatcher
     */
    public static EventBus create(Environment env) {
        return (EventBus)((EventBusSpec)((EventBusSpec)new EventBusSpec().env(env)).dispatcher(env.getDefaultDispatcher())).get();
    }

    /**
     * @param dispatcher dispatcher handed to the {@link EventBusSpec}
     * @return a bus built through {@link EventBusSpec} with the given dispatcher
     */
    public static EventBus create(Dispatcher dispatcher) {
        return (EventBus)((EventBusSpec)new EventBusSpec().dispatcher(dispatcher)).get();
    }

    /**
     * @param env        environment handed to the {@link EventBusSpec}
     * @param dispatcher dispatcher name handed to the {@link EventBusSpec}
     * @return a bus built through {@link EventBusSpec}
     */
    public static EventBus create(Environment env, String dispatcher) {
        return (EventBus)((EventBusSpec)((EventBusSpec)new EventBusSpec().env(env)).dispatcher(dispatcher)).get();
    }

    /**
     * @param env        environment handed to the {@link EventBusSpec}
     * @param dispatcher dispatcher handed to the {@link EventBusSpec}
     * @return a bus built through {@link EventBusSpec}
     */
    public static EventBus create(Environment env, Dispatcher dispatcher) {
        return (EventBus)((EventBusSpec)((EventBusSpec)new EventBusSpec().env(env)).dispatcher(dispatcher)).get();
    }

    /**
     * Creates a bus with the default router and default error handling.
     *
     * @param dispatcher dispatcher to use, or {@code null} for {@link SynchronousDispatcher#INSTANCE}
     */
    public EventBus(@Nullable Dispatcher dispatcher) {
        this(dispatcher, null);
    }

    /**
     * Creates a bus with default error handling.
     *
     * @param dispatcher dispatcher to use, or {@code null} for {@link SynchronousDispatcher#INSTANCE}
     * @param router     router to use, or {@code null} for the default router
     */
    public EventBus(@Nullable Dispatcher dispatcher, @Nullable Router router) {
        this(dispatcher, router, null, null);
    }

    /**
     * Creates a bus backed by a registry obtained from {@link Registries#create()}.
     *
     * @param dispatcher           dispatcher to use, or {@code null} for {@link SynchronousDispatcher#INSTANCE}
     * @param router               router to use, or {@code null} for the default router
     * @param dispatchErrorHandler handler for dispatch errors, or {@code null} for the default
     * @param uncaughtErrorHandler handler for errors reaching the {@link Throwable} consumer, or {@code null} to log them
     */
    public EventBus(@Nullable Dispatcher dispatcher, @Nullable Router router, @Nullable Consumer<Throwable> dispatchErrorHandler, @Nullable Consumer<Throwable> uncaughtErrorHandler) {
        this(Registries.create(), dispatcher, router, dispatchErrorHandler, uncaughtErrorHandler);
    }

    /**
     * Creates a bus on top of the given registry and registers a consumer for
     * {@link Throwable}-keyed events that either forwards the error to
     * {@code uncaughtErrorHandler} or, when that is {@code null}, logs it.
     *
     * @param consumerRegistry     registry holding the consumer registrations; must not be {@code null}
     * @param dispatcher           dispatcher to use, or {@code null} for {@link SynchronousDispatcher#INSTANCE}
     * @param router               router to use, or {@code null} for the default router
     * @param dispatchErrorHandler handler for dispatch errors, or {@code null} to route the error
     *                             through this bus keyed by the error's class
     * @param uncaughtErrorHandler handler for errors reaching the {@link Throwable} consumer, or {@code null} to log them
     */
    public EventBus(@Nonnull Registry<Object, Consumer<? extends Event<?>>> consumerRegistry, @Nullable Dispatcher dispatcher, @Nullable Router router, @Nullable Consumer<Throwable> dispatchErrorHandler, final @Nullable Consumer<Throwable> uncaughtErrorHandler) {
        Assert.notNull(consumerRegistry, "Consumer Registry cannot be null.");
        this.consumerRegistry = consumerRegistry;
        this.dispatcher = null == dispatcher ? SynchronousDispatcher.INSTANCE : dispatcher;
        this.router = null == router ? DEFAULT_EVENT_ROUTER : router;
        this.dispatchErrorHandler = null == dispatchErrorHandler ? new Consumer<Throwable>(){

            @Override
            public void accept(Throwable t) {
                // Route the failure to whatever is registered for its class; no completion
                // or error consumer is passed for this secondary routing.
                Class<?> type = t.getClass();
                EventBus.this.router.route(type, Event.wrap(t), EventBus.this.consumerRegistry.select(type), null, null);
            }
        } : dispatchErrorHandler;
        this.uncaughtErrorHandler = uncaughtErrorHandler;
        // NOTE: on(..) is overridable and is invoked here during construction.
        this.on(new ClassSelector(Throwable.class), new Consumer<Event<Throwable>>(){
            /** Created on first use so no logger is looked up when a handler is supplied. */
            Logger log;

            @Override
            public void accept(Event<Throwable> ev) {
                if (null == uncaughtErrorHandler) {
                    if (null == this.log) {
                        this.log = LoggerFactory.getLogger(EventBus.class);
                    }
                    this.log.error(ev.getData().getMessage(), ev.getData());
                } else {
                    uncaughtErrorHandler.accept(ev.getData());
                }
            }
        });
    }

    /**
     * Returns this bus's id, creating it with {@link UUIDUtils#create()} on the first call.
     *
     * @return the id of this bus
     */
    public synchronized UUID getId() {
        if (null == this.id) {
            this.id = UUIDUtils.create();
        }
        return this.id;
    }

    /**
     * @return the registry holding the consumer registrations
     */
    public Registry<?, Consumer<? extends Event<?>>> getConsumerRegistry() {
        return this.consumerRegistry;
    }

    /**
     * @return the dispatcher used to publish events
     */
    public Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    /**
     * @return the router used to deliver events to selected consumers
     */
    public Router getRouter() {
        return this.router;
    }

    /**
     * @return the handler passed to the dispatcher and router for dispatch errors
     */
    public Consumer<Throwable> getDispatchErrorHandler() {
        return this.dispatchErrorHandler;
    }

    /**
     * @return the uncaught-error handler given at construction, possibly {@code null}
     */
    public Consumer<Throwable> getUncaughtErrorHandler() {
        return this.uncaughtErrorHandler;
    }

    /**
     * Tells whether at least one non-cancelled registration is selected for {@code key}.
     *
     * @param key the key to look up
     * @return {@code true} if a live registration matches, {@code false} otherwise
     */
    @Override
    public boolean respondsToKey(Object key) {
        for (Registration<?, ?> reg : this.consumerRegistry.select(key)) {
            if (!reg.isCancelled()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Registers {@code consumer} for events whose key matches {@code selector}.
     *
     * <p>The consumer is wrapped in a proxy that first merges the headers resolved by the
     * selector's header resolver (when it has one) into the event, and then only invokes
     * the consumer when the event's data is {@code null}, when no payload type could be
     * derived from the consumer's generic signature, or when the data is an instance of
     * that derived type.
     *
     * @param selector selector deciding which keys the consumer receives; must not be {@code null}
     * @param consumer consumer to invoke; must not be {@code null}
     * @return the registration returned by the consumer registry
     */
    @Override
    public <T extends Event<?>> Registration<Object, Consumer<? extends Event<?>>> on(final Selector selector, final Consumer<T> consumer) {
        Assert.notNull(selector, "Selector cannot be null.");
        Assert.notNull(consumer, "Consumer cannot be null.");
        final Class<?> tClass = this.extractGeneric(consumer);
        Consumer<T> proxyConsumer = new Consumer<T>(){

            @Override
            public void accept(T e) {
                if (null != selector.getHeaderResolver()) {
                    e.getHeaders().setAll(selector.getHeaderResolver().resolve(e.getKey()));
                }
                Object data = e.getData();
                if (tClass == null || data == null || tClass.isAssignableFrom(data.getClass())) {
                    consumer.accept(e);
                }
            }
        };
        return this.consumerRegistry.register(selector, proxyConsumer);
    }

    /**
     * Derives the payload class from the consumer's first generic interface, i.e. the
     * {@code X} in a declaration shaped like {@code Consumer<Event<X>>}.
     *
     * @param consumer the consumer to inspect
     * @return the raw payload class, or {@code null} when the first generic interface is not
     *         parameterized two levels deep or the innermost argument is neither a class nor
     *         a parameterized type
     */
    private Class<?> extractGeneric(Consumer<? extends Event<?>> consumer) {
        Type[] interfaces = consumer.getClass().getGenericInterfaces();
        if (interfaces.length == 0 || !(interfaces[0] instanceof ParameterizedType)) {
            return null;
        }
        Type eventType = EventBus.firstTypeArgument((ParameterizedType)interfaces[0]);
        if (!(eventType instanceof ParameterizedType)) {
            return null;
        }
        Type payloadType = EventBus.firstTypeArgument((ParameterizedType)eventType);
        if (payloadType instanceof ParameterizedType) {
            return (Class<?>)((ParameterizedType)payloadType).getRawType();
        }
        if (payloadType instanceof Class) {
            return (Class<?>)payloadType;
        }
        return null;
    }

    /** Returns the first actual type argument of {@code type}, or {@code null} if it has none. */
    private static Type firstTypeArgument(ParameterizedType type) {
        Type[] arguments = type.getActualTypeArguments();
        return arguments.length == 0 ? null : arguments[0];
    }

    /**
     * @param broadcastSelector selector handed to the publisher
     * @return a {@link BusPublisher} created for this bus and the given selector
     */
    public Publisher<? extends Event<?>> on(Selector broadcastSelector) {
        return new BusPublisher(this, broadcastSelector);
    }

    /**
     * Stamps {@code key} onto the event and dispatches it, with this bus as the consumer.
     *
     * @param key key to publish under; must not be {@code null}
     * @param ev  event to publish; must not be {@code null}
     * @return this bus
     */
    public EventBus notify(Object key, Event<?> ev) {
        Assert.notNull(key, "Key cannot be null.");
        Assert.notNull(ev, "Event cannot be null.");
        ev.setKey(key);
        this.dispatcher.dispatch(ev, this, this.dispatchErrorHandler);
        return this;
    }

    /**
     * Publishes every element emitted by {@code source} under the same {@code key}.
     *
     * @param source publisher to subscribe to
     * @param key    key used for every element
     * @return this bus
     */
    public final EventBus notify(@Nonnull Publisher<?> source, final @Nonnull Object key) {
        return this.notify(source, new Function<Object, Object>(){

            @Override
            public Object apply(Object o) {
                return key;
            }
        });
    }

    /**
     * Subscribes to {@code source} with unbounded demand and publishes each element,
     * wrapped in an {@link Event}, under the key computed by {@code keyMapper}.
     *
     * <p>An error signalled by {@code source} is passed to the dispatch error handler.
     * Terminal signals do not touch the subscription: the Reactive Streams rules forbid
     * calling {@code Subscription} methods from {@code onError}/{@code onComplete}, and
     * the subscription is already considered cancelled at that point.
     *
     * @param source    publisher to subscribe to
     * @param keyMapper maps each element to the key it is published under
     * @return this bus
     */
    public final <T> EventBus notify(@Nonnull Publisher<? extends T> source, final @Nonnull Function<? super T, ?> keyMapper) {
        source.subscribe(new Subscriber<T>(){

            @Override
            public void onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T t) {
                EventBus.this.notify(keyMapper.apply(t), Event.wrap(t));
            }

            @Override
            public void onError(Throwable t) {
                // Surface upstream failures instead of dropping them silently.
                EventBus.this.dispatchErrorHandler.accept(t);
            }

            @Override
            public void onComplete() {
                // Nothing to release; the subscription is already terminated.
            }
        });
        return this;
    }

    /**
     * Registers a function whose result is published as a reply; see
     * {@link ReplyToConsumer#accept(Event)} for how the reply is delivered.
     *
     * @param sel selector deciding which keys the function receives
     * @param fn  function producing the reply
     * @return the registration of the wrapping consumer
     */
    public <T extends Event<?>, V> Registration<?, Consumer<? extends Event<?>>> receive(Selector sel, Function<T, V> fn) {
        return this.on(sel, new ReplyToConsumer<T, V>(fn));
    }

    /**
     * Publishes the event obtained from {@code supplier} under {@code key}.
     *
     * @param key      key to publish under
     * @param supplier source of the event
     * @return this bus
     */
    public EventBus notify(Object key, Supplier<? extends Event<?>> supplier) {
        Event<?> ev = supplier.get();
        return this.notify(key, ev);
    }

    /**
     * Publishes a new event constructed from {@code Void.class} under {@code key}.
     *
     * @param key key to publish under
     * @return this bus
     */
    public EventBus notify(Object key) {
        return this.notify(key, new Event<Class<Void>>(Void.class));
    }

    /**
     * Publishes {@code ev} wrapped in a {@link ReplyToEvent} that names this bus as reply target.
     *
     * @param key key to publish under
     * @param ev  event to wrap and publish
     * @return this bus
     */
    public EventBus send(Object key, Event<?> ev) {
        return this.notify(key, EventBus.wrapForReply(ev, this));
    }

    /**
     * Like {@link #send(Object, Event)}, with the event obtained from {@code supplier}.
     *
     * @param key      key to publish under
     * @param supplier source of the event
     * @return this bus
     */
    public EventBus send(Object key, Supplier<? extends Event<?>> supplier) {
        return this.notify(key, EventBus.wrapForReply(supplier.get(), this));
    }

    /**
     * Publishes {@code ev} wrapped in a {@link ReplyToEvent} that names {@code replyTo} as reply target.
     *
     * @param key     key to publish under
     * @param ev      event to wrap and publish
     * @param replyTo bus recorded on the wrapper as reply target
     * @return this bus
     */
    public EventBus send(Object key, Event<?> ev, Bus replyTo) {
        return this.notify(key, EventBus.wrapForReply(ev, replyTo));
    }

    /**
     * Like {@link #send(Object, Event, Bus)}, with the event obtained from {@code supplier}.
     *
     * @param key      key to publish under
     * @param supplier source of the event
     * @param replyTo  bus recorded on the wrapper as reply target
     * @return this bus
     */
    public EventBus send(Object key, Supplier<? extends Event<?>> supplier, Bus replyTo) {
        return this.notify(key, EventBus.wrapForReply(supplier.get(), replyTo));
    }

    /** Wraps {@code ev} in a {@link ReplyToEvent}; the only place that needs the raw constructor call. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static Event<?> wrapForReply(Event<?> ev, Bus replyTo) {
        return new ReplyToEvent(ev, replyTo);
    }

    /**
     * Registers {@code reply} under an anonymous selector, wrapped in a
     * {@link SingleUseConsumer} and marked cancel-after-use, then publishes {@code event}
     * with its reply-to set to that selector's object.
     *
     * @param key   key to publish under
     * @param event event to publish
     * @param reply consumer for the reply
     * @return this bus
     */
    public <T extends Event<?>> EventBus sendAndReceive(Object key, Event<?> event, Consumer<T> reply) {
        Selector sel = Selectors.anonymous();
        this.on(sel, new SingleUseConsumer<T>(reply)).cancelAfterUse();
        this.notify(key, event.setReplyTo(sel.getObject()));
        return this;
    }

    /**
     * Like {@link #sendAndReceive(Object, Event, Consumer)}, with the event obtained from {@code supplier}.
     *
     * @param key      key to publish under
     * @param supplier source of the event
     * @param reply    consumer for the reply
     * @return this bus
     */
    public <T extends Event<?>> EventBus sendAndReceive(Object key, Supplier<? extends Event<?>> supplier, Consumer<T> reply) {
        Event<?> event = supplier.get();
        return this.sendAndReceive(key, event, reply);
    }

    /**
     * Returns a consumer bound to the registrations selected for {@code key} at the time
     * of this call; each accepted event is stamped with {@code key} and dispatched to
     * every one of those registrations. The selection is not refreshed afterwards.
     *
     * @param key key to select registrations for
     * @return consumer that dispatches events to the pre-selected registrations
     */
    public <T> Consumer<Event<T>> prepare(final Object key) {
        return new Consumer<Event<T>>(){
            final List<Registration<Object, ? extends Consumer<? extends Event<?>>>> regs = EventBus.this.consumerRegistry.select(key);
            final int size = this.regs.size();

            @Override
            @SuppressWarnings("unchecked")
            public void accept(Event<T> ev) {
                for (int i = 0; i < this.size; ++i) {
                    // The registry only knows consumers of "some event type"; they are
                    // treated here as consumers of any event, as the routing path does.
                    Registration<Object, ? extends Consumer<Event<?>>> reg = (Registration<Object, ? extends Consumer<Event<?>>>)this.regs.get(i);
                    ev.setKey(key);
                    EventBus.this.dispatcher.dispatch(ev, reg.getObject(), EventBus.this.dispatchErrorHandler);
                }
            }
        };
    }

    /**
     * Runs {@code consumer.accept(data)} through the dispatcher. The dispatched event
     * itself is {@code null}; only the captured {@code data} is delivered.
     *
     * @param consumer consumer to invoke
     * @param data     value handed to the consumer
     */
    public <T> void schedule(final Consumer<T> consumer, final T data) {
        this.dispatcher.dispatch(null, new Consumer<Event<?>>(){

            @Override
            public void accept(Event<?> event) {
                consumer.accept(data);
            }
        }, this.dispatchErrorHandler);
    }

    /**
     * Routes {@code event} to the registrations selected for its key.
     *
     * @param event event to route
     */
    @Override
    public void accept(Event<?> event) {
        this.router.route(event.getKey(), event, this.consumerRegistry.select(event.getKey()), null, this.dispatchErrorHandler);
    }

    /**
     * Consumer that applies a function to each event and publishes the result as a reply.
     */
    public class ReplyToConsumer<E extends Event<?>, V>
    implements Consumer<E> {
        private final Function<E, V> fn;

        private ReplyToConsumer(Function<E, V> fn) {
            this.fn = fn;
        }

        /**
         * Applies the function to {@code ev} and publishes the outcome under the event's
         * reply-to key. The reply goes to the bus carried by a {@link ReplyToEvent} when
         * the event is one and that bus is non-null, otherwise to the enclosing bus.
         * A {@code null} result is replaced by a new event constructed from
         * {@code Void.class}; an {@link Event} result is sent as is; anything else is
         * wrapped. Anything thrown while doing so is published under its own class.
         *
         * @param ev the incoming event
         */
        @Override
        @SuppressWarnings("unchecked")
        public void accept(E ev) {
            Bus<Event<?>> replyToObservable = EventBus.this;
            if (ev instanceof ReplyToEvent) {
                Bus o = ((ReplyToEvent<?>)ev).getReplyToObservable();
                if (null != o) {
                    replyToObservable = o;
                }
            }
            try {
                Object reply = this.fn.apply(ev);
                Event<?> replyEv;
                if (null == reply) {
                    replyEv = new Event<Class<Void>>(Void.class);
                } else if (reply instanceof Event) {
                    replyEv = (Event<?>)reply;
                } else {
                    replyEv = Event.wrap(reply);
                }
                replyToObservable.notify(ev.getReplyTo(), replyEv);
            }
            catch (Throwable x) {
                replyToObservable.notify(x.getClass(), Event.wrap(x));
            }
        }

        /**
         * @return the wrapped function
         */
        public Function<E, V> getDelegate() {
            return this.fn;
        }
    }

    /**
     * Event that additionally carries the {@link Bus} replies should be published on.
     */
    public static class ReplyToEvent<T>
    extends Event<T> {
        private static final long serialVersionUID = 1937884784799135647L;
        private final Bus replyToObservable;

        private ReplyToEvent(Event.Headers headers, T data, Object replyTo, Bus replyToObservable, Consumer<Throwable> errorConsumer) {
            super(headers, data, errorConsumer);
            this.setReplyTo(replyTo);
            this.replyToObservable = replyToObservable;
        }

        /** Copies headers, data, reply-to key and error consumer from {@code delegate}. */
        private ReplyToEvent(Event<T> delegate, Bus replyToObservable) {
            this(delegate.getHeaders(), delegate.getData(), delegate.getReplyTo(), replyToObservable, delegate.getErrorConsumer());
        }

        /**
         * @param data payload for the copy
         * @return a new {@link ReplyToEvent} with this event's headers, reply-to key, reply
         *         bus and error consumer, carrying {@code data}
         */
        @Override
        public <X> Event<X> copy(X data) {
            return new ReplyToEvent<X>(this.getHeaders(), data, this.getReplyTo(), this.replyToObservable, this.getErrorConsumer());
        }

        /**
         * @return the bus replies should be published on, possibly {@code null}
         */
        public Bus getReplyToObservable() {
            return this.replyToObservable;
        }
    }
}

