Skip to content

AFNI/NIfTI Server

Sections
Personal tools
You are here: Home » AFNI » Documentation

Doxygen Source Code Documentation


Main Page   Alphabetical List   Data Structures   File List   Data Fields   Globals   Search  

parallel.c

Go to the documentation of this file.
00001 /*===========================================================================*
00002  * parallel.c                                                                *
00003  *                                                                           *
00004  *      Procedures to make encoder run in parallel                           *
00005  *                                                                           *
00006  * EXPORTED PROCEDURES:                                                      *
00007  *      StartIOServer                                                        *
00008  *      StartCombineServer                                                   *
00009  *      StartDecodeServer                                                    *
00010  *      SendRemoteFrame                                                      *
00011  *      GetRemoteFrame                                                       *
00012  *      StartMasterServer                                                    *
00013  *      NotifyMasterDone                                                     *
00014  *                                                                           *
00015  *===========================================================================*/
00016 
00017 /*
00018  * Copyright (c) 1995 The Regents of the University of California.
00019  * All rights reserved.
00020  *
00021  * Permission to use, copy, modify, and distribute this software and its
00022  * documentation for any purpose, without fee, and without written agreement is
00023  * hereby granted, provided that the above copyright notice and the following
00024  * two paragraphs appear in all copies of this software.
00025  *
00026  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
00027  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
00028  * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
00029  * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00030  *
00031  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
00032  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
00033  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
00034  * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
00035  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
00036  */
00037 
00038 /*  
00039  *  $Header: /misc/elrond0/share/cvs/AFNI/src/mpeg_encodedir/parallel.c,v 1.4 2004/04/02 15:12:40 rwcox Exp $
00040  *  $Log: parallel.c,v $
00041  *  Revision 1.4  2004/04/02 15:12:40  rwcox
00042  *  Cput
00043  *
00044  *  Revision 1.3  2003/12/23 13:50:08  rwcox
00045  *  Cput
00046  *
00047  *  Revision 1.2  2003/12/03 14:46:14  rwcox
00048  *  Cput
00049  *
00050  *  Revision 1.1  2001/12/17 16:11:55  rwcox
00051  *  Cadd
00052  *
00053  *  Revision 1.9  1995/08/16 18:22:08  smoot
00054  *  indents
00055  *
00056  *  Revision 1.8  1995/08/14 22:30:20  smoot
00057  *  added safe_fork to allow us to kill kids when we die.
00058  *
00059  *  Revision 1.7  1995/08/07 21:46:14  smoot
00060  *  spawns the same encoder as it is for combine, etc.
00061  *  uses new pattern tables to determine frame types
00062  *
00063  *  Revision 1.6  1995/06/21 18:32:14  smoot
00064  *  Defined SOMAXCONN when not (LINUX)
00065  *  added binary r/w (DOsS!)
00066  *  ANSIified bcopy call
00067  *
00068  * Revision 1.5  1995/01/19  23:09:00  eyhung
00069  * Changed copyrights
00070  *
00071  * Revision 1.4  1994/03/15  00:27:11  keving
00072  * nothing
00073  *
00074  * Revision 1.3  1993/12/22  19:19:01  keving
00075  * nothing
00076  *
00077  * Revision 1.2  1993/07/22  22:23:43  keving
00078  * nothing
00079  *
00080  * Revision 1.1  1993/06/30  20:06:09  keving
00081  * nothing
00082  *
00083  */
00084 
00085 
00086 /*==============*
00087  * HEADER FILES *
00088  *==============*/
00089 
00090 #include <sys/types.h>
00091 #include <sys/socket.h>
00092 #include <sys/times.h>
00093 #include <time.h>
00094 #include <netinet/in.h>
00095 #include <unistd.h>
00096 #include <netdb.h>
00097 #include <errno.h>
00098 #include <string.h>
00099 #include <signal.h>
00100 #include "all.h"
00101 #include "param.h"
00102 #include "mpeg.h"
00103 #include "prototypes.h"
00104 #include "parallel.h"
00105 #include "readframe.h"
00106 #include "fsize.h"
00107 #include "combine.h"
00108 #include "frames.h"
00109 
00110 
00111 #define MAX_IO_SERVERS  10
00112 #ifndef SOMAXCONN
00113 #define SOMAXCONN 5
00114 #endif
00115 
00116 /*==================*
00117  * CONSTANTS        *
00118  *==================*/
00119 
00120 #define TERMINATE_PID_SIGNAL    SIGTERM  /* signal used to terminate forked childs */
00121 #ifndef MAXARGS
00122 #define MAXARGS         1024   /* Max Number of arguments in safe_fork command */
00123 #endif
00124 
00125 /*==================*
00126  * STATIC VARIABLES *
00127  *==================*/
00128 
00129 static int32   diffTime;
00130 static char     rsh[256];
00131 static struct hostent *hostEntry = NULL;
00132 static boolean  *frameDone;
00133 static int      outputServerSocket;
00134 static int      decodeServerSocket;
00135 static boolean  parallelPerfect = FALSE;
00136 static  int     current_max_forked_pid=0;
00137 
00138 
00139 /*==================*
00140  * GLOBAL VARIABLES *
00141  *==================*/
00142 
00143 extern int yuvHeight, yuvWidth;
00144 extern  time_t  timeStart, timeEnd;
00145 extern char     statFileName[256];
00146 extern FILE *statFile;
00147 extern boolean  debugMachines;
00148 extern boolean debugSockets;
00149 int parallelTestFrames = 10;
00150 int parallelTimeChunks = 60;
00151 char *IOhostName;
00152 int ioPortNumber;
00153 int combinePortNumber;
00154 int decodePortNumber;
00155 boolean niceProcesses = FALSE;
00156 boolean forceIalign = FALSE;
00157 int         machineNumber = -1;
00158 boolean remoteIO = FALSE;
00159 boolean separateConversion;
00160 time_t  IOtime = 0;
00161 extern char encoder_name[];
00162 int     ClientPid[MAX_MACHINES+4];
00163 
00164 /*===============================*
00165  * INTERNAL PROCEDURE prototypes *
00166  *===============================*/
00167 
00168 static void     TransmitPortNum _ANSI_ARGS_((char *hostName, int portNum,
00169                                                int ioPortNum));
00170 static void     EndIOServer _ANSI_ARGS_((void));
00171 static void SafeRead _ANSI_ARGS_((int fd, char *buf, int nbyte));
00172 static void SafeWrite _ANSI_ARGS_((int fd, char *buf, int nbyte));
00173 static int  CreateListeningSocket _ANSI_ARGS_((int *portNumber));
00174 static int  ConnectToSocket _ANSI_ARGS_((char *machineName, int portNum,
00175                                          struct hostent **hostEnt));
00176 static int safe_fork _ANSI_ARGS_((char *command));
00177 void cleanup_fork _ANSI_ARGS_ ((int dummy));
00178 
00179 
00180 /*=====================*
00181  * EXPORTED PROCEDURES *
00182  *=====================*/
00183 
00184                         /*=================*
00185                          * IO SERVER STUFF *
00186                          *=================*/
00187 
00188 
00189 /*===========================================================================*
00190  *
00191  * SetIOConvert
00192  *
00193  *      sets the IO conversion to be separate or not.  If separate, then
00194  *      some post-processing is done at slave end
00195  *
00196  * RETURNS:     nothing
00197  *
00198  * SIDE EFFECTS:    none
00199  *
00200  *===========================================================================*/
00201 void
00202 SetIOConvert(separate)
00203     boolean separate;
00204 {
00205     separateConversion = separate;
00206 }
00207 
00208 
00209 /*===========================================================================*
00210  *
00211  * SetParallelPerfect
00212  *
00213  *      If this is called, then frames will be divided up completely, and
00214  *      evenly (modulo rounding) between all the processors
00215  *
00216  * RETURNS:     nothing
00217  *
00218  * SIDE EFFECTS:    Sets parallelPerfect ....
00219  *
00220  *===========================================================================*/
00221 void
00222 SetParallelPerfect(val)
00223 boolean val;
00224 {
00225     parallelPerfect = val;
00226 }
00227 
00228 
00229 /*===========================================================================*
00230  *
00231  * SetRemoteShell
00232  *
00233  *      sets the remote shell program (usually rsh, but different on some
00234  *      machines)
00235  *
00236  * RETURNS:     nothing
00237  *
00238  * SIDE EFFECTS:    none
00239  *
00240  *===========================================================================*/
00241 void
00242 SetRemoteShell(shell)
00243     char *shell;
00244 {
00245     strcpy(rsh, shell);
00246 }
00247 
00248 
00249 /*===========================================================================*
00250  *
00251  * StartIOServer
00252  *
00253  *      start-up the IOServer with this process
00254  *      handles slave requests for frames, and exits when master tells it to
00255  *
00256  * RETURNS:     nothing
00257  *
00258  * SIDE EFFECTS:    none
00259  *
00260  *===========================================================================*/
00261 void
00262   StartIOServer(numInputFiles, parallelHostName, portNum)
00263 int numInputFiles;
00264 char *parallelHostName;
00265 int portNum;
00266 {
00267   int       ioPortNum;
00268   int       serverSocket;
00269   int       otherSock, otherSize;
00270   struct sockaddr otherSocket;
00271   int32   buffer[8];
00272   boolean       done = FALSE;
00273   int       frameNumber;
00274   MpegFrame *frame;
00275   register int y;
00276   int       numBytes;
00277   unsigned char   *bigBuffer;
00278   unsigned char   smallBuffer[1000];
00279   int       bigBufferSize;
00280   FILE    *filePtr;
00281   uint32  data;
00282   char    inputFileName[1024];
00283   char    fileName[1024];
00284 
00285   bigBufferSize = 0;
00286   bigBuffer = NULL;
00287 
00288   /* once we get IO port num, should transmit it to parallel server */
00289 
00290   serverSocket = CreateListeningSocket(&ioPortNum);
00291 
00292   if ( debugSockets ) {
00293     fprintf(stdout, "====I/O USING PORT %d\n", ioPortNum);
00294   }
00295 
00296   TransmitPortNum(parallelHostName, portNum, ioPortNum);
00297 
00298   otherSize = sizeof(otherSocket);
00299 
00300   if ( separateConversion ) {
00301     SetFileType(ioConversion);  /* for reading */
00302   } else {
00303     SetFileType(inputConversion);
00304   }
00305 
00306   /* now, wait until get done signal */
00307   while ( ! done ) {
00308     otherSock = accept(serverSocket, &otherSocket, &otherSize);
00309     if ( otherSock == -1 ) {
00310       fprintf(stderr, "ERROR:  I/O SERVER accept returned error %d\n", errno);
00311       exit(1);
00312     }
00313 
00314     SafeRead(otherSock, (char *)buffer, 4);
00315     frameNumber = ntohl(buffer[0]);
00316 
00317     if ( frameNumber == -1 ) {
00318       done = TRUE;
00319     } else if ( frameNumber == -2 ) {
00320       /* decoded frame to be output to disk */
00321       SafeRead(otherSock, (char *)buffer, 4);
00322       frameNumber = ntohl(buffer[0]);       
00323 
00324       if ( debugSockets ) {
00325         fprintf(stdout, "INPUT SERVER:  GETTING DECODED FRAME %d\n", frameNumber);
00326         fflush(stdout);
00327       }
00328 
00329       /* should read frame from socket, then write to disk */
00330       frame = Frame_New(frameNumber, 'i');
00331 
00332       Frame_AllocDecoded(frame, TRUE);
00333 
00334       for ( y = 0; y < Fsize_y; y++ ) {
00335         SafeRead(otherSock, (char *)frame->decoded_y[y], Fsize_x);
00336       }
00337 
00338       for (y = 0; y < (Fsize_y >> 1); y++) { /* U */
00339         SafeRead(otherSock, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
00340       }
00341 
00342       for (y = 0; y < (Fsize_y >> 1); y++) { /* V */
00343         SafeRead(otherSock, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
00344       }
00345 
00346       /* now output to disk */
00347       WriteDecodedFrame(frame);
00348 
00349       Frame_Free(frame);
00350     } else if ( frameNumber == -3 ) {
00351       /* request for decoded frame from disk */
00352       SafeRead(otherSock, (char *)buffer, 4);
00353       frameNumber = ntohl(buffer[0]);       
00354 
00355       if ( debugSockets ) {
00356         fprintf(stdout, "INPUT SERVER:  READING DECODED FRAME %d from DISK\n", frameNumber);
00357         fflush(stdout);
00358       }
00359 
00360       /* should read frame from disk, then write to socket */
00361       frame = Frame_New(frameNumber, 'i');
00362 
00363       Frame_AllocDecoded(frame, TRUE);
00364 
00365       ReadDecodedRefFrame(frame, frameNumber);
00366 
00367       /* now write to socket */
00368       for ( y = 0; y < Fsize_y; y++ ) {
00369         SafeWrite(otherSock, (char *)frame->decoded_y[y], Fsize_x);
00370       }
00371 
00372       for (y = 0; y < (Fsize_y >> 1); y++) { /* U */
00373         SafeWrite(otherSock, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
00374       }
00375 
00376       for (y = 0; y < (Fsize_y >> 1); y++) { /* V */
00377         SafeWrite(otherSock, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
00378       }
00379 
00380       Frame_Free(frame);
00381     } else if ( frameNumber == -4 ) {
00382       /* routing output frame from socket to disk */
00383       SafeRead(otherSock, (char *)buffer, 8);
00384       frameNumber = buffer[0];
00385       frameNumber = ntohl(frameNumber);
00386 
00387       /* read in number of bytes */
00388       numBytes = buffer[1];
00389       numBytes = ntohl(numBytes);
00390 
00391       /* make sure buffer is big enough for data */
00392       if ( numBytes > bigBufferSize ) {
00393         bigBufferSize = numBytes;
00394         if ( bigBuffer != NULL ) {
00395           free(bigBuffer);
00396         }
00397 
00398         bigBuffer = (unsigned char *) malloc(bigBufferSize*
00399                                              sizeof(unsigned char));
00400       }
00401 
00402       /* now read in the bytes */
00403       SafeRead(otherSock, (char *) bigBuffer, numBytes);
00404 
00405       /* open file to output this stuff to */
00406       sprintf(fileName, "%s.frame.%d", outputFileName, frameNumber);
00407       if ( (filePtr = fopen(fileName, "wb")) == NULL ) {
00408         fprintf(stderr, "ERROR:  Could not open output file(3):  %s\n",
00409                 fileName);
00410         exit(1);
00411       }
00412 
00413       /* now write the bytes here */
00414       fwrite(bigBuffer, sizeof(char), numBytes, filePtr);
00415 
00416       fclose(filePtr);
00417 
00418       if ( debugSockets ) {
00419         fprintf(stdout, "====I/O SERVER:  WROTE FRAME %d to disk\n",
00420                 frameNumber);
00421         fflush(stdout);
00422       }
00423     } else {
00424       if ( debugSockets ) {
00425         fprintf(stdout, "I/O SERVER GETTING FRAME %d\n", frameNumber);
00426         fflush(stdout);
00427       }
00428 
00429       /* should read in frame, then write to socket */
00430       frame = Frame_New(frameNumber, 'i');
00431 
00432       if ( separateConversion ) {
00433         GetNthInputFileName(inputFileName, frameNumber);
00434 
00435         /* do conversion and send right to the socket */
00436         filePtr = ReadIOConvert(inputFileName);
00437         do {
00438           numBytes = fread(smallBuffer, 1, 1000, filePtr);
00439 
00440           if ( numBytes > 0 ) {
00441             data = numBytes;
00442             data = htonl(data);
00443             SafeWrite(otherSock, (char *)&data, 4);
00444             SafeWrite(otherSock, (char *)smallBuffer, numBytes);
00445           }
00446         }
00447         while ( numBytes == 1000 );
00448 
00449         if ( strcmp(ioConversion, "*") == 0 ) {
00450           fclose(filePtr);
00451         } else {
00452           pclose(filePtr);
00453         }
00454       } else {
00455         GetNthInputFileName(inputFileName, frameNumber);
00456         ReadFrame(frame, inputFileName, inputConversion, TRUE);
00457 
00458         /* should now transmit yuv values */
00459         for (y = 0; y < yuvHeight; y++) { /* Y */
00460           SafeWrite(otherSock, (char *)frame->orig_y[y], yuvWidth);
00461         }
00462 
00463         for (y = 0; y < (yuvHeight >> 1); y++) { /* U */
00464           SafeWrite(otherSock, (char *)frame->orig_cb[y], yuvWidth >> 1);
00465         }
00466 
00467         for (y = 0; y < (yuvHeight >> 1); y++) { /* V */
00468           SafeWrite(otherSock, (char *)frame->orig_cr[y], yuvWidth >> 1);
00469         }
00470 
00471         /* now, make sure we don't leave until other processor read everything */
00472 
00473         SafeRead(otherSock, (char *)buffer, 4);
00474         /* should = 0 */
00475       }
00476 
00477       if ( debugSockets ) {
00478         fprintf(stdout, "====I/O SERVER:  READ FRAME %d\n",
00479                 frameNumber);
00480       }
00481 
00482       Frame_Free(frame);
00483     }
00484 
00485     close(otherSock);
00486   }
00487 
00488   close(serverSocket);
00489 
00490   if ( debugSockets ) {
00491     fprintf(stdout, "====I/O SERVER:  Shutting Down\n");
00492   }
00493 }
00494 
00495 
00496 /*===========================================================================*
00497  *
00498  * SendRemoteFrame
00499  *
00500  *      called by a slave to the I/O server; sends an encoded frame
00501  *      to the server to be sent to disk
00502  *
00503  * RETURNS:     nothing
00504  *
00505  * SIDE EFFECTS:    none
00506  *
00507  *===========================================================================*/
00508 void
00509 SendRemoteFrame(frameNumber, bb)
00510     int frameNumber;
00511     BitBucket *bb;
00512 {
00513     int clientSocket;
00514     u_long  data;
00515     int     negativeFour = -4;
00516     time_t  tempTimeStart, tempTimeEnd;
00517 
00518     time(&tempTimeStart);
00519 
00520     clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
00521 
00522     data = htonl(negativeFour);
00523     SafeWrite(clientSocket, (char *)&data, 4);
00524 
00525     data = htonl(frameNumber);
00526     SafeWrite(clientSocket, (char *)&data, 4);
00527 
00528     if ( frameNumber != -1 ) {
00529         /* send number of bytes */
00530         data = (bb->totalbits+7)>>3;
00531         data = htonl(data);
00532         SafeWrite(clientSocket, (char *)&data, 4);
00533 
00534         /* now send the bytes themselves */
00535         Bitio_WriteToSocket(bb, clientSocket);
00536     }
00537 
00538     close(clientSocket);
00539 
00540     time(&tempTimeEnd);
00541     IOtime += (tempTimeEnd-tempTimeStart);
00542 }
00543 
00544 
00545 
00546 
00547 /*===========================================================================*
00548  *
00549  * NoteFrameDone
00550  *
00551  *      called by slave to the Combine server; tells it these frames are
00552  *      done
00553  *
00554  * RETURNS:     nothing
00555  *
00556  * SIDE EFFECTS:    none
00557  *
00558  *===========================================================================*/
00559 void
00560 NoteFrameDone(frameStart, frameEnd)
00561     int frameStart;
00562     int frameEnd;
00563 {
00564     int clientSocket;
00565     u_long  data;
00566     int     negativeTwo = -2;
00567     time_t  tempTimeStart, tempTimeEnd;
00568 
00569     time(&tempTimeStart);
00570 
00571     clientSocket = ConnectToSocket(IOhostName, combinePortNumber, &hostEntry);
00572 
00573     data = negativeTwo;
00574     data = htonl(negativeTwo);
00575     SafeWrite(clientSocket, (char *)&data, 4);
00576 
00577     data = htonl(frameStart);
00578     SafeWrite(clientSocket, (char *)&data, 4);
00579 
00580     data = htonl(frameEnd);
00581     SafeWrite(clientSocket, (char *)&data, 4);
00582 
00583     close(clientSocket);
00584 
00585     time(&tempTimeEnd);
00586     IOtime += (tempTimeEnd-tempTimeStart);
00587 }
00588 
00589 
00590 /*===========================================================================*
00591  *
00592  * GetRemoteFrame
00593  *
00594  *      called by a slave; gets a remote frame from the I/O server
00595  *
00596  * RETURNS:     nothing
00597  *
00598  * SIDE EFFECTS:    none
00599  *
00600  *===========================================================================*/
00601 void
00602   GetRemoteFrame(frame, frameNumber)
00603 MpegFrame *frame;
00604 int frameNumber;
00605 {
00606   FILE    *filePtr;
00607   int   clientSocket;
00608   unsigned char   smallBuffer[1000];
00609   register int y;
00610   int       numBytes;
00611   u_long  data;
00612   char    fileName[256];
00613 
00614   Fsize_Note(frameNumber, yuvWidth, yuvHeight);
00615 
00616   if ( debugSockets ) {
00617     fprintf(stdout, "MACHINE %s REQUESTING connection for FRAME %d\n",
00618             getenv("HOST"), frameNumber);
00619     fflush(stdout);
00620   }
00621 
00622   clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
00623 
00624   data = frameNumber;
00625   data = htonl(data);
00626   SafeWrite(clientSocket, (char *)&data, 4);
00627 
00628   if ( frameNumber != -1 ) {
00629     if ( separateConversion ) {
00630       sprintf(fileName, "/tmp/foobar%d", machineNumber);
00631       filePtr = fopen(fileName, "wb");
00632 
00633       /* read in stuff, SafeWrite to file, perform local conversion */
00634       do {
00635         SafeRead(clientSocket, (char *)&numBytes, 4);
00636         numBytes = ntohl(numBytes);
00637 
00638         SafeRead(clientSocket, (char *)smallBuffer, numBytes);
00639 
00640         fwrite(smallBuffer, 1, numBytes, filePtr);
00641       } while ( numBytes == 1000 );
00642       fflush(filePtr);
00643       fclose(filePtr);
00644 
00645       /* now do slave conversion */
00646       ReadFrame(frame, fileName, slaveConversion, FALSE);
00647     } else {
00648       Frame_AllocYCC(frame);
00649 
00650       if ( debugSockets ) {
00651         fprintf(stdout, "MACHINE %s allocated YCC FRAME %d\n",
00652                 getenv("HOST"), frameNumber);
00653         fflush(stdout);
00654       }
00655 
00656       /* should now read yuv values */
00657       for (y = 0; y < yuvHeight; y++) { /* Y */
00658         SafeRead(clientSocket, (char *)frame->orig_y[y], yuvWidth);
00659       }
00660 
00661       for (y = 0; y < (yuvHeight >> 1); y++) { /* U */
00662         SafeRead(clientSocket, (char *)frame->orig_cb[y], yuvWidth>>1);
00663       }
00664 
00665       for (y = 0; y < (yuvHeight >> 1); y++) { /* V */
00666         SafeRead(clientSocket, (char *)frame->orig_cr[y], yuvWidth>>1);
00667       }
00668     }
00669   }
00670 
00671   data = 0;
00672   data = htonl(data);
00673   SafeWrite(clientSocket, (char *)&data, 4);
00674 
00675   close(clientSocket);
00676 
00677   if ( debugSockets ) {
00678     fprintf(stdout, "MACHINE %s READ COMPLETELY FRAME %d\n",
00679             getenv("HOST"), frameNumber);
00680     fflush(stdout);
00681   }
00682 }
00683 
00684 
00685 /*===========================================================================*
00686  *
00687  * StartCombineServer
00688  *
00689  *      start-up the CombineServer with this process
00690  *      handles combination of frames, and tells the
00691  *      master when it's done
00692  *
00693  * RETURNS:     nothing
00694  *
00695  * SIDE EFFECTS:    none
00696  *
00697  *===========================================================================*/
00698 void
00699   StartCombineServer(numInputFiles, outputFileName, parallelHostName, portNum)
00700 int numInputFiles;
00701 char *outputFileName;
00702 char *parallelHostName;
00703 int portNum;
00704 {
00705   int       combinePortNum;
00706   FILE    *ofp;
00707   
00708   /* once we get Combine port num, should transmit it to parallel server */
00709   
00710   outputServerSocket = CreateListeningSocket(&combinePortNum);
00711   
00712   if ( debugSockets ) {
00713     fprintf(stdout, "====OUTPUT USING PORT %d\n", combinePortNum);
00714   }
00715   
00716   TransmitPortNum(parallelHostName, portNum, combinePortNum);
00717   
00718   frameDone = (boolean *) malloc(numInputFiles*sizeof(boolean));
00719   memset((char *)frameDone, 0, numInputFiles*sizeof(boolean));
00720   
00721   if ( (ofp = fopen(outputFileName, "wb")) == NULL ) {
00722     fprintf(stderr, "ERROR:  Could not open output file!!\n");
00723     fflush(stderr);
00724     exit(1);
00725   }
00726   FramesToMPEG(numInputFiles, outputFileName, ofp, TRUE);
00727   
00728   if ( debugSockets ) {
00729     fprintf(stdout, "====COMBINE SERVER:  Shutting Down\n");
00730     fflush(stdout);
00731   }
00732   
00733   /* tell Master server we are done */
00734   TransmitPortNum(parallelHostName, portNum, combinePortNum);
00735   
00736   close(outputServerSocket);
00737 }
00738 
00739 
00740 /*===========================================================================*
00741  *
00742  * WaitForOutputFile
00743  *
00744  *      keep handling output events until we get the specified frame
00745  *      number
00746  *
00747  * RETURNS:     nothing
00748  *
00749  * SIDE EFFECTS:    none
00750  *
00751  *===========================================================================*/
00752 void
00753   WaitForOutputFile(number)
00754 int number;
00755 {
00756   int       otherSock;
00757   static int otherSize = sizeof(struct sockaddr);
00758   struct sockaddr otherSocket;
00759   int       frameNumber;
00760   int32   buffer[8];
00761   int frameStart, frameEnd;
00762 
00763   while ( ! frameDone[number] ) {
00764     otherSock = accept(outputServerSocket, &otherSocket, &otherSize);
00765     if ( otherSock == -1 ) {
00766       fprintf(stderr, "ERROR:  Combine SERVER accept returned error %d\n", errno);
00767       exit(1);
00768     }
00769 
00770     SafeRead(otherSock, (char *)buffer, 4);
00771     frameNumber = ntohl(buffer[0]);
00772 
00773     if ( frameNumber == -2 ) {
00774       /* this is notification from non-remote process that a frame is done */
00775 
00776       SafeRead(otherSock, (char *)buffer, 8);
00777       frameStart = buffer[0];
00778       frameStart = ntohl(frameStart);
00779       frameEnd = buffer[1];
00780       frameEnd = ntohl(frameEnd);
00781 
00782       for ( frameNumber = frameStart; frameNumber <= frameEnd;
00783            frameNumber++ ) {
00784         frameDone[frameNumber] = TRUE;
00785       }
00786     }
00787 
00788     close(otherSock);
00789   }
00790 
00791   if ( debugSockets ) {
00792     fprintf(stdout, "WAIT FOR FRAME %d over\n", number);
00793     fflush(stdout);
00794   }
00795 }
00796 
00797 
00798 /*=====================*
00799  * MASTER SERVER STUFF *
00800  *=====================*/
00801 
00802 
00803 /*===========================================================================*
00804  *
00805  * StartMasterServer
00806  *
00807  *      start the master server with this process
00808  *
00809  * RETURNS:     nothing
00810  *
00811  * SIDE EFFECTS:    none
00812  *
00813  *===========================================================================*/
00814 void
00815   StartMasterServer(numInputFiles, paramFile, outputFileName)
00816 int numInputFiles;
00817 char *paramFile;
00818 char *outputFileName;
00819 {
00820   FILE    *filePtr;
00821   register int ind, ind2;
00822   int       framesPerMachine;
00823   char    command[1024];
00824   char    *hostName;
00825   int       portNum;
00826   int       serverSocket;
00827   boolean finished[MAX_MACHINES];
00828   int       numFinished;
00829   int       otherSock, otherSize;
00830   struct sockaddr otherSocket;
00831   int       seconds;
00832   int32   buffer[8];
00833   int ioPortNum[MAX_IO_SERVERS];
00834   int       combinePortNum, decodePortNum;
00835   int       nextFrame;
00836   int       startFrames[MAX_MACHINES];
00837   int       numFrames[MAX_MACHINES];
00838   int       lastNumFrames[MAX_MACHINES];
00839   int       numSeconds[MAX_MACHINES];
00840   float   fps[MAX_MACHINES];
00841   int       numMachinesToEstimate;
00842   float   framesPerSecond;
00843   float   totalFPS, localFPS;
00844   int       framesDone;
00845   float   avgFPS;
00846   char    niceNess[256];
00847   int32   startFrame, endFrame;
00848   int numInputPorts = 0;
00849   int   numRemote = SOMAXCONN;
00850   int totalRemote = 0;
00851   time_t  startUpBegin, startUpEnd;
00852   time_t  shutDownBegin, shutDownEnd;
00853   float   timeChunk;
00854 
00855   time(&startUpBegin);
00856 
00857   if ( niceProcesses ) {
00858     sprintf(niceNess, "nice");
00859   } else {
00860     niceNess[0] = '\0';
00861   }
00862 
00863   time(&timeStart);
00864 
00865   PrintStartStats(-1, 0);
00866 
00867   /* create a server socket */
00868   hostName = getenv("HOST");
00869 
00870   if ( hostName == NULL ) {
00871     fprintf(stderr, "ERROR:  Set HOST environment variable\n");
00872     exit(1);
00873   }
00874 
00875   hostEntry = gethostbyname(hostName);
00876   if ( hostEntry == NULL ) {
00877     fprintf(stderr, "ERROR:  Could not find host %s in database\n",
00878             hostName);
00879     exit(1);
00880   }
00881 
00882   hostName = hostEntry->h_name;
00883 
00884   serverSocket = CreateListeningSocket(&portNum);
00885   if ( debugSockets ) {
00886     fprintf(stdout, "---USING PORT %d\n", portNum);
00887   }
00888 
00889   /* START COMBINE SERVER */
00890   sprintf(command, "%s -max_machines %d -output_server %s %d %d %s",
00891           encoder_name, numMachines, hostName, portNum, numInputFiles, paramFile);
00892   safe_fork(command);
00893 
00894   /* should now listen for connection from Combine server */
00895   otherSize = sizeof(otherSocket);
00896   otherSock = accept(serverSocket, &otherSocket, &otherSize);
00897   if ( otherSock == -1 ) {
00898     fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
00899     exit(1);
00900   }
00901 
00902   SafeRead(otherSock, (char *)(&combinePortNum), 4);
00903   combinePortNum = ntohl(combinePortNum);
00904   combinePortNumber = combinePortNum;
00905   close(otherSock);
00906 
00907   if ( debugSockets ) {
00908     fprintf(stdout, "---MASTER SERVER:  Combine port number = %d\n",
00909             combinePortNum);
00910   }
00911 
00912   /* START DECODE SERVER if necessary */
00913   if ( referenceFrame == DECODED_FRAME ) {
00914     sprintf(command, "%s -max_machines %d -decode_server %s %d %d %s",
00915             encoder_name, numMachines, hostName, portNum, numInputFiles, paramFile);
00916     safe_fork(command);
00917 
00918     /* should now listen for connection from Decode server */
00919     otherSize = sizeof(otherSocket);
00920     otherSock = accept(serverSocket, &otherSocket, &otherSize);
00921     if ( otherSock == -1 ) {
00922       fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
00923       exit(1);
00924     }
00925 
00926     SafeRead(otherSock, (char *)(&decodePortNum), 4);
00927     decodePortNum = ntohl(decodePortNum);
00928     close(otherSock);
00929 
00930     if ( debugSockets ) {
00931       fprintf(stdout, "---MASTER SERVER:  Decode port number = %d\n",
00932               decodePortNum);
00933     }
00934   }
00935 
00936   /* we are doing whole thing (if not, see above) */
00937 
00938   framesPerMachine = numInputFiles/numMachines;
00939 
00940   numFinished = 0;
00941 
00942   /* count number of remote machines */
00943   for ( ind = 0; ind < numMachines; ind++ ) {
00944     fps[ind] = -1.0;            /* illegal value as flag */
00945     if ( remote[ind] ) {
00946       totalRemote++;
00947     }
00948   }
00949 
00950   /* DO INITIAL TIME TESTS */
00951   nextFrame = 0;
00952   for ( ind = 0; ind < numMachines; ind++ ) {
00953     if ( (totalRemote != 0) && (numRemote == SOMAXCONN) ) {
00954       /* Create an I/O server */
00955       sprintf(command, "%s -max_machines %d -io_server %s %d %s",
00956               encoder_name, numMachines, hostName, portNum, paramFile);
00957       safe_fork(command);
00958 
00959       /* should now listen for connection from I/O server */
00960       otherSize = sizeof(otherSocket);
00961       otherSock = accept(serverSocket, &otherSocket, &otherSize);
00962       if ( otherSock == -1 ) {
00963         fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
00964         exit(1);
00965       }
00966 
00967       SafeRead(otherSock, (char *)(&ioPortNum[numInputPorts]), 4);
00968       ioPortNum[numInputPorts] = ntohl(ioPortNum[numInputPorts]);
00969       close(otherSock);
00970 
00971       if ( debugSockets ) {
00972         fprintf(stdout, "---MASTER SERVER:  I/O port number = %d\n",
00973                 ioPortNum[numInputPorts]);
00974       }
00975 
00976       numInputPorts++;
00977       numRemote = 0;
00978     }
00979 
00980     finished[ind] = FALSE;
00981     numSeconds[ind] = 0;
00982 
00983     startFrame = nextFrame;
00984     if ( parallelPerfect ) {
00985       endFrame = startFrame+((numInputFiles-startFrame)/
00986                              (numMachines-ind))  -1;
00987 
00988       if ( forceIalign ) {
00989         while (FType_Type(endFrame) != 'i') {endFrame++;}
00990       }
00991 
00992       /* always give at least 1 frame */
00993       if ( endFrame < startFrame ) {
00994         endFrame = startFrame;
00995       }
00996 
00997       /* make sure not out of bounds */
00998       if ( endFrame >= numInputFiles ) {
00999         endFrame = numInputFiles-1;
01000       }
01001     } else if ( forceIalign ) {
01002       endFrame = startFrame+framePatternLen-1;
01003       while (FType_Type(endFrame) != 'i') {endFrame++;}
01004     } else {
01005       endFrame = startFrame+parallelTestFrames-1;
01006     }
01007             
01008     if ( remote[ind] ) {
01009       sprintf(command, "%s %s -l %s %s %s -child %s %d %d %d %d %d %d -frames %d %d %s",
01010               rsh,
01011               machineName[ind], userName[ind], niceNess,
01012               executable[ind],
01013               hostName, portNum, ioPortNum[numInputPorts-1],
01014               combinePortNum, decodePortNum, ind,
01015               remote[ind],
01016               startFrame, endFrame,
01017               remoteParamFile[ind]);
01018       numRemote++;
01019       totalRemote--;
01020     } else {
01021       sprintf(command, "%s %s -l %s %s %s -child %s %d %d %d %d %d %d -frames %d %d %s",
01022               rsh,
01023               machineName[ind], userName[ind], niceNess,
01024               executable[ind],
01025               hostName, portNum, ioPortNum[numInputPorts-1],
01026               combinePortNum, decodePortNum, ind,
01027               remote[ind],
01028               startFrame, endFrame,
01029               paramFile);
01030     }
01031 
01032     if ( debugMachines ) {
01033       fprintf(stdout, "---%s:  frames %d to %d\n",
01034               machineName[ind],
01035               startFrame, endFrame);
01036     }
01037         
01038 
01039     safe_fork(command);
01040 
01041     nextFrame = endFrame+1;
01042     startFrames[ind] = startFrame;
01043     numFrames[ind] = endFrame-startFrame+1;
01044     lastNumFrames[ind] = endFrame-startFrame+1;
01045   }
01046 
01047   framesDone = 0;
01048 
01049   time(&startUpEnd);
01050 
01051   /* now, wait for other processes to finish and boss them around */
01052   while ( numFinished != numMachines ) {
01053     otherSize = sizeof(otherSocket);
01054     otherSock = accept(serverSocket, &otherSocket, &otherSize);
01055     if ( otherSock == -1 ) {
01056       fprintf(stderr, "ERROR:  MASTER SERVER 2 accept returned error %d\n", errno);
01057       exit(1);
01058     }
01059 
01060     SafeRead(otherSock, (char *)buffer, 8);
01061 
01062     ind = ntohl(buffer[0]);
01063     seconds = ntohl(buffer[1]);
01064 
01065     NoteFrameDone(startFrames[ind],
01066                   startFrames[ind]+lastNumFrames[ind]-1);
01067 
01068     numSeconds[ind] += seconds;
01069     fps[ind] = (float)numFrames[ind]/(float)numSeconds[ind];
01070 
01071     if ( seconds != 0 )
01072       framesPerSecond = (float)lastNumFrames[ind]/(float)seconds;
01073     else
01074       framesPerSecond = (float)lastNumFrames[ind]*2.0;
01075 
01076     framesDone += lastNumFrames[ind];
01077 
01078     if ( nextFrame >= numInputFiles ) {
01079       buffer[0] = htonl(-1);
01080       buffer[1] = htonl(0);
01081       SafeWrite(otherSock, (char *)buffer, 8);
01082       numFinished++;
01083 
01084       if ( debugMachines ) {
01085         fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done):  DONE\n",
01086                 machineName[ind], framesPerSecond, numFinished,
01087                 numMachines);
01088       }
01089     } else {
01090       if (numSeconds[ind] != 0) {
01091         avgFPS = (float)numFrames[ind]/(float)numSeconds[ind];
01092       } else {
01093         avgFPS = 0.1;           /* arbitrary small value */
01094       }
01095 
01096       startFrame = nextFrame;
01097 
01098       if ( parallelTimeChunks == -1 ) { /* TAPER STUFF */
01099         /* estimate time left */
01100         /* frames left = numInputFiles-nextFrame */
01101         totalFPS = 0.0;
01102         numMachinesToEstimate = 0;
01103         for ( ind2 = 0; ind2 < numMachines; ind2++ ) {
01104           if ( fps[ind2] < 0.0 ) {
01105             numMachinesToEstimate++;
01106           } else {
01107             totalFPS += fps[ind2];
01108           }
01109         }
01110 
01111         totalFPS = (float)numMachines*
01112           (totalFPS/(float)(numMachines-numMachinesToEstimate));
01113 
01114         timeChunk = (float)(numInputFiles-nextFrame)/totalFPS;
01115 
01116         fprintf(stdout, "ASSIGNING %s %.2f seconds of work\n",
01117                 machineName[ind], timeChunk);
01118         fflush(stdout);
01119         endFrame = nextFrame +
01120           (int)((float)timeChunk*avgFPS) - 1;
01121       } else {
01122         endFrame = nextFrame +
01123           (int)((float)parallelTimeChunks*avgFPS) - 1;
01124       }
01125 
01126       if ( forceIalign ) {
01127         while (FType_Type(endFrame) != 'i') {endFrame++;}
01128       }
01129 
01130       if ( endFrame < startFrame ) { /* always give at least 1 frame */
01131         endFrame = startFrame;
01132       }
01133       if ( endFrame >= numInputFiles ) {
01134         endFrame = numInputFiles-1;
01135       }
01136 
01137       nextFrame = endFrame+1;
01138 
01139       startFrames[ind] = startFrame;
01140       numFrames[ind] += (endFrame-startFrame+1);
01141       lastNumFrames[ind] = (endFrame-startFrame+1);
01142 
01143       if ( debugMachines ) {
01144         fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done):  next:  %d to %d\n",
01145                 machineName[ind], framesPerSecond, numFinished,
01146                 numMachines, startFrame, endFrame);
01147       }
01148 
01149       buffer[0] = htonl(startFrame);
01150       buffer[1] = htonl(endFrame);
01151 
01152       SafeWrite(otherSock, (char *)buffer, 8);
01153     }
01154 
01155     close(otherSock);
01156 
01157     if ( debugMachines ) {
01158       fprintf(stdout, "---FRAMES DONE:  %d\tFARMED OUT:  %d\tLEFT:  %d\n",
01159               framesDone, nextFrame-framesDone, numInputFiles-nextFrame);
01160     }
01161   }
01162 
01163   time(&shutDownBegin);
01164 
01165   /* end all input servers */
01166   IOhostName = hostName;
01167   for ( ind = 0; ind < numInputPorts; ind++ ) {
01168     ioPortNumber = ioPortNum[ind];
01169     EndIOServer();
01170   }
01171 
01172   /* now wait for CombineServer to tell us they're done */
01173   otherSize = sizeof(otherSocket);
01174   otherSock = accept(serverSocket, &otherSocket, &otherSize);
01175   if ( otherSock == -1 ) {
01176     fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
01177     exit(1);
01178   }
01179 
01180   SafeRead(otherSock, (char *)buffer, 4);
01181   close(otherSock);
01182     
01183   close(serverSocket);
01184 
01185   time(&timeEnd);
01186   diffTime = (int32)(timeEnd-timeStart);
01187 
01188   time(&shutDownEnd);
01189 
01190   for ( ind2 = 0; ind2 < 2; ind2++ ) {
01191     if ( ind2 == 0 ) {
01192       filePtr = stdout;
01193     } else if ( statFile != NULL ) {
01194       filePtr = statFile;
01195     } else {
01196       continue;
01197     }
01198 
01199     fprintf(filePtr, "\n\n");
01200     fprintf(filePtr, "PARALLEL SUMMARY\n");
01201     fprintf(filePtr, "----------------\n");
01202     fprintf(filePtr, "\n");
01203     fprintf(filePtr, "START UP TIME:  %d seconds\n",
01204             (int)startUpEnd-(int)startUpBegin);
01205     fprintf(filePtr, "SHUT DOWN TIME:  %d seconds\n",
01206             (int)shutDownEnd-(int)shutDownBegin);
01207 
01208     fprintf(filePtr, "%14s\tFrames\tSeconds\tFrames Per Second\tSelf Time\n",
01209             "MACHINE");
01210     fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");
01211     totalFPS = 0.0;
01212     for ( ind = 0; ind < numMachines; ind++ ) {
01213       localFPS = (float)numFrames[ind]/(float)numSeconds[ind];
01214       fprintf(filePtr, "%14s\t%d\t%d\t%f\t\t%d\n",
01215               machineName[ind], numFrames[ind], numSeconds[ind],
01216               localFPS, (int)((float)numInputFiles/localFPS));
01217       totalFPS += localFPS;
01218     }
01219 
01220     fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");
01221 
01222     fprintf(filePtr, "%14s\t\t%d\t%f\n", "OPTIMAL", 
01223             (int)((float)numInputFiles/totalFPS),
01224             totalFPS);
01225     fprintf(filePtr, "%14s\t\t%d\t%f\n", "ACTUAL", diffTime, 
01226             (float)numInputFiles/(float)diffTime);
01227 
01228     fprintf(filePtr, "\n\n");
01229   }
01230 
01231   if ( statFile != NULL ) {
01232     fclose(statFile);
01233   }
01234 }
01235 
01236 
01237 /*===========================================================================*
01238  *
01239  * NotifyMasterDone
01240  *
01241  *      called by a slave process; tells the master process it is done
01242  *
01243  * RETURNS:     nothing
01244  *
01245  * SIDE EFFECTS:    none
01246  *
01247  *===========================================================================*/
01248 boolean
01249   NotifyMasterDone(hostName, portNum, machineNumber, seconds, frameStart,
01250                    frameEnd)
01251 char *hostName;
01252 int portNum;
01253 int machineNumber;
01254 int seconds;
01255 int *frameStart;
01256 int *frameEnd;
01257 {
01258   int   clientSocket;
01259   int32   buffer[8];
01260   time_t  tempTimeStart, tempTimeEnd;
01261 
01262   time(&tempTimeStart);
01263 
01264   clientSocket = ConnectToSocket(hostName, portNum, &hostEntry);
01265 
01266   buffer[0] = htonl(machineNumber);
01267   buffer[1] = htonl(seconds);
01268 
01269   SafeWrite(clientSocket, (char *)buffer, 8);
01270 
01271   SafeRead(clientSocket, (char *)buffer, 8);
01272   *frameStart = ntohl(buffer[0]);
01273   *frameEnd = ntohl(buffer[1]);
01274 
01275   close(clientSocket);
01276 
01277   time(&tempTimeEnd);
01278   IOtime += (tempTimeEnd-tempTimeStart);
01279 
01280   return ((*frameStart) >= 0);
01281 }
01282 
01283 
01284 /*===========================================================================*
01285  *
01286  * StartDecodeServer
01287  *
01288  *      start-up the DecodeServer with this process
01289  *      handles transfer of decoded frames to/from processes, and exits
01290  *      when master tells it to
01291  *      this is necessary only if referenceFrame == DECODED_FRAME
01292  *
01293  * RETURNS:     nothing
01294  *
01295  * SIDE EFFECTS:    none
01296  *
01297  *===========================================================================*/
01298 void
01299   StartDecodeServer(numInputFiles, decodeFileName, parallelHostName, portNum)
01300 int numInputFiles;
01301 char *decodeFileName;
01302 char *parallelHostName;
01303 int portNum;
01304 {
01305   int       otherSock, otherSize;
01306   struct sockaddr otherSocket;
01307   int       decodePortNum;
01308   int32   buffer[8];
01309   int       frameReady;
01310   boolean *ready;
01311   int       *waitMachine;
01312   int       *waitPort;
01313   int       *waitList;
01314   int       slaveNumber;
01315   int       slavePort;
01316   int       waitPtr;
01317   struct hostent *nullHost = NULL;
01318   int       clientSocket;
01319 
01320   /* should keep list of port numbers to notify when frames become ready */
01321 
01322   ready = (boolean *) calloc(numInputFiles, sizeof(boolean));
01323   waitMachine = (int *) calloc(numInputFiles, sizeof(int));
01324   waitPort = (int *) malloc(numMachines*sizeof(int));
01325   waitList = (int *) calloc(numMachines, sizeof(int));
01326 
01327   /* once we get Decode port num, should transmit it to parallel server */
01328 
01329   decodeServerSocket = CreateListeningSocket(&decodePortNum);
01330 
01331   if ( debugSockets ) {
01332     fprintf(stdout, "====DECODE USING PORT %d\n", decodePortNum);
01333   }
01334 
01335   TransmitPortNum(parallelHostName, portNum, decodePortNum);
01336 
01337   frameDone = (boolean *) malloc(numInputFiles*sizeof(boolean));
01338   memset((char *)frameDone, 0, numInputFiles*sizeof(boolean));
01339 
01340   /* wait for ready signals and requests */
01341   while ( TRUE ) {
01342     otherSize = sizeof(otherSocket);
01343     otherSock = accept(decodeServerSocket, &otherSocket, &otherSize);
01344     if ( otherSock == -1 ) {
01345       fprintf(stderr, "ERROR:  DECODE SERVER accept returned error %d\n", errno);
01346       exit(1);
01347     }
01348 
01349     SafeRead(otherSock, (char *)buffer, 4);
01350     frameReady = buffer[0];
01351     frameReady = ntohl(frameReady);
01352 
01353     if ( frameReady == -2 ) {
01354       SafeRead(otherSock, (char *)buffer, 4);
01355       frameReady = buffer[0];
01356       frameReady = ntohl(frameReady);
01357 
01358       if ( debugSockets ) {
01359         fprintf(stdout, "====DECODE SERVER:  REQUEST FOR %d\n", frameReady);
01360         fflush(stdout);     
01361       }
01362 
01363       /* now respond if it's ready yet */
01364       buffer[0] = frameDone[frameReady];
01365       buffer[0] = htonl(buffer[0]);
01366       SafeWrite(otherSock, (char *)buffer, 4);
01367 
01368       if ( ! frameDone[frameReady] ) {
01369         /* read machine number, port number */
01370         SafeRead(otherSock, (char *)buffer, 8);
01371         slaveNumber = buffer[0];
01372         slaveNumber = ntohl(slaveNumber);
01373         slavePort = buffer[1];
01374         slavePort = ntohl(slavePort);
01375 
01376         if ( debugSockets ) {
01377           fprintf(stdout, "WAITING:  SLAVE %d, PORT %d\n",
01378                   slaveNumber, slavePort);
01379         }
01380 
01381         waitPort[slaveNumber] = slavePort;
01382         if ( waitMachine[frameReady] == 0 ) {
01383           waitMachine[frameReady] = slaveNumber+1;
01384         } else {
01385           /* someone already waiting for this frame */
01386           /* follow list of waiters to the end */
01387           waitPtr = waitMachine[frameReady]-1;
01388           while ( waitList[waitPtr] != 0 ) {
01389             waitPtr = waitList[waitPtr]-1;
01390           }
01391 
01392           waitList[waitPtr] = slaveNumber+1;
01393           waitList[slaveNumber] = 0;
01394         }
01395       }
01396     } else {
01397       frameDone[frameReady] = TRUE;
01398 
01399       if ( debugSockets ) {
01400         fprintf(stdout, "====DECODE SERVER:  FRAME %d READY\n", frameReady);
01401         fflush(stdout);
01402       }
01403 
01404       if ( waitMachine[frameReady] ) {
01405         /* need to notify one or more machines it's ready */
01406         waitPtr = waitMachine[frameReady]-1;
01407         while ( waitPtr >= 0 ) {
01408           clientSocket = ConnectToSocket(machineName[waitPtr],
01409                                          waitPort[waitPtr],
01410                                          &nullHost);
01411           close(clientSocket);
01412           waitPtr = waitList[waitPtr]-1;
01413         }
01414       }
01415     }
01416 
01417     close(otherSock);
01418   }
01419 
01420   if ( debugSockets ) {
01421     fprintf(stdout, "====DECODE SERVER:  Shutting Down\n");
01422     fflush(stdout);
01423   }
01424 
01425   /* tell Master server we are done */
01426   TransmitPortNum(parallelHostName, portNum, decodePortNum);
01427 
01428   close(decodeServerSocket);
01429 }
01430 
01431 
01432 /*=====================*
01433  * INTERNAL PROCEDURES *
01434  *=====================*/
01435 
01436 
01437 /*===========================================================================*
01438  *
01439  * TransmitPortNum
01440  *
01441  *      called by the I/O or Combine server; transmits the appropriate
01442  *      port number to the master
01443  *
01444  * RETURNS:     nothing
01445  *
01446  * SIDE EFFECTS:    none
01447  *
01448  *===========================================================================*/
01449 static void
01450   TransmitPortNum(hostName, portNum, newPortNum)
01451 char *hostName;
01452 int portNum;
01453 int newPortNum;
01454 {
01455   int   clientSocket;
01456   u_long  data;
01457 
01458   clientSocket = ConnectToSocket(hostName, portNum, &hostEntry);
01459 
01460   data = htonl(newPortNum);
01461   SafeWrite(clientSocket, (char *) &data, 4);
01462 
01463   close(clientSocket);
01464 }
01465 
01466 
/*===========================================================================*
 *
 * SafeRead
 *
 *      safely read from the given socket; the procedure keeps reading until
 *      it gets the number of bytes specified
 *
 *      fd is the descriptor to read, buf receives the data and must hold
 *      at least nbyte bytes
 *
 * RETURNS:     nothing; calls exit(1) if read() fails or if the other end
 *              closes the connection before nbyte bytes have arrived
 *
 * SIDE EFFECTS:    none
 *
 *===========================================================================*/
static void
  SafeRead(fd, buf, nbyte)
int fd;
char *buf;
int nbyte;
{
  int numRead;
  int result;

  numRead = 0;

  while ( numRead < nbyte ) {
    result = read(fd, &buf[numRead], nbyte-numRead);

    if ( result == -1 ) {
      fprintf(stderr, "ERROR:  read (of %d bytes (total %d) ) returned error %d\n",
              nbyte-numRead, nbyte, errno);
      exit(1);
    }

    /* read() returns 0 at end of file; without this check a closed
     * connection would make the loop spin forever
     */
    if ( result == 0 ) {
      fprintf(stderr, "ERROR:  read (of %d bytes (total %d) ) hit end of file\n",
              nbyte-numRead, nbyte);
      exit(1);
    }

    numRead += result;
  }
}
01501 
01502 
/*===========================================================================*
 *
 * SafeWrite
 *
 *      safely write to the given socket; the procedure keeps writing until
 *      it sends the number of bytes specified
 *
 *      fd is the descriptor to write, buf holds the nbyte bytes to send
 *
 * RETURNS:     nothing; calls exit(1) if write() fails
 *
 * SIDE EFFECTS:    none
 *
 *===========================================================================*/
static void
  SafeWrite(fd, buf, nbyte)
int fd;
char *buf;
int nbyte;
{
  int numWritten;
  int result;

  numWritten = 0;

  while ( numWritten < nbyte ) {
    result = write(fd, &buf[numWritten], nbyte-numWritten);

    if ( result == -1 ) {
      /* this message used to say "read", which pointed at the wrong call */
      fprintf(stderr, "ERROR:  write (of %d bytes (total %d) ) returned error %d\n",
              nbyte-numWritten, nbyte, errno);
      exit(1);
    }
    numWritten += result;
  }
}
01537 
01538 
/*===========================================================================*
 *
 * EndIOServer
 *
 *      called by the master process to shut an I/O server down; the
 *      request is a GetRemoteFrame call with no frame buffer and a frame
 *      number of -1
 *
 *      the master sets IOhostName and ioPortNumber before each call to
 *      pick the server to stop
 *
 * RETURNS:     nothing
 *
 * SIDE EFFECTS:    none
 *
 *===========================================================================*/
static void
  EndIOServer()
{
  int quitFrame = -1;       /* frame number used as the shutdown signal */

  GetRemoteFrame(NULL, quitFrame);
}
01557 
01558 
01559 /*===========================================================================*
01560  *
01561  * NotifyDecodeServerReady
01562  *
01563  *      called by a slave to the Decode Server to tell it a decoded frame
01564  *      is ready and waiting
01565  *
01566  * RETURNS:     nothing
01567  *
01568  * SIDE EFFECTS:    none
01569  *
01570  *===========================================================================*/
01571 void
01572   NotifyDecodeServerReady(id)
01573 int id;
01574 {
01575   int   clientSocket;
01576   u_long  data;
01577   time_t  tempTimeStart, tempTimeEnd;
01578 
01579   time(&tempTimeStart);
01580 
01581   clientSocket = ConnectToSocket(IOhostName, decodePortNumber, &hostEntry);
01582 
01583   data = htonl(id);
01584   SafeWrite(clientSocket, (char *)&data, 4);
01585 
01586   close(clientSocket);
01587 
01588   time(&tempTimeEnd);
01589   IOtime += (tempTimeEnd-tempTimeStart);
01590 }
01591 
01592 
01593 /*===========================================================================*
01594  *
01595  * WaitForDecodedFrame
01596  *
01597  *      blah blah blah
01598  *
01599  * RETURNS:     nothing
01600  *
01601  * SIDE EFFECTS:    none
01602  *
01603  *===========================================================================*/
01604 void
01605   WaitForDecodedFrame(id)
01606 int id;
01607 {
01608   int   clientSocket;
01609   u_long  data;
01610   int       negativeTwo = -2;
01611   int     ready;
01612 
01613   /* wait for a decoded frame */
01614   if ( debugSockets ) {
01615     fprintf(stdout, "WAITING FOR DECODED FRAME %d\n", id);
01616   }
01617 
01618   clientSocket = ConnectToSocket(IOhostName, decodePortNumber, &hostEntry);
01619 
01620   /* first, tell DecodeServer we're waiting for this frame */
01621   data = negativeTwo;
01622   data = htonl(negativeTwo);
01623   SafeWrite(clientSocket, (char *)&data, 4);
01624 
01625   data = htonl(id);
01626   SafeWrite(clientSocket, (char *)&data, 4);
01627 
01628   SafeRead(clientSocket, (char *)&data, 4);
01629   ready = data;
01630   ready = ntohl(ready);
01631 
01632   if ( ! ready ) {
01633     int     waitSocket;
01634     int     waitPort;
01635     int     otherSock, otherSize;
01636     struct sockaddr otherSocket;
01637 
01638     /* it's not ready; set up a connection and wait for decode server */
01639     waitSocket = CreateListeningSocket(&waitPort);
01640 
01641     /* tell decode server where we are */
01642     data = machineNumber;
01643     data = ntohl(data);
01644     SafeWrite(clientSocket, (char *)&data, 4);
01645 
01646     data = waitPort;
01647     data = ntohl(data);
01648     SafeWrite(clientSocket, (char *)&data, 4);
01649 
01650     close(clientSocket);
01651 
01652     if ( debugSockets ) {
01653       fprintf(stdout, "SLAVE:  WAITING ON SOCKET %d\n", waitPort);
01654       fflush(stdout);
01655     }
01656 
01657     otherSize = sizeof(otherSocket);
01658     otherSock = accept(waitSocket, &otherSocket, &otherSize);
01659     if ( otherSock == -1 ) {
01660       fprintf(stderr, "ERROR:  I/O SERVER accept returned error %d\n", errno);
01661       exit(1);
01662     }
01663 
01664     /* should we verify this is decode server? */
01665     /* for now, we won't */
01666 
01667     close(otherSock);
01668 
01669     close(waitSocket);
01670   } else {
01671     close(clientSocket);
01672   }
01673 
01674   if ( debugSockets ) {
01675     fprintf(stdout, "YE-HA FRAME %d IS NOW READY\n", id);
01676   }
01677 }
01678 
01679 
01680 /*===========================================================================*
01681  *
01682  * CreateListeningSocket
01683  *
01684  *      create a socket, using the first unused port number we can find
01685  *
01686  * RETURNS:     the socket; portNumber is modified appropriately
01687  *
01688  * SIDE EFFECTS:    none
01689  *
01690  *===========================================================================*/
01691 static int
01692   CreateListeningSocket(portNumber)
01693 int *portNumber;
01694 {
01695   int       resultSocket;
01696   u_short tempShort;
01697   int       result;
01698   struct sockaddr_in    nameEntry;
01699 
01700   resultSocket = socket(AF_INET, SOCK_STREAM, 0);
01701   if ( resultSocket == -1 ) {
01702     fprintf(stderr, "ERROR:  Call to socket() gave error %d\n", errno);
01703     exit(1);
01704   }
01705 
01706   memset((char *) &nameEntry, 0, sizeof(nameEntry));
01707   nameEntry.sin_family = AF_INET;
01708 
01709   /* find a port number that isn't used */
01710   (*portNumber) = 2048;
01711   do {
01712     (*portNumber)++;
01713     tempShort = (*portNumber);
01714     nameEntry.sin_port = htons(tempShort);
01715     result = bind(resultSocket, (struct sockaddr *) &nameEntry,
01716                   sizeof(struct sockaddr));
01717   }
01718   while ( result == -1 );
01719 
01720   /* would really like to wait for 1+numMachines machines, but this is max
01721    * allowable, unfortunately
01722    */
01723   result = listen(resultSocket, SOMAXCONN);
01724   if ( result == -1 ) {
01725     fprintf(stderr, "ERROR:  call to listen() gave error %d\n", errno);
01726     exit(1);
01727   }
01728 
01729   return resultSocket;
01730 }
01731 
01732 
01733 /*===========================================================================*
01734  *
01735  * ConnectToSocket
01736  *
01737  *      creates a socket and connects it to the specified socket
01738  *      hostEnt either is the host entry, or is NULL and needs to be
01739  *      found by using machineName
01740  *
01741  * RETURNS:     the socket
01742  *
01743  * SIDE EFFECTS:    none
01744  *
01745  *===========================================================================*/
01746 static int
01747   ConnectToSocket(machineName, portNum, hostEnt)
01748 char *machineName;
01749 int     portNum;
01750 struct hostent **hostEnt;
01751 {
01752   int   resultSocket;
01753   int       result;
01754   u_short           tempShort;
01755   struct sockaddr_in  nameEntry;
01756 
01757   if ( (*hostEnt) == NULL ) {
01758     (*hostEnt) = gethostbyname(machineName);
01759     if ( (*hostEnt) == NULL ) {
01760       fprintf(stderr, "ERROR:  Couldn't get host by name (%s)\n",
01761               machineName);
01762       exit(1);
01763     }
01764   }
01765 
01766   resultSocket = socket(AF_INET, SOCK_STREAM, 0);
01767   if ( resultSocket == -1 ) {
01768     fprintf(stderr, "ERROR:  socket returned error %d\n", errno);
01769     exit(1);
01770   }
01771 
01772   nameEntry.sin_family = AF_INET;
01773   memset((void *) nameEntry.sin_zero, 0, 8);
01774   memcpy((void *) &(nameEntry.sin_addr.s_addr),
01775          (void *) (*hostEnt)->h_addr_list[0],
01776          (size_t) (*hostEnt)->h_length);
01777   tempShort = portNum;
01778   nameEntry.sin_port = htons(tempShort);
01779 
01780   result = connect(resultSocket, (struct sockaddr *) &nameEntry,
01781                    sizeof(struct sockaddr));
01782   if ( result == -1 ) {
01783     fprintf(stderr, "ERROR:  connect (ConnectToSocket, port %d) from machine %s returned error %d\n",
01784             portNum, getenv("HOST"), errno);
01785     exit(1);
01786   }
01787 
01788   return resultSocket;
01789 }
01790 
01791 
01792 /*===========================================================================*
01793  *
01794  * SendDecodedFrame
01795  *
01796  *  Send the frame to the decode server.
01797  *
01798  * RETURNS:     nothing
01799  *
01800  * SIDE EFFECTS:    none
01801  *
01802  *===========================================================================*/
01803 void
01804   SendDecodedFrame(frame)
01805 MpegFrame *frame;
01806 {
01807   int   clientSocket;
01808   register int y;
01809   int       negativeTwo = -2;
01810   uint32  data;
01811 
01812   /* send to IOServer */
01813   clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
01814 
01815   data = negativeTwo;
01816   data = htonl(data);
01817   SafeWrite(clientSocket, (char *)&data, 4);
01818 
01819   data = frame->id;
01820   data = htonl(data);
01821   SafeWrite(clientSocket, (char *)&data, 4);
01822 
01823   for ( y = 0; y < Fsize_y; y++ ) {
01824     SafeWrite(clientSocket, (char *)frame->decoded_y[y], Fsize_x);
01825   }
01826 
01827   for (y = 0; y < (Fsize_y >> 1); y++) { /* U */
01828     SafeWrite(clientSocket, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
01829   }
01830 
01831   for (y = 0; y < (Fsize_y >> 1); y++) { /* V */
01832     SafeWrite(clientSocket, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
01833   }
01834 
01835   close(clientSocket);
01836 }
01837 
01838 
01839 /*===========================================================================*
01840  *
01841  * GetRemoteDecodedFrame
01842  *
01843  *  get the decoded frame from the decode server.
01844  *
01845  * RETURNS:     nothing
01846  *
01847  * SIDE EFFECTS:   
01848  *
01849  *===========================================================================*/
01850 void
01851   GetRemoteDecodedRefFrame(frame, frameNumber)
01852 MpegFrame *frame;
01853 int frameNumber;
01854 {
01855   int   clientSocket;
01856   register int y;
01857   int       negativeThree = -3;
01858   uint32  data;
01859 
01860   /* send to IOServer */
01861   clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
01862 
01863   /* ask IOServer for decoded frame */
01864   data = negativeThree;
01865   data = htonl(data);
01866   SafeWrite(clientSocket, (char *)&data, 4);
01867 
01868   data = frame->id;
01869   data = htonl(data);
01870   SafeWrite(clientSocket, (char *)&data, 4);
01871 
01872   for ( y = 0; y < Fsize_y; y++ ) {
01873     SafeRead(clientSocket, (char *)frame->decoded_y[y], Fsize_x);
01874   }
01875 
01876   for (y = 0; y < (Fsize_y >> 1); y++) { /* U */
01877     SafeRead(clientSocket, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
01878   }
01879 
01880   for (y = 0; y < (Fsize_y >> 1); y++) { /* V */
01881     SafeRead(clientSocket, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
01882   }
01883 
01884   close(clientSocket);
01885     
01886 }
01887 
01888 
01889 /*********
01890   routines handling forks, execs, PIDs and signals
01891   safe, system-style forks
01892   apian@ise.fhg.de
01893   *******/
01894 
01895 
01896 /*===========================================================================*
01897  *
01898  * cleanup_fork
01899  *
01900  *  Kill all the children, to be used when we get killed
01901  *
01902  * RETURNS:     nothing
01903  *
01904  * SIDE EFFECTS:   kills other processes
01905  *
01906  *===========================================================================*/
01907 void cleanup_fork( dummy )                      /* try to kill all child processes */
01908      int dummy;
01909 {
01910   register int i;
01911   for (i = 0;  i < current_max_forked_pid;  ++i ) {
01912 
01913 #ifdef DEBUG_FORK
01914     fprintf(stderr, "cleanup_fork: killing PID %d\n", ClientPid[i]);
01915 #endif
01916 
01917     if (kill(ClientPid[i], TERMINATE_PID_SIGNAL)) {
01918       fprintf(stderr, "cleanup_fork: killed PID=%d failed (errno %d)\n", 
01919               ClientPid[i], errno);
01920     }
01921   }
01922 }
01923 
01924 /*===========================================================================*
01925  *
01926  * safe_fork
01927  *
01928  *  fork a command
01929  *
01930  * RETURNS:     success/failure
01931  *
01932  * SIDE EFFECTS:   Fork the command, and save to PID so you can kil it later!
01933  *
01934  *===========================================================================*/
01935 static int safe_fork(command)           /* fork child process and remember its PID */
01936      char *command;
01937 {
01938   static int init=0;
01939   char *argis[MAXARGS];
01940   register int i=1;
01941   
01942   if (!(argis[0] = strtok(command, " \t"))) return(0); /* tokenize */
01943   while ((argis[i] = strtok(NULL, " \t")) && i < MAXARGS) ++i;
01944   argis[i] = NULL;
01945   
01946 #ifdef DEBUG_FORK
01947   {register int i=0; 
01948    fprintf(stderr, "Command %s becomes:\n", command);
01949    while(argis[i]) {fprintf(stderr, "--%s--\n", argis[i]); ++i;} }
01950 #endif
01951   
01952   if (!init) {                  /* register clean-up routine */
01953     signal (SIGQUIT, cleanup_fork);
01954     signal (SIGTERM, cleanup_fork);
01955     signal (SIGINT , cleanup_fork);
01956     init=1;
01957   }
01958   
01959   if (-1 == (ClientPid[current_max_forked_pid] = fork()) )  {
01960     perror("safe_fork: fork failed ");
01961     return(-1);
01962   }
01963   if( !ClientPid[current_max_forked_pid]) { /* we are in child process */
01964     execvp(argis[0], argis );
01965     perror("safe_fork child: exec failed ");
01966     exit(1);
01967   }
01968 #ifdef DEBUG_FORK
01969   fprintf(stderr, "parallel: forked PID=%d\n", ClientPid[current_max_forked_pid]);
01970 #endif
01971   current_max_forked_pid++;
01972   return(0);
01973 }
 

Powered by Plone

This site conforms to the following standards: