00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090 #include <sys/types.h>
00091 #include <sys/socket.h>
00092 #include <sys/times.h>
00093 #include <time.h>
00094 #include <netinet/in.h>
00095 #include <unistd.h>
00096 #include <netdb.h>
00097 #include <errno.h>
00098 #include <string.h>
00099 #include <signal.h>
00100 #include "all.h"
00101 #include "param.h"
00102 #include "mpeg.h"
00103 #include "prototypes.h"
00104 #include "parallel.h"
00105 #include "readframe.h"
00106 #include "fsize.h"
00107 #include "combine.h"
00108 #include "frames.h"
00109
00110
/* Capacity of the ioPortNum[] table kept by StartMasterServer. */
00111 #define MAX_IO_SERVERS 10
/* Fallback for systems whose headers do not define SOMAXCONN.  StartMasterServer
 * also uses this value as the number of remote machines served per I/O server. */
00112 #ifndef SOMAXCONN
00113 #define SOMAXCONN 5
00114 #endif
00115
00116
00117
00118
00119
/* Signal number alias; not referenced in the part of the file visible here
 * (presumably used when terminating forked children -- TODO confirm). */
00120 #define TERMINATE_PID_SIGNAL SIGTERM
/* Default for MAXARGS when no other header supplies it; not referenced in the
 * part of the file visible here. */
00121 #ifndef MAXARGS
00122 #define MAXARGS 1024
00123 #endif
00124
00125
00126
00127
00128
/* File-private state shared by the server and client routines below. */
00129 static int32 diffTime; /* total run time in seconds; computed at the end of StartMasterServer */
00130 static char rsh[256]; /* remote shell command; filled in by SetRemoteShell, used to launch children */
00131 static struct hostent *hostEntry = NULL; /* host entry passed by address to ConnectToSocket; set via gethostbyname in StartMasterServer */
00132 static boolean *frameDone; /* per-frame flags; allocated by StartCombineServer and StartDecodeServer */
00133 static int outputServerSocket; /* listening socket created by StartCombineServer */
00134 static int decodeServerSocket; /* listening socket created by StartDecodeServer */
00135 static boolean parallelPerfect = FALSE; /* set by SetParallelPerfect; selects the even up-front frame split */
00136 static int current_max_forked_pid=0; /* not referenced in the part of the file visible here */
00137
00138
00139
00140
00141
00142
/* Externally visible configuration and state for parallel encoding. */
00143 extern int yuvHeight, yuvWidth; /* frame dimensions used when moving original planes over sockets */
00144 extern time_t timeStart, timeEnd; /* set with time() at the start and end of StartMasterServer */
00145 extern char statFileName[256]; /* not referenced in the part of the file visible here */
00146 extern FILE *statFile; /* if non-NULL, receives a copy of the parallel summary and is then closed */
00147 extern boolean debugMachines; /* enables per-machine scheduling messages on stdout */
00148 extern boolean debugSockets; /* enables socket/protocol messages on stdout */
00149 int parallelTestFrames = 10; /* frames in each machine's first assignment (default scheduling mode) */
00150 int parallelTimeChunks = 60; /* seconds of work per later assignment; -1 selects an estimate from measured rates */
00151 char *IOhostName; /* host used when connecting to the I/O and combine servers */
00152 int ioPortNumber; /* I/O server port used by SendRemoteFrame / GetRemoteFrame */
00153 int combinePortNumber; /* combine server port used by NoteFrameDone */
00154 int decodePortNumber; /* not referenced in the part of the file visible here */
00155 boolean niceProcesses = FALSE; /* when set, "nice" is inserted into each child command */
00156 boolean forceIalign = FALSE; /* when set, assignment end frames are advanced until FType_Type() reports 'i' */
00157 int machineNumber = -1; /* used by GetRemoteFrame to name its temporary file */
00158 boolean remoteIO = FALSE; /* not referenced in the part of the file visible here */
00159 boolean separateConversion; /* set by SetIOConvert; selects the chunked-file transfer path */
00160 time_t IOtime = 0; /* accumulated seconds spent in the socket routines below */
00161 extern char encoder_name[]; /* program name used to spawn the helper servers */
00162 int ClientPid[MAX_MACHINES+4]; /* not referenced in the part of the file visible here */
00163
00164
00165
00166
00167
/* Forward declarations.  Only TransmitPortNum, SafeRead and the start of
 * SafeWrite are defined in the part of the file visible here. */
/* Connects to hostName:portNum and sends one port number.  The definition
 * names the third parameter newPortNum. */
00168 static void TransmitPortNum _ANSI_ARGS_((char *hostName, int portNum,
00169 int ioPortNum));
/* Called by StartMasterServer once per I/O server port during shutdown. */
00170 static void EndIOServer _ANSI_ARGS_((void));
/* Read exactly nbyte bytes from fd into buf. */
00171 static void SafeRead _ANSI_ARGS_((int fd, char *buf, int nbyte));
/* Write nbyte bytes from buf to fd. */
00172 static void SafeWrite _ANSI_ARGS_((int fd, char *buf, int nbyte));
/* Callers use the return value as a listening socket and read the chosen
 * port back through portNumber. */
00173 static int CreateListeningSocket _ANSI_ARGS_((int *portNumber));
/* Callers use the return value as a connected socket descriptor. */
00174 static int ConnectToSocket _ANSI_ARGS_((char *machineName, int portNum,
00175 struct hostent **hostEnt));
/* Called with a complete command line to launch a helper process. */
00176 static int safe_fork _ANSI_ARGS_((char *command));
/* Defined elsewhere; presumably a signal handler -- TODO confirm. */
00177 void cleanup_fork _ANSI_ARGS_ ((int dummy));
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201 void
00202 SetIOConvert(separate)
00203 boolean separate;
00204 {
00205 separateConversion = separate;
00206 }
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221 void
00222 SetParallelPerfect(val)
00223 boolean val;
00224 {
00225 parallelPerfect = val;
00226 }
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241 void
00242 SetRemoteShell(shell)
00243 char *shell;
00244 {
00245 strcpy(rsh, shell);
00246 }
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261 void
00262 StartIOServer(numInputFiles, parallelHostName, portNum)
00263 int numInputFiles;
00264 char *parallelHostName;
00265 int portNum;
00266 {
00267 int ioPortNum;
00268 int serverSocket;
00269 int otherSock, otherSize;
00270 struct sockaddr otherSocket;
00271 int32 buffer[8];
00272 boolean done = FALSE;
00273 int frameNumber;
00274 MpegFrame *frame;
00275 register int y;
00276 int numBytes;
00277 unsigned char *bigBuffer;
00278 unsigned char smallBuffer[1000];
00279 int bigBufferSize;
00280 FILE *filePtr;
00281 uint32 data;
00282 char inputFileName[1024];
00283 char fileName[1024];
00284
00285 bigBufferSize = 0;
00286 bigBuffer = NULL;
00287
00288
00289
00290 serverSocket = CreateListeningSocket(&ioPortNum);
00291
00292 if ( debugSockets ) {
00293 fprintf(stdout, "====I/O USING PORT %d\n", ioPortNum);
00294 }
00295
00296 TransmitPortNum(parallelHostName, portNum, ioPortNum);
00297
00298 otherSize = sizeof(otherSocket);
00299
00300 if ( separateConversion ) {
00301 SetFileType(ioConversion);
00302 } else {
00303 SetFileType(inputConversion);
00304 }
00305
00306
00307 while ( ! done ) {
00308 otherSock = accept(serverSocket, &otherSocket, &otherSize);
00309 if ( otherSock == -1 ) {
00310 fprintf(stderr, "ERROR: I/O SERVER accept returned error %d\n", errno);
00311 exit(1);
00312 }
00313
00314 SafeRead(otherSock, (char *)buffer, 4);
00315 frameNumber = ntohl(buffer[0]);
00316
00317 if ( frameNumber == -1 ) {
00318 done = TRUE;
00319 } else if ( frameNumber == -2 ) {
00320
00321 SafeRead(otherSock, (char *)buffer, 4);
00322 frameNumber = ntohl(buffer[0]);
00323
00324 if ( debugSockets ) {
00325 fprintf(stdout, "INPUT SERVER: GETTING DECODED FRAME %d\n", frameNumber);
00326 fflush(stdout);
00327 }
00328
00329
00330 frame = Frame_New(frameNumber, 'i');
00331
00332 Frame_AllocDecoded(frame, TRUE);
00333
00334 for ( y = 0; y < Fsize_y; y++ ) {
00335 SafeRead(otherSock, (char *)frame->decoded_y[y], Fsize_x);
00336 }
00337
00338 for (y = 0; y < (Fsize_y >> 1); y++) {
00339 SafeRead(otherSock, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
00340 }
00341
00342 for (y = 0; y < (Fsize_y >> 1); y++) {
00343 SafeRead(otherSock, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
00344 }
00345
00346
00347 WriteDecodedFrame(frame);
00348
00349 Frame_Free(frame);
00350 } else if ( frameNumber == -3 ) {
00351
00352 SafeRead(otherSock, (char *)buffer, 4);
00353 frameNumber = ntohl(buffer[0]);
00354
00355 if ( debugSockets ) {
00356 fprintf(stdout, "INPUT SERVER: READING DECODED FRAME %d from DISK\n", frameNumber);
00357 fflush(stdout);
00358 }
00359
00360
00361 frame = Frame_New(frameNumber, 'i');
00362
00363 Frame_AllocDecoded(frame, TRUE);
00364
00365 ReadDecodedRefFrame(frame, frameNumber);
00366
00367
00368 for ( y = 0; y < Fsize_y; y++ ) {
00369 SafeWrite(otherSock, (char *)frame->decoded_y[y], Fsize_x);
00370 }
00371
00372 for (y = 0; y < (Fsize_y >> 1); y++) {
00373 SafeWrite(otherSock, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
00374 }
00375
00376 for (y = 0; y < (Fsize_y >> 1); y++) {
00377 SafeWrite(otherSock, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
00378 }
00379
00380 Frame_Free(frame);
00381 } else if ( frameNumber == -4 ) {
00382
00383 SafeRead(otherSock, (char *)buffer, 8);
00384 frameNumber = buffer[0];
00385 frameNumber = ntohl(frameNumber);
00386
00387
00388 numBytes = buffer[1];
00389 numBytes = ntohl(numBytes);
00390
00391
00392 if ( numBytes > bigBufferSize ) {
00393 bigBufferSize = numBytes;
00394 if ( bigBuffer != NULL ) {
00395 free(bigBuffer);
00396 }
00397
00398 bigBuffer = (unsigned char *) malloc(bigBufferSize*
00399 sizeof(unsigned char));
00400 }
00401
00402
00403 SafeRead(otherSock, (char *) bigBuffer, numBytes);
00404
00405
00406 sprintf(fileName, "%s.frame.%d", outputFileName, frameNumber);
00407 if ( (filePtr = fopen(fileName, "wb")) == NULL ) {
00408 fprintf(stderr, "ERROR: Could not open output file(3): %s\n",
00409 fileName);
00410 exit(1);
00411 }
00412
00413
00414 fwrite(bigBuffer, sizeof(char), numBytes, filePtr);
00415
00416 fclose(filePtr);
00417
00418 if ( debugSockets ) {
00419 fprintf(stdout, "====I/O SERVER: WROTE FRAME %d to disk\n",
00420 frameNumber);
00421 fflush(stdout);
00422 }
00423 } else {
00424 if ( debugSockets ) {
00425 fprintf(stdout, "I/O SERVER GETTING FRAME %d\n", frameNumber);
00426 fflush(stdout);
00427 }
00428
00429
00430 frame = Frame_New(frameNumber, 'i');
00431
00432 if ( separateConversion ) {
00433 GetNthInputFileName(inputFileName, frameNumber);
00434
00435
00436 filePtr = ReadIOConvert(inputFileName);
00437 do {
00438 numBytes = fread(smallBuffer, 1, 1000, filePtr);
00439
00440 if ( numBytes > 0 ) {
00441 data = numBytes;
00442 data = htonl(data);
00443 SafeWrite(otherSock, (char *)&data, 4);
00444 SafeWrite(otherSock, (char *)smallBuffer, numBytes);
00445 }
00446 }
00447 while ( numBytes == 1000 );
00448
00449 if ( strcmp(ioConversion, "*") == 0 ) {
00450 fclose(filePtr);
00451 } else {
00452 pclose(filePtr);
00453 }
00454 } else {
00455 GetNthInputFileName(inputFileName, frameNumber);
00456 ReadFrame(frame, inputFileName, inputConversion, TRUE);
00457
00458
00459 for (y = 0; y < yuvHeight; y++) {
00460 SafeWrite(otherSock, (char *)frame->orig_y[y], yuvWidth);
00461 }
00462
00463 for (y = 0; y < (yuvHeight >> 1); y++) {
00464 SafeWrite(otherSock, (char *)frame->orig_cb[y], yuvWidth >> 1);
00465 }
00466
00467 for (y = 0; y < (yuvHeight >> 1); y++) {
00468 SafeWrite(otherSock, (char *)frame->orig_cr[y], yuvWidth >> 1);
00469 }
00470
00471
00472
00473 SafeRead(otherSock, (char *)buffer, 4);
00474
00475 }
00476
00477 if ( debugSockets ) {
00478 fprintf(stdout, "====I/O SERVER: READ FRAME %d\n",
00479 frameNumber);
00480 }
00481
00482 Frame_Free(frame);
00483 }
00484
00485 close(otherSock);
00486 }
00487
00488 close(serverSocket);
00489
00490 if ( debugSockets ) {
00491 fprintf(stdout, "====I/O SERVER: Shutting Down\n");
00492 }
00493 }
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507
00508 void
00509 SendRemoteFrame(frameNumber, bb)
00510 int frameNumber;
00511 BitBucket *bb;
00512 {
00513 int clientSocket;
00514 u_long data;
00515 int negativeFour = -4;
00516 time_t tempTimeStart, tempTimeEnd;
00517
00518 time(&tempTimeStart);
00519
00520 clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
00521
00522 data = htonl(negativeFour);
00523 SafeWrite(clientSocket, (char *)&data, 4);
00524
00525 data = htonl(frameNumber);
00526 SafeWrite(clientSocket, (char *)&data, 4);
00527
00528 if ( frameNumber != -1 ) {
00529
00530 data = (bb->totalbits+7)>>3;
00531 data = htonl(data);
00532 SafeWrite(clientSocket, (char *)&data, 4);
00533
00534
00535 Bitio_WriteToSocket(bb, clientSocket);
00536 }
00537
00538 close(clientSocket);
00539
00540 time(&tempTimeEnd);
00541 IOtime += (tempTimeEnd-tempTimeStart);
00542 }
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559 void
00560 NoteFrameDone(frameStart, frameEnd)
00561 int frameStart;
00562 int frameEnd;
00563 {
00564 int clientSocket;
00565 u_long data;
00566 int negativeTwo = -2;
00567 time_t tempTimeStart, tempTimeEnd;
00568
00569 time(&tempTimeStart);
00570
00571 clientSocket = ConnectToSocket(IOhostName, combinePortNumber, &hostEntry);
00572
00573 data = negativeTwo;
00574 data = htonl(negativeTwo);
00575 SafeWrite(clientSocket, (char *)&data, 4);
00576
00577 data = htonl(frameStart);
00578 SafeWrite(clientSocket, (char *)&data, 4);
00579
00580 data = htonl(frameEnd);
00581 SafeWrite(clientSocket, (char *)&data, 4);
00582
00583 close(clientSocket);
00584
00585 time(&tempTimeEnd);
00586 IOtime += (tempTimeEnd-tempTimeStart);
00587 }
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601 void
00602 GetRemoteFrame(frame, frameNumber)
00603 MpegFrame *frame;
00604 int frameNumber;
00605 {
00606 FILE *filePtr;
00607 int clientSocket;
00608 unsigned char smallBuffer[1000];
00609 register int y;
00610 int numBytes;
00611 u_long data;
00612 char fileName[256];
00613
00614 Fsize_Note(frameNumber, yuvWidth, yuvHeight);
00615
00616 if ( debugSockets ) {
00617 fprintf(stdout, "MACHINE %s REQUESTING connection for FRAME %d\n",
00618 getenv("HOST"), frameNumber);
00619 fflush(stdout);
00620 }
00621
00622 clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
00623
00624 data = frameNumber;
00625 data = htonl(data);
00626 SafeWrite(clientSocket, (char *)&data, 4);
00627
00628 if ( frameNumber != -1 ) {
00629 if ( separateConversion ) {
00630 sprintf(fileName, "/tmp/foobar%d", machineNumber);
00631 filePtr = fopen(fileName, "wb");
00632
00633
00634 do {
00635 SafeRead(clientSocket, (char *)&numBytes, 4);
00636 numBytes = ntohl(numBytes);
00637
00638 SafeRead(clientSocket, (char *)smallBuffer, numBytes);
00639
00640 fwrite(smallBuffer, 1, numBytes, filePtr);
00641 } while ( numBytes == 1000 );
00642 fflush(filePtr);
00643 fclose(filePtr);
00644
00645
00646 ReadFrame(frame, fileName, slaveConversion, FALSE);
00647 } else {
00648 Frame_AllocYCC(frame);
00649
00650 if ( debugSockets ) {
00651 fprintf(stdout, "MACHINE %s allocated YCC FRAME %d\n",
00652 getenv("HOST"), frameNumber);
00653 fflush(stdout);
00654 }
00655
00656
00657 for (y = 0; y < yuvHeight; y++) {
00658 SafeRead(clientSocket, (char *)frame->orig_y[y], yuvWidth);
00659 }
00660
00661 for (y = 0; y < (yuvHeight >> 1); y++) {
00662 SafeRead(clientSocket, (char *)frame->orig_cb[y], yuvWidth>>1);
00663 }
00664
00665 for (y = 0; y < (yuvHeight >> 1); y++) {
00666 SafeRead(clientSocket, (char *)frame->orig_cr[y], yuvWidth>>1);
00667 }
00668 }
00669 }
00670
00671 data = 0;
00672 data = htonl(data);
00673 SafeWrite(clientSocket, (char *)&data, 4);
00674
00675 close(clientSocket);
00676
00677 if ( debugSockets ) {
00678 fprintf(stdout, "MACHINE %s READ COMPLETELY FRAME %d\n",
00679 getenv("HOST"), frameNumber);
00680 fflush(stdout);
00681 }
00682 }
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692
00693
00694
00695
00696
00697
00698 void
00699 StartCombineServer(numInputFiles, outputFileName, parallelHostName, portNum)
00700 int numInputFiles;
00701 char *outputFileName;
00702 char *parallelHostName;
00703 int portNum;
00704 {
00705 int combinePortNum;
00706 FILE *ofp;
00707
00708
00709
00710 outputServerSocket = CreateListeningSocket(&combinePortNum);
00711
00712 if ( debugSockets ) {
00713 fprintf(stdout, "====OUTPUT USING PORT %d\n", combinePortNum);
00714 }
00715
00716 TransmitPortNum(parallelHostName, portNum, combinePortNum);
00717
00718 frameDone = (boolean *) malloc(numInputFiles*sizeof(boolean));
00719 memset((char *)frameDone, 0, numInputFiles*sizeof(boolean));
00720
00721 if ( (ofp = fopen(outputFileName, "wb")) == NULL ) {
00722 fprintf(stderr, "ERROR: Could not open output file!!\n");
00723 fflush(stderr);
00724 exit(1);
00725 }
00726 FramesToMPEG(numInputFiles, outputFileName, ofp, TRUE);
00727
00728 if ( debugSockets ) {
00729 fprintf(stdout, "====COMBINE SERVER: Shutting Down\n");
00730 fflush(stdout);
00731 }
00732
00733
00734 TransmitPortNum(parallelHostName, portNum, combinePortNum);
00735
00736 close(outputServerSocket);
00737 }
00738
00739
00740
00741
00742
00743
00744
00745
00746
00747
00748
00749
00750
00751
00752 void
00753 WaitForOutputFile(number)
00754 int number;
00755 {
00756 int otherSock;
00757 static int otherSize = sizeof(struct sockaddr);
00758 struct sockaddr otherSocket;
00759 int frameNumber;
00760 int32 buffer[8];
00761 int frameStart, frameEnd;
00762
00763 while ( ! frameDone[number] ) {
00764 otherSock = accept(outputServerSocket, &otherSocket, &otherSize);
00765 if ( otherSock == -1 ) {
00766 fprintf(stderr, "ERROR: Combine SERVER accept returned error %d\n", errno);
00767 exit(1);
00768 }
00769
00770 SafeRead(otherSock, (char *)buffer, 4);
00771 frameNumber = ntohl(buffer[0]);
00772
00773 if ( frameNumber == -2 ) {
00774
00775
00776 SafeRead(otherSock, (char *)buffer, 8);
00777 frameStart = buffer[0];
00778 frameStart = ntohl(frameStart);
00779 frameEnd = buffer[1];
00780 frameEnd = ntohl(frameEnd);
00781
00782 for ( frameNumber = frameStart; frameNumber <= frameEnd;
00783 frameNumber++ ) {
00784 frameDone[frameNumber] = TRUE;
00785 }
00786 }
00787
00788 close(otherSock);
00789 }
00790
00791 if ( debugSockets ) {
00792 fprintf(stdout, "WAIT FOR FRAME %d over\n", number);
00793 fflush(stdout);
00794 }
00795 }
00796
00797
00798
00799
00800
00801
00802
00803
00804
00805
00806
00807
00808
00809
00810
00811
00812
00813
00814 void
00815 StartMasterServer(numInputFiles, paramFile, outputFileName)
00816 int numInputFiles;
00817 char *paramFile;
00818 char *outputFileName;
00819 {
00820 FILE *filePtr;
00821 register int ind, ind2;
00822 int framesPerMachine;
00823 char command[1024];
00824 char *hostName;
00825 int portNum;
00826 int serverSocket;
00827 boolean finished[MAX_MACHINES];
00828 int numFinished;
00829 int otherSock, otherSize;
00830 struct sockaddr otherSocket;
00831 int seconds;
00832 int32 buffer[8];
00833 int ioPortNum[MAX_IO_SERVERS];
00834 int combinePortNum, decodePortNum;
00835 int nextFrame;
00836 int startFrames[MAX_MACHINES];
00837 int numFrames[MAX_MACHINES];
00838 int lastNumFrames[MAX_MACHINES];
00839 int numSeconds[MAX_MACHINES];
00840 float fps[MAX_MACHINES];
00841 int numMachinesToEstimate;
00842 float framesPerSecond;
00843 float totalFPS, localFPS;
00844 int framesDone;
00845 float avgFPS;
00846 char niceNess[256];
00847 int32 startFrame, endFrame;
00848 int numInputPorts = 0;
00849 int numRemote = SOMAXCONN;
00850 int totalRemote = 0;
00851 time_t startUpBegin, startUpEnd;
00852 time_t shutDownBegin, shutDownEnd;
00853 float timeChunk;
00854
00855 time(&startUpBegin);
00856
00857 if ( niceProcesses ) {
00858 sprintf(niceNess, "nice");
00859 } else {
00860 niceNess[0] = '\0';
00861 }
00862
00863 time(&timeStart);
00864
00865 PrintStartStats(-1, 0);
00866
00867
00868 hostName = getenv("HOST");
00869
00870 if ( hostName == NULL ) {
00871 fprintf(stderr, "ERROR: Set HOST environment variable\n");
00872 exit(1);
00873 }
00874
00875 hostEntry = gethostbyname(hostName);
00876 if ( hostEntry == NULL ) {
00877 fprintf(stderr, "ERROR: Could not find host %s in database\n",
00878 hostName);
00879 exit(1);
00880 }
00881
00882 hostName = hostEntry->h_name;
00883
00884 serverSocket = CreateListeningSocket(&portNum);
00885 if ( debugSockets ) {
00886 fprintf(stdout, "---USING PORT %d\n", portNum);
00887 }
00888
00889
00890 sprintf(command, "%s -max_machines %d -output_server %s %d %d %s",
00891 encoder_name, numMachines, hostName, portNum, numInputFiles, paramFile);
00892 safe_fork(command);
00893
00894
00895 otherSize = sizeof(otherSocket);
00896 otherSock = accept(serverSocket, &otherSocket, &otherSize);
00897 if ( otherSock == -1 ) {
00898 fprintf(stderr, "ERROR: MASTER SERVER accept returned error %d\n", errno);
00899 exit(1);
00900 }
00901
00902 SafeRead(otherSock, (char *)(&combinePortNum), 4);
00903 combinePortNum = ntohl(combinePortNum);
00904 combinePortNumber = combinePortNum;
00905 close(otherSock);
00906
00907 if ( debugSockets ) {
00908 fprintf(stdout, "---MASTER SERVER: Combine port number = %d\n",
00909 combinePortNum);
00910 }
00911
00912
00913 if ( referenceFrame == DECODED_FRAME ) {
00914 sprintf(command, "%s -max_machines %d -decode_server %s %d %d %s",
00915 encoder_name, numMachines, hostName, portNum, numInputFiles, paramFile);
00916 safe_fork(command);
00917
00918
00919 otherSize = sizeof(otherSocket);
00920 otherSock = accept(serverSocket, &otherSocket, &otherSize);
00921 if ( otherSock == -1 ) {
00922 fprintf(stderr, "ERROR: MASTER SERVER accept returned error %d\n", errno);
00923 exit(1);
00924 }
00925
00926 SafeRead(otherSock, (char *)(&decodePortNum), 4);
00927 decodePortNum = ntohl(decodePortNum);
00928 close(otherSock);
00929
00930 if ( debugSockets ) {
00931 fprintf(stdout, "---MASTER SERVER: Decode port number = %d\n",
00932 decodePortNum);
00933 }
00934 }
00935
00936
00937
00938 framesPerMachine = numInputFiles/numMachines;
00939
00940 numFinished = 0;
00941
00942
00943 for ( ind = 0; ind < numMachines; ind++ ) {
00944 fps[ind] = -1.0;
00945 if ( remote[ind] ) {
00946 totalRemote++;
00947 }
00948 }
00949
00950
00951 nextFrame = 0;
00952 for ( ind = 0; ind < numMachines; ind++ ) {
00953 if ( (totalRemote != 0) && (numRemote == SOMAXCONN) ) {
00954
00955 sprintf(command, "%s -max_machines %d -io_server %s %d %s",
00956 encoder_name, numMachines, hostName, portNum, paramFile);
00957 safe_fork(command);
00958
00959
00960 otherSize = sizeof(otherSocket);
00961 otherSock = accept(serverSocket, &otherSocket, &otherSize);
00962 if ( otherSock == -1 ) {
00963 fprintf(stderr, "ERROR: MASTER SERVER accept returned error %d\n", errno);
00964 exit(1);
00965 }
00966
00967 SafeRead(otherSock, (char *)(&ioPortNum[numInputPorts]), 4);
00968 ioPortNum[numInputPorts] = ntohl(ioPortNum[numInputPorts]);
00969 close(otherSock);
00970
00971 if ( debugSockets ) {
00972 fprintf(stdout, "---MASTER SERVER: I/O port number = %d\n",
00973 ioPortNum[numInputPorts]);
00974 }
00975
00976 numInputPorts++;
00977 numRemote = 0;
00978 }
00979
00980 finished[ind] = FALSE;
00981 numSeconds[ind] = 0;
00982
00983 startFrame = nextFrame;
00984 if ( parallelPerfect ) {
00985 endFrame = startFrame+((numInputFiles-startFrame)/
00986 (numMachines-ind)) -1;
00987
00988 if ( forceIalign ) {
00989 while (FType_Type(endFrame) != 'i') {endFrame++;}
00990 }
00991
00992
00993 if ( endFrame < startFrame ) {
00994 endFrame = startFrame;
00995 }
00996
00997
00998 if ( endFrame >= numInputFiles ) {
00999 endFrame = numInputFiles-1;
01000 }
01001 } else if ( forceIalign ) {
01002 endFrame = startFrame+framePatternLen-1;
01003 while (FType_Type(endFrame) != 'i') {endFrame++;}
01004 } else {
01005 endFrame = startFrame+parallelTestFrames-1;
01006 }
01007
01008 if ( remote[ind] ) {
01009 sprintf(command, "%s %s -l %s %s %s -child %s %d %d %d %d %d %d -frames %d %d %s",
01010 rsh,
01011 machineName[ind], userName[ind], niceNess,
01012 executable[ind],
01013 hostName, portNum, ioPortNum[numInputPorts-1],
01014 combinePortNum, decodePortNum, ind,
01015 remote[ind],
01016 startFrame, endFrame,
01017 remoteParamFile[ind]);
01018 numRemote++;
01019 totalRemote--;
01020 } else {
01021 sprintf(command, "%s %s -l %s %s %s -child %s %d %d %d %d %d %d -frames %d %d %s",
01022 rsh,
01023 machineName[ind], userName[ind], niceNess,
01024 executable[ind],
01025 hostName, portNum, ioPortNum[numInputPorts-1],
01026 combinePortNum, decodePortNum, ind,
01027 remote[ind],
01028 startFrame, endFrame,
01029 paramFile);
01030 }
01031
01032 if ( debugMachines ) {
01033 fprintf(stdout, "---%s: frames %d to %d\n",
01034 machineName[ind],
01035 startFrame, endFrame);
01036 }
01037
01038
01039 safe_fork(command);
01040
01041 nextFrame = endFrame+1;
01042 startFrames[ind] = startFrame;
01043 numFrames[ind] = endFrame-startFrame+1;
01044 lastNumFrames[ind] = endFrame-startFrame+1;
01045 }
01046
01047 framesDone = 0;
01048
01049 time(&startUpEnd);
01050
01051
01052 while ( numFinished != numMachines ) {
01053 otherSize = sizeof(otherSocket);
01054 otherSock = accept(serverSocket, &otherSocket, &otherSize);
01055 if ( otherSock == -1 ) {
01056 fprintf(stderr, "ERROR: MASTER SERVER 2 accept returned error %d\n", errno);
01057 exit(1);
01058 }
01059
01060 SafeRead(otherSock, (char *)buffer, 8);
01061
01062 ind = ntohl(buffer[0]);
01063 seconds = ntohl(buffer[1]);
01064
01065 NoteFrameDone(startFrames[ind],
01066 startFrames[ind]+lastNumFrames[ind]-1);
01067
01068 numSeconds[ind] += seconds;
01069 fps[ind] = (float)numFrames[ind]/(float)numSeconds[ind];
01070
01071 if ( seconds != 0 )
01072 framesPerSecond = (float)lastNumFrames[ind]/(float)seconds;
01073 else
01074 framesPerSecond = (float)lastNumFrames[ind]*2.0;
01075
01076 framesDone += lastNumFrames[ind];
01077
01078 if ( nextFrame >= numInputFiles ) {
01079 buffer[0] = htonl(-1);
01080 buffer[1] = htonl(0);
01081 SafeWrite(otherSock, (char *)buffer, 8);
01082 numFinished++;
01083
01084 if ( debugMachines ) {
01085 fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done): DONE\n",
01086 machineName[ind], framesPerSecond, numFinished,
01087 numMachines);
01088 }
01089 } else {
01090 if (numSeconds[ind] != 0) {
01091 avgFPS = (float)numFrames[ind]/(float)numSeconds[ind];
01092 } else {
01093 avgFPS = 0.1;
01094 }
01095
01096 startFrame = nextFrame;
01097
01098 if ( parallelTimeChunks == -1 ) {
01099
01100
01101 totalFPS = 0.0;
01102 numMachinesToEstimate = 0;
01103 for ( ind2 = 0; ind2 < numMachines; ind2++ ) {
01104 if ( fps[ind2] < 0.0 ) {
01105 numMachinesToEstimate++;
01106 } else {
01107 totalFPS += fps[ind2];
01108 }
01109 }
01110
01111 totalFPS = (float)numMachines*
01112 (totalFPS/(float)(numMachines-numMachinesToEstimate));
01113
01114 timeChunk = (float)(numInputFiles-nextFrame)/totalFPS;
01115
01116 fprintf(stdout, "ASSIGNING %s %.2f seconds of work\n",
01117 machineName[ind], timeChunk);
01118 fflush(stdout);
01119 endFrame = nextFrame +
01120 (int)((float)timeChunk*avgFPS) - 1;
01121 } else {
01122 endFrame = nextFrame +
01123 (int)((float)parallelTimeChunks*avgFPS) - 1;
01124 }
01125
01126 if ( forceIalign ) {
01127 while (FType_Type(endFrame) != 'i') {endFrame++;}
01128 }
01129
01130 if ( endFrame < startFrame ) {
01131 endFrame = startFrame;
01132 }
01133 if ( endFrame >= numInputFiles ) {
01134 endFrame = numInputFiles-1;
01135 }
01136
01137 nextFrame = endFrame+1;
01138
01139 startFrames[ind] = startFrame;
01140 numFrames[ind] += (endFrame-startFrame+1);
01141 lastNumFrames[ind] = (endFrame-startFrame+1);
01142
01143 if ( debugMachines ) {
01144 fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done): next: %d to %d\n",
01145 machineName[ind], framesPerSecond, numFinished,
01146 numMachines, startFrame, endFrame);
01147 }
01148
01149 buffer[0] = htonl(startFrame);
01150 buffer[1] = htonl(endFrame);
01151
01152 SafeWrite(otherSock, (char *)buffer, 8);
01153 }
01154
01155 close(otherSock);
01156
01157 if ( debugMachines ) {
01158 fprintf(stdout, "---FRAMES DONE: %d\tFARMED OUT: %d\tLEFT: %d\n",
01159 framesDone, nextFrame-framesDone, numInputFiles-nextFrame);
01160 }
01161 }
01162
01163 time(&shutDownBegin);
01164
01165
01166 IOhostName = hostName;
01167 for ( ind = 0; ind < numInputPorts; ind++ ) {
01168 ioPortNumber = ioPortNum[ind];
01169 EndIOServer();
01170 }
01171
01172
01173 otherSize = sizeof(otherSocket);
01174 otherSock = accept(serverSocket, &otherSocket, &otherSize);
01175 if ( otherSock == -1 ) {
01176 fprintf(stderr, "ERROR: MASTER SERVER accept returned error %d\n", errno);
01177 exit(1);
01178 }
01179
01180 SafeRead(otherSock, (char *)buffer, 4);
01181 close(otherSock);
01182
01183 close(serverSocket);
01184
01185 time(&timeEnd);
01186 diffTime = (int32)(timeEnd-timeStart);
01187
01188 time(&shutDownEnd);
01189
01190 for ( ind2 = 0; ind2 < 2; ind2++ ) {
01191 if ( ind2 == 0 ) {
01192 filePtr = stdout;
01193 } else if ( statFile != NULL ) {
01194 filePtr = statFile;
01195 } else {
01196 continue;
01197 }
01198
01199 fprintf(filePtr, "\n\n");
01200 fprintf(filePtr, "PARALLEL SUMMARY\n");
01201 fprintf(filePtr, "----------------\n");
01202 fprintf(filePtr, "\n");
01203 fprintf(filePtr, "START UP TIME: %d seconds\n",
01204 (int)startUpEnd-(int)startUpBegin);
01205 fprintf(filePtr, "SHUT DOWN TIME: %d seconds\n",
01206 (int)shutDownEnd-(int)shutDownBegin);
01207
01208 fprintf(filePtr, "%14s\tFrames\tSeconds\tFrames Per Second\tSelf Time\n",
01209 "MACHINE");
01210 fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");
01211 totalFPS = 0.0;
01212 for ( ind = 0; ind < numMachines; ind++ ) {
01213 localFPS = (float)numFrames[ind]/(float)numSeconds[ind];
01214 fprintf(filePtr, "%14s\t%d\t%d\t%f\t\t%d\n",
01215 machineName[ind], numFrames[ind], numSeconds[ind],
01216 localFPS, (int)((float)numInputFiles/localFPS));
01217 totalFPS += localFPS;
01218 }
01219
01220 fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");
01221
01222 fprintf(filePtr, "%14s\t\t%d\t%f\n", "OPTIMAL",
01223 (int)((float)numInputFiles/totalFPS),
01224 totalFPS);
01225 fprintf(filePtr, "%14s\t\t%d\t%f\n", "ACTUAL", diffTime,
01226 (float)numInputFiles/(float)diffTime);
01227
01228 fprintf(filePtr, "\n\n");
01229 }
01230
01231 if ( statFile != NULL ) {
01232 fclose(statFile);
01233 }
01234 }
01235
01236
01237
01238
01239
01240
01241
01242
01243
01244
01245
01246
01247
01248 boolean
01249 NotifyMasterDone(hostName, portNum, machineNumber, seconds, frameStart,
01250 frameEnd)
01251 char *hostName;
01252 int portNum;
01253 int machineNumber;
01254 int seconds;
01255 int *frameStart;
01256 int *frameEnd;
01257 {
01258 int clientSocket;
01259 int32 buffer[8];
01260 time_t tempTimeStart, tempTimeEnd;
01261
01262 time(&tempTimeStart);
01263
01264 clientSocket = ConnectToSocket(hostName, portNum, &hostEntry);
01265
01266 buffer[0] = htonl(machineNumber);
01267 buffer[1] = htonl(seconds);
01268
01269 SafeWrite(clientSocket, (char *)buffer, 8);
01270
01271 SafeRead(clientSocket, (char *)buffer, 8);
01272 *frameStart = ntohl(buffer[0]);
01273 *frameEnd = ntohl(buffer[1]);
01274
01275 close(clientSocket);
01276
01277 time(&tempTimeEnd);
01278 IOtime += (tempTimeEnd-tempTimeStart);
01279
01280 return ((*frameStart) >= 0);
01281 }
01282
01283
01284
01285
01286
01287
01288
01289
01290
01291
01292
01293
01294
01295
01296
01297
01298 void
01299 StartDecodeServer(numInputFiles, decodeFileName, parallelHostName, portNum)
01300 int numInputFiles;
01301 char *decodeFileName;
01302 char *parallelHostName;
01303 int portNum;
01304 {
01305 int otherSock, otherSize;
01306 struct sockaddr otherSocket;
01307 int decodePortNum;
01308 int32 buffer[8];
01309 int frameReady;
01310 boolean *ready;
01311 int *waitMachine;
01312 int *waitPort;
01313 int *waitList;
01314 int slaveNumber;
01315 int slavePort;
01316 int waitPtr;
01317 struct hostent *nullHost = NULL;
01318 int clientSocket;
01319
01320
01321
01322 ready = (boolean *) calloc(numInputFiles, sizeof(boolean));
01323 waitMachine = (int *) calloc(numInputFiles, sizeof(int));
01324 waitPort = (int *) malloc(numMachines*sizeof(int));
01325 waitList = (int *) calloc(numMachines, sizeof(int));
01326
01327
01328
01329 decodeServerSocket = CreateListeningSocket(&decodePortNum);
01330
01331 if ( debugSockets ) {
01332 fprintf(stdout, "====DECODE USING PORT %d\n", decodePortNum);
01333 }
01334
01335 TransmitPortNum(parallelHostName, portNum, decodePortNum);
01336
01337 frameDone = (boolean *) malloc(numInputFiles*sizeof(boolean));
01338 memset((char *)frameDone, 0, numInputFiles*sizeof(boolean));
01339
01340
01341 while ( TRUE ) {
01342 otherSize = sizeof(otherSocket);
01343 otherSock = accept(decodeServerSocket, &otherSocket, &otherSize);
01344 if ( otherSock == -1 ) {
01345 fprintf(stderr, "ERROR: DECODE SERVER accept returned error %d\n", errno);
01346 exit(1);
01347 }
01348
01349 SafeRead(otherSock, (char *)buffer, 4);
01350 frameReady = buffer[0];
01351 frameReady = ntohl(frameReady);
01352
01353 if ( frameReady == -2 ) {
01354 SafeRead(otherSock, (char *)buffer, 4);
01355 frameReady = buffer[0];
01356 frameReady = ntohl(frameReady);
01357
01358 if ( debugSockets ) {
01359 fprintf(stdout, "====DECODE SERVER: REQUEST FOR %d\n", frameReady);
01360 fflush(stdout);
01361 }
01362
01363
01364 buffer[0] = frameDone[frameReady];
01365 buffer[0] = htonl(buffer[0]);
01366 SafeWrite(otherSock, (char *)buffer, 4);
01367
01368 if ( ! frameDone[frameReady] ) {
01369
01370 SafeRead(otherSock, (char *)buffer, 8);
01371 slaveNumber = buffer[0];
01372 slaveNumber = ntohl(slaveNumber);
01373 slavePort = buffer[1];
01374 slavePort = ntohl(slavePort);
01375
01376 if ( debugSockets ) {
01377 fprintf(stdout, "WAITING: SLAVE %d, PORT %d\n",
01378 slaveNumber, slavePort);
01379 }
01380
01381 waitPort[slaveNumber] = slavePort;
01382 if ( waitMachine[frameReady] == 0 ) {
01383 waitMachine[frameReady] = slaveNumber+1;
01384 } else {
01385
01386
01387 waitPtr = waitMachine[frameReady]-1;
01388 while ( waitList[waitPtr] != 0 ) {
01389 waitPtr = waitList[waitPtr]-1;
01390 }
01391
01392 waitList[waitPtr] = slaveNumber+1;
01393 waitList[slaveNumber] = 0;
01394 }
01395 }
01396 } else {
01397 frameDone[frameReady] = TRUE;
01398
01399 if ( debugSockets ) {
01400 fprintf(stdout, "====DECODE SERVER: FRAME %d READY\n", frameReady);
01401 fflush(stdout);
01402 }
01403
01404 if ( waitMachine[frameReady] ) {
01405
01406 waitPtr = waitMachine[frameReady]-1;
01407 while ( waitPtr >= 0 ) {
01408 clientSocket = ConnectToSocket(machineName[waitPtr],
01409 waitPort[waitPtr],
01410 &nullHost);
01411 close(clientSocket);
01412 waitPtr = waitList[waitPtr]-1;
01413 }
01414 }
01415 }
01416
01417 close(otherSock);
01418 }
01419
01420 if ( debugSockets ) {
01421 fprintf(stdout, "====DECODE SERVER: Shutting Down\n");
01422 fflush(stdout);
01423 }
01424
01425
01426 TransmitPortNum(parallelHostName, portNum, decodePortNum);
01427
01428 close(decodeServerSocket);
01429 }
01430
01431
01432
01433
01434
01435
01436
01437
01438
01439
01440
01441
01442
01443
01444
01445
01446
01447
01448
01449 static void
01450 TransmitPortNum(hostName, portNum, newPortNum)
01451 char *hostName;
01452 int portNum;
01453 int newPortNum;
01454 {
01455 int clientSocket;
01456 u_long data;
01457
01458 clientSocket = ConnectToSocket(hostName, portNum, &hostEntry);
01459
01460 data = htonl(newPortNum);
01461 SafeWrite(clientSocket, (char *) &data, 4);
01462
01463 close(clientSocket);
01464 }
01465
01466
01467
01468
01469
01470
01471
01472
01473
01474
01475
01476
01477
01478
/*
 * SafeRead
 *
 *	Read exactly nbyte bytes from fd into buf, repeating read() as often
 *	as needed to cope with short reads.
 *
 *	fd    - descriptor to read from
 *	buf   - destination; must have room for nbyte bytes
 *	nbyte - number of bytes wanted
 *
 * Does not return on failure: a read error, or end-of-file before nbyte
 * bytes have arrived, prints a diagnostic on stderr and exits with status 1.
 * A read() interrupted by a signal (EINTR) is simply retried.
 */
static void
SafeRead(fd, buf, nbyte)
    int fd;
    char *buf;
    int nbyte;
{
    int numRead = 0;
    int result;

    while ( numRead < nbyte ) {
        result = read(fd, &buf[numRead], nbyte-numRead);

        if ( result == -1 ) {
            if ( errno == EINTR ) {
                continue;       /* interrupted before any data: try again */
            }
            fprintf(stderr, "ERROR: read (of %d bytes (total %d) ) returned error %d\n",
                    nbyte-numRead, nbyte, errno);
            exit(1);
        }

        if ( result == 0 ) {
            /* the peer closed the connection; without this check the loop
             * would spin forever, because read() keeps returning 0
             */
            fprintf(stderr, "ERROR: read (of %d bytes (total %d) ) hit end-of-file\n",
                    nbyte-numRead, nbyte);
            exit(1);
        }

        numRead += result;
    }
}
01501
01502
01503
01504
01505
01506
01507
01508
01509
01510
01511
01512
01513
01514
/*
 * SafeWrite
 *
 *	Write exactly nbyte bytes from buf to fd, repeating write() as often
 *	as needed to cope with short writes.
 *
 *	fd    - descriptor to write to
 *	buf   - data to send
 *	nbyte - number of bytes to send
 *
 * Does not return on failure: a write error prints a diagnostic on stderr
 * and exits with status 1.  A write() interrupted by a signal (EINTR) is
 * simply retried.
 */
static void
SafeWrite(fd, buf, nbyte)
    int fd;
    char *buf;
    int nbyte;
{
    int numWritten = 0;
    int result;

    while ( numWritten < nbyte ) {
        result = write(fd, &buf[numWritten], nbyte-numWritten);

        if ( result == -1 ) {
            if ( errno == EINTR ) {
                continue;       /* interrupted before any data: try again */
            }
            /* this message used to say "read", which pointed at the wrong
             * call when diagnosing failures
             */
            fprintf(stderr, "ERROR: write (of %d bytes (total %d) ) returned error %d\n",
                    nbyte-numWritten, nbyte, errno);
            exit(1);
        }

        numWritten += result;
    }
}
01537
01538
01539
01540
01541
01542
01543
01544
01545
01546
01547
01548
01549
01550
/*
 * EndIOServer
 *
 *	Issue a remote-frame request with no frame buffer and a frame number
 *	of -1.
 *
 * NOTE(review): going by this routine's name, -1 is presumably the request
 * that makes the I/O server shut down -- confirm against GetRemoteFrame.
 */
static void
EndIOServer()
{
    int frameNumber = -1;

    GetRemoteFrame(NULL, frameNumber);
}
01557
01558
01559
01560
01561
01562
01563
01564
01565
01566
01567
01568
01569
01570
01571 void
01572 NotifyDecodeServerReady(id)
01573 int id;
01574 {
01575 int clientSocket;
01576 u_long data;
01577 time_t tempTimeStart, tempTimeEnd;
01578
01579 time(&tempTimeStart);
01580
01581 clientSocket = ConnectToSocket(IOhostName, decodePortNumber, &hostEntry);
01582
01583 data = htonl(id);
01584 SafeWrite(clientSocket, (char *)&data, 4);
01585
01586 close(clientSocket);
01587
01588 time(&tempTimeEnd);
01589 IOtime += (tempTimeEnd-tempTimeStart);
01590 }
01591
01592
01593
01594
01595
01596
01597
01598
01599
01600
01601
01602
01603
01604 void
01605 WaitForDecodedFrame(id)
01606 int id;
01607 {
01608 int clientSocket;
01609 u_long data;
01610 int negativeTwo = -2;
01611 int ready;
01612
01613
01614 if ( debugSockets ) {
01615 fprintf(stdout, "WAITING FOR DECODED FRAME %d\n", id);
01616 }
01617
01618 clientSocket = ConnectToSocket(IOhostName, decodePortNumber, &hostEntry);
01619
01620
01621 data = negativeTwo;
01622 data = htonl(negativeTwo);
01623 SafeWrite(clientSocket, (char *)&data, 4);
01624
01625 data = htonl(id);
01626 SafeWrite(clientSocket, (char *)&data, 4);
01627
01628 SafeRead(clientSocket, (char *)&data, 4);
01629 ready = data;
01630 ready = ntohl(ready);
01631
01632 if ( ! ready ) {
01633 int waitSocket;
01634 int waitPort;
01635 int otherSock, otherSize;
01636 struct sockaddr otherSocket;
01637
01638
01639 waitSocket = CreateListeningSocket(&waitPort);
01640
01641
01642 data = machineNumber;
01643 data = ntohl(data);
01644 SafeWrite(clientSocket, (char *)&data, 4);
01645
01646 data = waitPort;
01647 data = ntohl(data);
01648 SafeWrite(clientSocket, (char *)&data, 4);
01649
01650 close(clientSocket);
01651
01652 if ( debugSockets ) {
01653 fprintf(stdout, "SLAVE: WAITING ON SOCKET %d\n", waitPort);
01654 fflush(stdout);
01655 }
01656
01657 otherSize = sizeof(otherSocket);
01658 otherSock = accept(waitSocket, &otherSocket, &otherSize);
01659 if ( otherSock == -1 ) {
01660 fprintf(stderr, "ERROR: I/O SERVER accept returned error %d\n", errno);
01661 exit(1);
01662 }
01663
01664
01665
01666
01667 close(otherSock);
01668
01669 close(waitSocket);
01670 } else {
01671 close(clientSocket);
01672 }
01673
01674 if ( debugSockets ) {
01675 fprintf(stdout, "YE-HA FRAME %d IS NOW READY\n", id);
01676 }
01677 }
01678
01679
01680
01681
01682
01683
01684
01685
01686
01687
01688
01689
01690
/*
 * CreateListeningSocket
 *
 *	Create a TCP socket, bind it to the first port above 2048 that
 *	bind() accepts (on any local address), and put it in the listening
 *	state.
 *
 *	portNumber - out: receives the port the socket was bound to
 *
 *	Returns the descriptor of the listening socket.
 *
 * Does not return on failure: an error from socket() or listen(), or
 * running out of ports to try, prints a diagnostic and exits with status 1.
 *
 * The port scan stops at 65535.  The previous unbounded loop never
 * terminated when bind() kept failing, and once past 65535 the 16-bit port
 * actually passed to bind() wrapped around while *portNumber kept growing,
 * so the reported port could differ from the bound one.
 */
static int
CreateListeningSocket(portNumber)
    int *portNumber;
{
    enum { FIRST_PORT = 2049, LAST_PORT = 65535 };
    int resultSocket;
    int candidatePort;
    int result;
    struct sockaddr_in nameEntry;

    resultSocket = socket(AF_INET, SOCK_STREAM, 0);
    if ( resultSocket == -1 ) {
        fprintf(stderr, "ERROR: Call to socket() gave error %d\n", errno);
        exit(1);
    }

    /* the all-zero address left by memset is INADDR_ANY */
    memset((char *) &nameEntry, 0, sizeof(nameEntry));
    nameEntry.sin_family = AF_INET;

    /* try each port in turn until bind() succeeds */
    result = -1;
    for ( candidatePort = FIRST_PORT; candidatePort <= LAST_PORT;
          candidatePort++ ) {
        nameEntry.sin_port = htons((unsigned short) candidatePort);
        result = bind(resultSocket, (struct sockaddr *) &nameEntry,
                      sizeof(nameEntry));
        if ( result != -1 ) {
            break;
        }
    }

    if ( result == -1 ) {
        fprintf(stderr, "ERROR: could not bind() to any port from %d to %d (last error %d)\n",
                (int) FIRST_PORT, (int) LAST_PORT, errno);
        exit(1);
    }

    (*portNumber) = candidatePort;

    result = listen(resultSocket, SOMAXCONN);
    if ( result == -1 ) {
        fprintf(stderr, "ERROR: call to listen() gave error %d\n", errno);
        exit(1);
    }

    return resultSocket;
}
01731
01732
01733
01734
01735
01736
01737
01738
01739
01740
01741
01742
01743
01744
01745
/*
 * ConnectToSocket
 *
 *	Open a TCP connection to machineName:portNum.
 *
 *	machineName - host to connect to; only consulted when *hostEnt is NULL
 *	portNum     - port on that host
 *	hostEnt     - in/out lookup cache: when *hostEnt is NULL the name is
 *	              resolved with gethostbyname() and the result is stored
 *	              there; otherwise the stored entry is used as is
 *
 *	Returns the descriptor of the connected socket.
 *
 * Does not return on failure: a failed lookup, socket() or connect() prints
 * a diagnostic and exits with status 1.
 *
 * NOTE(review): gethostbyname() may return a pointer to static storage that
 * a later lookup overwrites, so an entry cached through one hostEnt can be
 * clobbered by a lookup made through another -- confirm that callers never
 * mix cached entries for different hosts.
 */
static int
ConnectToSocket(machineName, portNum, hostEnt)
    char *machineName;
    int portNum;
    struct hostent **hostEnt;
{
    int resultSocket;
    int result;
    size_t addrLength;
    const char *localHost;
    struct sockaddr_in nameEntry;

    if ( (*hostEnt) == NULL ) {
        (*hostEnt) = gethostbyname(machineName);
        if ( (*hostEnt) == NULL ) {
            fprintf(stderr, "ERROR: Couldn't get host by name (%s)\n",
                    machineName);
            exit(1);
        }
    }

    resultSocket = socket(AF_INET, SOCK_STREAM, 0);
    if ( resultSocket == -1 ) {
        fprintf(stderr, "ERROR: socket returned error %d\n", errno);
        exit(1);
    }

    /* clear the whole structure, not just sin_zero, so that no field is
     * handed to connect() uninitialized
     */
    memset((void *) &nameEntry, 0, sizeof(nameEntry));
    nameEntry.sin_family = AF_INET;

    /* never copy more than the address field can hold */
    addrLength = (size_t) (*hostEnt)->h_length;
    if ( addrLength > sizeof(nameEntry.sin_addr.s_addr) ) {
        addrLength = sizeof(nameEntry.sin_addr.s_addr);
    }
    memcpy((void *) &(nameEntry.sin_addr.s_addr),
           (void *) (*hostEnt)->h_addr_list[0],
           addrLength);
    nameEntry.sin_port = htons((unsigned short) portNum);

    result = connect(resultSocket, (struct sockaddr *) &nameEntry,
                     sizeof(nameEntry));
    if ( result == -1 ) {
        /* HOST may be unset; passing NULL for %s is undefined behavior */
        localHost = getenv("HOST");
        if ( localHost == NULL ) {
            localHost = "(unknown)";
        }
        fprintf(stderr, "ERROR: connect (ConnectToSocket, port %d) from machine %s returned error %d\n",
                portNum, localHost, errno);
        exit(1);
    }

    return resultSocket;
}
01790
01791
01792
01793
01794
01795
01796
01797
01798
01799
01800
01801
01802
01803 void
01804 SendDecodedFrame(frame)
01805 MpegFrame *frame;
01806 {
01807 int clientSocket;
01808 register int y;
01809 int negativeTwo = -2;
01810 uint32 data;
01811
01812
01813 clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
01814
01815 data = negativeTwo;
01816 data = htonl(data);
01817 SafeWrite(clientSocket, (char *)&data, 4);
01818
01819 data = frame->id;
01820 data = htonl(data);
01821 SafeWrite(clientSocket, (char *)&data, 4);
01822
01823 for ( y = 0; y < Fsize_y; y++ ) {
01824 SafeWrite(clientSocket, (char *)frame->decoded_y[y], Fsize_x);
01825 }
01826
01827 for (y = 0; y < (Fsize_y >> 1); y++) {
01828 SafeWrite(clientSocket, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
01829 }
01830
01831 for (y = 0; y < (Fsize_y >> 1); y++) {
01832 SafeWrite(clientSocket, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
01833 }
01834
01835 close(clientSocket);
01836 }
01837
01838
01839
01840
01841
01842
01843
01844
01845
01846
01847
01848
01849
01850 void
01851 GetRemoteDecodedRefFrame(frame, frameNumber)
01852 MpegFrame *frame;
01853 int frameNumber;
01854 {
01855 int clientSocket;
01856 register int y;
01857 int negativeThree = -3;
01858 uint32 data;
01859
01860
01861 clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
01862
01863
01864 data = negativeThree;
01865 data = htonl(data);
01866 SafeWrite(clientSocket, (char *)&data, 4);
01867
01868 data = frame->id;
01869 data = htonl(data);
01870 SafeWrite(clientSocket, (char *)&data, 4);
01871
01872 for ( y = 0; y < Fsize_y; y++ ) {
01873 SafeRead(clientSocket, (char *)frame->decoded_y[y], Fsize_x);
01874 }
01875
01876 for (y = 0; y < (Fsize_y >> 1); y++) {
01877 SafeRead(clientSocket, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
01878 }
01879
01880 for (y = 0; y < (Fsize_y >> 1); y++) {
01881 SafeRead(clientSocket, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
01882 }
01883
01884 close(clientSocket);
01885
01886 }
01887
01888
01889
01890
01891
01892
01893
01894
01895
01896
01897
01898
01899
01900
01901
01902
01903
01904
01905
01906
01907 void cleanup_fork( dummy )
01908 int dummy;
01909 {
01910 register int i;
01911 for (i = 0; i < current_max_forked_pid; ++i ) {
01912
01913 #ifdef DEBUG_FORK
01914 fprintf(stderr, "cleanup_fork: killing PID %d\n", ClientPid[i]);
01915 #endif
01916
01917 if (kill(ClientPid[i], TERMINATE_PID_SIGNAL)) {
01918 fprintf(stderr, "cleanup_fork: killed PID=%d failed (errno %d)\n",
01919 ClientPid[i], errno);
01920 }
01921 }
01922 }
01923
01924
01925
01926
01927
01928
01929
01930
01931
01932
01933
01934
01935 static int safe_fork(command)
01936 char *command;
01937 {
01938 static int init=0;
01939 char *argis[MAXARGS];
01940 register int i=1;
01941
01942 if (!(argis[0] = strtok(command, " \t"))) return(0);
01943 while ((argis[i] = strtok(NULL, " \t")) && i < MAXARGS) ++i;
01944 argis[i] = NULL;
01945
01946 #ifdef DEBUG_FORK
01947 {register int i=0;
01948 fprintf(stderr, "Command %s becomes:\n", command);
01949 while(argis[i]) {fprintf(stderr, "--%s--\n", argis[i]); ++i;} }
01950 #endif
01951
01952 if (!init) {
01953 signal (SIGQUIT, cleanup_fork);
01954 signal (SIGTERM, cleanup_fork);
01955 signal (SIGINT , cleanup_fork);
01956 init=1;
01957 }
01958
01959 if (-1 == (ClientPid[current_max_forked_pid] = fork()) ) {
01960 perror("safe_fork: fork failed ");
01961 return(-1);
01962 }
01963 if( !ClientPid[current_max_forked_pid]) {
01964 execvp(argis[0], argis );
01965 perror("safe_fork child: exec failed ");
01966 exit(1);
01967 }
01968 #ifdef DEBUG_FORK
01969 fprintf(stderr, "parallel: forked PID=%d\n", ClientPid[current_max_forked_pid]);
01970 #endif
01971 current_max_forked_pid++;
01972 return(0);
01973 }