/*
 * Decompiled with CFR 0.152.
 */
package com.azure.storage.file.datalake;

import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.ResponseBase;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.Context;
import com.azure.core.util.DateTimeRfc1123;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.options.BlobDownloadToFileOptions;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.azure.storage.common.ParallelTransferOptions;
import com.azure.storage.common.ProgressReceiver;
import com.azure.storage.common.ProgressReporter;
import com.azure.storage.common.Utility;
import com.azure.storage.common.implementation.BufferAggregator;
import com.azure.storage.common.implementation.BufferStagingArea;
import com.azure.storage.common.implementation.StorageImplUtils;
import com.azure.storage.common.implementation.UploadUtils;
import com.azure.storage.file.datalake.DataLakePathAsyncClient;
import com.azure.storage.file.datalake.DataLakePathClientBuilder;
import com.azure.storage.file.datalake.DataLakeServiceVersion;
import com.azure.storage.file.datalake.Transforms;
import com.azure.storage.file.datalake.implementation.models.LeaseAccessConditions;
import com.azure.storage.file.datalake.implementation.models.ModifiedAccessConditions;
import com.azure.storage.file.datalake.implementation.models.PathExpiryOptions;
import com.azure.storage.file.datalake.implementation.models.PathResourceType;
import com.azure.storage.file.datalake.implementation.models.PathsFlushDataHeaders;
import com.azure.storage.file.datalake.implementation.util.DataLakeImplUtils;
import com.azure.storage.file.datalake.implementation.util.ModelHelper;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DownloadRetryOptions;
import com.azure.storage.file.datalake.models.FileExpirationOffset;
import com.azure.storage.file.datalake.models.FileQueryAsyncResponse;
import com.azure.storage.file.datalake.models.FileRange;
import com.azure.storage.file.datalake.models.FileReadAsyncResponse;
import com.azure.storage.file.datalake.models.PathHttpHeaders;
import com.azure.storage.file.datalake.models.PathInfo;
import com.azure.storage.file.datalake.models.PathProperties;
import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
import com.azure.storage.file.datalake.options.FileQueryOptions;
import com.azure.storage.file.datalake.options.FileScheduleDeletionOptions;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

@ServiceClient(builder=DataLakePathClientBuilder.class, isAsync=true)
public class DataLakeFileAsyncClient
extends DataLakePathAsyncClient {
    // 0xFA000000 == 4,194,304,000 == 4000 * 1024 * 1024 bytes (4000 MiB).
    // NOTE(review): by its name this is the largest payload allowed in one append call — confirm against the service limits.
    static final long MAX_APPEND_FILE_BYTES = 0xFA000000L;
    // Logger used by this class when converting or reporting exceptions.
    private static final ClientLogger LOGGER = new ClientLogger(DataLakeFileAsyncClient.class);

    /**
     * Package-private constructor. Forwards every argument to the {@link DataLakePathAsyncClient}
     * constructor and fixes the resource type to {@link PathResourceType#FILE}.
     *
     * @param pipeline the HTTP pipeline handed to the parent client
     * @param url the URL handed to the parent client
     * @param serviceVersion the service version handed to the parent client
     * @param accountName the storage account name
     * @param fileSystemName the file system name
     * @param fileName the file name (passed as the path name of the parent client)
     * @param blockBlobAsyncClient the block blob client handed to the parent client
     * @param sasToken the SAS credential handed to the parent client
     */
    DataLakeFileAsyncClient(
        HttpPipeline pipeline,
        String url,
        DataLakeServiceVersion serviceVersion,
        String accountName,
        String fileSystemName,
        String fileName,
        BlockBlobAsyncClient blockBlobAsyncClient,
        AzureSasCredential sasToken) {
        super(
            pipeline,
            url,
            serviceVersion,
            accountName,
            fileSystemName,
            fileName,
            PathResourceType.FILE,
            blockBlobAsyncClient,
            sasToken);
    }

    /**
     * Builds a file client from an existing path client. The pipeline, account URL, service version,
     * account name, file system name, block blob client and SAS credential are taken from
     * {@code pathAsyncClient}; its path name is passed through {@code Utility.urlEncode} and the
     * resource type is fixed to {@link PathResourceType#FILE}.
     *
     * @param pathAsyncClient the path client whose settings are reused
     */
    DataLakeFileAsyncClient(DataLakePathAsyncClient pathAsyncClient) {
        super(
            pathAsyncClient.getHttpPipeline(),
            pathAsyncClient.getAccountUrl(),
            pathAsyncClient.getServiceVersion(),
            pathAsyncClient.getAccountName(),
            pathAsyncClient.getFileSystemName(),
            Utility.urlEncode(pathAsyncClient.pathName),
            PathResourceType.FILE,
            pathAsyncClient.getBlockBlobAsyncClient(),
            pathAsyncClient.getSasToken());
    }

    /**
     * Returns the URL of this file; this is the value of the inherited {@code getPathUrl()}.
     *
     * @return the file URL
     */
    public String getFileUrl() {
        final String pathUrl = getPathUrl();
        return pathUrl;
    }

    /**
     * Returns the path of this file; this is the value of the inherited {@code getObjectPath()}.
     *
     * @return the file path
     */
    public String getFilePath() {
        final String objectPath = getObjectPath();
        return objectPath;
    }

    /**
     * Returns the name of this file; this is the value of the inherited {@code getObjectName()}.
     *
     * @return the file name
     */
    public String getFileName() {
        final String objectName = getObjectName();
        return objectName;
    }

    /**
     * Deletes this file.
     *
     * <p>Calls {@link #deleteWithResponse(DataLakeRequestConditions)} without request conditions and
     * unwraps the response with {@code FluxUtil.toMono}.
     *
     * @return a {@link Mono} representing the delete operation
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> delete() {
        Mono<Response<Void>> deleteCall = this.deleteWithResponse(null);
        return deleteCall.flatMap(FluxUtil::toMono);
    }

    /**
     * Deletes this file, optionally guarded by request conditions.
     *
     * <p>The call is forwarded to the inherited three-argument {@code deleteWithResponse} with a
     * {@code null} first argument and the subscriber's {@link Context}.
     *
     * @param requestConditions conditions forwarded to the delete call; may be {@code null}
     * @return a {@link Mono} emitting the service response; a {@link RuntimeException} thrown while
     * assembling the call is logged and returned as an error signal
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> deleteWithResponse(DataLakeRequestConditions requestConditions) {
        Mono<Response<Void>> result;
        try {
            result = FluxUtil.withContext(ctx -> this.deleteWithResponse(null, requestConditions, ctx));
        } catch (RuntimeException ex) {
            result = FluxUtil.monoError(LOGGER, ex);
        }
        return result;
    }

    /**
     * Uploads {@code data} to this file without overwriting an existing one.
     *
     * <p>Equivalent to {@link #upload(Flux, ParallelTransferOptions, boolean)} with
     * {@code overwrite} set to {@code false}.
     *
     * @param data the bytes to upload
     * @param parallelTransferOptions transfer tuning options; forwarded unchanged
     * @return a {@link Mono} emitting the information of the uploaded file
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<PathInfo> upload(Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions) {
        final boolean overwrite = false;
        return this.upload(data, parallelTransferOptions, overwrite);
    }

    /**
     * Uploads {@code data} to this file, optionally replacing an existing file.
     *
     * <p>When {@code overwrite} is {@code false} an existence check runs first and fails with an
     * {@link IllegalArgumentException} if the file is already there; the upload itself is
     * additionally sent with an {@code If-None-Match: *} condition. When {@code overwrite} is
     * {@code true} no check and no request conditions are used.
     *
     * @param data the bytes to upload
     * @param parallelTransferOptions transfer tuning options; forwarded unchanged
     * @param overwrite whether an existing file may be replaced
     * @return a {@link Mono} emitting the information of the uploaded file
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<PathInfo> upload(Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions,
        boolean overwrite) {
        final Mono<Void> overwriteCheck;
        final DataLakeRequestConditions requestConditions;
        if (overwrite) {
            overwriteCheck = Mono.empty();
            requestConditions = null;
        } else {
            // This is a file client: report a "File" conflict, matching uploadFromFile's message.
            overwriteCheck = this.exists().flatMap(exists -> exists
                ? FluxUtil.monoError(LOGGER, new IllegalArgumentException(
                    "File already exists. Specify overwrite to true to force update the file."))
                : Mono.empty());
            requestConditions = new DataLakeRequestConditions().setIfNoneMatch("*");
        }
        return overwriteCheck
            .then(this.uploadWithResponse(data, parallelTransferOptions, null, null, requestConditions))
            .flatMap(FluxUtil::toMono);
    }

    /**
     * Uploads {@code data} to this file with explicit headers, metadata and request conditions.
     *
     * <p>The arguments are packed into a {@link FileParallelUploadOptions} and handed to
     * {@link #uploadWithResponse(FileParallelUploadOptions)}.
     *
     * @param data the bytes to upload
     * @param parallelTransferOptions transfer tuning options
     * @param headers HTTP headers to set on the file
     * @param metadata metadata to associate with the file
     * @param requestConditions request conditions for the operation
     * @return a {@link Mono} emitting the response with the uploaded file's information; a
     * {@link RuntimeException} thrown while assembling the call is logged and returned as an error signal
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<PathInfo>> uploadWithResponse(Flux<ByteBuffer> data,
        ParallelTransferOptions parallelTransferOptions, PathHttpHeaders headers, Map<String, String> metadata,
        DataLakeRequestConditions requestConditions) {
        try {
            FileParallelUploadOptions uploadOptions = new FileParallelUploadOptions(data)
                .setParallelTransferOptions(parallelTransferOptions)
                .setHeaders(headers)
                .setMetadata(metadata)
                .setRequestConditions(requestConditions);
            return this.uploadWithResponse(uploadOptions);
        } catch (RuntimeException ex) {
            return FluxUtil.monoError(LOGGER, ex);
        }
    }

    /**
     * Creates this file and uploads the data described by {@code options}.
     *
     * <p>The file is first created with the caller's permissions, umask, headers, metadata and
     * request conditions. The data is then uploaded either in one append+flush or in chunks, as
     * decided by {@code UploadUtils.uploadFullOrChunked}; those data calls carry only the lease id
     * from the caller's request conditions.
     *
     * <p>If the options carry no {@code Flux}, the input stream is converted to one: with
     * {@code FluxUtil.toFluxByteBuffer} when no length was supplied, otherwise with
     * {@code Utility.convertStreamToByteBuffer}.
     *
     * @param options the upload options; must not be {@code null}
     * @return a {@link Mono} emitting the response with the uploaded file's information; a
     * {@link RuntimeException} thrown while assembling the call (including a {@code null}
     * {@code options}) is logged and returned as an error signal
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<PathInfo>> uploadWithResponse(FileParallelUploadOptions options) {
        try {
            StorageImplUtils.assertNotNull("options", options);
            DataLakeRequestConditions validatedRequestConditions = options.getRequestConditions() == null
                ? new DataLakeRequestConditions()
                : options.getRequestConditions();
            // Append/flush calls only need the lease; the other conditions apply to the create call.
            DataLakeRequestConditions validatedUploadRequestConditions = new DataLakeRequestConditions()
                .setLeaseId(validatedRequestConditions.getLeaseId());
            ParallelTransferOptions validatedParallelTransferOptions =
                ModelHelper.populateAndApplyDefaults(options.getParallelTransferOptions());
            long fileOffset = 0L;

            Function<Flux<ByteBuffer>, Mono<Response<PathInfo>>> uploadInChunksFunction = stream ->
                this.uploadInChunks(stream, fileOffset, validatedParallelTransferOptions, options.getHeaders(),
                    validatedUploadRequestConditions);
            BiFunction<Flux<ByteBuffer>, Long, Mono<Response<PathInfo>>> uploadFullMethod = (stream, length) ->
                this.uploadWithResponse(
                    ProgressReporter.addProgressReporting(stream,
                        validatedParallelTransferOptions.getProgressReceiver()),
                    fileOffset, length, options.getHeaders(), validatedUploadRequestConditions);

            Flux<ByteBuffer> data = options.getDataFlux();
            if (data == null) {
                // Stream input: cap the conversion buffer at 0x4000000 bytes (64 MiB) or the block size.
                int chunkSize = (int) Math.min(0x4000000L, validatedParallelTransferOptions.getBlockSizeLong());
                Long optionalLength = options.getOptionalLength();
                data = optionalLength == null
                    ? FluxUtil.toFluxByteBuffer(options.getDataStream(), chunkSize)
                    : Utility.convertStreamToByteBuffer(options.getDataStream(), optionalLength, chunkSize, false);
            }

            return this.createWithResponse(options.getPermissions(), options.getUmask(), options.getHeaders(),
                    options.getMetadata(), validatedRequestConditions)
                .then(UploadUtils.uploadFullOrChunked(data, validatedParallelTransferOptions,
                    uploadInChunksFunction, uploadFullMethod));
        } catch (RuntimeException ex) {
            return FluxUtil.monoError(LOGGER, ex);
        }
    }

    /**
     * Uploads {@code data} as a sequence of append calls followed by a single flush.
     *
     * <p>Each buffer aggregate produced by the staging area is appended at a running offset
     * (starting at {@code fileOffset}); up to {@code parallelTransferOptions.getMaxConcurrency()}
     * appends are in flight at once. After the last append, the file is flushed at the end position
     * of the last aggregate.
     *
     * @param data the bytes to upload
     * @param fileOffset position in the file at which the first aggregate is appended
     * @param parallelTransferOptions supplies block size, max concurrency and the progress receiver
     * @param httpHeaders headers passed to the final flush
     * @param requestConditions its lease id is used for appends; the whole object is passed to the flush
     * @return a {@link Mono} emitting the response of the final flush
     */
    private Mono<Response<PathInfo>> uploadInChunks(Flux<ByteBuffer> data, long fileOffset, ParallelTransferOptions parallelTransferOptions, PathHttpHeaders httpHeaders, DataLakeRequestConditions requestConditions) {
        // Running byte count and lock shared by the per-chunk progress reporters created below.
        AtomicLong totalProgress = new AtomicLong();
        ReentrantLock progressLock = new ReentrantLock();
        // Second argument 0xFA000000 equals 4000 MiB (same value as MAX_APPEND_FILE_BYTES).
        // NOTE(review): presumably the staging area regroups incoming buffers into block-sized aggregates — confirm against BufferStagingArea.
        BufferStagingArea stagingArea = new BufferStagingArea(parallelTransferOptions.getBlockSizeLong().longValue(), 0xFA000000L);
        Flux chunkedSource = UploadUtils.chunkSource(data, (ParallelTransferOptions)parallelTransferOptions);
        // Pipeline: write each chunk into the staging area one at a time (concurrency 1, prefetch 1), then
        // append whatever flush() emits. Every aggregate becomes a tuple (aggregate, length, 0L); scan() then
        // rewrites the third slot to previous length + previous offset, i.e. the aggregate's start offset
        // relative to the beginning of this upload.
        return chunkedSource.flatMapSequential(arg_0 -> ((BufferStagingArea)stagingArea).write(arg_0), 1, 1).concatWith((Publisher)Flux.defer(() -> ((BufferStagingArea)stagingArea).flush())).map(bufferAggregator -> Tuples.of((Object)bufferAggregator, (Object)bufferAggregator.length(), (Object)0L)).scan((result, source) -> {
            BufferAggregator bufferAggregator = (BufferAggregator)source.getT1();
            long currentBufferLength = bufferAggregator.length();
            // "result" is the previously emitted tuple: T2 = its length, T3 = its start offset.
            long lastBytesWritten = (Long)result.getT2();
            long lastOffset = (Long)result.getT3();
            return Tuples.of((Object)bufferAggregator, (Object)currentBufferLength, (Object)(lastBytesWritten + lastOffset));
        }).flatMapSequential(tuple3 -> {
            BufferAggregator bufferAggregator = (BufferAggregator)tuple3.getT1();
            long currentBufferLength = bufferAggregator.length();
            // Shift the relative start offset by the caller-supplied base offset.
            long currentOffset = (Long)tuple3.getT3() + fileOffset;
            Flux progressData = ProgressReporter.addParallelProgressReporting((Flux)bufferAggregator.asFlux(), (ProgressReceiver)parallelTransferOptions.getProgressReceiver(), (Lock)progressLock, (AtomicLong)totalProgress);
            // End position of this aggregate; the value from the last aggregate is used as the flush position.
            long offset = currentBufferLength + currentOffset;
            return this.appendWithResponse((Flux<ByteBuffer>)progressData, currentOffset, currentBufferLength, null, requestConditions.getLeaseId()).map(resp -> offset).flux();
        }, parallelTransferOptions.getMaxConcurrency().intValue(), 1).last().flatMap(length -> this.flushWithResponse((long)length, false, false, httpHeaders, requestConditions));
    }

    /**
     * Uploads {@code data} with a single append followed by a flush.
     *
     * @param data the bytes to append
     * @param fileOffset position at which the data is appended
     * @param length number of bytes in {@code data}
     * @param httpHeaders headers passed to the flush
     * @param requestConditions its lease id is used for the append; the whole object is passed to the flush
     * @return a {@link Mono} emitting the flush response; the flush position is {@code fileOffset + length}
     */
    private Mono<Response<PathInfo>> uploadWithResponse(Flux<ByteBuffer> data, long fileOffset, long length,
        PathHttpHeaders httpHeaders, DataLakeRequestConditions requestConditions) {
        final String leaseId = requestConditions.getLeaseId();
        final Mono<Response<Void>> appendStep = this.appendWithResponse(data, fileOffset, length, null, leaseId);
        return appendStep.flatMap(appended ->
            this.flushWithResponse(fileOffset + length, false, false, httpHeaders, requestConditions));
    }

    /**
     * Uploads the local file at {@code filePath} without overwriting an existing remote file.
     *
     * <p>Equivalent to {@link #uploadFromFile(String, boolean)} with {@code overwrite} set to
     * {@code false}.
     *
     * @param filePath path of the local file to upload
     * @return a {@link Mono} that completes when the upload has finished
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> uploadFromFile(String filePath) {
        final boolean overwrite = false;
        return this.uploadFromFile(filePath, overwrite);
    }

    /**
     * Uploads the local file at {@code filePath}, optionally replacing an existing remote file.
     *
     * <p>When {@code overwrite} is {@code false} the upload is sent with an {@code If-None-Match: *}
     * condition. An explicit existence check is added only when
     * {@code UploadUtils.shouldUploadInChunks} reports that the file exceeds 0x6400000 bytes
     * (100 MiB); that check fails with an {@link IllegalArgumentException} if the file exists.
     *
     * @param filePath path of the local file to upload
     * @param overwrite whether an existing remote file may be replaced
     * @return a {@link Mono} that completes when the upload has finished
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> uploadFromFile(String filePath, boolean overwrite) {
        Mono<Void> overwriteCheck = Mono.empty();
        DataLakeRequestConditions requestConditions = null;
        if (!overwrite) {
            if (UploadUtils.shouldUploadInChunks(filePath, 0x6400000L, LOGGER)) {
                overwriteCheck = this.exists().flatMap(exists -> exists
                    ? FluxUtil.monoError(LOGGER, new IllegalArgumentException(
                        "File already exists. Specify overwrite to true to force update the file."))
                    : Mono.empty());
            }
            requestConditions = new DataLakeRequestConditions().setIfNoneMatch("*");
        }
        return overwriteCheck.then(this.uploadFromFile(filePath, null, null, null, requestConditions));
    }

    /**
     * Uploads the local file at {@code filePath} with explicit transfer options, headers, metadata
     * and request conditions.
     *
     * <p>The file channel is opened and closed through {@code Mono.using}. An empty file is rejected
     * with an {@link IllegalArgumentException}. The remote file is created first; the content is then
     * uploaded in chunks when {@code UploadUtils.shouldUploadInChunks} says so for the configured
     * max single upload size, otherwise with one append+flush. Data calls carry only the lease id
     * from {@code requestConditions}.
     *
     * @param filePath path of the local file to upload
     * @param parallelTransferOptions transfer tuning options; may be {@code null}
     * @param headers HTTP headers to set on the file
     * @param metadata metadata to associate with the file
     * @param requestConditions request conditions for the create call; may be {@code null}
     * @return a {@link Mono} that completes when the upload has finished
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parallelTransferOptions,
        PathHttpHeaders headers, Map<String, String> metadata, DataLakeRequestConditions requestConditions) {
        // Remember whether the caller chose a block size before defaults are applied.
        final Long originalBlockSize = parallelTransferOptions == null
            ? null
            : parallelTransferOptions.getBlockSizeLong();
        final DataLakeRequestConditions createConditions = requestConditions == null
            ? new DataLakeRequestConditions()
            : requestConditions;
        final DataLakeRequestConditions dataConditions = new DataLakeRequestConditions()
            .setLeaseId(createConditions.getLeaseId());
        final ParallelTransferOptions transferOptions = ModelHelper.populateAndApplyDefaults(parallelTransferOptions);
        final long fileOffset = 0L;
        try {
            return Mono.using(
                () -> UploadUtils.uploadFileResourceSupplier(filePath, LOGGER),
                channel -> {
                    try {
                        final long fileSize = channel.size();
                        if (fileSize == 0L) {
                            throw LOGGER.logExceptionAsError(
                                new IllegalArgumentException("Size of the file must be greater than 0."));
                        }
                        final boolean chunked = UploadUtils.shouldUploadInChunks(filePath,
                            transferOptions.getMaxSingleUploadSizeLong(), LOGGER);
                        final Mono<Response<PathInfo>> createStep =
                            this.createWithResponse(null, null, headers, metadata, createConditions);
                        if (chunked) {
                            return createStep.then(this.uploadFileChunks(fileOffset, fileSize, transferOptions,
                                originalBlockSize, headers, dataConditions, channel));
                        }
                        return createStep
                            .then(this.uploadWithResponse(FluxUtil.readFile(channel), fileOffset, fileSize, headers,
                                dataConditions))
                            .then();
                    } catch (IOException ex) {
                        return Mono.error(ex);
                    }
                },
                channel -> UploadUtils.uploadFileCleanup(channel, LOGGER));
        } catch (RuntimeException ex) {
            return FluxUtil.monoError(LOGGER, ex);
        }
    }

    /**
     * Uploads a local file as several appends followed by one flush.
     *
     * <p>The file is cut into ranges by {@code sliceFile}; each range is read from {@code channel}
     * and appended at {@code fileOffset + range offset}, with at most
     * {@code parallelTransferOptions.getMaxConcurrency()} appends in flight. Once all appends have
     * completed the file is flushed at position {@code fileSize}.
     *
     * @param fileOffset base position in the remote file
     * @param fileSize size of the local file in bytes
     * @param parallelTransferOptions supplies block size, max concurrency and the progress receiver
     * @param originalBlockSize block size requested by the caller, or {@code null} if none was given
     * @param headers headers passed to the flush
     * @param requestConditions its lease id is used for appends; the whole object is passed to the flush
     * @param channel the open channel of the local file
     * @return a {@link Mono} that completes after the flush
     */
    private Mono<Void> uploadFileChunks(long fileOffset, long fileSize,
        ParallelTransferOptions parallelTransferOptions, Long originalBlockSize, PathHttpHeaders headers,
        DataLakeRequestConditions requestConditions, AsynchronousFileChannel channel) {
        // Shared by all chunk reporters so progress is reported as one running total.
        AtomicLong totalProgress = new AtomicLong();
        Lock progressLock = new ReentrantLock();
        List<FileRange> ranges = DataLakeFileAsyncClient.sliceFile(fileSize, originalBlockSize,
            parallelTransferOptions.getBlockSizeLong());
        return Flux.fromIterable(ranges)
            .flatMap(chunk -> {
                Flux<ByteBuffer> progressData = ProgressReporter.addParallelProgressReporting(
                    FluxUtil.readFile(channel, chunk.getOffset(), chunk.getCount()),
                    parallelTransferOptions.getProgressReceiver(), progressLock, totalProgress);
                return this.appendWithResponse(progressData, fileOffset + chunk.getOffset(), chunk.getCount(),
                    null, requestConditions.getLeaseId());
            }, parallelTransferOptions.getMaxConcurrency().intValue())
            // Deferred so the flush call is only built after every append has completed.
            .then(Mono.defer(() -> this.flushWithResponse(fileSize, false, false, headers, requestConditions)))
            .then();
    }

    /**
     * Splits a file of {@code fileSize} bytes into consecutive ranges.
     *
     * <p>Ranges are {@code blockSize} bytes long, except the last one which covers the remainder.
     * If the file is larger than 0x6400000 bytes (100 MiB) and the caller did not request a block
     * size ({@code originalBlockSize == null}), a block size of 0x800000 bytes (8 MiB) is used
     * instead of {@code blockSize}.
     *
     * @param fileSize total size of the file in bytes
     * @param originalBlockSize block size requested by the caller, or {@code null}
     * @param blockSize block size after defaults were applied
     * @return the ranges in ascending offset order; empty when {@code fileSize} is not positive
     */
    private static List<FileRange> sliceFile(long fileSize, Long originalBlockSize, long blockSize) {
        final boolean useLargeFileDefault = fileSize > 0x6400000L && originalBlockSize == null;
        final long step = useLargeFileDefault ? 0x800000L : blockSize;

        List<FileRange> slices = new ArrayList<FileRange>();
        long start = 0L;
        while (start < fileSize) {
            long end = start + step;
            // Trim the final slice so it stops exactly at the end of the file.
            long count = end > fileSize ? fileSize - start : step;
            slices.add(new FileRange(start, count));
            start = end;
        }
        return slices;
    }

    /**
     * Appends {@code data} to this file at {@code fileOffset}.
     *
     * <p>Calls {@link #appendWithResponse(Flux, long, long, byte[], String)} without a content MD5
     * or lease id and unwraps the response with {@code FluxUtil.toMono}.
     *
     * @param data the bytes to append
     * @param fileOffset position at which the data is appended
     * @param length number of bytes in {@code data}
     * @return a {@link Mono} representing the append operation
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> append(Flux<ByteBuffer> data, long fileOffset, long length) {
        Mono<Response<Void>> appendCall = this.appendWithResponse(data, fileOffset, length, null, null);
        return appendCall.flatMap(FluxUtil::toMono);
    }

    /**
     * Appends {@code data} to this file at {@code fileOffset}.
     *
     * <p>Forwards to the package-private overload with the subscriber's {@link Context}.
     *
     * @param data the bytes to append
     * @param fileOffset position at which the data is appended
     * @param length number of bytes in {@code data}
     * @param contentMd5 transactional content hash of the data; may be {@code null}
     * @param leaseId lease id to send with the request; may be {@code null}
     * @return a {@link Mono} emitting the service response; a {@link RuntimeException} thrown while
     * assembling the call is logged and returned as an error signal
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> appendWithResponse(Flux<ByteBuffer> data, long fileOffset, long length,
        byte[] contentMd5, String leaseId) {
        Mono<Response<Void>> result;
        try {
            result = FluxUtil.withContext(ctx ->
                this.appendWithResponse(data, fileOffset, length, contentMd5, leaseId, ctx));
        } catch (RuntimeException ex) {
            result = FluxUtil.monoError(LOGGER, ex);
        }
        return result;
    }

    /**
     * Sends the append request.
     *
     * <p>A {@code null} {@code context} is replaced with {@link Context#NONE} and the
     * {@code az.namespace} entry is added before the call, the same way
     * {@code flushWithResponse} and {@code scheduleDeletionWithResponse} prepare their context.
     *
     * @param data the bytes to append
     * @param fileOffset position at which the data is appended
     * @param length number of bytes in {@code data}
     * @param contentMd5 transactional content hash of the data; may be {@code null}
     * @param leaseId lease id to send with the request; may be {@code null}
     * @param context context passed to the service call; may be {@code null}
     * @return a {@link Mono} emitting the service response with a {@code null} value
     */
    Mono<Response<Void>> appendWithResponse(Flux<ByteBuffer> data, long fileOffset, long length,
        byte[] contentMd5, String leaseId, Context context) {
        LeaseAccessConditions leaseAccessConditions = new LeaseAccessConditions().setLeaseId(leaseId);
        PathHttpHeaders headers = new PathHttpHeaders().setTransactionalContentHash(contentMd5);
        Context callContext = (context == null ? Context.NONE : context)
            .addData("az.namespace", "Microsoft.Storage");
        return this.dataLakeStorage.getPaths()
            .appendDataWithResponseAsync(data, fileOffset, null, length, null, null, headers,
                leaseAccessConditions, callContext)
            .map(response -> new SimpleResponse<>(response, null));
    }

    /**
     * Flushes previously appended data up to {@code position}.
     *
     * <p>Equivalent to {@link #flush(long, boolean)} with {@code overwrite} set to {@code false}.
     *
     * @param position the flush position
     * @return a {@link Mono} emitting the information of the flushed file
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<PathInfo> flush(long position) {
        final boolean overwrite = false;
        return this.flush(position, overwrite);
    }

    /**
     * Flushes previously appended data up to {@code position}.
     *
     * <p>When {@code overwrite} is {@code false} the request is sent with an
     * {@code If-None-Match: *} condition; otherwise no request conditions are used. The flush is
     * issued with {@code retainUncommittedData} and {@code close} both {@code false} and no headers.
     *
     * @param position the flush position
     * @param overwrite whether to omit the {@code If-None-Match: *} condition
     * @return a {@link Mono} emitting the information of the flushed file
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<PathInfo> flush(long position, boolean overwrite) {
        final DataLakeRequestConditions requestConditions = overwrite
            ? null
            : new DataLakeRequestConditions().setIfNoneMatch("*");
        Mono<Response<PathInfo>> flushCall = this.flushWithResponse(position, false, false, null, requestConditions);
        return flushCall.flatMap(FluxUtil::toMono);
    }

    /**
     * Flushes previously appended data up to {@code position}.
     *
     * <p>Forwards to the package-private overload with the subscriber's {@link Context}.
     *
     * @param position the flush position
     * @param retainUncommittedData forwarded to the service call
     * @param close forwarded to the service call
     * @param httpHeaders headers to set on the file; may be {@code null}
     * @param requestConditions request conditions; may be {@code null}
     * @return a {@link Mono} emitting the response with the flushed file's information; a
     * {@link RuntimeException} thrown while assembling the call is logged and returned as an error signal
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<PathInfo>> flushWithResponse(long position, boolean retainUncommittedData, boolean close,
        PathHttpHeaders httpHeaders, DataLakeRequestConditions requestConditions) {
        Mono<Response<PathInfo>> result;
        try {
            result = FluxUtil.withContext(ctx ->
                this.flushWithResponse(position, retainUncommittedData, close, httpHeaders, requestConditions, ctx));
        } catch (RuntimeException ex) {
            result = FluxUtil.monoError(LOGGER, ex);
        }
        return result;
    }

    /**
     * Sends the flush request.
     *
     * <p>{@code null} headers, request conditions and context are replaced with empty defaults
     * ({@code new PathHttpHeaders()}, {@code new DataLakeRequestConditions()}, {@link Context#NONE}).
     * The lease id and the match/modified conditions are copied into the access-condition objects
     * the service call expects, and the {@code az.namespace} entry is added to the context.
     *
     * @param position the flush position
     * @param retainUncommittedData forwarded to the service call
     * @param close forwarded to the service call
     * @param httpHeaders headers to set on the file; may be {@code null}
     * @param requestConditions request conditions; may be {@code null}
     * @param context context passed to the service call; may be {@code null}
     * @return a {@link Mono} emitting the response whose value is a {@link PathInfo} built from the
     * ETag and last-modified time of the flush response headers
     */
    Mono<Response<PathInfo>> flushWithResponse(long position, boolean retainUncommittedData, boolean close,
        PathHttpHeaders httpHeaders, DataLakeRequestConditions requestConditions, Context context) {
        PathHttpHeaders effectiveHeaders = httpHeaders == null ? new PathHttpHeaders() : httpHeaders;
        DataLakeRequestConditions effectiveConditions = requestConditions == null
            ? new DataLakeRequestConditions()
            : requestConditions;
        LeaseAccessConditions lac = new LeaseAccessConditions().setLeaseId(effectiveConditions.getLeaseId());
        ModifiedAccessConditions mac = new ModifiedAccessConditions()
            .setIfMatch(effectiveConditions.getIfMatch())
            .setIfNoneMatch(effectiveConditions.getIfNoneMatch())
            .setIfModifiedSince(effectiveConditions.getIfModifiedSince())
            .setIfUnmodifiedSince(effectiveConditions.getIfUnmodifiedSince());
        Context callContext = (context == null ? Context.NONE : context)
            .addData("az.namespace", "Microsoft.Storage");
        return this.dataLakeStorage.getPaths()
            .flushDataWithResponseAsync(null, position, retainUncommittedData, close, 0L, null, effectiveHeaders,
                lac, mac, callContext)
            .map(response -> new SimpleResponse<>(response,
                new PathInfo(response.getDeserializedHeaders().getETag(),
                    response.getDeserializedHeaders().getLastModified())));
    }

    /**
     * Reads the whole file.
     *
     * <p>Calls {@link #readWithResponse(FileRange, DownloadRetryOptions, DataLakeRequestConditions, boolean)}
     * with no range, retry options or request conditions and without requesting a range MD5, then
     * exposes the body of the response.
     *
     * @return a {@link Flux} emitting the file content
     */
    public Flux<ByteBuffer> read() {
        Mono<FileReadAsyncResponse> readCall = this.readWithResponse(null, null, null, false);
        return readCall.flatMapMany(response -> response.getValue());
    }

    /**
     * Reads a range of the file.
     *
     * <p>The arguments are converted with {@code Transforms} and passed to the block blob client's
     * {@code downloadWithResponse}; the result is converted back with
     * {@code Transforms.toFileReadAsyncResponse} and errors are mapped with
     * {@code DataLakeImplUtils.transformBlobStorageException}.
     *
     * @param range the range to read; may be {@code null}
     * @param options retry options for the download; may be {@code null}
     * @param requestConditions request conditions; may be {@code null}
     * @param getRangeContentMd5 forwarded to the blob download call
     * @return a {@link Mono} emitting the read response
     */
    public Mono<FileReadAsyncResponse> readWithResponse(FileRange range, DownloadRetryOptions options,
        DataLakeRequestConditions requestConditions, boolean getRangeContentMd5) {
        Mono<FileReadAsyncResponse> converted = this.blockBlobAsyncClient
            .downloadWithResponse(
                Transforms.toBlobRange(range),
                Transforms.toBlobDownloadRetryOptions(options),
                Transforms.toBlobRequestConditions(requestConditions),
                getRangeContentMd5)
            .map(Transforms::toFileReadAsyncResponse);
        return converted.onErrorMap(DataLakeImplUtils::transformBlobStorageException);
    }

    /**
     * Downloads the file to the local path {@code filePath}.
     *
     * <p>Equivalent to {@link #readToFile(String, boolean)} with {@code overwrite} set to
     * {@code false}.
     *
     * @param filePath path of the local file to write
     * @return a {@link Mono} emitting the properties of the downloaded file
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<PathProperties> readToFile(String filePath) {
        final boolean overwrite = false;
        return this.readToFile(filePath, overwrite);
    }

    /**
     * Downloads the file to the local path {@code filePath}.
     *
     * <p>When {@code overwrite} is {@code true} the local file is opened with {@code CREATE},
     * {@code TRUNCATE_EXISTING}, {@code READ} and {@code WRITE}; otherwise {@code null} open options
     * are passed on and the downstream call chooses.
     *
     * @param filePath path of the local file to write
     * @param overwrite whether the explicit overwrite open options are used
     * @return a {@link Mono} emitting the properties of the downloaded file
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<PathProperties> readToFile(String filePath, boolean overwrite) {
        final Set<OpenOption> openOptions;
        if (!overwrite) {
            openOptions = null;
        } else {
            openOptions = new HashSet<OpenOption>();
            openOptions.add(StandardOpenOption.CREATE);
            openOptions.add(StandardOpenOption.TRUNCATE_EXISTING);
            openOptions.add(StandardOpenOption.READ);
            openOptions.add(StandardOpenOption.WRITE);
        }
        Mono<Response<PathProperties>> download =
            this.readToFileWithResponse(filePath, null, null, null, null, false, openOptions);
        return download.flatMap(FluxUtil::toMono);
    }

    /**
     * Downloads the file, or a range of it, to the local path {@code filePath}.
     *
     * <p>The arguments are converted with {@code Transforms}, packed into a
     * {@link BlobDownloadToFileOptions} and passed to the block blob client's
     * {@code downloadToFileWithResponse}. Errors are mapped with
     * {@code DataLakeImplUtils.transformBlobStorageException} and the blob properties in the
     * response are converted with {@code Transforms.toPathProperties}.
     *
     * @param filePath path of the local file to write
     * @param range the range to download; may be {@code null}
     * @param parallelTransferOptions transfer tuning options; may be {@code null}
     * @param options retry options for the download; may be {@code null}
     * @param requestConditions request conditions; may be {@code null}
     * @param rangeGetContentMd5 forwarded as the "retrieve content range MD5" option
     * @param openOptions options used to open the local file; may be {@code null}
     * @return a {@link Mono} emitting the response with the downloaded file's properties
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<PathProperties>> readToFileWithResponse(String filePath, FileRange range,
        ParallelTransferOptions parallelTransferOptions, DownloadRetryOptions options,
        DataLakeRequestConditions requestConditions, boolean rangeGetContentMd5, Set<OpenOption> openOptions) {
        BlobDownloadToFileOptions downloadOptions = new BlobDownloadToFileOptions(filePath)
            .setRange(Transforms.toBlobRange(range))
            .setParallelTransferOptions(parallelTransferOptions)
            .setDownloadRetryOptions(Transforms.toBlobDownloadRetryOptions(options))
            .setRequestConditions(Transforms.toBlobRequestConditions(requestConditions))
            .setRetrieveContentRangeMd5(rangeGetContentMd5)
            .setOpenOptions(openOptions);
        return this.blockBlobAsyncClient.downloadToFileWithResponse(downloadOptions)
            .onErrorMap(DataLakeImplUtils::transformBlobStorageException)
            .map(response -> new SimpleResponse<>(response, Transforms.toPathProperties(response.getValue())));
    }

    /**
     * Renames (moves) this file.
     *
     * <p>Calls {@link #renameWithResponse(String, String, DataLakeRequestConditions, DataLakeRequestConditions)}
     * without source or destination request conditions and unwraps the response with
     * {@code FluxUtil.toMono}.
     *
     * @param destinationFileSystem the destination file system; forwarded unchanged
     * @param destinationPath the destination path
     * @return a {@link Mono} emitting a client for the renamed file
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<DataLakeFileAsyncClient> rename(String destinationFileSystem, String destinationPath) {
        Mono<Response<DataLakeFileAsyncClient>> renameCall =
            this.renameWithResponse(destinationFileSystem, destinationPath, null, null);
        return renameCall.flatMap(FluxUtil::toMono);
    }

    /**
     * Renames (moves) this file.
     *
     * <p>Forwards to the inherited five-argument {@code renameWithResponse} with the subscriber's
     * {@link Context}, then wraps the returned path client in a new {@link DataLakeFileAsyncClient}.
     *
     * @param destinationFileSystem the destination file system; forwarded unchanged
     * @param destinationPath the destination path
     * @param sourceRequestConditions request conditions for the source; may be {@code null}
     * @param destinationRequestConditions request conditions for the destination; may be {@code null}
     * @return a {@link Mono} emitting the response with a client for the renamed file; a
     * {@link RuntimeException} thrown while assembling the call is logged and returned as an error signal
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<DataLakeFileAsyncClient>> renameWithResponse(String destinationFileSystem,
        String destinationPath, DataLakeRequestConditions sourceRequestConditions,
        DataLakeRequestConditions destinationRequestConditions) {
        try {
            return FluxUtil.withContext(context -> this.renameWithResponse(destinationFileSystem, destinationPath,
                    sourceRequestConditions, destinationRequestConditions, context))
                .map(response -> new SimpleResponse<>(response, new DataLakeFileAsyncClient(response.getValue())));
        } catch (RuntimeException ex) {
            return FluxUtil.monoError(LOGGER, ex);
        }
    }

    /**
     * Runs a query against this file and returns the result bytes.
     *
     * <p>Wraps {@code expression} in a {@link FileQueryOptions}, calls
     * {@link #queryWithResponse(FileQueryOptions)} and exposes the body of the response.
     *
     * @param expression the query expression
     * @return a {@link Flux} emitting the query result; a {@link RuntimeException} thrown while
     * assembling the call is logged and returned as an error signal
     */
    public Flux<ByteBuffer> query(String expression) {
        Flux<ByteBuffer> result;
        try {
            FileQueryOptions queryOptions = new FileQueryOptions(expression);
            result = this.queryWithResponse(queryOptions).flatMapMany(response -> response.getValue());
        } catch (RuntimeException ex) {
            result = FluxUtil.fluxError(LOGGER, ex);
        }
        return result;
    }

    /**
     * Runs a query against this file.
     *
     * <p>The options are converted with {@code Transforms.toBlobQueryOptions} and passed to the
     * block blob client's {@code queryWithResponse}; the result is converted back with
     * {@code Transforms.toFileQueryAsyncResponse} and errors are mapped with
     * {@code DataLakeImplUtils.transformBlobStorageException}.
     *
     * @param queryOptions the query options
     * @return a {@link Mono} emitting the query response
     */
    public Mono<FileQueryAsyncResponse> queryWithResponse(FileQueryOptions queryOptions) {
        Mono<FileQueryAsyncResponse> converted = this.blockBlobAsyncClient
            .queryWithResponse(Transforms.toBlobQueryOptions(queryOptions))
            .map(Transforms::toFileQueryAsyncResponse);
        return converted.onErrorMap(DataLakeImplUtils::transformBlobStorageException);
    }

    /**
     * Schedules this file for deletion according to {@code options}.
     *
     * <p>Calls {@link #scheduleDeletionWithResponse(FileScheduleDeletionOptions)} and unwraps the
     * response with {@code FluxUtil.toMono}.
     *
     * @param options the scheduling options
     * @return a {@link Mono} representing the operation
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> scheduleDeletion(FileScheduleDeletionOptions options) {
        Mono<Response<Void>> scheduleCall = this.scheduleDeletionWithResponse(options);
        return scheduleCall.flatMap(FluxUtil::toMono);
    }

    /**
     * Schedules this file for deletion according to {@code options}.
     *
     * <p>Forwards to the package-private overload with the subscriber's {@link Context}.
     *
     * @param options the scheduling options
     * @return a {@link Mono} emitting the service response; a {@link RuntimeException} thrown while
     * assembling the call is logged and returned as an error signal
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> scheduleDeletionWithResponse(FileScheduleDeletionOptions options) {
        Mono<Response<Void>> result;
        try {
            result = FluxUtil.withContext(ctx -> this.scheduleDeletionWithResponse(options, ctx));
        } catch (RuntimeException ex) {
            result = FluxUtil.monoError(LOGGER, ex);
        }
        return result;
    }

    /**
     * Sends the set-expiry request that schedules (or cancels) deletion of this file.
     *
     * <p>The expiry mode and value are derived from {@code options}:
     * <ul>
     *   <li>an absolute {@code expiresOn} time gives {@code ABSOLUTE} with the time formatted via
     *   {@link DateTimeRfc1123};</li>
     *   <li>otherwise a {@code timeToExpire} duration gives {@code RELATIVE_TO_CREATION} when the
     *   options are relative to {@code CREATION_TIME}, else {@code RELATIVE_TO_NOW}, with the
     *   duration in milliseconds;</li>
     *   <li>otherwise (including {@code null} options) {@code NEVER_EXPIRE} with no value.</li>
     * </ul>
     * A {@code null} context is replaced with {@link Context#NONE} and the {@code az.namespace}
     * entry is added before the call.
     *
     * @param options the scheduling options; may be {@code null}
     * @param context context passed to the service call; may be {@code null}
     * @return a {@link Mono} emitting the service response with a {@code null} value
     */
    Mono<Response<Void>> scheduleDeletionWithResponse(FileScheduleDeletionOptions options, Context context) {
        final PathExpiryOptions pathExpiryOptions;
        final String expiresOn;
        if (options != null && options.getExpiresOn() != null) {
            pathExpiryOptions = PathExpiryOptions.ABSOLUTE;
            expiresOn = new DateTimeRfc1123(options.getExpiresOn()).toString();
        } else if (options != null && options.getTimeToExpire() != null) {
            pathExpiryOptions = options.getExpiryRelativeTo() == FileExpirationOffset.CREATION_TIME
                ? PathExpiryOptions.RELATIVE_TO_CREATION
                : PathExpiryOptions.RELATIVE_TO_NOW;
            expiresOn = Long.toString(options.getTimeToExpire().toMillis());
        } else {
            pathExpiryOptions = PathExpiryOptions.NEVER_EXPIRE;
            expiresOn = null;
        }
        Context callContext = (context == null ? Context.NONE : context)
            .addData("az.namespace", "Microsoft.Storage");
        return this.blobDataLakeStorage.getPaths()
            .setExpiryWithResponseAsync(pathExpiryOptions, null, null, expiresOn, callContext)
            .map(response -> new SimpleResponse<>(response, null));
    }
}

