#include <stdio.h>
#include <stdlib.h>

#include <pthread.h>
#include <unistd.h>

#include <libpq-fe.h>

#define LARGEDATATHREADS 32
#define SMALLDATATHREADS 50

/*
 * Report the most recent error recorded on `conn` and terminate the process.
 *
 * The error text is written to stderr, then the result `res` and the
 * connection `conn` are released before exiting with status 1.
 * This function does not return.
 */
void do_exit(PGconn *conn, PGresult *res)
{
    const char *errmsg = PQerrorMessage(conn);

    fprintf(stderr, "%s\n", errmsg);

    /* Release libpq resources before leaving. */
    PQclear(res);
    PQfinish(conn);

    exit(1);
}

void initsubscriber()
{
    PGconn *conn = PQconnectdb("user=user1 dbname=testdb port=5433");
    if (PQstatus(conn) == CONNECTION_BAD) 
    {
        fprintf(stderr, "Connection to database failed: %s\n",
            PQerrorMessage(conn));
            
        PQfinish(conn);
        exit(1);
    }

    PGresult *res = PQexec(conn, "DROP TABLE IF EXISTS perftest_smalldata");
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        do_exit(conn, res);
    }

    PQclear(res);
    res = PQexec(conn, "CREATE TABLE perftest_smalldata(c1 varchar(10), c2 varchar(10), c3 varchar(10), c4 varchar(10), c5 varchar(10))");
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        do_exit(conn, res);
    }

    PQclear(res);

    res = PQexec(conn, "DROP TABLE IF EXISTS perftest");
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        do_exit(conn, res);
    }

    PQclear(res);
    res = PQexec(conn, "CREATE TABLE perftest(c1 varchar(100), c2 varchar(100), c3 varchar(100), c4 varchar(100), c5 varchar(100))");
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        do_exit(conn, res);
    }

    PQclear(res);

    res = PQexec(conn, "create subscription sub1 connection 'host=127.0.0.1 port=5432 dbname=testdb user=user1 password=123' publication pub1");
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        do_exit(conn, res);
    }

    PQclear(res);
    PQfinish(conn);

}

void initpublisher()
{
    PGconn *conn = PQconnectdb("user=user1 dbname=testdb");
    if (PQstatus(conn) == CONNECTION_BAD) 
    {
        fprintf(stderr, "Connection to database failed: %s\n",
            PQerrorMessage(conn));
            
        PQfinish(conn);
        exit(1);
    }

    PGresult *res = PQexec(conn, "DROP TABLE IF EXISTS perftest_smalldata");
    if (PQresultStatus(res) != PGRES_COMMAND_OK) 
    {
        do_exit(conn, res); 
    }
    
    PQclear(res);
    res = PQexec(conn, "CREATE TABLE perftest_smalldata(c1 varchar(10), c2 varchar(10), c3 varchar(10), c4 varchar(10), c5 varchar(10))");
    if (PQresultStatus(res) != PGRES_COMMAND_OK) 
    {
        do_exit(conn, res); 
    }
    
    PQclear(res);

    res = PQexec(conn, "DROP TABLE IF EXISTS perftest");
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        do_exit(conn, res);
    }

    PQclear(res);
    res = PQexec(conn, "CREATE TABLE perftest(c1 varchar(100), c2 varchar(100), c3 varchar(100), c4 varchar(100), c5 varchar(100))");
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        do_exit(conn, res);
    }

    PQclear(res);

    res = PQexec(conn, "CREATE PUBLICATION pub1 FOR TABLE perftest, perftest_smalldata WITH (publish='insert,update,delete')");
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        do_exit(conn, res);
    }

    PQclear(res);
    PQfinish(conn);
}

void* smalldatathreadfunc(void *recordcount) 
{
    unsigned int reccount = *((unsigned int *) recordcount);
    int count;
    PGresult *res;
    PGconn *conn = PQconnectdb("user=user1 dbname=testdb");

    if (PQstatus(conn) == CONNECTION_BAD) 
    {
        fprintf(stderr, "Connection to database failed: %s\n",
            PQerrorMessage(conn));
        PQfinish(conn);
        exit(1);
    }

    res = PQexec(conn, "BEGIN");
    if (PQresultStatus(res) != PGRES_COMMAND_OK) 
    {
        do_exit(conn, res); 
    }
    
    PQclear(res);

    /* Load data  */
    int subtxncount = reccount/64;
    int savepointcount = 0;

    for (count = 0; count < reccount; count++)
    {
        res = PQexec(conn, "insert into perftest_smalldata values('asafadfafa', 'asafadfafa', 'asafadfafa', 'asafadfafa', 'asafadfafa')");
        if (PQresultStatus(res) != PGRES_COMMAND_OK) 
            do_exit(conn, res);     
    
        PQclear(res);

	if (count % subtxncount == 0)
	{
		char savepointstr[128] = {0};
		savepointcount++;
		sprintf(savepointstr, "savepoint sp%d", savepointcount);
		res = PQexec(conn, savepointstr);
     	        if (PQresultStatus(res) != PGRES_COMMAND_OK)
            		do_exit(conn, res);

        	PQclear(res);
	}    
    }

    printf("%d subtransaction created\n", savepointcount);

    printf("Keeping txn open for 60 seconds\n");
    sleep(60);

    res = PQexec(conn, "SELECT * FROM perftest_smalldata");
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        printf("No data retrieved\n");
        PQclear(res);
        PQfinish(conn);
        exit(1);
    }

    int rows = PQntuples(res);
    printf("Record count = %d\n", rows);

    PQclear(res);
    PQfinish(conn);
    return 0;
}


/*
 * Thread entry point: load wide rows into perftest inside one transaction.
 *
 * recordcount  points to an unsigned int holding the number of rows to
 *              insert; it is read once at the start of the thread.
 *
 * Opens its own connection, issues BEGIN, inserts the requested number of
 * rows, sleeps for 60 seconds with the transaction still open, prints the
 * number of rows in the table, and closes the connection.
 *
 * Returns a null pointer. Any connection or statement failure terminates
 * the whole process.
 */
void* largedatathreadfunc(void *recordcount)
{
    /* Single-row insert: five identical long string values. */
    static const char insert_sql[] =
        "insert into perftest values("
        "'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',"
        "'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',"
        "'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',"
        "'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',"
        "'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss')";

    const unsigned int total = *(const unsigned int *) recordcount;

    PGconn *conn = PQconnectdb("user=user1 dbname=testdb");
    if (PQstatus(conn) == CONNECTION_BAD)
    {
        fprintf(stderr, "Connection to database failed: %s\n",
            PQerrorMessage(conn));
        PQfinish(conn);
        exit(1);
    }

    /* Everything below runs inside one explicit transaction. */
    PGresult *res = PQexec(conn, "BEGIN");
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        do_exit(conn, res);
    }
    PQclear(res);

    /* Load data */
    for (unsigned int inserted = 0; inserted < total; inserted++)
    {
        res = PQexec(conn, insert_sql);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
            do_exit(conn, res);
        }
        PQclear(res);
    }

    printf("Keeping txn open for 60 seconds\n");
    sleep(60);

    res = PQexec(conn, "SELECT * FROM perftest");

    const int query_ok = (PQresultStatus(res) == PGRES_TUPLES_OK);
    if (query_ok)
    {
        printf("Record count = %d\n", PQntuples(res));
    }
    else
    {
        printf("No data retrieved\n");
    }

    /* Same cleanup on both paths; only the way out differs. */
    PQclear(res);
    PQfinish(conn);

    if (!query_ok)
    {
        exit(1);
    }

    return NULL;
}

/*
 * Test driver.
 *
 * Sets up the publisher and the subscriber, then starts LARGEDATATHREADS
 * threads running largedatathreadfunc (2500 rows each) and SMALLDATATHREADS
 * threads running smalldatathreadfunc (6000 rows each), and waits for all of
 * them to finish.
 *
 * Returns 0 on success; exits with status 1 if a thread cannot be created
 * or joined.
 */
int main()
{
   pthread_t largedata_thread_id[LARGEDATATHREADS];
   pthread_t smalldata_thread_id[SMALLDATATHREADS];

   /* Each kind of thread gets its own counter. The threads read the value
    * through the pointer after pthread_create returns, so the pointed-to
    * variable must not be modified once any thread has been started. */
   unsigned int largedata_recordcount = 2500;
   unsigned int smalldata_recordcount = 6000;
   int i;
   int rc;

   initpublisher();
   initsubscriber();

   for (i = 0; i < LARGEDATATHREADS; i++)
   {
      rc = pthread_create(&largedata_thread_id[i], NULL, largedatathreadfunc, &largedata_recordcount);
      if (rc != 0)
      {
         fprintf(stderr, "pthread_create failed for large-data thread %d (error %d)\n", i, rc);
         exit(1);
      }
   }

   for (i = 0; i < SMALLDATATHREADS; i++)
   {
      rc = pthread_create(&smalldata_thread_id[i], NULL, smalldatathreadfunc, &smalldata_recordcount);
      if (rc != 0)
      {
         fprintf(stderr, "pthread_create failed for small-data thread %d (error %d)\n", i, rc);
         exit(1);
      }
   }

   for (i = 0; i < LARGEDATATHREADS; i++)
   {
      rc = pthread_join(largedata_thread_id[i], NULL);
      if (rc != 0)
      {
         fprintf(stderr, "pthread_join failed for large-data thread %d (error %d)\n", i, rc);
         exit(1);
      }
   }

   for (i = 0; i < SMALLDATATHREADS; i++)
   {
      rc = pthread_join(smalldata_thread_id[i], NULL);
      if (rc != 0)
      {
         fprintf(stderr, "pthread_join failed for small-data thread %d (error %d)\n", i, rc);
         exit(1);
      }
   }

   return 0;
}

