/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.adapter.mqtt.impl;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.mqtt.messages.MqttPublishMessage;
import java.util.Map;
import java.util.Objects;
import org.eclipse.hono.adapter.mqtt.AbstractVertxBasedMqttProtocolAdapter;
import org.eclipse.hono.adapter.mqtt.MappedMessage;
import org.eclipse.hono.adapter.mqtt.MessageMapping;
import org.eclipse.hono.adapter.mqtt.MqttContext;
import org.eclipse.hono.adapter.mqtt.MqttProtocolAdapterProperties;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.ResourceIdentifier;

public final class VertxBasedMqttProtocolAdapter
extends AbstractVertxBasedMqttProtocolAdapter<MqttProtocolAdapterProperties> {
    private static final String MAPPER_DATA = "mapper_data";
    private MessageMapping<MqttContext> messageMapping;

    public String getTypeName() {
        return "hono-mqtt";
    }

    public void setMessageMapping(MessageMapping<MqttContext> messageMappingService) {
        Objects.requireNonNull(messageMappingService);
        this.messageMapping = messageMappingService;
    }

    @Override
    protected Future<Void> onPublishedMessage(MqttContext ctx) {
        return this.mapTopic(ctx).compose(address -> this.validateAddress((ResourceIdentifier)address, ctx.authenticatedDevice())).compose(targetAddress -> this.mapMessage(ctx, (ResourceIdentifier)targetAddress)).compose(mappedMessage -> {
            MqttPublishMessage mqttPublishMessage = MqttPublishMessage.create((int)ctx.message().messageId(), (MqttQoS)ctx.message().qosLevel(), (boolean)ctx.message().isDup(), (boolean)ctx.message().isRetain(), (String)ctx.message().topicName(), (ByteBuf)mappedMessage.getPayload().getByteBuf());
            return this.uploadMessage(ctx, mappedMessage.getTargetAddress(), mqttPublishMessage);
        }).recover(t -> {
            this.log.debug("discarding message [topic: {}] from {}", new Object[]{ctx.message().topicName(), ctx.authenticatedDevice(), t});
            return Future.failedFuture((Throwable)t);
        });
    }

    private Future<MappedMessage> mapMessage(MqttContext ctx, ResourceIdentifier targetAddress) {
        return this.getRegistrationAssertion(targetAddress.getTenantId(), targetAddress.getResourceId(), ctx.authenticatedDevice(), ctx.getTracingContext()).compose(registrationInfo -> this.messageMapping.mapDownstreamMessage(ctx, targetAddress, (RegistrationAssertion)registrationInfo)).map(mappedMessage -> {
            ctx.put(MAPPER_DATA, mappedMessage.getAdditionalProperties());
            ctx.applyMappedTargetAddress(mappedMessage.getTargetAddress());
            return mappedMessage;
        });
    }

    @Override
    protected void customizeDownstreamMessageProperties(Map<String, Object> props, MqttContext ctx) {
        Object additionalProperties = ctx.get(MAPPER_DATA);
        if (additionalProperties instanceof Map) {
            ((Map)additionalProperties).entrySet().stream().filter(entry -> entry.getKey() instanceof String).forEach(entry -> {
                String key = (String)entry.getKey();
                Object value = entry.getValue();
                if (value instanceof String) {
                    props.put(key, value);
                } else {
                    props.put(key, Json.encode(value));
                }
            });
        }
    }

    Future<ResourceIdentifier> mapTopic(MqttContext context) {
        Promise result = Promise.promise();
        ResourceIdentifier topic = context.topic();
        MqttQoS qos = context.message().qosLevel();
        switch (MetricsTags.EndpointType.fromString((String)topic.getEndpoint())) {
            case TELEMETRY: {
                if (MqttQoS.EXACTLY_ONCE.equals((Object)qos)) {
                    result.fail((Throwable)new ClientErrorException(400, "QoS 2 not supported for telemetry messages"));
                    break;
                }
                result.complete((Object)topic);
                break;
            }
            case EVENT: {
                if (MqttQoS.AT_LEAST_ONCE.equals((Object)qos)) {
                    result.complete((Object)topic);
                    break;
                }
                result.fail((Throwable)new ClientErrorException(400, "Only QoS 1 supported for event messages"));
                break;
            }
            case COMMAND: {
                if (MqttQoS.EXACTLY_ONCE.equals((Object)qos)) {
                    result.fail((Throwable)new ClientErrorException(400, "QoS 2 not supported for command response messages"));
                    break;
                }
                result.complete((Object)topic);
                break;
            }
            default: {
                this.log.debug("no such endpoint [{}]", (Object)topic.getEndpoint());
                result.fail((Throwable)new ClientErrorException(404, "no such endpoint"));
            }
        }
        return result.future();
    }

    @Override
    protected Future<Buffer> getCommandPayload(CommandContext ctx) {
        Command command = ctx.getCommand();
        return this.getRegistrationClient().assertRegistration(command.getTenant(), command.getGatewayOrDeviceId(), null, ctx.getTracingContext()).compose(registrationInfo -> this.messageMapping.mapUpstreamMessage((RegistrationAssertion)registrationInfo, command));
    }
}

