/*
 * Decompiled with CFR 0.152.
 */
package io.trino.server.remotetask;

import com.google.common.base.MoreObjects;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.jaxrs.JsonMapper;
import io.airlift.jaxrs.testing.JaxrsTestingHttpProcessor;
import io.airlift.json.JsonBinder;
import io.airlift.json.JsonCodec;
import io.airlift.json.JsonCodecBinder;
import io.airlift.json.JsonModule;
import io.airlift.testing.Assertions;
import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.SessionTestUtils;
import io.trino.block.BlockJsonSerde;
import io.trino.client.NodeVersion;
import io.trino.execution.DynamicFilterConfig;
import io.trino.execution.DynamicFiltersCollector;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.LocationFactory;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.QueryManagerConfig;
import io.trino.execution.RemoteTask;
import io.trino.execution.ScheduledSplit;
import io.trino.execution.SplitAssignment;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.execution.TaskManagerConfig;
import io.trino.execution.TaskState;
import io.trino.execution.TaskStatus;
import io.trino.execution.TaskTestUtils;
import io.trino.execution.TestSqlTaskManager;
import io.trino.execution.buffer.OutputBuffers;
import io.trino.execution.buffer.PipelinedOutputBuffers;
import io.trino.metadata.BlockEncodingManager;
import io.trino.metadata.HandleJsonModule;
import io.trino.metadata.InternalBlockEncodingSerde;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Metadata;
import io.trino.metadata.MetadataManager;
import io.trino.metadata.Split;
import io.trino.server.DynamicFilterService;
import io.trino.server.FailTaskRequest;
import io.trino.server.HttpRemoteTaskFactory;
import io.trino.server.TaskUpdateRequest;
import io.trino.server.remotetask.HttpRemoteTask;
import io.trino.server.remotetask.RemoteTaskStats;
import io.trino.spi.ErrorCode;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockEncodingSerde;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.TestingColumnHandle;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeOperators;
import io.trino.sql.DynamicFilters;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.SymbolAllocator;
import io.trino.sql.planner.TestingPlannerContext;
import io.trino.sql.planner.plan.DynamicFilterId;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.tree.Expression;
import io.trino.sql.tree.SymbolReference;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingSession;
import io.trino.testing.TestingSplit;
import io.trino.type.InternalTypeManager;
import io.trino.type.TypeDeserializer;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.UriInfo;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestHttpRemoteTask {
    // Upper bound on the sleep between two successive condition checks in poll().
    private static final Duration POLL_TIMEOUT = new Duration(100.0, TimeUnit.MILLISECONDS);
    // Quiet period that waitUntilIdle() waits for after the last recorded activity; also the base value for the refresh intervals in TASK_MANAGER_CONFIG.
    private static final Duration IDLE_TIMEOUT = new Duration(3.0, TimeUnit.SECONDS);
    // Overall budget after which poll() and waitUntilIdle() give up with an AssertionError.
    private static final Duration FAIL_TIMEOUT = new Duration(20.0, TimeUnit.SECONDS);
    // Configuration passed to the HttpRemoteTaskFactory under test: status refresh max wait is IDLE_TIMEOUT / 100, info update interval is IDLE_TIMEOUT / 10 (both in milliseconds).
    private static final TaskManagerConfig TASK_MANAGER_CONFIG = new TaskManagerConfig().setStatusRefreshMaxWait(new Duration((double)(IDLE_TIMEOUT.roundTo(TimeUnit.MILLISECONDS) / 100L), TimeUnit.MILLISECONDS)).setInfoUpdateInterval(new Duration((double)(IDLE_TIMEOUT.roundTo(TimeUnit.MILLISECONDS) / 10L), TimeUnit.MILLISECONDS));
    // NOTE(review): not referenced by name in the visible code; presumably the compile-time constant behind the literal passed to setTrace(false) — confirm.
    private static final boolean TRACE_HTTP = false;

    /**
     * Drives the shared failure flow in {@code runTest} with the
     * {@code TASK_MISMATCH} scenario.
     */
    @Test(timeOut = 30000L)
    public void testRemoteTaskMismatch() throws Exception {
        FailureScenario scenario = FailureScenario.TASK_MISMATCH;
        runTest(scenario);
    }

    /**
     * Drives the shared failure flow in {@code runTest} with the
     * {@code TASK_MISMATCH_WHEN_VERSION_IS_HIGH} scenario.
     *
     * NOTE(review): the method name mentions "rejected execution" while the
     * scenario is a task mismatch — confirm whether the name is intentional.
     */
    @Test(timeOut = 30000L)
    public void testRejectedExecutionWhenVersionIsHigh() throws Exception {
        FailureScenario scenario = FailureScenario.TASK_MISMATCH_WHEN_VERSION_IS_HIGH;
        runTest(scenario);
    }

    /**
     * Drives the shared failure flow in {@code runTest} with the
     * {@code REJECTED_EXECUTION} scenario.
     */
    @Test(timeOut = 30000L)
    public void testRejectedExecution() throws Exception {
        FailureScenario scenario = FailureScenario.REJECTED_EXECUTION;
        runTest(scenario);
    }

    /**
     * Failure-free lifecycle: start a remote task, hand it a single split,
     * declare the source finished, cancel the task and wait until both the
     * task status and the task info report a done state.
     */
    @Test(timeOut = 30000L)
    public void testRegular() throws Exception {
        AtomicLong activityClock = new AtomicLong(System.nanoTime());
        TestingTaskResource resource = new TestingTaskResource(activityClock, FailureScenario.NO_FAILURE);
        HttpRemoteTaskFactory factory = createHttpRemoteTaskFactory(resource);
        RemoteTask task = createRemoteTask(factory, ImmutableSet.of());
        resource.setInitialTaskInfo(task.getTaskInfo());
        task.start();

        // Send one split and wait until the fake worker has recorded exactly that split.
        Split onlySplit = new Split(TestingHandles.TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit());
        task.addSplits(ImmutableMultimap.of(TaskTestUtils.TABLE_SCAN_NODE_ID, onlySplit));
        poll(() -> resource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID) != null);
        poll(() -> resource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID).getSplits().size() == 1);

        // The no-more-splits flag must reach the fake worker as well.
        task.noMoreSplits(TaskTestUtils.TABLE_SCAN_NODE_ID);
        poll(() -> resource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID).isNoMoreSplits());

        // After cancellation both views of the task have to reach a done state.
        task.cancel();
        poll(() -> task.getTaskStatus().getState().isDone());
        poll(() -> task.getTaskInfo().getTaskStatus().getState().isDone());

        factory.stop();
    }

    /**
     * Checks that dynamic filter domains published by the fake worker resource
     * show up in a {@link DynamicFilter} created from the {@link DynamicFilterService},
     * and that the dynamic-filters endpoint is fetched exactly once per published
     * version while status requests keep being served.
     */
    @Test(timeOut=30000L)
    public void testDynamicFilters() throws Exception {
        // Two dynamic filters, each tied to its own BIGINT symbol and testing column handle.
        DynamicFilterId filterId1 = new DynamicFilterId("df1");
        DynamicFilterId filterId2 = new DynamicFilterId("df2");
        SymbolAllocator symbolAllocator = new SymbolAllocator();
        Symbol symbol1 = symbolAllocator.newSymbol("DF_SYMBOL1", (Type)BigintType.BIGINT);
        Symbol symbol2 = symbolAllocator.newSymbol("DF_SYMBOL2", (Type)BigintType.BIGINT);
        SymbolReference df1 = symbol1.toSymbolReference();
        SymbolReference df2 = symbol2.toSymbolReference();
        TestingColumnHandle handle1 = new TestingColumnHandle("column1");
        TestingColumnHandle handle2 = new TestingColumnHandle("column2");
        QueryId queryId = new QueryId("test");
        TestingTaskResource testingTaskResource = new TestingTaskResource(new AtomicLong(System.nanoTime()), FailureScenario.NO_FAILURE);
        DynamicFilterService dynamicFilterService = new DynamicFilterService(TestingPlannerContext.PLANNER_CONTEXT.getMetadata(), TestingPlannerContext.PLANNER_CONTEXT.getFunctionManager(), new TypeOperators(), new DynamicFilterConfig());
        HttpRemoteTaskFactory httpRemoteTaskFactory = TestHttpRemoteTask.createHttpRemoteTaskFactory(testingTaskResource, dynamicFilterService);
        // The task is created with an empty set of outbound dynamic filter ids.
        RemoteTask remoteTask = this.createRemoteTask(httpRemoteTaskFactory, (Set<DynamicFilterId>)ImmutableSet.of());
        // Version 1 of the worker-side domains: df1 is the single value 1.
        ImmutableMap initialDomain = ImmutableMap.of((Object)filterId1, (Object)Domain.singleValue((Type)BigintType.BIGINT, (Object)1L));
        testingTaskResource.setInitialTaskInfo(remoteTask.getTaskInfo());
        testingTaskResource.setDynamicFilterDomains(new DynamicFiltersCollector.VersionedDynamicFilterDomains(1L, (Map)initialDomain));
        dynamicFilterService.registerQuery(queryId, SessionTestUtils.TEST_SESSION, (Set)ImmutableSet.of((Object)filterId1, (Object)filterId2), (Set)ImmutableSet.of((Object)filterId1, (Object)filterId2), (Set)ImmutableSet.of());
        // NOTE(review): meaning of the trailing (0, 1) arguments is not visible here — confirm against DynamicFilterService.
        dynamicFilterService.stageCannotScheduleMoreTasks(new StageId(queryId, 1), 0, 1);
        DynamicFilter dynamicFilter = dynamicFilterService.createDynamicFilter(queryId, (List)ImmutableList.of((Object)new DynamicFilters.Descriptor(filterId1, (Expression)df1), (Object)new DynamicFilters.Descriptor(filterId2, (Expression)df2)), (Map)ImmutableMap.of((Object)symbol1, (Object)handle1, (Object)symbol2, (Object)handle2), symbolAllocator.getTypes());
        // Grab the blocked future before starting the task, then wait for the filter to change.
        CompletableFuture future = dynamicFilter.isBlocked();
        remoteTask.start();
        future.get();
        // Only the df1 domain is expected in the predicate at this point.
        Assert.assertEquals((Object)dynamicFilter.getCurrentPredicate(), (Object)TupleDomain.withColumnDomains((Map)ImmutableMap.of((Object)handle1, (Object)Domain.singleValue((Type)BigintType.BIGINT, (Object)1L))));
        Assert.assertEquals((long)testingTaskResource.getDynamicFiltersFetchCounter(), (long)1L);
        // Wait (up to 15 s) until at least three status requests were served ...
        io.trino.testing.assertions.Assert.assertEventually((Duration)new Duration(15.0, TimeUnit.SECONDS), () -> Assertions.assertGreaterThanOrEqual((Comparable)Long.valueOf(testingTaskResource.getStatusFetchCounter()), (Comparable)Long.valueOf(3L)));
        // ... and verify the dynamic-filters fetch count is still 1; the recorded requests serve as the failure message.
        Assert.assertEquals((long)testingTaskResource.getDynamicFiltersFetchCounter(), (long)1L, (String)testingTaskResource.getDynamicFiltersFetchRequests().toString());
        // Publish version 2 carrying the df2 domain and wait for the filter to change again.
        future = dynamicFilter.isBlocked();
        testingTaskResource.setDynamicFilterDomains(new DynamicFiltersCollector.VersionedDynamicFilterDomains(2L, (Map)ImmutableMap.of((Object)filterId2, (Object)Domain.singleValue((Type)BigintType.BIGINT, (Object)2L))));
        future.get();
        // Both domains are now expected, after exactly two dynamic-filters fetches in total.
        Assert.assertEquals((Object)dynamicFilter.getCurrentPredicate(), (Object)TupleDomain.withColumnDomains((Map)ImmutableMap.of((Object)handle1, (Object)Domain.singleValue((Type)BigintType.BIGINT, (Object)1L), (Object)handle2, (Object)Domain.singleValue((Type)BigintType.BIGINT, (Object)2L))));
        Assert.assertEquals((long)testingTaskResource.getDynamicFiltersFetchCounter(), (long)2L, (String)testingTaskResource.getDynamicFiltersFetchRequests().toString());
        Assertions.assertGreaterThanOrEqual((Comparable)Long.valueOf(testingTaskResource.getStatusFetchCounter()), (Comparable)Long.valueOf(4L));
        httpRemoteTaskFactory.stop();
    }

    /**
     * Checks that dynamic filter domains collected by the {@link DynamicFilterService}
     * are delivered to the fake worker resource through task update requests: once
     * for each newly available domain, and not again with plain split updates.
     */
    @Test(timeOut=30000L)
    public void testOutboundDynamicFilters() throws Exception {
        // Two dynamic filters, each tied to its own BIGINT symbol and testing column handle.
        DynamicFilterId filterId1 = new DynamicFilterId("df1");
        DynamicFilterId filterId2 = new DynamicFilterId("df2");
        SymbolAllocator symbolAllocator = new SymbolAllocator();
        Symbol symbol1 = symbolAllocator.newSymbol("DF_SYMBOL1", (Type)BigintType.BIGINT);
        Symbol symbol2 = symbolAllocator.newSymbol("DF_SYMBOL2", (Type)BigintType.BIGINT);
        SymbolReference df1 = symbol1.toSymbolReference();
        SymbolReference df2 = symbol2.toSymbolReference();
        TestingColumnHandle handle1 = new TestingColumnHandle("column1");
        TestingColumnHandle handle2 = new TestingColumnHandle("column2");
        QueryId queryId = new QueryId("test");
        TestingTaskResource testingTaskResource = new TestingTaskResource(new AtomicLong(System.nanoTime()), FailureScenario.NO_FAILURE);
        DynamicFilterService dynamicFilterService = new DynamicFilterService(TestingPlannerContext.PLANNER_CONTEXT.getMetadata(), TestingPlannerContext.PLANNER_CONTEXT.getFunctionManager(), new TypeOperators(), new DynamicFilterConfig());
        dynamicFilterService.registerQuery(queryId, SessionTestUtils.TEST_SESSION, (Set)ImmutableSet.of((Object)filterId1, (Object)filterId2), (Set)ImmutableSet.of((Object)filterId1, (Object)filterId2), (Set)ImmutableSet.of());
        // NOTE(review): meaning of the trailing (0, 1) arguments is not visible here — confirm against DynamicFilterService.
        dynamicFilterService.stageCannotScheduleMoreTasks(new StageId(queryId, 1), 0, 1);
        DynamicFilter dynamicFilter = dynamicFilterService.createDynamicFilter(queryId, (List)ImmutableList.of((Object)new DynamicFilters.Descriptor(filterId1, (Expression)df1), (Object)new DynamicFilters.Descriptor(filterId2, (Expression)df2)), (Map)ImmutableMap.of((Object)symbol1, (Object)handle1, (Object)symbol2, (Object)handle2), symbolAllocator.getTypes());
        // Feed the df1 domain straight into the service before any remote task exists.
        CompletableFuture future = dynamicFilter.isBlocked();
        dynamicFilterService.addTaskDynamicFilters(new TaskId(new StageId(queryId.getId(), 1), 1, 0), (Map)ImmutableMap.of((Object)filterId1, (Object)Domain.singleValue((Type)BigintType.BIGINT, (Object)1L)));
        future.get();
        Assert.assertEquals((Object)dynamicFilter.getCurrentPredicate(), (Object)TupleDomain.withColumnDomains((Map)ImmutableMap.of((Object)handle1, (Object)Domain.singleValue((Type)BigintType.BIGINT, (Object)1L))));
        // Create the remote task with both filter ids declared as outbound.
        HttpRemoteTaskFactory httpRemoteTaskFactory = TestHttpRemoteTask.createHttpRemoteTaskFactory(testingTaskResource, dynamicFilterService);
        RemoteTask remoteTask = this.createRemoteTask(httpRemoteTaskFactory, (Set<DynamicFilterId>)ImmutableSet.of((Object)filterId1, (Object)filterId2));
        testingTaskResource.setInitialTaskInfo(remoteTask.getTaskInfo());
        remoteTask.start();
        // Within 10 s the resource must have seen one update carrying dynamic filters; exactly one create/update request is expected so far.
        io.trino.testing.assertions.Assert.assertEventually((Duration)new Duration(10.0, TimeUnit.SECONDS), () -> Assert.assertEquals((long)testingTaskResource.getDynamicFiltersSentCounter(), (long)1L));
        Assert.assertEquals((long)testingTaskResource.getCreateOrUpdateCounter(), (long)1L);
        // Two split-only updates follow; each addSplit call waits until the resource has recorded the split.
        this.addSplit(remoteTask, testingTaskResource, 1);
        this.addSplit(remoteTask, testingTaskResource, 2);
        // The split updates raised the request count to 3 without another dynamic-filter delivery.
        Assert.assertEquals((long)testingTaskResource.getDynamicFiltersSentCounter(), (long)1L);
        Assert.assertEquals((long)testingTaskResource.getCreateOrUpdateCounter(), (long)3L);
        Assert.assertEquals(testingTaskResource.getLatestDynamicFilterFromCoordinator(), (Map)ImmutableMap.of((Object)filterId1, (Object)Domain.singleValue((Type)BigintType.BIGINT, (Object)1L)));
        // Add the df2 domain and wait for the filter to change.
        future = dynamicFilter.isBlocked();
        dynamicFilterService.addTaskDynamicFilters(new TaskId(new StageId(queryId.getId(), 1), 1, 0), (Map)ImmutableMap.of((Object)filterId2, (Object)Domain.singleValue((Type)BigintType.BIGINT, (Object)2L)));
        future.get();
        Assert.assertEquals((Object)dynamicFilter.getCurrentPredicate(), (Object)TupleDomain.withColumnDomains((Map)ImmutableMap.of((Object)handle1, (Object)Domain.singleValue((Type)BigintType.BIGINT, (Object)1L), (Object)handle2, (Object)Domain.singleValue((Type)BigintType.BIGINT, (Object)2L))));
        // A second dynamic-filter delivery is expected (request 4), and its payload must contain only the df2 domain.
        io.trino.testing.assertions.Assert.assertEventually((Duration)new Duration(10.0, TimeUnit.SECONDS), () -> Assert.assertEquals((long)testingTaskResource.getDynamicFiltersSentCounter(), (long)2L));
        Assert.assertEquals((long)testingTaskResource.getCreateOrUpdateCounter(), (long)4L);
        Assert.assertEquals(testingTaskResource.getLatestDynamicFilterFromCoordinator(), (Map)ImmutableMap.of((Object)filterId2, (Object)Domain.singleValue((Type)BigintType.BIGINT, (Object)2L)));
        httpRemoteTaskFactory.stop();
    }

    /**
     * With adaptive update-request sizing enabled and a 10kB request limit,
     * adds 100 splits in one call and verifies that all of them reach the fake
     * worker through more than one create/update request, then finishes and
     * cancels the task.
     */
    @Test(timeOut = 300000L)
    public void testAdaptiveRemoteTaskRequestSize() throws Exception {
        AtomicLong activityClock = new AtomicLong(System.nanoTime());
        TestingTaskResource resource = new TestingTaskResource(activityClock, FailureScenario.NO_FAILURE);
        Session adaptiveSession = TestingSession.testSessionBuilder()
                .setCatalog("tpch")
                .setSchema("tiny")
                .setSystemProperty("remote_task_adaptive_update_request_size_enabled", "true")
                .setSystemProperty("remote_task_max_request_size", "10kB")
                .setSystemProperty("remote_task_request_size_headroom", "1kB")
                .setSystemProperty("remote_task_guaranteed_splits_per_request", "1")
                .build();
        HttpRemoteTaskFactory factory = createHttpRemoteTaskFactory(resource);
        RemoteTask task = createRemoteTask(factory, ImmutableSet.of(), adaptiveSession);
        resource.setInitialTaskInfo(task.getTaskInfo());
        task.start();

        // Queue 100 local splits for the table scan node and submit them in a single call.
        Multimap<PlanNodeId, Split> pendingSplits = HashMultimap.create();
        for (int added = 0; added < 100; added++) {
            pendingSplits.put(TaskTestUtils.TABLE_SCAN_NODE_ID, new Split(TestingHandles.TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit()));
        }
        task.addSplits(pendingSplits);
        poll(() -> resource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID) != null);
        poll(() -> resource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID).getSplits().size() == 100);

        // All splits arrived, and it took more than one request to deliver them.
        Assert.assertTrue(resource.getCreateOrUpdateCounter() > 1L);

        task.noMoreSplits(TaskTestUtils.TABLE_SCAN_NODE_ID);
        poll(() -> resource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID).isNoMoreSplits());

        task.cancel();
        poll(() -> task.getTaskStatus().getState().isDone());
        poll(() -> task.getTaskInfo().getTaskStatus().getState().isDone());

        factory.stop();
    }

    /**
     * Exercises {@code HttpRemoteTask.adjustSplitBatchSize} directly, without
     * starting the task: a 1,000,000-byte request with 500 splits must be
     * reported as needing adjustment and bring the batch size below 250, while
     * a 1,000-byte request with 100 splits must not and must bring the batch
     * size above 250.
     *
     * The factory is stopped in a {@code finally} block so that it is released
     * even when an assertion fails; every other test in this class that creates
     * a factory stops it as well.
     */
    @Test
    public void testAdjustSplitBatchSize() {
        AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime());
        TestingTaskResource testingTaskResource = new TestingTaskResource(lastActivityNanos, FailureScenario.NO_FAILURE);
        Session session = TestingSession.testSessionBuilder()
                .setCatalog("tpch")
                .setSchema("tiny")
                .setSystemProperty("remote_task_adaptive_update_request_size_enabled", "true")
                .setSystemProperty("remote_task_max_request_size", "100kB")
                .setSystemProperty("remote_task_request_size_headroom", "10kB")
                .setSystemProperty("remote_task_guaranteed_splits_per_request", "1")
                .build();
        HttpRemoteTaskFactory httpRemoteTaskFactory = createHttpRemoteTaskFactory(testingTaskResource);
        try {
            HttpRemoteTask remoteTask = (HttpRemoteTask) createRemoteTask(httpRemoteTaskFactory, ImmutableSet.of(), session);
            testingTaskResource.setInitialTaskInfo(remoteTask.getTaskInfo());

            Set<ScheduledSplit> splits = new HashSet<>();
            for (int i = 0; i < 1000; i++) {
                splits.add(new ScheduledSplit((long) i, TaskTestUtils.TABLE_SCAN_NODE_ID, new Split(TestingHandles.TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit())));
            }

            // Oversized request: adjustment is reported and the batch size drops below 250.
            Assert.assertTrue(remoteTask.adjustSplitBatchSize(ImmutableList.of(new SplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID, splits, true)), 1000000L, 500));
            Assertions.assertLessThan(remoteTask.splitBatchSize.get(), 250);

            // Small request: no adjustment is reported and the batch size rises above 250.
            Assert.assertFalse(remoteTask.adjustSplitBatchSize(ImmutableList.of(new SplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID, splits, true)), 1000L, 100));
            Assertions.assertGreaterThan(remoteTask.splitBatchSize.get(), 250);
        } finally {
            httpRemoteTaskFactory.stop();
        }
    }

    /**
     * Shared body of the failure tests. Starts a remote task against a fake
     * worker configured with {@code failureScenario}, waits until the worker
     * stops seeing activity, stops the factory, and then checks that the task
     * is done with exactly one failure carrying the error code expected for the
     * scenario.
     *
     * @param failureScenario scenario the fake worker is constructed with
     * @throws UnsupportedOperationException for a scenario this method has no expectation for
     */
    private void runTest(FailureScenario failureScenario) throws Exception {
        AtomicLong activityClock = new AtomicLong(System.nanoTime());
        TestingTaskResource resource = new TestingTaskResource(activityClock, failureScenario);
        HttpRemoteTaskFactory factory = createHttpRemoteTaskFactory(resource);
        RemoteTask task = createRemoteTask(factory, ImmutableSet.of());
        resource.setInitialTaskInfo(task.getTaskInfo());
        task.start();

        waitUntilIdle(activityClock);
        factory.stop();

        Assert.assertTrue(
                task.getTaskStatus().getState().isDone(),
                String.format("TaskStatus is not in a done state: %s", task.getTaskStatus()));

        // Exactly one failure is expected; its error code depends on the scenario.
        ErrorCode observedCode = Iterables.getOnlyElement(task.getTaskStatus().getFailures()).getErrorCode();

        if (failureScenario == FailureScenario.TASK_MISMATCH || failureScenario == FailureScenario.TASK_MISMATCH_WHEN_VERSION_IS_HIGH) {
            Assert.assertTrue(
                    task.getTaskInfo().getTaskStatus().getState().isDone(),
                    String.format("TaskInfo is not in a done state: %s", task.getTaskInfo()));
            Assert.assertEquals(observedCode, StandardErrorCode.REMOTE_TASK_MISMATCH.toErrorCode());
        } else if (failureScenario == FailureScenario.REJECTED_EXECUTION) {
            Assert.assertEquals(observedCode, StandardErrorCode.REMOTE_TASK_ERROR.toErrorCode());
        } else {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Adds one local split for the table scan node to {@code remoteTask} and
     * waits until the fake worker reports exactly {@code expectedSplitsCount}
     * splits for that node.
     *
     * @param remoteTask task receiving the split
     * @param testingTaskResource fake worker whose recorded assignment is polled
     * @param expectedSplitsCount total number of splits the worker must hold afterwards
     */
    private void addSplit(RemoteTask remoteTask, TestingTaskResource testingTaskResource, int expectedSplitsCount) throws InterruptedException {
        Split extraSplit = new Split(TestingHandles.TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit());
        remoteTask.addSplits(ImmutableMultimap.of(TaskTestUtils.TABLE_SCAN_NODE_ID, extraSplit));
        // First wait for any assignment to exist, then for the expected size.
        poll(() -> testingTaskResource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID) != null);
        poll(() -> testingTaskResource.getTaskSplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID).getSplits().size() == expectedSplitsCount);
    }

    /**
     * Creates a remote task using the shared {@code SessionTestUtils.TEST_SESSION}.
     *
     * @param httpRemoteTaskFactory factory that builds the task
     * @param outboundDynamicFilterIds dynamic filter ids passed through to the factory
     * @return the task returned by the factory
     */
    private RemoteTask createRemoteTask(HttpRemoteTaskFactory httpRemoteTaskFactory, Set<DynamicFilterId> outboundDynamicFilterIds) {
        Session defaultSession = SessionTestUtils.TEST_SESSION;
        return createRemoteTask(httpRemoteTaskFactory, outboundDynamicFilterIds, defaultSession);
    }

    /**
     * Creates a remote task for the fixed task id (stage {@code "test".1}, 2, 0)
     * on a fake node at {@code http://fake.invalid/}, with no initial splits and
     * broadcast pipelined output buffers.
     *
     * @param httpRemoteTaskFactory factory that builds the task
     * @param outboundDynamicFilterIds dynamic filter ids passed through to the factory
     * @param session session the task is created with
     * @return the task returned by the factory
     */
    private RemoteTask createRemoteTask(HttpRemoteTaskFactory httpRemoteTaskFactory, Set<DynamicFilterId> outboundDynamicFilterIds, Session session) {
        TaskId taskId = new TaskId(new StageId("test", 1), 2, 0);
        InternalNode fakeNode = new InternalNode("node-id", URI.create("http://fake.invalid/"), new NodeVersion("version"), false);
        OutputBuffers outputBuffers = PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.BROADCAST);
        // The split count tracker only needs a callback; the test ignores the reported values.
        NodeTaskMap.PartitionedSplitCountTracker splitCountTracker = new NodeTaskMap.PartitionedSplitCountTracker(i -> {});
        return httpRemoteTaskFactory.createRemoteTask(
                session,
                taskId,
                fakeNode,
                TaskTestUtils.PLAN_FRAGMENT,
                ImmutableMultimap.of(),
                outputBuffers,
                splitCountTracker,
                outboundDynamicFilterIds,
                Optional.empty(),
                true);
    }

    /**
     * Builds a factory wired to {@code testingTaskResource}, backed by a fresh
     * {@link DynamicFilterService} constructed from the testing planner context.
     *
     * @param testingTaskResource fake worker resource that will serve the factory's requests
     * @return the factory produced by the two-argument overload
     */
    private static HttpRemoteTaskFactory createHttpRemoteTaskFactory(TestingTaskResource testingTaskResource) {
        DynamicFilterService freshFilterService = new DynamicFilterService(
                TestingPlannerContext.PLANNER_CONTEXT.getMetadata(),
                TestingPlannerContext.PLANNER_CONTEXT.getFunctionManager(),
                new TypeOperators(),
                new DynamicFilterConfig());
        return createHttpRemoteTaskFactory(testingTaskResource, freshFilterService);
    }

    /**
     * Bootstraps a small injector (JSON module, handle JSON module and a local
     * test module) and returns the {@link HttpRemoteTaskFactory} it provides.
     * The provided factory talks to {@code testingTaskResource} through an
     * in-process {@link TestingHttpClient}; that client is also handed to the
     * resource via {@code setHttpClient}.
     *
     * @param testingTaskResource fake worker resource served by the in-process HTTP processor
     * @param dynamicFilterService service passed to the factory
     * @return the factory instance obtained from the injector
     */
    private static HttpRemoteTaskFactory createHttpRemoteTaskFactory(final TestingTaskResource testingTaskResource, final DynamicFilterService dynamicFilterService) {
        Module jsonModule = new JsonModule();
        Module handleJsonModule = new HandleJsonModule();
        Module testModule = new Module() {
            @Override
            public void configure(Binder binder) {
                binder.bind(JsonMapper.class).in(Scopes.SINGLETON);
                binder.bind(Metadata.class).toInstance(MetadataManager.createTestMetadataManager());
                JsonBinder.jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
                JsonCodecBinder.jsonCodecBinder(binder).bindJsonCodec(TaskStatus.class);
                JsonCodecBinder.jsonCodecBinder(binder).bindJsonCodec(DynamicFiltersCollector.VersionedDynamicFilterDomains.class);
                JsonBinder.jsonBinder(binder).addSerializerBinding(Block.class).to(BlockJsonSerde.Serializer.class);
                JsonBinder.jsonBinder(binder).addDeserializerBinding(Block.class).to(BlockJsonSerde.Deserializer.class);
                JsonCodecBinder.jsonCodecBinder(binder).bindJsonCodec(TaskInfo.class);
                JsonCodecBinder.jsonCodecBinder(binder).bindJsonCodec(TaskUpdateRequest.class);
                JsonCodecBinder.jsonCodecBinder(binder).bindJsonCodec(FailTaskRequest.class);
                binder.bind(TypeManager.class).toInstance(InternalTypeManager.TESTING_TYPE_MANAGER);
                binder.bind(BlockEncodingManager.class).in(Scopes.SINGLETON);
                binder.bind(BlockEncodingSerde.class).to(InternalBlockEncodingSerde.class).in(Scopes.SINGLETON);
            }

            @Provides
            private HttpRemoteTaskFactory createHttpRemoteTaskFactory(JsonMapper jsonMapper, JsonCodec<TaskStatus> taskStatusCodec, JsonCodec<DynamicFiltersCollector.VersionedDynamicFilterDomains> dynamicFilterDomainsCodec, JsonCodec<TaskInfo> taskInfoCodec, JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec, JsonCodec<FailTaskRequest> failTaskRequestCodec) {
                // Route every request for the fake base URI to the testing resource, with tracing off.
                URI baseUri = URI.create("http://fake.invalid/");
                JaxrsTestingHttpProcessor processor = new JaxrsTestingHttpProcessor(baseUri, testingTaskResource, jsonMapper);
                TestingHttpClient inProcessClient = new TestingHttpClient(processor.setTrace(false));
                testingTaskResource.setHttpClient(inProcessClient);
                return new HttpRemoteTaskFactory(
                        new QueryManagerConfig(),
                        TASK_MANAGER_CONFIG,
                        inProcessClient,
                        new TestSqlTaskManager.MockLocationFactory(),
                        taskStatusCodec,
                        dynamicFilterDomainsCodec,
                        taskInfoCodec,
                        taskUpdateRequestCodec,
                        failTaskRequestCodec,
                        new RemoteTaskStats(),
                        dynamicFilterService);
            }
        };
        Bootstrap bootstrap = new Bootstrap(jsonModule, handleJsonModule, testModule);
        Injector injector = bootstrap.doNotInitializeLogging().quiet().initialize();
        return injector.getInstance(HttpRemoteTaskFactory.class);
    }

    /**
     * Re-evaluates {@code success} until it yields {@code true}, sleeping at
     * most {@code POLL_TIMEOUT} between attempts.
     *
     * @param success condition to wait for
     * @throws AssertionError if the condition is still false once {@code FAIL_TIMEOUT} has elapsed
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void poll(BooleanSupplier success) throws InterruptedException {
        long deadlineNanos = System.nanoTime() + FAIL_TIMEOUT.roundTo(TimeUnit.NANOSECONDS);
        for (;;) {
            if (success.getAsBoolean()) {
                return;
            }
            long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
            if (remainingMillis <= 0L) {
                throw new AssertionError(String.format("Timeout of %s reached", FAIL_TIMEOUT));
            }
            // Never sleep past the deadline.
            long napMillis = Math.min(POLL_TIMEOUT.toMillis(), remainingMillis);
            Thread.sleep(napMillis);
        }
    }

    /**
     * Blocks until more than {@code IDLE_TIMEOUT} has passed since the timestamp
     * stored in {@code lastActivityNanos}, re-reading that timestamp after every
     * sleep.
     *
     * @param lastActivityNanos {@code System.nanoTime()} value of the most recent activity
     * @throws AssertionError if the idle target would fall after {@code FAIL_TIMEOUT}, measured from the start of this call
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void waitUntilIdle(AtomicLong lastActivityNanos) throws InterruptedException {
        final long startedNanos = System.nanoTime();
        for (;;) {
            long quietMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastActivityNanos.get());
            long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNanos);
            long idleBudgetMillis = IDLE_TIMEOUT.toMillis() - quietMillis;
            long failBudgetMillis = FAIL_TIMEOUT.toMillis() - elapsedMillis;

            // Give up when the overall budget would run out before the idle target is reached.
            if (failBudgetMillis < idleBudgetMillis) {
                throw new AssertionError(String.format("Activity doesn't stop after %s", FAIL_TIMEOUT));
            }
            if (idleBudgetMillis < 0L) {
                return;
            }
            Thread.sleep(idleBudgetMillis);
        }
    }

    private static enum FailureScenario {
        NO_FAILURE,
        TASK_MISMATCH,
        TASK_MISMATCH_WHEN_VERSION_IS_HIGH,
        REJECTED_EXECUTION;

    }

    @Path(value="/task/{nodeId}")
    public static class TestingTaskResource {
        // Task instance id literals. NOTE(review): neither constant is referenced by name in the visible code; the initial value of taskInstanceId below uses the same literal as INITIAL_TASK_INSTANCE_ID.
        private static final String INITIAL_TASK_INSTANCE_ID = "task-instance-id";
        private static final String NEW_TASK_INSTANCE_ID = "task-instance-id-x";
        // Shared timestamp (System.nanoTime()) that the request handlers refresh; the test reads it in waitUntilIdle().
        private final AtomicLong lastActivityNanos;
        // Failure mode this resource was constructed with.
        private final FailureScenario failureScenario;
        // HTTP client installed through setHttpClient().
        private final AtomicReference<TestingHttpClient> httpClient = new AtomicReference();
        private TaskInfo initialTaskInfo;
        private TaskStatus initialTaskStatus;
        // Domains returned by the dynamic-filters endpoint; empty until a test sets them.
        private Optional<DynamicFiltersCollector.VersionedDynamicFilterDomains> dynamicFilterDomains = Optional.empty();
        private long version;
        // Overwritten by deleteTask() with ABORTED or CANCELED.
        private TaskState taskState;
        private String taskInstanceId = "task-instance-id";
        // Dynamic filter domains carried by the most recent update request that contained any.
        private Map<DynamicFilterId, Domain> latestDynamicFilterFromCoordinator = ImmutableMap.of();
        // Request counters exposed to the tests through getters.
        private long statusFetchCounter;
        // Incremented once per createOrUpdateTask() call.
        private long createOrUpdateCounter;
        // Incremented once per dynamic-filters fetch.
        private long dynamicFiltersFetchCounter;
        // Incremented once per update request with non-empty dynamic filter domains.
        private long dynamicFiltersSentCounter;
        // Record of every dynamic-filters fetch, used as diagnostic output in assertion messages.
        private final List<DynamicFiltersFetchRequest> dynamicFiltersFetchRequests = new ArrayList<DynamicFiltersFetchRequest>();
        // Split assignments accumulated per plan node from update requests.
        Map<PlanNodeId, SplitAssignment> taskSplitAssignmentMap = new HashMap<PlanNodeId, SplitAssignment>();

        /**
         * Creates a fake task resource.
         *
         * @param lastActivityNanos shared timestamp the request handlers update; must not be null
         * @param failureScenario failure mode for this resource; must not be null
         * @throws NullPointerException if either argument is null
         */
        public TestingTaskResource(AtomicLong lastActivityNanos, FailureScenario failureScenario) {
            // Validate both arguments up front, in declaration order, before storing anything.
            Objects.requireNonNull(lastActivityNanos, "lastActivityNanos is null");
            Objects.requireNonNull(failureScenario, "failureScenario is null");
            this.lastActivityNanos = lastActivityNanos;
            this.failureScenario = failureScenario;
        }

        /**
         * Stores the HTTP client associated with this resource, replacing any
         * previously stored one.
         *
         * @param newValue client to store
         */
        public void setHttpClient(TestingHttpClient newValue) {
            httpClient.set(newValue);
        }

        /**
         * Serves {@code GET {taskId}}: records the request as activity and
         * returns the task info produced by {@code buildTaskInfo()}. The path
         * and header parameters are accepted but not used.
         */
        @GET
        @Path("{taskId}")
        @Produces("application/json")
        public synchronized TaskInfo getTaskInfo(@PathParam("taskId") TaskId taskId, @HeaderParam("X-Trino-Current-Version") Long currentVersion, @HeaderParam("X-Trino-Max-Wait") Duration maxWait, @Context UriInfo uriInfo) {
            long nowNanos = System.nanoTime();
            lastActivityNanos.set(nowNanos);
            return buildTaskInfo();
        }

        /**
         * Serves {@code POST {taskId}}: merges the request's split assignments
         * into the per-plan-node map, remembers the request's dynamic filter
         * domains when it carries any, bumps the request counters, records the
         * activity time and returns the task info from {@code buildTaskInfo()}.
         */
        @POST
        @Path("{taskId}")
        @Consumes("application/json")
        @Produces("application/json")
        public synchronized TaskInfo createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo) {
            // First assignment for a plan node is stored as-is; later ones are folded in via update().
            taskUpdateRequest.getSplitAssignments().forEach(incoming ->
                    taskSplitAssignmentMap.merge(incoming.getPlanNodeId(), incoming, (recorded, addition) -> recorded.update(addition)));

            Map<DynamicFilterId, Domain> sentDomains = taskUpdateRequest.getDynamicFilterDomains();
            if (!sentDomains.isEmpty()) {
                dynamicFiltersSentCounter++;
                latestDynamicFilterFromCoordinator = sentDomains;
            }

            createOrUpdateCounter++;
            lastActivityNanos.set(System.nanoTime());
            return buildTaskInfo();
        }

        /**
         * Returns a new {@link SplitAssignment} built from the assignment
         * recorded for {@code planNodeId}.
         *
         * @param planNodeId plan node to look up
         * @return a fresh assignment with the recorded plan node id, splits and
         *         no-more-splits flag, or {@code null} when nothing has been recorded
         */
        public synchronized SplitAssignment getTaskSplitAssignment(PlanNodeId planNodeId) {
            SplitAssignment recorded = taskSplitAssignmentMap.get(planNodeId);
            return recorded == null
                    ? null
                    : new SplitAssignment(recorded.getPlanNodeId(), recorded.getSplits(), recorded.isNoMoreSplits());
        }

        /**
         * Handles {@code GET {taskId}/status}: stamps the activity clock, blocks on this
         * resource's monitor for up to {@code maxWait}, then returns a freshly built status.
         *
         * <p>The wait is skipped when {@code maxWait} is absent or rounds to zero milliseconds.
         *
         * @param maxWait value of the {@code X-Trino-Max-Wait} header; may be null
         * @return status produced by {@code buildTaskStatus()}
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        @GET
        @Path("{taskId}/status")
        @Produces("application/json")
        public synchronized TaskStatus getTaskStatus(@PathParam("taskId") TaskId taskId, @HeaderParam("X-Trino-Current-Version") Long currentVersion, @HeaderParam("X-Trino-Max-Wait") Duration maxWait, @Context UriInfo uriInfo) throws InterruptedException {
            this.lastActivityNanos.set(System.nanoTime());
            // Object.wait(0) means "wait until notified", not "do not wait". A missing header
            // or a sub-millisecond max-wait must therefore bypass the wait entirely.
            long waitMillis = maxWait == null ? 0L : maxWait.roundTo(TimeUnit.MILLISECONDS);
            if (waitMillis > 0L) {
                this.wait(waitMillis);
            }
            return this.buildTaskStatus();
        }

        /**
         * Handles {@code GET {taskId}/dynamicfilters}: counts the fetch, logs the request
         * (URI, task id, caller's version, and the currently stored version or {@code -1}
         * when none is set), and returns the stored dynamic filter domains.
         *
         * @return the stored domains, or {@code null} when none have been set
         */
        @GET
        @Path("{taskId}/dynamicfilters")
        @Produces("application/json")
        public synchronized DynamicFiltersCollector.VersionedDynamicFilterDomains acknowledgeAndGetNewDynamicFilterDomains(@PathParam("taskId") TaskId taskId, @HeaderParam("X-Trino-Current-Version") Long currentDynamicFiltersVersion, @Context UriInfo uriInfo) {
            dynamicFiltersFetchCounter++;

            String requestUri = uriInfo.getRequestUri().toString();
            long storedVersion = dynamicFilterDomains
                    .map(DynamicFiltersCollector.VersionedDynamicFilterDomains::getVersion)
                    .orElse(-1L);
            DynamicFiltersFetchRequest logEntry = new DynamicFiltersFetchRequest(requestUri, taskId, currentDynamicFiltersVersion, storedVersion);
            dynamicFiltersFetchRequests.add(logEntry);

            return dynamicFilterDomains.orElse(null);
        }

        /**
         * Handles {@code DELETE {taskId}}: stamps the activity clock, moves the task to
         * {@code ABORTED} or {@code CANCELED} depending on {@code abort}, and returns a freshly
         * built task info.
         *
         * @param abort when true the state becomes {@code ABORTED}, otherwise {@code CANCELED}
         * @return task info produced by {@code buildTaskInfo()}
         */
        @DELETE
        @Path("{taskId}")
        @Produces("application/json")
        public synchronized TaskInfo deleteTask(@PathParam("taskId") TaskId taskId, @QueryParam("abort") @DefaultValue("true") boolean abort, @Context UriInfo uriInfo) {
            lastActivityNanos.set(System.nanoTime());
            if (abort) {
                taskState = TaskState.ABORTED;
            }
            else {
                taskState = TaskState.CANCELED;
            }
            return buildTaskInfo();
        }

        /**
         * Seeds this resource from an initial task info: remembers the info and its status,
         * and copies the status's state and version. For the
         * {@code TASK_MISMATCH_WHEN_VERSION_IS_HIGH} scenario the version is then overridden
         * with 1000000.
         *
         * @param initialTaskInfo template used for every task info/status built later
         * @throws UnsupportedOperationException if the failure scenario is not one handled here
         */
        public void setInitialTaskInfo(TaskInfo initialTaskInfo) {
            this.initialTaskInfo = initialTaskInfo;
            TaskStatus status = initialTaskInfo.getTaskStatus();
            this.initialTaskStatus = status;
            this.taskState = status.getState();
            this.version = status.getVersion();

            switch (this.failureScenario) {
                case NO_FAILURE:
                case TASK_MISMATCH:
                case REJECTED_EXECUTION:
                    // These scenarios start from the version reported by the initial status.
                    return;
                case TASK_MISMATCH_WHEN_VERSION_IS_HIGH:
                    this.version = 1000000L;
                    return;
                default:
                    throw new UnsupportedOperationException();
            }
        }

        /**
         * Replaces the dynamic filter domains served by this resource.
         *
         * @param dynamicFilterDomains new domains; must not be null ({@code Optional.of} rejects null)
         */
        public synchronized void setDynamicFilterDomains(DynamicFiltersCollector.VersionedDynamicFilterDomains dynamicFilterDomains) {
            Optional<DynamicFiltersCollector.VersionedDynamicFilterDomains> wrapped = Optional.of(dynamicFilterDomains);
            this.dynamicFilterDomains = wrapped;
        }

        /**
         * Returns the dynamic filter domains most recently received through
         * {@code createOrUpdateTask}.
         *
         * <p>Synchronized like the other accessors of this resource: the field is written under
         * this monitor by the request handler, so reading it under the same monitor guarantees
         * the caller observes the latest write.
         *
         * @return the last non-empty dynamic filter domains carried by an update request
         */
        public synchronized Map<DynamicFilterId, Domain> getLatestDynamicFilterFromCoordinator() {
            return this.latestDynamicFilterFromCoordinator;
        }

        /**
         * @return how many times {@code buildTaskStatus()} has been invoked so far
         */
        public synchronized long getStatusFetchCounter() {
            return statusFetchCounter;
        }

        /**
         * @return how many times {@code createOrUpdateTask} has been invoked so far
         */
        public synchronized long getCreateOrUpdateCounter() {
            return createOrUpdateCounter;
        }

        /**
         * @return how many times {@code acknowledgeAndGetNewDynamicFilterDomains} has been invoked so far
         */
        public synchronized long getDynamicFiltersFetchCounter() {
            return dynamicFiltersFetchCounter;
        }

        /**
         * @return how many update requests carried a non-empty set of dynamic filter domains
         */
        public synchronized long getDynamicFiltersSentCounter() {
            return dynamicFiltersSentCounter;
        }

        /**
         * @return an immutable snapshot of the dynamic filter fetch requests logged so far
         */
        public synchronized List<DynamicFiltersFetchRequest> getDynamicFiltersFetchRequests() {
            List<DynamicFiltersFetchRequest> snapshot = ImmutableList.copyOf(dynamicFiltersFetchRequests);
            return snapshot;
        }

        /**
         * Builds a task info whose status comes from {@code buildTaskStatus()} (so every call
         * also counts as a status fetch) and whose remaining fields are copied from the
         * initial task info.
         */
        private TaskInfo buildTaskInfo() {
            // Build the status first: it has side effects and may throw.
            TaskStatus status = buildTaskStatus();
            TaskInfo template = initialTaskInfo;
            return new TaskInfo(
                    status,
                    template.getLastHeartbeat(),
                    template.getOutputBuffers(),
                    template.getNoMoreSplits(),
                    template.getStats(),
                    template.getEstimatedMemory(),
                    template.isNeedsPlan());
        }

        /**
         * Builds the next task status, applying the configured failure scenario.
         *
         * <p>Each call increments the status fetch counter and the version. For the two
         * task-mismatch scenarios, the tenth call switches to {@code NEW_TASK_INSTANCE_ID} and
         * resets the version to zero before it is incremented. For {@code REJECTED_EXECUTION},
         * the tenth and every later call closes the stored HTTP client and throws.
         *
         * @throws RejectedExecutionException in the {@code REJECTED_EXECUTION} scenario from the tenth call on
         * @throws UnsupportedOperationException if the failure scenario is not one handled here
         */
        private TaskStatus buildTaskStatus() {
            statusFetchCounter++;

            switch (failureScenario) {
                case NO_FAILURE:
                    break;
                case TASK_MISMATCH:
                case TASK_MISMATCH_WHEN_VERSION_IS_HIGH:
                    if (statusFetchCounter == 10L) {
                        taskInstanceId = NEW_TASK_INSTANCE_ID;
                        version = 0L;
                    }
                    break;
                case REJECTED_EXECUTION:
                    if (statusFetchCounter >= 10L) {
                        httpClient.get().close();
                        throw new RejectedExecutionException();
                    }
                    break;
                default:
                    throw new UnsupportedOperationException();
            }

            // Everything not tracked by this resource is copied from the initial status.
            TaskStatus template = initialTaskStatus;
            return new TaskStatus(
                    template.getTaskId(),
                    taskInstanceId,
                    ++version,
                    taskState,
                    template.getSelf(),
                    "fake",
                    template.getFailures(),
                    template.getQueuedPartitionedDrivers(),
                    template.getRunningPartitionedDrivers(),
                    template.getOutputBufferStatus(),
                    template.getOutputDataSize(),
                    template.getPhysicalWrittenDataSize(),
                    template.getMaxWriterCount(),
                    template.getMemoryReservation(),
                    template.getPeakMemoryReservation(),
                    template.getRevocableMemoryReservation(),
                    template.getFullGcCount(),
                    template.getFullGcTime(),
                    dynamicFilterDomains.map(DynamicFiltersCollector.VersionedDynamicFilterDomains::getVersion).orElse(0L).longValue(),
                    template.getQueuedPartitionedSplitsWeight(),
                    template.getRunningPartitionedSplitsWeight());
        }

        /**
         * Log entry for one dynamic filter fetch request: the request URI, the task id, the
         * version supplied by the caller and the version stored in the resource at that time.
         */
        private static class DynamicFiltersFetchRequest {
            private final String uriInfo;
            private final TaskId taskId;
            private final Long currentDynamicFiltersVersion;
            private final long storedDynamicFiltersVersion;

            /**
             * @throws NullPointerException if {@code uriInfo}, {@code taskId} or
             *         {@code currentDynamicFiltersVersion} is null
             */
            private DynamicFiltersFetchRequest(String uriInfo, TaskId taskId, Long currentDynamicFiltersVersion, long storedDynamicFiltersVersion) {
                // Checks run in declaration order so the first null argument determines the message.
                Objects.requireNonNull(uriInfo, "uriInfo is null");
                Objects.requireNonNull(taskId, "taskId is null");
                Objects.requireNonNull(currentDynamicFiltersVersion, "currentDynamicFiltersVersion is null");
                this.uriInfo = uriInfo;
                this.taskId = taskId;
                this.currentDynamicFiltersVersion = currentDynamicFiltersVersion;
                this.storedDynamicFiltersVersion = storedDynamicFiltersVersion;
            }

            @Override
            public String toString() {
                return MoreObjects.toStringHelper(this)
                        .add("uriInfo", uriInfo)
                        .add("taskId", taskId)
                        .add("currentDynamicFiltersVersion", currentDynamicFiltersVersion)
                        .add("storedDynamicFiltersVersion", storedDynamicFiltersVersion)
                        .toString();
            }
        }
    }
}

