/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka2.transport.base;

import com.eureka2.shading.reactivex.netty.channel.ObservableConnection;
import com.netflix.eureka2.metric.MessageConnectionMetrics;
import com.netflix.eureka2.transport.Acknowledgement;
import com.netflix.eureka2.transport.MessageConnection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

public class BaseMessageConnection
implements MessageConnection {
    private static final Logger logger = LoggerFactory.getLogger(BaseMessageConnection.class);
    private static final Pattern NETTY_CHANNEL_NAME_RE = Pattern.compile("\\[.*=>\\s*(.*)\\]");
    private final String name;
    private final ObservableConnection<Object, Object> connection;
    private final MessageConnectionMetrics metrics;
    private final Scheduler.Worker schedulerWorker;
    private final long startTime;
    private final AtomicBoolean closed = new AtomicBoolean();
    private final Subject<Void, Void> lifecycleSubject = new SerializedSubject((Subject)AsyncSubject.create());
    private final Queue<PendingAck> pendingAckQueue = new ConcurrentLinkedQueue<PendingAck>();
    private final Action0 ackTimeoutTask = new Action0(){

        public void call() {
            try {
                long currentTime = BaseMessageConnection.this.schedulerWorker.now();
                PendingAck latestAck = (PendingAck)((Object)BaseMessageConnection.this.pendingAckQueue.peek());
                if (latestAck != null && latestAck.getExpiryTime() <= currentTime) {
                    latestAck = (PendingAck)((Object)BaseMessageConnection.this.pendingAckQueue.peek());
                    TimeoutException timeoutException = new TimeoutException("{connection=" + BaseMessageConnection.this.name + "}: acknowledgement timeout");
                    if (latestAck != null) {
                        latestAck.onError(timeoutException);
                    }
                    BaseMessageConnection.this.doShutdown(timeoutException);
                } else {
                    BaseMessageConnection.this.schedulerWorker.schedule(BaseMessageConnection.this.ackTimeoutTask, 1L, TimeUnit.SECONDS);
                }
            }
            catch (Exception e) {
                logger.error("Acknowledgement cleanup task failed with an exception", (Throwable)e);
                BaseMessageConnection.this.doShutdown(e);
            }
        }
    };

    public BaseMessageConnection(String name, ObservableConnection<Object, Object> connection, MessageConnectionMetrics metrics) {
        this(name, connection, metrics, Schedulers.computation());
    }

    public BaseMessageConnection(String name, ObservableConnection<Object, Object> connection, MessageConnectionMetrics metrics, Scheduler expiryScheduler) {
        this.connection = connection;
        this.metrics = metrics;
        this.name = this.descriptiveName(name);
        this.schedulerWorker = expiryScheduler.createWorker();
        this.installAcknowledgementHandler();
        this.startTime = expiryScheduler.now();
        metrics.incrementConnectionCounter();
    }

    private String descriptiveName(String name) {
        String endpointName = this.connection.getChannel().toString();
        Matcher matcher = NETTY_CHANNEL_NAME_RE.matcher(endpointName);
        if (matcher.matches()) {
            endpointName = matcher.group(1);
        }
        return name + "=>" + endpointName;
    }

    private void installAcknowledgementHandler() {
        this.connection.getInput().ofType(Acknowledgement.class).subscribe((Action1)new Action1<Acknowledgement>(){

            public void call(Acknowledgement acknowledgement) {
                PendingAck pending = (PendingAck)((Object)BaseMessageConnection.this.pendingAckQueue.poll());
                BaseMessageConnection.this.metrics.decrementPendingAckCounter();
                if (pending == null) {
                    BaseMessageConnection.this.shutdown(new IllegalStateException("{connection=" + BaseMessageConnection.this.name + "}: unexpected acknowledgment"));
                } else {
                    pending.ackSubject.onCompleted();
                }
            }
        });
        this.schedulerWorker.schedule(this.ackTimeoutTask, 1L, TimeUnit.SECONDS);
    }

    @Override
    public String name() {
        return this.name;
    }

    @Override
    public Observable<Void> submit(Object message) {
        if (this.closed.get()) {
            return Observable.error((Throwable)new IllegalStateException("{connection=" + this.name + "}: connection closed"));
        }
        return this.writeWhenSubscribed(message);
    }

    @Override
    public Observable<Void> submitWithAck(Object message) {
        return this.submitWithAck(message, 0L);
    }

    @Override
    public Observable<Void> submitWithAck(Object message, long timeout) {
        if (this.closed.get()) {
            return Observable.error((Throwable)new IllegalStateException("{connection=" + this.name + "}: connection closed"));
        }
        long expiryTime = timeout <= 0L ? Long.MAX_VALUE : this.schedulerWorker.now() + timeout;
        return this.writeWhenSubscribed(message, PendingAck.create(expiryTime));
    }

    @Override
    public Observable<Void> acknowledge() {
        if (this.closed.get()) {
            return Observable.error((Throwable)new IllegalStateException("{connection=" + this.name + "}: connection closed"));
        }
        return this.writeWhenSubscribed(Acknowledgement.INSTANCE);
    }

    @Override
    public Observable<Object> incoming() {
        return this.connection.getInput().filter((Func1)new Func1<Object, Boolean>(){

            public Boolean call(Object message) {
                return !(message instanceof Acknowledgement);
            }
        }).doOnNext((Action1)new Action1<Object>(){

            public void call(Object o) {
                BaseMessageConnection.this.metrics.incrementIncomingMessageCounter(o.getClass(), 1);
            }
        }).doOnTerminate(new Action0(){

            public void call() {
                BaseMessageConnection.this.shutdown(new IllegalStateException("{connection=" + BaseMessageConnection.this.name + "}: connection input onCompleted"));
            }
        });
    }

    @Override
    public Observable<Void> onError(Throwable error) {
        return Observable.error((Throwable)error);
    }

    @Override
    public Observable<Void> onCompleted() {
        return Observable.empty();
    }

    @Override
    public void shutdown() {
        this.doShutdown(null);
    }

    @Override
    public void shutdown(Throwable e) {
        this.doShutdown(e);
    }

    private void doShutdown(Throwable throwable) {
        boolean wasClosed = this.closed.getAndSet(true);
        if (!wasClosed) {
            if (throwable == null) {
                logger.info("Shutting down connection {}", (Object)this.name);
            } else {
                logger.info("Shutting down connection {} because of an exception {}:{}", new Object[]{this.name, throwable.getClass().getName(), throwable.getMessage()});
            }
            this.drainPendingAckQueue();
            this.metrics.decrementConnectionCounter();
            this.metrics.connectionDuration(this.startTime);
            this.terminateLifecycle(throwable);
            this.connection.close().subscribe((Subscriber)new Subscriber<Void>(){

                public void onCompleted() {
                }

                public void onError(Throwable e) {
                    logger.debug("Error during closing the connection", e);
                }

                public void onNext(Void aVoid) {
                }
            });
            this.schedulerWorker.unsubscribe();
        }
    }

    private void drainPendingAckQueue() {
        PendingAck pendingAck;
        while ((pendingAck = this.pendingAckQueue.poll()) != null) {
            this.metrics.decrementPendingAckCounter();
            try {
                pendingAck.onCompleted();
            }
            catch (Exception e) {
                logger.warn("Acknowledgement subscriber hasn't handled properly onError", (Throwable)e);
            }
        }
    }

    private void terminateLifecycle(Throwable e) {
        if (e == null) {
            this.lifecycleSubject.onCompleted();
        } else {
            this.lifecycleSubject.onError(e);
        }
    }

    @Override
    public Observable<Void> lifecycleObservable() {
        return this.lifecycleSubject;
    }

    private Observable<Void> writeWhenSubscribed(final Object message) {
        return this.connection.writeAndFlush(message).doOnCompleted(new Action0(){

            public void call() {
                BaseMessageConnection.this.metrics.incrementOutgoingMessageCounter(message.getClass(), 1);
            }
        }).cache();
    }

    private Observable<Void> writeWhenSubscribed(final Object message, final PendingAck ack) {
        return this.connection.writeAndFlush(message).doOnCompleted(new Action0(){

            public void call() {
                BaseMessageConnection.this.pendingAckQueue.add(ack);
                BaseMessageConnection.this.metrics.incrementPendingAckCounter();
                BaseMessageConnection.this.metrics.incrementOutgoingMessageCounter(message.getClass(), 1);
                if (BaseMessageConnection.this.closed.get()) {
                    BaseMessageConnection.this.drainPendingAckQueue();
                }
            }
        }).concatWith((Observable)ack).cache();
    }

    static class PendingAck
    extends Subject<Void, Void> {
        private final long expiryTime;
        private final Subject<Void, Void> ackSubject;
        private final AtomicBoolean isCompleted = new AtomicBoolean();

        PendingAck(Observable.OnSubscribe<Void> onSubscribe, Subject<Void, Void> ackSubject, long expiryTime) {
            super(onSubscribe);
            this.ackSubject = ackSubject;
            this.expiryTime = expiryTime;
        }

        public long getExpiryTime() {
            return this.expiryTime;
        }

        public boolean hasObservers() {
            return this.ackSubject.hasObservers();
        }

        public void onCompleted() {
            if (this.isCompleted.compareAndSet(false, true)) {
                this.ackSubject.onCompleted();
            }
        }

        public void onError(Throwable e) {
            if (this.isCompleted.compareAndSet(false, true)) {
                this.ackSubject.onError(e);
            }
        }

        public void onNext(Void aVoid) {
        }

        static PendingAck create(long expiryTime) {
            AsyncSubject ackSubject = AsyncSubject.create();
            Observable.OnSubscribe<Void> onSubscribe = new Observable.OnSubscribe<Void>((Subject)ackSubject){
                final /* synthetic */ Subject val$ackSubject;
                {
                    this.val$ackSubject = subject;
                }

                public void call(Subscriber<? super Void> subscriber) {
                    this.val$ackSubject.subscribe(subscriber);
                }
            };
            return new PendingAck(onSubscribe, (Subject<Void, Void>)ackSubject, expiryTime);
        }
    }
}

