diff --git a/contrib/system_load/Makefile b/contrib/system_load/Makefile new file mode 100644 index 0000000..2ba70eb --- /dev/null +++ b/contrib/system_load/Makefile @@ -0,0 +1,16 @@ +# contrib/system_load/Makefile + +MODULE_big = system_load +OBJS = system_load.o cpu.o $(WIN32RES) +PGFILEDESC = "system_load - facilty to consider system load while generating parallel workers" + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/system_load +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/system_load/cpu.c b/contrib/system_load/cpu.c new file mode 100644 index 0000000..0652eca --- /dev/null +++ b/contrib/system_load/cpu.c @@ -0,0 +1,431 @@ +/*------------------------------------------------------------------------- + * + * cpu.c + * Get IDLE CPU information. + * + * Copyright (c) 2008-2016, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/system_load/cpu.c + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include +#include +#include + +#ifdef WIN32 +#include +#endif + +#include "utils/builtins.h" + +#include "cpu.h" + +/* cgroup subsystems */ +#define CPU_SUBSYSTEM "cpu" +#define CPUACCT_SUBSYSTEM "cpuacct" + +/* + * get_cpu_idle: + * Gather idle cpu time for a given cpu. + */ +#ifndef WIN32 +static int +get_cpu_idle(void) +{ + int skip_line = 0; + char *cmd = "dstat -c -C total 1 1"; + char cmd_output[MAX_CMD_OUTPUT_SIZ]; + FILE *ptr; + int cpu_idle = 100; + + /* Running dstat command and store its output in cmd_output */ + if ((ptr = popen(cmd, "r")) != NULL) + { + while (fgets(cmd_output, MAX_CMD_SIZ, ptr) != NULL) + { + skip_line++; + if (skip_line > 2) + { + const char s[] = " "; + char *token; + int cpu_usr; + int cpu_sys; + + /* get the first token */ + token = strtok(cmd_output, s); + cpu_usr = atoi(token); + + /* get the second token */ + token = strtok(NULL, s); + cpu_sys = atoi(token); + cpu_idle = 100 - (cpu_sys + cpu_usr); + } + } + + pclose(ptr); + } + + return cpu_idle; +} + +/* + * get_num_of_cpus: + * Get the number of CPUs in the system + */ +static int +get_number_of_cpus(void) +{ + char *cmd = "grep processor /proc/cpuinfo | wc -l"; + FILE *fpipe; + char buf[256]; + int num_of_cpus; + + if ((fpipe = popen(cmd, "r")) != NULL) + { + fgets(buf, sizeof(buf), fpipe); + pclose(fpipe); + num_of_cpus = pg_atoi(buf, sizeof(int32), 0); + return num_of_cpus; + } + else + { + return 1; + } +} + +static long +get_cgroup_cmd_output(char *cmd) +{ + long cmd_result = -1; + char cmd_output[MAX_CMD_OUTPUT_SIZ]; + FILE *fpipe; + + if ((fpipe = popen(cmd, "r")) != NULL) + { + if(fgets(cmd_output, MAX_CMD_SIZ, fpipe)) + { + cmd_result = atol(cmd_output); + } + pclose(fpipe); + } + + return cmd_result; +} + +/* + * Returns the mount point for specified cgroup subsystem. + * Returns NULL if the specified subsystem not already mounted. + */ +static char * +get_mount_point(char *subsystem) +{ + char cmd[MAX_CMD_SIZ]; + char cmd_output[MAX_CMD_OUTPUT_SIZ]; + char *mount_point = NULL; + FILE *fpipe; + + sprintf(cmd, "lssubsys -m %s", subsystem); + + if ((fpipe = popen(cmd, "r")) != NULL) + { + if(fgets(cmd_output, MAX_CMD_SIZ, fpipe)) + { + strtok(cmd_output," "); + mount_point = strtok(NULL, " "); + } + + pclose(fpipe); + } + + if (!mount_point) + return NULL; + + /* Remove trailing newline */ + if (strchr(mount_point, '\n') != NULL) + *strchr(mount_point, '\n') = '\0'; + + return pstrdup(mount_point); +} + +/* + * Returns the cgroup, which the backend process belongs to. + * Returns NULL if backend does not belong to any cgroup. + */ +static char * +get_cgroup_name(int pid, char *sub_system) +{ + char cmd[MAX_CMD_SIZ]; + char cmd_output[MAX_CMD_OUTPUT_SIZ]; + char *token; + char *lastTok = NULL; + + FILE *fpipe; + + sprintf(cmd, "cat /proc/%d/cgroup | grep -w %s", pid, sub_system); + + if ((fpipe = popen(cmd, "r")) != NULL) + { + if(fgets(cmd_output, MAX_CMD_SIZ, fpipe)) + { + token = strtok(cmd_output, ":"); + while(token != NULL) + { + lastTok = token; + token = strtok(NULL, ":"); + } + } + pclose(fpipe); + } + + /* No associated cgroup*/ + if (!lastTok) + return NULL; + + /* Remove leading "/" */ + if (strchr(lastTok, '/') != NULL) + lastTok = lastTok + 1; + + /* Remove trailing newline */ + if (strchr(lastTok, '\n') != NULL) + *strchr(lastTok, '\n') = '\0'; + + /* No associated cgroup*/ + if (strcmp(lastTok, "") == 0) + return NULL; + + return pstrdup(lastTok); +} + +int +get_cgroup_cpu_idle(void) +{ + char *cpu_mount_point; + char *cpuacct_mount_point; + char *cpu_cgroup; + char *cpuacct_cgroup; + + char cmd[MAX_CMD_SIZ]; + + long cpuacct_usage_old = 0; + long cpuacct_usage_new = 0; + long cpuacct_usage_delta_us = 0; + long cpu_cfs_quota_us; + long cpu_cfs_period_us; + int number_of_cpus = 0; + int idle_cpu = 0; + + cpuacct_cgroup = get_cgroup_name(MyProcPid, CPUACCT_SUBSYSTEM); + cpu_cgroup = get_cgroup_name(MyProcPid, CPU_SUBSYSTEM); + elog(DEBUG1,"cpuacct groupname : %s\n", cpuacct_cgroup); + elog(DEBUG1,"cpu cgroup: %s\n", cpu_cgroup); + + /* If control group configured, get the idle cpu using cgroup parameters */ + if (cpuacct_cgroup && cpu_cgroup) + { + cpu_mount_point = get_mount_point(CPU_SUBSYSTEM); + cpuacct_mount_point = get_mount_point(CPUACCT_SUBSYSTEM); + elog(DEBUG1, "cpuacct mpt: %s\n",cpuacct_mount_point); + elog(DEBUG1, "cpu mpt %s\n", cpu_mount_point); + + sprintf(cmd, "cat %s/%s/cpu.cfs_quota_us", cpu_mount_point, cpu_cgroup); + cpu_cfs_quota_us = get_cgroup_cmd_output(cmd); + + if (cpu_cfs_quota_us != -1) + { + sprintf(cmd, "cat %s/%s/cpu.cfs_period_us", cpu_mount_point, cpu_cgroup); + cpu_cfs_period_us = get_cgroup_cmd_output(cmd); + + sprintf(cmd, "cat %s/%s/cpuacct.usage", cpuacct_mount_point, cpuacct_cgroup); + cpuacct_usage_old = get_cgroup_cmd_output(cmd); + + pg_usleep(cpu_cfs_period_us); + + sprintf(cmd, "cat %s/%s/cpuacct.usage", cpuacct_mount_point, cpuacct_cgroup); + cpuacct_usage_new = get_cgroup_cmd_output(cmd); + + cpuacct_usage_delta_us = (cpuacct_usage_new - cpuacct_usage_old)/1000; + idle_cpu = (cpu_cfs_quota_us - cpuacct_usage_delta_us) * 100 / cpu_cfs_period_us; + + pfree(cpu_cgroup); + pfree(cpuacct_cgroup); + pfree(cpuacct_mount_point); + pfree(cpu_mount_point); + + elog(DEBUG1, "BE: get_cgroup_cpu_idle(): Total Idle CPU in cgroup: %d ", idle_cpu); + return idle_cpu; + } + } + + /* + * TODO: Calculate the system load which will be platform dependent for + * linux/solaris and Windows. + */ + number_of_cpus = get_number_of_cpus(); + idle_cpu = get_cpu_idle() * number_of_cpus; + + elog(DEBUG1, "BE: get_cgroup_cpu_idle(): Number of CPU's: %d, Total Idle CPU: %d ", + number_of_cpus, idle_cpu); + return idle_cpu; +} +#endif /*end #ifndef WIN32*/ + +#ifdef WIN32 +typedef BOOL(WINAPI *LPFN_GLPI)( + PSYSTEM_LOGICAL_PROCESSOR_INFORMATION, + PDWORD); +/* Helper function to count set bits in the processor mask. */ +unsigned long CountSetBits(ULONG_PTR bitMask) +{ + DWORD LSHIFT = sizeof(ULONG_PTR) * 8 - 1; + DWORD bitSetCount = 0; + ULONG_PTR bitTest = (ULONG_PTR)1 << LSHIFT; + DWORD i; + + for (i = 0; i <= LSHIFT; ++i) + { + bitSetCount += ((bitMask & bitTest) ? 1 : 0); + bitTest /= 2; + } + + return bitSetCount; +} + +static float CalculateCPULoad(unsigned long long idleTicks, unsigned long long totalTicks) +{ + static unsigned long long _previousTotalTicks = 0; + static unsigned long long _previousIdleTicks = 0; + + unsigned long long totalTicksSinceLastTime = totalTicks - _previousTotalTicks; + unsigned long long idleTicksSinceLastTime = idleTicks - _previousIdleTicks; + + float ret = 1.0f - ((totalTicksSinceLastTime > 0) ? ((float)idleTicksSinceLastTime) / totalTicksSinceLastTime : 0); + + _previousTotalTicks = totalTicks; + _previousIdleTicks = idleTicks; + return ret; +} + +static unsigned long long FileTimeToInt64(const FILETIME *ft) +{ + return (((unsigned long long)(ft->dwHighDateTime)) << 32) | ((unsigned long long)ft->dwLowDateTime); +} + +/*** + * Returns 1.0f for "CPU fully pinned", 0.0f for "CPU idle", or somewhere in between + * You'll need to call this at regular intervals, since it measures the load between + * the previous call and the current one. Returns 0 on error. + */ +float GetCPULoad() +{ + FILETIME idleTime, kernelTime, userTime; + float ret = 0.0f; + + if (GetSystemTimes(&idleTime, &kernelTime, &userTime)) + ret = CalculateCPULoad(FileTimeToInt64(&idleTime), FileTimeToInt64(&kernelTime) + FileTimeToInt64(&userTime)); + + return ret; +} + +/* + * Get total number of cpus. + * Return 1 if failed to get it. + */ +int GetNumberOfCPUs() +{ + LPFN_GLPI glpi; + BOOL done = FALSE; + PSYSTEM_LOGICAL_PROCESSOR_INFORMATION buffer = NULL; + PSYSTEM_LOGICAL_PROCESSOR_INFORMATION ptr = NULL; + DWORD returnLength = 0; + DWORD logicalProcessorCount = 0; + DWORD byteOffset = 0; + + glpi = (LPFN_GLPI)GetProcAddress( + GetModuleHandle(TEXT("kernel32")), + "GetLogicalProcessorInformation"); + if (NULL == glpi) + { + elog(DEBUG1,"GetLogicalProcessorInformation is not supported.\n"); + return (1); + } + + while (!done) + { + DWORD rc = glpi(buffer, &returnLength); + + if (FALSE == rc) + { + if (GetLastError() == ERROR_INSUFFICIENT_BUFFER) + { + if (buffer) + free(buffer); + + buffer = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION)malloc( + returnLength); + + if (NULL == buffer) + { + elog(DEBUG1,"Error: Allocation failure\n"); + return (1); + } + } + else + { + elog(DEBUG1,"Error %d\n", GetLastError()); + return (1); + } + } + else + { + done = TRUE; + } + } + + ptr = buffer; + + while (byteOffset + sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION) <= returnLength) + { + switch (ptr->Relationship) + { + case RelationProcessorCore: + // A hyperthreaded core supplies more than one logical processor. + logicalProcessorCount += CountSetBits(ptr->ProcessorMask); + break; + + default: + break; + } + byteOffset += sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION); + ptr++; + } + + + free(buffer); + + return logicalProcessorCount; +} + +int +get_cgroup_cpu_idle(void) +{ + int num = 0; + float cpuload = 0; + int totalIdleCPU = 0; + + GetCPULoad(); + /* 200 millisecond sleep delay */ + Sleep(200); + cpuload = GetCPULoad() * 100; + + num = GetNumberOfCPUs(); + + totalIdleCPU = (100 - (int)cpuload) * num; + + return totalIdleCPU; +} + +#endif diff --git a/contrib/system_load/cpu.h b/contrib/system_load/cpu.h new file mode 100644 index 0000000..b89bed8 --- /dev/null +++ b/contrib/system_load/cpu.h @@ -0,0 +1,20 @@ +/*------------------------------------------------------------------------- + * + * cpu.h + * Declare for getting IDLE CPU information functions + * + * contrib/system_load/cpu.h + * + *------------------------------------------------------------------------- + */ +#ifndef CPU_H +#define CPU_H + +#include "miscadmin.h" + +#define MAX_CMD_SIZ 100 +#define MAX_CMD_OUTPUT_SIZ 256 + +extern int get_cgroup_cpu_idle(void); + +#endif /* CPU_H */ diff --git a/contrib/system_load/system_load.c b/contrib/system_load/system_load.c new file mode 100644 index 0000000..78683e5 --- /dev/null +++ b/contrib/system_load/system_load.c @@ -0,0 +1,100 @@ +/*------------------------------------------------------------------------- + * + * system_load.c + * + * Modules to find out the current system load. This can be used to + * identify number of parallel workers that can be started. + * + * Copyright (c) 2008-2016, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/system_load/system_load.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/parallel.h" +#include "utils/guc.h" + +#include "cpu.h" + + +PG_MODULE_MAGIC; + +/* GUC variable */ +static bool enable_system_load = false; + +/* Saved hook values in case of unload */ +static number_of_parallel_workers_hook_type prev_number_of_parallel_workers_hook = NULL; + +void _PG_init(void); +void _PG_fini(void); + +static int system_load_parallel_workers(int nworkers); + +/* + * Module load callback + */ +void +_PG_init(void) +{ + /* + * we have to be loaded via shared_preload_libraries. If not, fall out + * without hooking into any of the main system. + */ + if (!process_shared_preload_libraries_in_progress) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("system load extension must be loaded via shared_preload_libraries"))); + + /* Define custom GUC variables. */ + DefineCustomBoolVariable("system_load.enable", + "Use system load into consideration while generating parallel workers.", + NULL, + &enable_system_load, + false, + PGC_SUSET, + 0, + NULL, + NULL, + NULL); + + /* Install hooks. */ + prev_number_of_parallel_workers_hook = number_of_parallel_workers_hook; + number_of_parallel_workers_hook = system_load_parallel_workers; +} + +/* + * Module unload callback + */ +void +_PG_fini(void) +{ + /* Uninstall hooks. */ + number_of_parallel_workers_hook = prev_number_of_parallel_workers_hook; +} + +/* + * Function to return the number of parallel workers that are possible + * under current system load. + */ +static int system_load_parallel_workers(int nworkers) +{ + if (enable_system_load) + { + int idle_cpu; + + idle_cpu = get_cgroup_cpu_idle(); + return (idle_cpu / 100); + } + else + { + /* + * In case system load calculation is disabled, + * return the planned number of workers. + */ + return nworkers; + } +}