/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.runners.fnexecution.artifact;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.FnService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.MessageOrBuilder;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.util.JsonFormat;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hasher;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFileSystemArtifactStagingService
extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase
implements FnService {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFileSystemArtifactStagingService.class);
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final Charset CHARSET = StandardCharsets.UTF_8;
    public static final String MANIFEST = "MANIFEST";
    public static final String ARTIFACTS = "artifacts";

    public StreamObserver<ArtifactApi.PutArtifactRequest> putArtifact(StreamObserver<ArtifactApi.PutArtifactResponse> responseObserver) {
        return new PutArtifactStreamObserver(responseObserver);
    }

    public void commitManifest(ArtifactApi.CommitManifestRequest request, StreamObserver<ArtifactApi.CommitManifestResponse> responseObserver) {
        try {
            StagingSessionToken stagingSessionToken = StagingSessionToken.decode(request.getStagingSessionToken());
            ResourceId manifestResourceId = this.getManifestFileResourceId(stagingSessionToken);
            ResourceId artifactDirResourceId = this.getArtifactDirResourceId(stagingSessionToken);
            ArtifactApi.ProxyManifest.Builder proxyManifestBuilder = ArtifactApi.ProxyManifest.newBuilder().setManifest(request.getManifest());
            for (ArtifactApi.ArtifactMetadata artifactMetadata : request.getManifest().getArtifactList()) {
                proxyManifestBuilder.addLocation(ArtifactApi.ProxyManifest.Location.newBuilder().setName(artifactMetadata.getName()).setUri(artifactDirResourceId.resolve(this.encodedFileName(artifactMetadata), (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString()).build());
            }
            try (WritableByteChannel manifestWritableByteChannel = FileSystems.create((ResourceId)manifestResourceId, (String)"text/plain");){
                manifestWritableByteChannel.write(CHARSET.encode(JsonFormat.printer().print((MessageOrBuilder)proxyManifestBuilder.build())));
            }
            responseObserver.onNext((Object)ArtifactApi.CommitManifestResponse.newBuilder().setRetrievalToken(manifestResourceId.toString()).build());
            responseObserver.onCompleted();
        }
        catch (Exception e) {
            LOG.error("Unable to commit manifest.", (Throwable)e);
            responseObserver.onError((Throwable)e);
        }
    }

    @Override
    public void close() throws Exception {
    }

    public static String generateStagingSessionToken(String sessionId, String basePath) {
        StagingSessionToken stagingSessionToken = new StagingSessionToken();
        stagingSessionToken.setSessionId(sessionId);
        stagingSessionToken.setBasePath(basePath);
        return stagingSessionToken.encode();
    }

    private String encodedFileName(ArtifactApi.ArtifactMetadata artifactMetadata) {
        return "artifact_" + Hashing.sha256().hashString((CharSequence)artifactMetadata.getName(), CHARSET).toString();
    }

    public void removeArtifacts(String stagingSessionToken) throws Exception {
        StagingSessionToken parsedToken = StagingSessionToken.decode(stagingSessionToken);
        ResourceId dir = this.getJobDirResourceId(parsedToken);
        ResourceId manifestResourceId = dir.resolve(MANIFEST, (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        LOG.debug("Removing dir {}", (Object)dir);
        ArtifactApi.ProxyManifest proxyManifest = BeamFileSystemArtifactRetrievalService.loadManifest(manifestResourceId);
        for (ArtifactApi.ProxyManifest.Location location : proxyManifest.getLocationList()) {
            String uri = location.getUri();
            LOG.debug("Removing artifact: {}", (Object)uri);
            FileSystems.delete(Collections.singletonList(FileSystems.matchNewResource((String)uri, (boolean)false)), (MoveOptions[])new MoveOptions[0]);
        }
        ResourceId artifactsResourceId = dir.resolve(ARTIFACTS, (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
        if (!proxyManifest.getLocationList().isEmpty()) {
            LOG.debug("Removing artifacts dir: {}", (Object)artifactsResourceId);
            FileSystems.delete(Collections.singletonList(artifactsResourceId), (MoveOptions[])new MoveOptions[0]);
        }
        LOG.debug("Removing manifest: {}", (Object)manifestResourceId);
        FileSystems.delete(Collections.singletonList(manifestResourceId), (MoveOptions[])new MoveOptions[0]);
        LOG.debug("Removing empty dir: {}", (Object)dir);
        FileSystems.delete(Collections.singletonList(dir), (MoveOptions[])new MoveOptions[0]);
        LOG.info("Removed dir {}", (Object)dir);
    }

    private ResourceId getJobDirResourceId(StagingSessionToken stagingSessionToken) {
        ResourceId baseResourceId = FileSystems.matchNewResource((String)stagingSessionToken.getBasePath(), (boolean)true);
        return baseResourceId.resolve(stagingSessionToken.getSessionId(), (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
    }

    private ResourceId getManifestFileResourceId(StagingSessionToken stagingSessionToken) {
        return this.getJobDirResourceId(stagingSessionToken).resolve(MANIFEST, (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
    }

    private ResourceId getArtifactDirResourceId(StagingSessionToken stagingSessionToken) {
        return this.getJobDirResourceId(stagingSessionToken).resolve(ARTIFACTS, (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
    }

    private static class StagingSessionToken
    implements Serializable {
        private String sessionId;
        private String basePath;

        private StagingSessionToken() {
        }

        public String getSessionId() {
            return this.sessionId;
        }

        private void setSessionId(String sessionId) {
            this.sessionId = sessionId;
        }

        public String getBasePath() {
            return this.basePath;
        }

        private void setBasePath(String basePath) {
            this.basePath = basePath;
        }

        public String encode() {
            try {
                return MAPPER.writeValueAsString((Object)this);
            }
            catch (JsonProcessingException e) {
                String message = String.format("Error %s occurred while serializing %s", e.getMessage(), this);
                throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));
            }
        }

        public static StagingSessionToken decode(String stagingSessionToken) throws Exception {
            try {
                return (StagingSessionToken)MAPPER.readValue(stagingSessionToken, StagingSessionToken.class);
            }
            catch (JsonProcessingException e) {
                String message = String.format("Unable to deserialize staging token %s. Expected format: %s. Error: %s", stagingSessionToken, "{\"sessionId\": \"sessionId\", \"basePath\": \"basePath\"}", e.getMessage());
                throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));
            }
        }

        public String toString() {
            return "StagingSessionToken{sessionId='" + this.sessionId + "', basePath='" + this.basePath + "'}";
        }
    }

    private class PutArtifactStreamObserver
    implements StreamObserver<ArtifactApi.PutArtifactRequest> {
        private final StreamObserver<ArtifactApi.PutArtifactResponse> outboundObserver;
        private ArtifactApi.PutArtifactMetadata metadata;
        private ResourceId artifactId;
        private WritableByteChannel artifactWritableByteChannel;
        private Hasher hasher;

        PutArtifactStreamObserver(StreamObserver<ArtifactApi.PutArtifactResponse> outboundObserver) {
            this.outboundObserver = outboundObserver;
        }

        public void onNext(ArtifactApi.PutArtifactRequest putArtifactRequest) {
            if (this.metadata == null) {
                Preconditions.checkNotNull((Object)putArtifactRequest);
                Preconditions.checkNotNull((Object)putArtifactRequest.getMetadata());
                this.metadata = putArtifactRequest.getMetadata();
                LOG.debug("stored metadata: {}", (Object)this.metadata);
                try {
                    ResourceId artifactsDirId = BeamFileSystemArtifactStagingService.this.getArtifactDirResourceId(StagingSessionToken.decode(putArtifactRequest.getMetadata().getStagingSessionToken()));
                    this.artifactId = artifactsDirId.resolve(BeamFileSystemArtifactStagingService.this.encodedFileName(this.metadata.getMetadata()), (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
                    LOG.debug("Going to stage artifact {} to {}.", (Object)this.metadata.getMetadata().getName(), (Object)this.artifactId);
                    this.artifactWritableByteChannel = FileSystems.create((ResourceId)this.artifactId, (String)"application/octet-stream");
                    this.hasher = Hashing.sha256().newHasher();
                }
                catch (Exception e) {
                    String message = String.format("Failed to begin staging artifact %s", this.metadata.getMetadata().getName());
                    LOG.error(message, (Throwable)e);
                    this.outboundObserver.onError((Throwable)new StatusRuntimeException(Status.DATA_LOSS.withDescription(message).withCause((Throwable)e)));
                }
            } else {
                try {
                    ByteString data = putArtifactRequest.getData().getData();
                    this.artifactWritableByteChannel.write(data.asReadOnlyByteBuffer());
                    this.hasher.putBytes(data.toByteArray());
                }
                catch (IOException e) {
                    String message = String.format("Failed to write chunk of artifact %s to %s", this.metadata.getMetadata().getName(), this.artifactId);
                    LOG.error(message, (Throwable)e);
                    this.outboundObserver.onError((Throwable)new StatusRuntimeException(Status.DATA_LOSS.withDescription(message).withCause((Throwable)e)));
                }
            }
        }

        public void onError(Throwable throwable) {
            LOG.error("Staging artifact failed for " + this.artifactId, throwable);
            try {
                if (this.artifactWritableByteChannel != null) {
                    this.artifactWritableByteChannel.close();
                }
                if (this.artifactId != null) {
                    FileSystems.delete(Collections.singletonList(this.artifactId), (MoveOptions[])new MoveOptions[]{MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES});
                }
            }
            catch (IOException e) {
                this.outboundObserver.onError((Throwable)new StatusRuntimeException(Status.DATA_LOSS.withDescription(String.format("Failed to clean up artifact file %s", this.artifactId))));
                return;
            }
            this.outboundObserver.onError((Throwable)new StatusRuntimeException(Status.DATA_LOSS.withDescription(String.format("Failed to stage artifact %s", this.artifactId)).withCause(throwable)));
        }

        public void onCompleted() {
            String actualSha256;
            String expectedSha256;
            LOG.debug("Staging artifact completed for " + this.artifactId);
            if (this.artifactWritableByteChannel != null) {
                try {
                    this.artifactWritableByteChannel.close();
                }
                catch (IOException e) {
                    this.onError(e);
                    return;
                }
            }
            if ((expectedSha256 = this.metadata.getMetadata().getSha256()) != null && !expectedSha256.isEmpty() && !(actualSha256 = this.hasher.hash().toString()).equals(expectedSha256)) {
                this.outboundObserver.onError((Throwable)new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(String.format("Artifact %s is corrupt: expected sah256 %s, but has sha256 %s", this.metadata.getMetadata().getName(), expectedSha256, actualSha256))));
                return;
            }
            this.outboundObserver.onNext((Object)ArtifactApi.PutArtifactResponse.newBuilder().build());
            this.outboundObserver.onCompleted();
        }
    }
}

