#include #include #include #include #include #include #include #include #include #include #include //#include "GRID.h" #include "libpq-fe.h" #define RCVBUFSIZE 32 /* size of receive buffer */ #define SENDBUFSIZE 32 /* size of receive buffer */ #define GRID_DataBase "GRID_DB_Test" #define GRID_Server "192.168.0.105" /* Postgre SQL Server IP*/ #define Path "/root/curr_work/" #define GRID_User "root" #define GRID_Port "5432" #define QUERYSIZE 1024 #define VALSIZE 100 #define MSGSIZE 1024 #define MAXSIZE 128 #define SVIP "192.168.0.106" /* socket Åë½ÅÇÒ Server IP*/ #define SVPORT 990 /* Server Port */Ç Ò #define SENDSIZE 128 #define RCVSIZE 8192 void errorMsg(char *errorMessage); PGconn *POSTGRES_connect(char *msg) ; /* DB Connect function */ void GRID_make_attr(char *comm_msg, int *comm_pos, int comm_indx, char *comm_val, int comm_len); /* ¸Þ½ÃÁö Ãâ·ÂÀ» À§ÇÑ ÇÔ¼ö */ int insertMsg(int sockFd, PGconn *DBconn, char table, int db_column); /* insert data to DB */ int main(int argc, char *argv[]) { /* ÇÊ¿äÇÑ º¯¼ö ¼±¾ð */ int sockFd, addrLen, result; struct sockaddr_in sockAddr; char *svrIP; unsigned int svrPort=SVPORT ; char sendMsg[SENDSIZE]; int i, cnt; char rcvBuffer[10]; PGconn *DBconn ; /* DB */ PGresult *res ; DBconn = (PGconn *)POSTGRES_connect(GRID_DataBase); // svrIP = arv[1]; svrIP = SVIP; sockFd = socket(AF_INET, SOCK_STREAM, 0); if(sockFd < 0) { fprintf(stderr, "Socket() »ý¼º ½ÇÆÐ! \n"); // fprintf(stderr, "mynode is %d \n", mynode); exit(1); } sockAddr.sin_family = AF_INET; sockAddr.sin_addr.s_addr = inet_addr(svrIP); sockAddr.sin_port = htons(svrPort); printf("server IP : %s\n", svrIP); printf("server PORT : %d\n", svrPort); /* ¼­¹öÀÇ ¼ÒÄϰú ¿¬°á ½Ãµµ */ addrLen = sizeof(sockAddr); result = connect(sockFd, (struct sockaddr *) &sockAddr, addrLen); if(result == -1) { fprintf(stderr, "·¯´× ¼­¹ö¿ÍÀÇ Á¢¼Ó¿¡ ½ÇÆÐÇß½À´Ï´Ù.\n"); // fprintf(stderr, "mynode is %d \n", mynode); exit(1); } /* ÀÔ·Â ¸Þ½ÃÁö¿¡ ´ëÇÑ parsing */ while(1) { bzero(sendMsg, SENDSIZE); printf("\ninput application year date : "); fgets( sendMsg, SENDSIZE, stdin ); i=0; cnt=0; while(1) { if(sendMsg[i] == ' ') cnt++; if(sendMsg[i] == '\n') break; i++; } if(cnt != 2) printf("wrong!! input again : application_name year month\n"); else if ( cnt == 2) break; } /* ¿øÇÏ´Â ¸Þ½ÃÁö Àü¼Û */ if( send(sockFd, sendMsg, SENDSIZE, 0) < 0) errorMsg("Send sendMsg failed"); insertMsg(sockFd, DBconn, '1', 9);/* table¿¡ µû¸¥ columnÀÇ ¼ö°¡ ´Ù¸£´Ù */ insertMsg(sockFd, DBconn, '2', 5); PQfinish(DBconn); close(sockFd); } int insertMsg(int sockFd, PGconn *DBconn, char table, int db_column) { char *id, *msg, val[VALSIZE], query[QUERYSIZE], *rcvMsg; int *pos, len, j, cnt; int row=0, i, indx, index; int end_row, msgSize, size, remain, start=0; char *dbmsg[db_column]; FILE *fp; PGresult *res ; /* ¸Þ½ÃÁö ÃàÃâÀ» À§ÇÑ µ¥ÀÌÅÍ ¼ö½Å */ /* index Á¤º¸ : ¸Þ½ÃÁöÀÇ °¹¼ö */ if( recv(sockFd, &index, sizeof(int), 0) < 0) errorMsg("index recv() failed"); printf("%c : index receive ok!! %d \n", table, index); /* position Á¤º¸ : ¸Þ½ÃÁö 1°³ÀÇ ±æÀÌ¿¡ ´ëÇÑ Á¤º¸ */ /* ÇÔ¼ö µÎ¹øÂ° ¼öÇàÁß segmentation fault¸¦ ¹ß»ý ½ÃÅ´ */ pos = (int *)malloc((index+1)*sizeof(int)); if( recv(sockFd, pos, (index+1)*sizeof(int), 0) < 0) errorMsg("position recv() failed"); printf("%c : position receive ok!! pos[0] : %d \n", table, pos[0]); /* table¿¡ µû¸¥ columnÀÇ °¹¼ö¿Í ±æÀÌ ¼³Á¤ */ switch( table) { case '1' : end_row = index / db_column; size = 801 * end_row; break; case '2' : end_row = index / db_column; size = 203 * end_row; break; } msg = (char *)malloc(size*sizeof(char)); bzero(msg, size*sizeof(char)); /* msg¸¦ Å©±â¿¡ µû¶ó Àß¶ó Àü¼Û */ remain = pos[0]; cnt = 0; while(1) { if( (remain-cnt) > SENDSIZE) { if( recv(sockFd, (msg+cnt), RCVSIZE, 0) < 0) errorMsg("msg recv() failed"); } else { if( recv(sockFd, (msg+cnt), (remain-cnt), 0) < 0) errorMsg("msg recv() failed"); break; } cnt += RCVSIZE; } free(rcvMsg); printf("%c : Message receive ok!! %d \n", table, strlen(msg)); printf(" end_row :%d , db_column :%d , size : %d\n", end_row, db_column, size); /* message get out to receive msg */ len=0; indx=0; for (i=0; i < end_row; ++i){ for (j=0; j < db_column ; ++j){ bzero(val, VALSIZE); indx++; /* ¸Þ½ÃÁö ÃàÃâÀ» À§ÇÑ ÇÔ¼ö */ GRID_make_attr(msg, pos, indx, val, len); len += pos[indx]; dbmsg[j]=(char *)malloc(pos[indx]*sizeof(char)); bzero(dbmsg[j], pos[indx]*sizeof(char)); /* ÃàÃâµÈ ¸Þ½ÃÁö¸¦ ÀúÀåÇÑ´Ù */ strncat(dbmsg[j], val, strlen(val)); printf(" %d dbmsg[%d] : %s \n", i, j, dbmsg[j]); } bzero(query, QUERYSIZE); switch (table) { case '1': /* query ¹®À» Àü¼ÛÇÏ¿© DBÀÇ Table¿¡ ÀÔ·ÂÇÑ´Ù */ sprintf(query, "INSERT INTO %s (id, dataname, owner, replication, version, basic_pattern,\ data_type, storage_order, access_pattern) VALUES ( %d, '%s', '%s', '%s', '%s', '%s', '%s',\ '%s', '%s')","GEDAS_data_registry_table", atoi(dbmsg[0]), dbmsg[1], dbmsg[2], dbmsg[3], dbmsg[4], dbmsg[5], dbmsg[6], dbmsg[7], dbmsg[8]); res = PQexec(DBconn, query); if (PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, "INSERT into the GEDAS_data_registry_table failed\n"); PQclear(res); } break; case '2': sprintf(query, "INSERT INTO %s (id, dataname, timesteps, file_offset, file_name) VALUES \ ( %d, '%s', %d, %d, '%s')","GEDAS_file_registry_table", atoi(dbmsg[0]), dbmsg[1], atoi(dbmsg[2]),atoi(dbmsg[3]), dbmsg[4]); res = PQexec(DBconn, query); if (PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, "INSERT into the GEDAS_file_registry_table failed\n"); PQclear(res); } break; } PQclear(res); for(j=0; j < db_column ; j++) free(dbmsg[j]); } /* ÇÒ´çµÈ ¸Þ¸ð¸®¸¦ Ç®¾î ÁØ´Ù */ fflush(NULL); free(pos); pos=NULL; free(msg); msg = NULL; return 0; } void errorMsg(char *errorMessage) { perror(errorMessage); exit(0); } void GRID_make_attr(char *comm_msg, int *comm_pos, int comm_indx, char *comm_val, int comm_len) { memcpy(comm_val, (comm_msg+comm_len), comm_pos[comm_indx]); } PGconn *POSTGRES_connect(char *msg) { char *pghost; char *pgport; char *pgoptions; char *pgtty; char *error; char query[QUERYSIZE]; PGresult *res; static PGconn *conn; pghost = GRID_Server; pgport = GRID_Port; pgoptions = NULL; pgtty = NULL; conn = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, GRID_DataBase, GRID_User,"char *"); if (PQstatus(conn) == CONNECTION_BAD) { fprintf(stderr, "Connection to database '%s' failed.\n", GRID_DataBase) ; fprintf(stderr, "%s", PQerrorMessage(conn)); PQfinish(conn); return ((PGconn *)0); } bzero(query, sizeof(query)); sprintf(query, "SELECT * from %s", "GEDAS_application_registry_table" ); res = PQexec(conn, query); if (PQresultStatus(res) != PGRES_TUPLES_OK) { bzero(query, sizeof(query)); PQclear(res); /* create application_registry_table */ sprintf( query, "CREATE TABLE %s (id int NOT NULL, application varchar(100) NOT NULL, \ location varchar(100) NOT NULL, year int NOT NULL, month int NOT NULL, \ day int NOT NULL, hour int NOT NULL, min int NOT NULL, \ dimension int NOT NULL, problem_size varchar(100) NOT NULL, timesteps int NOT NULL, \ primary key (id))", "GEDAS_application_registry_table" ); res = PQexec(conn, query); if (PQresultStatus(res) != PGRES_COMMAND_OK) { error = PQresultErrorMessage( res ); printf( "Error = %s\n", error ); printf( "Query 234 = %c %c\n", query[233], query[234] ); fprintf(stderr, "Create application_registry_table failed\n"); PQclear(res); return((PGconn *)0); } /* create data_registry_table */ bzero(query, sizeof(query)); PQclear(res); sprintf( query, "CREATE TABLE %s (id int NOT NULL, dataname varchar(100) NOT NULL, \ owner varchar(100) NOT NULL, replication varchar(100) NOT NULL, \ version varchar(100) NOT NULL, basic_pattern varchar(100) NOT NULL, \ data_type varchar(100) NOT NULL, storage_order varchar(100) NOT NULL, \ access_pattern varchar(100) NOT NULL, \ primary key(id, dataname))", "GEDAS_data_registry_table" ); res = PQexec(conn, query); if (PQresultStatus(res) != PGRES_COMMAND_OK) { error = PQresultErrorMessage( res ); fprintf( stderr, "%s\n", error ); fprintf(stderr, "Create data_registry_table failed\n"); PQclear(res); return((PGconn *)0); } /* create file_registry_table */ bzero(query, sizeof(query)); PQclear(res); sprintf(query, "CREATE TABLE %s (id int NOT NULL, \ dataname varchar(100) NOT NULL, timesteps int NOT NULL, \ file_offset int NOT NULL, file_name varchar(100) NOT NULL, \ primary key (id, dataname, timesteps))", "GEDAS_file_registry_table"); res = PQexec(conn, query); if (PQresultStatus(res) != PGRES_COMMAND_OK) { error = PQresultErrorMessage( res ); fprintf( stderr, "%s\n", error ); fprintf(stderr, "Create file_registry_table failed\n"); PQclear(res); return((PGconn *)0); } /* create system_registry_table */ bzero( query, sizeof(query) ); PQclear( res ); sprintf( query, "CREATE TABLE %s (location int NOT NULL, \ file_system varchar(100) NOT NULL, striping_size int NOT NULL, \ layout varchar(100) NOT NULL, primary key(location))", "GEDAS_system_registry_table" ); res = PQexec( conn, query ); if( PQresultStatus(res) != PGRES_COMMAND_OK ) { fprintf( stderr, "Create system_registry_table failed\n" ); PQclear( res ); return ( (PGconn *)0 ); } /* create performance_registry_table */ bzero( query, sizeof(query) ); PQclear( res ); sprintf( query, "CREATE TABLE %s (file_system varchar(100) NOT NULL, \ data_size int NOT NULL, layout varchar(100) NOT NULL, \ buffered_io varchar(100) NOT NULL, direct_id varchar(100) NOT NULL, \ primary key(file_system))", "GEDAS_performance_registry_table" ); res = PQexec( conn, query ); if( PQresultStatus(res) != PGRES_COMMAND_OK ) { fprintf( stderr, "Create performance_registry_table failed\n" ); PQclear( res ); return ( (PGconn *)0 ); } } return (conn); }