/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.mantis.remote.observable;

import io.mantisrx.common.network.WritableEndpoint;
import io.reactivex.mantis.remote.observable.MutableReference;
import io.reactivex.mantis.remote.observable.RemoteRxEvent;
import io.reactivex.mantis.remote.observable.RxMetrics;
import io.reactivex.mantis.remote.observable.SafeWriter;
import io.reactivex.mantis.remote.observable.slotting.SlottingStrategy;
import java.util.List;
import mantis.io.reactivex.netty.channel.ObservableConnection;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;

public class WriteBytesObserver<T>
extends SafeWriter
implements Action1<List<RemoteRxEvent>> {
    private final ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection;
    private final MutableReference<Subscription> subReference;
    private final RxMetrics serverMetrics;
    private final SlottingStrategy<T> slottingStrategy;
    private final WritableEndpoint<T> endpoint;

    public WriteBytesObserver(ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection, MutableReference<Subscription> subReference, RxMetrics serverMetrics, SlottingStrategy<T> slottingStrategy, WritableEndpoint<T> endpoint) {
        this.connection = connection;
        this.subReference = subReference;
        this.serverMetrics = serverMetrics;
        this.slottingStrategy = slottingStrategy;
        this.endpoint = endpoint;
    }

    public void call(final List<RemoteRxEvent> events) {
        if (!this.safeWrite(this.connection, events, this.subReference, new Action0(){

            public void call() {
                WriteBytesObserver.this.serverMetrics.incrementNextCount(events.size());
            }
        }, new Action1<Throwable>(){

            public void call(Throwable t1) {
                WriteBytesObserver.this.serverMetrics.incrementNextFailureCount(events.size());
                SafeWriter.logger.warn("Failed to write onNext event to remote observable: " + WriteBytesObserver.this.endpoint + " at address: " + WriteBytesObserver.this.connection.getChannel().remoteAddress() + " reason: " + t1.getMessage() + " force unsubscribe", t1);
                ((Subscription)WriteBytesObserver.this.subReference.getValue()).unsubscribe();
            }
        }, this.slottingStrategy, this.endpoint)) {
            if (this.connection.isCloseIssued()) {
                this.slottingStrategy.removeConnection(this.endpoint);
            }
            this.serverMetrics.incrementNextFailureCount();
        }
    }
}

