/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.execution.http;

import com.facebook.airlift.http.client.HeaderName;
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.HttpStatus;
import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.RequestStats;
import com.facebook.airlift.http.client.Response;
import com.facebook.airlift.http.client.ResponseHandler;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.PrestoMediaTypes;
import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.client.ServerInfo;
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.execution.ScheduledSplit;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.TaskTestUtils;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.operator.PageBufferClient;
import com.facebook.presto.operator.PageTransportErrorException;
import com.facebook.presto.operator.TaskStats;
import com.facebook.presto.server.smile.BaseResponse;
import com.facebook.presto.spark.execution.BatchTaskUpdateRequest;
import com.facebook.presto.spark.execution.HttpNativeExecutionTaskInfoFetcher;
import com.facebook.presto.spark.execution.HttpNativeExecutionTaskResultFetcher;
import com.facebook.presto.spark.execution.NativeExecutionProcess;
import com.facebook.presto.spark.execution.NativeExecutionProcessFactory;
import com.facebook.presto.spark.execution.NativeExecutionTask;
import com.facebook.presto.spark.execution.NativeExecutionTaskFactory;
import com.facebook.presto.spark.execution.http.PrestoSparkHttpServerClient;
import com.facebook.presto.spark.execution.http.PrestoSparkHttpTaskClient;
import com.facebook.presto.spark.execution.property.NativeExecutionConnectorConfig;
import com.facebook.presto.spark.execution.property.NativeExecutionNodeConfig;
import com.facebook.presto.spark.execution.property.NativeExecutionSystemConfig;
import com.facebook.presto.spark.execution.property.NativeExecutionVeloxConfig;
import com.facebook.presto.spark.execution.property.PrestoSparkWorkerProperty;
import com.facebook.presto.spark.execution.property.WorkerProperty;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.page.PageCodecMarker;
import com.facebook.presto.spi.page.PagesSerdeUtil;
import com.facebook.presto.spi.page.SerializedPage;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.testing.TestingSession;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.net.MediaType;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceOutput;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.joda.time.DateTime;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestPrestoSparkHttpClient {
    // Path segment appended to BASE_URI when the stubbed TaskInfo response builds its task URI.
    private static final String TASK_ROOT_PATH = "/v1/task";
    // Base URI (http://localhost:8080) passed to the clients constructed by these tests.
    private static final URI BASE_URI = HttpUriBuilder.uriBuilder().scheme("http").host("localhost").port(8080).build();
    // Zero-length duration; not referenced in the portion of the file reviewed here.
    private static final Duration NO_DURATION = new Duration(0.0, TimeUnit.MILLISECONDS);
    // JSON codecs passed to the task and server clients under test.
    private static final JsonCodec<TaskInfo> TASK_INFO_JSON_CODEC = JsonCodec.jsonCodec(TaskInfo.class);
    private static final JsonCodec<PlanFragment> PLAN_FRAGMENT_JSON_CODEC = JsonCodec.jsonCodec(PlanFragment.class);
    private static final JsonCodec<BatchTaskUpdateRequest> TASK_UPDATE_REQUEST_JSON_CODEC = JsonCodec.jsonCodec(BatchTaskUpdateRequest.class);
    private static final JsonCodec<ServerInfo> SERVER_INFO_JSON_CODEC = JsonCodec.jsonCodec(ServerInfo.class);
    // Shared 4-thread scheduled pools. NOTE(review): no test in the reviewed portion reads these fields;
    // createNativeExecutionProcess declares its own local named errorScheduler -- confirm whether they are still needed.
    private static final ScheduledExecutorService errorScheduler = Executors.newScheduledThreadPool(4);
    private static final ScheduledExecutorService updateScheduledExecutor = Executors.newScheduledThreadPool(4);

    /**
     * Requests results for token 0 from the stubbed worker and checks the decoded response:
     * token 0, client complete, and a task instance id equal to the task id.
     */
    @Test
    public void testResultGet() {
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        PrestoSparkHttpTaskClient workerClient = new PrestoSparkHttpTaskClient(
                (HttpClient) new TestingHttpClient(new TestingResponseManager(taskId.toString())),
                taskId,
                BASE_URI,
                TASK_INFO_JSON_CODEC,
                PLAN_FRAGMENT_JSON_CODEC,
                TASK_UPDATE_REQUEST_JSON_CODEC,
                new Duration(1.0, TimeUnit.SECONDS));
        ListenableFuture<?> future = workerClient.getResults(0L, new DataSize(32.0, DataSize.Unit.MEGABYTE));
        try {
            PageBufferClient.PagesResponse page = (PageBufferClient.PagesResponse) future.get();
            // TestNG's assertEquals takes (actual, expected); keeping that order makes failure messages read correctly.
            Assert.assertEquals(page.getToken(), 0L);
            Assert.assertTrue(page.isClientComplete());
            Assert.assertEquals(page.getTaskInstanceId(), taskId.toString());
        }
        catch (Exception e) {
            // Attach the cause so the test report shows why the request failed.
            Assert.fail("getResults future did not complete successfully", e);
        }
    }

    /**
     * Smoke test: issues an asynchronous acknowledgement for token 1 against the stubbed worker.
     * No result is asserted; the test only fails if issuing the call throws.
     */
    @Test
    public void testResultAcknowledge() {
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        TestingResponseManager responseManager = new TestingResponseManager(taskId.toString());
        HttpClient httpClient = new TestingHttpClient(responseManager);
        Duration requestTimeout = new Duration(1.0, TimeUnit.SECONDS);
        PrestoSparkHttpTaskClient workerClient = new PrestoSparkHttpTaskClient(
                httpClient,
                taskId,
                BASE_URI,
                TASK_INFO_JSON_CODEC,
                PLAN_FRAGMENT_JSON_CODEC,
                TASK_UPDATE_REQUEST_JSON_CODEC,
                requestTimeout);
        workerClient.acknowledgeResultsAsync(1L);
    }

    /**
     * Aborts the result stream on the stubbed worker and expects the returned future to complete without error.
     */
    @Test
    public void testResultAbort() {
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        PrestoSparkHttpTaskClient workerClient = new PrestoSparkHttpTaskClient(
                (HttpClient) new TestingHttpClient(new TestingResponseManager(taskId.toString())),
                taskId,
                BASE_URI,
                TASK_INFO_JSON_CODEC,
                PLAN_FRAGMENT_JSON_CODEC,
                TASK_UPDATE_REQUEST_JSON_CODEC,
                new Duration(1.0, TimeUnit.SECONDS));
        ListenableFuture<?> future = workerClient.abortResults();
        try {
            future.get();
        }
        catch (Exception e) {
            // Attach the cause so the test report shows why the abort failed.
            Assert.fail("abortResults future did not complete successfully", e);
        }
    }

    /**
     * Fetches task info from the stubbed worker and checks that the returned {@link TaskInfo}
     * carries the task id the client was created with.
     */
    @Test
    public void testGetTaskInfo() {
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        PrestoSparkHttpTaskClient workerClient = new PrestoSparkHttpTaskClient(
                (HttpClient) new TestingHttpClient(new TestingResponseManager(taskId.toString())),
                taskId,
                BASE_URI,
                TASK_INFO_JSON_CODEC,
                PLAN_FRAGMENT_JSON_CODEC,
                TASK_UPDATE_REQUEST_JSON_CODEC,
                new Duration(1.0, TimeUnit.SECONDS));
        ListenableFuture<?> future = workerClient.getTaskInfo();
        try {
            BaseResponse<?> response = (BaseResponse<?>) future.get();
            TaskInfo taskInfo = (TaskInfo) response.getValue();
            Assert.assertEquals(taskInfo.getTaskId().toString(), taskId.toString());
        }
        catch (Exception e) {
            // Attach the cause so the test report shows why the request failed.
            Assert.fail("getTaskInfo future did not complete successfully", e);
        }
    }

    /**
     * Sends a task update with an empty source list to the stubbed worker and checks that the
     * returned {@link TaskInfo} carries the task id the client was created with.
     */
    @Test
    public void testUpdateTask() {
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        PrestoSparkHttpTaskClient workerClient = new PrestoSparkHttpTaskClient(
                (HttpClient) new TestingHttpClient(new TestingResponseManager(taskId.toString())),
                taskId,
                BASE_URI,
                TASK_INFO_JSON_CODEC,
                PLAN_FRAGMENT_JSON_CODEC,
                TASK_UPDATE_REQUEST_JSON_CODEC,
                new Duration(1.0, TimeUnit.SECONDS));
        // The update is sent with no sources; the previously built split set was never passed anywhere and is gone.
        ListenableFuture<?> future = workerClient.updateTask(
                new ArrayList<>(),
                TaskTestUtils.createPlanFragment(),
                new TableWriteInfo(Optional.empty(), Optional.empty(), Optional.empty()),
                Optional.empty(),
                TestingSession.testSessionBuilder().build(),
                OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.PARTITIONED));
        try {
            BaseResponse<?> response = (BaseResponse<?>) future.get();
            TaskInfo taskInfo = (TaskInfo) response.getValue();
            Assert.assertEquals(taskInfo.getTaskId().toString(), taskId.toString());
        }
        catch (Exception e) {
            // Attach the cause so the test report shows why the update failed.
            Assert.fail("updateTask future did not complete successfully", e);
        }
    }

    /**
     * Fetches server info from the stubbed worker and checks it equals the {@link ServerInfo}
     * built locally with the same field values.
     */
    @Test
    public void testGetServerInfo() {
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        ServerInfo expected = new ServerInfo(NodeVersion.UNKNOWN, "test", true, false, Optional.of(Duration.valueOf("2m")));
        PrestoSparkHttpServerClient workerClient = new PrestoSparkHttpServerClient(
                (HttpClient) new TestingHttpClient(new TestingResponseManager(taskId.toString())),
                BASE_URI,
                SERVER_INFO_JSON_CODEC);
        ListenableFuture<?> future = workerClient.getServerInfo();
        try {
            BaseResponse<?> response = (BaseResponse<?>) future.get();
            ServerInfo serverInfo = (ServerInfo) response.getValue();
            Assert.assertEquals((Object) serverInfo, (Object) expected);
        }
        catch (Exception e) {
            // Attach the cause so the test report shows why the request failed.
            Assert.fail("getServerInfo future did not complete successfully", e);
        }
    }

    /**
     * Checks that server info is eventually obtained when the stub throws on the first five
     * server-info requests and the retry budget is one minute.
     */
    @Test
    public void testGetServerInfoWithRetry() {
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        ServerInfo expected = new ServerInfo(NodeVersion.UNKNOWN, "test", true, false, Optional.of(Duration.valueOf("2m")));
        Duration maxTimeout = new Duration(1.0, TimeUnit.MINUTES);
        NativeExecutionProcess process = this.createNativeExecutionProcess(
                taskId,
                scheduler,
                maxTimeout,
                new TestingResponseManager(taskId.toString(), new FailureRetryResponseManager(5)),
                new TaskManagerConfig());
        SettableFuture<?> future = process.getServerInfoWithRetry();
        try {
            ServerInfo serverInfo = (ServerInfo) future.get();
            Assert.assertEquals((Object) serverInfo, (Object) expected);
        }
        catch (Exception e) {
            // Attach the cause so the test report shows why the retries did not succeed.
            Assert.fail("getServerInfoWithRetry future did not complete successfully", e);
        }
    }

    /**
     * With a zero retry budget and a stub that throws on the first five server-info requests,
     * the returned future must fail with an {@link ExecutionException} whose message mentions
     * the failed native process launch.
     */
    @Test
    public void testGetServerInfoWithRetryTimeout() {
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        Duration maxTimeout = new Duration(0.0, TimeUnit.MILLISECONDS);
        TestingResponseManager responseManager = new TestingResponseManager(taskId.toString(), new FailureRetryResponseManager(5));
        NativeExecutionProcess process = createNativeExecutionProcess(taskId, scheduler, maxTimeout, responseManager, new TaskManagerConfig());
        SettableFuture<?> future = process.getServerInfoWithRetry();
        ExecutionException exception = Assert.expectThrows(ExecutionException.class, future::get);
        String message = exception.getMessage();
        Assert.assertTrue(message.contains("Native process launch failed with multiple retries"));
    }

    /**
     * Runs the result fetcher against the default stub and expects exactly one page of size 0
     * once the fetcher's future completes.
     */
    @Test
    public void testResultFetcher() {
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        PrestoSparkHttpTaskClient workerClient = new PrestoSparkHttpTaskClient(
                (HttpClient) new TestingHttpClient(new TestingResponseManager(taskId.toString())),
                taskId,
                BASE_URI,
                TASK_INFO_JSON_CODEC,
                PLAN_FRAGMENT_JSON_CODEC,
                TASK_UPDATE_REQUEST_JSON_CODEC,
                new Duration(1.0, TimeUnit.SECONDS));
        ScheduledExecutorService fetcherScheduler = Executors.newScheduledThreadPool(1);
        HttpNativeExecutionTaskResultFetcher taskResultFetcher = new HttpNativeExecutionTaskResultFetcher(fetcherScheduler, workerClient);
        CompletableFuture<?> future = taskResultFetcher.start();
        try {
            future.get();
            ArrayList<SerializedPage> pages = new ArrayList<>();
            Optional<SerializedPage> page = taskResultFetcher.pollPage();
            while (page.isPresent()) {
                pages.add(page.get());
                page = taskResultFetcher.pollPage();
            }
            // TestNG's assertEquals takes (actual, expected).
            Assert.assertEquals(pages.size(), 1);
            Assert.assertEquals(pages.get(0).getSizeInBytes(), 0);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before failing so the runner can observe it.
            Thread.currentThread().interrupt();
            Assert.fail("Interrupted while fetching results", e);
        }
        catch (ExecutionException e) {
            Assert.fail("Result fetcher completed exceptionally", e);
        }
        finally {
            // The pool is private to this test; stop its threads so they do not outlive it.
            fetcherScheduler.shutdownNow();
        }
    }

    /**
     * Serves ten 1 MB pages, marking only the tenth response as buffer-complete, and expects
     * the fetcher to deliver all ten pages. The stub fails the test if it is asked for an eleventh.
     */
    @Test
    public void testResultFetcherMultipleNonEmptyResults() {
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        final int serializedPageSize = (int) new DataSize(1.0, DataSize.Unit.MEGABYTE).toBytes();
        final int numPages = 10;
        TestingResponseManager.TestingResultResponseManager resultResponseManager = new TestingResponseManager.TestingResultResponseManager(){
            private int requestCount;

            @Override
            public Response createResultResponse(String taskId) throws PageTransportErrorException {
                ++this.requestCount;
                if (this.requestCount < numPages) {
                    return this.createResultResponseHelper(HttpStatus.OK, taskId, this.requestCount - 1, this.requestCount, false, serializedPageSize);
                }
                if (this.requestCount == numPages) {
                    // Last page: flag the buffer as complete.
                    return this.createResultResponseHelper(HttpStatus.OK, taskId, this.requestCount - 1, this.requestCount, true, serializedPageSize);
                }
                Assert.fail("Retrieving results after buffer completion");
                return null;
            }
        };
        PrestoSparkHttpTaskClient workerClient = new PrestoSparkHttpTaskClient(
                (HttpClient) new TestingHttpClient(new TestingResponseManager(taskId.toString(), resultResponseManager)),
                taskId,
                BASE_URI,
                TASK_INFO_JSON_CODEC,
                PLAN_FRAGMENT_JSON_CODEC,
                TASK_UPDATE_REQUEST_JSON_CODEC,
                new Duration(1.0, TimeUnit.SECONDS));
        ScheduledExecutorService fetcherScheduler = Executors.newScheduledThreadPool(1);
        HttpNativeExecutionTaskResultFetcher taskResultFetcher = new HttpNativeExecutionTaskResultFetcher(fetcherScheduler, workerClient);
        CompletableFuture<?> future = taskResultFetcher.start();
        try {
            future.get();
            ArrayList<SerializedPage> pages = new ArrayList<>();
            Optional<SerializedPage> page = taskResultFetcher.pollPage();
            while (page.isPresent()) {
                pages.add(page.get());
                page = taskResultFetcher.pollPage();
            }
            // TestNG's assertEquals takes (actual, expected).
            Assert.assertEquals(pages.size(), numPages);
            for (SerializedPage fetched : pages) {
                Assert.assertEquals(fetched.getSizeInBytes(), serializedPageSize);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before failing so the runner can observe it.
            Thread.currentThread().interrupt();
            Assert.fail("Interrupted while fetching results", e);
        }
        catch (ExecutionException e) {
            Assert.fail("Result fetcher completed exceptionally", e);
        }
        finally {
            // The pool is private to this test; stop its threads so they do not outlive it.
            fetcherScheduler.shutdownNow();
        }
    }

    /**
     * Serves ten 32 MB pages and checks that the fetcher stops requesting while pages are unconsumed:
     * after the first page is polled and five seconds pass, the stub must still hold five pages.
     * The remaining pages are then drained and all ten are verified.
     */
    @Test
    public void testResultFetcherExceedingBufferLimit() {
        int numPages = 10;
        int serializedPageSize = (int) new DataSize(32.0, DataSize.Unit.MEGABYTE).toBytes();
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        BreakingLimitResponseManager breakingLimitResponseManager = new BreakingLimitResponseManager(serializedPageSize, numPages);
        PrestoSparkHttpTaskClient workerClient = new PrestoSparkHttpTaskClient(
                (HttpClient) new TestingHttpClient(new TestingResponseManager(taskId.toString(), breakingLimitResponseManager)),
                taskId,
                BASE_URI,
                TASK_INFO_JSON_CODEC,
                PLAN_FRAGMENT_JSON_CODEC,
                TASK_UPDATE_REQUEST_JSON_CODEC,
                new Duration(1.0, TimeUnit.SECONDS));
        ScheduledExecutorService fetcherScheduler = Executors.newScheduledThreadPool(10);
        HttpNativeExecutionTaskResultFetcher taskResultFetcher = new HttpNativeExecutionTaskResultFetcher(fetcherScheduler, workerClient);
        CompletableFuture<?> future = taskResultFetcher.start();
        try {
            // Spin until the first page arrives.
            Optional<SerializedPage> page = Optional.empty();
            while (!page.isPresent()) {
                page = taskResultFetcher.pollPage();
            }
            // Give the fetcher time to fill its buffer. NOTE(review): the expected count of 5 presumably
            // follows from the fetcher's buffer limit, which is not visible here -- confirm.
            Thread.sleep(5000L);
            Assert.assertEquals((int) breakingLimitResponseManager.getRemainingPageCount(), 5);
            ArrayList<SerializedPage> pages = new ArrayList<>();
            pages.add(page.get());
            while (pages.size() < numPages) {
                page = taskResultFetcher.pollPage();
                page.ifPresent(pages::add);
            }
            future.get();
            // TestNG's assertEquals takes (actual, expected).
            Assert.assertEquals(pages.size(), numPages);
            for (SerializedPage fetched : pages) {
                Assert.assertEquals(fetched.getSizeInBytes(), serializedPageSize);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before failing so the runner can observe it.
            Thread.currentThread().interrupt();
            Assert.fail("Interrupted while fetching results", e);
        }
        catch (ExecutionException e) {
            Assert.fail("Result fetcher completed exceptionally", e);
        }
        finally {
            // The pool is private to this test; stop its threads so they do not outlive it.
            fetcherScheduler.shutdownNow();
        }
    }

    /**
     * Runs the fetcher against a {@code TimeoutResponseManager} configured with ten empty pages and
     * three transport errors, and expects all ten pages to be delivered.
     */
    @Test
    public void testResultFetcherTransportErrorRecovery() {
        int numPages = 10;
        int serializedPageSize = 0;
        int numTransportErrors = 3;
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        TimeoutResponseManager timeoutResponseManager = new TimeoutResponseManager(serializedPageSize, numPages, numTransportErrors);
        PrestoSparkHttpTaskClient workerClient = new PrestoSparkHttpTaskClient(
                (HttpClient) new TestingHttpClient(new TestingResponseManager(taskId.toString(), timeoutResponseManager)),
                taskId,
                BASE_URI,
                TASK_INFO_JSON_CODEC,
                PLAN_FRAGMENT_JSON_CODEC,
                TASK_UPDATE_REQUEST_JSON_CODEC,
                new Duration(1.0, TimeUnit.SECONDS));
        ScheduledExecutorService fetcherScheduler = Executors.newScheduledThreadPool(10);
        HttpNativeExecutionTaskResultFetcher taskResultFetcher = new HttpNativeExecutionTaskResultFetcher(fetcherScheduler, workerClient);
        CompletableFuture<?> future = taskResultFetcher.start();
        try {
            future.get();
            ArrayList<SerializedPage> pages = new ArrayList<>();
            Optional<SerializedPage> page = taskResultFetcher.pollPage();
            while (page.isPresent()) {
                pages.add(page.get());
                page = taskResultFetcher.pollPage();
            }
            Assert.assertEquals(pages.size(), numPages);
            for (SerializedPage fetched : pages) {
                Assert.assertEquals(fetched.getSizeInBytes(), serializedPageSize);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before failing so the runner can observe it.
            Thread.currentThread().interrupt();
            Assert.fail("Interrupted while fetching results", e);
        }
        catch (ExecutionException e) {
            Assert.fail("Result fetcher did not recover from transport errors", e);
        }
        finally {
            // The pool is private to this test; stop its threads so they do not outlive it.
            fetcherScheduler.shutdownNow();
        }
    }

    /**
     * Runs the fetcher against a {@code TimeoutResponseManager} configured with ten transport errors
     * and expects the fetcher's future to fail with an {@link ExecutionException}.
     */
    @Test
    public void testResultFetcherTransportErrorFail() {
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        PrestoSparkHttpTaskClient workerClient = new PrestoSparkHttpTaskClient(
                (HttpClient) new TestingHttpClient(new TestingResponseManager(taskId.toString(), new TimeoutResponseManager(0, 10, 10))),
                taskId,
                BASE_URI,
                TASK_INFO_JSON_CODEC,
                PLAN_FRAGMENT_JSON_CODEC,
                TASK_UPDATE_REQUEST_JSON_CODEC,
                new Duration(1.0, TimeUnit.SECONDS));
        ScheduledExecutorService fetcherScheduler = Executors.newScheduledThreadPool(1);
        try {
            HttpNativeExecutionTaskResultFetcher taskResultFetcher = new HttpNativeExecutionTaskResultFetcher(fetcherScheduler, workerClient);
            CompletableFuture<?> future = taskResultFetcher.start();
            Assert.assertThrows(ExecutionException.class, future::get);
        }
        finally {
            // The pool is private to this test; stop its threads so they do not outlive it.
            fetcherScheduler.shutdownNow();
        }
    }

    /**
     * Checks that the task-info fetcher has no task info before it is started and has one
     * after it has been running for three seconds.
     */
    @Test
    public void testInfoFetcher() {
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        Duration fetchInterval = new Duration(1.0, TimeUnit.SECONDS);
        HttpNativeExecutionTaskInfoFetcher taskInfoFetcher = this.createTaskInfoFetcher(taskId, new TestingResponseManager(taskId.toString()));
        Assert.assertFalse(taskInfoFetcher.getTaskInfo().isPresent());
        taskInfoFetcher.start();
        try {
            // Wait long enough for several fetch rounds at the one-second interval.
            Thread.sleep(3L * fetchInterval.toMillis());
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before failing so the runner can observe it.
            Thread.currentThread().interrupt();
            Assert.fail("Interrupted while waiting for the task info fetcher", e);
        }
        Assert.assertTrue(taskInfoFetcher.getTaskInfo().isPresent());
    }

    /**
     * Uses a stub that starts answering task-info requests with INTERNAL_SERVER_ERROR after its
     * first few responses, with a five-second error budget. Task info must be available after
     * three seconds, and {@code getTaskInfo()} must throw after a further ten seconds.
     */
    @Test
    public void testInfoFetcherWithRetry() {
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        Duration fetchInterval = new Duration(1.0, TimeUnit.SECONDS);
        HttpNativeExecutionTaskInfoFetcher taskInfoFetcher = this.createTaskInfoFetcher(
                taskId,
                new TestingResponseManager(taskId.toString(), new FailureTaskInfoRetryResponseManager(1)),
                new Duration(5.0, TimeUnit.SECONDS));
        Assert.assertFalse(taskInfoFetcher.getTaskInfo().isPresent());
        taskInfoFetcher.start();
        try {
            Thread.sleep(3L * fetchInterval.toMillis());
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before failing so the runner can observe it.
            Thread.currentThread().interrupt();
            Assert.fail("Interrupted while waiting for the first task info", e);
        }
        Assert.assertTrue(taskInfoFetcher.getTaskInfo().isPresent());
        try {
            // Wait past the five-second error budget so the fetcher gives up.
            Thread.sleep(10L * fetchInterval.toMillis());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            Assert.fail("Interrupted while waiting for the fetcher to exhaust its error budget", e);
        }
        RuntimeException exception = Assert.expectThrows(RuntimeException.class, () -> taskInfoFetcher.getTaskInfo());
        Assert.assertTrue(exception.getMessage().contains("TaskInfoFetcher encountered too many errors talking to native process."));
    }

    /**
     * End-to-end check of {@link NativeExecutionTask} against the stubbed HTTP client: before start
     * there is no task info and no result; after start the task is not done, ten result pages can be
     * polled (within at most 100 polls), no further page follows, and task info is available.
     */
    @Test
    public void testNativeExecutionTask() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
        TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
        TaskManagerConfig taskConfig = new TaskManagerConfig();
        QueryManagerConfig queryConfig = new QueryManagerConfig();
        taskConfig.setInfoRefreshMaxWait(new Duration(5.0, TimeUnit.SECONDS));
        taskConfig.setInfoUpdateInterval(new Duration(200.0, TimeUnit.MILLISECONDS));
        queryConfig.setRemoteTaskMaxErrorDuration(new Duration(1.0, TimeUnit.MINUTES));
        try {
            NativeExecutionTaskFactory taskFactory = new NativeExecutionTaskFactory(
                    (HttpClient) new TestingHttpClient(new TestingResponseManager(taskId.toString(), new TimeoutResponseManager(0, 10, 0))),
                    Executors.newSingleThreadExecutor(),
                    scheduler,
                    scheduler,
                    TASK_INFO_JSON_CODEC,
                    PLAN_FRAGMENT_JSON_CODEC,
                    TASK_UPDATE_REQUEST_JSON_CODEC,
                    taskConfig,
                    queryConfig);
            NativeExecutionTask task = taskFactory.createNativeExecutionTask(
                    TestingSession.testSessionBuilder().build(),
                    BASE_URI,
                    taskId,
                    TaskTestUtils.createPlanFragment(),
                    new ArrayList<>(),
                    new TableWriteInfo(Optional.empty(), Optional.empty(), Optional.empty()),
                    Optional.empty());
            Assert.assertNotNull(task);
            Assert.assertFalse(task.getTaskInfo().isPresent());
            Assert.assertFalse(task.pollResult().isPresent());
            TaskInfo taskInfo = task.start();
            Assert.assertFalse(taskInfo.getTaskStatus().getState().isDone());
            ArrayList<SerializedPage> resultPages = new ArrayList<>();
            // Bounded polling: stop after 100 attempts or once all ten pages have arrived.
            for (int i = 0; i < 100 && resultPages.size() < 10; ++i) {
                Optional<SerializedPage> page = task.pollResult();
                page.ifPresent(resultPages::add);
            }
            Assert.assertFalse(task.pollResult().isPresent());
            // TestNG's assertEquals takes (actual, expected).
            Assert.assertEquals(resultPages.size(), 10);
            Assert.assertTrue(task.getTaskInfo().isPresent());
            task.stop();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before failing so the runner can observe it.
            Thread.currentThread().interrupt();
            Assert.fail("Interrupted while running the native execution task", e);
        }
    }

    /**
     * Builds a {@link NativeExecutionProcess} whose HTTP traffic is answered by {@code responseManager}.
     *
     * @param taskId not used by this helper; kept so existing callers compile unchanged
     * @param scheduler not used by this helper; the factory receives a dedicated 4-thread scheduler instead
     * @param maxErrorDuration passed through to the factory when creating the process
     * @param responseManager source of the stubbed HTTP responses
     * @param config task manager configuration handed to the factory
     * @return the process created by the factory for {@code BASE_URI} and a fresh testing session
     */
    private NativeExecutionProcess createNativeExecutionProcess(TaskId taskId, ScheduledExecutorService scheduler, Duration maxErrorDuration, TestingResponseManager responseManager, TaskManagerConfig config) {
        // Named distinctly so it no longer shadows the class-level errorScheduler field.
        ScheduledExecutorService processErrorScheduler = Executors.newScheduledThreadPool(4);
        PrestoSparkWorkerProperty workerProperty = new PrestoSparkWorkerProperty(
                new NativeExecutionConnectorConfig(),
                new NativeExecutionNodeConfig(),
                new NativeExecutionSystemConfig(),
                new NativeExecutionVeloxConfig());
        NativeExecutionProcessFactory factory = new NativeExecutionProcessFactory(
                (HttpClient) new TestingHttpClient(responseManager),
                Executors.newSingleThreadExecutor(),
                processErrorScheduler,
                SERVER_INFO_JSON_CODEC,
                config,
                (WorkerProperty) workerProperty);
        return factory.createNativeExecutionProcess(TestingSession.testSessionBuilder().build(), BASE_URI, maxErrorDuration);
    }

    /**
     * Convenience overload that creates a task-info fetcher with a one-minute maximum error duration.
     */
    private HttpNativeExecutionTaskInfoFetcher createTaskInfoFetcher(TaskId taskId, TestingResponseManager testingResponseManager) {
        Duration defaultMaxErrorDuration = new Duration(1.0, TimeUnit.MINUTES);
        return createTaskInfoFetcher(taskId, testingResponseManager, defaultMaxErrorDuration);
    }

    /**
     * Creates a task-info fetcher backed by a stubbed HTTP client.
     * The worker client uses a one-second duration, the fetcher gets two single-thread scheduled
     * pools plus a single-thread executor, a one-second duration of its own, and {@code maxErrorDuration}.
     */
    private HttpNativeExecutionTaskInfoFetcher createTaskInfoFetcher(TaskId taskId, TestingResponseManager testingResponseManager, Duration maxErrorDuration) {
        HttpClient httpClient = new TestingHttpClient(testingResponseManager);
        Duration clientDuration = new Duration(1.0, TimeUnit.SECONDS);
        PrestoSparkHttpTaskClient workerClient = new PrestoSparkHttpTaskClient(
                httpClient,
                taskId,
                BASE_URI,
                TASK_INFO_JSON_CODEC,
                PLAN_FRAGMENT_JSON_CODEC,
                TASK_UPDATE_REQUEST_JSON_CODEC,
                clientDuration);
        ScheduledExecutorService firstScheduler = Executors.newScheduledThreadPool(1);
        ScheduledExecutorService secondScheduler = Executors.newScheduledThreadPool(1);
        Executor executor = Executors.newSingleThreadExecutor();
        Duration fetcherDuration = new Duration(1.0, TimeUnit.SECONDS);
        return new HttpNativeExecutionTaskInfoFetcher(firstScheduler, secondScheduler, workerClient, executor, fetcherDuration, maxErrorDuration);
    }

    /**
     * Builds a {@link SerializedPage} wrapping {@code numBytes} bytes, each set to the value 8.
     *
     * @param numBytes size of the backing byte array; also passed as the page's fourth constructor argument
     */
    private static SerializedPage createSerializedPage(int numBytes) {
        byte[] payload = new byte[numBytes];
        for (int index = 0; index < payload.length; index++) {
            payload[index] = (byte) 8;
        }
        return new SerializedPage(Slices.wrappedBuffer(payload), PageCodecMarker.none(), 0, numBytes, 0L);
    }

    /**
     * Task-info stub that answers with the caller-supplied status for the first
     * {@code failureCount + 1} requests and with {@code INTERNAL_SERVER_ERROR} for every request after that.
     */
    private static class FailureTaskInfoRetryResponseManager
    extends TestingResponseManager.TestingTaskInfoResponseManager {
        private final int failureCount;
        private int retryCount;

        public FailureTaskInfoRetryResponseManager(int failureCount) {
            this.failureCount = failureCount;
        }

        @Override
        public Response createTaskInfoResponse(HttpStatus httpStatus, String taskId) throws PrestoException {
            // Compare against the number of requests seen before this one, then count this request.
            int requestsSeenBefore = retryCount;
            retryCount = requestsSeenBefore + 1;
            HttpStatus effectiveStatus = requestsSeenBefore > failureCount ? HttpStatus.INTERNAL_SERVER_ERROR : httpStatus;
            return super.createTaskInfoResponse(effectiveStatus, taskId);
        }
    }

    /**
     * Server-info stub that throws a {@link RuntimeException} for the first {@code maxRetryCount}
     * requests and delegates to the default server-info response afterwards.
     */
    public static class FailureRetryResponseManager
    extends TestingResponseManager.TestingServerResponseManager {
        private final int maxRetryCount;
        private int retryCount;

        public FailureRetryResponseManager(int maxRetryCount) {
            this.maxRetryCount = maxRetryCount;
        }

        @Override
        public Response createServerInfoResponse() throws PrestoException {
            // Every request is counted, whether it ends up failing or succeeding.
            int requestsSeenBefore = retryCount;
            retryCount = requestsSeenBefore + 1;
            if (requestsSeenBefore >= maxRetryCount) {
                return super.createServerInfoResponse();
            }
            throw new RuntimeException("Get ServerInfo request failure.");
        }
    }

    /**
     * In-memory {@link Response} used by the stubbed HTTP layer in these tests. It carries a fixed
     * status code and message, a header multimap and a body stream; {@link #getBytesRead()} always reports 0.
     */
    public static class TestingResponse
    implements Response {
        private final int statusCode;
        private final String statusMessage;
        private final ListMultimap<HeaderName, String> headers;
        private InputStream inputStream;

        /** Creates a 200 OK response with no headers and a {@code null} body stream. */
        private TestingResponse() {
            this(HttpStatus.OK.code(), HttpStatus.OK.toString(), ArrayListMultimap.<HeaderName, String>create(), null);
        }

        /** Creates a response from explicit status, headers and body stream. */
        private TestingResponse(int statusCode, String statusMessage, ListMultimap<HeaderName, String> headers, InputStream inputStream) {
            this.statusCode = statusCode;
            this.statusMessage = statusMessage;
            this.headers = headers;
            this.inputStream = inputStream;
        }

        public int getStatusCode() {
            return statusCode;
        }

        public String getStatusMessage() {
            return statusMessage;
        }

        public ListMultimap<HeaderName, String> getHeaders() {
            return headers;
        }

        /** Byte accounting is not tracked by this stub. */
        public long getBytesRead() {
            return 0L;
        }

        /** Returns the body stream; {@code null} when created through the no-argument constructor. */
        public InputStream getInputStream() {
            return inputStream;
        }
    }

    public static class TestingResponseManager {
        // Codec used by the task-info stub to serialize the TaskInfo response body.
        private static final JsonCodec<TaskInfo> taskInfoCodec = JsonCodec.jsonCodec(TaskInfo.class);
        // Codec for ServerInfo; presumably used by the server-info stub to serialize its body
        // (that code lies outside the portion reviewed here).
        private static final JsonCodec<ServerInfo> serverInfoCodec = JsonCodec.jsonCodec(ServerInfo.class);
        // Per-endpoint stubs that the create*Response methods delegate to.
        private final TestingResultResponseManager resultResponseManager;
        private final TestingServerResponseManager serverResponseManager;
        private final TestingTaskInfoResponseManager taskInfoResponseManager;
        // Task id forwarded to the result and task-info stubs.
        private final String taskId;

        /**
         * Creates a manager for {@code taskId} that uses the default stub for every endpoint.
         *
         * @throws NullPointerException if {@code taskId} is null
         */
        public TestingResponseManager(String taskId) {
            // Same wiring as the result-manager overload, supplied with a default result stub.
            this(taskId, new TestingResultResponseManager());
        }

        /**
         * Creates a manager for {@code taskId} with a custom result stub; the server-info and
         * task-info endpoints use the default stubs.
         *
         * @throws NullPointerException if either argument is null
         */
        public TestingResponseManager(String taskId, TestingResultResponseManager resultResponseManager) {
            Objects.requireNonNull(taskId, "taskId is null");
            Objects.requireNonNull(resultResponseManager, "resultResponseManager is null.");
            this.taskId = taskId;
            this.resultResponseManager = resultResponseManager;
            this.serverResponseManager = new TestingServerResponseManager();
            this.taskInfoResponseManager = new TestingTaskInfoResponseManager();
        }

        /**
         * Creates a manager for {@code taskId} with a custom server-info stub; the result and
         * task-info endpoints use the default stubs.
         *
         * @throws NullPointerException if either argument is null
         */
        public TestingResponseManager(String taskId, TestingServerResponseManager serverResponseManager) {
            Objects.requireNonNull(taskId, "taskId is null");
            Objects.requireNonNull(serverResponseManager, "serverResponseManager is null");
            this.taskId = taskId;
            this.serverResponseManager = serverResponseManager;
            this.resultResponseManager = new TestingResultResponseManager();
            this.taskInfoResponseManager = new TestingTaskInfoResponseManager();
        }

        /**
         * Creates a manager for {@code taskId} with a custom task-info stub; the result and
         * server-info endpoints use the default stubs.
         *
         * @throws NullPointerException if either argument is null
         */
        public TestingResponseManager(String taskId, TestingTaskInfoResponseManager taskInfoResponseManager) {
            Objects.requireNonNull(taskId, "taskId is null");
            Objects.requireNonNull(taskInfoResponseManager, "taskInfoResponseManager is null");
            this.taskId = taskId;
            this.taskInfoResponseManager = taskInfoResponseManager;
            this.resultResponseManager = new TestingResultResponseManager();
            this.serverResponseManager = new TestingServerResponseManager();
        }

        /** Returns a bare 200 OK {@link TestingResponse} with no headers and no body stream. */
        public Response createDummyResultResponse() {
            Response dummy = new TestingResponse();
            return dummy;
        }

        /** Asks the configured result stub for a response for this manager's task id. */
        public Response createResultResponse() throws PageTransportErrorException {
            return resultResponseManager.createResultResponse(taskId);
        }

        /** Asks the configured server-info stub for a response. */
        public Response createServerInfoResponse() throws PrestoException {
            return serverResponseManager.createServerInfoResponse();
        }

        /**
         * Asks the configured task-info stub for a response for this manager's task id.
         *
         * @param httpStatus status forwarded to the stub
         */
        public Response createTaskInfoResponse(HttpStatus httpStatus) throws PrestoException {
            return taskInfoResponseManager.createTaskInfoResponse(httpStatus, taskId);
        }

        /**
         * Default task-info stub: answers with the requested status, a JSON content type and a
         * freshly created initial {@link TaskInfo} for the given task id as the body.
         */
        public static class TestingTaskInfoResponseManager {
            public Response createTaskInfoResponse(HttpStatus httpStatus, String taskId) throws PrestoException {
                ListMultimap<HeaderName, String> headers = ArrayListMultimap.create();
                headers.put(HeaderName.of("Content-Type"), String.valueOf(MediaType.create("application", "json")));
                TaskId parsedTaskId = TaskId.valueOf(taskId);
                URI taskUri = HttpUriBuilder.uriBuilderFrom(BASE_URI).appendPath(TestPrestoSparkHttpClient.TASK_ROOT_PATH).build();
                TaskInfo taskInfo = TaskInfo.createInitialTask(parsedTaskId, taskUri, new ArrayList<>(), new TaskStats(DateTime.now(), null), "dummy-node");
                byte[] body = taskInfoCodec.toBytes(taskInfo);
                return new TestingResponse(httpStatus.code(), httpStatus.toString(), headers, new ByteArrayInputStream(body));
            }
        }

        /**
         * Produces task-results responses carrying one serialized page plus the Presto
         * paging headers. Subclasses override {@link #createResultResponse(String)} to script
         * multi-page or failing sequences.
         */
        public static class TestingResultResponseManager {
            /**
             * Builds a single, final results response: token 0, next token 1, buffer complete,
             * with a page created from a requested size of 0 bytes.
             *
             * @param taskId value placed in the task-instance-id header
             * @return the assembled response
             * @throws PageTransportErrorException declared for overriding implementations
             */
            public Response createResultResponse(String taskId) throws PageTransportErrorException {
                return createResultResponseHelper(HttpStatus.OK, taskId, 0L, 1L, true, 0);
            }

            /**
             * Assembles a results response with the given paging metadata.
             *
             * @param httpStatus status whose code and string form are placed on the response
             * @param taskId value of the {@code X-Presto-Task-Instance-Id} header
             * @param token value of the {@code X-Presto-Page-Sequence-Id} header
             * @param nextToken value of the {@code X-Presto-Page-End-Sequence-Id} header
             * @param bufferComplete value of the {@code X-Presto-Buffer-Complete} header
             * @param serializedPageSizeBytes size passed to {@code createSerializedPage}
             * @return the assembled response
             */
            protected Response createResultResponseHelper(HttpStatus httpStatus, String taskId, long token, long nextToken, boolean bufferComplete, int serializedPageSizeBytes) {
                // Serialize exactly one page into the response body.
                DynamicSliceOutput body = new DynamicSliceOutput(1024);
                SerializedPage page = TestPrestoSparkHttpClient.createSerializedPage(serializedPageSizeBytes);
                PagesSerdeUtil.writeSerializedPage(body, page);

                ListMultimap<HeaderName, String> responseHeaders = ArrayListMultimap.create();
                responseHeaders.put(HeaderName.of("X-Presto-Page-Sequence-Id"), String.valueOf(token));
                responseHeaders.put(HeaderName.of("X-Presto-Page-End-Sequence-Id"), String.valueOf(nextToken));
                responseHeaders.put(HeaderName.of("X-Presto-Buffer-Complete"), String.valueOf(bufferComplete));
                responseHeaders.put(HeaderName.of("X-Presto-Task-Instance-Id"), taskId);
                responseHeaders.put(HeaderName.of("Content-Type"), PrestoMediaTypes.PRESTO_PAGES_TYPE.toString());

                InputStream payload = body.slice().getInput();
                return new TestingResponse(httpStatus.code(), httpStatus.toString(), responseHeaders, payload);
            }
        }

        /**
         * Produces JSON server-info responses.
         */
        public static class TestingServerResponseManager {
            /**
             * Builds a {@code 200 OK} response whose body is the JSON encoding of a fixed
             * {@code ServerInfo} (unknown version, environment {@code "test"}, uptime {@code 2m}).
             *
             * @return the assembled response
             * @throws PrestoException declared for overriding implementations
             */
            public Response createServerInfoResponse() throws PrestoException {
                HttpStatus okStatus = HttpStatus.OK;
                ServerInfo info = new ServerInfo(NodeVersion.UNKNOWN, "test", true, false, Optional.of(Duration.valueOf("2m")));

                ListMultimap<HeaderName, String> responseHeaders = ArrayListMultimap.create();
                String jsonMediaType = String.valueOf(MediaType.create("application", "json"));
                responseHeaders.put(HeaderName.of("Content-Type"), jsonMediaType);

                byte[] jsonBody = serverInfoCodec.toBytes(info);
                return new TestingResponse(okStatus.code(), okStatus.toString(), responseHeaders, new ByteArrayInputStream(jsonBody));
            }
        }
    }

    public static class TestingHttpClient
    implements HttpClient {
        private static final String TASK_ID_REGEX = "\\/v1\\/task\\/[a-zA-Z0-9]+.[0-9]+.[0-9]+.[0-9]+.[0-9]+";
        private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
        private final TestingResponseManager responseManager;

        public TestingHttpClient(TestingResponseManager responseManager) {
            this.responseManager = responseManager;
        }

        public <T, E extends Exception> T execute(Request request, ResponseHandler<T, E> responseHandler) throws E {
            try {
                return (T)this.executeAsync(request, responseHandler).get();
            }
            catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }

        public <T, E extends Exception> HttpClient.HttpResponseFuture<T> executeAsync(Request request, ResponseHandler<T, E> responseHandler) {
            TestingHttpResponseFuture future = new TestingHttpResponseFuture();
            this.executor.schedule(() -> {
                URI uri = request.getUri();
                String method = request.getMethod();
                ListMultimap headers = request.getHeaders();
                String path = uri.getPath();
                if (method.equalsIgnoreCase("GET")) {
                    if (Pattern.compile("\\/v1\\/task\\/[a-zA-Z0-9]+.[0-9]+.[0-9]+.[0-9]+.[0-9]+\\z").matcher(path).find()) {
                        try {
                            future.complete(responseHandler.handle(request, this.responseManager.createTaskInfoResponse(HttpStatus.OK)));
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                            future.completeExceptionally(e);
                        }
                    } else if (Pattern.compile(".*\\/results\\/[0-9]+\\/[0-9]+\\/acknowledge\\z").matcher(path).find()) {
                        try {
                            future.complete(responseHandler.handle(request, this.responseManager.createDummyResultResponse()));
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                            future.completeExceptionally(e);
                        }
                    } else if (Pattern.compile(".*\\/results\\/[0-9]+\\/[0-9]+\\z").matcher(path).find()) {
                        try {
                            future.complete(responseHandler.handle(request, this.responseManager.createResultResponse()));
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                            future.completeExceptionally(e);
                        }
                    } else if (Pattern.compile("\\/v1\\/info").matcher(path).find()) {
                        try {
                            future.complete(responseHandler.handle(request, this.responseManager.createServerInfoResponse()));
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                            future.completeExceptionally(e);
                        }
                    }
                } else if (method.equalsIgnoreCase("POST")) {
                    if (Pattern.compile(String.format("%s\\/batch\\z", TASK_ID_REGEX)).matcher(path).find()) {
                        try {
                            future.complete(responseHandler.handle(request, this.responseManager.createTaskInfoResponse(HttpStatus.OK)));
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                            future.completeExceptionally(e);
                        }
                    }
                } else if (method.equalsIgnoreCase("DELETE")) {
                    if (Pattern.compile(String.format("%s\\/results\\/[0-9]+\\z", TASK_ID_REGEX)).matcher(path).find()) {
                        try {
                            future.complete(responseHandler.handle(request, this.responseManager.createDummyResultResponse()));
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                            future.completeExceptionally(e);
                        }
                    }
                } else if (method.equalsIgnoreCase("DELETE") && Pattern.compile("\\/v1\\/task\\/[a-zA-Z0-9]+.[0-9]+.[0-9]+.[0-9]+.[0-9]+\\z").matcher(path).find()) {
                    try {
                        future.complete(responseHandler.handle(request, this.responseManager.createDummyResultResponse()));
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        future.completeExceptionally(e);
                    }
                }
                if (!future.isDone()) {
                    future.completeExceptionally(new Exception("Unknown path " + path));
                }
            }, (long)NO_DURATION.getValue(), NO_DURATION.getUnit());
            return future;
        }

        public RequestStats getStats() {
            return null;
        }

        public long getMaxContentLength() {
            return 0L;
        }

        public void close() {
        }

        public boolean isClosed() {
            return false;
        }

        private String getTaskId(URI uri) {
            String fromTaskId = uri.getPath().substring(TestPrestoSparkHttpClient.TASK_ROOT_PATH.length() + 1);
            int endPosition = fromTaskId.indexOf("/");
            if (endPosition < 0) {
                return fromTaskId;
            }
            return fromTaskId.substring(0, endPosition);
        }
    }

    private static class TestingHttpResponseFuture<T>
    extends AbstractFuture<T>
    implements HttpClient.HttpResponseFuture<T> {
        private TestingHttpResponseFuture() {
        }

        public String getState() {
            return null;
        }

        public void complete(T value) {
            super.set(value);
        }

        public void completeExceptionally(Throwable t) {
            super.setException(t);
        }
    }

    /**
     * Result response manager that first fails a configured number of calls with a
     * {@code PageTransportErrorException} and then serves {@code numPages} pages, marking the
     * last one as buffer-complete. Any call after the last page fails the test.
     */
    private static class TimeoutResponseManager
    extends TestingResponseManager.TestingResultResponseManager {
        private final int serializedPageSize;
        private final int numPages;
        private final int numInitialTimeouts;
        private int requestCount;
        private int timeoutCount;

        /**
         * @param serializedPageSize size passed through for every generated page
         * @param numPages number of pages to serve before the buffer is complete
         * @param numInitialTimeouts number of leading calls that throw instead of responding
         */
        public TimeoutResponseManager(int serializedPageSize, int numPages, int numInitialTimeouts) {
            this.serializedPageSize = serializedPageSize;
            this.numPages = numPages;
            this.numInitialTimeouts = numInitialTimeouts;
        }

        @Override
        public Response createResultResponse(String taskId) throws PageTransportErrorException {
            // Every call counts towards the timeout budget, including successful ones.
            this.timeoutCount++;
            if (this.timeoutCount <= this.numInitialTimeouts) {
                throw new PageTransportErrorException(new HostAddress("localhost", 8080), "Mock HttpClient Timeout");
            }

            this.requestCount++;
            if (this.requestCount > this.numPages) {
                Assert.fail("Retrieving results after buffer completion");
                return null;
            }

            // The page with index numPages is the final one and closes the buffer.
            boolean bufferComplete = this.requestCount == this.numPages;
            long token = this.requestCount - 1;
            long nextToken = this.requestCount;
            return createResultResponseHelper(HttpStatus.OK, taskId, token, nextToken, bufferComplete, this.serializedPageSize);
        }
    }

    /**
     * Result response manager that serves {@code numPages} pages of a fixed size, marking the
     * last one as buffer-complete, and exposes how many pages have not been requested yet.
     * Any call after the last page fails the test.
     */
    private static class BreakingLimitResponseManager
    extends TestingResponseManager.TestingResultResponseManager {
        private final int serializedPageSize;
        private final int numPages;
        private int requestCount;

        /**
         * @param serializedPageSize size passed through for every generated page
         * @param numPages number of pages to serve before the buffer is complete
         */
        public BreakingLimitResponseManager(int serializedPageSize, int numPages) {
            this.serializedPageSize = serializedPageSize;
            this.numPages = numPages;
        }

        @Override
        public Response createResultResponse(String taskId) throws PageTransportErrorException {
            this.requestCount++;
            if (this.requestCount > this.numPages) {
                Assert.fail("Retrieving results after buffer completion");
                return null;
            }

            // The page with index numPages is the final one and closes the buffer.
            boolean bufferComplete = this.requestCount == this.numPages;
            long token = this.requestCount - 1;
            long nextToken = this.requestCount;
            return createResultResponseHelper(HttpStatus.OK, taskId, token, nextToken, bufferComplete, this.serializedPageSize);
        }

        /**
         * @return configured page count minus the number of result requests received so far
         */
        public int getRemainingPageCount() {
            int remaining = this.numPages - this.requestCount;
            return remaining;
        }
    }
}

