/*
 * Decompiled with CFR 0.152.
 */
package com.salesforce.reactivegrpc.common;

import com.salesforce.reactivegrpc.common.ReactiveStreamObserverPublisherBase;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.ServerCallStreamObserver;
import org.reactivestreams.Subscriber;

/**
 * Server-side specialization of {@link ReactiveStreamObserverPublisherBase}.
 *
 * <p>The subscription created by this publisher does not cancel the call
 * immediately. A cancel request starts a short-lived thread that waits
 * {@link #WAIT_FOR_ERROR_DELAY_MILLIS} milliseconds and then, unless
 * {@link #abortPendingCancel()} has been called in the meantime, cancels the
 * base publisher and reports {@code Status.CANCELLED} to the call observer.
 *
 * <p>NOTE(review): the grace period presumably gives an in-flight error a
 * chance to arrive before the call is cancelled (the original constant was
 * named {@code WAIT_FOR_ERROR_DELAY_MILLS}) - confirm against the callers of
 * {@link #abortPendingCancel()}.
 *
 * @param <T> the type of element passed through to the base publisher
 */
public class ReactiveStreamObserverPublisherServer<T>
extends ReactiveStreamObserverPublisherBase<T> {
    /** How long a delayed cancel waits before it takes effect, in milliseconds. */
    private static final long WAIT_FOR_ERROR_DELAY_MILLIS = 100L;

    /**
     * The observer of the call being served. Set to {@code null} by the delay
     * thread once a delayed cancel has been carried out; volatile because it
     * is written on that thread and read on whichever thread calls
     * {@code cancel()}.
     */
    private volatile ServerCallStreamObserver callStreamObserver;

    /** Set by {@link #abortPendingCancel()}; read by the delay thread after its sleep. */
    private volatile boolean abandonDelayedCancel;

    /**
     * Creates a publisher bound to the given server call observer.
     *
     * @param callStreamObserver the observer handed to the base class and
     *                           later used to check for and signal cancellation
     */
    public ReactiveStreamObserverPublisherServer(ServerCallStreamObserver callStreamObserver) {
        super((CallStreamObserver)callStreamObserver);
        this.callStreamObserver = callStreamObserver;
    }

    /**
     * Creates the subscription whose {@code cancel()} performs the delayed
     * cancellation described in the class documentation.
     *
     * @return a new subscription instance
     */
    @Override
    protected ReactiveStreamObserverPublisherBase.ReactiveStreamObserverPublisherSubscriptionBase createSubscription() {
        return new ReactiveStreamObserverPublisherBase.ReactiveStreamObserverPublisherSubscriptionBase(){

            public void cancel() {
                // Read the field once: the delay thread may null it concurrently.
                final ServerCallStreamObserver observer = ReactiveStreamObserverPublisherServer.this.callStreamObserver;
                // A null observer means a previous delayed cancel already ran;
                // repeated cancels must then be no-ops rather than NPEs.
                if (observer == null || observer.isCancelled()) {
                    return;
                }
                new Thread(){

                    @Override
                    public void run() {
                        try {
                            Thread.sleep(WAIT_FOR_ERROR_DELAY_MILLIS);
                            if (ReactiveStreamObserverPublisherServer.this.abandonDelayedCancel) {
                                return;
                            }
                            // Re-read after the sleep: another delayed cancel may
                            // have completed and cleared the reference meanwhile.
                            final ServerCallStreamObserver pending = ReactiveStreamObserverPublisherServer.this.callStreamObserver;
                            if (pending == null) {
                                return;
                            }
                            ReactiveStreamObserverPublisherServer.super.cancel();
                            pending.onError(Status.CANCELLED.withDescription("Server canceled request").asRuntimeException());
                            ReactiveStreamObserverPublisherServer.super.freeSubscriber();
                            ReactiveStreamObserverPublisherServer.this.callStreamObserver = null;
                        }
                        catch (InterruptedException interrupted) {
                            // Interrupted during the grace period: skip the cancel
                            // but keep the interrupt status visible.
                            Thread.currentThread().interrupt();
                        }
                        catch (Throwable ignored) {
                            // Deliberately best-effort: a failure while cancelling
                            // must not escape this throwaway thread.
                        }
                    }
                }.start();
            }
        };
    }

    /** Delegates unchanged to the base class. */
    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        super.subscribe(subscriber);
    }

    /** Delegates unchanged to the base class. */
    @Override
    public void onNext(T value) {
        super.onNext(value);
    }

    /**
     * Forwards the error to the base class, except for a
     * {@link StatusRuntimeException} whose message contains
     * {@code "cancelled before receiving half close"}, which is dropped.
     *
     * @param throwable the error reported for the call
     */
    @Override
    public void onError(Throwable throwable) {
        if (throwable instanceof StatusRuntimeException) {
            final String message = throwable.getMessage();
            // Guard against a null message so filtering can never itself throw.
            if (message != null && message.contains("cancelled before receiving half close")) {
                return;
            }
        }
        super.onError(throwable);
    }

    /** Delegates unchanged to the base class. */
    @Override
    public void onCompleted() {
        super.onCompleted();
    }

    /**
     * Prevents any delayed cancel that has not yet passed its grace period
     * from taking effect. The flag is never reset, so later cancel requests
     * on this publisher are suppressed as well.
     */
    public void abortPendingCancel() {
        this.abandonDelayedCancel = true;
    }
}

