/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.java.util.emitter.core;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.emitter.core.BaseHttpEmittingConfig;
import org.apache.druid.java.util.emitter.core.BatchingStrategy;
import org.apache.druid.java.util.emitter.core.ContentEncoding;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Emitters;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.GoHandler;
import org.apache.druid.java.util.emitter.core.GoHandlers;
import org.apache.druid.java.util.emitter.core.HttpEmitterConfig;
import org.apache.druid.java.util.emitter.core.HttpPostEmitter;
import org.apache.druid.java.util.emitter.core.MockHttpClient;
import org.apache.druid.java.util.emitter.service.UnitEvent;
import org.apache.druid.metadata.DefaultPasswordProvider;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.utils.CompressionUtils;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.asynchttpclient.netty.EagerResponseBodyPart;
import org.asynchttpclient.netty.NettyResponseStatus;
import org.asynchttpclient.uri.Uri;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link HttpPostEmitter} that replace the real HTTP client with a {@link MockHttpClient}.
 *
 * <p>Each test installs a {@link GoHandler} on the mock client to inspect (and answer) the requests
 * the emitter posts, emits {@link UnitEvent}s, waits for the expected batch and finally checks
 * {@link MockHttpClient#succeeded()}.
 */
public class EmitterTest {
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    // NOTE(review): intentionally left non-final to keep the public surface unchanged; nothing in this
    // class reassigns it. Must stay declared before OK_RESPONSE/BAD_RESPONSE, whose initializers read it.
    public static String TARGET_URL = "http://metrics.foo.bar/";

    /** Canned 201 response with body {@code "Yay"}. */
    public static final Response OK_RESPONSE = cannedResponse(HttpResponseStatus.CREATED, "Yay");

    /** Canned 403 response with body {@code "Not yay"}. */
    public static final Response BAD_RESPONSE = cannedResponse(HttpResponseStatus.FORBIDDEN, "Not yay");

    /** Maximum batch size (1 MiB) used by the manual-flush emitters. */
    private static final int ONE_MEBIBYTE = 0x100000;

    MockHttpClient httpClient;
    HttpPostEmitter emitter;

    /**
     * Creates a response builder that has already accumulated a status line for {@link #TARGET_URL}.
     *
     * @param version HTTP version of the synthetic response
     * @param status  HTTP status of the synthetic response
     * @return a builder to which body parts may still be added
     */
    private static Response.ResponseBuilder responseBuilder(HttpVersion version, HttpResponseStatus status) {
        HttpResponse rawResponse = new DefaultHttpResponse(version, status);
        Response.ResponseBuilder builder = new Response.ResponseBuilder();
        builder.accumulate(new NettyResponseStatus(Uri.create(TARGET_URL), rawResponse, null));
        return builder;
    }

    /**
     * Builds a complete HTTP/1.1 response with the given status and a single UTF-8 body part.
     *
     * @param status HTTP status of the response
     * @param body   text used as the (last) body part
     * @return the built response
     */
    private static Response cannedResponse(HttpResponseStatus status, String body) {
        Response.ResponseBuilder builder = responseBuilder(HttpVersion.HTTP_1_1, status);
        HttpResponseBodyPart bodyPart =
            new EagerResponseBodyPart(Unpooled.wrappedBuffer(body.getBytes(StandardCharsets.UTF_8)), true);
        builder.accumulate(bodyPart);
        return builder.build();
    }

    /** Returns the shared {@link #OK_RESPONSE}. */
    public static Response okResponse() {
        return OK_RESPONSE;
    }

    @Before
    public void setUp() {
        httpClient = new MockHttpClient();
    }

    @After
    public void tearDown() throws Exception {
        // Tests normally close the emitter themselves; this also covers tests that failed before doing so.
        if (emitter != null) {
            emitter.close();
        }
    }

    /** Creates an emitter on top of the mock client, starts it and returns it. */
    private HttpPostEmitter startEmitter(HttpEmitterConfig config) {
        HttpPostEmitter started = new HttpPostEmitter(config, httpClient, JSON_MAPPER);
        started.start();
        return started;
    }

    /** Emitter configured with the given flush period and a flush count of {@code Integer.MAX_VALUE}. */
    private HttpPostEmitter timeBasedEmitter(long timeInMillis) {
        HttpEmitterConfig config = new HttpEmitterConfig.Builder(TARGET_URL)
            .setFlushMillis(timeInMillis)
            .setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
            .setFlushCount(Integer.MAX_VALUE)
            .build();
        return startEmitter(config);
    }

    /** Emitter configured with the given flush count and a flush period of {@code Long.MAX_VALUE}. */
    private HttpPostEmitter sizeBasedEmitter(int size) {
        HttpEmitterConfig config = new HttpEmitterConfig.Builder(TARGET_URL)
            .setFlushMillis(Long.MAX_VALUE)
            .setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
            .setFlushCount(size)
            .build();
        return startEmitter(config);
    }

    /**
     * Same settings as {@link #sizeBasedEmitter(int)}, but created through the property-driven
     * {@link Emitters#create} factory; asserts that the factory produced an {@link HttpPostEmitter}.
     */
    private HttpPostEmitter sizeBasedEmitterGeneralizedCreation(int size) {
        Properties props = new Properties();
        props.setProperty("org.apache.druid.java.util.emitter.type", "http");
        props.setProperty("org.apache.druid.java.util.emitter.recipientBaseUrl", TARGET_URL);
        props.setProperty("org.apache.druid.java.util.emitter.flushMillis", String.valueOf(Long.MAX_VALUE));
        props.setProperty("org.apache.druid.java.util.emitter.flushCount", String.valueOf(size));
        props.setProperty(
            "org.apache.druid.java.util.emitter.flushTimeOut",
            String.valueOf(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
        );
        Lifecycle lifecycle = new Lifecycle();
        Emitter created = Emitters.create(props, httpClient, JSON_MAPPER, lifecycle);
        Assert.assertTrue(
            StringUtils.format("HttpPostEmitter emitter should be created, but found %s", created.getClass().getName()),
            created instanceof HttpPostEmitter
        );
        created.start();
        return (HttpPostEmitter) created;
    }

    /** Size-based emitter that additionally applies the given content encoding. */
    private HttpPostEmitter sizeBasedEmitterWithContentEncoding(int size, ContentEncoding encoding) {
        HttpEmitterConfig config = new HttpEmitterConfig.Builder(TARGET_URL)
            .setFlushMillis(Long.MAX_VALUE)
            .setFlushCount(size)
            .setContentEncoding(encoding)
            .setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
            .build();
        return startEmitter(config);
    }

    /**
     * Emitter with both flush triggers set to their maximum values, basic authentication and the
     * {@link BatchingStrategy#NEWLINES} batching strategy.
     */
    private HttpPostEmitter manualFlushEmitterWithBasicAuthenticationAndNewlineSeparating(PasswordProvider authentication) {
        HttpEmitterConfig config = new HttpEmitterConfig.Builder(TARGET_URL)
            .setFlushMillis(Long.MAX_VALUE)
            .setFlushCount(Integer.MAX_VALUE)
            .setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
            .setBasicAuthentication(authentication)
            .setBatchingStrategy(BatchingStrategy.NEWLINES)
            .setMaxBatchSize(ONE_MEBIBYTE)
            .build();
        return startEmitter(config);
    }

    /** Emitter with both flush triggers set to their maximum values and the given maximum batch size. */
    private HttpPostEmitter manualFlushEmitterWithBatchSize(int batchSize) {
        HttpEmitterConfig config = new HttpEmitterConfig.Builder(TARGET_URL)
            .setFlushMillis(Long.MAX_VALUE)
            .setFlushCount(Integer.MAX_VALUE)
            .setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
            .setMaxBatchSize(batchSize)
            .build();
        return startEmitter(config);
    }

    /** Asserts that the request targets {@link #TARGET_URL} and declares a JSON content type. */
    private static void assertJsonPost(Request request) {
        Assert.assertEquals(TARGET_URL, request.getUrl());
        Assert.assertEquals("application/json", request.getHeaders().get("Content-Type"));
    }

    /** Decodes the request body as UTF-8 without disturbing the position of the request's own buffer. */
    private static String decodeBody(Request request) {
        return StandardCharsets.UTF_8.decode(request.getByteBufferData().slice()).toString();
    }

    /**
     * Asserts that {@code actualBody} is JSON-equivalent to the array batch {@code [first,second]}.
     * Trees are compared rather than strings so the check does not depend on formatting.
     */
    private static void assertJsonArrayBatch(String actualBody, UnitEvent first, UnitEvent second)
        throws JsonProcessingException {
        String expectedBody = StringUtils.format(
            "[%s,%s]\n",
            JSON_MAPPER.writeValueAsString(first),
            JSON_MAPPER.writeValueAsString(second)
        );
        Assert.assertEquals(JSON_MAPPER.readTree(expectedBody), JSON_MAPPER.readTree(actualBody));
    }

    /**
     * Shared body of the two sanity tests: emits two events through the already-created
     * {@link #emitter} and expects exactly one request carrying both of them as a JSON array.
     */
    private void emitTwoEventsAndExpectOneArrayBatch() throws Exception {
        final List<UnitEvent> events = Arrays.asList(new UnitEvent("test", 1), new UnitEvent("test", 2));
        httpClient.setGoHandler(new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) throws JsonProcessingException {
                EmitterTest.assertJsonPost(request);
                EmitterTest.assertJsonArrayBatch(EmitterTest.decodeBody(request), events.get(0), events.get(1));
                return GoHandlers.immediateFuture(EmitterTest.okResponse());
            }
        }.times(1));
        for (UnitEvent event : events) {
            emitter.emit(event);
        }
        waitForEmission(emitter, 1);
        closeNoFlush(emitter);
        Assert.assertTrue(httpClient.succeeded());
    }

    @Test
    public void testSanity() throws Exception {
        emitter = sizeBasedEmitter(2);
        emitTwoEventsAndExpectOneArrayBatch();
    }

    @Test
    public void testSanityWithGeneralizedCreation() throws Exception {
        emitter = sizeBasedEmitterGeneralizedCreation(2);
        emitTwoEventsAndExpectOneArrayBatch();
    }

    @Test
    public void testSizeBasedEmission() throws Exception {
        emitter = sizeBasedEmitter(3);

        // No request is expected while fewer than three events are buffered.
        httpClient.setGoHandler(GoHandlers.failingHandler());
        emitter.emit(new UnitEvent("test", 1));
        emitter.emit(new UnitEvent("test", 2));

        // The third event reaches the flush count, so exactly one request is expected.
        httpClient.setGoHandler(GoHandlers.passingHandler(okResponse()).times(1));
        emitter.emit(new UnitEvent("test", 3));
        waitForEmission(emitter, 1);

        httpClient.setGoHandler(GoHandlers.failingHandler());
        emitter.emit(new UnitEvent("test", 4));
        emitter.emit(new UnitEvent("test", 5));

        // The two remaining events are expected to go out when the emitter is closed.
        closeAndExpectFlush(emitter);
        Assert.assertTrue(httpClient.succeeded());
    }

    /**
     * Emits one event through {@link #emitter} and asserts that the mock client receives a request in
     * strictly less than {@code limitMillis} milliseconds.
     *
     * <p>Elapsed time is measured with {@link System#nanoTime()}, which is not affected by wall-clock
     * adjustments.
     */
    private void emitAndAssertRequestWithin(Event event, int limitMillis) throws InterruptedException {
        final CountDownLatch requestSeen = new CountDownLatch(1);
        httpClient.setGoHandler(new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) {
                requestSeen.countDown();
                return GoHandlers.immediateFuture(EmitterTest.okResponse());
            }
        }.times(1));
        long startNanos = System.nanoTime();
        emitter.emit(event);
        requestSeen.await();
        long timeWaited = (System.nanoTime() - startNanos) / 1_000_000L;
        Assert.assertTrue(
            StringUtils.format("timeWaited[%s] !< %s", timeWaited, limitMillis),
            timeWaited < limitMillis
        );
    }

    // The timeout guards against hanging forever on the latch if the emitter never posts a request.
    @Test(timeout=60000L)
    public void testTimeBasedEmission() throws Exception {
        final int timeBetweenEmissions = 100;
        final int maxWaitMillis = timeBetweenEmissions * 2;
        emitter = timeBasedEmitter(timeBetweenEmissions);

        emitAndAssertRequestWithin(new UnitEvent("test", 1), maxWaitMillis);
        waitForEmission(emitter, 1);

        emitAndAssertRequestWithin(new UnitEvent("test", 2), maxWaitMillis);
        waitForEmission(emitter, 2);

        closeNoFlush(emitter);
        Assert.assertTrue("httpClient.succeeded()", httpClient.succeeded());
    }

    @Test(timeout=60000L)
    public void testFailedEmission() throws Exception {
        UnitEvent event1 = new UnitEvent("test", 1);
        UnitEvent event2 = new UnitEvent("test", 2);
        emitter = sizeBasedEmitter(1);
        Assert.assertEquals(0L, emitter.getTotalEmittedEvents());
        Assert.assertEquals(0L, emitter.getSuccessfulSendingTimeCounter().getTimeSumAndCount());
        Assert.assertEquals(0L, emitter.getFailedSendingTimeCounter().getTimeSumAndCount());

        // First phase: every request is answered with 400 Bad Request.
        httpClient.setGoHandler(new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) {
                Response response = EmitterTest.responseBuilder(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST).build();
                return GoHandlers.immediateFuture(response);
            }
        });
        emitter.emit(event1);
        emitter.flush();
        waitForEmission(emitter, 1);
        Assert.assertTrue(httpClient.succeeded());
        Assert.assertEquals(0L, emitter.getTotalEmittedEvents());
        Assert.assertEquals(0L, emitter.getSuccessfulSendingTimeCounter().getTimeSumAndCount());
        Assert.assertTrue(emitter.getFailedSendingTimeCounter().getTimeSumAndCount() > 0L);

        // Second phase: requests succeed; both events are expected to be counted as emitted.
        httpClient.setGoHandler(new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) {
                return GoHandlers.immediateFuture(EmitterTest.okResponse());
            }
        }.times(2));
        emitter.emit(event2);
        emitter.flush();
        waitForEmission(emitter, 2);
        closeNoFlush(emitter);
        // Join the emitter thread before reading the counters.
        emitter.joinEmitterThread();
        Assert.assertEquals(2L, emitter.getTotalEmittedEvents());
        Assert.assertTrue(emitter.getSuccessfulSendingTimeCounter().getTimeSumAndCount() > 0L);
        Assert.assertTrue(emitter.getFailedSendingTimeCounter().getTimeSumAndCount() > 0L);
        Assert.assertTrue(httpClient.succeeded());
    }

    @Test
    public void testBasicAuthenticationAndNewlineSeparating() throws Exception {
        final List<UnitEvent> events = Arrays.asList(new UnitEvent("test", 1), new UnitEvent("test", 2));
        emitter = manualFlushEmitterWithBasicAuthenticationAndNewlineSeparating(new DefaultPasswordProvider("foo:bar"));
        httpClient.setGoHandler(new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) throws JsonProcessingException {
                EmitterTest.assertJsonPost(request);
                Assert.assertEquals(
                    "Basic " + StringUtils.encodeBase64String(StringUtils.toUtf8("foo:bar")),
                    request.getHeaders().get("Authorization")
                );
                // The body is newline-delimited JSON ("%s\n%s\n"). readTree() only parses the first
                // document of its input, so every line is parsed and compared on its own; otherwise
                // only the first event would actually be verified.
                String[] lines = EmitterTest.decodeBody(request).split("\n");
                Assert.assertEquals("number of newline-separated events", events.size(), lines.length);
                for (int i = 0; i < lines.length; i++) {
                    Assert.assertEquals(
                        JSON_MAPPER.readTree(JSON_MAPPER.writeValueAsString(events.get(i))),
                        JSON_MAPPER.readTree(lines[i])
                    );
                }
                return GoHandlers.immediateFuture(EmitterTest.okResponse());
            }
        }.times(1));
        for (UnitEvent event : events) {
            emitter.emit(event);
        }
        emitter.flush();
        waitForEmission(emitter, 1);
        closeNoFlush(emitter);
        Assert.assertTrue(httpClient.succeeded());
    }

    @Test
    public void testBatchSplitting() throws Exception {
        // Four events of roughly 500 KB each against a 1 MiB batch limit: two events per batch.
        byte[] big = new byte[512000];
        Arrays.fill(big, (byte) 'x');
        String bigString = StringUtils.fromUtf8(big);
        final List<UnitEvent> events = Arrays.asList(
            new UnitEvent(bigString, 1),
            new UnitEvent(bigString, 2),
            new UnitEvent(bigString, 3),
            new UnitEvent(bigString, 4)
        );
        final AtomicInteger counter = new AtomicInteger();
        emitter = manualFlushEmitterWithBatchSize(ONE_MEBIBYTE);
        Assert.assertEquals(0L, emitter.getTotalEmittedEvents());
        Assert.assertEquals(0L, emitter.getSuccessfulSendingTimeCounter().getTimeSumAndCount());
        Assert.assertEquals(0L, emitter.getFailedSendingTimeCounter().getTimeSumAndCount());
        httpClient.setGoHandler(new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) throws JsonProcessingException {
                EmitterTest.assertJsonPost(request);
                // Each request is expected to carry the next two events, in emission order.
                EmitterTest.assertJsonArrayBatch(
                    EmitterTest.decodeBody(request),
                    events.get(counter.getAndIncrement()),
                    events.get(counter.getAndIncrement())
                );
                return GoHandlers.immediateFuture(EmitterTest.okResponse());
            }
        }.times(3));
        for (UnitEvent event : events) {
            emitter.emit(event);
        }

        // The first batch goes out without an explicit flush.
        waitForEmission(emitter, 1);
        Assert.assertEquals(2L, emitter.getTotalEmittedEvents());
        Assert.assertTrue(emitter.getSuccessfulSendingTimeCounter().getTimeSumAndCount() > 0L);
        Assert.assertEquals(0L, emitter.getFailedSendingTimeCounter().getTimeSumAndCount());

        // The second batch is sent by the explicit flush.
        emitter.flush();
        waitForEmission(emitter, 2);
        Assert.assertEquals(4L, emitter.getTotalEmittedEvents());
        Assert.assertTrue(emitter.getSuccessfulSendingTimeCounter().getTimeSumAndCount() > 0L);
        Assert.assertEquals(0L, emitter.getFailedSendingTimeCounter().getTimeSumAndCount());
        closeNoFlush(emitter);
        Assert.assertTrue(httpClient.succeeded());
    }

    @Test
    public void testGzipContentEncoding() throws Exception {
        final List<UnitEvent> events = Arrays.asList(new UnitEvent("plain-text", 1), new UnitEvent("plain-text", 2));
        emitter = sizeBasedEmitterWithContentEncoding(2, ContentEncoding.GZIP);
        httpClient.setGoHandler(new GoHandler() {
            @Override
            protected ListenableFuture<Response> go(Request request) throws IOException {
                EmitterTest.assertJsonPost(request);
                Assert.assertEquals("gzip", request.getHeaders().get("Content-Encoding"));

                // Copy the request bytes out of a slice, gunzip them, then compare the decoded JSON.
                ByteBuffer data = request.getByteBufferData().slice();
                byte[] dataArray = new byte[data.remaining()];
                data.get(dataArray);
                InputStream compressed = new ByteArrayInputStream(dataArray);
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                CompressionUtils.gunzip(compressed, baos);

                EmitterTest.assertJsonArrayBatch(
                    baos.toString(StandardCharsets.UTF_8.name()),
                    events.get(0),
                    events.get(1)
                );
                return GoHandlers.immediateFuture(EmitterTest.okResponse());
            }
        }.times(1));
        for (UnitEvent event : events) {
            emitter.emit(event);
        }
        waitForEmission(emitter, 1);
        closeNoFlush(emitter);
        Assert.assertTrue(httpClient.succeeded());
    }

    /** Installs a handler that accepts one more request, then closes the emitter. */
    private void closeAndExpectFlush(Emitter emitter) throws IOException {
        httpClient.setGoHandler(GoHandlers.passingHandler(okResponse()).times(1));
        emitter.close();
    }

    /** Closes the emitter while leaving the currently installed handler in place. */
    private void closeNoFlush(Emitter emitter) throws IOException {
        emitter.close();
    }

    /** Delegates to {@link HttpPostEmitter#waitForEmission(int)}. */
    private void waitForEmission(HttpPostEmitter emitter, int batchNumber) throws Exception {
        emitter.waitForEmission(batchNumber);
    }
}

