Skip to content

Commit

Permalink
Added KafkaConsumerGroupOffsetHealthCheck
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed Jul 22, 2023
1 parent 17ed295 commit 6ab98a7
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 17 deletions.
17 changes: 0 additions & 17 deletions cohort-core/src/main/kotlin/com/sksamuel/cohort/cpu/FibWarmup.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.sksamuel.cohort.cpu

import com.sksamuel.cohort.Warmup
import com.sksamuel.cohort.WarmupHealthCheck
import kotlin.random.Random

Expand All @@ -22,22 +21,6 @@ class FibWarmup(
}
}

class FibonacciWarmup(
private val depth: Int = 32,
) : Warmup {

override val name: String = "fibonacci_warmup"

private fun fib(n: Int): Int = when (n) {
0, 1 -> 1
else -> fib(n - 1) + fib(n - 2)
}

override suspend fun warm(iteration: Int) {
fib(Random.nextInt(0, depth))
}
}

suspend fun main() {
val w = FibonacciWarmup()
repeat(1000) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.sksamuel.cohort.cpu

import com.sksamuel.cohort.Warmup
import kotlin.random.Random

class FibonacciWarmup(
private val depth: Int = 32,
) : Warmup {

override val name: String = "fibonacci_warmup"

private fun fib(n: Int): Int = when (n) {
0, 1 -> 1
else -> fib(n - 1) + fib(n - 2)
}

override suspend fun warm(iteration: Int) {
fib(Random.nextInt(0, depth))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.sksamuel.cohort.kafka

import com.sksamuel.cohort.HealthCheck
import com.sksamuel.cohort.HealthCheckResult
import kotlinx.coroutines.future.await
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

/**
* A Cohort [HealthCheck] that checks for stalled consumers by retrieving the committed offsets
* for a given consumer group. If the commited offset is the same, for all topics, between health
* check invocations, this check returns unhealthy.
*
* In other words, at least one partition must have an advancing offset for the given consumer group
* between health check invocations.
*
* This check can be useful to detect consumer groups which are making no progress.
*/
class KafkaConsumerGroupOffsetHealthCheck(
private val admin: Admin,
private val consumerGroupId: String,
) : HealthCheck {

private var offsets = mapOf<TopicPartition, OffsetAndMetadata>()

override suspend fun check(): HealthCheckResult {

val groups = admin.listConsumerGroupOffsets(consumerGroupId).all().toCompletionStage().await()

val newOffsets = groups[consumerGroupId] ?: return HealthCheckResult.unhealthy(
"Offset metadata not available for consumer group $consumerGroupId",
null
)

// we are healthy, if for any partition, this is the first time we see the partition,
// or we have advanced the offset
val healthy = newOffsets.any { (tp, offset) ->
val currentOffset = offsets[tp]
currentOffset == null || currentOffset.offset() < offset.offset()
}

// replace all current offsets so next check uses new values
offsets = newOffsets

return if (healthy) {
HealthCheckResult.healthy("Kakfa consumer group is making progress")
} else {
HealthCheckResult.unhealthy("Kakfa consumer group has stalled", null)
}
}
}

0 comments on commit 6ab98a7

Please sign in to comment.