Managing Worker Pool State with Prometheus and Grafana in Go - not detail

Huynh Thanh Phuc - Sep 12 '23 - Dev Community

In this article, we'll explore how to manage the state of jobs in a Worker Pool in Go while leveraging the power of Prometheus and Grafana for monitoring and visualization.

Introduction

Worker Pools are a common concurrency pattern used in software development to efficiently process tasks or jobs concurrently. Monitoring the state of these worker pools is crucial for ensuring the health and performance of your application. Prometheus and Grafana are excellent tools for this purpose.
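
To make the pattern concrete, here is a minimal sketch of a worker pool that uses only the standard library. The function name `runPool` and the squaring workload are illustrative assumptions; they are not taken from the article's repository.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool fans the inputs out to numWorkers goroutines and collects the
// squared values. The name and the workload are hypothetical.
func runPool(numWorkers int, inputs []int) []int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker pulls jobs until the channel is closed.
			for n := range jobs {
				results <- n * n
			}
		}()
	}

	// Feed the jobs, then close the channel so the workers leave their loops.
	go func() {
		for _, n := range inputs {
			jobs <- n
		}
		close(jobs)
	}()

	// Close results once every worker has finished.
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []int
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	out := runPool(3, []int{1, 2, 3, 4, 5})
	fmt.Println(len(out), "results") // prints "5 results"; the order of values may vary
}
```

Because workers finish in any order, the result slice is unordered. That is the main reason a pool's state is hard to see from the outside, and why the rest of this article exports it as metrics.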

Prerequisites

Before we dive into the code, make sure you have Prometheus and Grafana set up and running. If you haven't already, follow the official documentation for installation and configuration.

Setting Up Prometheus Metrics

To begin, let's instrument our Go application to expose Prometheus metrics. We'll create a simple worker pool and track the number of queued jobs, the number of active workers, the state of each job, and how long each job takes to process.



package main

// Import necessary packages
import (
	"fmt"
	"log"
	"math/rand" // used by the worker function shown later
	"net/http"
	"sync"
	"time" // used by the worker function shown later

	"github.com/google/uuid" // assumed source of uuid.New()
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Define Prometheus metrics
var (
	jobQueue = promauto.NewGauge(prometheus.GaugeOpts{Name: "workerpool_jobs_queued", Help: "Number of jobs currently in the queue"})
	activeWorkers = promauto.NewGauge(prometheus.GaugeOpts{Name: "workerpool_active_workers", Help: "Number of currently active workers"})
	jobState = promauto.NewGaugeVec(prometheus.GaugeOpts{Name: "job_state", Help: "The state of the job"}, []string{"jobID", "state"})
	jobProcessingDuration = promauto.NewHistogram(prometheus.HistogramOpts{Name: "job_processing_duration_seconds", Help: "The duration it takes to process a job", Buckets: prometheus.LinearBuckets(1, 1, 10)})
)

// Job represents a unit of work
type Job struct {
	ID    string
	Data  string
	State string // states can be "Queued", "Processing", "Completed"
}

func main() {
	// promauto registers the metrics above with the default registry,
	// so no explicit prometheus.MustRegister call is needed.

	// Start a web server to expose metrics
	http.Handle("/metrics", promhttp.Handler())
	go func() {
		log.Fatal(http.ListenAndServe(":8080", nil))
	}()

	// Your worker pool logic goes here.
	// The setup below is illustrative: the original snippet does not show
	// how numJobs, jobs, done, or wg are defined.
	const numJobs, numWorkers = 20, 4
	jobs := make(chan Job)
	done := make(chan Job)
	var wg sync.WaitGroup

	for w := 1; w <= numWorkers; w++ {
		go worker(w, jobs, done)
	}

	// Assign jobs using goroutines
	for i := 1; i <= numJobs; i++ {
		wg.Add(1)
		go assignJob(i, jobs, done, &wg)
	}
	wg.Wait()
	close(jobs)

	// Keep the process alive so Prometheus can still scrape /metrics.
	select {}
}

// assignJob creates a new job, sends it for processing, and waits for a completion signal
func assignJob(i int, jobs chan<- Job, done <-chan Job, wg *sync.WaitGroup) {
	defer wg.Done()

	jobID := fmt.Sprintf("%s-%d", uuid.New().String(), i)
	job := &Job{ID: jobID, Data: fmt.Sprintf("Data_%d", i), State: "Queued"}
	updateJobState(job, "Queued")

	jobQueue.Inc()
	jobs <- *job // the channel carries Job values, so dereference the pointer
	<-done       // done is shared, so this may receive a different job's completion
	jobQueue.Dec()
}

// updateJobState is called above but is not shown in the article. This is an
// assumed minimal version: it sets one job_state series per (jobID, state)
// and leaves earlier states in place, which is what the dashboard queries imply.
func updateJobState(job *Job, state string) {
	job.State = state
	jobState.WithLabelValues(job.ID, state).Set(1)
}
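
The `jobProcessingDuration` histogram is declared with `prometheus.LinearBuckets(1, 1, 10)`, which yields ten upper bounds from 1 to 10 seconds. The sketch below re-creates that layout with the standard library only, to show which bucket an observation first lands in. The helpers `linearBuckets` and `firstBucket` are hypothetical stand-ins for illustration, not part of the Prometheus client.

```go
package main

import "fmt"

// linearBuckets mimics the result of prometheus.LinearBuckets(start, width, count):
// count upper bounds, beginning at start and spaced width apart.
func linearBuckets(start, width float64, count int) []float64 {
	bounds := make([]float64, count)
	for i := range bounds {
		bounds[i] = start + float64(i)*width
	}
	return bounds
}

// firstBucket returns the smallest upper bound that is >= v, or "+Inf" when
// v exceeds every configured bound. Prometheus buckets are cumulative, so the
// observation is also counted in every larger bucket.
func firstBucket(bounds []float64, v float64) string {
	for _, ub := range bounds {
		if v <= ub {
			return fmt.Sprintf("%g", ub)
		}
	}
	return "+Inf"
}

func main() {
	bounds := linearBuckets(1, 1, 10)
	for _, seconds := range []float64{0.1, 2.5, 10.05} {
		fmt.Printf("%.2fs -> le=%s\n", seconds, firstBucket(bounds, seconds))
	}
}
```

The simulated worker later in the article sleeps between 100 ms and roughly 10.1 s, so a few observations can exceed the largest bound and are only counted in the implicit `+Inf` bucket. If that matters for your dashboards, consider a wider bucket range.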




Configuring Grafana Dashboards

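With Prometheus metrics exposed, we can now create Grafana dashboards to visualize the state of our worker pool. Configure Grafana to use Prometheus as a data source and create panels that display the job_state metric. The exported dashboard JSON below defines three stat panels: the total number of jobs, the average processing time, and the number of jobs in each state. The datasource uid values come from the author's Grafana instance, so you will likely need to replace them with your own.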



{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 2,
"links": [],
"liveNow": false,
"panels": [
{
"datasource": {},
"fieldConfig": {
"defaults": {
"color": {
"fixedColor": "transparent",
"mode": "fixed"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 6,
"x": 0,
"y": 0
},
"id": 5,
"options": {
"colorMode": "background",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
},
"textMode": "auto"
},
"pluginVersion": "10.1.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "e294f615-a82c-43ef-a960-86ba0e5b5b11"
},
"editorMode": "code",
"expr": "count(job_state{state=\"Queued\"})",
"instant": false,
"legendFormat": "auto",
"range": true,
"refId": "A"
}
],
"title": "Total Jobs",
"type": "stat"
},
{
"datasource": {},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 18,
"x": 6,
"y": 0
},
"id": 3,
"options": {
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
},
"textMode": "auto"
},
"pluginVersion": "10.1.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "eba8c540-ab46-4987-9347-6112f4e0a081"
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "avg(rate(job_processing_duration_seconds_sum[5m]) / rate(job_processing_duration_seconds_count[5m]))",
"fullMetaSearch": false,
"includeNullMetadata": true,
"instant": false,
"legendFormat": "auto",
"range": true,
"refId": "A",
"useBackend": false
}
],
"title": "Time average",
"type": "stat"
},
{
"datasource": {
"type": "prometheus",
"uid": "e6f319b5-9c5c-48df-96f7-40fc9b1c189c"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": [
{
"matcher": {
"id": "byName",
"options": "count(job_state{state=\"Completed\"})"
},
"properties": [
{
"id": "displayName",
"value": "Complete"
}
]
},
{
"matcher": {
"id": "byName",
"options": "count(job_state{state=\"Processing\"} unless on(jobID) (job_state{state=\"Completed\"})) or\nvector(0)"
},
"properties": [
{
"id": "displayName",
"value": "Processing"
},
{
"id": "color",
"value": {
"fixedColor": "blue",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "count(job_state{state=\"Queued\"} unless on(jobID) (job_state{state=\"Processing\"} or job_state{state=\"Completed\"})) or\nvector(0)"
},
"properties": [
{
"id": "displayName",
"value": "Queue"
},
{
"id": "color",
"value": {
"fixedColor": "yellow",
"mode": "fixed"
}
}
]
}
]
},
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 8
},
"id": 4,
"options": {
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
},
"textMode": "auto"
},
"pluginVersion": "10.1.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "a93b3831-a11d-4ef3-b5e7-54bc557e33c0"
},
"editorMode": "code",
"expr": "count(job_state{state=\"Queued\"} unless on(jobID) (job_state{state=\"Processing\"} or job_state{state=\"Completed\"})) or\nvector(0)",
"hide": false,
"instant": false,
"legendFormat": "auto",
"range": true,
"refId": "Queue"
},
{
"datasource": {
"type": "prometheus",
"uid": "eba8c540-ab46-4987-9347-6112f4e0a081"
},
"editorMode": "code",
"exemplar": false,
"expr": "count(job_state{state=\"Processing\"} unless on(jobID) (job_state{state=\"Completed\"})) or\nvector(0)",
"hide": false,
"instant": false,
"legendFormat": "auto",
"range": true,
"refId": "C"
},
{
"datasource": {
"type": "prometheus",
"uid": "eba8c540-ab46-4987-9347-6112f4e0a081"
},
"editorMode": "code",
"expr": "count(job_state{state=\"Completed\"})",
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "State Jobs",
"type": "stat"
}
],
"refresh": "5s",
"schemaVersion": 38,
"style": "dark",
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-5m",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "New dashboard Copy",
"uid": "ee9cb0d1-2e66-4f16-8dae-166000f285cd",
"version": 1,
"weekStart": ""
}
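
The panel queries use `unless on(jobID)` to count each job only in its latest state. The following sketch models that logic in plain Go, assuming the application creates one `job_state` series for every state a job has entered and never deletes the earlier ones. The `series` type, the `countCurrent` helper, and the sample data are hypothetical.

```go
package main

import "fmt"

// series models one job_state time series by its two labels.
type series struct{ jobID, state string }

// countCurrent returns how many jobs have a series in `state` but in none of
// the `later` states. It mirrors the shape of the dashboard query
//   count(job_state{state=S} unless on(jobID) (job_state{state=L1} or ...))
func countCurrent(all []series, state string, later ...string) int {
	// Collect the jobIDs that already have a series in any later state.
	excluded := map[string]bool{}
	for _, s := range all {
		for _, l := range later {
			if s.state == l {
				excluded[s.jobID] = true
			}
		}
	}
	n := 0
	for _, s := range all {
		if s.state == state && !excluded[s.jobID] {
			n++
		}
	}
	return n
}

func main() {
	// Hypothetical snapshot: job a is completed, job b is processing, job c is queued.
	all := []series{
		{"a", "Queued"}, {"a", "Processing"}, {"a", "Completed"},
		{"b", "Queued"}, {"b", "Processing"},
		{"c", "Queued"},
	}
	fmt.Println("total:", countCurrent(all, "Queued"))
	fmt.Println("queued:", countCurrent(all, "Queued", "Processing", "Completed"))
	fmt.Println("processing:", countCurrent(all, "Processing", "Completed"))
	fmt.Println("completed:", countCurrent(all, "Completed"))
}
```

In PromQL, `count` over an empty result returns no series at all, which is why the dashboard appends `or vector(0)` to the Queue and Processing queries. The Go helper simply returns 0 in that case.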




Logging Worker Pool State

While Prometheus and Grafana are excellent for monitoring, it's often helpful to log the state of your worker pool for debugging and record-keeping purposes. We can use Go's standard logging package for this.



// worker receives jobs from the jobs channel, simulates processing them,
// and reports each finished job on the done channel
func worker(id int, jobs <-chan Job, done chan<- Job) {
	for job := range jobs {
		startTime := time.Now()

		activeWorkers.Inc()
		// updateJobState is assumed to take a *Job, matching the call in assignJob
		updateJobState(&job, "Processing")

		// Simulating job processing (100 ms to roughly 10.1 s)
		time.Sleep(time.Duration(rand.Intn(10000)+100) * time.Millisecond)

		activeWorkers.Dec()
		jobProcessingDuration.Observe(time.Since(startTime).Seconds())
		updateJobState(&job, "Completed")

		done <- job
	}
}




Result

Image description

Conclusion

In this article, we've learned how to manage the state of jobs in a Worker Pool in Go while integrating Prometheus and Grafana for monitoring. We've also seen how to log the state of the worker pool for debugging purposes. Monitoring and logging are crucial for maintaining the reliability and performance of your applications, and these tools make the process much more accessible.

Now, go ahead and apply these techniques to your Go applications to ensure they run smoothly and efficiently, even under heavy workloads.

Happy coding!

repo: https://github.com/ThanhPhucHuynh/go-grafana-prometheus-worker-pool
