/*
 * Decompiled with CFR 0.152.
 */
package io.trino.server.protocol.spooling;

import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.slice.Slices;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.server.ExternalUriInfo;
import io.trino.server.protocol.spooling.SpoolingConfig;
import io.trino.server.security.ResourceSecurity;
import io.trino.spi.HostAddress;
import io.trino.spi.spool.SpooledLocation;
import io.trino.spi.spool.SpooledSegmentHandle;
import io.trino.spi.spool.SpoolingManager;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.ServiceUnavailableException;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriBuilder;
import jakarta.ws.rs.core.UriInfo;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

@Path(value="/v1/spooled")
@ResourceSecurity(value=ResourceSecurity.AccessType.PUBLIC)
public class CoordinatorSegmentResource {
    private final SpoolingManager spoolingManager;
    private final SpoolingConfig.SegmentRetrievalMode retrievalMode;
    private final InternalNodeManager nodeManager;

    @Inject
    public CoordinatorSegmentResource(SpoolingManager spoolingManager, SpoolingConfig config, InternalNodeManager nodeManager) {
        this.spoolingManager = Objects.requireNonNull(spoolingManager, "spoolingManager is null");
        this.retrievalMode = Objects.requireNonNull(config, "config is null").getRetrievalMode();
        this.nodeManager = Objects.requireNonNull(nodeManager, "nodeManager is null");
    }

    @GET
    @Path(value="/download/{identifier}")
    @Produces(value={"application/octet-stream"})
    @ResourceSecurity(value=ResourceSecurity.AccessType.PUBLIC)
    public Response download(@Context UriInfo uriInfo, @PathParam(value="identifier") String identifier, @Context HttpHeaders headers) throws IOException {
        SpooledSegmentHandle handle = this.handle(identifier, headers);
        return switch (this.retrievalMode) {
            default -> throw new MatchException(null, null);
            case SpoolingConfig.SegmentRetrievalMode.STORAGE -> throw new ServiceUnavailableException("Retrieval mode is STORAGE but segment resource was called");
            case SpoolingConfig.SegmentRetrievalMode.COORDINATOR_PROXY -> Response.ok((Object)this.spoolingManager.openInputStream(handle)).build();
            case SpoolingConfig.SegmentRetrievalMode.WORKER_PROXY -> {
                HostAddress hostAddress = this.randomActiveWorkerNode();
                yield Response.seeOther((URI)uriInfo.getRequestUriBuilder().host(hostAddress.getHostText()).port(hostAddress.getPort()).build(new Object[0])).build();
            }
            case SpoolingConfig.SegmentRetrievalMode.COORDINATOR_STORAGE_REDIRECT -> Response.seeOther((URI)((SpooledLocation.DirectLocation)this.spoolingManager.directLocation(handle).orElseThrow(() -> new ServiceUnavailableException("Could not generate pre-signed URI"))).directUri()).build();
        };
    }

    @GET
    @Path(value="/ack/{identifier}")
    @ResourceSecurity(value=ResourceSecurity.AccessType.PUBLIC)
    public Response acknowledge(@PathParam(value="identifier") String identifier, @Context HttpHeaders headers) throws IOException {
        try {
            this.spoolingManager.acknowledge(this.handle(identifier, headers));
            return Response.ok().build();
        }
        catch (IOException e) {
            return Response.serverError().entity((Object)e.toString()).build();
        }
    }

    public static UriBuilder spooledSegmentUriBuilder(ExternalUriInfo info) {
        return UriBuilder.fromUri((URI)info.baseUriBuilder().build()).path(CoordinatorSegmentResource.class);
    }

    public HostAddress randomActiveWorkerNode() {
        List internalNodes = (List)this.nodeManager.getActiveNodesSnapshot().getAllNodes().stream().filter(node -> !node.isCoordinator()).collect(ImmutableList.toImmutableList());
        Verify.verify((!internalNodes.isEmpty() ? 1 : 0) != 0, (String)"No active worker nodes available", (Object[])new Object[0]);
        return ((InternalNode)internalNodes.get(ThreadLocalRandom.current().nextInt(internalNodes.size()))).getHostAndPort();
    }

    private SpooledSegmentHandle handle(String identifier, HttpHeaders headers) {
        return this.spoolingManager.handle(Slices.wrappedBuffer((byte[])identifier.getBytes(StandardCharsets.UTF_8)), (Map)headers.getRequestHeaders());
    }
}

