/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.tests.indexer;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.clients.EventReceiverFirehoseTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ServerDiscoveryUtil;
import org.apache.druid.tests.indexer.AbstractITRealtimeIndexTaskTest;
import org.apache.druid.tests.indexer.ITRealtimeIndexTaskTest;
import org.joda.time.DateTime;
import org.joda.time.ReadableInstant;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

@Test(groups={"realtime-index"})
@Guice(moduleFactory=DruidTestModuleFactory.class)
public class ITAppenderatorDriverRealtimeIndexTaskTest
extends AbstractITRealtimeIndexTaskTest {
    private static final Logger LOG = new Logger(ITAppenderatorDriverRealtimeIndexTaskTest.class);
    private static final String REALTIME_TASK_RESOURCE = "/indexer/wikipedia_realtime_appenderator_index_task.json";
    private static final String REALTIME_QUERIES_RESOURCE = "/indexer/wikipedia_realtime_appenderator_index_queries.json";
    private static final int EXPECTED_NUM_ROWS = 22;

    @Test
    public void testRealtimeIndexTask() {
        this.doTest();
    }

    @Override
    void postEvents() throws Exception {
        InputStreamReader isr;
        ServerDiscoverySelector eventReceiverSelector = this.factory.createSelector("eventReceiverServiceName");
        eventReceiverSelector.start();
        try {
            isr = new InputStreamReader(ITRealtimeIndexTaskTest.class.getResourceAsStream("/indexer/wikipedia_realtime_index_data.json"), StandardCharsets.UTF_8);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        try (BufferedReader reader = new BufferedReader(isr);){
            String line;
            DateTime dt;
            ServerDiscoveryUtil.waitUntilInstanceReady((ServerDiscoverySelector)eventReceiverSelector, (String)"Event Receiver");
            String host = this.config.getMiddleManagerHost() + ":" + eventReceiverSelector.pick().getPort();
            LOG.info("Event Receiver Found at host [%s]", new Object[]{host});
            EventReceiverFirehoseTestClient client = new EventReceiverFirehoseTestClient(host, "eventReceiverServiceName", this.jsonMapper, this.httpClient, this.smileMapper);
            int i = 1;
            this.dtFirst = dt = DateTimes.nowUtc();
            this.dtLast = dt;
            while ((line = reader.readLine()) != null) {
                if (i == 15) {
                    this.dtFirst = dt = dt.minusMinutes(10);
                } else if (i == 16) {
                    this.dtGroupBy = dt;
                } else if (i == 18) {
                    dt = dt.minusSeconds(6);
                }
                String event = StringUtils.replace((String)line, (String)"YYYY-MM-DDTHH:MM:SS", (String)EVENT_FMT.print((ReadableInstant)dt));
                LOG.info("sending event: [%s]\n", new Object[]{event});
                ArrayList<Object> events = new ArrayList<Object>();
                events.add(this.jsonMapper.readValue(event, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT));
                int eventsPosted = client.postEvents(events, this.jsonMapper, "application/json");
                if (eventsPosted != events.size()) {
                    throw new ISE("Event not posted", new Object[0]);
                }
                try {
                    Thread.sleep(4000L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                this.dtLast = dt;
                dt = DateTimes.nowUtc();
                ++i;
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            eventReceiverSelector.stop();
        }
    }

    @Override
    String getTaskResource() {
        return REALTIME_TASK_RESOURCE;
    }

    @Override
    String getQueriesResource() {
        return REALTIME_QUERIES_RESOURCE;
    }

    @Override
    int getNumExpectedRowsIngested() {
        return 22;
    }
}

