/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.funqy.runtime.bindings.knative.events;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import io.netty.buffer.ByteBufInputStream;
import io.quarkus.arc.ManagedContext;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.funqy.knative.events.AbstractCloudEvent;
import io.quarkus.funqy.knative.events.CloudEvent;
import io.quarkus.funqy.knative.events.CloudEventBuilder;
import io.quarkus.funqy.runtime.FunctionInvoker;
import io.quarkus.funqy.runtime.FunctionRecorder;
import io.quarkus.funqy.runtime.FunqyServerRequest;
import io.quarkus.funqy.runtime.FunqyServerResponse;
import io.quarkus.funqy.runtime.RequestContext;
import io.quarkus.funqy.runtime.RequestContextImpl;
import io.quarkus.funqy.runtime.bindings.knative.events.FunqyKnativeEventsConfig;
import io.quarkus.funqy.runtime.bindings.knative.events.FunqyRequestImpl;
import io.quarkus.funqy.runtime.bindings.knative.events.FunqyResponseImpl;
import io.quarkus.funqy.runtime.bindings.knative.events.HeaderCloudEventImpl;
import io.quarkus.funqy.runtime.bindings.knative.events.JsonCloudEventImpl;
import io.quarkus.funqy.runtime.bindings.knative.events.KnativeEventsBindingRecorder;
import io.quarkus.funqy.runtime.query.QueryReader;
import io.quarkus.security.identity.CurrentIdentityAssociation;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.vertx.http.runtime.CurrentVertxRequest;
import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.RoutingContext;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.spi.CDI;
import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.jboss.logging.Logger;

public class VertxRequestHandler
implements Handler<RoutingContext> {
    private static final Logger log = Logger.getLogger((String)"io.quarkus.funqy");
    protected final Vertx vertx;
    protected final ObjectMapper mapper;
    protected final BeanContainer beanContainer;
    protected final CurrentIdentityAssociation association;
    protected final CurrentVertxRequest currentVertxRequest;
    protected final Executor executor;
    protected final FunctionInvoker defaultInvoker;
    protected final Map<String, Collection<FunctionInvoker>> typeTriggers;
    protected final Map<String, List<Predicate<CloudEvent>>> invokersFilters;
    protected final String rootPath;

    public VertxRequestHandler(Vertx vertx, String rootPath, BeanContainer beanContainer, ObjectMapper mapper, FunqyKnativeEventsConfig config, FunctionInvoker defaultInvoker, Map<String, Collection<FunctionInvoker>> typeTriggers, Map<String, List<Predicate<CloudEvent>>> invokersFilters, Executor executor) {
        this.rootPath = rootPath;
        this.defaultInvoker = defaultInvoker;
        this.vertx = vertx;
        this.beanContainer = beanContainer;
        this.executor = executor;
        this.mapper = mapper;
        this.typeTriggers = typeTriggers;
        this.invokersFilters = invokersFilters;
        Instance association = CDI.current().select(CurrentIdentityAssociation.class, new Annotation[0]);
        this.association = association.isResolvable() ? (CurrentIdentityAssociation)association.get() : null;
        this.currentVertxRequest = (CurrentVertxRequest)CDI.current().select(CurrentVertxRequest.class, new Annotation[0]).get();
    }

    public void handle(RoutingContext routingContext) {
        HttpServerRequest request = routingContext.request();
        String mediaType = request.getHeader("Content-Type");
        boolean binaryCE = request.headers().contains("Ce-Id");
        boolean structuredCE = false;
        if (mediaType != null) {
            structuredCE = mediaType.startsWith("application/cloudevents+json");
        }
        if (structuredCE || binaryCE) {
            try {
                this.processCloudEvent(routingContext);
            }
            catch (Throwable t) {
                routingContext.fail(t);
            }
        } else if (mediaType != null && mediaType.startsWith("application/json") && request.method() == HttpMethod.POST || request.method() == HttpMethod.GET) {
            this.regularFunqyHttp(routingContext);
        } else {
            if (mediaType != null && mediaType.startsWith("application/cloudevents-batch+json")) {
                routingContext.fail(406);
                log.error((Object)"Batch mode not supported yet");
                return;
            }
            routingContext.fail(406);
            log.error((Object)("Illegal media type:" + mediaType));
            return;
        }
    }

    private void processCloudEvent(RoutingContext routingContext) {
        HttpServerRequest httpRequest = routingContext.request();
        HttpServerResponse httpResponse = routingContext.response();
        boolean binaryCE = httpRequest.headers().contains("ce-id");
        httpRequest.bodyHandler(bodyBuff -> this.executor.execute(() -> {
            try {
                AbstractCloudEvent input;
                AbstractCloudEvent inputCloudEvent;
                JsonNode structuredPayload;
                String ceSpecVersion;
                String ceType;
                if (binaryCE) {
                    ceType = httpRequest.headers().get("ce-type");
                    ceSpecVersion = httpRequest.headers().get("ce-specversion");
                    structuredPayload = null;
                } else {
                    try {
                        structuredPayload = this.mapper.readTree(bodyBuff.getBytes());
                        ceType = structuredPayload.get("type").asText();
                        ceSpecVersion = structuredPayload.get("specversion").asText();
                    }
                    catch (IOException e) {
                        routingContext.fail((Throwable)e);
                        return;
                    }
                }
                if (!AbstractCloudEvent.isKnownSpecVersion(ceSpecVersion)) {
                    log.warnf("Unexpected CloudEvent spec-version '%s'.", (Object)ceSpecVersion);
                }
                Collection<Object> candidates = new ArrayList<FunctionInvoker>();
                if (this.defaultInvoker != null) {
                    candidates.add(this.defaultInvoker);
                } else {
                    candidates = this.typeTriggers.get(ceType);
                    if (candidates == null || candidates.isEmpty()) {
                        candidates = this.typeTriggers.get("*");
                    }
                    if (candidates == null || candidates.isEmpty()) {
                        routingContext.fail(404);
                        log.error((Object)("Couldn't map CloudEvent type: '" + ceType + "' to a function."));
                        return;
                    }
                }
                AbstractCloudEvent evt = binaryCE ? new HeaderCloudEventImpl(httpRequest.headers(), null, null, null, null) : new JsonCloudEventImpl(structuredPayload, null, null, null);
                List matchingCandidates = candidates.stream().filter(fi -> this.match((FunctionInvoker)fi, evt)).collect(Collectors.toList());
                if (matchingCandidates.size() <= 0) {
                    routingContext.fail(404);
                    log.error((Object)("Couldn't map CloudEvent type: '" + ceType + "' to any function."));
                    return;
                }
                if (matchingCandidates.size() > 1) {
                    routingContext.fail(409);
                    log.error((Object)("CloudEvent type: '" + ceType + "' matches multiple function."));
                    return;
                }
                FunctionInvoker invoker = (FunctionInvoker)matchingCandidates.get(0);
                Type inputCeDataType = (Type)invoker.getBindingContext().get("io.quarkus.funqy.knative.events.INPUT_CE_DATA_TYPE");
                Type outputCeDataType = (Type)invoker.getBindingContext().get("io.quarkus.funqy.knative.events.OUTPUT_CE_DATA_TYPE");
                Type innerInputType = inputCeDataType != null ? inputCeDataType : invoker.getInputType();
                Type innerOutputType = outputCeDataType != null ? outputCeDataType : invoker.getOutputType();
                ObjectReader reader = (ObjectReader)invoker.getBindingContext().get(KnativeEventsBindingRecorder.DATA_OBJECT_READER);
                ObjectWriter writer = (ObjectWriter)invoker.getBindingContext().get(KnativeEventsBindingRecorder.DATA_OBJECT_WRITER);
                if (invoker.hasInput()) {
                    inputCloudEvent = binaryCE ? new HeaderCloudEventImpl(httpRequest.headers(), (Buffer)bodyBuff, inputCeDataType != null ? inputCeDataType : innerInputType, this.mapper, reader) : new JsonCloudEventImpl(structuredPayload, inputCeDataType != null ? inputCeDataType : innerInputType, this.mapper, reader);
                    input = inputCeDataType == null ? inputCloudEvent.data() : inputCloudEvent;
                } else {
                    input = null;
                    inputCloudEvent = null;
                }
                Consumer<Object> sendOutput = output -> {
                    try {
                        String type;
                        CloudEvent<Object> outputCloudEvent;
                        if (!invoker.hasOutput()) {
                            routingContext.response().setStatusCode(204);
                            routingContext.response().end();
                            return;
                        }
                        if (outputCeDataType == null) {
                            CloudEventBuilder builder = CloudEventBuilder.create();
                            outputCloudEvent = byte[].class.equals((Object)innerOutputType) ? builder.build((byte[])output, "application/octet-stream") : builder.build(output);
                        } else {
                            outputCloudEvent = (CloudEvent<Object>)output;
                        }
                        if (outputCloudEvent == null) {
                            routingContext.response().setStatusCode(204);
                            routingContext.response().end();
                            return;
                        }
                        String id = outputCloudEvent.id();
                        if (id == null) {
                            id = this.getResponseId();
                        }
                        String specVersion = outputCloudEvent.specVersion() != null ? outputCloudEvent.specVersion() : (inputCloudEvent != null && inputCloudEvent.specVersion() != null ? inputCloudEvent.specVersion() : "1.0");
                        String source = outputCloudEvent.source();
                        if (source == null) {
                            source = (String)invoker.getBindingContext().get("response.cloud.event.source");
                        }
                        if ((type = outputCloudEvent.type()) == null) {
                            type = (String)invoker.getBindingContext().get("response.cloud.event.type");
                        }
                        boolean ceHasData = !Void.class.equals((Object)innerInputType);
                        int majorSpecVer = AbstractCloudEvent.parseMajorSpecVersion(specVersion);
                        if (binaryCE) {
                            httpResponse.putHeader("ce-id", id);
                            httpResponse.putHeader("ce-specversion", specVersion);
                            httpResponse.putHeader("ce-source", source);
                            httpResponse.putHeader("ce-type", type);
                            if (outputCloudEvent.time() != null) {
                                httpResponse.putHeader("ce-time", outputCloudEvent.time().toString());
                            }
                            if (outputCloudEvent.subject() != null) {
                                httpResponse.putHeader("ce-subject", outputCloudEvent.subject());
                            }
                            if (outputCloudEvent.dataSchema() != null) {
                                String dsName = majorSpecVer == 0 ? "ce-schemaurl" : "ce-dataschema";
                                httpResponse.putHeader(dsName, outputCloudEvent.dataSchema());
                            }
                            outputCloudEvent.extensions().forEach((key, value) -> httpResponse.putHeader("ce-" + key, value));
                            String dataContentType = outputCloudEvent.dataContentType();
                            if (dataContentType != null) {
                                httpResponse.putHeader("Content-Type", dataContentType);
                            }
                            if (!ceHasData) {
                                routingContext.response().setStatusCode(204);
                                routingContext.response().end();
                                return;
                            }
                            if (dataContentType != null && dataContentType.startsWith("application/json")) {
                                httpResponse.end(Buffer.buffer((byte[])writer.writeValueAsBytes(outputCloudEvent.data())));
                                return;
                            }
                            if (byte[].class.equals((Object)innerOutputType)) {
                                httpResponse.end(Buffer.buffer((byte[])((byte[])outputCloudEvent.data())));
                                return;
                            }
                            log.errorf("Don't know how to write ce to output (dataContentType: %s, javaType: %s).", (Object)dataContentType, (Object)innerOutputType);
                            routingContext.fail(500);
                            return;
                        }
                        HashMap<String, Object> responseEvent = new HashMap<String, Object>();
                        responseEvent.put("id", id);
                        responseEvent.put("specversion", specVersion);
                        responseEvent.put("source", source);
                        responseEvent.put("type", type);
                        if (outputCloudEvent.time() != null) {
                            responseEvent.put("time", outputCloudEvent.time());
                        }
                        if (outputCloudEvent.subject() != null) {
                            responseEvent.put("subject", outputCloudEvent.subject());
                        }
                        if (outputCloudEvent.dataSchema() != null) {
                            String dsName = majorSpecVer == 0 ? "schemaurl" : "dataschema";
                            responseEvent.put(dsName, outputCloudEvent.dataSchema());
                        }
                        responseEvent.putAll(outputCloudEvent.extensions());
                        String dataContentType = outputCloudEvent.dataContentType();
                        if (dataContentType != null) {
                            responseEvent.put("datacontenttype", dataContentType);
                        }
                        if (ceHasData) {
                            if (majorSpecVer == 0) {
                                if (dataContentType != null && dataContentType.startsWith("application/json")) {
                                    responseEvent.put("data", outputCloudEvent.data());
                                } else {
                                    if (!byte[].class.equals((Object)innerOutputType)) {
                                        log.errorf("Don't know how to write ce to output (dataContentType: %s, javaType: %s).", (Object)dataContentType, (Object)innerOutputType);
                                        routingContext.fail(500);
                                        return;
                                    }
                                    responseEvent.put("datacontentencoding", "base64");
                                    responseEvent.put("data", (byte[])outputCloudEvent.data());
                                }
                            } else if (dataContentType != null && dataContentType.startsWith("application/json")) {
                                responseEvent.put("data", outputCloudEvent.data());
                            } else {
                                if (!byte[].class.equals((Object)innerOutputType)) {
                                    log.errorf("Don't know how to write ce to output (dataContentType: %s, javaType: %s).", (Object)dataContentType, (Object)innerOutputType);
                                    routingContext.fail(500);
                                    return;
                                }
                                responseEvent.put("data_base64", (byte[])outputCloudEvent.data());
                            }
                        }
                        routingContext.response().putHeader("Content-Type", "application/cloudevents+json");
                        httpResponse.end(Buffer.buffer((byte[])this.mapper.writer().writeValueAsBytes(responseEvent)));
                        return;
                    }
                    catch (Throwable t) {
                        routingContext.fail(t);
                        return;
                    }
                };
                this.dispatch(inputCloudEvent, routingContext, invoker, input).getOutput().subscribe().with(sendOutput, t -> routingContext.fail(t));
            }
            catch (Throwable t2) {
                routingContext.fail(t2);
            }
        }));
    }

    private boolean match(FunctionInvoker invoker, CloudEvent<?> inputCloudEvent) {
        if (this.invokersFilters.get(invoker.getName()) == null || this.invokersFilters.get(invoker.getName()).isEmpty()) {
            return true;
        }
        return this.invokersFilters.get(invoker.getName()).stream().allMatch(p -> p.test(inputCloudEvent));
    }

    private void regularFunqyHttp(RoutingContext routingContext) {
        String path = routingContext.request().path();
        if (path == null) {
            routingContext.fail(404);
            return;
        }
        if (!path.startsWith(this.rootPath)) {
            routingContext.fail(404);
            return;
        }
        FunctionInvoker invoker = !(path = path.substring(this.rootPath.length())).isEmpty() ? FunctionRecorder.registry.matchInvoker(path) : this.defaultInvoker;
        if (invoker == null) {
            routingContext.fail(404);
            log.error((Object)"There is no function matching the path.");
            return;
        }
        if (invoker.getBindingContext().get("io.quarkus.funqy.knative.events.INPUT_CE_DATA_TYPE") != null || invoker.getBindingContext().get("io.quarkus.funqy.knative.events.OUTPUT_CE_DATA_TYPE") != null) {
            routingContext.fail(400);
            log.errorf("Bad request: the '%s' function expects CloudEvent, but plain HTTP was received.", (Object)invoker.getName());
            return;
        }
        this.processHttpRequest(null, routingContext, invoker);
    }

    private void processHttpRequest(CloudEvent event, RoutingContext routingContext, FunctionInvoker invoker) {
        if (routingContext.request().method() == HttpMethod.GET) {
            Object input = null;
            if (invoker.hasInput()) {
                QueryReader reader = (QueryReader)invoker.getBindingContext().get(QueryReader.class.getName());
                try {
                    input = reader.readValue(routingContext.request().params().iterator());
                }
                catch (Exception e) {
                    log.error((Object)"Failed to unmarshal input", (Throwable)e);
                    routingContext.fail(400);
                    return;
                }
            }
            try {
                this.execute(event, routingContext, invoker, input);
            }
            catch (Throwable t) {
                log.error((Object)t);
                routingContext.fail(500, t);
            }
        } else if (routingContext.request().method() == HttpMethod.POST) {
            routingContext.request().bodyHandler(buff -> {
                try {
                    Object input = null;
                    if (buff.length() > 0) {
                        ByteBufInputStream in = new ByteBufInputStream(buff.getByteBuf());
                        ObjectReader reader = (ObjectReader)invoker.getBindingContext().get(KnativeEventsBindingRecorder.DATA_OBJECT_READER);
                        try {
                            input = reader.readValue((InputStream)in);
                        }
                        catch (JsonProcessingException e) {
                            log.error((Object)"Failed to unmarshal input", (Throwable)e);
                            routingContext.fail(400);
                            return;
                        }
                    }
                    this.execute(event, routingContext, invoker, input);
                }
                catch (Throwable t) {
                    log.error((Object)t);
                    routingContext.fail(500, t);
                }
            });
        } else {
            routingContext.fail(405);
            log.error((Object)("Must be POST or GET for: " + invoker.getName()));
        }
    }

    private void execute(CloudEvent event, RoutingContext routingContext, FunctionInvoker invoker, Object finalInput) {
        this.executor.execute(() -> {
            try {
                HttpServerResponse httpResponse = routingContext.response();
                FunqyServerResponse response = this.dispatch(event, routingContext, invoker, finalInput);
                response.getOutput().emitOn(this.executor).subscribe().with(obj -> {
                    if (invoker.hasOutput()) {
                        try {
                            httpResponse.setStatusCode(200);
                            ObjectWriter writer = (ObjectWriter)invoker.getBindingContext().get(KnativeEventsBindingRecorder.DATA_OBJECT_WRITER);
                            httpResponse.putHeader("Content-Type", "application/json");
                            httpResponse.end(writer.writeValueAsString(obj));
                        }
                        catch (JsonProcessingException jpe) {
                            log.error((Object)"Failed to unmarshal input", (Throwable)jpe);
                            routingContext.fail(400);
                        }
                        catch (Throwable e) {
                            routingContext.fail(e);
                        }
                    } else {
                        httpResponse.setStatusCode(204);
                        httpResponse.end();
                    }
                }, t -> routingContext.fail(t));
            }
            catch (Throwable t2) {
                log.error((Object)t2);
                routingContext.fail(500, t2);
            }
        });
    }

    private String getResponseId() {
        return UUID.randomUUID().toString();
    }

    private FunqyServerResponse dispatch(CloudEvent event, RoutingContext routingContext, FunctionInvoker invoker, Object input) {
        final ManagedContext requestContext = this.beanContainer.requestContext();
        requestContext.activate();
        if (this.association != null) {
            QuarkusHttpUser existing = (QuarkusHttpUser)routingContext.user();
            if (existing != null) {
                SecurityIdentity identity = existing.getSecurityIdentity();
                this.association.setIdentity(identity);
            } else {
                this.association.setIdentity(QuarkusHttpUser.getSecurityIdentity((RoutingContext)routingContext, null));
            }
        }
        this.currentVertxRequest.setCurrent(routingContext);
        RequestContextImpl funqContext = new RequestContextImpl();
        if (event != null) {
            funqContext.setContextData(CloudEvent.class, (Object)event);
        }
        FunqyRequestImpl funqyRequest = new FunqyRequestImpl((RequestContext)funqContext, input);
        FunqyResponseImpl funqyResponse = new FunqyResponseImpl();
        invoker.invoke((FunqyServerRequest)funqyRequest, (FunqyServerResponse)funqyResponse);
        funqyResponse.setOutput(funqyResponse.getOutput().onTermination().invoke(new Runnable(){

            @Override
            public void run() {
                if (requestContext.isActive()) {
                    requestContext.terminate();
                }
            }
        }));
        return funqyResponse;
    }
}

