27#include "common/hashfn.h"
30#include "lib/dshash.h"
32#include "storage/ipc.h"
33#include "storage/lwlock.h"
34#include "storage/shmem.h"
35#include "utils/builtins.h"
38#include "utils/hsearch.h"
39#include "utils/jsonb.h"
47#define LWTRANCHE_PMETRICS_DSA 43001
48#define LWTRANCHE_PMETRICS 43002
51#define DEFAULT_ENABLED true
52#define DEFAULT_BUCKET_VARIABILITY 0.1
53#define DEFAULT_BUCKETS_UPPER_BOUND 30000
78 char name[NAMEDATALEN];
120 Jsonb *labels_jsonb,
MetricType type,
int bucket);
122static int64
increment_by(
const char *name_str, Jsonb *labels_jsonb,
125 Jsonb *labels_jsonb);
127 int labels_arg,
char **name_out,
133static void metric_key_copy(
void *dst,
const void *src,
size_t key_size,
139 .entry_size =
sizeof(
Metric),
151 RequestNamedLWLockTranche(
"pmetrics_init", 1);
166 dshash_table *metrics_table;
179 dshash_get_hash_table_handle(metrics_table);
182 &(GetNamedLWLockTranche(
"pmetrics_init")[0].lock);
189 dshash_detach(metrics_table);
192 elog(DEBUG1,
"pmetrics: initialized with DSA handle %lu",
205 if (!process_shared_preload_libraries_in_progress)
208 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
209 errmsg(
"pmetrics must be loaded via shared_preload_libraries")));
211 DefineCustomBoolVariable(
212 "pmetrics.enabled",
"Enable metrics collection",
213 "When disabled, all metric recording functions return NULL immediately",
216 DefineCustomRealVariable(
"pmetrics.bucket_variability",
217 "Bucket variability for histograms",
218 "Controls histogram bucket spacing. Higher values "
219 "create fewer, wider buckets. "
220 "Used to calculate gamma = (1 + variability) / (1 "
221 "- variability). Requires restart.",
223 0.01, 1.0, PGC_POSTMASTER, 0, NULL, NULL, NULL);
225 DefineCustomIntVariable(
226 "pmetrics.buckets_upper_bound",
"Maximum value for histogram buckets",
227 "Values larger than this will be placed in the highest bucket. "
228 "The actual upper bound will be rounded up to the nearest bucket "
229 "boundary. Requires restart.",
231 PGC_POSTMASTER, 0, NULL, NULL, NULL);
239 MarkGUCPrefixReserved(
"pmetrics");
253 elog(ERROR,
"null input not allowed");
255 if (strlen(name) >= NAMEDATALEN)
256 elog(ERROR,
"name too long");
264 int labels_arg,
char **name_out,
269 name_text = PG_GETARG_TEXT_PP(name_arg);
270 *labels_out = PG_GETARG_JSONB_P(labels_arg);
271 *name_out = text_to_cstring(name_text);
279 Jsonb *labels_jsonb,
MetricType type,
int bucket)
281 strlcpy(key->
name, name, NAMEDATALEN);
283 if (labels_jsonb != NULL) {
311 elog(DEBUG1,
"pmetrics: backend %d cleaned up", MyProcPid);
321 MemoryContext oldcontext;
329 elog(ERROR,
"pmetrics shared state not initialized");
332 elog(ERROR,
"pmetrics not properly initialized during startup");
339 oldcontext = MemoryContextSwitchTo(TopMemoryContext);
358 MemoryContextSwitchTo(oldcontext);
360 elog(DEBUG1,
"pmetrics: backend %d attached to tables", MyProcPid);
379 elog(ERROR,
"pmetrics not initialized");
383 entry = (
Metric *)dshash_find_or_insert(table, &metric_key, &found);
388 entry->
value += amount;
389 result = entry->
value;
391 dshash_release_lock(table, entry);
401 char *name_str = NULL;
429 char *name_str = NULL;
438 increment = PG_GETARG_INT32(2);
441 elog(ERROR,
"increment must be greater than 0");
462 char *name_str = NULL;
472 new_value = PG_GETARG_INT64(2);
486 PG_RETURN_INT64(result);
494 char *name_str = NULL;
503 increment = PG_GETARG_INT32(2);
506 elog(ERROR,
"value can't be 0");
527 FuncCallContext *funcctx;
531 if (SRF_IS_FIRSTCALL()) {
532 MemoryContext oldcontext;
535 dshash_seq_status status;
540 funcctx = SRF_FIRSTCALL_INIT();
541 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
543 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
545 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
546 errmsg(
"function returning record called in context "
547 "that cannot accept type record")));
549 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
558 metrics = (
Metric **)palloc(capacity *
sizeof(
Metric *));
561 dshash_seq_init(&status, table,
false);
562 while ((metric = (
Metric *)dshash_seq_next(&status)) != NULL) {
563 Jsonb *labels_copy = NULL;
566 if (count >= capacity) {
569 (
Metric **)repalloc(metrics, capacity *
sizeof(
Metric *));
574 memcpy(metrics[count], metric,
sizeof(
Metric));
579 Jsonb *dsa_labels = (Jsonb *)dsa_get_address(
581 size_t jsonb_size = VARSIZE(dsa_labels);
583 labels_copy = (Jsonb *)palloc(jsonb_size);
584 memcpy(labels_copy, dsa_labels, jsonb_size);
593 dshash_seq_term(&status);
596 funcctx->user_fctx = metrics;
597 funcctx->max_calls = count;
599 MemoryContextSwitchTo(oldcontext);
602 funcctx = SRF_PERCALL_SETUP();
603 metrics = (
Metric **)funcctx->user_fctx;
604 current_idx = funcctx->call_cntr;
606 if (current_idx < funcctx->max_calls) {
607 Metric *metric = metrics[current_idx];
609 bool nulls[5] = {
false,
false,
false,
false,
false};
612 const char *type_str;
617 type_str =
"counter";
623 type_str =
"histogram";
626 type_str =
"histogram_sum";
629 type_str =
"unknown";
633 values[0] = CStringGetTextDatum(metric->
key.
name);
642 values[2] = CStringGetTextDatum(type_str);
643 values[3] = Int32GetDatum(metric->
key.
bucket);
644 values[4] = Int64GetDatum(metric->
value);
645 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
646 result = HeapTupleGetDatum(tuple);
647 SRF_RETURN_NEXT(funcctx, result);
650 SRF_RETURN_DONE(funcctx);
667 const
char *name_str, Jsonb *labels_jsonb, int64 amount)
672 elog(ERROR,
"increment must be greater than 0");
690 elog(ERROR,
"pmetrics not initialized");
694 entry = (
Metric *)dshash_find_or_insert(table, &metric_key, &found);
696 entry->
value = value;
697 result = entry->
value;
699 dshash_release_lock(table, entry);
710 elog(ERROR,
"value can't be 0");
716 const
char *name_str, Jsonb *labels_jsonb,
double value)
741 char *name_str = NULL;
750 value = PG_GETARG_FLOAT8(2);
763 PG_RETURN_INT64(result);
769 FuncCallContext *funcctx;
773 if (SRF_IS_FIRSTCALL()) {
774 MemoryContext oldcontext;
781 funcctx = SRF_FIRSTCALL_INIT();
782 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
784 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
786 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
787 errmsg(
"function returning record called in context "
788 "that cannot accept type record")));
790 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
793 num_buckets = max_bucket_exp + 1;
795 buckets = (
int *)palloc(num_buckets *
sizeof(
int));
799 buckets[count++] = 0;
800 for (i = 1; i <= max_bucket_exp; i++) {
801 int bucket_value = (int)pow(
gamma_val, i);
802 if (bucket_value != buckets[count - 1])
803 buckets[count++] = bucket_value;
806 funcctx->user_fctx = buckets;
807 funcctx->max_calls = count;
809 MemoryContextSwitchTo(oldcontext);
812 funcctx = SRF_PERCALL_SETUP();
813 buckets = (
int *)funcctx->user_fctx;
814 current_idx = funcctx->call_cntr;
816 if (current_idx < funcctx->max_calls) {
818 bool nulls[1] = {
false};
822 values[0] = Int32GetDatum(buckets[current_idx]);
824 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
825 result = HeapTupleGetDatum(tuple);
826 SRF_RETURN_NEXT(funcctx, result);
828 SRF_RETURN_DONE(funcctx);
834 dshash_table *metrics_table;
835 dshash_seq_status status;
837 int64 deleted_count = 0;
841 dshash_seq_init(&status, metrics_table,
true);
842 while ((entry = dshash_seq_next(&status)) != NULL) {
846 dshash_delete_current(&status);
849 dshash_seq_term(&status);
851 return deleted_count;
861 PG_RETURN_INT64(deleted_count);
867 dshash_table *metrics_table;
868 dshash_seq_status status;
870 int64 deleted_count = 0;
874 if (metrics_table == NULL)
875 elog(ERROR,
"pmetrics not initialized");
877 dshash_seq_init(&status, metrics_table,
true);
878 while ((entry = dshash_seq_next(&status)) != NULL) {
880 if (strcmp(entry->
key.
name, name_str) != 0)
887 if (labels_jsonb == NULL && entry_labels == NULL) {
889 }
else if (labels_jsonb != NULL && entry_labels != NULL) {
891 if (compareJsonbContainers(&labels_jsonb->root,
892 &entry_labels->root) != 0)
903 dshash_delete_current(&status);
906 dshash_seq_term(&status);
908 return deleted_count;
923 char *name_str = NULL;
942 PG_RETURN_INT64(deleted_count);
953 elog(ERROR,
"pmetrics not initialized");
974 int this_bucket_upper_bound;
979 bucket = (int)fmax(ceil(log(value) /
log_gamma), 0);
981 this_bucket_upper_bound = (int)pow(
gamma_val, bucket);
984 elog(NOTICE,
"Histogram data truncated: value %f to %d", value,
989 return this_bucket_upper_bound;
1021 hash = string_hash(k->
name, NAMEDATALEN);
1022 hash ^= hash_bytes((
const unsigned char *)&k->
type,
sizeof(
MetricType));
1023 hash ^= hash_uint32((uint32)k->
bucket);
1028 hash ^= hash_bytes((
unsigned char *)labels, VARSIZE(labels));
1043 Jsonb *labels1, *labels2;
1053 return (k1->
type < k2->
type) ? -1 : 1;
1063 if (labels1 == NULL && labels2 == NULL)
1065 if (labels1 == NULL)
1067 if (labels2 == NULL)
1084 Size size1 = VARSIZE(labels1);
1085 Size size2 = VARSIZE(labels2);
1088 return (size1 < size2) ? -1 : 1;
1090 return memcmp(labels1, labels2, size1);
1107 memcpy(dest_key, src_key,
sizeof(
MetricKey));
1112 jsonb_size = VARSIZE(src_labels);
1116 elog(ERROR,
"out of dynamic shared memory for metric labels");
1120 memcpy(dest_labels, src_labels, jsonb_size);
Datum add_to_gauge(PG_FUNCTION_ARGS)
Datum list_metrics(PG_FUNCTION_ARGS)
static int64 increment_by(const char *name_str, Jsonb *labels_jsonb, MetricType type, int bucket, int64 amount)
Datum increment_counter_by(PG_FUNCTION_ARGS)
static int metric_compare_dshash(const void *a, const void *b, size_t key_size, void *arg)
static dsa_area * local_dsa
Datum list_histogram_buckets(PG_FUNCTION_ARGS)
Datum set_gauge(PG_FUNCTION_ARGS)
Datum delete_metric(PG_FUNCTION_ARGS)
static PMetricsSharedState * shared_state
static void metric_key_copy(void *dst, const void *src, size_t key_size, void *arg)
static void extract_metric_args(FunctionCallInfo fcinfo, int name_arg, int labels_arg, char **name_out, Jsonb **labels_out)
static int buckets_upper_bound
@ METRIC_TYPE_HISTOGRAM_SUM
static const dshash_parameters metrics_params
PG_FUNCTION_INFO_V1(increment_counter)
Datum record_to_histogram(PG_FUNCTION_ARGS)
static int64 delete_metrics_by_name_labels(const char *name_str, Jsonb *labels_jsonb)
static double bucket_variability
__attribute__((visibility("default")))
static uint32 metric_hash_dshash(const void *key, size_t key_size, void *arg)
#define LWTRANCHE_PMETRICS
static void cleanup_metrics_backend(int code, Datum arg)
#define DEFAULT_BUCKET_VARIABILITY
static shmem_startup_hook_type prev_shmem_startup_hook
static shmem_request_hook_type prev_shmem_request_hook
#define DEFAULT_BUCKETS_UPPER_BOUND
static void validate_inputs(const char *name)
Datum clear_metrics(PG_FUNCTION_ARGS)
static bool pmetrics_enabled
#define LWTRANCHE_PMETRICS_DSA
static void metrics_shmem_request(void)
static int bucket_for(double value)
static Jsonb * get_labels_jsonb(const MetricKey *key, dsa_area *dsa)
Datum increment_counter(PG_FUNCTION_ARGS)
static dshash_table * local_metrics_table
static void metrics_shmem_startup(void)
static dshash_table * get_metrics_table(void)
static void init_metric_key(MetricKey *key, const char *name, Jsonb *labels_jsonb, MetricType type, int bucket)
Public C API for pmetrics extension.
int64 pmetrics_increment_counter_by(const char *name_str, Jsonb *labels_jsonb, int64 amount)
bool pmetrics_is_enabled(void)
int64 pmetrics_record_to_histogram(const char *name_str, Jsonb *labels_jsonb, double value)
int64 pmetrics_clear_metrics(void)
dsa_handle pmetrics_get_dsa_handle(void)
int64 pmetrics_delete_metric(const char *name_str, Jsonb *labels_jsonb)
int64 pmetrics_add_to_gauge(const char *name_str, Jsonb *labels_jsonb, int64 amount)
int64 pmetrics_set_gauge(const char *name_str, Jsonb *labels_jsonb, int64 value)
bool pmetrics_is_initialized(void)
int64 pmetrics_increment_counter(const char *name_str, Jsonb *labels_jsonb)
dsa_area * pmetrics_get_dsa(void)
union MetricKey::@0 labels
LabelsLocation labels_location
dshash_table_handle metrics_handle