/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation.handler;

import com.azure.core.amqp.implementation.AmqpLoggingUtils;
import com.azure.core.util.logging.ClientLogger;
import java.io.Closeable;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.EndpointState;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/**
 * Base class for proton-j handlers that publishes the {@link EndpointState} of the endpoint it is attached to.
 *
 * <p>States are pushed through a replay sink seeded with {@link EndpointState#UNINITIALIZED}. Once
 * {@link #onError(Throwable)} or {@link #close()} has been invoked the handler is terminal and further calls to
 * {@link #onNext(EndpointState)}, {@link #onError(Throwable)} and {@link #close()} return without emitting.</p>
 */
public abstract class Handler extends BaseHandler implements Closeable {
    /** Set to {@code true} the first time {@link #onError(Throwable)} or {@link #close()} runs. */
    private final AtomicBoolean isTerminal = new AtomicBoolean();
    /** Sink backing {@link #getEndpointStates()}; starts with {@link EndpointState#UNINITIALIZED}. */
    private final Sinks.Many<EndpointState> endpointStates
        = Sinks.many().replay().latestOrDefault(EndpointState.UNINITIALIZED);
    private final String connectionId;
    private final String hostname;
    /** Logger created for the concrete handler class, with the connection id in its context. */
    final ClientLogger logger;

    /**
     * Creates a handler bound to the given connection and host.
     *
     * @param connectionId Identifier of the connection this handler belongs to.
     * @param hostname Hostname associated with this handler.
     * @throws NullPointerException if {@code connectionId} or {@code hostname} is null.
     */
    Handler(String connectionId, String hostname) {
        this.connectionId = Objects.requireNonNull(connectionId, "'connectionId' cannot be null.");
        this.hostname = Objects.requireNonNull(hostname, "'hostname' cannot be null.");
        this.logger = new ClientLogger(this.getClass(), AmqpLoggingUtils.createContextWithConnectionId(connectionId));
    }

    /**
     * Gets the connection identifier supplied at construction.
     *
     * @return The connection identifier.
     */
    public String getConnectionId() {
        return this.connectionId;
    }

    /**
     * Gets the hostname supplied at construction.
     *
     * @return The hostname.
     */
    public String getHostname() {
        return this.hostname;
    }

    /**
     * Gets the endpoint states published by this handler, with consecutive duplicate states filtered out.
     *
     * @return A flux of endpoint states.
     */
    public Flux<EndpointState> getEndpointStates() {
        return this.endpointStates.asFlux().distinctUntilChanged();
    }

    /**
     * Publishes the next endpoint state unless the handler is already terminal.
     *
     * @param state The endpoint state to publish.
     */
    void onNext(EndpointState state) {
        if (this.isTerminal.get()) {
            return;
        }
        this.endpointStates.emitNext(state, (signalType, emitResult) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atVerbose(), signalType, emitResult).log("could not emit endpoint state.");
            // Returning false tells the sink not to retry the failed emission.
            return false;
        });
    }

    /**
     * Marks the handler as terminal and publishes the error. Does nothing if the handler was already terminal.
     *
     * @param error The error to publish to subscribers of the endpoint states.
     */
    void onError(Throwable error) {
        if (this.isTerminal.getAndSet(true)) {
            return;
        }
        this.endpointStates.emitError(error, (signalType, emitResult) -> {
            // The explicit array keeps the call bound to the varargs log overload.
            AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atWarning(), signalType, emitResult).log("could not emit error.", new Object[]{error});
            return false;
        });
    }

    /**
     * Marks the handler as terminal, publishes {@link EndpointState#CLOSED} and then completes the endpoint states.
     * Does nothing if the handler was already terminal.
     */
    @Override
    public void close() {
        if (this.isTerminal.getAndSet(true)) {
            return;
        }
        // Publish the final CLOSED state before completing so it is the last state emitted.
        this.endpointStates.emitNext(EndpointState.CLOSED, (signalType, emitResult) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atInfo(), signalType, emitResult).log("Could not emit closed endpoint state.");
            return false;
        });
        this.endpointStates.emitComplete((signalType, emitResult) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atVerbose(), signalType, emitResult).log("Could not emit complete.");
            return false;
        });
    }
}

