Kafka broker上对producer(生产者)生产消息的处理
Kafka Server处理生产者请求
入口在KafkaApis.scala,通过request.header.apiKey判断请求类型
/**
 * Top-level entry point for an incoming request (excerpt).
 *
 * Logs the request at trace level, then dispatches on the API key found in the
 * request header. Only the PRODUCE case is shown in this excerpt.
 *
 * @param request the request taken from the request channel
 */
def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
      format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
    // Resolve the numeric API key from the header and dispatch on it.
    ApiKeys.forId(request.header.apiKey) match {
      // Produce requests are delegated to handleProduceRequest.
      case ApiKeys.PRODUCE => handleProduceRequest(request)
      // NOTE: the excerpt is truncated here; the remaining cases, the catch/finally
      // part of the try and the closing braces are not shown.
生产消息则调用replicaManager.appendRecords
// Call the replica manager to append messages to the replicas.
// sendResponseCallback is handed over as the response callback for this append.
replicaManager.appendRecords(
  timeout = produceRequest.timeout.toLong,
  requiredAcks = produceRequest.acks,
  internalTopicsAllowed = internalTopicsAllowed,
  isFromClient = true,
  entriesPerPartition = authorizedRequestInfo,
  responseCallback = sendResponseCallback)

// If the request is put into the purgatory, it will have a held reference and hence cannot be
// garbage collected; hence we clear its data here in order to let GC reclaim its memory, since
// it is already appended to the log.
produceRequest.clearPartitionRecords()
ReplicaManager.scala
appendRecords先把消息写到partition的leader上;如果requiredAcks == -1,说明需要所有ISR都写入成功才返回response,而ISR中的follower同样是作为leader的消费者来拉取消息的
/**
 * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
 * the callback function will be triggered either when timeout or the required acks are satisfied;
 * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
 *
 * @param timeout               timeout handed to the delayed produce operation
 * @param requiredAcks          acks setting of the request; checked with isValidRequiredAcks
 * @param internalTopicsAllowed forwarded to appendToLocalLog
 * @param isFromClient          forwarded to appendToLocalLog
 * @param entriesPerPartition   records to append, keyed by topic partition
 * @param responseCallback      invoked with the per-partition responses
 * @param delayedProduceLock    optional lock object handed to the delayed produce operation
 */
def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  isFromClient: Boolean,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Object] = None): Unit = {
  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = time.milliseconds
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = isFromClient, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

    val produceStatus = localProduceResults.map { case (topicPartition, result) =>
      topicPartition ->
        ProducePartitionStatus(
          result.info.lastOffset + 1, // required offset
          new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
    }

    // A delayed produce is needed when, per delayedProduceRequestRequired:
    // 1. required acks = -1
    // 2. there is data to append
    // 3. at least one partition append was successful (fewer errors than partitions)
    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
      val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

      // try to complete the request immediately, otherwise put it into the purgatory
      // this is because while the delayed produce operation is being created, new
      // requests may arrive and hence make this operation completable.
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
    } else {
      // we can respond immediately
      // Use a strict map rather than mapValues: mapValues yields a lazy view that would
      // re-apply the function on every access made by the callback.
      val produceResponseStatus = produceStatus.map { case (topicPartition, status) =>
        topicPartition -> status.responseStatus
      }
      responseCallback(produceResponseStatus)
    }
  } else {
    // If required.acks is outside accepted range, something is wrong with the client
    // Just return an error and don't handle the request at all
    val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
      topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
        LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP)
    }
    responseCallback(responseStatus)
  }
}
追加消息到本地log中
/**
 * Append the messages to the local replica logs.
 *
 * Every entry of entriesPerPartition produces exactly one entry in the result: either the
 * append info returned by the partition leader, or UnknownLogAppendInfo together with the
 * exception that prevented the append.
 *
 * @param internalTopicsAllowed whether appending to internal topics is permitted
 * @param isFromClient          forwarded to Partition.appendRecordsToLeader
 * @param entriesPerPartition   records to append, keyed by topic partition
 * @param requiredAcks          forwarded to Partition.appendRecordsToLeader
 * @return the per-partition append results
 */
private def appendToLocalLog(internalTopicsAllowed: Boolean,
                             isFromClient: Boolean,
                             entriesPerPartition: Map[TopicPartition, MemoryRecords],
                             requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
  trace("Append [%s] to local log ".format(entriesPerPartition))
  entriesPerPartition.map { case (topicPartition, records) =>
    brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
    brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()

    // reject appending to internal topics if it is not allowed
    if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
      (topicPartition, LogAppendResult(
        LogAppendInfo.UnknownLogAppendInfo,
        Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
    } else {
      try {
        val partitionOpt = getPartition(topicPartition)
        val info = partitionOpt match {
          case Some(partition) =>
            // The OfflinePartition marker object is compared by reference.
            if (partition eq ReplicaManager.OfflinePartition)
              throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
            partition.appendRecordsToLeader(records, isFromClient, requiredAcks)

          case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
            .format(topicPartition, localBrokerId))
        }

        // An offset of -1 on either end is treated as "nothing appended".
        val numAppendedMessages =
          if (info.firstOffset == -1L || info.lastOffset == -1L)
            0
          else
            info.lastOffset - info.firstOffset + 1

        // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
        brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
        brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes)
        brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
        brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)

        trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
          .format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))
        (topicPartition, LogAppendResult(info))
      } catch {
        // NOTE: Failed produce requests metric is not incremented for known exceptions
        // it is supposed to indicate un-expected failures of a broker in handling a produce request
        case e@ (_: UnknownTopicOrPartitionException |
                 _: NotLeaderForPartitionException |
                 _: RecordTooLargeException |
                 _: RecordBatchTooLargeException |
                 _: CorruptRecordException |
                 _: KafkaStorageException |
                 _: InvalidTimestampException) =>
          (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
        // Throwable is caught on purpose: the failure is recorded in the metrics, logged,
        // and returned as this partition's result instead of aborting the whole request.
        case t: Throwable =>
          brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
          brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
          error("Error processing append operation on partition %s".format(topicPartition), t)
          (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))
      }
    }
  }
}
追加records到leader上
/**
 * Append records to this partition's log when the leader replica is local.
 *
 * The append itself runs under the read side of leaderIsrUpdateLock. After the lock is
 * released, delayed requests are re-checked if the high watermark moved.
 *
 * @param records      the records to append
 * @param isFromClient forwarded to the log's appendAsLeader
 * @param requiredAcks acks setting of the producer; -1 enables the min-ISR check
 * @return the append info returned by the log
 * @throws NotEnoughReplicasException     if requiredAcks is -1 and the ISR is smaller than
 *                                        the configured minimum
 * @throws NotLeaderForPartitionException if the leader replica is not local
 */
def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        val log = leaderReplica.log.get
        val minIsr = log.config.minInSyncReplicas
        val inSyncSize = inSyncReplicas.size

        // Avoid writing to leader if there are not enough insync replicas to make it safe
        if (inSyncSize < minIsr && requiredAcks == -1) {
          throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
            .format(topicPartition, inSyncSize, minIsr))
        }

        val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
        // probably unblock some follower fetch requests since log end offset has been updated
        replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
        // we may need to increment high watermark since ISR could be down to 1
        (info, maybeIncrementLeaderHW(leaderReplica))

      case None =>
        throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
          .format(topicPartition, localBrokerId))
    }
  }

  // some delayed operations may be unblocked after HW changed
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()

  info
}
Log.scala
/**
 * Append the given records to the active segment of this log, rolling to a new segment
 * first if necessary.
 *
 * NOTE(review): both `return appendInfo` statements sit inside blocks passed to other
 * methods (the maybeHandleIOException block and the maybeDuplicate.foreach lambda), so
 * they are non-local returns. Statement order is kept exactly as in the original.
 *
 * @param records       the records to append
 * @param isFromClient  forwarded to record validation and producer-state validation
 * @param assignOffsets if true, offsets are assigned here starting at the next offset of
 *                      the log; otherwise the offsets carried by the records are checked
 *                      and used
 * @param leaderEpoch   forwarded to LogValidator when offsets are assigned
 * @return information about the append (first/last offset, timestamps, ...)
 */
private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
  maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
    val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

    // return if we have no valid messages or if this is a duplicate of the last appended entry
    if (appendInfo.shallowCount == 0)
      return appendInfo

    // trim any invalid bytes or partial messages before appending it to the on-disk log
    var validRecords = trimInvalidBytes(records, appendInfo)

    // they are valid, insert them in the log
    lock synchronized {
      if (assignOffsets) {
        // assign offsets to the message set
        val offset = new LongRef(nextOffsetMetadata.messageOffset)
        appendInfo.firstOffset = offset.value
        val now = time.milliseconds
        val validateAndOffsetAssignResult = try {
          LogValidator.validateMessagesAndAssignOffsets(validRecords,
            offset,
            now,
            appendInfo.sourceCodec,
            appendInfo.targetCodec,
            config.compact,
            config.messageFormatVersion.messageFormatVersion,
            config.messageTimestampType,
            config.messageTimestampDifferenceMaxMs,
            leaderEpoch,
            isFromClient)
        } catch {
          case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
        }
        validRecords = validateAndOffsetAssignResult.validatedRecords
        appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
        appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
        // offset was advanced by the validator, so the last assigned offset is one below it
        appendInfo.lastOffset = offset.value - 1
        if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
          appendInfo.logAppendTime = now

        // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
        // format conversion)
        if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
          for (batch <- validRecords.batches.asScala) {
            if (batch.sizeInBytes > config.maxMessageSize) {
              // we record the original message set size instead of the trimmed size
              // to be consistent with pre-compression bytesRejectedRate recording
              brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
              brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
              throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d."
                .format(batch.sizeInBytes, config.maxMessageSize))
            }
          }
        }
      } else {
        // we are taking the offsets we are given
        if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
          throw new IllegalArgumentException("Out of order offsets found in " + records.records.asScala.map(_.offset))
      }

      // update the epoch cache with the epoch stamped onto the message by the leader
      validRecords.batches.asScala.foreach { batch =>
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
          leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
      }

      // check whether the message set size exceeds config.segmentSize
      if (validRecords.sizeInBytes > config.segmentSize) {
        throw new RecordBatchTooLargeException("Message batch size is %d bytes which exceeds the maximum configured segment size of %d."
          .format(validRecords.sizeInBytes, config.segmentSize))
      }

      // now that we have valid records, offsets assigned, and timestamps updated, we need to
      // validate the idempotent/transactional state of the producers and collect some metadata
      val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)
      // A duplicate batch is not appended again; the offsets and timestamp of the earlier
      // append are reported back instead.
      maybeDuplicate.foreach { duplicate =>
        appendInfo.firstOffset = duplicate.firstOffset
        appendInfo.lastOffset = duplicate.lastOffset
        appendInfo.logAppendTime = duplicate.timestamp
        return appendInfo
      }

      // If the current segment is full, switch to a new segment.
      // maybe roll the log if this segment is full
      val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
        maxTimestampInMessages = appendInfo.maxTimestamp,
        maxOffsetInMessages = appendInfo.lastOffset)

      // Captured before the append, so segment.size is the position at which this append starts.
      val logOffsetMetadata = LogOffsetMetadata(
        messageOffset = appendInfo.firstOffset,
        segmentBaseOffset = segment.baseOffset,
        relativePositionInSegment = segment.size)

      // The actual write is performed by the segment.
      segment.append(firstOffset = appendInfo.firstOffset,
        largestOffset = appendInfo.lastOffset,
        largestTimestamp = appendInfo.maxTimestamp,
        shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
        records = validRecords)

      // update the producer state
      for ((producerId, producerAppendInfo) <- updatedProducers) {
        producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
        producerStateManager.update(producerAppendInfo)
      }

      // update the transaction index with the true last stable offset. The last offset visible
      // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
      for (completedTxn <- completedTxns) {
        val lastStableOffset = producerStateManager.completeTxn(completedTxn)
        segment.updateTxnIndex(completedTxn, lastStableOffset)
      }

      // always update the last producer id map offset so that the snapshot reflects the current offset
      // even if there isn't any idempotent data being written
      producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

      // increment the log end offset
      updateLogEndOffset(appendInfo.lastOffset + 1)

      // update the first unstable offset (which is used to compute LSO)
      updateFirstUnstableOffset()

      trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
        .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords))

      // Once the number of unflushed messages reaches the flush interval, flush the log
      // (presumably an fsync; flush() is not shown here).
      if (unflushedMessages >= config.flushInterval)
        flush()

      appendInfo
    }
  }
}
LogSegment.scala
追加records;如果自上次写入索引以来累计追加的字节数超过indexIntervalBytes,则追加一条index和timeIndex记录
/**
 * Append the given records to this segment and, when enough bytes have accumulated since
 * the last index entry, add an entry to the offset index and the time index.
 *
 * Nothing happens when records is empty (sizeInBytes == 0).
 *
 * @param firstOffset                 first offset of the records; used for the index entry
 * @param largestOffset               last offset of the records; must be convertible to a
 *                                    relative offset of this segment
 * @param largestTimestamp            largest timestamp in the records
 * @param shallowOffsetOfMaxTimestamp offset of the record carrying the largest timestamp
 * @param records                     the records to append
 */
@nonthreadsafe
def append(firstOffset: Long,
           largestOffset: Long,
           largestTimestamp: Long,
           shallowOffsetOfMaxTimestamp: Long,
           records: MemoryRecords): Unit = {
  if (records.sizeInBytes > 0) {
    trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
      .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
    // Size of the file before the append, i.e. the position where these records start.
    val physicalPosition = log.sizeInBytes()
    // First append into an empty segment: remember its timestamp.
    if (physicalPosition == 0)
      rollingBasedTimestamp = Some(largestTimestamp)
    // append the messages
    require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
    val appendedBytes = log.append(records)
    trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")
    // Update the in memory max timestamp and corresponding offset.
    if (largestTimestamp > maxTimestampSoFar) {
      maxTimestampSoFar = largestTimestamp
      offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
    }
    // append an entry to the index (if needed)
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(firstOffset, physicalPosition)
      timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
      bytesSinceLastIndexEntry = 0
    }
    // Counted after the index check, so these bytes count towards the next index entry.
    bytesSinceLastIndexEntry += records.sizeInBytes
  }
}
FileRecords.java
/**
 * Append the given in-memory records to this file's channel and grow the tracked
 * size by the number of bytes that were written.
 *
 * @param records the records to append
 * @return the number of bytes written to the channel
 * @throws IOException if writing to the channel fails
 */
public int append(MemoryRecords records) throws IOException {
    final int bytesWritten = records.writeFullyTo(channel);
    // Keep the size counter in step with what actually reached the channel.
    size.getAndAdd(bytesWritten);
    return bytesWritten;
}
通过FileChannel write到磁盘
MemoryRecords.java
/**
 * Write the whole buffer (including any partial records) to the given channel,
 * calling write repeatedly until sizeInBytes() bytes have gone out.
 *
 * @param channel The channel to write to
 * @return The number of bytes written
 * @throws IOException For any IO errors writing to the channel
 */
public int writeFullyTo(GatheringByteChannel channel) throws IOException {
    // mark()/reset() bracket the writes so the buffer position is restored afterwards.
    buffer.mark();
    int total = 0;
    // A single write may be partial, so keep writing until everything is out.
    for (; total < sizeInBytes(); total += channel.write(buffer)) {
        // all work happens in the loop header
    }
    buffer.reset();
    return total;
}