pmetrics 0.1
PostgreSQL metrics instrumentation extension
Loading...
Searching...
No Matches
pmetrics.c
Go to the documentation of this file.
1/*
2 * pmetrics - An instrumentation toolkit for PostgreSQL extensions
3 *
4 * This module provides simple metrics functionality with support
5 * for counters, gauges and histograms, with labels.
6 *
7 * Metrics are stored in dynamic shared memory and the hash table grows
8 * automatically as needed (no fixed limit).
9 *
10 * Each metric is uniquely identified by name, labels, type, and bucket.
11 *
12 * Accepts the following custom options:
13 * - pmetrics.enabled: Enable metrics collection. Defaults to true.
14 * - pmetrics.bucket_variability: Used to calculate the exponential buckets.
15 * Defaults to 0.1.
16 * - pmetrics.buckets_upper_bound: the limit for the maximum histogram bucket.
17 * Defaults to 30000. Values over this will be truncated and fitted into the
18 * last bucket. A notice is raised whenever this happens.
19 *
20 * Labels are stored as JSONB for structured key-value data. Names are limited
21 * to NAMEDATALEN.
22 */
23
24#include "postgres.h"
25#include "pmetrics.h"
26
27#include "common/hashfn.h"
28#include "fmgr.h"
29#include "funcapi.h"
30#include "lib/dshash.h"
31#include "miscadmin.h"
32#include "storage/ipc.h"
33#include "storage/lwlock.h"
34#include "storage/shmem.h"
35#include "utils/builtins.h"
36#include "utils/dsa.h"
37#include "utils/guc.h"
38#include "utils/hsearch.h"
39#include "utils/jsonb.h"
40
41#include "math.h"
42#include <stdio.h>
43
45
46/* LWLock tranche IDs (must not conflict with other extensions) */
47#define LWTRANCHE_PMETRICS_DSA 43001
48#define LWTRANCHE_PMETRICS 43002
49
50/* GUC defaults */
51#define DEFAULT_ENABLED true
52#define DEFAULT_BUCKET_VARIABILITY 0.1
53#define DEFAULT_BUCKETS_UPPER_BOUND 30000
54
55/* Metric types */
62
63/* Shared state stored in static shared memory */
64typedef struct PMetricsSharedState {
65 dsa_handle dsa;
66 dshash_table_handle metrics_handle;
67 LWLock *init_lock;
70
71typedef enum LabelsLocation {
72 LABELS_NONE = 0, /* No labels (empty JSONB or null) */
73 LABELS_LOCAL = 1, /* labels.local_ptr is valid (search key) */
74 LABELS_DSA = 2 /* labels.dsa_ptr is valid (stored key) */
76
77typedef struct {
78 char name[NAMEDATALEN];
80 union {
81 dsa_pointer dsa_ptr; /* When LABELS_DSA */
82 Jsonb *local_ptr; /* When LABELS_LOCAL */
83 } labels;
85 int bucket; /* Only used for histograms, 0 for counter/gauge */
86} MetricKey;
87
88typedef struct {
90 int64 value;
91} Metric;
92
94
95/* Backend-local state (not in shared memory) */
96static dsa_area *local_dsa = NULL;
97static dshash_table *local_metrics_table = NULL;
98
99/* Hooks */
100static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
101static shmem_request_hook_type prev_shmem_request_hook = NULL;
102
103/* Configs */
104
108
109static double gamma_val = 0;
110static double log_gamma = 0;
111
112/* Function declarations */
113void _PG_init(void);
114static void metrics_shmem_request(void);
115static void metrics_shmem_startup(void);
116static dshash_table *get_metrics_table(void);
117static void cleanup_metrics_backend(int code, Datum arg);
118static void validate_inputs(const char *name);
119static void init_metric_key(MetricKey *key, const char *name,
120 Jsonb *labels_jsonb, MetricType type, int bucket);
121static int bucket_for(double value);
122static int64 increment_by(const char *name_str, Jsonb *labels_jsonb,
123 MetricType type, int bucket, int64 amount);
124static int64 delete_metrics_by_name_labels(const char *name_str,
125 Jsonb *labels_jsonb);
126static void extract_metric_args(FunctionCallInfo fcinfo, int name_arg,
127 int labels_arg, char **name_out,
128 Jsonb **labels_out);
129static Jsonb *get_labels_jsonb(const MetricKey *key, dsa_area *dsa);
130static uint32 metric_hash_dshash(const void *key, size_t key_size, void *arg);
131static int metric_compare_dshash(const void *a, const void *b, size_t key_size,
132 void *arg);
133static void metric_key_copy(void *dst, const void *src, size_t key_size,
134 void *arg);
135
136/* dshash parameters (references function pointers declared above) */
137static const dshash_parameters metrics_params = {
138 .key_size = sizeof(MetricKey),
139 .entry_size = sizeof(Metric),
140 .compare_function = metric_compare_dshash,
141 .hash_function = metric_hash_dshash,
142 .copy_function = metric_key_copy,
143 .tranche_id = LWTRANCHE_PMETRICS};
144
145static void metrics_shmem_request(void)
146{
149
150 RequestAddinShmemSpace(MAXALIGN(sizeof(PMetricsSharedState)));
151 RequestNamedLWLockTranche("pmetrics_init", 1);
152}
153
154static void metrics_shmem_startup(void)
155{
156 bool found;
157
160
161 shared_state = ShmemInitStruct("pmetrics_shared_state",
162 sizeof(PMetricsSharedState), &found);
163
164 if (!found) {
165 dsa_area *dsa;
166 dshash_table *metrics_table;
167
168 dsa = dsa_create(LWTRANCHE_PMETRICS_DSA);
169 shared_state->dsa = dsa_get_handle(dsa);
170
171 /*
172 * Pin the DSA to keep it alive even after we detach.
173 * This prevents it from being destroyed when postmaster detaches.
174 */
175 dsa_pin(dsa);
176
177 metrics_table = dshash_create(dsa, &metrics_params, NULL);
179 dshash_get_hash_table_handle(metrics_table);
180
182 &(GetNamedLWLockTranche("pmetrics_init")[0].lock);
184
185 /*
186 * Detach from postmaster so backends don't inherit the attachment
187 * state. The DSA is pinned so it won't be destroyed.
188 */
189 dshash_detach(metrics_table);
190 dsa_detach(dsa);
191
192 elog(DEBUG1, "pmetrics: initialized with DSA handle %lu",
193 (unsigned long)shared_state->dsa);
194 }
195}
196
197void _PG_init(void)
198{
199 int max_bucket_exp;
200
201 /*
202 * Must be loaded via shared_preload_libraries since we allocate shared
203 * memory and register hooks. Fail if loaded any other way.
204 */
205 if (!process_shared_preload_libraries_in_progress)
206 ereport(
207 ERROR,
208 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
209 errmsg("pmetrics must be loaded via shared_preload_libraries")));
210
211 DefineCustomBoolVariable(
212 "pmetrics.enabled", "Enable metrics collection",
213 "When disabled, all metric recording functions return NULL immediately",
214 &pmetrics_enabled, DEFAULT_ENABLED, PGC_SIGHUP, 0, NULL, NULL, NULL);
215
216 DefineCustomRealVariable("pmetrics.bucket_variability",
217 "Bucket variability for histograms",
218 "Controls histogram bucket spacing. Higher values "
219 "create fewer, wider buckets. "
220 "Used to calculate gamma = (1 + variability) / (1 "
221 "- variability). Requires restart.",
223 0.01, 1.0, PGC_POSTMASTER, 0, NULL, NULL, NULL);
224
225 DefineCustomIntVariable(
226 "pmetrics.buckets_upper_bound", "Maximum value for histogram buckets",
227 "Values larger than this will be placed in the highest bucket. "
228 "The actual upper bound will be rounded up to the nearest bucket "
229 "boundary. Requires restart.",
231 PGC_POSTMASTER, 0, NULL, NULL, NULL);
232
234 log_gamma = log(gamma_val);
235
236 max_bucket_exp = ceil(log(buckets_upper_bound) / log_gamma);
237 buckets_upper_bound = (int)pow(gamma_val, max_bucket_exp);
238
239 MarkGUCPrefixReserved("pmetrics");
240
241 LWLockRegisterTranche(LWTRANCHE_PMETRICS_DSA, "pmetrics_dsa");
242 LWLockRegisterTranche(LWTRANCHE_PMETRICS, "pmetrics");
243
244 prev_shmem_startup_hook = shmem_startup_hook;
245 shmem_startup_hook = metrics_shmem_startup;
246 prev_shmem_request_hook = shmem_request_hook;
247 shmem_request_hook = metrics_shmem_request;
248}
249
250static void validate_inputs(const char *name)
251{
252 if (name == NULL)
253 elog(ERROR, "null input not allowed");
254
255 if (strlen(name) >= NAMEDATALEN)
256 elog(ERROR, "name too long");
257}
258
259/*
260 * Extract name and labels from PG_FUNCTION_ARGS with proper error handling.
261 * Validates inputs and allocates name_str which must be freed by caller.
262 */
263static void extract_metric_args(FunctionCallInfo fcinfo, int name_arg,
264 int labels_arg, char **name_out,
265 Jsonb **labels_out)
266{
267 text *name_text;
268
269 name_text = PG_GETARG_TEXT_PP(name_arg);
270 *labels_out = PG_GETARG_JSONB_P(labels_arg);
271 *name_out = text_to_cstring(name_text);
272 validate_inputs(*name_out);
273}
274
275/*
276 * Initialize a MetricKey structure with local JSONB pointer
277 */
278static void init_metric_key(MetricKey *key, const char *name,
279 Jsonb *labels_jsonb, MetricType type, int bucket)
280{
281 strlcpy(key->name, name, NAMEDATALEN);
282
283 if (labels_jsonb != NULL) {
284 key->labels.local_ptr = labels_jsonb;
286 } else {
287 key->labels.local_ptr = NULL;
289 }
290
291 key->type = type;
292 key->bucket = bucket;
293}
294
295/*
296 * Cleanup callback when backend exits.
297 * Detach from DSA and hash tables.
298 */
299static void cleanup_metrics_backend(int code, Datum arg)
300{
301 if (local_metrics_table != NULL) {
302 dshash_detach(local_metrics_table);
303 local_metrics_table = NULL;
304 }
305
306 if (local_dsa != NULL) {
307 dsa_detach(local_dsa);
308 local_dsa = NULL;
309 }
310
311 elog(DEBUG1, "pmetrics: backend %d cleaned up", MyProcPid);
312}
313
314/*
315 * Get metrics table for this backend.
316 * The DSA and hash table are created in postmaster during startup.
317 * Each backend must attach to get its own valid pointers.
318 */
319static dshash_table *get_metrics_table(void)
320{
321 MemoryContext oldcontext;
322
323 /* Already attached in this backend? */
324 if (local_metrics_table != NULL)
325 return local_metrics_table;
326
327 /* Ensure shared state exists and was initialized */
328 if (shared_state == NULL)
329 elog(ERROR, "pmetrics shared state not initialized");
330
332 elog(ERROR, "pmetrics not properly initialized during startup");
333
334 /*
335 * Switch to TopMemoryContext to ensure the dshash_table structure
336 * persists for the backend's lifetime and doesn't get freed/reused
337 * by short-lived memory contexts.
338 */
339 oldcontext = MemoryContextSwitchTo(TopMemoryContext);
340
341 /*
342 * Each backend must attach to the DSA to get valid pointers.
343 * The postmaster keeps the DSA alive, but each backend needs its own
344 * attachment.
345 */
346 local_dsa = dsa_attach(shared_state->dsa);
347
348 /*
349 * Pin the DSA mapping to keep it attached for the backend's lifetime.
350 * Without this, the resource owner will detach it at statement end,
351 * causing dangling pointers and crashes on subsequent calls.
352 */
353 dsa_pin_mapping(local_dsa);
354
357
358 MemoryContextSwitchTo(oldcontext);
359
360 elog(DEBUG1, "pmetrics: backend %d attached to tables", MyProcPid);
361
362 /* Register cleanup callback for when backend exits */
363 on_shmem_exit(cleanup_metrics_backend, 0);
364
365 return local_metrics_table;
366}
367
368static int64 increment_by(const char *name_str, Jsonb *labels_jsonb,
369 MetricType type, int bucket, int64 amount)
370{
371 Metric *entry;
372 MetricKey metric_key;
373 bool found;
374 dshash_table *table;
375 int64 result;
376
377 table = get_metrics_table();
378 if (table == NULL)
379 elog(ERROR, "pmetrics not initialized");
380
381 init_metric_key(&metric_key, name_str, labels_jsonb, type, bucket);
382
383 entry = (Metric *)dshash_find_or_insert(table, &metric_key, &found);
384
385 if (!found)
386 entry->value = 0;
387
388 entry->value += amount;
389 result = entry->value;
390
391 dshash_release_lock(table, entry);
392
393 return result;
394}
395
397Datum increment_counter(PG_FUNCTION_ARGS)
398{
399 Datum new_value;
400 Jsonb *labels_jsonb;
401 char *name_str = NULL;
402
403 if (!pmetrics_enabled)
404 PG_RETURN_NULL();
405
406 PG_TRY();
407 {
408 extract_metric_args(fcinfo, 0, 1, &name_str, &labels_jsonb);
409 new_value =
410 increment_by(name_str, labels_jsonb, METRIC_TYPE_COUNTER, 0, 1);
411 }
412 PG_CATCH();
413 {
414 if (name_str)
415 pfree(name_str);
416 PG_RE_THROW();
417 }
418 PG_END_TRY();
419
420 pfree(name_str);
421 return new_value;
422}
423
425Datum increment_counter_by(PG_FUNCTION_ARGS)
426{
427 Datum new_value;
428 Jsonb *labels_jsonb;
429 char *name_str = NULL;
430 int increment;
431
432 if (!pmetrics_enabled)
433 PG_RETURN_NULL();
434
435 PG_TRY();
436 {
437 extract_metric_args(fcinfo, 0, 1, &name_str, &labels_jsonb);
438 increment = PG_GETARG_INT32(2);
439
440 if (increment <= 0)
441 elog(ERROR, "increment must be greater than 0");
442
443 new_value = increment_by(name_str, labels_jsonb, METRIC_TYPE_COUNTER, 0,
444 increment);
445 }
446 PG_CATCH();
447 {
448 if (name_str)
449 pfree(name_str);
450 PG_RE_THROW();
451 }
452 PG_END_TRY();
453
454 pfree(name_str);
455 return new_value;
456}
457
459Datum set_gauge(PG_FUNCTION_ARGS)
460{
461 Jsonb *labels_jsonb;
462 char *name_str = NULL;
463 int64 new_value;
464 int64 result;
465
466 if (!pmetrics_enabled)
467 PG_RETURN_NULL();
468
469 PG_TRY();
470 {
471 extract_metric_args(fcinfo, 0, 1, &name_str, &labels_jsonb);
472 new_value = PG_GETARG_INT64(2);
473
474 result = pmetrics_set_gauge(name_str, labels_jsonb, new_value);
475 }
476 PG_CATCH();
477 {
478 if (name_str)
479 pfree(name_str);
480
481 PG_RE_THROW();
482 }
483 PG_END_TRY();
484
485 pfree(name_str);
486 PG_RETURN_INT64(result);
487}
488
490Datum add_to_gauge(PG_FUNCTION_ARGS)
491{
492 Datum new_value;
493 Jsonb *labels_jsonb;
494 char *name_str = NULL;
495 int increment;
496
497 if (!pmetrics_enabled)
498 PG_RETURN_NULL();
499
500 PG_TRY();
501 {
502 extract_metric_args(fcinfo, 0, 1, &name_str, &labels_jsonb);
503 increment = PG_GETARG_INT32(2);
504
505 if (increment == 0)
506 elog(ERROR, "value can't be 0");
507
508 new_value = increment_by(name_str, labels_jsonb, METRIC_TYPE_GAUGE, 0,
509 increment);
510 }
511 PG_CATCH();
512 {
513 if (name_str)
514 pfree(name_str);
515
516 PG_RE_THROW();
517 }
518 PG_END_TRY();
519
520 pfree(name_str);
521 return new_value;
522}
523
525Datum list_metrics(PG_FUNCTION_ARGS)
526{
527 FuncCallContext *funcctx;
528 Metric **metrics;
529 int current_idx;
530
531 if (SRF_IS_FIRSTCALL()) {
532 MemoryContext oldcontext;
533 TupleDesc tupdesc;
534 dshash_table *table;
535 dshash_seq_status status;
536 Metric *metric;
537 int capacity = 16;
538 int count = 0;
539
540 funcctx = SRF_FIRSTCALL_INIT();
541 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
542
543 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
544 ereport(ERROR,
545 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
546 errmsg("function returning record called in context "
547 "that cannot accept type record")));
548
549 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
550
551 table = get_metrics_table();
552
553 /*
554 * Materialize all metrics in the first call.
555 * We can't use dshash_seq_next() across SRF calls because it holds
556 * partition locks that must be released between iterations.
557 */
558 metrics = (Metric **)palloc(capacity * sizeof(Metric *));
559
560 /* Scan the entire hash table and copy all entries */
561 dshash_seq_init(&status, table, false); /* false = shared lock */
562 while ((metric = (Metric *)dshash_seq_next(&status)) != NULL) {
563 Jsonb *labels_copy = NULL;
564
565 /* Expand array if needed */
566 if (count >= capacity) {
567 capacity *= 2;
568 metrics =
569 (Metric **)repalloc(metrics, capacity * sizeof(Metric *));
570 }
571
572 /* Copy the metric to backend-local memory */
573 metrics[count] = (Metric *)palloc(sizeof(Metric));
574 memcpy(metrics[count], metric, sizeof(Metric));
575
576 /* Copy JSONB labels to backend-local memory if they exist in DSA */
577 if (metric->key.labels_location == LABELS_DSA &&
578 metric->key.labels.dsa_ptr != InvalidDsaPointer) {
579 Jsonb *dsa_labels = (Jsonb *)dsa_get_address(
580 local_dsa, metric->key.labels.dsa_ptr);
581 size_t jsonb_size = VARSIZE(dsa_labels);
582
583 labels_copy = (Jsonb *)palloc(jsonb_size);
584 memcpy(labels_copy, dsa_labels, jsonb_size);
585
586 /* Update the copied metric to point to local copy */
587 metrics[count]->key.labels.local_ptr = labels_copy;
588 metrics[count]->key.labels_location = LABELS_LOCAL;
589 }
590
591 count++;
592 }
593 dshash_seq_term(&status);
594
595 /* Store the materialized metrics and count */
596 funcctx->user_fctx = metrics;
597 funcctx->max_calls = count;
598
599 MemoryContextSwitchTo(oldcontext);
600 }
601
602 funcctx = SRF_PERCALL_SETUP();
603 metrics = (Metric **)funcctx->user_fctx;
604 current_idx = funcctx->call_cntr;
605
606 if (current_idx < funcctx->max_calls) {
607 Metric *metric = metrics[current_idx];
608 Datum values[5];
609 bool nulls[5] = {false, false, false, false, false};
610 HeapTuple tuple;
611 Datum result;
612 const char *type_str;
613
614 /* Convert metric type enum to string */
615 switch (metric->key.type) {
617 type_str = "counter";
618 break;
620 type_str = "gauge";
621 break;
623 type_str = "histogram";
624 break;
626 type_str = "histogram_sum";
627 break;
628 default:
629 type_str = "unknown";
630 break;
631 }
632
633 values[0] = CStringGetTextDatum(metric->key.name);
634
635 if (metric->key.labels_location == LABELS_LOCAL &&
636 metric->key.labels.local_ptr != NULL) {
637 values[1] = JsonbPGetDatum(metric->key.labels.local_ptr);
638 } else {
639 nulls[1] = true;
640 }
641
642 values[2] = CStringGetTextDatum(type_str);
643 values[3] = Int32GetDatum(metric->key.bucket);
644 values[4] = Int64GetDatum(metric->value);
645 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
646 result = HeapTupleGetDatum(tuple);
647 SRF_RETURN_NEXT(funcctx, result);
648 } else {
649 /* All metrics returned */
650 SRF_RETURN_DONE(funcctx);
651 }
652}
653
654/*
655 * C API functions for other extensions to call
656 * These are marked with visibility("default") to be externally accessible
657 */
658
659__attribute__((visibility("default"))) int64
660pmetrics_increment_counter(const char *name_str, Jsonb *labels_jsonb)
661{
662 validate_inputs(name_str);
663 return increment_by(name_str, labels_jsonb, METRIC_TYPE_COUNTER, 0, 1);
664}
665
666__attribute__((visibility("default"))) int64 pmetrics_increment_counter_by(
667 const char *name_str, Jsonb *labels_jsonb, int64 amount)
668{
669 validate_inputs(name_str);
670
671 if (amount <= 0)
672 elog(ERROR, "increment must be greater than 0");
673
674 return increment_by(name_str, labels_jsonb, METRIC_TYPE_COUNTER, 0, amount);
675}
676
677__attribute__((visibility("default"))) int64
678pmetrics_set_gauge(const char *name_str, Jsonb *labels_jsonb, int64 value)
679{
680 Metric *entry;
681 MetricKey metric_key;
682 bool found;
683 dshash_table *table;
684 int64 result;
685
686 validate_inputs(name_str);
687
688 table = get_metrics_table();
689 if (table == NULL)
690 elog(ERROR, "pmetrics not initialized");
691
692 init_metric_key(&metric_key, name_str, labels_jsonb, METRIC_TYPE_GAUGE, 0);
693
694 entry = (Metric *)dshash_find_or_insert(table, &metric_key, &found);
695
696 entry->value = value;
697 result = entry->value;
698
699 dshash_release_lock(table, entry);
700
701 return result;
702}
703
704__attribute__((visibility("default"))) int64
705pmetrics_add_to_gauge(const char *name_str, Jsonb *labels_jsonb, int64 amount)
706{
707 validate_inputs(name_str);
708
709 if (amount == 0)
710 elog(ERROR, "value can't be 0");
711
712 return increment_by(name_str, labels_jsonb, METRIC_TYPE_GAUGE, 0, amount);
713}
714
715__attribute__((visibility("default"))) int64 pmetrics_record_to_histogram(
716 const char *name_str, Jsonb *labels_jsonb, double value)
717{
718 Datum bucket_count;
719 int bucket;
720
721 validate_inputs(name_str);
722
723 bucket = bucket_for(value);
724
725 /* Increment the histogram bucket count */
726 bucket_count =
727 increment_by(name_str, labels_jsonb, METRIC_TYPE_HISTOGRAM, bucket, 1);
728
729 /* Add to histogram sum (bucket is always 0 for sum type) */
730 increment_by(name_str, labels_jsonb, METRIC_TYPE_HISTOGRAM_SUM, 0,
731 (int64)value);
732
733 return bucket_count;
734}
735
737Datum record_to_histogram(PG_FUNCTION_ARGS)
738{
739 int64 result;
740 Jsonb *labels_jsonb;
741 char *name_str = NULL;
742 double value;
743
744 if (!pmetrics_enabled)
745 PG_RETURN_NULL();
746
747 PG_TRY();
748 {
749 extract_metric_args(fcinfo, 0, 1, &name_str, &labels_jsonb);
750 value = PG_GETARG_FLOAT8(2);
751 result = pmetrics_record_to_histogram(name_str, labels_jsonb, value);
752 }
753 PG_CATCH();
754 {
755 if (name_str)
756 pfree(name_str);
757
758 PG_RE_THROW();
759 }
760 PG_END_TRY();
761
762 pfree(name_str);
763 PG_RETURN_INT64(result);
764}
765
767Datum list_histogram_buckets(PG_FUNCTION_ARGS)
768{
769 FuncCallContext *funcctx;
770 int *buckets;
771 int current_idx;
772
773 if (SRF_IS_FIRSTCALL()) {
774 MemoryContext oldcontext;
775 TupleDesc tupdesc;
776 int max_bucket_exp;
777 int num_buckets;
778 int i;
779 int count;
780
781 funcctx = SRF_FIRSTCALL_INIT();
782 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
783
784 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
785 ereport(ERROR,
786 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
787 errmsg("function returning record called in context "
788 "that cannot accept type record")));
789
790 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
791
792 max_bucket_exp = ceil(log(buckets_upper_bound) / log_gamma);
793 num_buckets = max_bucket_exp + 1;
794
795 buckets = (int *)palloc(num_buckets * sizeof(int));
796
797 /* Generate unique bucket values */
798 count = 0;
799 buckets[count++] = 0;
800 for (i = 1; i <= max_bucket_exp; i++) {
801 int bucket_value = (int)pow(gamma_val, i);
802 if (bucket_value != buckets[count - 1])
803 buckets[count++] = bucket_value;
804 }
805
806 funcctx->user_fctx = buckets;
807 funcctx->max_calls = count;
808
809 MemoryContextSwitchTo(oldcontext);
810 }
811
812 funcctx = SRF_PERCALL_SETUP();
813 buckets = (int *)funcctx->user_fctx;
814 current_idx = funcctx->call_cntr;
815
816 if (current_idx < funcctx->max_calls) {
817 Datum values[1];
818 bool nulls[1] = {false};
819 HeapTuple tuple;
820 Datum result;
821
822 values[0] = Int32GetDatum(buckets[current_idx]);
823
824 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
825 result = HeapTupleGetDatum(tuple);
826 SRF_RETURN_NEXT(funcctx, result);
827 } else {
828 SRF_RETURN_DONE(funcctx);
829 }
830}
831
832__attribute__((visibility("default"))) int64 pmetrics_clear_metrics(void)
833{
834 dshash_table *metrics_table;
835 dshash_seq_status status;
836 Metric *entry;
837 int64 deleted_count = 0;
838
839 metrics_table = get_metrics_table();
840
841 dshash_seq_init(&status, metrics_table, true);
842 while ((entry = dshash_seq_next(&status)) != NULL) {
843 if (entry->key.labels_location == LABELS_DSA) {
844 dsa_free(local_dsa, entry->key.labels.dsa_ptr);
845 }
846 dshash_delete_current(&status);
847 deleted_count++;
848 }
849 dshash_seq_term(&status);
850
851 return deleted_count;
852}
853
855Datum clear_metrics(PG_FUNCTION_ARGS)
856{
857 int64 deleted_count;
858
859 deleted_count = pmetrics_clear_metrics();
860
861 PG_RETURN_INT64(deleted_count);
862}
863
864static int64 delete_metrics_by_name_labels(const char *name_str,
865 Jsonb *labels_jsonb)
866{
867 dshash_table *metrics_table;
868 dshash_seq_status status;
869 Metric *entry;
870 int64 deleted_count = 0;
871 Jsonb *entry_labels;
872
873 metrics_table = get_metrics_table();
874 if (metrics_table == NULL)
875 elog(ERROR, "pmetrics not initialized");
876
877 dshash_seq_init(&status, metrics_table, true);
878 while ((entry = dshash_seq_next(&status)) != NULL) {
879 /* Check if name matches */
880 if (strcmp(entry->key.name, name_str) != 0)
881 continue;
882
883 /* Check if labels match */
884 entry_labels = get_labels_jsonb(&entry->key, local_dsa);
885
886 /* Compare labels - both NULL means match */
887 if (labels_jsonb == NULL && entry_labels == NULL) {
888 /* Both are NULL, they match */
889 } else if (labels_jsonb != NULL && entry_labels != NULL) {
890 /* Both exist, compare them */
891 if (compareJsonbContainers(&labels_jsonb->root,
892 &entry_labels->root) != 0)
893 continue;
894 } else {
895 /* One is NULL, the other isn't - no match */
896 continue;
897 }
898
899 /* Free DSA-allocated labels before deleting */
900 if (entry->key.labels_location == LABELS_DSA) {
901 dsa_free(local_dsa, entry->key.labels.dsa_ptr);
902 }
903 dshash_delete_current(&status);
904 deleted_count++;
905 }
906 dshash_seq_term(&status);
907
908 return deleted_count;
909}
910
911__attribute__((visibility("default"))) int64
912pmetrics_delete_metric(const char *name_str, Jsonb *labels_jsonb)
913{
914 validate_inputs(name_str);
915 return delete_metrics_by_name_labels(name_str, labels_jsonb);
916}
917
919Datum delete_metric(PG_FUNCTION_ARGS)
920{
921 int64 deleted_count;
922 Jsonb *labels_jsonb;
923 char *name_str = NULL;
924
925 if (!pmetrics_enabled)
926 PG_RETURN_NULL();
927
928 PG_TRY();
929 {
930 extract_metric_args(fcinfo, 0, 1, &name_str, &labels_jsonb);
931 deleted_count = delete_metrics_by_name_labels(name_str, labels_jsonb);
932 }
933 PG_CATCH();
934 {
935 if (name_str)
936 pfree(name_str);
937 PG_RE_THROW();
938 }
939 PG_END_TRY();
940
941 pfree(name_str);
942 PG_RETURN_INT64(deleted_count);
943}
944
945__attribute__((visibility("default"))) bool pmetrics_is_initialized(void)
946{
947 return shared_state != NULL && shared_state->initialized;
948}
949
950__attribute__((visibility("default"))) dsa_handle pmetrics_get_dsa_handle(void)
951{
952 if (shared_state == NULL || !shared_state->initialized)
953 elog(ERROR, "pmetrics not initialized");
954
955 return shared_state->dsa;
956}
957
958__attribute__((visibility("default"))) dsa_area *pmetrics_get_dsa(void)
959{
960 if (local_dsa == NULL)
962
963 return local_dsa;
964}
965
966__attribute__((visibility("default"))) bool pmetrics_is_enabled(void)
967{
968 return pmetrics_enabled;
969}
970
971static int bucket_for(double value)
972{
973 int bucket;
974 int this_bucket_upper_bound;
975
976 if (value < 1.0)
977 bucket = 0;
978 else
979 bucket = (int)fmax(ceil(log(value) / log_gamma), 0);
980
981 this_bucket_upper_bound = (int)pow(gamma_val, bucket);
982
983 if (this_bucket_upper_bound > buckets_upper_bound) {
984 elog(NOTICE, "Histogram data truncated: value %f to %d", value,
986 this_bucket_upper_bound = buckets_upper_bound;
987 }
988
989 return this_bucket_upper_bound;
990}
991
992/*
993 * Helper function to get JSONB from MetricKey, handling both local and DSA
994 * locations.
995 */
996static Jsonb *get_labels_jsonb(const MetricKey *key, dsa_area *dsa)
997{
998 switch (key->labels_location) {
999 case LABELS_LOCAL:
1000 return key->labels.local_ptr;
1001 case LABELS_DSA:
1002 if (key->labels.dsa_ptr != InvalidDsaPointer)
1003 return (Jsonb *)dsa_get_address(dsa, key->labels.dsa_ptr);
1004 return NULL;
1005 case LABELS_NONE:
1006 default:
1007 return NULL;
1008 }
1009}
1010
1011/*
1012 * Custom hash function for MetricKey (dshash signature).
1013 * Handles both local (search) keys and DSA (stored) keys.
1014 */
1015static uint32 metric_hash_dshash(const void *key, size_t key_size, void *arg)
1016{
1017 const MetricKey *k = (const MetricKey *)key;
1018 uint32 hash;
1019 Jsonb *labels;
1020
1021 hash = string_hash(k->name, NAMEDATALEN);
1022 hash ^= hash_bytes((const unsigned char *)&k->type, sizeof(MetricType));
1023 hash ^= hash_uint32((uint32)k->bucket);
1024
1025 /* Hash JSONB labels if present */
1026 labels = get_labels_jsonb(k, local_dsa);
1027 if (labels != NULL)
1028 hash ^= hash_bytes((unsigned char *)labels, VARSIZE(labels));
1029
1030 return hash;
1031}
1032
1033/*
1034 * Custom compare function for MetricKey (dshash signature).
1035 * Handles both local (search) keys and DSA (stored) keys.
1036 * Returns <0, 0, or >0 like strcmp.
1037 */
1038static int metric_compare_dshash(const void *a, const void *b, size_t key_size,
1039 void *arg)
1040{
1041 const MetricKey *k1 = (const MetricKey *)a;
1042 const MetricKey *k2 = (const MetricKey *)b;
1043 Jsonb *labels1, *labels2;
1044 int cmp;
1045
1046 /* Compare name */
1047 cmp = strcmp(k1->name, k2->name);
1048 if (cmp != 0)
1049 return cmp;
1050
1051 /* Compare type */
1052 if (k1->type != k2->type)
1053 return (k1->type < k2->type) ? -1 : 1;
1054
1055 /* Compare bucket */
1056 if (k1->bucket != k2->bucket)
1057 return (k1->bucket < k2->bucket) ? -1 : 1;
1058
1059 /* Compare JSONB labels */
1060 labels1 = get_labels_jsonb(k1, local_dsa);
1061 labels2 = get_labels_jsonb(k2, local_dsa);
1062
1063 if (labels1 == NULL && labels2 == NULL)
1064 return 0;
1065 if (labels1 == NULL)
1066 return -1;
1067 if (labels2 == NULL)
1068 return 1;
1069
1070 /*
1071 * Use memcmp instead of compareJsonbContainers to avoid collation lookup.
1072 *
1073 * compareJsonbContainers() calls varstr_cmp() which requires
1074 * pg_newlocale_from_collation(), triggering syscache lookups that fail
1075 * during early backend initialization when the system catalog cache is
1076 * not yet available.
1077 *
1078 * Binary comparison is safe here because:
1079 * - JSONB has a canonical binary format (sorted keys, no duplicates)
1080 * - Identical JSON produces identical binary representations
1081 * - We only need equality checking, not locale-aware sorting
1082 */
1083 {
1084 Size size1 = VARSIZE(labels1);
1085 Size size2 = VARSIZE(labels2);
1086
1087 if (size1 != size2)
1088 return (size1 < size2) ? -1 : 1;
1089
1090 return memcmp(labels1, labels2, size1);
1091 }
1092}
1093
1094/*
1095 * Custom copy function for MetricKey (dshash signature).
1096 * When inserting a new entry, allocates JSONB to DSA if source has local JSONB.
1097 */
1098static void metric_key_copy(void *dst, const void *src, size_t key_size,
1099 void *arg)
1100{
1101 MetricKey *dest_key = (MetricKey *)dst;
1102 const MetricKey *src_key = (const MetricKey *)src;
1103 Jsonb *src_labels;
1104 Jsonb *dest_labels;
1105 Size jsonb_size;
1106
1107 memcpy(dest_key, src_key, sizeof(MetricKey));
1108
1109 if (src_key->labels_location == LABELS_LOCAL &&
1110 src_key->labels.local_ptr != NULL) {
1111 src_labels = src_key->labels.local_ptr;
1112 jsonb_size = VARSIZE(src_labels);
1113
1114 dest_key->labels.dsa_ptr = dsa_allocate(local_dsa, jsonb_size);
1115 if (dest_key->labels.dsa_ptr == InvalidDsaPointer)
1116 elog(ERROR, "out of dynamic shared memory for metric labels");
1117
1118 dest_labels =
1119 (Jsonb *)dsa_get_address(local_dsa, dest_key->labels.dsa_ptr);
1120 memcpy(dest_labels, src_labels, jsonb_size);
1121
1122 dest_key->labels_location = LABELS_DSA;
1123 }
1124}
Datum add_to_gauge(PG_FUNCTION_ARGS)
Definition pmetrics.c:490
Datum list_metrics(PG_FUNCTION_ARGS)
Definition pmetrics.c:525
static int64 increment_by(const char *name_str, Jsonb *labels_jsonb, MetricType type, int bucket, int64 amount)
Definition pmetrics.c:368
Datum increment_counter_by(PG_FUNCTION_ARGS)
Definition pmetrics.c:425
static int metric_compare_dshash(const void *a, const void *b, size_t key_size, void *arg)
Definition pmetrics.c:1038
static dsa_area * local_dsa
Definition pmetrics.c:96
Datum list_histogram_buckets(PG_FUNCTION_ARGS)
Definition pmetrics.c:767
Datum set_gauge(PG_FUNCTION_ARGS)
Definition pmetrics.c:459
Datum delete_metric(PG_FUNCTION_ARGS)
Definition pmetrics.c:919
void _PG_init(void)
Definition pmetrics.c:197
static PMetricsSharedState * shared_state
Definition pmetrics.c:93
static void metric_key_copy(void *dst, const void *src, size_t key_size, void *arg)
Definition pmetrics.c:1098
static double log_gamma
Definition pmetrics.c:110
static void extract_metric_args(FunctionCallInfo fcinfo, int name_arg, int labels_arg, char **name_out, Jsonb **labels_out)
Definition pmetrics.c:263
static int buckets_upper_bound
Definition pmetrics.c:107
PG_MODULE_MAGIC
Definition pmetrics.c:44
MetricType
Definition pmetrics.c:56
@ METRIC_TYPE_GAUGE
Definition pmetrics.c:58
@ METRIC_TYPE_COUNTER
Definition pmetrics.c:57
@ METRIC_TYPE_HISTOGRAM_SUM
Definition pmetrics.c:60
@ METRIC_TYPE_HISTOGRAM
Definition pmetrics.c:59
static const dshash_parameters metrics_params
Definition pmetrics.c:137
PG_FUNCTION_INFO_V1(increment_counter)
Datum record_to_histogram(PG_FUNCTION_ARGS)
Definition pmetrics.c:737
static int64 delete_metrics_by_name_labels(const char *name_str, Jsonb *labels_jsonb)
Definition pmetrics.c:864
static double bucket_variability
Definition pmetrics.c:106
__attribute__((visibility("default")))
Definition pmetrics.c:659
static uint32 metric_hash_dshash(const void *key, size_t key_size, void *arg)
Definition pmetrics.c:1015
#define LWTRANCHE_PMETRICS
Definition pmetrics.c:48
static void cleanup_metrics_backend(int code, Datum arg)
Definition pmetrics.c:299
#define DEFAULT_BUCKET_VARIABILITY
Definition pmetrics.c:52
static shmem_startup_hook_type prev_shmem_startup_hook
Definition pmetrics.c:100
static shmem_request_hook_type prev_shmem_request_hook
Definition pmetrics.c:101
#define DEFAULT_BUCKETS_UPPER_BOUND
Definition pmetrics.c:53
static void validate_inputs(const char *name)
Definition pmetrics.c:250
Datum clear_metrics(PG_FUNCTION_ARGS)
Definition pmetrics.c:855
static bool pmetrics_enabled
Definition pmetrics.c:105
#define LWTRANCHE_PMETRICS_DSA
Definition pmetrics.c:47
static void metrics_shmem_request(void)
Definition pmetrics.c:145
static int bucket_for(double value)
Definition pmetrics.c:971
static double gamma_val
Definition pmetrics.c:109
static Jsonb * get_labels_jsonb(const MetricKey *key, dsa_area *dsa)
Definition pmetrics.c:996
Datum increment_counter(PG_FUNCTION_ARGS)
Definition pmetrics.c:397
#define DEFAULT_ENABLED
Definition pmetrics.c:51
LabelsLocation
Definition pmetrics.c:71
@ LABELS_LOCAL
Definition pmetrics.c:73
@ LABELS_DSA
Definition pmetrics.c:74
@ LABELS_NONE
Definition pmetrics.c:72
static dshash_table * local_metrics_table
Definition pmetrics.c:97
static void metrics_shmem_startup(void)
Definition pmetrics.c:154
static dshash_table * get_metrics_table(void)
Definition pmetrics.c:319
static void init_metric_key(MetricKey *key, const char *name, Jsonb *labels_jsonb, MetricType type, int bucket)
Definition pmetrics.c:278
Public C API for pmetrics extension.
int64 pmetrics_increment_counter_by(const char *name_str, Jsonb *labels_jsonb, int64 amount)
bool pmetrics_is_enabled(void)
int64 pmetrics_record_to_histogram(const char *name_str, Jsonb *labels_jsonb, double value)
int64 pmetrics_clear_metrics(void)
dsa_handle pmetrics_get_dsa_handle(void)
int64 pmetrics_delete_metric(const char *name_str, Jsonb *labels_jsonb)
int64 pmetrics_add_to_gauge(const char *name_str, Jsonb *labels_jsonb, int64 amount)
int64 pmetrics_set_gauge(const char *name_str, Jsonb *labels_jsonb, int64 value)
bool pmetrics_is_initialized(void)
int64 pmetrics_increment_counter(const char *name_str, Jsonb *labels_jsonb)
dsa_area * pmetrics_get_dsa(void)
char name[NAMEDATALEN]
Definition pmetrics.c:78
dsa_pointer dsa_ptr
Definition pmetrics.c:81
union MetricKey::@0 labels
MetricType type
Definition pmetrics.c:84
LabelsLocation labels_location
Definition pmetrics.c:79
Jsonb * local_ptr
Definition pmetrics.c:82
int bucket
Definition pmetrics.c:85
int64 value
Definition pmetrics.c:90
MetricKey key
Definition pmetrics.c:89
LWLock * init_lock
Definition pmetrics.c:67
dshash_table_handle metrics_handle
Definition pmetrics.c:66
dsa_handle dsa
Definition pmetrics.c:65