/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.eventstreamrpc;

import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import software.amazon.awssdk.crt.eventstream.ClientConnectionContinuation;
import software.amazon.awssdk.crt.eventstream.ClientConnectionContinuationHandler;
import software.amazon.awssdk.crt.eventstream.Header;
import software.amazon.awssdk.crt.eventstream.HeaderType;
import software.amazon.awssdk.crt.eventstream.MessageFlags;
import software.amazon.awssdk.crt.eventstream.MessageType;
import software.amazon.awssdk.eventstreamrpc.DeserializationException;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.InvalidDataException;
import software.amazon.awssdk.eventstreamrpc.OperationModelContext;
import software.amazon.awssdk.eventstreamrpc.OperationResponse;
import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler;
import software.amazon.awssdk.eventstreamrpc.UnmappedDataException;
import software.amazon.awssdk.eventstreamrpc.model.EventStreamJsonMessage;
import software.amazon.awssdk.eventstreamrpc.model.EventStreamOperationError;

/**
 * Client-side entry point for invoking eventstream RPC operations over an
 * {@link EventStreamRPCConnection}.
 *
 * <p>Each invocation opens a new stream on the connection, sends the serialized request as the
 * activating message, and routes the messages that come back: the first application message
 * completes the operation's response future, later application messages are delivered to the
 * optional {@link StreamResponseHandler}.
 */
public class EventStreamRPCClient {
    private static final Logger LOGGER = Logger.getLogger(EventStreamRPCClient.class.getName());

    /** Header carrying the payload content type of the activating request. */
    private static final String CONTENT_TYPE_HEADER = ":content-type";
    /** Content type sent with every request. */
    private static final String CONTENT_TYPE_APPLICATION_JSON = "application/json";
    /** Header carrying the application model type name of a message payload. */
    private static final String SERVICE_MODEL_TYPE_HEADER = "service-model-type";

    private final EventStreamRPCConnection connection;

    /**
     * Creates a client bound to the given connection.
     *
     * @param connection connection on which operation streams are opened; must not be null
     * @throws IllegalArgumentException if {@code connection} is null
     */
    public EventStreamRPCClient(EventStreamRPCConnection connection) {
        if (connection == null) {
            throw new IllegalArgumentException("Cannot create eventstream RPC client with null connection");
        }
        this.connection = connection;
    }

    /**
     * Opens a new stream, sends {@code request} as its activating message and returns a handle to
     * the in-flight operation.
     *
     * @param operationModelContext describes the operation: its name, model type names, type
     *                              classes and the service model used for (de)serialization
     * @param request               request object, serialized to JSON through the service model
     * @param streamResponseHandler receiver for streamed events, errors and the close
     *                              notification; required when the operation is streaming
     * @return the operation response wrapping the stream, the response future and the future of
     *         the activating message
     * @throws IllegalArgumentException if the operation is streaming and no handler is present
     */
    public <ReqType extends EventStreamJsonMessage, RespType extends EventStreamJsonMessage, StrReqType extends EventStreamJsonMessage, StrRespType extends EventStreamJsonMessage> OperationResponse<RespType, StrReqType> doOperationInvoke(final OperationModelContext<ReqType, RespType, StrReqType, StrRespType> operationModelContext, ReqType request, final Optional<StreamResponseHandler<StrRespType>> streamResponseHandler) {
        if (operationModelContext.isStreamingOperation() && !streamResponseHandler.isPresent()) {
            throw new IllegalArgumentException(operationModelContext.getOperationName() + " is a streaming operation. Must have a streaming response handler!");
        }
        final CompletableFuture<RespType> responseFuture = new CompletableFuture<>();
        // Starts out "closed"; flipped to open below once the stream object exists.
        final AtomicBoolean isContinuationClosed = new AtomicBoolean(true);
        final ClientConnectionContinuation continuation = this.connection.newStream(new ClientConnectionContinuationHandler() {
            /** Set once the first application message has been processed. */
            boolean initialResponseReceived = false;

            @Override
            protected void onContinuationMessage(List<Header> headers, byte[] payload, MessageType messageType, int messageFlags) {
                final Optional<String> applicationModelType = headers.stream()
                        .filter(header -> header.getName().equals(SERVICE_MODEL_TYPE_HEADER)
                                && header.getHeaderType().equals(HeaderType.String))
                        .map(header -> header.getValueAsString())
                        .findFirst();
                final boolean terminateStream = (messageFlags & MessageFlags.TerminateStream.getByteValue()) != 0;

                if (messageType.equals(MessageType.ApplicationMessage)) {
                    if (applicationModelType.isPresent()) {
                        EventStreamRPCClient.this.handleData(applicationModelType.get(), payload, !this.initialResponseReceived, responseFuture, streamResponseHandler, operationModelContext, this.continuation, isContinuationClosed);
                    }
                    // Deliberately not chained to the branch above: a message may carry data
                    // and the terminate flag at the same time.
                    if (terminateStream) {
                        this.close();
                        EventStreamRPCClient.this.handleClose(this.initialResponseReceived, responseFuture, streamResponseHandler);
                    } else if (!applicationModelType.isPresent()) {
                        // A non-terminating message without a model type cannot be mapped.
                        // Before the initial response the expected type is the response type and
                        // the failure belongs to the response future; afterwards it is the
                        // streaming type and the failure belongs to the stream handler.
                        final String expectedType = this.initialResponseReceived
                                ? operationModelContext.getStreamingResponseApplicationModelType().get()
                                : operationModelContext.getResponseApplicationModelType();
                        EventStreamRPCClient.this.handleError(new UnmappedDataException(expectedType), !this.initialResponseReceived, responseFuture, streamResponseHandler, this.continuation, isContinuationClosed);
                    }
                    this.initialResponseReceived = true;
                } else if (messageType.equals(MessageType.ApplicationError)) {
                    final Optional<Class<? extends EventStreamJsonMessage>> errorClass = operationModelContext.getServiceModel().getApplicationModelClass(applicationModelType.orElse(""));
                    if (!errorClass.isPresent()) {
                        // The incoming type name is passed as an argument, never as the format
                        // string itself, so a '%' in it cannot break formatting.
                        LOGGER.severe(String.format("Could not map error from service. Incoming error type: %s", applicationModelType.orElse("null")));
                        EventStreamRPCClient.this.handleError(new UnmappedDataException(applicationModelType.orElse("null")), !this.initialResponseReceived, responseFuture, streamResponseHandler, this.continuation, isContinuationClosed);
                    } else {
                        Throwable serviceError;
                        try {
                            serviceError = (EventStreamOperationError) operationModelContext.getServiceModel().fromJson(errorClass.get(), payload);
                        } catch (Exception e) {
                            // An error on top of an error: report the deserialization failure
                            // instead of dropping it, so the caller is not left waiting.
                            LOGGER.severe(String.format("Could not deserialize error from service. Incoming error type: %s, cause %s: %s", applicationModelType.orElse("null"), e.getClass().getCanonicalName(), e.getMessage()));
                            serviceError = new DeserializationException(payload, e);
                        }
                        try {
                            EventStreamRPCClient.this.handleError(serviceError, !this.initialResponseReceived, responseFuture, streamResponseHandler, this.continuation, isContinuationClosed);
                        } catch (Exception e) {
                            // Kept non-fatal so the terminate handling below still runs.
                            LOGGER.warning(String.format("Handling application error threw %s: %s", e.getClass().getCanonicalName(), e.getMessage()));
                        }
                    }
                    if (terminateStream) {
                        try {
                            this.close();
                            EventStreamRPCClient.this.handleClose(this.initialResponseReceived, responseFuture, streamResponseHandler);
                        } catch (Exception e) {
                            LOGGER.warning(String.format("Exception thrown closing stream on application error received %s: %s", e.getClass().getName(), e.getMessage()));
                        }
                    }
                } else if (messageType == MessageType.Ping) {
                    // Echo pings back with the same headers, payload and flags.
                    this.continuation.sendMessage(headers, payload, MessageType.PingResponse, messageFlags);
                } else if (messageType != MessageType.PingResponse) {
                    // Ping responses need no action; everything else is handled here.
                    // NOTE(review): the server/protocol error paths only log and close; they do
                    // not complete the response future themselves - confirm that is intended.
                    if (messageType == MessageType.ServerError) {
                        LOGGER.severe(operationModelContext.getOperationName() + " server error received");
                        this.close();
                    } else if (messageType == MessageType.ProtocolError) {
                        LOGGER.severe(operationModelContext.getOperationName() + " protocol error received");
                        this.close();
                    } else {
                        // Unexpected message type: report it, then try to terminate the stream.
                        EventStreamRPCClient.this.handleError(new InvalidDataException(messageType), !this.initialResponseReceived, responseFuture, streamResponseHandler, this.continuation, isContinuationClosed);
                        try {
                            EventStreamRPCClient.this.sendClose(this.continuation, isContinuationClosed).whenComplete((res, ex) -> {
                                if (ex != null) {
                                    LOGGER.warning(String.format("Sending close on invalid message threw %s: %s", ex.getClass().getCanonicalName(), ex.getMessage()));
                                }
                            });
                        } catch (Exception e) {
                            LOGGER.warning(String.format("Sending close on invalid message threw %s: %s", e.getClass().getCanonicalName(), e.getMessage()));
                        }
                    }
                }
            }

            @Override
            protected void onContinuationClosed() {
                super.onContinuationClosed();
                // NOTE(review): the flag is passed un-negated here (unlike handleData/handleError),
                // so a close arriving before any response notifies the stream handler rather
                // than failing the response future - confirm that is intended.
                EventStreamRPCClient.this.handleClose(this.initialResponseReceived, responseFuture, streamResponseHandler);
            }
        });
        // The stream now exists: mark it open so that sendClose() can actually send the
        // terminating message later. (true -> false; the reverse order could never succeed
        // because the flag is initialised to true.)
        isContinuationClosed.compareAndSet(true, false);

        final List<Header> headers = new LinkedList<>();
        headers.add(Header.createHeader(CONTENT_TYPE_HEADER, CONTENT_TYPE_APPLICATION_JSON));
        headers.add(Header.createHeader(SERVICE_MODEL_TYPE_HEADER, operationModelContext.getRequestApplicationModelType()));
        final byte[] payload = operationModelContext.getServiceModel().toJson(request);
        final CompletableFuture<Void> messageFlushFuture = continuation.activate(operationModelContext.getOperationName(), headers, payload, MessageType.ApplicationMessage, 0);
        return new OperationResponse<RespType, StrReqType>(operationModelContext, continuation, responseFuture, messageFlushFuture);
    }

    /**
     * Sends an empty application message carrying the terminate-stream flag, at most once per
     * stream.
     *
     * @param continuation stream to terminate
     * @param isClosed     shared flag; only the caller that flips it from open to closed sends
     * @return the future of the sent message, or an already-completed future when the flag was
     *         already set
     */
    private CompletableFuture<Void> sendClose(ClientConnectionContinuation continuation, AtomicBoolean isClosed) {
        if (isClosed.compareAndSet(false, true)) {
            return continuation.sendMessage(null, null, MessageType.ApplicationMessage, MessageFlags.TerminateStream.getByteValue());
        }
        LOGGER.warning("Stream already closed");
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Reacts to a stream close.
     *
     * <p>When {@code isInitial} is true and the response future is still pending, the future is
     * failed. In every other case the stream handler, if present, is told the stream closed;
     * exceptions thrown by the handler are logged and not propagated.
     *
     * @param isInitial             selects the response-future branch described above
     * @param responseFuture        future of the operation's initial response
     * @param streamResponseHandler optional receiver of the close notification
     */
    private <RespType extends EventStreamJsonMessage, StrRespType extends EventStreamJsonMessage> void handleClose(boolean isInitial, CompletableFuture<RespType> responseFuture, Optional<StreamResponseHandler<StrRespType>> streamResponseHandler) {
        if (isInitial && !responseFuture.isDone()) {
            responseFuture.completeExceptionally(new RuntimeException("Closed received before any service operation response!"));
        } else if (streamResponseHandler.isPresent()) {
            try {
                streamResponseHandler.get().onStreamClosed();
            } catch (Exception e) {
                LOGGER.warning(String.format("Client handler onStreamClosed() threw %s: %s", e.getClass().getCanonicalName(), e.getMessage()));
            }
        }
    }

    /**
     * Deserializes an application message and delivers it.
     *
     * <p>An initial message must carry the operation's response model type and completes
     * {@code responseFuture}; a later message must carry the streaming response model type and
     * is passed to the stream handler. A type mismatch, a deserialization failure, or an
     * exception thrown by the handler is routed to
     * {@link #handleError(Throwable, boolean, CompletableFuture, Optional, ClientConnectionContinuation, AtomicBoolean)}.
     *
     * @param applicationModelType  model type name taken from the message headers
     * @param payload               raw message payload
     * @param isInitial             true when this is the first application message of the stream
     * @param responseFuture        future of the operation's initial response
     * @param streamResponseHandler receiver of streamed events
     * @param operationModelContext operation description used for type checks and deserialization
     * @param continuation          stream the message arrived on
     * @param isClosed              shared closed flag of that stream
     */
    private <RespType extends EventStreamJsonMessage, StrRespType extends EventStreamJsonMessage> void handleData(String applicationModelType, byte[] payload, boolean isInitial, CompletableFuture<RespType> responseFuture, Optional<StreamResponseHandler<StrRespType>> streamResponseHandler, OperationModelContext<?, RespType, ?, StrRespType> operationModelContext, ClientConnectionContinuation continuation, AtomicBoolean isClosed) {
        if (isInitial) {
            if (!applicationModelType.equals(operationModelContext.getResponseApplicationModelType())) {
                this.handleError(new UnmappedDataException(applicationModelType, operationModelContext.getResponseTypeClass()), isInitial, responseFuture, streamResponseHandler, continuation, isClosed);
                return;
            }
            final RespType responseObj;
            try {
                responseObj = operationModelContext.getServiceModel().fromJson(operationModelContext.getResponseTypeClass(), payload);
            } catch (Exception e) {
                this.handleError(new DeserializationException(payload, e), isInitial, responseFuture, streamResponseHandler, continuation, isClosed);
                return;
            }
            responseFuture.complete(responseObj);
        } else {
            if (!applicationModelType.equals(operationModelContext.getStreamingResponseApplicationModelType().get())) {
                this.handleError(new UnmappedDataException(applicationModelType, operationModelContext.getStreamingResponseTypeClass().get()), isInitial, responseFuture, streamResponseHandler, continuation, isClosed);
                return;
            }
            final StrRespType strResponseObj;
            try {
                strResponseObj = operationModelContext.getServiceModel().fromJson(operationModelContext.getStreamingResponseTypeClass().get(), payload);
            } catch (Exception e) {
                this.handleError(new DeserializationException(payload, e), isInitial, responseFuture, streamResponseHandler, continuation, isClosed);
                return;
            }
            try {
                streamResponseHandler.get().onStreamEvent(strResponseObj);
            } catch (Exception e) {
                // A failing event callback is reported back to the same handler as a stream error.
                this.handleError(e, isInitial, responseFuture, streamResponseHandler, continuation, isClosed);
            }
        }
    }

    /**
     * Delivers an error either to the response future or to the stream handler.
     *
     * <p>Initial errors fail {@code responseFuture}. Later errors go to the handler's
     * {@code onStreamError}; when that returns true, or throws, a close is sent and the close
     * handling runs once the send completes.
     *
     * @param t                     the error to deliver
     * @param isInitial             true when no initial response has been processed yet
     * @param responseFuture        future of the operation's initial response
     * @param streamResponseHandler receiver of stream errors
     * @param continuation          stream to close when the handler asks for it
     * @param isClosed              shared closed flag of that stream
     * @throws IllegalArgumentException if the error is not initial and no handler is present
     */
    private <RespType extends EventStreamJsonMessage, StrRespType extends EventStreamJsonMessage> void handleError(Throwable t, boolean isInitial, CompletableFuture<RespType> responseFuture, Optional<StreamResponseHandler<StrRespType>> streamResponseHandler, ClientConnectionContinuation continuation, AtomicBoolean isClosed) {
        if (!isInitial && !streamResponseHandler.isPresent()) {
            throw new IllegalArgumentException("Cannot process error handling for stream without a stream response handler set!");
        }
        if (isInitial) {
            responseFuture.completeExceptionally(t);
            return;
        }
        try {
            if (streamResponseHandler.get().onStreamError(t)) {
                this.sendClose(continuation, isClosed).whenComplete((res, ex) -> this.handleClose(isInitial, responseFuture, streamResponseHandler));
            }
        } catch (Exception e) {
            LOGGER.warning(String.format("Stream response handler threw exception %s: %s", e.getClass().getCanonicalName(), e.getMessage()));
            this.sendClose(continuation, isClosed).whenComplete((res, ex) -> this.handleClose(isInitial, responseFuture, streamResponseHandler));
        }
    }
}

