/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.azure.storage.datalake;

import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.PathItem;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.component.azure.storage.datalake.DataLakeEndpoint;
import org.apache.camel.component.azure.storage.datalake.client.DataLakeFileClientWrapper;
import org.apache.camel.component.azure.storage.datalake.client.DataLakeFileSystemClientWrapper;
import org.apache.camel.component.azure.storage.datalake.operations.DataLakeFileOperations;
import org.apache.camel.component.azure.storage.datalake.operations.DataLakeFileSystemOperations;
import org.apache.camel.component.azure.storage.datalake.operations.DataLakeOperationResponse;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class DataLakeConsumer
extends ScheduledBatchPollingConsumer {
    public static final int NOT_FOUND = 404;
    private static final Logger LOG = LoggerFactory.getLogger(DataLakeConsumer.class);

    public DataLakeConsumer(Endpoint endpoint, Processor processor) {
        super(endpoint, processor);
    }

    protected int poll() throws Exception {
        int result;
        String fileSystemName = this.getEndpoint().getConfiguration().getFileSystemName();
        String fileName = this.getEndpoint().getConfiguration().getFileName();
        DataLakeFileSystemClient dataLakeFileSystemClient = this.getEndpoint().getDataLakeServiceClient().getFileSystemClient(fileSystemName);
        try {
            Queue<Exchange> exchanges;
            if (ObjectHelper.isNotEmpty((Object)fileName)) {
                Exchange exchange = this.createExchangeFromFile(fileName, dataLakeFileSystemClient);
                exchanges = new LinkedList<Exchange>();
                exchanges.add(exchange);
            } else {
                exchanges = this.createBatchExchangesFromPath(dataLakeFileSystemClient);
            }
            result = this.processBatch(CastUtils.cast(exchanges));
        }
        catch (DataLakeStorageException e) {
            if (404 == e.getStatusCode()) {
                result = 0;
            }
            throw e;
        }
        return result;
    }

    private Queue<Exchange> createBatchExchangesFromPath(DataLakeFileSystemClient dataLakeFileSystemClient) throws IOException {
        DataLakeFileSystemClientWrapper fileSystemClientWrapper = new DataLakeFileSystemClientWrapper(dataLakeFileSystemClient);
        DataLakeFileSystemOperations fileSystemOperations = new DataLakeFileSystemOperations(this.getEndpoint().getConfiguration(), fileSystemClientWrapper);
        List items = (List)fileSystemOperations.listPaths(null).getBody();
        LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
        for (PathItem pathItem : items) {
            if (pathItem.isDirectory()) continue;
            exchanges.add(this.createExchangeFromFile(pathItem.getName(), dataLakeFileSystemClient));
        }
        return exchanges;
    }

    private Exchange createExchangeFromFile(String fileName, DataLakeFileSystemClient dataLakeFileSystemClient) throws IOException {
        DataLakeFileClientWrapper clientWrapper = new DataLakeFileClientWrapper(dataLakeFileSystemClient.getFileClient(fileName));
        DataLakeFileOperations operations = new DataLakeFileOperations(this.getEndpoint().getConfiguration(), clientWrapper);
        Exchange exchange = this.getEndpoint().createExchange();
        DataLakeOperationResponse response = ObjectHelper.isNotEmpty((Object)this.getEndpoint().getConfiguration().getFileDir()) ? operations.downloadToFile(exchange) : operations.getFile(exchange);
        this.getEndpoint().setResponseOnExchange(response, exchange);
        exchange.getIn().setHeader("CamelAzureStorageDataLakeFileName", (Object)fileName);
        return exchange;
    }

    public DataLakeEndpoint getEndpoint() {
        return (DataLakeEndpoint)super.getEndpoint();
    }

    public int processBatch(Queue<Object> exchanges) {
        int total = exchanges.size();
        for (int i = 0; i < total && this.isBatchAllowed(); ++i) {
            Exchange exchange = (Exchange)ObjectHelper.cast(Exchange.class, (Object)exchanges.poll());
            exchange.setProperty("CamelBatchIndex", (Object)i);
            exchange.setProperty("CamelBatchSize", (Object)total);
            exchange.setProperty("CamelBatchComplete", (Object)(i == total - 1 ? 1 : 0));
            this.pendingExchanges = total - i - 1;
            ((ExtendedExchange)exchange.adapt(ExtendedExchange.class)).addOnCompletion(new Synchronization(){

                public void onComplete(Exchange exchange) {
                    LOG.trace("Processing all exchanges completed");
                }

                public void onFailure(Exchange exchange) {
                    DataLakeConsumer.this.processRollback(exchange);
                }
            });
            this.getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange [{}] done.", (Object)exchange));
        }
        return total;
    }

    protected void processRollback(Exchange exchange) {
        Exception exception = exchange.getException();
        if (exception != null) {
            LOG.warn("Exchange failed, rollback processed. Status: {}", (Object)exchange, (Object)exception);
        } else {
            LOG.warn("Exchange failed, rollback processed. Status: {}", (Object)exchange);
        }
    }
}

