/*
 * Decompiled with CFR 0.152.
 */
package com.influxdb.client.internal;

import com.influxdb.Arguments;
import com.influxdb.client.InfluxDBClientOptions;
import com.influxdb.client.WriteOptions;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.internal.MeasurementMapper;
import com.influxdb.client.service.WriteService;
import com.influxdb.client.write.Point;
import com.influxdb.client.write.events.AbstractWriteEvent;
import com.influxdb.client.write.events.BackpressureEvent;
import com.influxdb.client.write.events.WriteErrorEvent;
import com.influxdb.client.write.events.WriteRetriableErrorEvent;
import com.influxdb.client.write.events.WriteSuccessEvent;
import com.influxdb.exceptions.InfluxException;
import com.influxdb.internal.AbstractRestClient;
import io.reactivex.Flowable;
import io.reactivex.FlowableSubscriber;
import io.reactivex.FlowableTransformer;
import io.reactivex.Maybe;
import io.reactivex.Notification;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.functions.Function;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subjects.PublishSubject;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import retrofit2.Call;
import retrofit2.HttpException;
import retrofit2.Response;

public abstract class AbstractWriteClient
extends AbstractRestClient
implements AutoCloseable {
    private static final Logger LOG = Logger.getLogger(AbstractWriteClient.class.getName());
    private static final List<Integer> ABLE_TO_RETRY_ERRORS = Arrays.asList(429, 503);
    private static final String CLOSED_EXCEPTION = "WriteApi is closed. Data should be written before calling InfluxDBClient.close or WriteApi.close.";
    private static final int DEFAULT_WAIT = 30000;
    private static final int DEFAULT_SLEEP = 25;
    private final WriteOptions writeOptions;
    protected final InfluxDBClientOptions options;
    private final PublishProcessor<BatchWriteItem> processor;
    private final PublishProcessor<Flowable<BatchWriteItem>> flushPublisher;
    private final PublishSubject<AbstractWriteEvent> eventPublisher;
    protected final MeasurementMapper measurementMapper = new MeasurementMapper();
    private final WriteService service;
    private final Collection<AutoCloseable> autoCloseables;
    private final PublishProcessor<Object> tempBoundary;
    private AtomicBoolean finished = new AtomicBoolean(false);

    public AbstractWriteClient(@Nonnull WriteOptions writeOptions, @Nonnull InfluxDBClientOptions options, @Nonnull Scheduler processorScheduler, @Nonnull WriteService service, @Nonnull Collection<AutoCloseable> autoCloseables) {
        Arguments.checkNotNull((Object)options, (String)"options");
        this.writeOptions = writeOptions;
        this.options = options;
        this.service = service;
        this.autoCloseables = autoCloseables;
        this.flushPublisher = PublishProcessor.create();
        this.eventPublisher = PublishSubject.create();
        this.tempBoundary = PublishProcessor.create();
        this.processor = PublishProcessor.create();
        PublishProcessor tempBoundary = PublishProcessor.create();
        this.processor.onBackpressureBuffer((long)writeOptions.getBufferLimit(), () -> this.publish(new BackpressureEvent()), writeOptions.getBackpressureStrategy()).publish(connectedSource -> connectedSource.window(() -> tempBoundary).mergeWith((Publisher)Flowable.defer(() -> {
            connectedSource.window((long)writeOptions.getFlushInterval(), TimeUnit.MILLISECONDS, processorScheduler, (long)writeOptions.getBatchSize(), true).mergeWith(this.flushPublisher).subscribe((FlowableSubscriber)tempBoundary);
            return Flowable.empty();
        }))).concatMap(it -> it.groupBy(batchWrite -> ((BatchWriteItem)batchWrite).batchWriteOptions)).concatMapSingle(grouped -> grouped.map(it -> {
            try {
                String lineProtocol = ((BatchWriteItem)it).data.toLineProtocol();
                if (lineProtocol == null) {
                    return "";
                }
                return lineProtocol;
            }
            catch (Exception e) {
                this.publish(new WriteErrorEvent(e));
                return "";
            }
        }).filter(it -> it != null && !it.isEmpty()).collect(StringBuilder::new, (sb, x) -> {
            if (sb.length() > 0) {
                sb.append("\n");
            }
            sb.append((String)x);
        }).map(StringBuilder::toString).map(it -> new BatchWriteItem((BatchWriteOptions)grouped.getKey(), new BatchWriteDataRecord((String)it)))).compose(this.jitter(processorScheduler)).concatMapMaybe((Function)new ToWritePointsMaybe(processorScheduler)).doFinally(() -> this.finished.set(true)).subscribe(responseNotification -> {
            if (responseNotification.isOnError()) {
                this.publish(new WriteErrorEvent(this.toInfluxException(responseNotification.getError())));
            }
        }, throwable -> new WriteErrorEvent(this.toInfluxException((Throwable)throwable)));
        autoCloseables.add(this);
    }

    @Nonnull
    protected <T extends AbstractWriteEvent> Observable<T> addEventListener(@Nonnull Class<T> eventType) {
        Objects.requireNonNull(eventType, "EventType is required");
        return this.eventPublisher.ofType(eventType);
    }

    public void flush() {
        this.flushPublisher.offer((Object)Flowable.empty());
    }

    @Override
    public void close() {
        LOG.log(Level.FINE, "Flushing any cached BatchWrites before shutdown.");
        this.autoCloseables.remove(this);
        this.processor.onComplete();
        this.flushPublisher.onComplete();
        this.tempBoundary.onComplete();
        this.eventPublisher.onComplete();
        AbstractWriteClient.waitToCondition(() -> this.finished.get(), 30000);
    }

    public void write(@Nonnull String bucket, @Nonnull String organization, @Nonnull Flowable<BatchWriteDataPoint> stream) {
        if (this.processor.hasComplete()) {
            throw new InfluxException(CLOSED_EXCEPTION);
        }
        stream.subscribe(dataPoint -> this.write(bucket, organization, ((BatchWriteDataPoint)dataPoint).point.getPrecision(), (Publisher<BatchWriteData>)Flowable.just((Object)dataPoint)), throwable -> this.publish(new WriteErrorEvent((Throwable)throwable)));
    }

    public void write(@Nonnull String bucket, @Nonnull String organization, @Nonnull WritePrecision precision, @Nonnull Publisher<BatchWriteData> stream) {
        Arguments.checkNonEmpty((String)bucket, (String)"bucket");
        Arguments.checkNonEmpty((String)organization, (String)"organization");
        Arguments.checkNotNull(stream, (String)"data to write");
        BatchWriteOptions batchWriteOptions = new BatchWriteOptions(bucket, organization, precision);
        if (this.processor.hasComplete()) {
            throw new InfluxException(CLOSED_EXCEPTION);
        }
        Flowable.fromPublisher(stream).map(it -> new BatchWriteItem(batchWriteOptions, (BatchWriteData)it)).subscribe(arg_0 -> this.processor.onNext(arg_0), throwable -> this.publish(new WriteErrorEvent((Throwable)throwable)));
    }

    @Nonnull
    private FlowableTransformer<BatchWriteItem, BatchWriteItem> jitter(@Nonnull Scheduler scheduler) {
        Arguments.checkNotNull((Object)scheduler, (String)"Jitter scheduler is required");
        return source -> {
            if (this.writeOptions.getJitterInterval() <= 0) {
                return source;
            }
            return source.delay(pointFlowable -> {
                int delay = this.jitterDelay();
                LOG.log(Level.FINEST, "Generated Jitter dynamic delay: {0}", delay);
                return Flowable.timer((long)delay, (TimeUnit)TimeUnit.MILLISECONDS, (Scheduler)scheduler);
            });
        };
    }

    private int jitterDelay() {
        return (int)(Math.random() * (double)this.writeOptions.getJitterInterval());
    }

    private <T extends AbstractWriteEvent> void publish(@Nonnull T event) {
        Arguments.checkNotNull(event, (String)"event");
        event.logEvent();
        this.eventPublisher.onNext(event);
    }

    private Function<Flowable<Throwable>, Publisher<?>> retryHandler(@Nonnull Scheduler retryScheduler, @Nonnull WriteOptions writeOptions) {
        Objects.requireNonNull(writeOptions, "WriteOptions are required");
        Objects.requireNonNull(retryScheduler, "RetryScheduler is required");
        return errors -> errors.flatMap(throwable -> {
            if (throwable instanceof HttpException) {
                long retryInterval;
                HttpException ie = (HttpException)throwable;
                if (!ABLE_TO_RETRY_ERRORS.contains(ie.code())) {
                    return Flowable.error((Throwable)throwable);
                }
                String retryAfter = ((HttpException)throwable).response().headers().get("Retry-After");
                if (retryAfter != null) {
                    retryInterval = TimeUnit.MILLISECONDS.convert(Integer.parseInt(retryAfter), TimeUnit.SECONDS);
                } else {
                    retryInterval = writeOptions.getRetryInterval();
                    String msg = "The InfluxDB does not specify \"Retry-After\". Use the default retryInterval: {0}";
                    LOG.log(Level.FINEST, msg, retryInterval);
                }
                this.publish(new WriteRetriableErrorEvent((Throwable)throwable, retryInterval += (long)this.jitterDelay()));
                return Flowable.just((Object)"notify").delay(retryInterval, TimeUnit.MILLISECONDS, retryScheduler);
            }
            return Flowable.error((Throwable)throwable);
        });
    }

    @Nonnull
    private InfluxException toInfluxException(@Nonnull Throwable throwable) {
        if (throwable instanceof InfluxException) {
            return (InfluxException)throwable;
        }
        if (throwable instanceof HttpException) {
            return this.responseToError(((HttpException)throwable).response());
        }
        return new InfluxException(throwable);
    }

    static void waitToCondition(Supplier<Boolean> condition, int millis) {
        long start = System.currentTimeMillis();
        while (!condition.get().booleanValue()) {
            try {
                Thread.sleep(25L);
            }
            catch (InterruptedException e) {
                LOG.log(Level.SEVERE, "Interrupted during wait to dispose.", e);
            }
            if (System.currentTimeMillis() - start <= (long)millis) continue;
            LOG.severe("The WriteApi can't be gracefully dispose! - " + millis + "ms elapsed.");
            break;
        }
    }

    private final class ToWritePointsMaybe
    implements Function<BatchWriteItem, Maybe<Notification<Response>>> {
        private final Scheduler retryScheduler;

        private ToWritePointsMaybe(Scheduler retryScheduler) {
            this.retryScheduler = retryScheduler;
        }

        public Maybe<Notification<Response>> apply(BatchWriteItem batchWrite) {
            String content = batchWrite.data.toLineProtocol();
            if (content == null || content.isEmpty()) {
                return Maybe.empty();
            }
            String organization = batchWrite.batchWriteOptions.organization;
            String bucket = batchWrite.batchWriteOptions.bucket;
            WritePrecision precision = batchWrite.batchWriteOptions.precision;
            Maybe requestSource = Maybe.fromCallable(() -> AbstractWriteClient.this.service.postWrite(organization, bucket, content, null, "identity", "text/plain; charset=utf-8", null, "application/json", null, precision)).map(Call::execute);
            return requestSource.map(response -> {
                if (!response.isSuccessful()) {
                    throw new HttpException(response);
                }
                return response;
            }).retryWhen(AbstractWriteClient.this.retryHandler(this.retryScheduler, AbstractWriteClient.this.writeOptions)).map(response -> {
                if (response.isSuccessful()) {
                    return Notification.createOnNext((Object)response);
                }
                return Notification.createOnError((Throwable)new HttpException(response));
            }).doOnSuccess(responseNotification -> {
                if (!responseNotification.isOnError()) {
                    AbstractWriteClient.this.publish(this.toSuccessEvent(batchWrite, content));
                }
            }).onErrorResumeNext(throwable -> Maybe.just((Object)Notification.createOnError((Throwable)throwable)));
        }

        @Nonnull
        private WriteSuccessEvent toSuccessEvent(@Nonnull BatchWriteItem batchWrite, String lineProtocol) {
            return new WriteSuccessEvent(batchWrite.batchWriteOptions.organization, batchWrite.batchWriteOptions.bucket, batchWrite.batchWriteOptions.precision, lineProtocol);
        }
    }

    private final class BatchWriteOptions {
        private String bucket;
        private String organization;
        private WritePrecision precision;

        private BatchWriteOptions(@Nonnull String bucket, @Nonnull String organization, WritePrecision precision) {
            Arguments.checkNonEmpty((String)bucket, (String)"bucket");
            Arguments.checkNonEmpty((String)organization, (String)"organization");
            Arguments.checkNotNull((Object)((Object)precision), (String)"TimeUnit.precision is required");
            this.bucket = bucket;
            this.organization = organization;
            this.precision = precision;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof BatchWriteOptions)) {
                return false;
            }
            BatchWriteOptions that = (BatchWriteOptions)o;
            return Objects.equals(this.bucket, that.bucket) && Objects.equals(this.organization, that.organization) && this.precision == that.precision;
        }

        public int hashCode() {
            return Objects.hash(new Object[]{this.bucket, this.organization, this.precision});
        }
    }

    final class BatchWriteItem {
        private BatchWriteOptions batchWriteOptions;
        private BatchWriteData data;

        private BatchWriteItem(@Nonnull BatchWriteOptions batchWriteOptions, BatchWriteData data) {
            Arguments.checkNotNull((Object)batchWriteOptions, (String)"data");
            Arguments.checkNotNull((Object)data, (String)"write options");
            this.batchWriteOptions = batchWriteOptions;
            this.data = data;
        }
    }

    public static final class BatchWriteDataMeasurement
    implements BatchWriteData {
        private final Object measurement;
        private final WritePrecision precision;
        private final InfluxDBClientOptions options;
        private final MeasurementMapper measurementMapper;

        public BatchWriteDataMeasurement(@Nullable Object measurement, @Nonnull WritePrecision precision, @Nonnull InfluxDBClientOptions options, @Nonnull MeasurementMapper measurementMapper) {
            this.measurement = measurement;
            this.precision = precision;
            this.options = options;
            this.measurementMapper = measurementMapper;
        }

        @Override
        @Nullable
        public String toLineProtocol() {
            if (this.measurement == null) {
                return null;
            }
            Point point = this.measurementMapper.toPoint(this.measurement, this.precision);
            if (!point.hasFields()) {
                LOG.warning("The measurement: " + this.measurement + "doesn't contains any fields, skipping");
                return null;
            }
            return point.toLineProtocol(this.options.getPointSettings());
        }
    }

    public static final class BatchWriteDataPoint
    implements BatchWriteData {
        private static final Logger LOG = Logger.getLogger(BatchWriteDataPoint.class.getName());
        private final Point point;
        private final InfluxDBClientOptions options;

        public BatchWriteDataPoint(@Nonnull Point point, @Nonnull InfluxDBClientOptions options) {
            this.point = point;
            this.options = options;
        }

        @Override
        @Nullable
        public String toLineProtocol() {
            if (!this.point.hasFields()) {
                LOG.warning("The point: " + this.point + "doesn't contains any fields, skipping");
                return null;
            }
            return this.point.toLineProtocol(this.options.getPointSettings());
        }
    }

    public static final class BatchWriteDataRecord
    implements BatchWriteData {
        private final String record;

        public BatchWriteDataRecord(@Nullable String record) {
            this.record = record;
        }

        @Override
        @Nullable
        public String toLineProtocol() {
            return this.record;
        }
    }

    public static interface BatchWriteData {
        @Nullable
        public String toLineProtocol();
    }
}

