/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.tests.query;

import com.google.inject.Inject;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.EventReceiverFirehoseTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.ServerDiscoveryUtil;
import org.apache.druid.tests.indexer.AbstractITBatchIndexTest;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

/**
 * Integration test that submits several indexing tasks, each writing to its own
 * datasource ({@code <base name><suffix><task index>}), posts the same event file to
 * every task through its event receiver, and then runs the union queries from
 * {@link #UNION_QUERIES_RESOURCE} twice: once while the tasks are still running and
 * once after the tasks have completed and their segments are reported as loaded.
 */
@Test(groups={"query"})
@Guice(moduleFactory=DruidTestModuleFactory.class)
public class ITUnionQueryTest
extends AbstractIndexerTest {
    private static final Logger LOG = new Logger(ITUnionQueryTest.class);
    private static final String UNION_TASK_RESOURCE = "/indexer/wikipedia_union_index_task.json";
    private static final String EVENT_RECEIVER_SERVICE_PREFIX = "eventReceiverServiceName";
    private static final String UNION_DATA_FILE = "/data/union_query/wikipedia_index_data.json";
    private static final String UNION_QUERIES_RESOURCE = "/queries/union_queries.json";
    private static final String UNION_DATASOURCE = "wikipedia_index_test";
    /** Placeholder replaced with a datasource name in both the task spec and the query file. */
    private static final String DATASOURCE_PLACEHOLDER = "%%DATASOURCE%%";
    /** Number of indexing tasks (and therefore datasources) the test creates. */
    private static final int NUM_TASKS = 3;
    @Inject
    ServerDiscoveryFactory factory;
    @Inject
    @TestClient
    HttpClient httpClient;
    @Inject
    IntegrationTestingConfig config;
    private String fullDatasourceName;

    /**
     * Computes the base datasource name by appending the configured extra suffix to
     * {@link #UNION_DATASOURCE}.
     */
    @BeforeSuite
    public void setFullDatasourceName() {
        this.fullDatasourceName = UNION_DATASOURCE + this.config.getExtraDatasourceNameSuffix();
    }

    /**
     * Submits {@link #NUM_TASKS} tasks, feeds each one events, waits until every
     * datasource reports at least 5 rows, and runs the union queries before and after
     * the tasks complete. Unloaders for all datasources are registered up front so they
     * run on both success and failure.
     *
     * @throws IOException declared because of the {@link Closer} rethrow/close pattern
     */
    @Test
    public void testUnionQuery() throws IOException {
        Closer closer = Closer.create();
        for (int i = 0; i < NUM_TASKS; ++i) {
            closer.register(this.unloader(this.fullDatasourceName + i));
        }
        try {
            // The task spec is given a shut-off time three minutes from now.
            String task = this.setShutOffTime(
                getResourceAsString(UNION_TASK_RESOURCE),
                DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3L))
            );
            List<String> taskIDs = new ArrayList<>();
            for (int i = 0; i < NUM_TASKS; ++i) {
                String taskSpec = this.withServiceName(
                    this.withDataSource(task, this.fullDatasourceName + i),
                    EVENT_RECEIVER_SERVICE_PREFIX + i
                );
                taskIDs.add(this.indexer.submitTask(taskSpec));
            }
            for (int i = 0; i < NUM_TASKS; ++i) {
                this.postEvents(i);
            }
            ITRetryUtil.retryUntil(() -> {
                for (int i = 0; i < NUM_TASKS; ++i) {
                    String dataSource = this.fullDatasourceName + i;
                    int countRows = this.queryHelper.countRows(
                        dataSource,
                        Intervals.of("2013-08-31/2013-09-01"),
                        name -> new LongSumAggregatorFactory(name, "count")
                    );
                    if (countRows < 5) {
                        LOG.warn("%d events have been ingested to %s so far", countRows, dataSource);
                        return false;
                    }
                }
                return true;
            }, true, 1000L, 100, "Waiting all events are ingested");

            LOG.info("Running Union Queries..");
            String queryResponseTemplate = StringUtils.replace(
                readUnionQueries(),
                DATASOURCE_PLACEHOLDER,
                this.fullDatasourceName
            );
            // First pass: tasks have not been waited on yet.
            this.queryHelper.testQueriesFromString(queryResponseTemplate);

            for (String taskID : taskIDs) {
                this.indexer.waitUntilTaskCompletes(taskID);
            }
            for (int i = 0; i < NUM_TASKS; ++i) {
                final int taskNum = i;
                ITRetryUtil.retryUntil(
                    () -> this.coordinator.areSegmentsLoaded(this.fullDatasourceName + taskNum),
                    true,
                    10000L,
                    10,
                    "Real-time generated segments loaded"
                );
            }
            // Second pass: tasks are complete and segments are reported as loaded.
            this.queryHelper.testQueriesFromString(queryResponseTemplate);
        }
        catch (Throwable e) {
            throw closer.rethrow(e);
        }
        finally {
            closer.close();
        }
    }

    /**
     * Reads {@link #UNION_QUERIES_RESOURCE} from the classpath as UTF-8 text and closes
     * the stream afterwards.
     *
     * @return the raw contents of the query file
     * @throws ISE if the resource is missing or cannot be read
     */
    private static String readUnionQueries() {
        try (InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(UNION_QUERIES_RESOURCE)) {
            if (is == null) {
                // getResourceAsStream returns null rather than throwing when the resource is absent.
                throw new ISE("could not read query file: %s", UNION_QUERIES_RESOURCE);
            }
            return IOUtils.toString(is, StandardCharsets.UTF_8);
        }
        catch (IOException e) {
            throw new ISE(e, "could not read query file: %s", UNION_QUERIES_RESOURCE);
        }
    }

    /** Replaces the {@code #SHUTOFFTIME} token in the task spec with {@code time.toString()}. */
    private String setShutOffTime(String taskAsString, DateTime time) {
        return StringUtils.replace(taskAsString, "#SHUTOFFTIME", time.toString());
    }

    /** Replaces the datasource placeholder in the task spec with {@code dataSource}. */
    private String withDataSource(String taskAsString, String dataSource) {
        return StringUtils.replace(taskAsString, DATASOURCE_PLACEHOLDER, dataSource);
    }

    /** Replaces the {@link #EVENT_RECEIVER_SERVICE_PREFIX} token in the task spec with {@code serviceName}. */
    private String withServiceName(String taskAsString, String serviceName) {
        return StringUtils.replace(taskAsString, EVENT_RECEIVER_SERVICE_PREFIX, serviceName);
    }

    /**
     * Locates the event receiver for task {@code id}, waits until the worker's
     * {@code /status/health} endpoint answers with HTTP 200, and posts the events from
     * {@link #UNION_DATA_FILE} to it. The discovery selector is always stopped on exit.
     *
     * @param id index of the task, used as the suffix of the event receiver service name
     * @throws Exception if discovery, the health check retries, or posting events fails
     */
    private void postEvents(int id) throws Exception {
        String serviceName = EVENT_RECEIVER_SERVICE_PREFIX + id;
        ServerDiscoverySelector eventReceiverSelector = this.factory.createSelector(serviceName);
        eventReceiverSelector.start();
        try {
            ServerDiscoveryUtil.waitUntilInstanceReady(eventReceiverSelector, "Event Receiver");
            // Host comes from the test config; only the port is taken from the discovered instance.
            String host = this.config.getMiddleManagerHost() + ":" + eventReceiverSelector.pick().getPort();
            LOG.info("Event Receiver Found at host [%s]", host);
            LOG.info("Checking worker /status/health for [%s]", host);
            ITRetryUtil.retryUntilTrue(() -> {
                try {
                    StatusResponseHolder response = this.httpClient.go(
                        new Request(HttpMethod.GET, new URL(StringUtils.format("https://%s/status/health", host))),
                        StatusResponseHandler.getInstance()
                    ).get();
                    return response.getStatus().equals(HttpResponseStatus.OK);
                }
                catch (Throwable e) {
                    // Best effort: a failed probe counts as "not healthy yet" and is retried.
                    LOG.error(e, "Health check request to worker [%s] failed", host);
                    return false;
                }
            }, StringUtils.format("Checking /status/health for worker [%s]", host));
            LOG.info("Finished checking worker /status/health for [%s], success", host);
            EventReceiverFirehoseTestClient client = new EventReceiverFirehoseTestClient(
                host,
                serviceName,
                this.jsonMapper,
                this.httpClient,
                this.smileMapper
            );
            client.postEventsFromFile(UNION_DATA_FILE);
        }
        finally {
            eventReceiverSelector.stop();
        }
    }
}

