/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.UniformStickyPartitioner;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.clients.producer.internals.KafkaProducerMetrics;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

public class KafkaProducer<K, V>
implements Producer<K, V> {
    // Per-instance logger obtained from the LogContext built (or supplied) in the constructors.
    private final Logger log;
    // Prefix passed to KafkaMetricsContext and to AppInfoParser register/unregister calls.
    private static final String JMX_PREFIX = "kafka.producer";
    // Same text that prefixes the I/O thread name assembled in the main constructor.
    public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
    // Metric group name handed to the RecordAccumulator and its BufferPool.
    public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
    // Value of the "client.id" configuration.
    private final String clientId;
    // Metrics registry; package-private (not private) in this class.
    final Metrics metrics;
    // Records timings for init/begin/commit/abort/sendOffsets/flush/metadata-wait.
    private final KafkaProducerMetrics producerMetrics;
    // Instance configured via "partitioner.class"; the code checks it for null before use.
    private final Partitioner partitioner;
    // Value of "max.request.size"; upper bound checked in ensureValidRecordSize.
    private final int maxRequestSize;
    // Value of "buffer.memory"; also checked in ensureValidRecordSize.
    private final long totalMemorySize;
    private final ProducerMetadata metadata;
    private final RecordAccumulator accumulator;
    // Runnable executed by ioThread; woken up whenever there is new work for it.
    private final Sender sender;
    private final Thread ioThread;
    // Parsed from "compression.type".
    private final CompressionType compressionType;
    // Sensor named "errors"; recorded on failed sends.
    private final Sensor errors;
    private final Time time;
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;
    private final ProducerConfig producerConfig;
    // Value of "max.block.ms"; used as the wait bound for metadata and transactional calls.
    private final long maxBlockTimeMs;
    // Value of "partitioner.ignore.keys".
    private final boolean partitionerIgnoreKeys;
    private final ProducerInterceptors<K, V> interceptors;
    private final ApiVersions apiVersions;
    // Null unless "enable.idempotence" is true (see configureTransactionState).
    private final TransactionManager transactionManager;

    /**
     * Creates a producer from a configuration map, passing {@code null} for both
     * serializers so that they are instantiated from the configuration.
     *
     * @param configs producer configuration key/value pairs
     */
    public KafkaProducer(Map<String, Object> configs) {
        this(configs, null, null);
    }

    /**
     * Creates a producer from a configuration map and optional serializer instances.
     * The map is first passed through {@code ProducerConfig.appendSerializerToConfig}
     * together with the serializers, then wrapped in a {@link ProducerConfig}. No
     * metadata, client or interceptor overrides are supplied and {@code Time.SYSTEM}
     * is used as the clock.
     *
     * @param configs         producer configuration key/value pairs
     * @param keySerializer   key serializer, or {@code null} to instantiate the one named by {@code key.serializer}
     * @param valueSerializer value serializer, or {@code null} to instantiate the one named by {@code value.serializer}
     */
    public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)), keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
    }

    /**
     * Creates a producer from {@link Properties}, passing {@code null} for both
     * serializers so that they are instantiated from the configuration.
     *
     * @param properties producer configuration
     */
    public KafkaProducer(Properties properties) {
        // The casts select the (Properties, Serializer, Serializer) overload for the null arguments.
        this(properties, (Serializer<K>)null, (Serializer<V>)null);
    }

    /**
     * Creates a producer from {@link Properties} and optional serializer instances by
     * converting the properties with {@code Utils.propsToMap} and delegating to the
     * map-based constructor.
     *
     * @param properties      producer configuration
     * @param keySerializer   key serializer, or {@code null} to instantiate the configured one
     * @param valueSerializer value serializer, or {@code null} to instantiate the configured one
     */
    public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(Utils.propsToMap(properties), keySerializer, valueSerializer);
    }

    /**
     * Emits a deprecation warning when the configured partitioner is an instance of
     * {@code DefaultPartitioner} or {@code UniformStickyPartitioner}. The two checks
     * are independent of each other.
     */
    private void warnIfPartitionerDeprecated() {
        final Partitioner configured = this.partitioner;
        final boolean usesDefaultPartitioner = configured instanceof DefaultPartitioner;
        final boolean usesUniformStickyPartitioner = configured instanceof UniformStickyPartitioner;

        if (usesDefaultPartitioner) {
            log.warn("DefaultPartitioner is deprecated.  Please clear partitioner.class configuration setting to get the default partitioning behavior");
        }
        if (usesUniformStickyPartitioner) {
            log.warn("UniformStickyPartitioner is deprecated.  Please clear partitioner.class configuration setting and set partitioner.ignore.keys to 'true' to get the uniform sticky partitioning behavior");
        }
    }

    /**
     * Package-private constructor that wires up the whole producer from a parsed
     * configuration and starts the network I/O thread.
     *
     * <p>If any step throws, the partially built producer is closed with a zero
     * timeout (errors raised by that close are swallowed) and the original failure
     * is rethrown wrapped in a {@link KafkaException}.
     *
     * @param config          parsed producer configuration
     * @param keySerializer   key serializer, or {@code null} to instantiate and configure the one named by {@code key.serializer}
     * @param valueSerializer value serializer, or {@code null} to instantiate and configure the one named by {@code value.serializer}
     * @param metadata        metadata to use, or {@code null} to create one and bootstrap it from the configured addresses
     * @param kafkaClient     client forwarded to {@link #newSender}, which creates a network client when this is {@code null}
     * @param interceptors    interceptors to use, or {@code null} to wrap the ones configured via {@code interceptor.classes}
     * @param time            clock used for metrics, the accumulator and app-info registration
     * @throws KafkaException if construction fails for any reason
     */
    KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors<K, V> interceptors, Time time) {
        try {
            this.producerConfig = config;
            this.time = time;
            String transactionalId = config.getString("transactional.id");
            this.clientId = config.getString("client.id");
            // The log prefix carries the transactional id only when one is configured.
            LogContext logContext = transactionalId == null ? new LogContext(String.format("[Producer clientId=%s] ", this.clientId)) : new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", this.clientId, transactionalId));
            this.log = logContext.logger(KafkaProducer.class);
            this.log.trace("Starting the Kafka producer");
            // Metrics setup: every metric is tagged with the client id.
            Map<String, String> metricTags = Collections.singletonMap("client-id", this.clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS).recordLevel(Sensor.RecordingLevel.forName(config.getString("metrics.recording.level"))).tags(metricTags);
            List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, (AbstractConfig)config);
            KafkaMetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, config.originalsWithPrefix("metrics.context."));
            this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
            this.producerMetrics = new KafkaProducerMetrics(this.metrics);
            this.partitioner = config.getConfiguredInstance("partitioner.class", Partitioner.class, Collections.singletonMap("client.id", this.clientId));
            this.warnIfPartitionerDeprecated();
            this.partitionerIgnoreKeys = config.getBoolean("partitioner.ignore.keys");
            long retryBackoffMs = config.getLong("retry.backoff.ms");
            // Serializers: a supplied instance is used as-is (and the config key is marked
            // ignored); otherwise the configured class is instantiated and configured here.
            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance("key.serializer", Serializer.class);
                this.keySerializer.configure(config.originals(Collections.singletonMap("client.id", this.clientId)), true);
            } else {
                config.ignore("key.serializer");
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance("value.serializer", Serializer.class);
                this.valueSerializer.configure(config.originals(Collections.singletonMap("client.id", this.clientId)), false);
            } else {
                config.ignore("value.serializer");
                this.valueSerializer = valueSerializer;
            }
            // The configured interceptor list is always created; it is wrapped only when no
            // ProducerInterceptors instance was injected, but is registered as a
            // cluster-resource-listener candidate either way.
            List interceptorList = ClientUtils.createConfiguredInterceptors(config, "interceptor.classes", ProducerInterceptor.class);
            this.interceptors = interceptors != null ? interceptors : new ProducerInterceptors(interceptorList);
            ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(this.keySerializer, this.valueSerializer, interceptorList, reporters);
            this.maxRequestSize = config.getInt("max.request.size");
            this.totalMemorySize = config.getLong("buffer.memory");
            this.compressionType = CompressionType.forName(config.getString("compression.type"));
            this.maxBlockTimeMs = config.getLong("max.block.ms");
            int deliveryTimeoutMs = KafkaProducer.configureDeliveryTimeout(config, this.log);
            // apiVersions must be assigned before configureTransactionState, which reads it.
            this.apiVersions = new ApiVersions();
            this.transactionManager = this.configureTransactionState(config, logContext);
            // Adaptive partitioning is only enabled when no partitioner instance is configured.
            boolean enableAdaptivePartitioning = this.partitioner == null && config.getBoolean("partitioner.adaptive.partitioning.enable") != false;
            RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(enableAdaptivePartitioning, config.getLong("partitioner.availability.timeout.ms"));
            // A configured batch size below 1 is raised to 1.
            int batchSize = Math.max(1, config.getInt("batch.size"));
            this.accumulator = new RecordAccumulator(logContext, batchSize, this.compressionType, KafkaProducer.lingerMs(config), retryBackoffMs, deliveryTimeoutMs, partitionerConfig, this.metrics, PRODUCER_METRIC_GROUP_NAME, time, this.apiVersions, this.transactionManager, new BufferPool(this.totalMemorySize, batchSize, this.metrics, time, PRODUCER_METRIC_GROUP_NAME));
            // Addresses are parsed (and validated) even when metadata is injected.
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
            if (metadata != null) {
                this.metadata = metadata;
            } else {
                // NOTE(review): this passes Time.SYSTEM rather than the injected `time` — confirm that is intentional.
                this.metadata = new ProducerMetadata(retryBackoffMs, config.getLong("metadata.max.age.ms"), config.getLong("metadata.max.idle.ms"), logContext, clusterResourceListeners, Time.SYSTEM);
                this.metadata.bootstrap(addresses);
            }
            this.errors = this.metrics.sensor("errors");
            this.sender = this.newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = "kafka-producer-network-thread | " + this.clientId;
            // Third argument is presumably the daemon flag — verify against KafkaThread.
            this.ioThread = new KafkaThread(ioThreadName, (Runnable)this.sender, true);
            this.ioThread.start();
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, this.clientId, this.metrics, time.milliseconds());
            this.log.debug("Kafka producer started");
        }
        catch (Throwable t) {
            // Release whatever was created so far; close errors are swallowed so that `t` is the one reported.
            this.close(Duration.ofMillis(0L), true);
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }

    /**
     * Package-private constructor that accepts already-built collaborators instead of
     * creating them. Unlike the main constructor it does not start {@code ioThread},
     * does not register app info and has no cleanup-on-failure handling.
     * Presumably intended for tests — verify against callers.
     *
     * <p>Scalar settings ({@code client.id}, {@code max.request.size},
     * {@code buffer.memory}, {@code compression.type}, {@code max.block.ms},
     * {@code partitioner.ignore.keys}) are still read from {@code config}.
     */
    KafkaProducer(ProducerConfig config, LogContext logContext, Metrics metrics, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, RecordAccumulator accumulator, TransactionManager transactionManager, Sender sender, ProducerInterceptors<K, V> interceptors, Partitioner partitioner, Time time, KafkaThread ioThread) {
        this.producerConfig = config;
        this.time = time;
        this.clientId = config.getString("client.id");
        this.log = logContext.logger(KafkaProducer.class);
        this.metrics = metrics;
        this.producerMetrics = new KafkaProducerMetrics(metrics);
        this.partitioner = partitioner;
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.interceptors = interceptors;
        this.maxRequestSize = config.getInt("max.request.size");
        this.totalMemorySize = config.getLong("buffer.memory");
        this.compressionType = CompressionType.forName(config.getString("compression.type"));
        this.maxBlockTimeMs = config.getLong("max.block.ms");
        this.partitionerIgnoreKeys = config.getBoolean("partitioner.ignore.keys");
        // A fresh ApiVersions is created here; it is not one of the injected collaborators.
        this.apiVersions = new ApiVersions();
        this.transactionManager = transactionManager;
        this.accumulator = accumulator;
        this.errors = this.metrics.sensor("errors");
        this.metadata = metadata;
        this.sender = sender;
        this.ioThread = ioThread;
    }

    /**
     * Builds the {@link Sender} for this producer from the stored configuration.
     *
     * @param logContext  log context passed to the client and the sender
     * @param kafkaClient client to use, or {@code null} to create a network client via {@code ClientUtils}
     * @param metadata    metadata handed to the client and the sender
     * @return a new sender bound to this producer's accumulator, transaction manager and API versions
     */
    Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
        final ProducerConfig cfg = this.producerConfig;
        final int maxInFlight = cfg.getInt("max.in.flight.requests.per.connection");
        final int requestTimeout = cfg.getInt("request.timeout.ms");
        final ProducerMetrics registry = new ProducerMetrics(this.metrics);
        final Sensor throttleSensor = Sender.throttleTimeSensor(registry.senderMetrics);

        // Only create a network client when the caller did not inject one.
        final KafkaClient client;
        if (kafkaClient != null) {
            client = kafkaClient;
        } else {
            client = ClientUtils.createNetworkClient(cfg, this.metrics, "producer", logContext, this.apiVersions, this.time, maxInFlight, metadata, throttleSensor);
        }

        final short acks = Short.parseShort(cfg.getString("acks"));
        final boolean singleInFlight = maxInFlight == 1;
        final int maxRequestBytes = cfg.getInt("max.request.size");
        final int retries = cfg.getInt("retries");
        final long retryBackoff = cfg.getLong("retry.backoff.ms");
        return new Sender(logContext, client, metadata, this.accumulator, singleInFlight, maxRequestBytes, acks, retries, registry.senderMetrics, this.time, requestTimeout, retryBackoff, this.transactionManager, this.apiVersions);
    }

    /**
     * Reads {@code linger.ms} and narrows it to an {@code int}, capping values above
     * {@link Integer#MAX_VALUE} at that maximum.
     *
     * @param config producer configuration
     * @return the linger time in milliseconds as an int
     */
    private static int lingerMs(ProducerConfig config) {
        final long configuredLingerMs = config.getLong("linger.ms");
        if (configuredLingerMs > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        return (int) configuredLingerMs;
    }

    /**
     * Resolves the effective {@code delivery.timeout.ms}.
     *
     * <p>The configured value is returned when it is at least
     * {@code linger.ms + request.timeout.ms} (that sum capped at
     * {@link Integer#MAX_VALUE}). Otherwise, if the user set
     * {@code delivery.timeout.ms} explicitly, a {@link ConfigException} is thrown;
     * if not, the sum is used instead and a warning is logged.
     *
     * @param config producer configuration
     * @param log    logger that receives the adjustment warning
     * @return the delivery timeout to use, in milliseconds
     * @throws ConfigException if an explicitly configured value is too small
     */
    private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {
        final int configuredTimeoutMs = config.getInt("delivery.timeout.ms");
        final int lingerMs = KafkaProducer.lingerMs(config);
        final int requestTimeoutMs = config.getInt("request.timeout.ms");
        // Sum in long arithmetic so the addition cannot overflow before it is capped.
        final int minimumTimeoutMs = (int) Math.min((long) lingerMs + (long) requestTimeoutMs, Integer.MAX_VALUE);

        if (configuredTimeoutMs >= minimumTimeoutMs) {
            return configuredTimeoutMs;
        }
        if (config.originals().containsKey("delivery.timeout.ms")) {
            throw new ConfigException("delivery.timeout.ms should be equal to or larger than linger.ms + request.timeout.ms");
        }
        log.warn("{} should be equal to or larger than {} + {}. Setting it to {}.", "delivery.timeout.ms", "linger.ms", "request.timeout.ms", minimumTimeoutMs);
        return minimumTimeoutMs;
    }

    /**
     * Creates the {@link TransactionManager} when {@code enable.idempotence} is true.
     * When it is false, {@code transaction.timeout.ms} is marked as ignored and no
     * manager is created.
     *
     * @param config     producer configuration
     * @param logContext log context for the manager
     * @return the new transaction manager, or {@code null} when idempotence is disabled
     */
    private TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext) {
        final boolean idempotenceEnabled = config.getBoolean("enable.idempotence");
        if (!idempotenceEnabled) {
            config.ignore("transaction.timeout.ms");
            return null;
        }

        final String transactionalId = config.getString("transactional.id");
        final int transactionTimeoutMs = config.getInt("transaction.timeout.ms");
        final long retryBackoffMs = config.getLong("retry.backoff.ms");
        final TransactionManager manager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, this.apiVersions);
        log.info(manager.isTransactional() ? "Instantiated a transactional producer." : "Instantiated an idempotent producer.");
        return manager;
    }

    /**
     * Asks the transaction manager to initialize transactions, wakes the sender and
     * awaits the result with a timeout of {@code maxBlockTimeMs} milliseconds. The
     * elapsed time is recorded in the producer metrics on success.
     */
    @Override
    public void initTransactions() {
        throwIfNoTransactionManager();
        throwIfProducerClosed();
        final long startNanos = time.nanoseconds();
        final TransactionalRequestResult pending = transactionManager.initializeTransactions();
        sender.wakeup();
        pending.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
        final long elapsedNanos = time.nanoseconds() - startNanos;
        producerMetrics.recordInit(elapsedNanos);
    }

    /**
     * Starts a transaction on the transaction manager and records how long the call
     * took in the producer metrics.
     *
     * @throws ProducerFencedException declared by the {@code Producer} contract
     */
    @Override
    public void beginTransaction() throws ProducerFencedException {
        throwIfNoTransactionManager();
        throwIfProducerClosed();
        final long startNanos = time.nanoseconds();
        transactionManager.beginTransaction();
        final long elapsedNanos = time.nanoseconds() - startNanos;
        producerMetrics.recordBeginTxn(elapsedNanos);
    }

    /**
     * Deprecated variant that wraps the group id in a {@link ConsumerGroupMetadata}
     * and delegates to the overload that accepts group metadata.
     *
     * @param offsets         offsets to send as part of the transaction
     * @param consumerGroupId id of the consumer group
     * @throws ProducerFencedException declared by the {@code Producer} contract
     */
    @Override
    @Deprecated
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
        final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(consumerGroupId);
        sendOffsetsToTransaction(offsets, groupMetadata);
    }

    /**
     * Sends the given offsets to the transaction manager, wakes the sender and awaits
     * the result with a timeout of {@code maxBlockTimeMs} milliseconds. Validation
     * runs first; an empty offsets map is then a no-op and records no metric.
     *
     * @param offsets       offsets to send as part of the transaction
     * @param groupMetadata consumer group metadata
     * @throws ProducerFencedException declared by the {@code Producer} contract
     */
    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata) throws ProducerFencedException {
        throwIfInvalidGroupMetadata(groupMetadata);
        throwIfNoTransactionManager();
        throwIfProducerClosed();
        if (offsets.isEmpty()) {
            return;
        }
        final long startNanos = time.nanoseconds();
        final TransactionalRequestResult pending = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
        sender.wakeup();
        pending.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
        producerMetrics.recordSendOffsets(time.nanoseconds() - startNanos);
    }

    /**
     * Begins a commit on the transaction manager, wakes the sender and awaits the
     * result with a timeout of {@code maxBlockTimeMs} milliseconds, then records the
     * elapsed time in the producer metrics.
     *
     * @throws ProducerFencedException declared by the {@code Producer} contract
     */
    @Override
    public void commitTransaction() throws ProducerFencedException {
        throwIfNoTransactionManager();
        throwIfProducerClosed();
        final long startNanos = time.nanoseconds();
        final TransactionalRequestResult pending = transactionManager.beginCommit();
        sender.wakeup();
        pending.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
        final long elapsedNanos = time.nanoseconds() - startNanos;
        producerMetrics.recordCommitTxn(elapsedNanos);
    }

    /**
     * Begins an abort on the transaction manager, wakes the sender and awaits the
     * result with a timeout of {@code maxBlockTimeMs} milliseconds, then records the
     * elapsed time in the producer metrics.
     *
     * @throws ProducerFencedException declared by the {@code Producer} contract
     */
    @Override
    public void abortTransaction() throws ProducerFencedException {
        throwIfNoTransactionManager();
        throwIfProducerClosed();
        log.info("Aborting incomplete transaction");
        final long startNanos = time.nanoseconds();
        final TransactionalRequestResult pending = transactionManager.beginAbort();
        sender.wakeup();
        pending.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
        final long elapsedNanos = time.nanoseconds() - startNanos;
        producerMetrics.recordAbortTxn(elapsedNanos);
    }

    /**
     * Sends a record without a completion callback.
     *
     * @param record the record to send
     * @return the future returned by {@link #send(ProducerRecord, Callback)}
     */
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        final Callback noCallback = null;
        return send(record, noCallback);
    }

    /**
     * Passes the record through the interceptors' {@code onSend} hook and sends
     * whatever record that hook returns.
     *
     * @param record   the record to send
     * @param callback completion callback, may be {@code null}
     * @return the future produced by the internal send path
     */
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        return doSend(interceptors.onSend(record), callback);
    }

    /**
     * Guards operations that need a live producer.
     *
     * @throws IllegalStateException if there is no sender or the sender is no longer running
     */
    private void throwIfProducerClosed() {
        final boolean senderRunning = sender != null && sender.isRunning();
        if (!senderRunning) {
            throw new IllegalStateException("Cannot perform operation after producer has been closed");
        }
    }

    /**
     * Forwards a new-batch notification to the configured partitioner. Callers are
     * expected to invoke this only when a partitioner is set (checked by assertion).
     *
     * @param topic         topic of the record being appended
     * @param cluster       cluster snapshot used for the append
     * @param prevPartition partition chosen before the new batch was required
     */
    private void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        final Partitioner customPartitioner = this.partitioner;
        assert customPartitioner != null;
        customPartitioner.onNewBatch(topic, cluster, prevPartition);
    }

    /**
     * Core send path: waits for metadata, serializes the key and value, picks a
     * partition, validates the record size and appends the record to the accumulator.
     *
     * <p>Failure handling depends on the exception type:
     * <ul>
     *   <li>{@link ApiException}: reported through the callback, the error sensor and
     *       the interceptors, then returned as a failed future rather than thrown;</li>
     *   <li>{@link InterruptedException}: recorded, then rethrown as {@link InterruptException};</li>
     *   <li>{@link KafkaException}: recorded, then rethrown;</li>
     *   <li>any other {@link Exception}: interceptors are notified (the error sensor
     *       is not recorded), then rethrown.</li>
     * </ul>
     *
     * @param record   the record to send (already passed through {@code interceptors.onSend})
     * @param callback user callback, may be {@code null}
     * @return the future from the accumulator append, or a failed future for an {@link ApiException}
     */
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        // Bundles the user callback and interceptors; it is also what later exposes the final partition.
        AppendCallbacks appendCallbacks = new AppendCallbacks(callback, this.interceptors, record);
        try {
            byte[] serializedValue;
            byte[] serializedKey;
            ClusterAndWaitTime clusterAndWaitTime;
            this.throwIfProducerClosed();
            long nowMs = this.time.milliseconds();
            try {
                clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
            }
            catch (KafkaException e) {
                // Give a clearer message when the failure is due to the metadata having been closed.
                if (this.metadata.isClosed()) {
                    throw new KafkaException("Producer closed while send in progress", e);
                }
                throw e;
            }
            // Advance the timestamp by the time spent waiting and shrink the remaining block budget.
            nowMs += clusterAndWaitTime.waitedOnMetadataMs;
            long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            try {
                serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
            }
            catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", cce);
            }
            try {
                serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
            }
            catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", cce);
            }
            int partition = this.partition(record, serializedKey, serializedValue, cluster);
            // Headers are frozen (when they are RecordHeaders) before being snapshotted into an array.
            this.setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
            this.ensureValidRecordSize(serializedSize);
            // Fall back to the (wait-adjusted) current time when the record carries no timestamp.
            long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
            // With a configured partitioner the first append may abort instead of opening a new
            // batch, so that the partitioner can be told and the partition recomputed below.
            boolean abortOnNewBatch = this.partitioner != null;
            RecordAccumulator.RecordAppendResult result = this.accumulator.append(record.topic(), partition, timestamp, serializedKey, serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);
            assert (appendCallbacks.getPartition() != -1);
            if (result.abortForNewBatch) {
                int prevPartition = partition;
                this.onNewBatch(record.topic(), cluster, prevPartition);
                partition = this.partition(record, serializedKey, serializedValue, cluster);
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
                }
                // Second attempt never aborts (abortOnNewBatch = false).
                result = this.accumulator.append(record.topic(), partition, timestamp, serializedKey, serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster);
            }
            if (this.transactionManager != null) {
                this.transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
            }
            if (result.batchIsFull || result.newBatchCreated) {
                this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", (Object)record.topic(), (Object)appendCallbacks.getPartition());
                this.sender.wakeup();
            }
            return result.future;
        }
        catch (ApiException e) {
            this.log.debug("Exception occurred during message send:", (Throwable)e);
            if (callback != null) {
                // The callback receives placeholder metadata (-1 for every numeric field) plus the error.
                TopicPartition tp = appendCallbacks.topicPartition();
                RecordMetadata nullMetadata = new RecordMetadata(tp, -1L, -1, -1L, -1, -1);
                callback.onCompletion(nullMetadata, e);
            }
            this.errors.record();
            this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
            if (this.transactionManager != null) {
                this.transactionManager.maybeTransitionToErrorState(e);
            }
            return new FutureFailure(e);
        }
        catch (InterruptedException e) {
            this.errors.record();
            this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
            throw new InterruptException(e);
        }
        catch (KafkaException e) {
            this.errors.record();
            this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
            throw e;
        }
        catch (Exception e) {
            // Interceptors are still notified here, but the error sensor is not recorded.
            this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
            throw e;
        }
    }

    /**
     * Marks the headers read-only when they are a {@link RecordHeaders} instance;
     * any other {@link Headers} implementation is left untouched.
     *
     * @param headers headers of the record being sent
     */
    private void setReadOnly(Headers headers) {
        if (!(headers instanceof RecordHeaders)) {
            return;
        }
        final RecordHeaders recordHeaders = (RecordHeaders) headers;
        recordHeaders.setReadOnly();
    }

    /**
     * Waits until cluster metadata contains the given topic and, when a partition is
     * given, until that partition index is below the topic's partition count.
     *
     * <p>Returns immediately with a waited time of 0 when the currently cached
     * cluster already satisfies the request. Otherwise it repeatedly requests a
     * metadata update, wakes the sender and awaits the update until the condition
     * holds or {@code maxWaitMs} has elapsed. The metadata-wait metric is recorded
     * only on this slow path.
     *
     * @param topic     topic whose metadata is required
     * @param partition specific partition that must exist, or {@code null} for any
     * @param nowMs     current time in milliseconds as seen by the caller
     * @param maxWaitMs maximum total time to wait, in milliseconds
     * @return the cluster snapshot and the milliseconds spent waiting
     * @throws InvalidTopicException if the cached cluster lists the topic as invalid
     * @throws TimeoutException      if metadata is not available within {@code maxWaitMs}
     * @throws InterruptedException  declared by this method; presumably raised by {@code metadata.awaitUpdate} — verify
     */
    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
        Cluster cluster = this.metadata.fetch();
        if (cluster.invalidTopics().contains(topic)) {
            throw new InvalidTopicException(topic);
        }
        this.metadata.add(topic, nowMs);
        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Fast path: the cached metadata already covers the topic (and partition, if any).
        if (partitionsCount != null && (partition == null || partition < partitionsCount)) {
            return new ClusterAndWaitTime(cluster, 0L);
        }
        long remainingWaitMs = maxWaitMs;
        long elapsed = 0L;
        long nowNanos = this.time.nanoseconds();
        do {
            if (partition != null) {
                this.log.trace("Requesting metadata update for partition {} of topic {}.", (Object)partition, (Object)topic);
            } else {
                this.log.trace("Requesting metadata update for topic {}.", (Object)topic);
            }
            // Re-add the topic on every iteration, using the time advanced by what has elapsed so far.
            this.metadata.add(topic, nowMs + elapsed);
            int version = this.metadata.requestUpdateForTopic(topic);
            this.sender.wakeup();
            try {
                this.metadata.awaitUpdate(version, remainingWaitMs);
            }
            catch (TimeoutException ex) {
                // Rethrown with a message that reports the full maxWaitMs; the caught exception is not chained.
                throw new TimeoutException(String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs));
            }
            cluster = this.metadata.fetch();
            elapsed = this.time.milliseconds() - nowMs;
            if (elapsed >= maxWaitMs) {
                // partitionsCount still holds the value from before this update, which selects the message.
                throw new TimeoutException(partitionsCount == null ? String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs) : String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.", partition, topic, partitionsCount, maxWaitMs));
            }
            this.metadata.maybeThrowExceptionForTopic(topic);
            remainingWaitMs = maxWaitMs - elapsed;
            // Loop while the topic is still unknown or the requested partition is out of range.
        } while ((partitionsCount = cluster.partitionCountForTopic(topic)) == null || partition != null && partition >= partitionsCount);
        this.producerMetrics.recordMetadataWait(this.time.nanoseconds() - nowNanos);
        return new ClusterAndWaitTime(cluster, elapsed);
    }

    /**
     * Rejects records whose estimated serialized size exceeds either the configured
     * maximum request size or the total buffer memory. The request-size limit is
     * checked first.
     *
     * @param size estimated serialized size in bytes
     * @throws RecordTooLargeException if either limit is exceeded
     */
    private void ensureValidRecordSize(int size) {
        final boolean exceedsRequestLimit = size > maxRequestSize;
        if (exceedsRequestLimit) {
            final String message = "The message is " + size + " bytes when serialized which is larger than " + maxRequestSize + ", which is the value of the max.request.size configuration.";
            throw new RecordTooLargeException(message);
        }

        final boolean exceedsBufferMemory = (long) size > totalMemorySize;
        if (exceedsBufferMemory) {
            final String message = "The message is " + size + " bytes when serialized which is larger than the total memory buffer you have configured with the buffer.memory configuration.";
            throw new RecordTooLargeException(message);
        }
    }

    /**
     * Starts a flush on the accumulator, wakes the sender and blocks until the
     * accumulator reports the flush complete. The elapsed time is recorded in the
     * producer metrics whether or not the wait is interrupted.
     *
     * @throws InterruptException if the calling thread is interrupted while waiting
     */
    @Override
    public void flush() {
        log.trace("Flushing accumulated records in producer.");
        final long flushStartNanos = time.nanoseconds();
        accumulator.beginFlush();
        sender.wakeup();
        try {
            accumulator.awaitFlushCompletion();
        } catch (InterruptedException interrupted) {
            throw new InterruptException("Flush interrupted.", interrupted);
        } finally {
            producerMetrics.recordFlush(time.nanoseconds() - flushStartNanos);
        }
    }

    /**
     * Returns the partitions of a topic, waiting up to {@code maxBlockTimeMs}
     * milliseconds for its metadata to become available.
     *
     * @param topic topic to look up; must not be {@code null}
     * @return the partition information reported by the fetched cluster
     * @throws NullPointerException if {@code topic} is {@code null}
     * @throws InterruptException   if the calling thread is interrupted while waiting
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        Objects.requireNonNull(topic, "topic cannot be null");
        final ClusterAndWaitTime metadataResult;
        try {
            metadataResult = waitOnMetadata(topic, null, time.milliseconds(), maxBlockTimeMs);
        } catch (InterruptedException interrupted) {
            throw new InterruptException(interrupted);
        }
        return metadataResult.cluster.partitionsForTopic(topic);
    }

    /**
     * Exposes the producer's metrics.
     *
     * @return an unmodifiable view of the map returned by the metrics registry
     */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        final Map<MetricName, ? extends Metric> registered = this.metrics.metrics();
        return Collections.unmodifiableMap(registered);
    }

    /**
     * Closes the producer with a timeout of {@link Long#MAX_VALUE} milliseconds.
     */
    @Override
    public void close() {
        final Duration unbounded = Duration.ofMillis(Long.MAX_VALUE);
        close(unbounded);
    }

    /**
     * Closes the producer, waiting up to {@code timeout} for a graceful shutdown.
     * Failures encountered while closing are propagated to the caller.
     *
     * @param timeout maximum time to wait; must not be negative
     */
    @Override
    public void close(Duration timeout) {
        final boolean swallowException = false;
        close(timeout, swallowException);
    }

    /**
     * Shuts the producer down.
     *
     * <p>With a positive timeout and when not called from the I/O thread, the sender
     * is asked to close and the I/O thread is joined for up to the timeout. If the
     * I/O thread is still alive afterwards the sender is force-closed and, unless
     * the caller is the I/O thread itself, joined without a timeout. Interceptors,
     * metrics, serializers and the partitioner are then closed quietly and the app
     * info is unregistered. Only the first failure encountered is reported.
     *
     * @param timeout          maximum time to wait for a graceful close
     * @param swallowException when {@code true}, failures while closing are not thrown
     * @throws IllegalArgumentException if the timeout is negative
     * @throws InterruptException       if a join was interrupted and exceptions are not swallowed
     * @throws KafkaException           for any other close failure when exceptions are not swallowed
     */
    private void close(Duration timeout, boolean swallowException) {
        final long timeoutMs = timeout.toMillis();
        if (timeoutMs < 0L) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);

        // Remembers only the first failure; later ones do not overwrite it.
        final AtomicReference<Throwable> firstFailure = new AtomicReference<>();
        final boolean calledFromIoThread = Thread.currentThread() == ioThread;

        if (timeoutMs > 0L && calledFromIoThread) {
            log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeoutMs);
        } else if (timeoutMs > 0L) {
            // Graceful path: ask the sender to finish, then wait for the I/O thread.
            if (sender != null) {
                sender.initiateClose();
            }
            if (ioThread != null) {
                try {
                    ioThread.join(timeoutMs);
                } catch (InterruptedException interrupted) {
                    firstFailure.compareAndSet(null, new InterruptException(interrupted));
                    log.error("Interrupted while joining ioThread", interrupted);
                }
            }
        }

        final boolean ioThreadStillAlive = sender != null && ioThread != null && ioThread.isAlive();
        if (ioThreadStillAlive) {
            log.info("Proceeding to force close the producer since pending requests could not be completed within timeout {} ms.", timeoutMs);
            sender.forceClose();
            // Joining from the I/O thread itself would be a self-join, so skip it there.
            if (!calledFromIoThread) {
                try {
                    ioThread.join();
                } catch (InterruptedException interrupted) {
                    firstFailure.compareAndSet(null, new InterruptException(interrupted));
                }
            }
        }

        Utils.closeQuietly(interceptors, "producer interceptors", firstFailure);
        Utils.closeQuietly(producerMetrics, "producer metrics wrapper", firstFailure);
        Utils.closeQuietly(metrics, "producer metrics", firstFailure);
        Utils.closeQuietly(keySerializer, "producer keySerializer", firstFailure);
        Utils.closeQuietly(valueSerializer, "producer valueSerializer", firstFailure);
        Utils.closeQuietly(partitioner, "producer partitioner", firstFailure);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);

        final Throwable failure = firstFailure.get();
        if (failure == null || swallowException) {
            log.debug("Kafka producer has been closed");
            return;
        }
        if (failure instanceof InterruptException) {
            throw (InterruptException) failure;
        }
        throw new KafkaException("Failed to close kafka producer", failure);
    }

    /**
     * Builds a {@link ClusterResourceListeners} registry from all supplied candidates.
     *
     * <p>Every list in {@code candidateLists} is offered to the registry first, in the
     * order given, followed by the key serializer and then the value serializer. Which
     * candidates are actually retained is decided by the registry's
     * {@code maybeAdd}/{@code maybeAddAll} methods.
     *
     * @param keySerializer   key serializer offered as a candidate
     * @param valueSerializer value serializer offered as a candidate
     * @param candidateLists  further lists of candidate objects
     * @return the newly created registry
     */
    private ClusterResourceListeners configureClusterResourceListeners(Serializer<K> keySerializer, Serializer<V> valueSerializer, List<?> ... candidateLists) {
        ClusterResourceListeners listeners = new ClusterResourceListeners();
        for (int i = 0; i < candidateLists.length; i++) {
            listeners.maybeAddAll(candidateLists[i]);
        }
        listeners.maybeAdd(keySerializer);
        listeners.maybeAdd(valueSerializer);
        return listeners;
    }

    /**
     * Picks the partition for a record.
     *
     * <p>Resolution order:
     * <ol>
     *   <li>the partition set explicitly on the record, if any;</li>
     *   <li>the result of the configured {@code partitioner}, if one is set;</li>
     *   <li>a partition derived from the serialized key via
     *       {@code BuiltInPartitioner.partitionForKey}, when a key is present and
     *       {@code partitionerIgnoreKeys} is false;</li>
     *   <li>otherwise {@code -1}.</li>
     * </ol>
     *
     * @param record          the record being sent
     * @param serializedKey   serialized key bytes, may be {@code null}
     * @param serializedValue serialized value bytes, passed through to the partitioner
     * @param cluster         cluster view passed to the partitioner and used to look up
     *                        the topic's partition count
     * @return the chosen partition, or {@code -1} when none of the rules above selects one
     * @throws IllegalArgumentException if the configured partitioner returns a negative number
     */
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        if (record.partition() != null) {
            return record.partition();
        }
        if (this.partitioner == null) {
            // No custom partitioner: fall back to key hashing, if keys are usable.
            boolean hashByKey = serializedKey != null && !this.partitionerIgnoreKeys;
            if (!hashByKey) {
                return -1;
            }
            return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
        }
        int chosen = this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
        if (chosen >= 0) {
            return chosen;
        }
        throw new IllegalArgumentException(String.format("The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", chosen));
    }

    /**
     * Validates consumer group metadata supplied by the caller.
     *
     * @param groupMetadata the metadata to validate
     * @throws IllegalArgumentException if {@code groupMetadata} is {@code null}, or if it
     *         reports a generation id greater than zero together with an empty member id
     */
    private void throwIfInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) {
        if (groupMetadata == null) {
            throw new IllegalArgumentException("Consumer group metadata could not be null");
        }
        // A positive generation id combined with an empty member id is rejected.
        if (groupMetadata.generationId() > 0 && "".equals(groupMetadata.memberId())) {
            // The message previously stopped after "member.id "; it now states the actual problem.
            throw new IllegalArgumentException("Passed in group metadata " + groupMetadata + " has generationId > 0 but member.id is empty");
        }
    }

    /**
     * Guards transactional entry points: fails when this producer has no
     * transaction manager.
     *
     * @throws IllegalStateException if {@code transactionManager} is {@code null}
     */
    private void throwIfNoTransactionManager() {
        if (this.transactionManager != null) {
            return;
        }
        throw new IllegalStateException("Cannot use transactional methods without enabling transactions by setting the transactional.id configuration property");
    }

    /**
     * Package-private accessor for this producer's client id.
     *
     * @return the value of the {@code clientId} field
     */
    String getClientId() {
        return clientId;
    }

    /**
     * {@link RecordAccumulator.AppendCallbacks} implementation that forwards completion
     * to the producer interceptors and to the optional user callback.
     *
     * <p>Only the topic, the partition requested on the record, and (when trace logging
     * is enabled) the record's string form are copied at construction; the record
     * itself is not stored in any field.
     */
    private class AppendCallbacks implements RecordAccumulator.AppendCallbacks {
        private final Callback userCallback;
        private final ProducerInterceptors<K, V> interceptors;
        private final String topic;
        private final Integer recordPartition;
        private final String recordLogString;
        // -1 until setPartition(int) is called.
        private volatile int partition = -1;
        // Computed on first use by topicPartition().
        private volatile TopicPartition topicPartition;

        /**
         * @param userCallback callback supplied by the caller, may be {@code null}
         * @param interceptors interceptors notified on every completion
         * @param record       the record being sent; {@code null} is tolerated and
         *                     leaves topic and requested partition unset
         */
        private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
            this.userCallback = userCallback;
            this.interceptors = interceptors;

            String recordTopic = null;
            Integer requestedPartition = null;
            if (record != null) {
                recordTopic = record.topic();
                requestedPartition = record.partition();
            }
            this.topic = recordTopic;
            this.recordPartition = requestedPartition;

            // The string form is only needed for the trace message in setPartition.
            String logString = "";
            if (KafkaProducer.this.log.isTraceEnabled() && record != null) {
                logString = record.toString();
            }
            this.recordLogString = logString;
        }

        /**
         * Notifies the interceptors first and then the user callback, if one was given.
         * When {@code metadata} is {@code null}, a placeholder built from
         * {@link #topicPartition()} with {@code -1} in every numeric field is
         * reported instead.
         */
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            RecordMetadata reported = metadata;
            if (reported == null) {
                reported = new RecordMetadata(this.topicPartition(), -1L, -1, -1L, -1, -1);
            }
            this.interceptors.onAcknowledgement(reported, exception);
            if (this.userCallback == null) {
                return;
            }
            this.userCallback.onCompletion(reported, exception);
        }

        /**
         * Records the partition selected for this record and, when trace logging is
         * enabled, logs the append attempt.
         */
        @Override
        public void setPartition(int partition) {
            assert (partition != -1);
            this.partition = partition;
            if (!KafkaProducer.this.log.isTraceEnabled()) {
                return;
            }
            KafkaProducer.this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", this.recordLogString, this.userCallback, this.topic, partition);
        }

        /** @return the partition last passed to {@link #setPartition(int)}, or {@code -1} */
        public int getPartition() {
            return this.partition;
        }

        /**
         * Returns the topic-partition for this record, creating and caching it on first
         * use. Preference order for the partition number: the value given to
         * {@link #setPartition(int)}, then the partition requested on the record, then
         * {@code -1}.
         *
         * @return the cached topic-partition, or {@code null} when the topic is unknown
         */
        public TopicPartition topicPartition() {
            if (this.topicPartition == null && this.topic != null) {
                if (this.partition != -1) {
                    this.topicPartition = new TopicPartition(this.topic, this.partition);
                } else if (this.recordPartition != null) {
                    this.topicPartition = new TopicPartition(this.topic, this.recordPartition);
                } else {
                    this.topicPartition = new TopicPartition(this.topic, -1);
                }
            }
            return this.topicPartition;
        }
    }

    /**
     * Immutable pair of a {@link Cluster} and a wait duration in milliseconds.
     */
    private static class ClusterAndWaitTime {
        /** The cluster passed to the constructor. */
        final Cluster cluster;
        /** The wait time, in milliseconds, passed to the constructor. */
        final long waitedOnMetadataMs;

        /**
         * @param clusterSnapshot value stored in {@link #cluster}
         * @param waitedMs        value stored in {@link #waitedOnMetadataMs}
         */
        ClusterAndWaitTime(Cluster clusterSnapshot, long waitedMs) {
            cluster = clusterSnapshot;
            waitedOnMetadataMs = waitedMs;
        }
    }

    /**
     * A {@link Future} that is already complete and always fails.
     *
     * <p>The exception given at construction is wrapped once in an
     * {@link ExecutionException}; both {@code get} variants throw that same instance
     * on every call.
     */
    private static class FutureFailure implements Future<RecordMetadata> {
        private final ExecutionException exception;

        /**
         * @param cause the failure to report; becomes the cause of the thrown
         *              {@link ExecutionException}
         */
        public FutureFailure(Exception cause) {
            this.exception = new ExecutionException(cause);
        }

        /** Always {@code true}: the future is created in its completed state. */
        @Override
        public boolean isDone() {
            return true;
        }

        /** Always {@code false}. */
        @Override
        public boolean isCancelled() {
            return false;
        }

        /** Has no effect and always returns {@code false}. */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }

        /**
         * Never returns normally.
         *
         * @throws ExecutionException always, wrapping the constructor argument
         */
        @Override
        public RecordMetadata get() throws ExecutionException {
            throw this.exception;
        }

        /**
         * Never returns normally; the timeout arguments are ignored.
         *
         * @throws ExecutionException always, wrapping the constructor argument
         */
        @Override
        public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
            throw this.exception;
        }
    }
}

