/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.tests.indexer;

import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.utils.KafkaAdminClient;
import org.apache.druid.testing.utils.KafkaEventWriter;
import org.apache.druid.testing.utils.KafkaUtil;
import org.apache.druid.testing.utils.StreamAdminClient;
import org.apache.druid.testing.utils.StreamEventWriter;
import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
import org.joda.time.ReadableInstant;

public abstract class AbstractKafkaIndexingServiceTest
extends AbstractStreamIndexingTest {
    @Override
    StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) {
        return new KafkaAdminClient(config);
    }

    @Override
    public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, Boolean transactionEnabled) {
        return new KafkaEventWriter(config, ((Boolean)Preconditions.checkNotNull((Object)transactionEnabled, (Object)"transactionEnabled")).booleanValue());
    }

    @Override
    Function<String, String> generateStreamIngestionPropsTransform(String streamName, String fullDatasourceName, String parserType, String parserOrInputFormat, List<String> dimensions, IntegrationTestingConfig config) {
        Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
        Properties consumerProperties = new Properties();
        consumerProperties.putAll((Map<?, ?>)consumerConfigs);
        consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost());
        KafkaUtil.addPropertiesFromTestConfig((IntegrationTestingConfig)config, (Properties)consumerProperties);
        return spec -> {
            try {
                spec = StringUtils.replace((String)spec, (String)"%%DATASOURCE%%", (String)fullDatasourceName);
                spec = StringUtils.replace((String)spec, (String)"%%STREAM_TYPE%%", (String)"kafka");
                spec = StringUtils.replace((String)spec, (String)"%%TOPIC_KEY%%", (String)"topic");
                spec = StringUtils.replace((String)spec, (String)"%%TOPIC_VALUE%%", (String)streamName);
                if ("inputFormat".equals(parserType)) {
                    spec = StringUtils.replace((String)spec, (String)"%%INPUT_FORMAT%%", (String)parserOrInputFormat);
                    spec = StringUtils.replace((String)spec, (String)"%%PARSER%%", (String)"null");
                } else if ("parser".equals(parserType)) {
                    spec = StringUtils.replace((String)spec, (String)"%%PARSER%%", (String)parserOrInputFormat);
                    spec = StringUtils.replace((String)spec, (String)"%%INPUT_FORMAT%%", (String)"null");
                }
                spec = StringUtils.replace((String)spec, (String)"%%USE_EARLIEST_KEY%%", (String)"useEarliestOffset");
                spec = StringUtils.replace((String)spec, (String)"%%STREAM_PROPERTIES_KEY%%", (String)"consumerProperties");
                spec = StringUtils.replace((String)spec, (String)"%%SCHEMA_REGISTRY_HOST%%", (String)StringUtils.format((String)"http://%s", (Object[])new Object[]{config.getSchemaRegistryInternalHost()}));
                spec = StringUtils.replace((String)spec, (String)"%%DIMENSIONS%%", (String)this.jsonMapper.writeValueAsString((Object)dimensions));
                return StringUtils.replace((String)spec, (String)"%%STREAM_PROPERTIES_VALUE%%", (String)this.jsonMapper.writeValueAsString((Object)consumerProperties));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    @Override
    Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName) {
        return spec -> {
            try {
                spec = StringUtils.replace((String)spec, (String)"%%DATASOURCE%%", (String)fullDatasourceName);
                spec = StringUtils.replace((String)spec, (String)"%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", (String)TIMESTAMP_FMT.print((ReadableInstant)FIRST_EVENT_TIME));
                spec = StringUtils.replace((String)spec, (String)"%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", (String)TIMESTAMP_FMT.print((ReadableInstant)FIRST_EVENT_TIME.plusSeconds(9)));
                spec = StringUtils.replace((String)spec, (String)"%%TIMEBOUNDARY_RESPONSE_MINTIME%%", (String)TIMESTAMP_FMT.print((ReadableInstant)FIRST_EVENT_TIME));
                spec = StringUtils.replace((String)spec, (String)"%%TIMESERIES_QUERY_START%%", (String)INTERVAL_FMT.print((ReadableInstant)FIRST_EVENT_TIME));
                spec = StringUtils.replace((String)spec, (String)"%%TIMESERIES_QUERY_END%%", (String)INTERVAL_FMT.print((ReadableInstant)FIRST_EVENT_TIME.plusSeconds(9).plusMinutes(2)));
                spec = StringUtils.replace((String)spec, (String)"%%TIMESERIES_RESPONSE_TIMESTAMP%%", (String)TIMESTAMP_FMT.print((ReadableInstant)FIRST_EVENT_TIME));
                spec = StringUtils.replace((String)spec, (String)"%%TIMESERIES_ADDED%%", (String)Long.toString(this.getSumOfEventSequence(6) * 10L));
                return StringUtils.replace((String)spec, (String)"%%TIMESERIES_NUMEVENTS%%", (String)Integer.toString(60));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }
}

