/*
 * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.48 2005/11/23 12:19:12 ishii Exp $
 *
 * pgbench: a simple benchmark program for PostgreSQL
 * written by Tatsuo Ishii
 *
 * Copyright (c) 2000-2005	Tatsuo Ishii
 *
 * Permission to use, copy, modify, and distribute this software and
 * its documentation for any purpose and without fee is hereby
 * granted, provided that the above copyright notice appear in all
 * copies and that both that copyright notice and this permission
 * notice appear in supporting documentation, and that the name of the
 * author not be used in advertising or publicity pertaining to
 * distribution of the software without specific, written prior
 * permission. The author makes no representations about the
 * suitability of this software for any purpose.  It is provided "as
 * is" without express or implied warranty.
 */
#include "postgres_fe.h"

#include "libpq-fe.h"

#include <ctype.h>

#ifdef WIN32
#include "win32.h"
#else
#include <sys/time.h>
#include <unistd.h>
#include <sys/wait.h>

#ifdef HAVE_GETOPT_H
#include <getopt.h>
#endif

#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

/* for getrlimit */
#include <sys/resource.h>
#endif   /* ! WIN32 */

extern char *optarg;
extern int	optind;

#ifdef WIN32
#undef select
#endif


/********************************************************************
 * some configurable parameters */

int			nclients = 1;		/* default number of clients */
int			nxacts = 10;		/* default number of transactions per client */

/*
 * scaling factor. for example, tps = 10 will make 1000000 tuples of
 * accounts table.
 */
int			tps = 1;

/*
 * end of configurable parameters
 *********************************************************************/

#define nbranches	1
#define ntellers	10
#define naccounts	100000

FILE	   *LOGFILE = NULL;

bool		use_log;			/* log transaction latencies to a file */

int			is_connect;			/* establish connection for each transaction */

char	   *pghost = "";
char	   *pgport = NULL;
char	   *pgoptions = NULL;
char	   *pgtty = NULL;
char	   *login = NULL;
char	   *pwd = NULL;
char	   *dbName;

/* variable definitions */
typedef struct
{
	char	   *name;			/* variable name */
	char	   *value;			/* its value */
}	Variable;

typedef struct
{
	Variable   *variables;		/* array of variable definitions */
	int			nvariables;
}	VariableSet;

/*
 * Queries read from files.  A query script is an array of pointers to Command
 * structs, terminated by a null pointer.  Note that MAX_ARGS only constrains
 * the number of words in meta-commands, because we don't tokenize SQL
 * commands: argv[0] is the whole SQL command.
 */
#define SQL_COMMAND		1
#define META_COMMAND	2
#define MAX_ARGS		10

typedef struct
{
	int			type;			/* type (SQL_COMMAND or META_COMMAND) */
	int			argc;			/* number of words in meta-command */
	char	   *argv[MAX_ARGS]; /* word list */
}	Command;

#define MAX_FILES		128		/* max number of SQL script files allowed */

Command   **sql_files[MAX_FILES];		/* SQL scripts */
int			num_files;			/* number of 'em */

/* default scenario */
static char *tpc_b = {
	"\\setrandom aid 1 100000\n"
	"\\setrandom bid 1 1\n"
	"\\setrandom tid 1 10\n"
	"\\setrandom delta 1 10000\n"
	"BEGIN;\n"
	"UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
	"SELECT abalance FROM accounts WHERE aid = :aid;\n"
	"UPDATE tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
	"UPDATE branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
	"INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
	"END;\n"
};

/* -N case */
static char *simple_update = {
	"\\setrandom aid 1 100000\n"
	"\\setrandom bid 1 1\n"
	"\\setrandom tid 1 10\n"
	"\\setrandom delta 1 10000\n"
	"BEGIN;\n"
	"UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
	"SELECT abalance FROM accounts WHERE aid = :aid;\n"
	"INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
	"END;\n"
};

/* -S case */
static char *select_only = {
	"\\setrandom aid 1 100000\n"
	"SELECT abalance FROM accounts WHERE aid = :aid;\n"
};

static void
usage(void)
{
	fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-n][-C][-v][-S][-N][-f filename][-l][-U login][-P password][-d][dbname]\n");
	fprintf(stderr, "(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n");
}

/* random number generator */
static int
getrand(int min, int max)
{
	return min + (int) (((max - min) * (double) random()) / MAX_RANDOM_VALUE + 0.5);
}

/* set up a connection to the backend */
static PGconn *
doConnect(void)
{
	PGconn	   *con;
	PGresult   *res;

	con = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName,
					   login, pwd);
	if (con == NULL)
	{
		fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
		fprintf(stderr, "Memory allocation problem?\n");
		return (NULL);
	}

	if (PQstatus(con) == CONNECTION_BAD)
	{
		fprintf(stderr, "Connection to database '%s' failed.\n", dbName);

		if (PQerrorMessage(con))
			fprintf(stderr, "%s", PQerrorMessage(con));
		else
			fprintf(stderr, "No explanation from the backend\n");
		PQfinish(con);
		return (NULL);
	}

	res = PQexec(con, "SET search_path = public");
	if (PQresultStatus(res) != PGRES_COMMAND_OK)
	{
		fprintf(stderr, "%s", PQerrorMessage(con));
		exit(1);
	}
	PQclear(res);

	return (con);
}

/* check to see if the SQL result was good */
static int
check(PGconn *conn, PGresult *res, int id, const char *sql, int good)
{
	if (PQresultStatus(res) != good)
	{
		fprintf(stderr, "Client %d failed: %sSQL command was: %s\n",
				id, PQerrorMessage(conn), sql);
		PQfinish(conn);
		return (-1);
	}
	return (0);					/* OK */
}

static int
compareVariables(const void *v1, const void *v2)
{
	return strcmp(((const Variable *) v1)->name,
				  ((const Variable *) v2)->name);
}

static char *
getVariable(VariableSet * st, char *name)
{
	Variable	key,
			   *var;

	/* On some versions of Solaris, bsearch of zero items dumps core */
	if (st->nvariables <= 0)
		return NULL;

	key.name = name;
	var = (Variable *) bsearch((void *) &key,
							   (void *) st->variables,
							   st->nvariables,
							   sizeof(Variable),
							   compareVariables);
	if (var != NULL)
		return var->value;
	else
		return NULL;
}

static int
putVariable(VariableSet * st, char *name, char *value)
{
	Variable	key,
			   *var;

	key.name = name;
	/* On some versions of Solaris, bsearch of zero items dumps core */
	if (st->nvariables > 0)
		var = (Variable *) bsearch((void *) &key,
								   (void *) st->variables,
								   st->nvariables,
								   sizeof(Variable),
								   compareVariables);
	else
		var = NULL;

	if (var == NULL)
	{
		Variable   *newvars;

		if (st->variables)
			newvars = (Variable *) realloc(st->variables,
									(st->nvariables + 1) * sizeof(Variable));
		else
			newvars = (Variable *) malloc(sizeof(Variable));

		if (newvars == NULL)
			return false;

		st->variables = newvars;

		var = &newvars[st->nvariables];

		var->name = NULL;
		var->value = NULL;

		if ((var->name = strdup(name)) == NULL
			|| (var->value = strdup(value)) == NULL)
		{
			free(var->name);
			free(var->value);
			return false;
		}

		st->nvariables++;

		qsort((void *) st->variables, st->nvariables, sizeof(Variable),
			  compareVariables);
	}
	else
	{
		if ((value = strdup(value)) == NULL)
			return false;
		free(var->value);
		var->value = value;
	}

	return true;
}

static char *
assignVariables(VariableSet * st, char *sql)
{
	int			i,
				j;
	char	   *p,
			   *name,
			   *val;
	void	   *tmp;

	i = 0;
	while ((p = strchr(&sql[i], ':')) != NULL)
	{
		i = j = p - sql;
		do
		{
			i++;
		} while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
		if (i == j + 1)
			continue;

		name = malloc(i - j);
		if (name == NULL)
			return NULL;
		memcpy(name, &sql[j + 1], i - (j + 1));
		name[i - (j + 1)] = '\0';
		val = getVariable(st, name);
		free(name);
		if (val == NULL)
			continue;

		if (strlen(val) > i - j)
		{
			tmp = realloc(sql, strlen(sql) - (i - j) + strlen(val) + 1);
			if (tmp == NULL)
			{
				free(sql);
				return NULL;
			}
			sql = tmp;
		}

		if (strlen(val) != i - j)
			memmove(&sql[j + strlen(val)], &sql[i], strlen(&sql[i]) + 1);

		strncpy(&sql[j], val, strlen(val));

		if (strlen(val) < i - j)
		{
			tmp = realloc(sql, strlen(sql) + 1);
			if (tmp == NULL)
			{
				free(sql);
				return NULL;
			}
			sql = tmp;
		}

		i = j + strlen(val);
	}

	return sql;
}

/*
 * Run a single client process.  Result is 0 if OK, 1 if error
 */
static int
doClient(int id, int debug)
{
	PGconn	   *con = NULL;		/* connection handle to DB */
	VariableSet	variables;
	int			cnt = 0;		/* xacts count */
	int			state;			/* state No. */
	int			use_file;		/* index in sql_files for this client */
	Command   **commands;
	struct timeval txn_begin;	/* used for measuring latencies */
	PGresult   *res;

	variables.variables = NULL;
	variables.nvariables = 0;

	/* Establish per-client connection, unless in per-xact mode */
	if (is_connect == 0)
	{
		if ((con = doConnect()) == NULL)
		{
			fprintf(stderr, "Client %d aborted in establishing connection.\n",
					id);
			return 1;
		}
	}

	/* Initialize first transaction */
	use_file = getrand(0, num_files - 1);
	commands = sql_files[use_file];
	state = 0;

	for (;;)
	{
		/* Establish per-transaction connection if needed */
		if (con == NULL)
		{
			if ((con = doConnect()) == NULL)
			{
				fprintf(stderr, "Client %d aborted in establishing connection.\n",
						id);
				return 1;
			}
		}

		/* Start per-xact timer */
		if (use_log && state == 0)
			gettimeofday(&txn_begin, NULL);

		/* Process current script command */
		if (commands[state]->type == SQL_COMMAND)
		{
			char	   *sql;

			if ((sql = strdup(commands[state]->argv[0])) == NULL
				|| (sql = assignVariables(&variables, sql)) == NULL)
			{
				fprintf(stderr, "Client %d out of memory\n", id);
				return 1;
			}

			if (debug)
				fprintf(stderr, "client %d sending %s\n", id, sql);

			res = PQexec(con, sql);
			if (strncasecmp(sql, "select", 6) != 0)
			{
				if (check(con, res, id, sql, PGRES_COMMAND_OK))
					return 1;
			}
			else
			{
				if (check(con, res, id, sql, PGRES_TUPLES_OK))
					return 1;
			}
			PQclear(res);
			free(sql);
		}
		else if (commands[state]->type == META_COMMAND)
		{
			int			argc = commands[state]->argc,
				i;
			char	  **argv = commands[state]->argv;

			if (debug)
			{
				fprintf(stderr, "client %d executing \\%s", id, argv[0]);
				for (i = 1; i < argc; i++)
					fprintf(stderr, " %s", argv[i]);
				fprintf(stderr, "\n");
			}

			if (strcasecmp(argv[0], "setrandom") == 0)
			{
				char	val[64];

				snprintf(val, sizeof(val), "%d",
						 getrand(atoi(argv[2]), atoi(argv[3])));

				if (putVariable(&variables, argv[1], val) == false)
				{
					fprintf(stderr, "Client %d out of memory\n", id);
					return 1;
				}
			}
		}

		/* Increment state counter */
		state++;

		/* End of transaction? */
		if (commands[state] == NULL)
		{
			/*
			 * transaction finished: record the time it took in the log
			 */
			if (use_log)
			{
				double		diff;
				struct timeval now;

				gettimeofday(&now, NULL);
				diff = (int) (now.tv_sec - txn_begin.tv_sec) * 1000000.0 +
					(int) (now.tv_usec - txn_begin.tv_usec);

				fprintf(LOGFILE, "%d %d %.0f\n", id, cnt, diff);
				fflush(LOGFILE);
			}

			/* drop connection if per-xact connections */
			if (is_connect && con)
			{
				PQfinish(con);
				con = NULL;
			}

			if (++cnt >= nxacts)
				break;

			/* initialize for a new xact */
			use_file = getrand(0, num_files - 1);
			commands = sql_files[use_file];
			state = 0;
		}
	}

	/* Done with this client */
	if (con)
		PQfinish(con);
	return 0;
}

/* create tables and setup data */
static void
init(void)
{
	PGconn	   *con;
	PGresult   *res;
	static char *DDLs[] = {
		"drop table branches",
		"create table branches(bid int not null,bbalance int,filler char(88))",
		"drop table tellers",
		"create table tellers(tid int not null,bid int,tbalance int,filler char(84))",
		"drop table accounts",
		"create table accounts(aid int not null,bid int,abalance int,filler char(84))",
		"drop table history",
	"create table history(tid int,bid int,aid int,delta int,mtime timestamp,filler char(22))"};
	static char *DDLAFTERs[] = {
		"alter table branches add primary key (bid)",
		"alter table tellers add primary key (tid)",
	"alter table accounts add primary key (aid)"};


	char		sql[256];

	int			i;

	if ((con = doConnect()) == NULL)
		exit(1);

	for (i = 0; i < (sizeof(DDLs) / sizeof(char *)); i++)
	{
		res = PQexec(con, DDLs[i]);
		if (strncmp(DDLs[i], "drop", 4) && PQresultStatus(res) != PGRES_COMMAND_OK)
		{
			fprintf(stderr, "%s", PQerrorMessage(con));
			exit(1);
		}
		PQclear(res);
	}

	res = PQexec(con, "begin");
	if (PQresultStatus(res) != PGRES_COMMAND_OK)
	{
		fprintf(stderr, "%s", PQerrorMessage(con));
		exit(1);
	}
	PQclear(res);

	for (i = 0; i < nbranches * tps; i++)
	{
		snprintf(sql, 256, "insert into branches(bid,bbalance) values(%d,0)", i + 1);
		res = PQexec(con, sql);
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
		{
			fprintf(stderr, "%s", PQerrorMessage(con));
			exit(1);
		}
		PQclear(res);
	}

	for (i = 0; i < ntellers * tps; i++)
	{
		snprintf(sql, 256, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)"
				 ,i + 1, i / ntellers + 1);
		res = PQexec(con, sql);
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
		{
			fprintf(stderr, "%s", PQerrorMessage(con));
			exit(1);
		}
		PQclear(res);
	}

	res = PQexec(con, "end");
	if (PQresultStatus(res) != PGRES_COMMAND_OK)
	{
		fprintf(stderr, "%s", PQerrorMessage(con));
		exit(1);
	}
	PQclear(res);

	/*
	 * occupy accounts table with some data
	 */
	fprintf(stderr, "creating tables...\n");
	for (i = 0; i < naccounts * tps; i++)
	{
		int			j = i + 1;

		if (j % 10000 == 1)
		{
			res = PQexec(con, "copy accounts from stdin");
			if (PQresultStatus(res) != PGRES_COPY_IN)
			{
				fprintf(stderr, "%s", PQerrorMessage(con));
				exit(1);
			}
			PQclear(res);
		}

		snprintf(sql, 256, "%d\t%d\t%d\t\n", j, i / naccounts + 1, 0);
		if (PQputline(con, sql))
		{
			fprintf(stderr, "PQputline failed\n");
			exit(1);
		}

		if (j % 10000 == 0)
		{
			/*
			 * every 10000 tuples, we commit the copy command. this should
			 * avoid generating too much WAL logs
			 */
			fprintf(stderr, "%d tuples done.\n", j);
			if (PQputline(con, "\\.\n"))
			{
				fprintf(stderr, "very last PQputline failed\n");
				exit(1);
			}

			if (PQendcopy(con))
			{
				fprintf(stderr, "PQendcopy failed\n");
				exit(1);
			}

#ifdef NOT_USED

			/*
			 * do a checkpoint to purge the old WAL logs
			 */
			res = PQexec(con, "checkpoint");
			if (PQresultStatus(res) != PGRES_COMMAND_OK)
			{
				fprintf(stderr, "%s", PQerrorMessage(con));
				exit(1);
			}
			PQclear(res);
#endif   /* NOT_USED */
		}
	}
	fprintf(stderr, "set primary key...\n");
	for (i = 0; i < (sizeof(DDLAFTERs) / sizeof(char *)); i++)
	{
		res = PQexec(con, DDLAFTERs[i]);
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
		{
			fprintf(stderr, "%s", PQerrorMessage(con));
			exit(1);
		}
		PQclear(res);
	}

	/* vacuum */
	fprintf(stderr, "vacuum...");
	res = PQexec(con, "vacuum analyze");
	if (PQresultStatus(res) != PGRES_COMMAND_OK)
	{
		fprintf(stderr, "%s", PQerrorMessage(con));
		exit(1);
	}
	PQclear(res);
	fprintf(stderr, "done.\n");

	PQfinish(con);
}

static Command *
process_commands(char *buf)
{
	const char	delim[] = " \f\n\r\t\v";

	Command    *my_commands;
	int			j;
	char	   *p,
			   *tok;

	if ((p = strchr(buf, '\n')) != NULL)
		*p = '\0';

	p = buf;
	while (isspace((unsigned char) *p))
		p++;

	if (*p == '\0' || strncmp(p, "--", 2) == 0)
	{
		return NULL;
	}

	my_commands = (Command *) malloc(sizeof(Command));
	if (my_commands == NULL)
	{
		return NULL;
	}

	my_commands->argc = 0;

	if (*p == '\\')
	{
		my_commands->type = META_COMMAND;

		j = 0;
		tok = strtok(++p, delim);

		while (tok != NULL)
		{
			if ((my_commands->argv[j] = strdup(tok)) == NULL)
				return NULL;

			my_commands->argc++;

			j++;
			tok = strtok(NULL, delim);
		}

		if (strcasecmp(my_commands->argv[0], "setrandom") == 0)
		{
			int			min,
						max;

			if (my_commands->argc < 4)
			{
				fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
				return NULL;
			}

			for (j = 4; j < my_commands->argc; j++)
				fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
						my_commands->argv[0], my_commands->argv[j]);

			if ((min = atoi(my_commands->argv[2])) < 0)
			{
				fprintf(stderr, "%s: invalid minimum number %s\n",
						my_commands->argv[0], my_commands->argv[2]);
				return NULL;
			}

			if ((max = atoi(my_commands->argv[3])) < min || max > MAX_RANDOM_VALUE)
			{
				fprintf(stderr, "%s: invalid maximum number %s\n",
						my_commands->argv[0], my_commands->argv[3]);
				return NULL;
			}
		}
		else
		{
			fprintf(stderr, "invalid command %s\n", my_commands->argv[0]);
			return NULL;
		}
	}
	else
	{
		my_commands->type = SQL_COMMAND;

		if ((my_commands->argv[0] = strdup(p)) == NULL)
			return NULL;

		my_commands->argc++;
	}

	return my_commands;
}

static int
process_file(char *filename)
{
#define COMMANDS_ALLOC_NUM 128

	Command   **my_commands;
	FILE	   *fd;
	int			lineno;
	char		buf[BUFSIZ];
	int			alloc_num;

	if (num_files >= MAX_FILES)
	{
		fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
		exit(1);
	}

	alloc_num = COMMANDS_ALLOC_NUM;
	my_commands = (Command **) malloc(sizeof(Command *) * alloc_num);
	if (my_commands == NULL)
		return false;

	if (strcmp(filename, "-") == 0)
		fd = stdin;
	else if ((fd = fopen(filename, "r")) == NULL)
	{
		fprintf(stderr, "%s: %s\n", filename, strerror(errno));
		return false;
	}

	lineno = 0;

	while (fgets(buf, sizeof(buf), fd) != NULL)
	{
		Command    *commands;


		if (strncmp(buf, "\n", 1) != 0) {
			commands = process_commands(buf);
			if (commands == NULL)
			{
				fclose(fd);
				return false;
			}
		} else {
			lineno++;
			continue;
		}

		my_commands[lineno] = commands;
		lineno++;

		if (lineno >= alloc_num)
		{
			alloc_num += COMMANDS_ALLOC_NUM;
			my_commands = realloc(my_commands, sizeof(Command *) * alloc_num);
			if (my_commands == NULL)
			{
				fclose(fd);
				return false;
			}
		}
	}
	fclose(fd);

	my_commands[lineno] = NULL;

	sql_files[num_files++] = my_commands;

	return true;
}

static Command **
process_builtin(char *tb)
{
#define COMMANDS_ALLOC_NUM 128

	Command   **my_commands;
	int			lineno;
	char		buf[BUFSIZ];
	int			alloc_num;

	if (*tb == '\0')
		return NULL;

	alloc_num = COMMANDS_ALLOC_NUM;
	my_commands = (Command **) malloc(sizeof(Command *) * alloc_num);
	if (my_commands == NULL)
		return NULL;

	lineno = 0;

	for (;;)
	{
		char	   *p;
		Command    *commands;

		p = buf;
		while (*tb && *tb != '\n')
			*p++ = *tb++;

		if (*tb == '\0')
			break;

		if (*tb == '\n')
			tb++;

		*p = '\0';

		commands = process_commands(buf);
		if (commands == NULL)
		{
			return NULL;
		}

		my_commands[lineno] = commands;
		lineno++;

		if (lineno >= alloc_num)
		{
			alloc_num += COMMANDS_ALLOC_NUM;
			my_commands = realloc(my_commands, sizeof(Command *) * alloc_num);
			if (my_commands == NULL)
			{
				return NULL;
			}
		}
	}

	my_commands[lineno] = NULL;

	return my_commands;
}

/* print out results */
static void
printResults(int ttype, int nfailed,
			 struct timeval * tv1,
			 struct timeval * tv2)
{
	double		t1;
	int			normal_xacts = nxacts * nclients;
	char	   *s;

	t1 = (tv2->tv_sec - tv1->tv_sec) * 1000000.0 + (tv2->tv_usec - tv1->tv_usec);
	t1 = normal_xacts * 1000000.0 / t1;

	if (ttype == 0)
		s = "TPC-B (sort of)";
	else if (ttype == 2)
		s = "Update only accounts";
	else if (ttype == 1)
		s = "SELECT only";
	else
		s = "Custom query";

	printf("transaction type: %s\n", s);
	printf("scaling factor: %d\n", tps);
	printf("number of clients: %d\n", nclients);
	printf("number of transactions per client: %d\n", nxacts);
	if (nfailed == 0)
		printf("number of transactions actually processed: %d\n",
			   normal_xacts);
	else
		printf("number of failed clients: %d\n",
			   nfailed);
	printf("tps = %f (including connections establishing)\n", t1);
}


int
main(int argc, char **argv)
{
	int			c;
	int			is_init_mode = 0;		/* initialize mode? */
	int			is_no_vacuum = 0;		/* no vacuum at all before testing? */
	int			is_full_vacuum = 0;		/* do full vacuum before testing? */
	int			debug = 0;		/* debug flag */
	int			ttype = 0;		/* transaction type. 0: TPC-B, 1: SELECT only,
								 * 2: skip update of branches and tellers,
								 * 3: custom */
	char	   *filename = NULL;

	int			remains;		/* number of remaining clients */
	int			nfailed;		/* number of failed clients */

	struct timeval tv1;			/* start up time */
	struct timeval tv2;			/* end time */

#if !(defined(__CYGWIN__) || defined(__MINGW32__))
	struct rlimit rlim;
#endif

	PGconn	   *con;
	PGresult   *res;
	char	   *env;

	if ((env = getenv("PGHOST")) != NULL && *env != '\0')
		pghost = env;
	if ((env = getenv("PGPORT")) != NULL && *env != '\0')
		pgport = env;
	else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
		login = env;

	while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSlf:")) != -1)
	{
		switch (c)
		{
			case 'i':
				is_init_mode++;
				break;
			case 'h':
				pghost = optarg;
				break;
			case 'n':
				is_no_vacuum++;
				break;
			case 'v':
				is_full_vacuum++;
				break;
			case 'p':
				pgport = optarg;
				break;
			case 'd':
				debug++;
				break;
			case 'S':
				ttype = 1;
				break;
			case 'N':
				ttype = 2;
				break;
			case 'c':
				nclients = atoi(optarg);
				if (nclients <= 0)
				{
					fprintf(stderr, "invalid number of clients: %d\n",
							nclients);
					exit(1);
				}
#if !(defined(__CYGWIN__) || defined(__MINGW32__))
#ifdef RLIMIT_NOFILE			/* most platform uses RLIMIT_NOFILE */
				if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
#else							/* but BSD doesn't ... */
				if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
#endif   /* RLIMIT_NOFILE */
				{
					fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
					exit(1);
				}
				if (rlim.rlim_cur <= (nclients + 2))
				{
					fprintf(stderr, "You need at least %d open files resource but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
					fprintf(stderr, "Use limit/ulimt to increase the limit before using pgbench.\n");
					exit(1);
				}
#endif
				break;
			case 'C':
				is_connect = 1;
				break;
			case 's':
				tps = atoi(optarg);
				if (tps <= 0)
				{
					fprintf(stderr, "invalid scaling factor: %d\n", tps);
					exit(1);
				}
				break;
			case 't':
				nxacts = atoi(optarg);
				if (nxacts <= 0)
				{
					fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
					exit(1);
				}
				break;
			case 'U':
				login = optarg;
				break;
			case 'P':
				pwd = optarg;
				break;
			case 'l':
				use_log = true;
				break;
			case 'f':
				ttype = 3;
				filename = optarg;
				if (process_file(filename) == false)
					exit(1);
				break;
			default:
				usage();
				exit(1);
				break;
		}
	}

	if (argc > optind)
		dbName = argv[optind];
	else
	{
		if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
			dbName = env;
		else if (login != NULL && *login != '\0')
			dbName = login;
		else
			dbName = "";
	}

	if (is_init_mode)
	{
		init();
		exit(0);
	}

	/* set random seed */
	gettimeofday(&tv1, NULL);
	srand((unsigned int) tv1.tv_usec);

	if (use_log)
	{
		char		logpath[64];

		snprintf(logpath, 64, "pgbench_log.%d", getpid());
		LOGFILE = fopen(logpath, "w");

		if (LOGFILE == NULL)
		{
			fprintf(stderr, "Couldn't open logfile \"%s\": %s",
					logpath, strerror(errno));
			exit(1);
		}
	}

	if (debug)
	{
		printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
			   pghost, pgport, nclients, nxacts, dbName);
	}

	/* opening connection... */
	con = doConnect();
	if (con == NULL)
		exit(1);

	if (ttype != 3)
	{
		/*
		 * get the scaling factor that should be same as count(*) from
		 * branches if this is not a custom query
		 */
		res = PQexec(con, "select count(*) from branches");
		if (PQresultStatus(res) != PGRES_TUPLES_OK)
		{
			fprintf(stderr, "%s", PQerrorMessage(con));
			exit(1);
		}
		tps = atoi(PQgetvalue(res, 0, 0));
		if (tps <= 0)
		{
			fprintf(stderr, "count(*) from branches invalid (%d)\n", tps);
			exit(1);
		}
		PQclear(res);
	}

	if (!is_no_vacuum)
	{
		fprintf(stderr, "starting vacuum...");
		res = PQexec(con, "vacuum branches");
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
		{
			fprintf(stderr, "%s", PQerrorMessage(con));
			exit(1);
		}
		PQclear(res);

		res = PQexec(con, "vacuum tellers");
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
		{
			fprintf(stderr, "%s", PQerrorMessage(con));
			exit(1);
		}
		PQclear(res);

		res = PQexec(con, "delete from history");
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
		{
			fprintf(stderr, "%s", PQerrorMessage(con));
			exit(1);
		}
		PQclear(res);
		res = PQexec(con, "vacuum history");
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
		{
			fprintf(stderr, "%s", PQerrorMessage(con));
			exit(1);
		}
		PQclear(res);

		fprintf(stderr, "end.\n");

		if (is_full_vacuum)
		{
			fprintf(stderr, "starting full vacuum...");
			res = PQexec(con, "vacuum analyze accounts");
			if (PQresultStatus(res) != PGRES_COMMAND_OK)
			{
				fprintf(stderr, "%s", PQerrorMessage(con));
				exit(1);
			}
			PQclear(res);
			fprintf(stderr, "end.\n");
		}
	}

	/* done with initial connection */
	PQfinish(con);

	/*
	 * set up builtin SQL scripts, if not custom mode.  Note that this
	 * can't be done until after we have obtained the correct value of tps.
	 */
	switch (ttype)
	{
		char		buf[128];

		case 0:
			sql_files[0] = process_builtin(tpc_b);
			snprintf(buf, sizeof(buf), "%d", 100000 * tps);
			sql_files[0][0]->argv[3] = strdup(buf);
			snprintf(buf, sizeof(buf), "%d", 1 * tps);
			sql_files[0][1]->argv[3] = strdup(buf);
			snprintf(buf, sizeof(buf), "%d", 10 * tps);
			sql_files[0][2]->argv[3] = strdup(buf);
			snprintf(buf, sizeof(buf), "%d", 10000 * tps);
			sql_files[0][3]->argv[3] = strdup(buf);
			num_files = 1;
			break;
		case 1:
			sql_files[0] = process_builtin(select_only);
			snprintf(buf, sizeof(buf), "%d", 100000 * tps);
			sql_files[0][0]->argv[3] = strdup(buf);
			num_files = 1;
			break;
		case 2:
			sql_files[0] = process_builtin(simple_update);
			snprintf(buf, sizeof(buf), "%d", 100000 * tps);
			sql_files[0][0]->argv[3] = strdup(buf);
			snprintf(buf, sizeof(buf), "%d", 1 * tps);
			sql_files[0][1]->argv[3] = strdup(buf);
			snprintf(buf, sizeof(buf), "%d", 10 * tps);
			sql_files[0][2]->argv[3] = strdup(buf);
			snprintf(buf, sizeof(buf), "%d", 10000 * tps);
			sql_files[0][3]->argv[3] = strdup(buf);
			num_files = 1;
			break;
		default:
			break;
	}

	/* get start up time */
	gettimeofday(&tv1, NULL);

	/* Start up all the client subprocesses */
	remains = 0;
	while (remains < nclients)
	{
		pid_t		result;

		result = fork();
		if (result == (pid_t) -1)
		{
			/* fork failed */
			fprintf(stderr, "fork failed: %s\n", strerror(errno));
			exit(1);
		}
		else if (result == 0)
		{
			/* fork succeeded, in child */
			exit(doClient(remains, debug));
		}
		else
		{
			/* fork succeeded, in parent */
			remains++;
		}
	}

	/* Wait for all the clients to finish */
	nfailed = 0;
	while (remains > 0)
	{
		int		status;

		if (wait(&status) != (pid_t) -1)
		{
			if (status != 0)
				nfailed++;
			remains--;
		}
	}

	/* Clients all gone, so clean up */

	/* get end time */
	gettimeofday(&tv2, NULL);
	printResults(ttype, nfailed, &tv1, &tv2);
	if (LOGFILE)
		fclose(LOGFILE);

	return 0;
}
