/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.service.transport.amqps;

import com.microsoft.azure.sdk.iot.service.FeedbackBatch;
import com.microsoft.azure.sdk.iot.service.FeedbackBatchMessage;
import com.microsoft.azure.sdk.iot.service.IotHubServiceClientProtocol;
import com.microsoft.azure.sdk.iot.service.transport.amqps.AmqpFeedbackReceivedEvent;
import com.microsoft.azure.sdk.iot.service.transport.amqps.AmqpFeedbackReceivedHandler;
import java.io.IOException;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.reactor.Reactor;

/**
 * Receives feedback batches from the service over AMQP by driving a Proton
 * reactor for a bounded amount of time.
 *
 * <p>Usage: call {@link #open()} to create the underlying feedback handler,
 * call {@link #receive(long)} one or more times, then call {@link #close()}.
 * Only {@link #receive(long)} is synchronized; {@link #open()} and
 * {@link #close()} are not.
 */
public class AmqpReceive
extends BaseHandler
implements AmqpFeedbackReceivedEvent {
    /**
     * Timeout handed to {@code Reactor.setTimeout} (milliseconds per the
     * Proton-J Reactor documentation).
     */
    private static final int REACTOR_TIMEOUT = 3141;

    private final String hostName;
    private final String userName;
    private final String sasToken;
    private final IotHubServiceClientProtocol iotHubServiceClientProtocol;
    /** Created by {@link #open()}, cleared by {@link #close()}. */
    private AmqpFeedbackReceivedHandler amqpReceiveHandler;
    /** Reactor created by the most recent {@link #receive(long)} call. */
    private Reactor reactor = null;
    /** Last batch delivered through {@link #onFeedbackReceived(String)}, or {@code null}. */
    private FeedbackBatch feedbackBatch;

    /**
     * Stores the connection parameters; no handler or connection is created
     * until {@link #open()} is called.
     *
     * @param hostName host name passed on to the feedback handler
     * @param userName user name passed on to the feedback handler
     * @param sasToken SAS token passed on to the feedback handler
     * @param iotHubServiceClientProtocol protocol selection passed on to the feedback handler
     */
    public AmqpReceive(String hostName, String userName, String sasToken, IotHubServiceClientProtocol iotHubServiceClientProtocol) {
        this.hostName = hostName;
        this.userName = userName;
        this.sasToken = sasToken;
        this.iotHubServiceClientProtocol = iotHubServiceClientProtocol;
    }

    /**
     * Reactor-init callback: registers a connection on the event's reactor
     * using the current feedback handler.
     *
     * @param event the reactor init event
     */
    @Override
    public void onReactorInit(Event event) {
        event.getReactor().connection((Handler)this.amqpReceiveHandler);
    }

    /**
     * Creates the feedback handler from the stored connection parameters,
     * registering this object as the receiver of feedback callbacks.
     */
    public void open() {
        this.amqpReceiveHandler = new AmqpFeedbackReceivedHandler(this.hostName, this.userName, this.sasToken, this.iotHubServiceClientProtocol, this);
    }

    /**
     * Drops the feedback handler; a subsequent {@link #receive(long)} fails
     * until {@link #open()} is called again.
     */
    public void close() {
        this.amqpReceiveHandler = null;
    }

    /**
     * Runs a fresh reactor until it reports no more work or {@code timeoutMs}
     * has elapsed, then returns whatever feedback batch arrived in the meantime.
     *
     * @param timeoutMs maximum time, in milliseconds, to keep processing reactor events
     * @return the batch received during this call, or {@code null} if none arrived
     * @throws IOException if {@link #open()} has not been called, or if the reactor cannot be created
     * @throws InterruptedException declared for callers; NOTE(review): presumably raised by the
     *         handler's {@code receiveComplete()} - confirm against that class
     */
    public synchronized FeedbackBatch receive(long timeoutMs) throws IOException, InterruptedException {
        this.feedbackBatch = null;
        final AmqpFeedbackReceivedHandler handler = this.amqpReceiveHandler;
        if (handler == null) {
            throw new IOException("receive handler is not initialized. call open before receive");
        }

        this.reactor = Proton.reactor(this);
        try {
            this.reactor.setTimeout(REACTOR_TIMEOUT);
            this.reactor.start();
            final long startTime = System.currentTimeMillis();
            // Compare elapsed time rather than an absolute deadline so that a
            // very large timeoutMs cannot overflow startTime + timeoutMs.
            while (this.reactor.process() && System.currentTimeMillis() - startTime <= timeoutMs) {
                // Keep pumping reactor events until done or timed out.
            }
            this.reactor.stop();
            // One more pass after stop() to process the remaining events.
            this.reactor.process();
        } finally {
            // Always release the reactor, even when processing throws.
            this.reactor.free();
        }

        handler.receiveComplete();
        return this.feedbackBatch;
    }

    /**
     * Callback from the feedback handler: parses the JSON payload and stores
     * the result so the in-progress {@link #receive(long)} call can return it.
     *
     * @param feedbackJson feedback payload handed to {@code FeedbackBatchMessage.parse}
     */
    @Override
    public void onFeedbackReceived(String feedbackJson) {
        this.feedbackBatch = FeedbackBatchMessage.parse(feedbackJson);
    }
}

