Blog

JFR の event stream を Fluency で fluentd に送る

ここまでできれば、あとは fluentd で storage に格納して、flamegraph 等を描画すれば良いだけである。

import jdk.jfr.consumer.RecordedEvent
import jdk.jfr.consumer.RecordingStream
import org.komamitsu.fluency.Fluency
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.ZoneId

/**
 * Forwards JDK Flight Recorder (JFR) events from a [RecordingStream] to fluentd via [Fluency].
 *
 * Every event is emitted under [tag] as a map with the keys `siteId`, `instanceId`,
 * `type` (the JFR event type name) and `data` (the event's fields, see [buildData]).
 *
 * Typical usage: call [enable] for each event of interest, then [start] or [startAsync],
 * and finally [close] to stop streaming and release the Fluency client.
 *
 * @param fluency client used to ship events; it is flushed and closed by [close]
 * @param tag fluentd tag attached to every emitted record
 * @param siteId value stored under the `siteId` key of every record
 * @param instanceId value stored under the `instanceId` key of every record
 */
class JfmonClient(
    private val fluency: Fluency,
    private val tag: String,
    private val siteId: String,
    private val instanceId: String
) : AutoCloseable {
    private val rs: RecordingStream = RecordingStream()
    private val logger = LoggerFactory.getLogger(JfmonClient::class.java)

    // Zone used only to render event timestamps in debug logs; resolved once instead of per event.
    private val debugZone: ZoneId = ZoneId.of("Asia/Tokyo")

    // Event names that already have an emit handler, so repeated enable() calls
    // (e.g. to change settings) do not cause each event to be emitted more than once.
    private val handledEvents = mutableSetOf<String>()

    /** Starts the underlying [RecordingStream] via [RecordingStream.start]. */
    fun start() {
        rs.start()
    }

    /** Starts the underlying [RecordingStream] via [RecordingStream.startAsync]. */
    fun startAsync() {
        rs.startAsync()
    }

    /**
     * Closes the recording stream, then flushes and closes the Fluency client.
     *
     * Each step runs even if the previous one throws, so a failure while closing the
     * stream or flushing cannot leak the Fluency client. The first exception propagates.
     */
    override fun close() {
        logger.info("Closing jfmon-client")
        try {
            rs.close()
        } finally {
            try {
                fluency.flush()
            } finally {
                fluency.close()
            }
        }
    }

    /** Flushes events buffered in the Fluency client. */
    fun flush() {
        logger.info("Flushing jfmon-client")
        fluency.flush()
    }

    /**
     * Enables the JFR event [name] and forwards its occurrences to fluentd.
     *
     * Example event name: `jdk.SocketRead` ([jdk.jfr.events.SocketReadEvent]).
     *
     * Calling this again for the same [name] re-applies the settings but does not
     * register a second handler, so events are never emitted twice.
     *
     * @param name JFR event name
     * @param period period to apply, or `null` to leave the setting untouched
     * @param threshold threshold to apply, or `null` to leave the setting untouched
     * @param stackTrace `true`/`false` to turn stack traces on/off, or `null` to leave the setting untouched
     */
    fun enable(name: String, period: Duration?, threshold: Duration? = null, stackTrace: Boolean? = null) {
        logger.info("Enabling {}(period={}, threshold={}, stackTrace={})", name, period, threshold, stackTrace)

        val settings = rs.enable(name)
        period?.let { settings.withPeriod(it) }
        threshold?.let { settings.withThreshold(it) }
        when (stackTrace) {
            true -> settings.withStackTrace()
            false -> settings.withoutStackTrace()
            null -> Unit
        }

        // add() returns false when the name was already present.
        if (handledEvents.add(name)) {
            rs.onEvent(name, this::emitEvent)
        }
    }

    /** Converts [event] to a record and emits it under [tag]; also logs it when debug logging is enabled. */
    private fun emitEvent(event: RecordedEvent) {
        val data = buildMap(event)
        if (logger.isDebugEnabled) {
            logger.debug(
                "{}: {} {}",
                event.eventType.name,
                event.startTime.atZone(debugZone),
                data
            )
        }
        fluency.emit(tag, data)
    }

    /** Builds the top-level record: identifying metadata plus the event's fields under `data`. */
    private fun buildMap(event: RecordedEvent): Map<String, Any?> {
        return mapOf(
            "siteId" to siteId,
            "instanceId" to instanceId,
            "type" to event.eventType.name,
            "data" to buildData(event)
        )
    }

    /**
     * Maps every field of [event] to a value keyed by field name.
     *
     * - `boolean`, `long`, `int` and `java.lang.String` fields are copied as-is.
     * - `java.lang.Thread` fields become the thread's Java name, or `null` when the field holds no thread.
     * - `jdk.types.StackTrace` fields become a list of `[class name, method name, line number]`
     *   string triples taken from the event's stack trace (or `null` when there is none).
     * - Fields of any other type are mapped to `null`.
     */
    private fun buildData(event: RecordedEvent): Map<String, Any?> {
        return event.fields.associate { dv ->
            val value: Any? = when (dv.typeName) {
                "boolean", "long", "int", "java.lang.String" -> event.getValue<Any>(dv.name)
                // The thread field may be null; the safe call avoids an NPE in the stream callback.
                "java.lang.Thread" -> event.getThread(dv.name)?.javaName
                "jdk.types.StackTrace" -> {
                    event.stackTrace?.frames?.map { frame ->
                        // Elasticsearch rejects arrays that mix element types,
                        // so the line number is converted to a string as well.
                        listOf(
                            frame.method?.type?.name,
                            frame.method?.name?.toString(),
                            frame.lineNumber.toString(),
                        )
                    }
                }
                else -> null
            }
            dv.name to value
        }
    }
}