/*
 * Decompiled with CFR 0.152.
 */
package io.trino.client;

import io.trino.client.spooling.SegmentLoader;
import io.trino.client.spooling.SpooledSegment;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class OkHttpSegmentLoader
implements SegmentLoader {
    private static final Logger logger = Logger.getLogger(SegmentLoader.class.getPackage().getName());
    private final Call.Factory callFactory;

    public OkHttpSegmentLoader() {
        this((Call.Factory)new OkHttpClient());
    }

    public OkHttpSegmentLoader(Call.Factory callFactory) {
        this.callFactory = Objects.requireNonNull(callFactory, "callFactory is null");
    }

    @Override
    public InputStream load(SpooledSegment segment) throws IOException {
        Request request = new Request.Builder().url(segment.getDataUri().toString()).headers(OkHttpSegmentLoader.toHeaders(segment.getHeaders())).build();
        Response response = this.callFactory.newCall(request).execute();
        if (response.body() == null) {
            throw new IOException("Could not open segment for streaming, got empty body");
        }
        if (response.isSuccessful()) {
            return this.delegatingInputStream(response, response.body().byteStream(), segment);
        }
        throw new IOException(String.format("Could not open segment for streaming, got error '%s' with code %d", response.message(), response.code()));
    }

    @Override
    public void acknowledge(SpooledSegment segment) {
        Request ackRequest = new Request.Builder().get().url(segment.getAckUri().toString()).headers(OkHttpSegmentLoader.toHeaders(segment.getHeaders())).build();
        this.callFactory.newCall(ackRequest).enqueue(new Callback(){

            public void onFailure(Call call, IOException cause) {
                logger.log(Level.WARNING, "Could not acknowledge spooled segment", cause);
            }

            public void onResponse(Call call, Response response) {
                response.close();
            }
        });
    }

    private InputStream delegatingInputStream(final Response response, final InputStream delegate, final SpooledSegment segment) {
        return new FilterInputStream(this, delegate){
            final /* synthetic */ OkHttpSegmentLoader this$0;
            {
                this.this$0 = this$0;
                super(arg0);
            }

            @Override
            public void close() throws IOException {
                try (Response ignored = response;
                     InputStream ignored2 = delegate;){
                    this.this$0.acknowledge(segment);
                }
            }
        };
    }

    private static Headers toHeaders(Map<String, List<String>> headers) {
        Headers.Builder builder = new Headers.Builder();
        headers.forEach((key, values) -> values.forEach(value -> builder.add(key, value)));
        return builder.build();
    }

    @Override
    public void close() {
        if (this.callFactory instanceof OkHttpClient) {
            ((OkHttpClient)this.callFactory).dispatcher().executorService().shutdown();
        }
    }
}

