/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.huaweicloud.obs;

import com.obs.services.ObsClient;
import com.obs.services.exception.ObsException;
import com.obs.services.model.BucketMetadataInfoRequest;
import com.obs.services.model.BucketMetadataInfoResult;
import com.obs.services.model.CopyObjectResult;
import com.obs.services.model.ListObjectsRequest;
import com.obs.services.model.ObjectListing;
import com.obs.services.model.ObsObject;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.component.huaweicloud.obs.OBSEndpoint;
import org.apache.camel.component.huaweicloud.obs.OBSUtils;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OBSConsumer
extends ScheduledBatchPollingConsumer {
    private static final Logger LOG = LoggerFactory.getLogger((String)OBSConsumer.class.getName());
    private final OBSEndpoint endpoint;
    private ObsClient obsClient;
    private String marker;
    private boolean destinationBucketCreated;

    public OBSConsumer(OBSEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
        this.endpoint = endpoint;
        this.destinationBucketCreated = false;
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.obsClient = this.endpoint.initClient();
        if (ObjectHelper.isEmpty((Object)this.endpoint.getBucketName())) {
            throw new IllegalArgumentException("Bucket name is mandatory to download objects");
        }
        if (this.endpoint.isMoveAfterRead()) {
            if (ObjectHelper.isEmpty((Object)this.endpoint.getDestinationBucket())) {
                throw new IllegalArgumentException("Destination bucket is mandatory when moveAfterRead is true");
            }
            BucketMetadataInfoRequest request = new BucketMetadataInfoRequest(this.endpoint.getBucketName());
            BucketMetadataInfoResult metadata = this.obsClient.getBucketMetadata(request);
            String bucketLocation = metadata.getLocation();
            try {
                BucketMetadataInfoRequest destinationRequest = new BucketMetadataInfoRequest(this.endpoint.getDestinationBucket());
                BucketMetadataInfoResult destinationMetadata = this.obsClient.getBucketMetadata(destinationRequest);
                String destinationLocation = destinationMetadata.getLocation();
                if (!bucketLocation.equals(destinationLocation)) {
                    throw new IllegalArgumentException("Destination bucket location must have the same location as the source bucket");
                }
                this.destinationBucketCreated = true;
                return;
            }
            catch (ObsException e) {
                if (e.getResponseCode() != 404) {
                    throw e;
                }
                this.obsClient.createBucket(this.endpoint.getDestinationBucket(), bucketLocation);
                this.destinationBucketCreated = true;
            }
        }
    }

    protected int poll() throws Exception {
        Queue<Object> exchanges;
        this.shutdownRunningTask = null;
        this.pendingExchanges = 0;
        String fileName = this.endpoint.getFileName();
        String bucketName = this.endpoint.getBucketName();
        if (this.endpoint.isMoveAfterRead() && !this.destinationBucketCreated) {
            exchanges = new LinkedList();
        } else if (ObjectHelper.isNotEmpty((Object)fileName)) {
            ObsObject object = this.obsClient.getObject(bucketName, fileName);
            ArrayList<ObsObject> list = new ArrayList<ObsObject>();
            list.add(object);
            exchanges = this.createExchanges(list);
        } else {
            ObjectListing objectListing;
            ListObjectsRequest request = new ListObjectsRequest(bucketName);
            request.setPrefix(this.endpoint.getPrefix());
            request.setDelimiter(this.endpoint.getDelimiter());
            if (this.maxMessagesPerPoll > 0) {
                request.setMaxKeys(this.maxMessagesPerPoll);
            }
            if (this.marker != null) {
                LOG.trace("Resuming from marker: " + this.marker);
                request.setMarker(this.marker);
            }
            this.marker = (objectListing = this.obsClient.listObjects(request)).isTruncated() ? objectListing.getNextMarker() : null;
            exchanges = this.createExchanges(objectListing.getObjects());
        }
        return this.processBatch(CastUtils.cast(exchanges));
    }

    public int processBatch(Queue<Object> exchanges) throws Exception {
        int total = exchanges.size();
        for (int index = 0; index < total && this.isBatchAllowed(); ++index) {
            Exchange exchange = (Exchange)ObjectHelper.cast(Exchange.class, (Object)exchanges.poll());
            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, (Object)total);
            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, (Object)index);
            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, (Object)(index == total - 1 ? 1 : 0));
            this.pendingExchanges = total - index - 1;
            ((ExtendedExchange)exchange.adapt(ExtendedExchange.class)).addOnCompletion(new Synchronization(){

                public void onComplete(Exchange exchange) {
                    OBSConsumer.this.processComplete(exchange);
                }

                public void onFailure(Exchange exchange) {
                    OBSConsumer.this.processFailure(exchange);
                }
            });
            AsyncCallback callback = this.defaultConsumerCallback(exchange, true);
            this.getAsyncProcessor().process(exchange, callback);
        }
        return total;
    }

    private Queue<Exchange> createExchanges(List<ObsObject> obsObjects) {
        LinkedList<Exchange> answer = new LinkedList<Exchange>();
        for (ObsObject objectSummary : obsObjects) {
            ObsObject obsObject = objectSummary.getMetadata().getContentType() == null ? this.obsClient.getObject(this.endpoint.getBucketName(), objectSummary.getObjectKey()) : objectSummary;
            if (!this.includeObsObject(obsObject)) continue;
            Exchange exchange = this.createExchange(obsObject);
            answer.add(exchange);
        }
        return answer;
    }

    private boolean includeObsObject(ObsObject obsObject) {
        return this.endpoint.isIncludeFolders() || !obsObject.getObjectKey().endsWith("/");
    }

    public Exchange createExchange(ObsObject obsObject) {
        Exchange exchange = this.createExchange(true);
        exchange.setPattern(this.endpoint.getExchangePattern());
        OBSUtils.mapObsObject(exchange, obsObject);
        return exchange;
    }

    private void processComplete(Exchange exchange) {
        CopyObjectResult copyObjectResult;
        String bucketName = (String)exchange.getIn().getHeader("CamelHwCloudObsBucketName", String.class);
        String objectKey = (String)exchange.getIn().getHeader("CamelHwCloudObsObjectKey", String.class);
        if (this.endpoint.isMoveAfterRead()) {
            copyObjectResult = this.obsClient.copyObject(bucketName, objectKey, this.endpoint.getDestinationBucket(), objectKey);
        }
        if (this.endpoint.isDeleteAfterRead()) {
            copyObjectResult = this.obsClient.deleteObject(bucketName, objectKey);
        }
    }

    private void processFailure(Exchange exchange) {
        Exception exception = exchange.getException();
        if (exception != null) {
            LOG.warn("Exchange failed, so rolling back message status: {}", (Object)exchange, (Object)exception);
        } else {
            LOG.warn("Exchange failed, so rolling back message status: {}", (Object)exchange);
        }
    }
}

