/*
 * Decompiled with CFR 0.152.
 */
package org.citrusframework.yaks.kafka;

import io.cucumber.datatable.DataTable;
import io.cucumber.java.Before;
import io.cucumber.java.Scenario;
import io.cucumber.java.en.Given;
import io.cucumber.java.en.Then;
import io.cucumber.java.en.When;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.citrusframework.Citrus;
import org.citrusframework.TestActionBuilder;
import org.citrusframework.TestCaseRunner;
import org.citrusframework.actions.ReceiveMessageAction;
import org.citrusframework.actions.SendMessageAction;
import org.citrusframework.annotations.CitrusFramework;
import org.citrusframework.annotations.CitrusResource;
import org.citrusframework.context.TestContext;
import org.citrusframework.endpoint.Endpoint;
import org.citrusframework.exceptions.CitrusRuntimeException;
import org.citrusframework.kafka.endpoint.KafkaEndpoint;
import org.citrusframework.kafka.endpoint.KafkaEndpointBuilder;
import org.citrusframework.kafka.message.KafkaMessage;
import org.citrusframework.message.Message;
import org.citrusframework.spi.Resource;
import org.citrusframework.util.FileUtils;
import org.citrusframework.yaks.YaksSettings;
import org.citrusframework.yaks.kafka.KafkaSettings;

public class KafkaSteps {
    @CitrusResource
    private TestCaseRunner runner;
    @CitrusResource
    private TestContext context;
    @CitrusFramework
    private Citrus citrus;
    private Map<String, Object> headers = new HashMap<String, Object>();
    private String body;
    private KafkaEndpoint kafkaEndpoint;
    private String messageKey;
    private Integer partition;
    private String topic = "test";
    private String endpointName = KafkaSettings.getEndpointName();
    private long timeout = KafkaSettings.getConsumerTimeout();
    private String messageType = YaksSettings.getDefaultMessageType();

    @Before
    public void before(Scenario scenario) {
        if (this.kafkaEndpoint == null) {
            if ((long)this.citrus.getCitrusContext().getReferenceResolver().resolveAll(KafkaEndpoint.class).size() == 1L) {
                this.kafkaEndpoint = (KafkaEndpoint)this.citrus.getCitrusContext().getReferenceResolver().resolve(KafkaEndpoint.class);
            } else if (this.citrus.getCitrusContext().getReferenceResolver().isResolvable(this.endpointName)) {
                this.kafkaEndpoint = (KafkaEndpoint)this.citrus.getCitrusContext().getReferenceResolver().resolve(this.endpointName, KafkaEndpoint.class);
            } else {
                this.kafkaEndpoint = (KafkaEndpoint)new KafkaEndpointBuilder().build();
                this.citrus.getCitrusContext().getReferenceResolver().bind(this.endpointName, (Object)this.kafkaEndpoint);
            }
        }
        this.headers = new HashMap<String, Object>();
        this.body = null;
        this.messageType = YaksSettings.getDefaultMessageType();
        this.messageKey = null;
        this.partition = null;
    }

    @Given(value="^(?:Kafka|kafka) connection$")
    public void setConnection(DataTable properties) {
        Map connectionProps = properties.asMap(String.class, String.class);
        String url = connectionProps.getOrDefault("url", "localhost:9092");
        String topicName = connectionProps.getOrDefault("topic", this.topic);
        String consumerGroup = connectionProps.getOrDefault("consumerGroup", "citrus_kafka_group");
        String offsetReset = connectionProps.getOrDefault("offsetReset", "earliest");
        this.setTopic(this.context.replaceDynamicContentInString(topicName));
        this.kafkaEndpoint.getEndpointConfiguration().setServer(this.context.replaceDynamicContentInString(url));
        this.kafkaEndpoint.getEndpointConfiguration().setOffsetReset(this.context.replaceDynamicContentInString(offsetReset));
        this.kafkaEndpoint.getEndpointConfiguration().setConsumerGroup(this.context.replaceDynamicContentInString(consumerGroup));
    }

    @Given(value="^new (?:Kafka|kafka) connection$")
    public void createConnection(DataTable properties) {
        this.setConnection(properties);
        this.kafkaEndpoint = new KafkaEndpoint(this.kafkaEndpoint.getEndpointConfiguration());
        this.citrus.getCitrusContext().getReferenceResolver().bind(this.endpointName, (Object)this.kafkaEndpoint);
    }

    @Given(value="^(?:Kafka|kafka) producer configuration$")
    public void setProducerConfig(DataTable properties) {
        Map producerProperties = properties.asMap(String.class, Object.class);
        this.kafkaEndpoint.getEndpointConfiguration().setProducerProperties(producerProperties);
    }

    @Given(value="^(?:Kafka|kafka) consumer configuration$")
    public void setConsumerConfig(DataTable properties) {
        Map consumerProperties = properties.asMap(String.class, Object.class);
        this.kafkaEndpoint.getEndpointConfiguration().setConsumerProperties(consumerProperties);
    }

    @Given(value="^(?:Kafka|kafka) endpoint \"([^\"\\s]+)\"$")
    public void setServer(String name) {
        this.endpointName = name;
        if (this.citrus.getCitrusContext().getReferenceResolver().isResolvable(name)) {
            this.kafkaEndpoint = (KafkaEndpoint)this.citrus.getCitrusContext().getReferenceResolver().resolve(name, KafkaEndpoint.class);
        } else if (this.kafkaEndpoint != null) {
            this.citrus.getCitrusContext().getReferenceResolver().bind(this.endpointName, (Object)this.kafkaEndpoint);
            this.kafkaEndpoint.setName(this.endpointName);
        }
    }

    @Given(value="^(?:Kafka|kafka) message key: (.+)$")
    public void setMessageKey(String key) {
        this.messageKey = key;
    }

    @Given(value="^(?:Kafka|kafka) consumer timeout is (\\d+)(?: ms| milliseconds)$")
    public void setConsumerTimeout(int milliseconds) {
        this.timeout = milliseconds;
    }

    @Given(value="^(?:Kafka|kafka) topic partition: (\\d+)$")
    public void setPartition(int partition) {
        this.partition = partition;
    }

    @Given(value="^(?:Kafka|kafka) topic: (.+)$")
    public void setTopic(String topicName) {
        this.topic = topicName;
        this.kafkaEndpoint.getEndpointConfiguration().setTopic(topicName);
    }

    @Given(value="^(?:Kafka|kafka) message header ([^\\s]+)(?:=| is )\"(.+)\"$")
    @Then(value="^(?:expect|verify) (?:Kafka|kafka) message header ([^\\s]+)(?:=| is )\"(.+)\"$")
    public void addMessageHeader(String name, Object value) {
        this.headers.put(name, value);
    }

    @Given(value="^(?:Kafka|kafka) message type ([^\\s]+)")
    public void setMessageType(String type) {
        this.messageType = type.toUpperCase();
    }

    @Given(value="^(?:Kafka|kafka) message headers$")
    public void addMessageHeaders(DataTable headers) {
        Map headerPairs = headers.asMap(String.class, Object.class);
        headerPairs.forEach(this::addMessageHeader);
    }

    @Given(value="^(?:Kafka|kafka) message body$")
    @Then(value="^(?:expect|verify) (?:Kafka|kafka) message body$")
    public void setMessageBodyMultiline(String body) {
        this.setMessageBody(body);
    }

    @Given.Givens(value={@Given(value="^load (?:Kafka|kafka) message body ([^\\s]+)$"), @Given(value="^(?:expect|verify) (?:Kafka|kafka) message body loaded from ([^\\s]+)$")})
    public void loadMessageBody(String file) {
        try {
            this.setMessageBody(FileUtils.readToString((Resource)FileUtils.getFileResource((String)file)));
        }
        catch (IOException e) {
            throw new CitrusRuntimeException(String.format("Failed to load body from file resource %s", file));
        }
    }

    @Given(value="^(?:Kafka|kafka) message body: (.+)$")
    @Then(value="^(?:expect|verify) (?:Kafka|kafka) message body: (.+)$")
    public void setMessageBody(String body) {
        this.body = body;
    }

    @When(value="^send (?:Kafka|kafka) message$")
    public void sendMessage() {
        this.runner.run((TestActionBuilder)((SendMessageAction.Builder)SendMessageAction.Builder.send().endpoint((Endpoint)this.kafkaEndpoint)).message(this.createKafkaMessage()));
        this.body = null;
        this.headers.clear();
    }

    @Then(value="^receive (?:Kafka|kafka) message$")
    public void receiveMessage() {
        this.runner.run((TestActionBuilder)((ReceiveMessageAction.Builder)((ReceiveMessageAction.Builder)ReceiveMessageAction.Builder.receive().endpoint((Endpoint)this.kafkaEndpoint)).timeout(this.timeout)).message(this.createKafkaMessage()));
        this.body = null;
        this.headers.clear();
    }

    @When(value="^send (?:Kafka|kafka) message to topic (.+)$")
    public void sendMessage(String topicName) {
        this.setTopic(topicName);
        this.sendMessage();
    }

    @Then(value="^receive (?:Kafka|kafka) message on topic (.+)")
    public void receiveMessage(String topicName) {
        this.setTopic(topicName);
        this.receiveMessage();
    }

    @When(value="^send (?:Kafka|kafka) message with body and headers: (.+)$")
    @Given(value="^message in (?:Kafka|kafka) with body and headers: (.+)$")
    public void sendMessageBodyAndHeaders(String body, DataTable headers) {
        this.setMessageBody(body);
        this.addMessageHeaders(headers);
        this.sendMessage();
    }

    @When(value="^send (?:Kafka|kafka) message with body: (.+)$")
    @Given(value="^message in (?:Kafka|kafka) with body: (.+)$")
    public void sendMessageBody(String body) {
        this.setMessageBody(body);
        this.sendMessage();
    }

    @When(value="^send (?:Kafka|kafka) message with body$")
    @Given(value="^message in (?:Kafka|kafka) with body$")
    public void sendMessageBodyMultiline(String body) {
        this.sendMessageBody(body);
    }

    @Then(value="^(?:receive|expect|verify) (?:Kafka|kafka) message with body and headers: (.+)$")
    public void receiveFromKafka(String body, DataTable headers) {
        this.setMessageBody(body);
        this.addMessageHeaders(headers);
        this.receiveMessage();
    }

    @Then(value="^(?:receive|expect|verify) (?:Kafka|kafka) message with body: (.+)$")
    public void receiveMessageBody(String body) {
        this.setMessageBody(body);
        this.receiveMessage();
    }

    @Then(value="^(?:receive|expect|verify) (?:Kafka|kafka) message with body$")
    public void receiveMessageBodyMultiline(String body) {
        this.receiveMessageBody(body);
    }

    private Message createKafkaMessage() {
        KafkaMessage message = new KafkaMessage((Object)this.body, this.headers).topic(this.topic);
        message.setType(this.messageType);
        if (this.messageKey != null) {
            message.messageKey((Object)this.messageKey);
        }
        if (this.partition != null) {
            message.partition(this.partition.intValue());
        }
        return message;
    }
}

