/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.ingest.streaming.internal;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Slf4jReporter;
import com.codahale.metrics.Timer;
import com.codahale.metrics.jmx.JmxReporter;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.spec.InvalidKeySpecException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import net.snowflake.client.core.SFSessionProperty;
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
import net.snowflake.ingest.connection.IngestResponseException;
import net.snowflake.ingest.connection.OAuthCredential;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.connection.ServiceResponseHandler;
import net.snowflake.ingest.connection.TelemetryService;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.internal.BlobMetadata;
import net.snowflake.ingest.streaming.internal.ChannelCache;
import net.snowflake.ingest.streaming.internal.ChannelsStatusRequest;
import net.snowflake.ingest.streaming.internal.ChannelsStatusResponse;
import net.snowflake.ingest.streaming.internal.ChunkMetadata;
import net.snowflake.ingest.streaming.internal.ChunkRegisterStatus;
import net.snowflake.ingest.streaming.internal.FlushService;
import net.snowflake.ingest.streaming.internal.OpenChannelResponse;
import net.snowflake.ingest.streaming.internal.RegisterBlobResponse;
import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelFactory;
import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelInternal;
import net.snowflake.ingest.streaming.internal.StreamingIngestUtils;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.HttpUtil;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.ParameterProvider;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.SnowflakeURL;
import net.snowflake.ingest.utils.Utils;

public class SnowflakeStreamingIngestClientInternal<T>
implements SnowflakeStreamingIngestClient {
    // Logger shared by all instances of this class.
    private static final Logging logger = new Logging(SnowflakeStreamingIngestClientInternal.class);
    // Jackson mapper; used in this class to serialize the channel status request payload.
    private static final ObjectMapper objectMapper = new ObjectMapper();
    // Incremented for every request; appended to the flush service's client prefix to form request ids.
    private final AtomicLong counter = new AtomicLong(0L);
    // Built in the constructor from the caller's parameter overrides and the connection properties.
    private final ParameterProvider parameterProvider;
    // Client name supplied at construction.
    private final String name;
    // Role included in request payloads; read from the "role" property, never assigned in test mode.
    private String role;
    // HTTP client used for service calls; created through HttpUtil when the caller passes none.
    private final CloseableHttpClient httpClient;
    // Cache of the channels opened through this client.
    private final ChannelCache<T> channelCache;
    // Flush service created in the constructor; also supplies the client prefix used in request ids.
    private final FlushService<T> flushService;
    // Set to true as soon as close() starts; checked by openChannel() and flush().
    private volatile boolean isClosed;
    // When true, the constructor skips credential, request builder and metrics setup.
    private final boolean isTestMode;
    // The metric fields below are populated by setupMetricsForClient(), which the constructor only
    // calls outside test mode; they stay null otherwise.
    MetricRegistry metrics;
    Histogram blobSizeHistogram;
    Histogram blobRowCountHistogram;
    Histogram cpuHistogram;
    Timer flushLatency;
    Timer buildLatency;
    Timer uploadLatency;
    Timer registerLatency;
    Meter uploadThroughput;
    Meter inputThroughput;
    // Only created when the parameter provider reports Snowpipe Streaming metrics as enabled.
    MetricRegistry jvmMemoryAndThreadMetrics;
    // Caller-supplied in test mode, rebuilt from the properties otherwise; replaceable through
    // injectRequestBuilder().
    private RequestBuilder requestBuilder;
    // Single-thread scheduler that periodically reports telemetry; created in setupMetricsForClient().
    private ScheduledExecutorService telemetryWorker;

    /**
     * Creates a client.
     *
     * <p>Outside test mode the role is read from {@code prop}, a credential is built (a key pair
     * when the {@code authorization_type} property equals {@code JWT}, an OAuth credential
     * otherwise), the supplied request builder is replaced by one built from the properties, and
     * metrics are set up. The flush service is created last; if that fails, the resources acquired
     * so far are released before the exception is rethrown.
     *
     * @param name client name
     * @param accountURL account URL; may be null
     * @param prop connection properties; dereferenced whenever {@code isTestMode} is false
     * @param httpClient HTTP client to use, or null to obtain one from {@code HttpUtil}
     * @param isTestMode whether to skip credential, request builder and metrics setup
     * @param requestBuilder request builder kept as-is in test mode
     * @param parameterOverrides parameter overrides handed to the {@code ParameterProvider}
     * @throws SFException if the key pair cannot be created from the configured private key
     */
    SnowflakeStreamingIngestClientInternal(String name, SnowflakeURL accountURL, Properties prop, CloseableHttpClient httpClient, boolean isTestMode, RequestBuilder requestBuilder, Map<String, Object> parameterOverrides) {
        this.parameterProvider = new ParameterProvider(parameterOverrides, prop);
        this.name = name;

        String accountName = null;
        if (accountURL != null) {
            accountName = accountURL.getAccount();
        }

        this.isTestMode = isTestMode;
        if (httpClient != null) {
            this.httpClient = httpClient;
        } else {
            this.httpClient = HttpUtil.getHttpClient(accountName);
        }
        this.channelCache = new ChannelCache<>();
        this.isClosed = false;
        this.requestBuilder = requestBuilder;

        if (!isTestMode) {
            this.role = prop.getProperty("role");

            Object credential;
            boolean useKeyPair = prop.getProperty("authorization_type").equals("JWT");
            if (useKeyPair) {
                try {
                    PrivateKey privateKey = (PrivateKey) prop.get(SFSessionProperty.PRIVATE_KEY.getPropertyKey());
                    credential = Utils.createKeyPairFromPrivateKey(privateKey);
                } catch (NoSuchAlgorithmException | InvalidKeySpecException e) {
                    throw new SFException(e, ErrorCode.KEYPAIR_CREATION_FAILURE);
                }
            } else {
                credential = new OAuthCredential(
                        prop.getProperty("oauth_client_id"),
                        prop.getProperty("oauth_client_secret"),
                        prop.getProperty("oauth_refresh_token"));
            }

            // The request builder is identified by the client name plus the creation timestamp.
            String builderName = String.format("%s_%s", this.name, System.currentTimeMillis());
            this.requestBuilder = new RequestBuilder(accountURL, prop.get("user").toString(), credential, this.httpClient, builderName);
            logger.logInfo("Using {} for authorization", this.requestBuilder.getAuthType());
            this.setupMetricsForClient();
        }

        try {
            this.flushService = new FlushService<T>(this, this.channelCache, this.isTestMode);
        } catch (Exception e) {
            // Release the telemetry worker / request builder created above before propagating.
            this.cleanUpResources();
            throw e;
        }

        logger.logInfo("Client created, name={}, account={}. isTestMode={}, parameters={}", name, accountURL == null ? "" : accountName, isTestMode, this.parameterProvider);
    }

    /**
     * Creates a non-test-mode client with no caller-supplied HTTP client or request builder.
     *
     * @param name client name
     * @param accountURL account URL
     * @param prop connection properties
     * @param parameterOverrides parameter overrides handed to the {@code ParameterProvider}
     */
    public SnowflakeStreamingIngestClientInternal(String name, SnowflakeURL accountURL, Properties prop, Map<String, Object> parameterOverrides) {
        this(
                name,
                accountURL,
                prop,
                /* httpClient */ null,
                /* isTestMode */ false,
                /* requestBuilder */ null,
                parameterOverrides);
    }

    /**
     * Creates a test-mode client: no account URL, properties, HTTP client or request builder, and
     * an empty set of parameter overrides.
     *
     * @param name client name
     */
    SnowflakeStreamingIngestClientInternal(String name) {
        this(
                name,
                /* accountURL */ null,
                /* prop */ null,
                /* httpClient */ null,
                /* isTestMode */ true,
                /* requestBuilder */ null,
                new HashMap<>());
    }

    /**
     * Replaces the request builder used by this client.
     *
     * @param requestBuilder the request builder to use from now on
     */
    @VisibleForTesting
    public void injectRequestBuilder(RequestBuilder requestBuilder) {
        this.requestBuilder = requestBuilder;
    }

    /**
     * @return the client name supplied at construction
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * @return the role read from the {@code role} property; null for a test-mode client
     */
    String getRole() {
        return role;
    }

    /**
     * @return true once {@link #close()} has been started on this client
     */
    @Override
    public boolean isClosed() {
        return isClosed;
    }

    /**
     * Opens a channel against the table named in the request and adds it to the channel cache.
     *
     * @param request the open-channel request (channel name, database, schema, table, options)
     * @return the newly created channel
     * @throws SFException with {@code CLOSED_CLIENT} if this client is closed, or with
     *     {@code OPEN_CHANNEL_FAILURE} if the service returns a non-zero status code or the call
     *     fails with an I/O or ingest response error
     */
    @Override
    public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest request) {
        if (isClosed) {
            throw new SFException(ErrorCode.CLOSED_CLIENT);
        }

        logger.logDebug(
                "Open channel request start, channel={}, table={}, client={}",
                request.getChannelName(),
                request.getFullyQualifiedTableName(),
                getName());

        try {
            Map<Object, Object> payload = new HashMap<>();
            payload.put("request_id", flushService.getClientPrefix() + "_" + counter.getAndIncrement());
            payload.put("channel", request.getChannelName());
            payload.put("table", request.getTableName());
            payload.put("database", request.getDBName());
            payload.put("schema", request.getSchemaName());
            payload.put("write_mode", Constants.WriteMode.CLOUD_STORAGE.name());
            payload.put("role", role);

            OpenChannelResponse response = StreamingIngestUtils.executeWithRetries(
                    OpenChannelResponse.class,
                    "/v1/streaming/channels/open/",
                    payload,
                    "open channel",
                    ServiceResponseHandler.ApiName.STREAMING_OPEN_CHANNEL,
                    httpClient,
                    requestBuilder);

            boolean succeeded = response.getStatusCode() == 0L;
            if (!succeeded) {
                logger.logDebug(
                        "Open channel request failed, channel={}, table={}, client={}, message={}",
                        request.getChannelName(),
                        request.getFullyQualifiedTableName(),
                        getName(),
                        response.getMessage());
                throw new SFException(ErrorCode.OPEN_CHANNEL_FAILURE, response.getMessage());
            }

            logger.logInfo(
                    "Open channel request succeeded, channel={}, table={}, clientSequencer={}, rowSequencer={}, client={}",
                    request.getChannelName(),
                    request.getFullyQualifiedTableName(),
                    response.getClientSequencer(),
                    response.getRowSequencer(),
                    getName());

            // The channel is described by the server response, except for the on-error option and
            // default timezone, which come from the caller's request.
            SnowflakeStreamingIngestChannelInternal channel =
                    SnowflakeStreamingIngestChannelFactory.builder(response.getChannelName())
                            .setDBName(response.getDBName())
                            .setSchemaName(response.getSchemaName())
                            .setTableName(response.getTableName())
                            .setOffsetToken(response.getOffsetToken())
                            .setRowSequencer(response.getRowSequencer())
                            .setChannelSequencer(response.getClientSequencer())
                            .setOwningClient(this)
                            .setEncryptionKey(response.getEncryptionKey())
                            .setEncryptionKeyId(response.getEncryptionKeyId())
                            .setOnErrorOption(request.getOnErrorOption())
                            .setDefaultTimezone(request.getDefaultTimezone())
                            .build();
            channel.setupSchema(response.getTableColumns());
            channelCache.addChannel(channel);
            return channel;
        } catch (IOException | IngestResponseException e) {
            throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage());
        }
    }

    /**
     * Fetches the persisted offset token of each given channel.
     *
     * @param channels channels to query; each must be a {@code SnowflakeStreamingIngestChannelInternal}
     * @return map from a channel's fully qualified name to its persisted offset token, pairing the
     *     i-th channel with the i-th entry of the status response
     */
    @Override
    public Map<String, String> getLatestCommittedOffsetTokens(List<SnowflakeStreamingIngestChannel> channels) {
        List<SnowflakeStreamingIngestChannelInternal<?>> internalChannels = new ArrayList<>(channels.size());
        for (SnowflakeStreamingIngestChannel channel : channels) {
            internalChannels.add((SnowflakeStreamingIngestChannelInternal<?>) channel);
        }

        List<ChannelsStatusResponse.ChannelStatusResponseDTO> statuses = getChannelsStatus(internalChannels).getChannels();

        Map<String, String> offsetTokensByChannel = new HashMap<>();
        int position = 0;
        for (SnowflakeStreamingIngestChannel channel : channels) {
            offsetTokensByChannel.put(channel.getFullyQualifiedName(), statuses.get(position).getPersistedOffsetToken());
            position++;
        }
        return offsetTokensByChannel;
    }

    /**
     * Queries the service for the status of the given channels.
     *
     * <p>A non-zero status code on an individual channel does not fail the call: it is logged as a
     * warning and, when a telemetry service is available, reported as a client failure.
     *
     * @param channels channels to query
     * @return the status response; its channel list is read by position, matching {@code channels}
     * @throws SFException with {@code CHANNEL_STATUS_FAILURE} if the response carries a non-zero
     *     status code or the call fails with an I/O or ingest response error
     */
    ChannelsStatusResponse getChannelsStatus(List<SnowflakeStreamingIngestChannelInternal<?>> channels) {
        try {
            ChannelsStatusRequest request = new ChannelsStatusRequest();
            List<ChannelsStatusRequest.ChannelStatusRequestDTO> requestDTOs = new ArrayList<>(channels.size());
            for (SnowflakeStreamingIngestChannelInternal<?> channel : channels) {
                requestDTOs.add(new ChannelsStatusRequest.ChannelStatusRequestDTO(channel));
            }
            request.setChannels(requestDTOs);
            request.setRole(role);
            request.setRequestId(flushService.getClientPrefix() + "_" + counter.getAndIncrement());

            String payload = objectMapper.writeValueAsString(request);
            ChannelsStatusResponse response = StreamingIngestUtils.executeWithRetries(
                    ChannelsStatusResponse.class,
                    "/v1/streaming/channels/status/",
                    payload,
                    "channel status",
                    ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS,
                    httpClient,
                    requestBuilder);

            if (response.getStatusCode() != 0L) {
                throw new SFException(ErrorCode.CHANNEL_STATUS_FAILURE, response.getMessage());
            }

            int position = 0;
            for (SnowflakeStreamingIngestChannelInternal<?> channel : channels) {
                ChannelsStatusResponse.ChannelStatusResponseDTO channelStatus = response.getChannels().get(position);
                position++;
                if (channelStatus.getStatusCode() != 0L) {
                    String errorMessage = String.format("Channel has failure status_code, name=%s, channel_sequencer=%d, status_code=%d", channel.getFullyQualifiedName(), channel.getChannelSequencer(), channelStatus.getStatusCode());
                    logger.logWarn(errorMessage);
                    TelemetryService telemetry = getTelemetryService();
                    if (telemetry != null) {
                        telemetry.reportClientFailure(getClass().getSimpleName(), errorMessage);
                    }
                }
            }
            return response;
        } catch (IOException | IngestResponseException e) {
            throw new SFException(e, ErrorCode.CHANNEL_STATUS_FAILURE, e.getMessage());
        }
    }

    /**
     * Registers the given blobs, split into batches by
     * {@link #partitionBlobListForRegistrationRequest(List)}; each batch is registered starting at
     * execution count 0.
     *
     * @param blobs blobs to register
     */
    void registerBlobs(List<BlobMetadata> blobs) {
        List<List<BlobMetadata>> batches = partitionBlobListForRegistrationRequest(blobs);
        batches.forEach(batch -> registerBlobs(batch, 0));
    }

    /**
     * Splits blobs, in order, into consecutive batches such that the total number of chunks per
     * batch does not exceed the maximum configured in the parameter provider.
     *
     * @param blobs blobs to partition
     * @return the batches in input order; empty when {@code blobs} is empty
     * @throws SFException with {@code INTERNAL_ERROR} if a single blob holds more chunks than the
     *     configured maximum
     */
    List<List<BlobMetadata>> partitionBlobListForRegistrationRequest(List<BlobMetadata> blobs) {
        final int maxChunksInBlobAndRegistrationRequest = parameterProvider.getMaxChunksInBlobAndRegistrationRequest();
        List<List<BlobMetadata>> batches = new ArrayList<>();
        List<BlobMetadata> openBatch = new ArrayList<>();
        int openBatchChunks = 0;

        for (BlobMetadata blob : blobs) {
            int blobChunks = blob.getChunks().size();
            if (blobChunks > maxChunksInBlobAndRegistrationRequest) {
                throw new SFException(ErrorCode.INTERNAL_ERROR, String.format("Incorrectly generated blob detected - number of chunks in the blob is larger than the max allowed number of chunks. Please report this bug to Snowflake. bdec=%s chunkCount=%d maxAllowedChunkCount=%d", blob.getPath(), blobChunks, maxChunksInBlobAndRegistrationRequest));
            }

            // Seal the open batch when this blob would push it over the limit. The open batch is
            // never empty here: an empty batch has 0 chunks and the blob alone is within the limit.
            if (openBatchChunks + blobChunks > maxChunksInBlobAndRegistrationRequest) {
                batches.add(openBatch);
                openBatch = new ArrayList<>();
                openBatchChunks = 0;
            }
            openBatch.add(blob);
            openBatchChunks += blobChunks;
        }

        if (!openBatch.isEmpty()) {
            batches.add(openBatch);
        }
        return batches;
    }

    /**
     * Sends one register-blob request and processes the per-channel statuses in the response.
     *
     * <p>Channels reporting status code 7 or 10 while {@code executionCount} is below 3 have their
     * chunk collected for a retry; any other non-zero status is logged, reported to telemetry when
     * available, and invalidates the channel in the cache if its sequencer matches. Collected
     * chunks are re-registered recursively after a retry sleep.
     *
     * @param blobs blobs to register in a single request
     * @param executionCount number of previous attempts for these blobs
     * @throws SFException with {@code REGISTER_BLOB_FAILURE} if the response carries a non-zero
     *     status code or the call fails, or with {@code INTERNAL_ERROR} if chunks need a retry but
     *     no matching blob can be rebuilt
     */
    void registerBlobs(List<BlobMetadata> blobs, int executionCount) {
        List<String> blobPaths = blobs.stream().map(BlobMetadata::getPath).collect(Collectors.toList());
        logger.logInfo("Register blob request preparing for blob={}, client={}, executionCount={}", blobPaths, name, executionCount);

        RegisterBlobResponse response;
        try {
            Map<Object, Object> payload = new HashMap<>();
            payload.put("request_id", flushService.getClientPrefix() + "_" + counter.getAndIncrement());
            payload.put("blobs", blobs);
            payload.put("role", role);
            response = StreamingIngestUtils.executeWithRetries(
                    RegisterBlobResponse.class,
                    "/v1/streaming/channels/write/blobs/",
                    payload,
                    "register blob",
                    ServiceResponseHandler.ApiName.STREAMING_REGISTER_BLOB,
                    httpClient,
                    requestBuilder);
            if (response.getStatusCode() != 0L) {
                logger.logDebug("Register blob request failed for blob={}, client={}, message={}, executionCount={}", blobPaths, name, response.getMessage(), executionCount);
                throw new SFException(ErrorCode.REGISTER_BLOB_FAILURE, response.getMessage());
            }
        } catch (IOException | IngestResponseException e) {
            throw new SFException(e, ErrorCode.REGISTER_BLOB_FAILURE, e.getMessage());
        }
        logger.logInfo("Register blob request returned for blob={}, client={}, executionCount={}", blobPaths, name, executionCount);

        Set<ChunkRegisterStatus> queueFullChunks = new HashSet<>();
        response.getBlobsStatus().forEach(blobStatus ->
                blobStatus.getChunksStatus().forEach(chunkStatus ->
                        chunkStatus.getChannelsStatus().forEach(channelStatus -> {
                            if (channelStatus.getStatusCode() == 0L) {
                                return;
                            }
                            boolean retryableCode = channelStatus.getStatusCode() == 7L || channelStatus.getStatusCode() == 10L;
                            if (retryableCode && executionCount < 3) {
                                queueFullChunks.add(chunkStatus);
                                return;
                            }
                            String errorMessage = String.format("Channel has been invalidated because of failure response, name=%s, channel_sequencer=%d, status_code=%d,  message=%s, executionCount=%d", channelStatus.getChannelName(), channelStatus.getChannelSequencer(), channelStatus.getStatusCode(), channelStatus.getMessage(), executionCount);
                            logger.logWarn(errorMessage);
                            TelemetryService telemetry = getTelemetryService();
                            if (telemetry != null) {
                                telemetry.reportClientFailure(getClass().getSimpleName(), errorMessage);
                            }
                            channelCache.invalidateChannelIfSequencersMatch(
                                    chunkStatus.getDBName(),
                                    chunkStatus.getSchemaName(),
                                    chunkStatus.getTableName(),
                                    channelStatus.getChannelName(),
                                    channelStatus.getChannelSequencer());
                        })));

        if (queueFullChunks.isEmpty()) {
            return;
        }

        logger.logInfo("Retrying registerBlobs request, blobs={}, retried_chunks={}, executionCount={}", blobs, queueFullChunks, executionCount);
        List<BlobMetadata> retryBlobs = getRetryBlobs(queueFullChunks, blobs);
        if (retryBlobs.isEmpty()) {
            throw new SFException(ErrorCode.INTERNAL_ERROR, "Failed to retry queue full chunks");
        }
        StreamingIngestUtils.sleepForRetry(executionCount);
        registerBlobs(retryBlobs, executionCount + 1);
    }

    /**
     * Rebuilds the blobs that must be re-registered for a set of chunks.
     *
     * <p>Chunks are matched by (channel name, sequencer) pairs: a blob chunk is kept when any of
     * its channels appears among the channels of {@code queueFullChunks}. Blobs with no matching
     * chunk are dropped; the others are recreated with only their matching chunks.
     *
     * @param queueFullChunks chunk statuses whose channels need a retry
     * @param blobs blobs of the original request
     * @return the blobs to retry, in the order of {@code blobs}; possibly empty
     */
    List<BlobMetadata> getRetryBlobs(Set<ChunkRegisterStatus> queueFullChunks, List<BlobMetadata> blobs) {
        Set<Pair<String, Long>> queueFullKeys = queueFullChunks.stream()
                .flatMap(chunkRegisterStatus -> chunkRegisterStatus.getChannelsStatus().stream())
                .map(channelStatus -> new Pair<String, Long>(channelStatus.getChannelName(), channelStatus.getChannelSequencer()))
                .collect(Collectors.toSet());

        List<BlobMetadata> retryBlobs = new ArrayList<>();
        for (BlobMetadata blobMetadata : blobs) {
            List<ChunkMetadata> relevantChunks = blobMetadata.getChunks().stream()
                    .filter(chunkMetadata -> chunkMetadata.getChannels().stream()
                            .anyMatch(channelMetadata -> queueFullKeys.contains(
                                    new Pair<String, Long>(channelMetadata.getChannelName(), channelMetadata.getClientSequencer()))))
                    .collect(Collectors.toList());
            if (relevantChunks.isEmpty()) {
                continue;
            }
            retryBlobs.add(BlobMetadata.createBlobMetadata(
                    blobMetadata.getPath(),
                    blobMetadata.getMD5(),
                    blobMetadata.getVersion(),
                    relevantChunks,
                    blobMetadata.getBlobStats()));
        }
        return retryBlobs;
    }

    /**
     * Closes the client: closes all cached channels, flushes outstanding data, reports telemetry
     * and metrics, then shuts down the flush service and releases the remaining resources.
     *
     * <p>Calling this on an already closed client is a no-op. Once the client has been marked
     * closed, the flush service shutdown and the resource clean-up are attempted regardless of
     * which earlier step failed, so a failed close does not leave the telemetry worker or request
     * builder behind on a client that can no longer be closed again.
     *
     * @throws SFException with {@code RESOURCE_CLEANUP_FAILURE} if the final flush fails or is
     *     interrupted; in the interrupted case the thread's interrupt status is restored after
     *     clean-up
     * @throws Exception any other failure raised by the steps above
     */
    @Override
    public void close() throws Exception {
        if (this.isClosed) {
            return;
        }
        this.isClosed = true;

        boolean interrupted = false;
        try {
            // Inside the try so that a failure here still reaches the clean-up below.
            this.channelCache.closeAllChannels();
            this.flush(true).get();
            this.reportStreamingIngestTelemetryToSF();
            if (this.metrics != null) {
                Slf4jReporter.forRegistry(this.metrics).outputTo(logger.getLogger()).build().report();
                this.removeMetricsFromRegistry();
            }
            if (this.jvmMemoryAndThreadMetrics != null) {
                Slf4jReporter.forRegistry(this.jvmMemoryAndThreadMetrics).outputTo(logger.getLogger()).build().report();
            }
        } catch (InterruptedException e) {
            // Remember the interruption; the flag is re-asserted only after clean-up so that the
            // shutdown steps below are not cut short by it.
            interrupted = true;
            throw new SFException(e, ErrorCode.RESOURCE_CLEANUP_FAILURE, "client close");
        } catch (ExecutionException e) {
            throw new SFException(e, ErrorCode.RESOURCE_CLEANUP_FAILURE, "client close");
        } finally {
            try {
                this.flushService.shutdown();
            } finally {
                // Must run even when the flush service shutdown throws.
                this.cleanUpResources();
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Triggers a flush through the flush service.
     *
     * @param closing true when called as part of {@link #close()}, which is the only case in which
     *     flushing a closed client is allowed
     * @return the future returned by the flush service
     * @throws SFException with {@code CLOSED_CLIENT} if the client is closed and {@code closing}
     *     is false
     */
    CompletableFuture<Void> flush(boolean closing) {
        boolean rejected = !closing && isClosed;
        if (rejected) {
            throw new SFException(ErrorCode.CLOSED_CLIENT);
        }
        return flushService.flush(true);
    }

    /** Forwards the need-flush signal to the flush service. */
    void setNeedFlush() {
        flushService.setNeedFlush();
    }

    /**
     * Asks the channel cache to remove the given channel if its sequencers match.
     *
     * @param channel the channel to remove
     */
    void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelInternal<T> channel) {
        channelCache.removeChannelIfSequencersMatch(channel);
    }

    /**
     * @return whether this client was constructed in test mode
     */
    boolean isTestMode() {
        return isTestMode;
    }

    /**
     * @return the HTTP client this client uses for service calls
     */
    CloseableHttpClient getHttpClient() {
        return httpClient;
    }

    /**
     * @return the current request builder; may be null
     */
    RequestBuilder getRequestBuilder() {
        return requestBuilder;
    }

    /**
     * @return the cache holding the channels opened through this client
     */
    ChannelCache<T> getChannelCache() {
        return channelCache;
    }

    /**
     * @return the flush service created for this client
     */
    FlushService<T> getFlushService() {
        return flushService;
    }

    /**
     * Polls the channel status once per second until every given channel's persisted row sequencer
     * equals its local row sequencer, or until 60 polls have passed without progress.
     *
     * <p>A poll counts as progress when at least one channel left the pending set (committed or
     * reported an error) or when any pending channel's persisted row sequencer changed since the
     * previous poll. Channels reporting a non-zero status code are set aside and returned as well.
     *
     * @param channels channels to verify; returned unchanged when empty
     * @return the channels that are not verified as committed: those still pending when polling
     *     stopped plus those that reported an error status; empty when all channels are committed
     * @throws SFException with {@code INTERNAL_ERROR} if the thread is interrupted while waiting;
     *     the interrupt status is restored and the interruption is attached as the cause
     */
    List<SnowflakeStreamingIngestChannelInternal<?>> verifyChannelsAreFullyCommitted(List<SnowflakeStreamingIngestChannelInternal<?>> channels) {
        if (channels.isEmpty()) {
            return channels;
        }
        int retry = 0;
        boolean isTimeout = true;
        List<ChannelsStatusResponse.ChannelStatusResponseDTO> oldChannelsStatus = new ArrayList<>();
        List<SnowflakeStreamingIngestChannelInternal<?>> channelsWithError = new ArrayList<>();
        do {
            List<ChannelsStatusResponse.ChannelStatusResponseDTO> channelsStatus = this.getChannelsStatus(channels).getChannels();
            // Channels (and their statuses) that are still pending after this poll.
            List<SnowflakeStreamingIngestChannelInternal<?>> tempChannels = new ArrayList<>();
            List<ChannelsStatusResponse.ChannelStatusResponseDTO> tempChannelsStatus = new ArrayList<>();
            for (int idx = 0; idx < channelsStatus.size(); ++idx) {
                ChannelsStatusResponse.ChannelStatusResponseDTO channelStatus = channelsStatus.get(idx);
                SnowflakeStreamingIngestChannelInternal<?> channel = channels.get(idx);
                long rowSequencer = channel.getChannelState().getRowSequencer();
                logger.logInfo("Get channel status name={}, status={}, clientSequencer={}, rowSequencer={}, offsetToken={}, persistedRowSequencer={}, persistedOffsetToken={}", channel.getName(), channelStatus.getStatusCode(), channel.getChannelSequencer(), rowSequencer, channel.getChannelState().getOffsetToken(), channelStatus.getPersistedRowSequencer(), channelStatus.getPersistedOffsetToken());
                if (channelStatus.getStatusCode() != 0L) {
                    channelsWithError.add(channel);
                    continue;
                }
                if (channelStatus.getPersistedRowSequencer().equals(rowSequencer)) {
                    // Fully committed: drop it from the pending set.
                    continue;
                }
                tempChannels.add(channel);
                tempChannelsStatus.add(channelStatus);
            }

            boolean isMakingProgress = tempChannels.size() != channels.size();
            if (!isMakingProgress) {
                // Nothing left the pending set, so the statuses line up by index with the previous
                // poll; look for any persisted row sequencer that moved. The first poll has no
                // previous statuses and therefore always counts as progress.
                for (int idx = 0; idx < channelsStatus.size(); ++idx) {
                    boolean unchanged = !oldChannelsStatus.isEmpty()
                            && channelsStatus.get(idx).getPersistedRowSequencer().equals(oldChannelsStatus.get(idx).getPersistedRowSequencer());
                    if (!unchanged) {
                        isMakingProgress = true;
                        break;
                    }
                }
            }

            oldChannelsStatus = tempChannelsStatus;
            channels = tempChannels;
            if (channels.isEmpty()) {
                isTimeout = false;
                break;
            }
            if (!isMakingProgress) {
                ++retry;
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Keep the interruption visible to callers and keep the original exception as cause.
                Thread.currentThread().interrupt();
                throw new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage());
            }
        } while (retry < 60);

        if (isTimeout) {
            logger.logWarn("Commit service at server side is not making progress, stop retrying for client={}.", this.name);
        }
        // 'channels' is a list created inside the loop at this point, so the caller's list is not mutated.
        channels.addAll(channelsWithError);
        return channels;
    }

    /**
     * @return the parameter provider built at construction
     */
    ParameterProvider getParameterProvider() {
        return parameterProvider;
    }

    /**
     * Passes a new refresh token to the request builder; does nothing when there is no request
     * builder.
     *
     * @param refreshToken the refresh token to hand over
     */
    @Override
    public void setRefreshToken(String refreshToken) {
        if (requestBuilder == null) {
            return;
        }
        requestBuilder.setRefreshToken(refreshToken);
    }

    /**
     * Starts the telemetry worker and creates the client's metrics.
     *
     * <p>Telemetry is reported every 10 seconds, starting 10 seconds after this call. When the
     * parameter provider reports Snowpipe Streaming metrics as enabled, the metrics are also
     * exposed over JMX under the {@code snowflake.ingest.sdk} domain and a second registry with
     * JVM memory and thread gauges is created. Registries are published through
     * {@code SharedMetricRegistries}.
     */
    private void setupMetricsForClient() {
        telemetryWorker = Executors.newSingleThreadScheduledExecutor();
        telemetryWorker.scheduleWithFixedDelay(this::reportStreamingIngestTelemetryToSF, 10L, 10L, TimeUnit.SECONDS);

        metrics = new MetricRegistry();
        cpuHistogram = metrics.histogram(MetricRegistry.name("cpu", "usage", "histogram"));
        flushLatency = metrics.timer(MetricRegistry.name("latency", "flush"));
        buildLatency = metrics.timer(MetricRegistry.name("latency", "build"));
        uploadLatency = metrics.timer(MetricRegistry.name("latency", "upload"));
        registerLatency = metrics.timer(MetricRegistry.name("latency", "register"));
        uploadThroughput = metrics.meter(MetricRegistry.name("throughput", "upload"));
        inputThroughput = metrics.meter(MetricRegistry.name("throughput", "input"));
        blobSizeHistogram = metrics.histogram(MetricRegistry.name("blob", "size", "histogram"));
        blobRowCountHistogram = metrics.histogram(MetricRegistry.name("blob", "row", "count", "histogram"));

        if (parameterProvider.hasEnabledSnowpipeStreamingMetrics()) {
            // NOTE(review): the reporter is only held in this local variable; no stop() call for it
            // is visible in this file.
            JmxReporter jmxReporter = JmxReporter.forRegistry(metrics)
                    .inDomain("snowflake.ingest.sdk")
                    .convertDurationsTo(TimeUnit.SECONDS)
                    .createsObjectNamesWith((ignoreMeterType, jmxDomain, metricName) -> SnowflakeStreamingIngestClientInternal.getObjectName(getName(), jmxDomain, metricName))
                    .build();
            jmxReporter.start();

            jvmMemoryAndThreadMetrics = new MetricRegistry();
            jvmMemoryAndThreadMetrics.register(MetricRegistry.name("jvm", "memory"), new MemoryUsageGaugeSet());
            jvmMemoryAndThreadMetrics.register(MetricRegistry.name("jvm", "threads"), new ThreadStatesGaugeSet());
            SharedMetricRegistries.add("SnowpipeStreamingJvmMemoryAndThreadMetrics", jvmMemoryAndThreadMetrics);
        }

        if (!metrics.getMetrics().isEmpty()) {
            SharedMetricRegistries.add("SnowpipeStreamingMetrics", metrics);
        }
    }

    /**
     * Builds the JMX object name {@code <jmxDomain>:clientName=<clientName>,name=<metricName>}.
     *
     * @param clientName name of the client owning the metric
     * @param jmxDomain JMX domain
     * @param metricName metric name
     * @return the object name
     * @throws SFException with {@code INTERNAL_ERROR} if the assembled string is not a valid
     *     object name; the underlying {@code MalformedObjectNameException} is attached as the cause
     */
    private static ObjectName getObjectName(String clientName, String jmxDomain, String metricName) {
        String objectName = jmxDomain + ":clientName=" + clientName + ",name=" + metricName;
        try {
            return new ObjectName(objectName);
        }
        catch (MalformedObjectNameException e) {
            logger.logWarn("Could not create Object name for MetricName={}", metricName);
            // Chain the original exception so the reason the name was rejected is not lost.
            throw new SFException(e, ErrorCode.INTERNAL_ERROR, "Invalid metric name");
        }
    }

    /**
     * Removes every metric from this client's registry and drops the registry from
     * {@code SharedMetricRegistries}. Does nothing when the registry holds no metrics.
     *
     * <p>All metrics are removed rather than those starting with {@code snowflake.ingest.sdk}:
     * that string is the JMX domain, whereas the metric names registered by
     * {@code setupMetricsForClient()} ({@code cpu.*}, {@code latency.*}, {@code throughput.*},
     * {@code blob.*}) never start with it, so a prefix filter on it matches nothing.
     */
    private void removeMetricsFromRegistry() {
        if (this.metrics.getMetrics().size() != 0) {
            logger.logDebug("Unregistering all metrics for client={}", this.getName());
            this.metrics.removeMatching(MetricFilter.ALL);
            SharedMetricRegistries.remove("SnowpipeStreamingMetrics");
        }
    }

    /**
     * @return the request builder's telemetry service, or null when there is no request builder
     */
    TelemetryService getTelemetryService() {
        if (requestBuilder == null) {
            return null;
        }
        return requestBuilder.getTelemetryService();
    }

    /**
     * Reports latency, throughput and CPU metrics through the telemetry service; does nothing when
     * no telemetry service is available.
     */
    private void reportStreamingIngestTelemetryToSF() {
        TelemetryService telemetry = getTelemetryService();
        if (telemetry == null) {
            return;
        }
        telemetry.reportLatencyInSec(buildLatency, uploadLatency, registerLatency, flushLatency);
        telemetry.reportThroughputBytesPerSecond(inputThroughput, uploadThroughput);
        telemetry.reportCpuMemoryUsage(cpuHistogram);
    }

    /**
     * Releases what the client holds: shuts down the telemetry worker and closes the request
     * builder's resources when they exist, and, outside test mode, shuts down the HTTP connection
     * manager daemon thread.
     */
    private void cleanUpResources() {
        ScheduledExecutorService worker = telemetryWorker;
        if (worker != null) {
            worker.shutdown();
        }

        RequestBuilder builder = requestBuilder;
        if (builder != null) {
            builder.closeResources();
        }

        if (isTestMode) {
            return;
        }
        HttpUtil.shutdownHttpConnectionManagerDaemonThread();
    }
}

