00001 #include "niml_private.h"
00002
00003
00004
00005
00006
00007
00008
/* PERROR(x): error reporting hook used throughout this file.
   When NIML_DEBUG is defined it forwards to perror(x); otherwise it expands
   to nothing, so system-call failures are reported only in debug builds. */
#undef PERROR

#ifdef NIML_DEBUG
# define PERROR(x) perror(x)
#else
# define PERROR(x)
#endif
00016
00017 #include <signal.h>
00018 #include <fcntl.h>
00019
00020
00021
00022
/* Presumably records whether SIGPIPE has been dealt with; it is initialised
   to 0 and not referenced in this part of the file -- TODO confirm usage. */
static int nosigpipe = 0 ;

/* Set to 1 once tcp_sigurg_handler() has been installed for SIGURG, so the
   handler is only registered one time. */
static int sigurg = 0 ;

/* Shut a socket down in both directions (how=2) and then close it. */
#define CLOSEDOWN(ss) ( shutdown((ss),2) , close((ss)) )

/* Minimum send/receive buffer size (bytes) requested for new sockets. */
#define SOCKET_BUFSIZE (31*1024)

/* Receive function used for TCP sockets. */
#define tcp_recv recv

/* Send function used for TCP sockets. */
#define tcp_send send

#ifndef MIN
/* Smaller of two values; each argument may be evaluated more than once. */
# define MIN(a,b) (((a)>(b)) ? (b) : (a))
#endif

/* Next delay in a polling loop: grow the previous delay dm by 10% plus 1.01,
   but never beyond 66.0.  The result is a double. */
#define NEXTDMS(dm) MIN(1.1*(dm)+1.01,66.0)
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
/* Number of entries currently stored in open_streams[]. */
static int num_open_streams = 0 ;

/* Array of pointers to the streams registered via add_open_stream(). */
static NI_stream_type ** open_streams = NULL ;

/* Set to 1 while atexit_open_streams() is running; remove_open_stream()
   does nothing while this flag is set. */
static volatile int doing_atexit = 0 ;
00075
00076
00077
00078
00079 static void add_open_stream( NI_stream_type *ns )
00080 {
00081 int nn = num_open_streams ;
00082
00083 if( ns == NULL ) return ;
00084
00085 open_streams = (NI_stream_type **)realloc( (void *)open_streams ,
00086 sizeof(NI_stream_type *)*(nn+1) );
00087
00088 open_streams[nn] = ns ; num_open_streams++ ; return ;
00089 }
00090
00091
00092
00093
00094 static void remove_open_stream( NI_stream_type *ns )
00095 {
00096 int nn = num_open_streams , ii,jj ;
00097
00098 if( doing_atexit || nn <= 0 || ns == NULL ) return ;
00099
00100 for( ii=0 ; ii < nn ; ii++ )
00101 if( open_streams[ii] == ns ) break ;
00102 if( ii == nn ) return ;
00103
00104 for( jj=ii+1 ; jj < nn ; jj++ )
00105 open_streams[jj-1] = open_streams[jj] ;
00106
00107 open_streams[nn-1] = NULL ; num_open_streams-- ; return ;
00108 }
00109
00110
00111
00112
00113 static void atexit_open_streams(void)
00114 {
00115 int ii ;
00116 if( doing_atexit ) return ;
00117 doing_atexit = 1 ;
00118 for( ii=0 ; ii < num_open_streams ; ii++ ){
00119 NI_sleep(2) ;
00120 NI_stream_close_keep( open_streams[ii] , 5 ) ;
00121 }
00122 return ;
00123 }
00124
00125
00126
/* Nonzero once atexit_open_streams() has been registered with atexit(). */
static int atexit_is_setup = 0 ;
00128
00129
00130
00131
00132
00133
00134
00135 static void tcp_sigurg_handler( int sig )
00136 {
00137 int nn = num_open_streams , ii , sd,sdtop ;
00138 NI_stream_type *ns ;
00139 fd_set efds ;
00140 struct timeval tv ;
00141 static volatile int busy=0 ;
00142
00143 if( sig != SIGURG ||
00144 busy ||
00145 num_open_streams <= 0 || open_streams == NULL ) return ;
00146
00147 busy = 1 ;
00148
00149
00150
00151
00152 FD_ZERO(&efds) ; sdtop = -1 ;
00153 for( ii=0 ; ii < nn ; ii++ ){
00154 if( open_streams[ii] != NULL &&
00155 open_streams[ii]->bad != MARKED_FOR_DEATH &&
00156 open_streams[ii]->type == NI_TCP_TYPE &&
00157 open_streams[ii]->sd >= 0 ){
00158
00159 FD_SET( open_streams[ii]->sd , &efds ) ;
00160 if( open_streams[ii]->sd > sdtop ) sdtop = open_streams[ii]->sd;
00161 }
00162 }
00163 if( sdtop < 0 ){ busy=0 ; return; }
00164
00165
00166
00167 tv.tv_sec = 0 ;
00168 tv.tv_usec = 666 ;
00169 ii = select(sdtop+1, NULL, NULL, &efds, &tv) ;
00170 if( ii <= 0 ){ busy=0 ; return; }
00171
00172
00173
00174 for( ii=0 ; ii < nn ; ii++ ){
00175 if( open_streams[ii] != NULL && open_streams[ii]->type == NI_TCP_TYPE ){
00176 if( FD_ISSET( open_streams[ii]->sd , &efds ) ){
00177 CLOSEDOWN( open_streams[ii]->sd ) ;
00178 open_streams[ii]->bad = MARKED_FOR_DEATH ;
00179 }
00180 }
00181 }
00182
00183 busy=0 ; return ;
00184 }
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
/* Check whether a socket is ready to be read.
     sd   = socket descriptor
     msec = maximum wait in milliseconds; negative means wait indefinitely
   Returns the value of select(): positive if the socket is readable, 0 if
   the wait timed out, -1 on error.  Also returns -1 without calling select()
   if sd is negative or too large to be stored in an fd_set (FD_SET on such
   a descriptor is undefined behaviour). */
static int tcp_readcheck( int sd , int msec )
{
   int ii ;
   fd_set rfds ;
   struct timeval tv , *tvp ;

   if( sd < 0 || sd >= FD_SETSIZE ) return -1 ;

   FD_ZERO(&rfds) ; FD_SET(sd, &rfds) ;

   if( msec >= 0 ){                       /* finite wait */
      tv.tv_sec  = msec/1000 ;
      tv.tv_usec = (msec%1000)*1000 ;
      tvp        = &tv ;
   } else {                               /* NULL timeout = block forever */
      tvp        = NULL ;
   }

   ii = select(sd+1, &rfds, NULL, NULL, tvp) ;
   if( ii == -1 ) PERROR( "tcp_readcheck(select)" ) ;
   return ii ;
}
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
/* Check whether a socket is ready to be written.
     sd   = socket descriptor
     msec = maximum wait in milliseconds; negative means wait indefinitely
   Returns the value of select(): positive if the socket is writable, 0 if
   the wait timed out, -1 on error.  Also returns -1 without calling select()
   if sd is negative or too large to be stored in an fd_set (FD_SET on such
   a descriptor is undefined behaviour). */
static int tcp_writecheck( int sd , int msec )
{
   int ii ;
   fd_set wfds ;
   struct timeval tv , *tvp ;

   if( sd < 0 || sd >= FD_SETSIZE ) return -1 ;

   FD_ZERO(&wfds) ; FD_SET(sd, &wfds) ;

   if( msec >= 0 ){                       /* finite wait */
      tv.tv_sec  = msec/1000 ;
      tv.tv_usec = (msec%1000)*1000 ;
      tvp        = &tv ;
   } else {                               /* NULL timeout = block forever */
      tvp        = NULL ;
   }

   ii = select(sd+1, NULL , &wfds, NULL, tvp) ;
   if( ii == -1 ) PERROR( "tcp_writecheck(select)" ) ;
   return ii ;
}
00262
00263
00264
00265
00266
00267
00268
/* Apply the library's standard socket options to a descriptor.
     sd = socket descriptor; negative values are ignored.
   Where the platform defines them:
     SO_LINGER    is enabled with a linger time of 0;
     SO_REUSEADDR is enabled.
   Failures of setsockopt() are deliberately ignored. */
static void tcp_set_cutoff( int sd )
{
   if( sd < 0 ) return ;

#ifdef SO_LINGER
   {
      struct linger cut ;
      cut.l_onoff  = 1 ;
      cut.l_linger = 0 ;
      setsockopt(sd, SOL_SOCKET, SO_LINGER, (void *)&cut, sizeof(cut)) ;
   }
#endif

#ifdef SO_REUSEADDR
   {
      int enable = 1 ;
      setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&enable, sizeof(enable)) ;
   }
#endif

   return ;
}
00294
00295
00296
00297
00298
00299
00300
00301
00302
/* Check whether a TCP socket is still connected.
     sd = socket descriptor
   Returns 1 if the socket looks alive, 0 if not.
   A socket with nothing to read is taken to be alive.  If the socket is
   readable, one byte is peeked at (MSG_PEEK, so nothing is consumed); a
   readable socket that yields no byte is treated as dead. */
static int tcp_alivecheck( int sd )
{
   char peek[4] ;
   int  rc = tcp_readcheck(sd,0) ;

   if( rc == 0 ) return 1 ;          /* nothing waiting: assume alive */
   if( rc <  0 ) return 0 ;          /* select() failed */

   errno = 0 ;
   rc = tcp_recv( sd , peek , 1 , MSG_PEEK ) ;
   if( rc != 1 ){
      if( errno ) PERROR("tcp_alivecheck") ;
      return 0 ;
   }
   return 1 ;
}
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326 static int tcp_connect( char *host , int port )
00327 {
00328 int sd , l , q,qq ;
00329 struct sockaddr_in sin ;
00330 struct hostent *hostp ;
00331
00332 if( host == NULL || port < 1 ) return -1 ;
00333
00334 #ifdef NIML_DEBUG
00335 NI_dpr("Enter tcp_connect: host=%s port=%d\n",host,port) ;
00336 #endif
00337
00338
00339
00340 sd = socket( AF_INET , SOCK_STREAM , 0 ) ;
00341 if( sd == -1 ){ PERROR("tcp_connect(socket)"); return -1; }
00342
00343
00344
00345 #if 0
00346 { char *eee=getenv( "NIML_TCP_NAGLE" ) ;
00347 if( eee == NULL || toupper(*eee) != 'Y' ){
00348
00349 l = 1;
00350 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (void *)&l, sizeof(int)) ;
00351 }
00352 }
00353 #endif
00354
00355
00356
00357 #ifdef SOCKET_BUFSIZE
00358 q = 0 ; qq = sizeof(int) ;
00359 getsockopt(sd, SOL_SOCKET, SO_SNDBUF, (void *)&q, &qq ) ;
00360 if( q < SOCKET_BUFSIZE ){
00361 l = SOCKET_BUFSIZE ;
00362 setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (void *)&l, sizeof(int)) ;
00363 }
00364 q = 0 ; qq = sizeof(int) ;
00365 getsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void *)&q, &qq ) ;
00366 if( q < SOCKET_BUFSIZE ){
00367 l = SOCKET_BUFSIZE ;
00368 setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void *)&l, sizeof(int)) ;
00369 }
00370 #endif
00371
00372
00373
00374 memset( &sin , 0 , sizeof(sin) ) ;
00375 sin.sin_family = AF_INET ;
00376 sin.sin_port = htons(port) ;
00377
00378
00379
00380 hostp = gethostbyname(host) ;
00381 if( hostp == NULL ){
00382 PERROR("tcp_connect(gethostbyname)");
00383 #ifdef NIML_DEBUG
00384 NI_dpr(" tcp_connect: can't gethostbyname(); errno=%d\n",errno);
00385 #endif
00386 CLOSEDOWN(sd); return -1;
00387 }
00388 sin.sin_addr.s_addr = ((struct in_addr *)(hostp->h_addr))->s_addr ;
00389
00390 errno = 0 ;
00391 if( connect(sd,(struct sockaddr *)&sin,sizeof(sin)) != 0 ){
00392 if( errno != ECONNREFUSED ) PERROR("tcp_connect(connect)") ;
00393 #ifdef NIML_DEBUG
00394 NI_dpr(" tcp_connect: can't connect(); errno=%d\n",errno);
00395 #endif
00396 CLOSEDOWN(sd); return -1;
00397 }
00398
00399 #ifdef NIML_DEBUG
00400 NI_dpr(" tcp_connect: connected!\n");
00401 #endif
00402
00403 tcp_set_cutoff( sd ) ;
00404 return sd ;
00405 }
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
/* Create a socket that listens for a connection on the given port.
     port = TCP port number (must be >= 1)
   Returns the listening socket descriptor, or -1 on any failure.
   The socket is bound to INADDR_ANY with a listen backlog of 1.  The send
   and receive buffers are raised to at least SOCKET_BUFSIZE when that macro
   is defined.  The option length passed to getsockopt() is a socklen_t, as
   POSIX requires.
   NOTE(review): tcp_set_cutoff() -- which sets SO_REUSEADDR -- runs after
   bind(), so that option cannot influence this bind() call.  The original
   order is kept here; confirm whether it should be applied before bind(). */
static int tcp_listen( int port )
{
   int sd , l , q ;
   socklen_t qq ;
   struct sockaddr_in sin ;

   if( port < 1 ) return -1 ;

   /* open a socket */

   sd = socket( AF_INET , SOCK_STREAM , 0 ) ;
   if( sd == -1 ){ PERROR("tcp_listen(socket)"); return -1; }

   /* optional disabling of the Nagle algorithm (compiled out) */

#if 0
   { char *eee=getenv( "NIML_TCP_NAGLE" ) ;
     if( eee == NULL || toupper(*eee) != 'Y' ){

       l = 1;
       setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (void *)&l, sizeof(int)) ;
     }
   }
#endif

   /* make sure the socket buffers are at least SOCKET_BUFSIZE bytes */

#ifdef SOCKET_BUFSIZE
   q = 0 ; qq = sizeof(int) ;
   getsockopt(sd, SOL_SOCKET, SO_SNDBUF, (void *)&q, &qq ) ;
   if( q < SOCKET_BUFSIZE ){
      l = SOCKET_BUFSIZE ;
      setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (void *)&l, sizeof(int)) ;
   }
   q = 0 ; qq = sizeof(int) ;
   getsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void *)&q, &qq ) ;
   if( q < SOCKET_BUFSIZE ){
      l = SOCKET_BUFSIZE ;
      setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void *)&l, sizeof(int)) ;
   }
#endif

   /* bind to the port on any local address, then start listening */

   memset( &sin , 0 , sizeof(sin) ) ;
   sin.sin_family      = AF_INET ;
   sin.sin_port        = htons(port) ;
   sin.sin_addr.s_addr = INADDR_ANY ;

   if( bind(sd , (struct sockaddr *)&sin , sizeof(sin)) == -1 ){
      PERROR("tcp_listen(bind)"); CLOSEDOWN(sd); return -1;
   }

   if( listen(sd,1) == -1 ){
      PERROR("tcp_listen(listen)"); CLOSEDOWN(sd); return -1;
   }

   tcp_set_cutoff( sd ) ;
   return sd ;
}
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507
/* Accept an incoming connection on a listening socket.
     sd       = listening socket descriptor (see tcp_listen)
     hostname = if not NULL, *hostname receives a newly allocated copy of the
                caller's host name, or of "UNKNOWN" if the reverse lookup fails
     hostaddr = if not NULL, *hostaddr receives a newly allocated copy of the
                caller's dotted address
   Returns the new connected socket descriptor, or -1 on failure.
   A connection from a host that NI_trust_host() rejects is closed at once
   and -1 is returned; neither output string is set in that case.
   The strings are made with NI_strdup(); the caller owns them.
   The address length passed to accept() is a socklen_t, as POSIX requires. */
static int tcp_accept( int sd , char **hostname , char **hostaddr )
{
   struct sockaddr_in pin ;
   socklen_t addrlen ;
   int sd_new ;
   struct hostent *hostp ;
   char *str ;

   /* wait for the connection */

   addrlen = sizeof(pin) ;
   sd_new = accept( sd , (struct sockaddr *)&pin , &addrlen ) ;
   if( sd_new == -1 ){ PERROR("tcp_accept"); return -1; }

   /* dotted form of the caller's address; inet_ntoa uses a static buffer */

   str = inet_ntoa( pin.sin_addr ) ;

   if( !NI_trust_host(str) ){
      fprintf(stderr,"\n** ILLEGAL attempt to connect from host %s\n",str) ;
      CLOSEDOWN( sd_new ) ;
      return -1 ;
   }

   if( hostaddr != NULL ) *hostaddr = NI_strdup(str) ;

   /* name of the caller, if requested */

   if( hostname != NULL ){
      hostp = gethostbyaddr( (char *) (&pin.sin_addr) ,
                             sizeof(struct in_addr) , AF_INET ) ;

      if( hostp != NULL ) *hostname = NI_strdup(hostp->h_name) ;
      else                *hostname = NI_strdup("UNKNOWN") ;
   }

   tcp_set_cutoff( sd_new ) ;
   return sd_new ;
}
00546
00547
00548
00549
00550
/* Number of entries in host_list[]; 0 means the list is not yet set up. */
static int host_num = 0 ;
/* Trusted host address prefixes; each entry is an HSIZE-byte buffer. */
static char ** host_list = NULL ;

/* Address prefixes that are trusted by default. */
static char *init_hosts[] = {
"127.0.0.1" ,
"192.168." ,
"128.231.21"
} ;
/* Number of entries in init_hosts[]. */
#define INIT_NHO (sizeof(init_hosts)/sizeof(char *))
/* Size in bytes of each buffer in host_list[]. */
#define HSIZE 32
00561
00562
00563
00564
00565
00566
00567
00568
00569 char * NI_hostname_to_inet( char *host )
00570 {
00571 struct hostent *hostp ;
00572 char * iname = NULL , *str ;
00573 int ll ;
00574
00575 if( host == NULL || host[0] == '\0' ) return NULL ;
00576
00577 hostp = gethostbyname(host) ; if( hostp == NULL ) return NULL ;
00578
00579 str = inet_ntoa(*((struct in_addr *)(hostp->h_addr))) ;
00580 if( str == NULL || str[0] == '\0' ) return NULL ;
00581
00582 iname = NI_strdup(str) ; return iname ;
00583 }
00584
00585
00586
00587
00588
/* Decide whether a string looks like a dotted numeric address.
     hnam = string to examine
   Returns 1 if every character is a decimal digit or '.', else 0.
   NULL gives 0; the empty string gives 1.
   Each character is converted to unsigned char before being passed to
   isdigit(), since passing a negative char value is undefined behaviour. */
static int hostname_dotted( char *hnam )
{
   const char *p ;

   if( hnam == NULL ) return 0 ;

   for( p=hnam ; *p != '\0' ; p++ )
      if( !isdigit((unsigned char)*p) && *p != '.' ) return 0 ;

   return 1 ;
}
00598
00599
00600
00601
00602
00603 static void add_trusted_host( char *hnam )
00604 {
00605 char *hh=NULL ;
00606 int ii ;
00607
00608 if( hnam == NULL || hnam[0] == '\0' ) return ;
00609
00610 if( !hostname_dotted(hnam) ){
00611 hh = NI_hostname_to_inet( hnam ) ;
00612 if( hh == NULL ) return ;
00613
00614 } else if( strlen(hnam) > HSIZE-1 ){
00615 return ;
00616 } else {
00617 hh = hnam ;
00618 }
00619
00620 host_list = NI_realloc(host_list, char*,sizeof(char *)*(host_num+1)) ;
00621 host_list[host_num] = NI_malloc(char, HSIZE) ;
00622 strcpy( host_list[host_num] , hh ) ; host_num++ ;
00623
00624 if( hh != hnam ) NI_free(hh) ;
00625 }
00626
00627
00628
00629
00630
00631 static void init_trusted_list(void)
00632 {
00633 int ii ;
00634 char ename[HSIZE] , *str ;
00635
00636 if( host_num == 0 ){
00637 host_num = INIT_NHO ;
00638 host_list = NI_malloc(char*, sizeof(char *) * INIT_NHO ) ;
00639 for( ii=0 ; ii < INIT_NHO ; ii++ ){
00640 host_list[ii] = NI_malloc(char, HSIZE) ;
00641 strcpy( host_list[ii] , init_hosts[ii] ) ;
00642 }
00643
00644 for( ii=0 ; ii <= 99 ; ii++ ){
00645 sprintf(ename,"NIML_TRUSTHOST_%02d",ii) ; str = getenv(ename) ;
00646 if( str == NULL && ii <= 9 ){
00647 sprintf(ename,"NIML_TRUSTHOST_%1d",ii) ; str = getenv(ename) ;
00648 }
00649 if( str == NULL && ii <= 9 ){
00650 sprintf(ename,"NIML_TRUSTHOST_O%1d",ii) ; str = getenv(ename) ;
00651 }
00652 if( str != NULL ) add_trusted_host(str) ;
00653 }
00654
00655 for( ii=0 ; ii <= 99 ; ii++ ){
00656 sprintf(ename,"AFNI_TRUSTHOST_%02d",ii) ; str = getenv(ename) ;
00657 if( str == NULL && ii <= 9 ){
00658 sprintf(ename,"AFNI_TRUSTHOST_%1d",ii) ; str = getenv(ename) ;
00659 }
00660 if( str == NULL && ii <= 9 ){
00661 sprintf(ename,"AFNI_TRUSTHOST_O%1d",ii) ; str = getenv(ename) ;
00662 }
00663 if( str != NULL ) add_trusted_host(str) ;
00664 }
00665 }
00666 return ;
00667 }
00668
00669
00670
00671
00672
00673
00674
00675 void NI_add_trusted_host( char *hostname )
00676 {
00677 if( host_num == 0 ) init_trusted_list() ;
00678 if( hostname == NULL || hostname[0] == '\0' ) return ;
00679 add_trusted_host(hostname) ;
00680 }
00681
00682
00683
00684
00685
00686 int NI_trust_host( char *hostid )
00687 {
00688 int ii ;
00689 char *hh = hostid ;
00690
00691
00692
00693
00694
00695 if( host_num == 0 ){
00696 char *eee = getenv("NIML_COMPLETE_TRUST") ;
00697 if( eee != NULL && toupper(*eee) == 'Y' ) return 1 ;
00698 init_trusted_list() ;
00699 }
00700
00701 if( hostid == NULL || hostid[0] == '\0' ) return 0 ;
00702
00703 if( !hostname_dotted(hostid) ){
00704 hh = NI_hostname_to_inet(hostid) ;
00705 if( hh == NULL ) return 0 ;
00706 }
00707
00708
00709
00710
00711 for( ii=0 ; ii < host_num ; ii++ ){
00712 if( strstr(hh,host_list[ii]) == hh ){
00713 if( hh != hostid ) NI_free(hh) ;
00714 return 1 ;
00715 }
00716 }
00717
00718 if( hh != hostid ) NI_free(hh) ;
00719 return 0 ;
00720 }
00721
00722 #ifndef DONT_USE_SHM
00723
00724
00725
00726
00727
00728
00729
00730
00731
/* Convert a string into a key for shmget().
     key_string = arbitrary text; NULL is allowed
   Returns a key derived from the characters of the string.  NULL and the
   empty string both give 987654321.  When IPC_PRIVATE is defined, a result
   equal to IPC_PRIVATE or not positive is replaced by 666.
   The sum is accumulated in unsigned arithmetic: the signed version could
   overflow on long strings and left-shifted negative char values, both of
   which are undefined behaviour.  For inputs where the signed version was
   well defined the result is the same. */
static key_t SHM_string_to_key( char *key_string )
{
   int ii ;
   unsigned int sum = 987654321u ;
   key_t kk ;

   if( key_string == NULL ) return (key_t) sum ;

   /* character ii is added in at byte position ii%3 of the sum */

   for( ii=0 ; key_string[ii] != '\0' ; ii++ )
      sum += ((unsigned int)(int)key_string[ii]) << ((ii%3)*8) ;

   kk = (key_t)(int) sum ;
#ifdef IPC_PRIVATE
   if( kk == IPC_PRIVATE || kk <= 0 ) kk = 666 ;
#endif
   return kk ;
}
00749
00750
00751
00752
00753
00754
/* Look up an existing shared memory segment.
     key_string = text identifying the segment
   Returns the segment id from shmget(), or a negative value if it cannot be
   found.  A size of 0 and no IPC_CREAT are passed, so nothing is created. */
static int SHM_accept( char *key_string )
{
   key_t key = SHM_string_to_key( key_string ) ;

   return shmget( key , 0 , 0777 ) ;
}
00764
00765
00766
00767
00768
00769
/* Create (or look up) a shared memory segment.
     key_string = text identifying the segment
     size       = size of the segment in bytes
   Returns the segment id from shmget() called with IPC_CREAT and mode 0777,
   or a negative value on failure (reported through PERROR). */
static int SHM_create( char *key_string , int size )
{
   key_t key = SHM_string_to_key( key_string ) ;
   int   id  = shmget( key , size , 0777 | IPC_CREAT ) ;

   if( id < 0 ) PERROR("SHM_create") ;
   return id ;
}
00780
00781
00782
00783
00784
00785
00786
/* Attach a shared memory segment to this process.
     shmid = segment id
   Returns the address of the segment, or NULL if shmat() fails (the failure
   is reported through PERROR). */
static char * SHM_attach( int shmid )
{
   char *where = (char *) shmat( shmid , NULL , 0 ) ;

   if( where != (char *) -1 ) return where ;

   PERROR("SHM_attach") ;
   return NULL ;
}
00794
00795
00796
00797
00798
00799
/* Find the size of a shared memory segment.
     shmid = segment id
   Returns the segment size in bytes as reported by shmctl(IPC_STAT), or -1
   if shmid is negative or the query fails. */
static int SHM_size( int shmid )
{
   struct shmid_ds info ;

   if( shmid < 0 ) return -1 ;

   if( shmctl( shmid , IPC_STAT , &info ) < 0 ){
      PERROR("SHM_size") ;
      return -1 ;
   }
   return info.shm_segsz ;
}
00810
00811
00812
00813
00814
00815
/* Find the number of processes attached to a shared memory segment.
     shmid = segment id
   Returns the attach count reported by shmctl(IPC_STAT), or -1 if shmid is
   negative or the query fails.  A failed query is retried once after
   NI_sleep(9).
   If the environment variable NIML_DNAME is set, its value is included in
   diagnostic messages written to stderr, and the successful result is also
   logged.  A final failure is always logged to stderr. */
static int SHM_nattach( int shmid )
{
   static struct shmid_ds buf ;
   char *dname = getenv( "NIML_DNAME" ) ;
   char *label ;
   int   rc ;

   if( shmid < 0 ) return -1 ;

   rc = shmctl( shmid , IPC_STAT , &buf ) ;
   if( rc < 0 ){                                   /* one retry */
      if( dname != NULL ) fprintf(stderr,"SHM_nattach: trying again!\n") ;
      NI_sleep(9) ;
      rc = shmctl( shmid , IPC_STAT , &buf ) ;
   }

   if( rc >= 0 ){                                  /* success */
      if( dname != NULL ){
         fprintf(stderr,"SHM_nattach (%s): called shmctl(%x,%x,%p), got %d\n",
                 dname ,
                 (unsigned int)shmid, (unsigned int)IPC_STAT, (void *)&buf,
                 (int)buf.shm_nattch ) ;
      }
      return (int)buf.shm_nattch ;
   }

   /* failure: build a label for the messages, report, and give up */

   if( dname == NULL ){
      label = strdup("SHM_nattach") ;
   } else {
      label = (char *)calloc(1,strlen(dname)+32) ;
      strcpy(label,"SHM_nattach (") ;
      strcat(label,dname) ; strcat(label,")") ;
   }
   PERROR(label);
   fprintf(stderr,"%s: called shmctl(%x,%x,%p), got %d\n",
           label,(unsigned int)shmid, (unsigned int)IPC_STAT, (void *)&buf,
           rc ) ;
   free((void *)label) ;
   return -1 ;
}
00851
00852
00853
00854
00855
00856
00857
00858 static int SHM_fill_accept( SHMioc *ioc )
00859 {
00860 char *bbb ;
00861 int jj ;
00862
00863 if( ioc == NULL || ioc->id < 0 ) return -1 ;
00864
00865 NI_sleep(2) ;
00866 bbb = SHM_attach( ioc->id ) ;
00867 if( bbb == NULL ) return -1 ;
00868
00869 if( SHM_nattach(ioc->id) != 2 ){
00870 NI_sleep(10) ;
00871 if( SHM_nattach(ioc->id) != 2 ){
00872 shmdt( bbb ) ;
00873 shmctl( ioc->id , IPC_RMID , NULL ) ;
00874 ioc->bad = SHM_IS_DEAD ; return -1 ;
00875 }
00876 }
00877
00878 jj = SHM_size(ioc->id) ;
00879 if( jj <= SHM_HSIZE ){
00880 shmdt( bbb ) ;
00881 shmctl( ioc->id , IPC_RMID , NULL ) ;
00882 ioc->bad = SHM_IS_DEAD ; return -1 ;
00883 }
00884
00885 ioc->shmbuf = bbb ;
00886 ioc->shmhead = (int *) bbb ;
00887
00888 ioc->bufsize1 = ioc->shmhead[SHM_SIZE1] ;
00889 ioc->bstart1 = ioc->shmhead + SHM_BSTART1 ;
00890 ioc->bend1 = ioc->shmhead + SHM_BEND1 ;
00891 ioc->buf1 = ioc->shmbuf + SHM_HSIZE ;
00892
00893 ioc->bufsize2 = ioc->shmhead[SHM_SIZE2] ;
00894 ioc->bstart2 = ioc->shmhead + SHM_BSTART2 ;
00895 ioc->bend2 = ioc->shmhead + SHM_BEND2 ;
00896 ioc->buf2 = ioc->buf1 + ioc->bufsize1 ;
00897
00898 if( jj < SHM_HSIZE+ioc->bufsize1+ioc->bufsize2 ){
00899 shmdt( bbb ) ;
00900 shmctl( ioc->id , IPC_RMID , NULL ) ;
00901 ioc->bad = SHM_IS_DEAD ; return -1 ;
00902 }
00903
00904 ioc->bad = 0 ; return 1 ;
00905 }
00906
00907
00908
00909
00910
00911
00912
00913
00914
00915
00916
00917
00918
00919
00920
00921
00922
00923
00924
00925
00926
00927 static SHMioc * SHM_init( char *name , char *mode )
00928 {
00929 SHMioc *ioc ;
00930 int do_create , do_accept ;
00931 char key[128] , *kend ;
00932 int size1=SHM_DEFAULT_SIZE , ii , jj , size2=SHM_DEFAULT_SIZE ;
00933
00934
00935
00936 if( name == NULL ||
00937 strlen(name) > 127 ||
00938 strncmp(name,"shm:",4) != 0 ||
00939 mode == NULL ) return NULL ;
00940
00941 do_create = (*mode == 'w') ;
00942 do_accept = (*mode == 'r') ;
00943
00944 if( !do_create && !do_accept ) return NULL ;
00945
00946
00947
00948 for( ii=4 ; name[ii] != ':' && name[ii] != '\0' ; ii++ )
00949 key[ii-4] = name[ii] ;
00950 key[ii-4] = '\0' ;
00951
00952
00953
00954 if( do_create && name[ii] == ':' && name[ii+1] != '\0' ){
00955
00956 size1 = strtol( name+ii+1 , &kend , 10 ) ;
00957 if( size1 <= 0 ) size1 = SHM_DEFAULT_SIZE ;
00958 else {
00959 if( *kend == 'K' || *kend == 'k' ){ size1 *= 1024 ; kend++; }
00960 else if( *kend == 'M' || *kend == 'm' ){ size1 *= 1024*1024; kend++; }
00961 }
00962 size2 = size1 ;
00963
00964
00965
00966 if( *kend == '+' ){
00967 size2 = strtol( kend+1 , &kend , 10 ) ;
00968 if( size2 <= 0 ) size2 = SHM_DEFAULT_SIZE ;
00969 else {
00970 if( *kend == 'K' || *kend == 'k' ){ size2 *= 1024 ; kend++; }
00971 else if( *kend == 'M' || *kend == 'm' ){ size2 *= 1024*1024; kend++; }
00972 }
00973 }
00974 }
00975
00976
00977
00978 ioc = NI_malloc(SHMioc, sizeof(SHMioc) ) ;
00979
00980 strcpy( ioc->name , key ) ;
00981
00982
00983
00984 if( do_accept ){
00985 ioc->whoami = SHM_ACCEPTOR ;
00986 for( ii=0 ; ii < 4 ; ii++ ){
00987 ioc->id = SHM_accept( key ) ;
00988 if( ioc->id >= 0 ) break ;
00989 NI_sleep(ii+1) ;
00990 }
00991 if( ioc->id < 0 )
00992 ioc->id = SHM_accept( key ) ;
00993
00994 if( ioc->id < 0 ){
00995 ioc->bad = SHM_WAIT_CREATE ;
00996 return ioc ;
00997 ioc->goodcheck_time = -99 ;
00998
00999 } else {
01000
01001 jj = SHM_fill_accept( ioc ) ;
01002
01003 if( jj < 0 ){
01004 NI_free(ioc); return NULL;
01005 }
01006
01007 return ioc ;
01008 ioc->goodcheck_time = -99 ;
01009 }
01010 }
01011
01012
01013
01014 if( do_create ){
01015 char *bbb ;
01016
01017 ioc->whoami = SHM_CREATOR ;
01018 ioc->id = SHM_create( key, size1+size2+SHM_HSIZE+4 ) ;
01019 if( ioc->id < 0 ){
01020 NI_free(ioc); return NULL;
01021 }
01022 bbb = SHM_attach( ioc->id ) ;
01023 if( bbb == NULL ){
01024 NI_free(ioc); return NULL;
01025 }
01026
01027 ioc->shmbuf = bbb ;
01028 ioc->shmhead = (int *) bbb ;
01029
01030 ioc->bufsize1 = ioc->shmhead[SHM_SIZE1] = size1 ;
01031 ioc->bstart1 = ioc->shmhead + SHM_BSTART1 ;
01032 ioc->bend1 = ioc->shmhead + SHM_BEND1 ;
01033 ioc->buf1 = ioc->shmbuf + SHM_HSIZE ;
01034
01035 ioc->bufsize2 = ioc->shmhead[SHM_SIZE2] = size2 ;
01036 ioc->bstart2 = ioc->shmhead + SHM_BSTART2 ;
01037 ioc->bend2 = ioc->shmhead + SHM_BEND2 ;
01038 ioc->buf2 = ioc->buf1 + size1 ;
01039
01040 *(ioc->bstart1) = 0 ;
01041 *(ioc->bend1) = size1-1 ;
01042 *(ioc->bstart2) = 0 ;
01043 *(ioc->bend2) = size2-1 ;
01044
01045 NI_sleep(3) ;
01046 jj = SHM_nattach(ioc->id) ;
01047
01048 if( jj < 2 ){
01049 NI_sleep(3) ; jj = SHM_nattach(ioc->id) ;
01050 }
01051
01052 if( jj > 2 ){
01053 shmdt( bbb ) ;
01054 shmctl( ioc->id , IPC_RMID , NULL ) ;
01055 NI_free(ioc); return NULL;
01056 }
01057
01058 ioc->bad = (jj < 2)
01059 ? SHM_WAIT_ACCEPT
01060 : 0 ;
01061 ioc->goodcheck_time = -99 ;
01062 return ioc ;
01063 }
01064
01065 return NULL ;
01066 }
01067
01068
01069
01070
01071
01072
/* Check whether a shared memory connection is alive.
     shmid = segment id
   Returns 1 if exactly 2 processes are attached to the segment, else 0
   (including when shmid is negative). */
static int SHM_alivecheck( int shmid )
{
   int attached ;

   if( shmid < 0 ) return 0 ;

   attached = SHM_nattach(shmid) ;
   return ( attached == 2 ) ;
}
01078
01079
/* Fallback definition of NEXTDMS (next polling delay), used only if the
   macro has not been defined already; this variant caps the delay at 99.0. */
#ifndef NEXTDMS
#define NEXTDMS(dm) MIN(1.1*(dm)+1.01,99.0)
#endif
01083
01084
01085
01086
01087
01088
01089
01090
01091
01092
01093
01094 static int SHM_goodcheck( SHMioc *ioc , int msec )
01095 {
01096 int ii , jj , ct ;
01097 char *bbb ;
01098
01099
01100
01101 if( ioc == NULL || ioc->bad == SHM_IS_DEAD ) return -1 ;
01102
01103
01104
01105 if( ioc->bad == 0 ){
01106 ct = NI_clock_time() ;
01107 if( ct - ioc->goodcheck_time > 2 ){
01108 ii = SHM_alivecheck(ioc->id) ;
01109 ioc->goodcheck_time = ct ;
01110 } else {
01111 ii = 1 ;
01112 }
01113 if( ii <= 0 ){
01114 #ifdef NIML_DEBUG
01115 NI_dpr("++ Shared memory connection %s has gone bad!\n",
01116 ioc->name ) ;
01117 #endif
01118 shmdt( ioc->shmbuf ) ; ioc->bad = SHM_IS_DEAD ;
01119 shmctl( ioc->id , IPC_RMID , NULL ) ; return -1 ;
01120 }
01121 return 1 ;
01122 }
01123
01124
01125
01126
01127
01128 if( ioc->bad == SHM_WAIT_CREATE ){
01129 int dms=0 , ms ;
01130
01131 if( msec < 0 ) msec = 999999999 ;
01132 for( ms=0 ; ms < msec ; ms += dms ){
01133 ioc->id = SHM_accept( ioc->name ) ;
01134 if( ioc->id >= 0 ) break ;
01135 dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; NI_sleep(dms) ;
01136 }
01137 if( ioc->id < 0 )
01138 ioc->id = SHM_accept( ioc->name ) ;
01139
01140 if( ioc->id >= 0 ){
01141 jj = SHM_fill_accept( ioc ) ;
01142 if( jj < 0 ) return -1 ;
01143 ioc->bad = 0 ;
01144 return 1 ;
01145 }
01146 return 0 ;
01147 }
01148
01149
01150
01151 else if( ioc->bad == SHM_WAIT_ACCEPT ){
01152 int dms=0 , ms ;
01153
01154 if( msec < 0 ) msec = 999999999 ;
01155 for( ms=0 ; ms < msec ; ms += dms ){
01156 if( SHM_nattach(ioc->id) > 1 ){ ioc->bad = 0 ; return 1 ; }
01157 dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; NI_sleep(dms) ;
01158 }
01159 if( SHM_nattach(ioc->id) > 1 ){ ioc->bad = 0 ; return 1 ; }
01160 return 0 ;
01161 }
01162
01163 return 0 ;
01164 }
01165
01166
01167
01168
01169
01170 static void SHM_close( SHMioc *ioc )
01171 {
01172 if( ioc == NULL ) return ;
01173
01174 if( ioc->id >= 0 && ioc->bad != SHM_IS_DEAD ){
01175 shmdt( ioc->shmbuf ) ;
01176 shmctl( ioc->id , IPC_RMID , NULL ) ;
01177 ioc->bad = SHM_IS_DEAD ;
01178 }
01179
01180 NI_free(ioc) ; return ;
01181 }
01182
01183
01184
01185
01186
01187
01188
01189
01190
01191
01192 static int SHM_readcheck( SHMioc *ioc , int msec )
01193 {
01194 int ii , ct ;
01195 int nread , dms=0 , ms ;
01196 int *bstart, *bend , bsize ;
01197
01198
01199
01200 ct = NI_clock_time() ;
01201 if( ct - ioc->goodcheck_time > 2 ){
01202 ii = SHM_goodcheck(ioc,0) ;
01203 ioc->goodcheck_time = ct ;
01204 if( ii <= 0 ){
01205 ii = SHM_goodcheck(ioc,msec) ;
01206 if( ii <= 0 ) return ii ;
01207 }
01208 } else if( ioc->bad ) return 0 ;
01209
01210
01211
01212 switch( ioc->whoami ){
01213
01214 default: return -1 ;
01215
01216 case SHM_ACCEPTOR:
01217 bstart = ioc->bstart1 ;
01218 bend = ioc->bend1 ;
01219 bsize = ioc->bufsize1 ;
01220 break ;
01221
01222 case SHM_CREATOR:
01223 bstart = ioc->bstart2 ;
01224 bend = ioc->bend2 ;
01225 bsize = ioc->bufsize2 ;
01226 break ;
01227 }
01228
01229
01230
01231 if( msec < 0 ) msec = 999999999 ;
01232
01233
01234
01235 for( ms=0 ; ms < msec ; ms += dms ){
01236 nread = (*bend - *bstart + bsize + 1) % bsize ;
01237 if( nread > 0 ) return nread ;
01238 dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; NI_sleep(dms) ;
01239 ii = SHM_goodcheck(ioc,0) ; if( ii == -1 ) return -1 ;
01240 }
01241 nread = (*bend - *bstart + bsize + 1) % bsize ;
01242 if( nread > 0 ) return nread ;
01243 return 0 ;
01244 }
01245
01246
01247
01248
01249
01250
01251
01252
01253
01254
01255 static int SHM_writecheck( SHMioc *ioc , int msec )
01256 {
01257 int ii ;
01258 int nread , dms=0 , ms , nwrite ;
01259 int *bstart, *bend , bsize ;
01260
01261
01262
01263 ii = SHM_goodcheck(ioc,0) ;
01264 if( ii == -1 ) return -1 ;
01265 if( ii == 0 ){
01266 ii = SHM_goodcheck(ioc,msec) ;
01267 if( ii <= 0 ) return ii ;
01268 }
01269
01270
01271
01272 switch( ioc->whoami ){
01273
01274 default: return -1 ;
01275
01276 case SHM_ACCEPTOR:
01277 bstart = ioc->bstart2 ;
01278 bend = ioc->bend2 ;
01279 bsize = ioc->bufsize2 ;
01280 break ;
01281
01282 case SHM_CREATOR:
01283 bstart = ioc->bstart1 ;
01284 bend = ioc->bend1 ;
01285 bsize = ioc->bufsize1 ;
01286 break ;
01287 }
01288
01289 if( msec < 0 ) msec = 999999999 ;
01290
01291 for( ms=0 ; ms < msec ; ms += dms ){
01292 nread = (*bend - *bstart + bsize + 1) % bsize ;
01293 nwrite = bsize - 1 - nread ;
01294 if( nwrite > 0 ) return nwrite ;
01295 dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; NI_sleep(dms) ;
01296 ii = SHM_goodcheck(ioc,0) ; if( ii == -1 ) return -1 ;
01297 }
01298 nread = (*bend - *bstart + bsize + 1) % bsize ;
01299 nwrite = bsize - 1 - nread ;
01300 if( nwrite > 0 ) return nwrite ;
01301 return 0 ;
01302 }
01303
01304
01305
01306
01307
01308
01309 static int SHM_send( SHMioc *ioc , char *buffer , int nbytes )
01310 {
01311 int ii ;
01312 int nread,nwrite , ebot,etop ;
01313 int *bstart, *bend , bsize ;
01314 char *buf ;
01315
01316
01317
01318 if( ioc == NULL || ioc->bad ||
01319 buffer == NULL || nbytes < 0 ) return -1 ;
01320
01321 if( nbytes == 0 ) return 0 ;
01322
01323 ii = SHM_goodcheck(ioc,1) ;
01324 if( ii <= 0 ) return ii ;
01325
01326 ii = SHM_writecheck(ioc,1) ;
01327 if( ii <= 0 ) return ii ;
01328
01329
01330
01331 switch( ioc->whoami ){
01332
01333 default: return -1 ;
01334
01335 case SHM_ACCEPTOR:
01336 bstart = ioc->bstart2 ;
01337 bend = ioc->bend2 ;
01338 bsize = ioc->bufsize2 ;
01339 buf = ioc->buf2 ;
01340 break ;
01341
01342 case SHM_CREATOR:
01343 bstart = ioc->bstart1 ;
01344 bend = ioc->bend1 ;
01345 bsize = ioc->bufsize1 ;
01346 buf = ioc->buf1 ;
01347 break ;
01348 }
01349
01350
01351
01352 nread = ( *bend - *bstart + bsize + 1 ) % bsize;
01353 nwrite = bsize - 1 - nread ;
01354 if( nwrite <= 0 ) return 0 ;
01355
01356 if( nwrite > nbytes ) nwrite = nbytes ;
01357
01358 ebot = *bend+1 ; if( ebot >= bsize ) ebot = 0 ;
01359 etop = ebot+nwrite-1 ;
01360
01361 if( etop < bsize ){
01362 memcpy( buf + ebot, buffer, nwrite ) ;
01363 *bend = etop ;
01364 } else {
01365 int nn = bsize - ebot ;
01366 memcpy( buf + ebot, buffer , nn ) ;
01367 memcpy( buf , buffer+nn, nwrite-nn ) ;
01368 *bend = nwrite-nn-1 ;
01369 }
01370 return nwrite ;
01371 }
01372
01373
01374
01375
01376
01377
01378
01379
01380
01381 static int SHM_sendall( SHMioc *ioc , char *buffer , int nbytes )
01382 {
01383 int ii , ntot=0 , dms=0 ;
01384
01385
01386
01387 if( ioc == NULL || ioc->bad ||
01388 buffer == NULL || nbytes < 0 ) return -1 ;
01389
01390 if( nbytes == 0 ) return 0 ;
01391
01392 while(1){
01393 ii = SHM_send( ioc , buffer+ntot , nbytes-ntot );
01394 if( ii == -1 ) return -1 ;
01395
01396 if( ii == 0 ){
01397 dms = NEXTDMS(dms) ;
01398 } else {
01399 ntot += ii ;
01400 if( ntot >= nbytes ) return nbytes ;
01401 dms = 1 ;
01402 }
01403
01404 NI_sleep(dms) ;
01405 }
01406 return -1 ;
01407 }
01408
01409
01410
01411
01412
01413
01414
01415
01416 static int SHM_recv( SHMioc *ioc , char *buffer , int nbytes )
01417 {
01418 int *bstart, *bend , bsize ;
01419 char *buf ;
01420 int nread, sbot,stop , ii ;
01421
01422
01423
01424 if( ioc == NULL || ioc->bad ||
01425 buffer == NULL || nbytes < 0 ) return -1 ;
01426
01427 if( nbytes == 0 ) return 0 ;
01428
01429 ii = SHM_goodcheck(ioc,1) ;
01430 if( ii <= 0 ) return ii ;
01431
01432
01433
01434 switch( ioc->whoami ){
01435
01436 default: return -1 ;
01437
01438 case SHM_ACCEPTOR:
01439 bstart = ioc->bstart1 ;
01440 bend = ioc->bend1 ;
01441 bsize = ioc->bufsize1 ;
01442 buf = ioc->buf1 ;
01443 break ;
01444
01445 case SHM_CREATOR:
01446 bstart = ioc->bstart2 ;
01447 bend = ioc->bend2 ;
01448 bsize = ioc->bufsize2 ;
01449 buf = ioc->buf2 ;
01450 break ;
01451 }
01452
01453
01454
01455 nread = ( *bend - *bstart + bsize + 1 ) % bsize ;
01456 if( nread <= 0 ) return 0 ;
01457 if( nread > nbytes ) nread = nbytes ;
01458
01459 sbot = *bstart ; stop = sbot + nread-1 ;
01460
01461 if( stop < bsize ){
01462 memcpy( buffer, buf+sbot, nread ) ;
01463 *bstart = (stop+1) % bsize ;
01464 } else {
01465 int nn = bsize - sbot ;
01466 memcpy( buffer , buf + sbot, nn ) ;
01467 memcpy( buffer+nn, buf , nread-nn ) ;
01468 *bstart = nread-nn ;
01469 }
01470 return nread ;
01471 }
01472 #endif
01473
01474
01475
01476
01477
01478
01479
01480
01481
01482
01483
01484
01485
01486
01487
01488
01489
01490
01491
01492
01493
01494
01495
01496
01497
01498
01499
01500
01501
01502
01503
01504
01505
01506
01507
01508
01509
01510
01511
01512
01513
01514
01515
01516
01517
01518
01519
01520
01521
01522
01523
01524
01525
01526
01527
01528
01529
01530
01531
01532
01533
01534
01535
01536
01537
01538
01539
01540
01541
01542
01543
01544
01545
01546
01547
01548
01549
01550
01551
01552
01553
01554
01555
01556
01557
01558
01559
01560
01561
01562
01563
01564
01565
01566
01567
01568
01569
01570
01571
01572
01573
01574
01575
01576
01577
01578
01579
01580
01581
01582
01583
01584
01585
01586
01587
01588
01589
01590
01591
01592
01593
01594
01595
01596
01597
01598
01599
01600
01601
01602
01603
01604
01605
01606
01607
01608
01609 NI_stream NI_stream_open( char *name , char *mode )
01610 {
01611 NI_stream_type *ns ;
01612 int do_create , do_accept ;
01613
01614
01615
01616 #ifdef NIML_DEBUG
01617 if( dfp == NULL ){
01618 char *eee = getenv("NIML_DEBUG") ;
01619 if( eee != NULL ){
01620 dfp = (strcmp(eee,"stderr")==0) ? stderr : fopen(eee,"w") ;
01621 if( dfp == NULL ){ dfp = stderr; eee = "stderr [defaulted]"; }
01622 fprintf(stderr,"NIML: debug output to %s\n",eee) ;
01623 }
01624 }
01625 #endif
01626
01627 #ifdef NIML_DEBUG
01628 NI_malloc_enable_tracking() ;
01629 #endif
01630
01631
01632
01633 if( NI_strlen(name) < 4 ) return NULL ;
01634
01635 if( mode == NULL ) return NULL ;
01636
01637 do_create = (*mode == 'w' || *mode == 'a') ;
01638 do_accept = (*mode == 'r') ;
01639
01640 if( !do_create && !do_accept ) return NULL ;
01641
01642 if( ! atexit_is_setup ){
01643 atexit(atexit_open_streams) ; atexit_is_setup = 1 ;
01644 }
01645
01646
01647
01648
01649 if( strncmp(name,"tcp:",4) == 0 ){
01650 char host[256] , *hend ;
01651 int port=-1 , ii , jj ;
01652
01653 if( NI_strlen(name) > 127 ) return NULL ;
01654
01655
01656
01657 hend = strstr( name+4 , ":" ) ;
01658 if( hend == NULL || hend-name > 255 ) return NULL ;
01659
01660 for( ii=4 ; name[ii] != ':' ; ii++ ) host[ii-4] = name[ii] ;
01661 host[ii-4] = '\0' ;
01662
01663
01664
01665 port = strtol( name+ii+1 , NULL , 10 ) ;
01666 if( port <= 0 ) return NULL ;
01667
01668
01669
01670 ns = NI_malloc(NI_stream_type, sizeof(NI_stream_type) ) ;
01671
01672 ns->type = NI_TCP_TYPE;
01673 ns->port = port ;
01674 ns->nbuf = 0 ;
01675 ns->npos = 0 ;
01676 ns->b64_numleft = 0 ;
01677
01678 ns->buf = NI_malloc(char, NI_BUFSIZE) ;
01679 ns->bufsize = NI_BUFSIZE ;
01680 ns->name[0] = '\0' ;
01681 NI_strncpy(ns->orig_name,name,256) ;
01682
01683 ns->bin_thresh = -1 ;
01684
01685
01686
01687 if( !sigurg ){ signal(SIGURG,tcp_sigurg_handler); sigurg = 1; }
01688
01689
01690
01691 if( do_accept ){
01692 ns->io_mode = NI_INPUT_MODE ;
01693 ns->sd = tcp_listen( port ) ;
01694 if( ns->sd < 0 ){
01695 NI_free(ns->buf); NI_free(ns); return NULL;
01696 }
01697 ns->bad = TCP_WAIT_ACCEPT ;
01698 ii = tcp_readcheck(ns->sd,1) ;
01699 if( ii > 0 ){
01700 jj = tcp_accept( ns->sd , NULL,&hend ) ;
01701 if( jj >= 0 ){
01702 CLOSEDOWN( ns->sd ) ;
01703 NI_strncpy(ns->name,hend,256) ;
01704 NI_free(hend); ns->bad = 0; ns->sd = jj ;
01705 fcntl( ns->sd, F_SETOWN, (int)getpid() ) ;
01706 }
01707 }
01708
01709 add_open_stream(ns) ;
01710 ns->goodcheck_time = -99 ;
01711 return ns ;
01712 }
01713
01714
01715
01716 if( do_create ){
01717 struct hostent *hostp ;
01718 ns->io_mode = NI_OUTPUT_MODE ;
01719 hostp = gethostbyname(host) ;
01720 if( hostp == NULL ){
01721 NI_free(ns->buf); NI_free(ns); return NULL;
01722 }
01723 ns->sd = tcp_connect( host , port ) ;
01724 ns->bad = (ns->sd < 0) ? TCP_WAIT_CONNECT : 0 ;
01725 NI_strncpy(ns->name,host,256) ;
01726 if( ns->sd >= 0 )
01727 fcntl( ns->sd, F_SETOWN, (int)getpid() ) ;
01728
01729 add_open_stream(ns) ;
01730 ns->goodcheck_time = -99 ;
01731 return ns ;
01732 }
01733 return NULL ;
01734 }
01735
01736 #ifndef DONT_USE_SHM
01737
01738
01739
01740 if( strncmp(name,"shm:",4) == 0 ){
01741 SHMioc *ioc ;
01742
01743 if( *mode == 'a' ) mode = "w" ;
01744 ioc = SHM_init( name , mode ) ;
01745 if( ioc == NULL ) return NULL ;
01746
01747
01748
01749 ns = NI_malloc(NI_stream_type, sizeof(NI_stream_type) ) ;
01750
01751 ns->type = NI_SHM_TYPE;
01752 ns->nbuf = 0 ;
01753 ns->npos = 0 ;
01754 ns->io_mode = do_create ? NI_OUTPUT_MODE
01755 : NI_INPUT_MODE ;
01756 ns->bad = 0 ;
01757 ns->shmioc = ioc ;
01758 ns->b64_numleft = 0 ;
01759
01760 ns->buf = NI_malloc(char, NI_BUFSIZE) ;
01761 ns->bufsize = NI_BUFSIZE ;
01762
01763 NI_strncpy( ns->name , name , 256 ) ;
01764
01765 NI_strncpy(ns->orig_name,name,256) ;
01766
01767 add_open_stream(ns) ;
01768 ns->goodcheck_time = -99 ;
01769 return ns ;
01770 }
01771 #endif
01772
01773
01774
01775
01776 if( strncmp(name,"file:",5) == 0 ){
01777
01778 char *fname = name+5 , *fmode ;
01779 FILE *fp ;
01780
01781 if( NI_strlen(name) > 255 || NI_strlen(fname) < 1 ) return NULL ;
01782
01783 if( *mode == 'a' ) fmode = "ab" ;
01784 else fmode = do_create ? (char *)"wb" : (char *)"rb" ;
01785 fp = fopen( fname , fmode ) ;
01786
01787 if( fp == NULL ) return NULL ;
01788
01789
01790
01791 ns = NI_malloc(NI_stream_type, sizeof(NI_stream_type) ) ;
01792
01793 ns->type = NI_FILE_TYPE;
01794 ns->nbuf = 0 ;
01795 ns->npos = 0 ;
01796 ns->fp = fp ;
01797 ns->io_mode = do_create ? NI_OUTPUT_MODE
01798 : NI_INPUT_MODE ;
01799 ns->bad = 0 ;
01800 ns->b64_numleft = 0 ;
01801
01802 ns->bufsize = do_create ? 16 : NI_BUFSIZE ;
01803 ns->buf = NI_malloc(char, ns->bufsize) ;
01804
01805 NI_strncpy( ns->name , fname , 256 ) ;
01806
01807 NI_strncpy(ns->orig_name,name,256) ;
01808
01809 if( ns->io_mode == NI_INPUT_MODE )
01810 ns->fsize = NI_filesize( fname ) ;
01811 else
01812 ns->fsize = -1 ;
01813
01814 add_open_stream(ns) ;
01815 ns->goodcheck_time = -99 ;
01816 return ns ;
01817 }
01818
01819
01820
01821
01822 if( strncmp(name,"stdin:" ,6) == 0 ) name = "fd:0" ;
01823 else if( strncmp(name,"stdout:",7) == 0 ) name = "fd:1" ;
01824 else if( strncmp(name,"stderr:",7) == 0 ) name = "fd:2" ;
01825
01826 if( strncmp(name,"fd:",3) == 0 ){
01827 int fd=-1 ; FILE *fp ;
01828
01829 sscanf(name+3,"%d",&fd) ;
01830 if( fd < 0 ) return NULL ;
01831
01832 switch( fd ){
01833 default:
01834 fp = fdopen( fd , do_create ? "wb" : "rb" ) ;
01835 if( fp == NULL ) return NULL ;
01836 break ;
01837
01838 case 0:
01839 fp = stdin ;
01840 if( do_create ) return NULL ;
01841 break ;
01842
01843 case 1:
01844 fp = stdout ;
01845 if( !do_create ) return NULL ;
01846 break ;
01847
01848 case 2:
01849 fp = stderr ;
01850 if( !do_create ) return NULL ;
01851 break ;
01852 }
01853
01854
01855
01856 ns = NI_malloc(NI_stream_type, sizeof(NI_stream_type) ) ;
01857
01858 ns->type = NI_FD_TYPE;
01859 ns->nbuf = 0 ;
01860 ns->npos = 0 ;
01861 ns->fp = fp ;
01862 ns->io_mode = do_create ? NI_OUTPUT_MODE
01863 : NI_INPUT_MODE ;
01864 ns->bad = 0 ;
01865 ns->b64_numleft = 0 ;
01866
01867 ns->bufsize = do_create ? 16 : NI_BUFSIZE ;
01868 ns->buf = NI_malloc(char, ns->bufsize) ;
01869
01870 NI_strncpy( ns->name , name , 256 ) ;
01871
01872 NI_strncpy(ns->orig_name,name,256) ;
01873
01874 ns->fsize = -1 ;
01875
01876 add_open_stream(ns) ;
01877 ns->goodcheck_time = -99 ;
01878 return ns ;
01879 }
01880
01881
01882
01883
01884 if( strncmp(name,"str:",4) == 0 ){
01885
01886 int nn = NI_strlen(name+4) ;
01887
01888 ns = NI_malloc(NI_stream_type, sizeof(NI_stream_type) ) ;
01889
01890 ns->type = NI_STRING_TYPE;
01891 ns->io_mode = do_create ? NI_OUTPUT_MODE
01892 : NI_INPUT_MODE ;
01893 ns->bad = 0 ;
01894 ns->npos = 0 ;
01895 ns->b64_numleft = 0 ;
01896
01897
01898
01899
01900
01901
01902 if( do_accept ){
01903 ns->nbuf = nn ;
01904 ns->bufsize = nn+1 ;
01905 ns->buf = NI_malloc(char, nn+1) ;
01906 strcpy(ns->buf,name+4) ;
01907 } else {
01908 ns->nbuf = 0 ;
01909 ns->bufsize = 1 ;
01910 ns->buf = NI_malloc(char, 1) ;
01911 }
01912
01913 strcpy( ns->name , "ElvisHasLeftTheBuilding" ) ;
01914
01915 NI_strncpy(ns->orig_name,name,256) ;
01916
01917 add_open_stream(ns) ;
01918 ns->goodcheck_time = -99 ;
01919 return ns ;
01920 }
01921
01922
01923
01924
01925 if( strncmp(name,"http://",7) == 0 || strncmp(name,"ftp://",6) == 0 ){
01926 int nn ;
01927 char *data=NULL ;
01928
01929 if( do_create ) return NULL ;
01930
01931 nn = NI_read_URL( name , &data ) ;
01932
01933 if( data == NULL || nn <= 4 ){
01934 NI_free(data); return NULL;
01935 }
01936
01937 ns = NI_malloc(NI_stream_type, sizeof(NI_stream_type) ) ;
01938
01939 ns->type = NI_REMOTE_TYPE;
01940 ns->io_mode = NI_INPUT_MODE ;
01941 ns->bad = 0 ;
01942 ns->npos = 0 ;
01943 ns->nbuf = nn ;
01944 ns->bufsize = nn ;
01945 ns->buf = data ;
01946 ns->b64_numleft = 0 ;
01947
01948 NI_strncpy( ns->name , name , 256 ) ;
01949
01950 NI_strncpy(ns->orig_name,name,256) ;
01951
01952 add_open_stream(ns) ;
01953 ns->goodcheck_time = -99 ;
01954 return ns ;
01955 }
01956
01957 return NULL ;
01958 }
01959
01960
01961
01962
01963
01964
01965
01966
01967
01968
01969
01970
01971
01972
01973
01974
01975
01976
01977
01978
01979
01980
01981
01982
01983
01984
01985
01986 int NI_stream_reopen( NI_stream_type *ns , char *nname )
01987 {
01988 NI_stream_type *nsnew ;
01989 int typ_new=0 , port_new=0 , jj,kk ;
01990 char msg[1024] ;
01991
01992
01993
01994 if( ns == NULL || ns->type != NI_TCP_TYPE ) return 0 ;
01995 if( ns->bad == MARKED_FOR_DEATH ) return 0 ;
01996 if( nname == NULL || nname[0] == '\0' ) return 0 ;
01997
01998 if( strncmp(nname,"tcp::",5) == 0 ){
01999 typ_new = NI_TCP_TYPE ;
02000 port_new = strtol(nname+5,NULL,10) ;
02001 if( port_new <= 0 ) return 0 ;
02002 if( port_new == ns->port ) return 1 ;
02003 #ifndef DONT_USE_SHM
02004 } else if( strncmp(nname,"shm:" ,4) == 0 ){
02005 char *eee = getenv("AFNI_NOSHM") ;
02006 if( eee != NULL && toupper(*eee) == 'Y' ){
02007 fprintf(stderr,"** NI_stream_reopen: shm is disabled\n");
02008 return 0 ;
02009 }
02010 if( strstr(ns->orig_name,":localhost:") == NULL ){
02011 fprintf(stderr,"** NI_stream_reopen: shm not localhost!\n");
02012 return 0 ;
02013 }
02014 #endif
02015 } else {
02016 fprintf(stderr,"** NI_stream_reopen: illegal input '%s'\n",nname);
02017 return 0 ;
02018 }
02019
02020 #ifdef NIML_DEBUG
02021 NI_dpr("NI_stream_reopen: waiting for original connection to be good\n") ;
02022 #endif
02023
02024
02025
02026 for( kk=0 ; kk < 10 ; kk++ ){
02027 jj = NI_stream_goodcheck( ns , 1000 ) ;
02028 if( jj > 0 ) break;
02029 if( kk == 0 )
02030 fprintf(stderr,"++ NI_stream_reopen: Waiting for socket connection") ;
02031 else
02032 fprintf(stderr,".") ;
02033 }
02034 if( kk == 10 ){ fprintf(stderr," *Failed*\n"); return 0; }
02035 if( kk > 0 ) fprintf(stderr," *Good*\n") ;
02036
02037
02038
02039 if( strncmp(nname,"tcp::",5) == 0 ){
02040 sprintf(msg,"tcp:%s:%d",ns->name,port_new) ;
02041 }
02042 #ifndef DONT_USE_SHM
02043 else if( strncmp(nname,"shm:" ,4) == 0 ){
02044 NI_strncpy(msg,nname,1024) ;
02045 }
02046 #endif
02047
02048 #ifdef NIML_DEBUG
02049 NI_dpr("NI_stream_reopen: opening new stream %s\n",msg) ;
02050 #endif
02051
02052 nsnew = NI_stream_open( msg, "w" ) ;
02053 if( nsnew == NULL ) return 0 ;
02054
02055
02056
02057
02058 sprintf(msg,"<?ni_do ni_verb='reopen_this' ni_object='%s' ?>\n",nname) ;
02059 kk = strlen(msg) ;
02060
02061 #ifdef NIML_DEBUG
02062 NI_dpr("NI_stream_reopen: sending message %s",msg) ;
02063 #endif
02064
02065 jj = NI_stream_write( ns , msg , kk ) ;
02066 if( jj < kk ){
02067 NI_stream_closenow(nsnew) ; return 0 ;
02068 }
02069
02070
02071
02072 #ifdef NIML_DEBUG
02073 NI_dpr("NI_stream_reopen: waiting for new stream to be good\n") ;
02074 #endif
02075
02076 jj = NI_stream_goodcheck( nsnew , 5000 ) ;
02077 if( jj <= 0 ){
02078 NI_stream_closenow(nsnew) ; return 0 ;
02079 }
02080
02081
02082
02083
02084
02085 #ifdef NIML_DEBUG
02086 NI_dpr("NI_stream_reopen: closing old stream\n") ;
02087 #endif
02088
02089 NI_stream_close_keep(ns,0) ;
02090
02091 *ns = *nsnew ;
02092
02093
02094
02095
02096
02097
02098 remove_open_stream(nsnew) ; NI_free(nsnew) ; add_open_stream(ns) ;
02099
02100 return 1 ;
02101 }
02102
02103
02104
02105
02106
02107
02108
02109 void NI_stream_seek( NI_stream_type *ns , int offset , int whence )
02110 {
02111 if( ns == NULL ||
02112 ns->bad == MARKED_FOR_DEATH ||
02113 ns->type != NI_FILE_TYPE ||
02114 ns->fp == NULL ) return ;
02115
02116 fseek( ns->fp , offset , whence ) ;
02117 ns->nbuf = ns->npos = 0 ;
02118 }
02119
02120
02121
02122
02123
02124
02125
02126 int NI_stream_readable( NI_stream_type *ns )
02127 {
02128 if( ns == NULL || ns->bad == MARKED_FOR_DEATH ) return 0 ;
02129 if( ns->type == NI_TCP_TYPE || ns->type == NI_SHM_TYPE ) return 1 ;
02130 return (ns->io_mode == NI_INPUT_MODE) ;
02131 }
02132
02133
02134
02135
02136
02137
02138
02139 int NI_stream_writeable( NI_stream_type *ns )
02140 {
02141 if( ns == NULL || ns->bad == MARKED_FOR_DEATH ) return 0 ;
02142 if( ns->type == NI_TCP_TYPE || ns->type == NI_SHM_TYPE ) return 1 ;
02143 return (ns->io_mode == NI_OUTPUT_MODE) ;
02144 }
02145
02146
02147
02148
02149
02150
02151 char * NI_stream_name( NI_stream_type *ns )
02152 {
02153 if( ns == NULL ) return NULL ;
02154 return ns->name ;
02155 }
02156
02157
02158
02159
02160
02161
02162
02163
02164
02165 int NI_stream_setbufsize( NI_stream_type *ns , int bs )
02166 {
02167 char *qbuf ;
02168 if( ns == NULL ||
02169 ns->type == NI_STRING_TYPE ||
02170 ns->bad == MARKED_FOR_DEATH ||
02171 bs < 666 ||
02172 bs < ns->nbuf ) return -1 ;
02173
02174 if( !( ns->type == NI_TCP_TYPE || ns->type == NI_SHM_TYPE ||
02175 (ns->type == NI_FILE_TYPE && ns->io_mode == NI_INPUT_MODE) ||
02176 (ns->type == NI_FD_TYPE && ns->io_mode == NI_INPUT_MODE) ) )
02177 return -1 ;
02178
02179 qbuf = NI_realloc( ns->buf , char , bs ) ;
02180 if( qbuf == NULL ) return -1 ;
02181 ns->buf = qbuf ;
02182 ns->bufsize = bs ;
02183 return 1 ;
02184 }
02185
02186
02187
02188
02189
02190
02191 int NI_stream_getbufsize( NI_stream_type *ns )
02192 {
02193 if( ns == NULL || ns->bad == MARKED_FOR_DEATH ) return -1 ;
02194 return ns->bufsize ;
02195 }
02196
02197
02198
02199
02200
02201
02202
02203
02204 char * NI_stream_getbuf( NI_stream_type *ns )
02205 {
02206 if( ns == NULL ||
02207 ns->type != NI_STRING_TYPE ||
02208 ns->io_mode != NI_OUTPUT_MODE ||
02209 ns->bad == MARKED_FOR_DEATH ) return NULL ;
02210
02211 return ns->buf ;
02212 }
02213
02214
02215
02216
02217
02218
02219 void NI_stream_clearbuf( NI_stream_type *ns )
02220 {
02221 if( ns == NULL ||
02222 ns->type != NI_STRING_TYPE ||
02223 ns->io_mode != NI_OUTPUT_MODE ) return ;
02224
02225 NI_free(ns->buf) ;
02226 ns->nbuf = 0 ;
02227 ns->bufsize = 1 ;
02228 ns->buf = NI_malloc(char, 1) ;
02229 }
02230
02231
02232
02233
02234
02235
02236
02237
02238 void NI_stream_setbuf( NI_stream_type *ns , char *str )
02239 {
02240 int nn ;
02241
02242 if( ns == NULL ||
02243 ns->type != NI_STRING_TYPE ||
02244 ns->io_mode != NI_INPUT_MODE ||
02245 str == NULL ||
02246 ns->bad == MARKED_FOR_DEATH ) return ;
02247
02248 NI_free(ns->buf) ;
02249 nn = NI_strlen(str) ;
02250 ns->nbuf = nn ;
02251 ns->npos = 0 ;
02252 ns->bufsize = nn+1 ;
02253 ns->buf = NI_malloc(char, nn+1) ;
02254 strcpy(ns->buf,str) ;
02255 return ;
02256 }
02257
02258
02259
02260
02261
02262
02263
02264
02265
02266
02267
02268
02269
02270
02271
02272
02273
02274
02275
02276 int NI_stream_goodcheck( NI_stream_type *ns , int msec )
02277 {
02278 int ii , jj ;
02279 char *bbb ;
02280
02281
02282
02283 if( ns == NULL || ns->bad == MARKED_FOR_DEATH ) return -1 ;
02284
02285 switch( ns->type ){
02286
02287 #ifndef DONT_USE_SHM
02288
02289
02290 case NI_SHM_TYPE:
02291 return SHM_goodcheck( ns->shmioc , msec ) ;
02292 #endif
02293
02294
02295
02296 case NI_FILE_TYPE:
02297 if( ns->fp == NULL ) return -1 ;
02298 if( ns->io_mode == NI_INPUT_MODE )
02299 return NI_stream_readcheck(ns,0) ;
02300 else
02301 return 1 ;
02302
02303 case NI_FD_TYPE:
02304 return 1 ;
02305
02306
02307
02308 case NI_STRING_TYPE:
02309 if( ns->io_mode == NI_INPUT_MODE )
02310 return NI_stream_readcheck(ns,0) ;
02311 else
02312 return 1 ;
02313
02314
02315
02316 case NI_REMOTE_TYPE:
02317 if( ns->io_mode == NI_INPUT_MODE )
02318 return NI_stream_readcheck(ns,0) ;
02319 else
02320 return -1 ;
02321
02322
02323
02324 case NI_TCP_TYPE:
02325 if( ns->bad == 0 ){
02326 int ich ;
02327 ich = tcp_alivecheck(ns->sd) ;
02328
02329 #ifdef NIML_DEBUG
02330 if( ich == 0 )
02331 NI_dpr("++ Socket %s (port %d) has gone bad!\n",ns->name,ns->port);
02332 #endif
02333
02334 if( ich == 0 ) return -1 ;
02335 return 1 ;
02336 }
02337
02338
02339
02340
02341
02342 if( ns->bad == TCP_WAIT_ACCEPT ){
02343 ii = tcp_readcheck(ns->sd,msec) ;
02344 if( ii > 0 ){
02345 jj = tcp_accept( ns->sd , NULL,&bbb ) ;
02346 if( jj >= 0 ){
02347 CLOSEDOWN( ns->sd ) ;
02348 NI_strncpy(ns->name,bbb,256) ;
02349 NI_free(bbb); ns->bad = 0; ns->sd = jj;
02350 fcntl( ns->sd, F_SETOWN, (int)getpid() );
02351 }
02352 }
02353 }
02354
02355
02356
02357 else if( ns->bad == TCP_WAIT_CONNECT ){
02358 int dms=0 , ms ;
02359
02360 if( msec < 0 ) msec = 999999999 ;
02361 for( ms=0 ; ms < msec ; ms += dms ){
02362 ns->sd = tcp_connect( ns->name , ns->port );
02363 if( ns->sd >= 0 ) break ;
02364 dms = NEXTDMS(dms); dms = MIN(dms,msec-ms); NI_sleep(dms);
02365 }
02366 if( ns->sd < 0 )
02367 ns->sd = tcp_connect( ns->name , ns->port ) ;
02368
02369 if( ns->sd >= 0 ) ns->bad = 0 ;
02370 if( ns->sd >= 0 )
02371 fcntl( ns->sd, F_SETOWN, (int)getpid() );
02372 }
02373
02374
02375
02376 return (ns->bad == 0) ;
02377 }
02378
02379 return -1 ;
02380 }
02381
02382
02383
02384
02385
02386
02387
02388
02389 void NI_stream_close_keep( NI_stream_type *ns , int flag )
02390 {
02391 if( ns == NULL || !isgraph(ns->orig_name[0]) ) return ;
02392
02393 if( (flag & 4) == 0 )
02394 remove_open_stream( ns ) ;
02395
02396 if( ns->bad == MARKED_FOR_DEATH ){
02397 if( ns->buf != NULL ){ NI_free(ns->buf); ns->buf = NULL;}
02398 return ;
02399 }
02400
02401
02402
02403 if( (flag & 1) != 0 &&
02404 (ns->type == NI_TCP_TYPE || ns->type == NI_SHM_TYPE) &&
02405 NI_stream_writecheck(ns,1) > 0 ){
02406
02407 NI_stream_writestring( ns , "<?ni_do ni_verb='close_this' ?>\n" ) ;
02408 NI_sleep(9) ;
02409 }
02410
02411
02412
02413 switch( ns->type ){
02414
02415 #ifndef DONT_USE_SHM
02416 case NI_SHM_TYPE:
02417 NI_sleep(9) ;
02418 SHM_close( ns->shmioc ) ;
02419 break ;
02420 #endif
02421
02422 case NI_FD_TYPE:
02423 if( ns->fp != NULL && ns->io_mode == NI_OUTPUT_MODE ) fflush(ns->fp) ;
02424 break ;
02425
02426 case NI_REMOTE_TYPE:
02427 case NI_STRING_TYPE:
02428 break ;
02429
02430 case NI_FILE_TYPE:
02431 if( ns->fp != NULL ) fclose(ns->fp) ;
02432 break ;
02433
02434 case NI_TCP_TYPE:
02435 if( ns->sd >= 0 ){
02436 if( (flag & 2) != 0 ){
02437 tcp_send( ns->sd , "X" , 1 , MSG_OOB ) ;
02438 NI_sleep(9) ;
02439 }
02440 NI_sleep(2) ;
02441 CLOSEDOWN(ns->sd) ;
02442 }
02443 break ;
02444 }
02445
02446 ns->bad = MARKED_FOR_DEATH ;
02447 if( (flag & 4) == 0 ){
02448 NI_free(ns->buf) ; ns->buf = NULL ;
02449 }
02450 return ;
02451 }
02452
02453
02454
02455
02456
02457
02458
02459
02460 void NI_stream_close( NI_stream_type *ns )
02461 {
02462 NI_stream_close_keep(ns,1) ; NI_free(ns) ; return ;
02463 }
02464
02465
02466
02467
02468
02469 void NI_stream_closenow( NI_stream_type *ns )
02470 {
02471 NI_stream_close_keep(ns,0) ; NI_free(ns) ; return ;
02472 }
02473
02474
02475
02476
02477
02478 void NI_stream_kill( NI_stream_type *ns )
02479 {
02480 NI_stream_close_keep(ns,3) ; NI_free(ns) ; return ;
02481 }
02482
02483
02484
02485
02486
02487
02488
02489
02490 int NI_stream_hasinput( NI_stream_type *ns , int msec )
02491 {
02492 if( ns == NULL || ns->bad == MARKED_FOR_DEATH ) return -1 ;
02493
02494 if( ns->npos < ns->nbuf ) return 1 ;
02495 return NI_stream_readcheck( ns , msec ) ;
02496 }
02497
02498
02499
02500
02501
02502
02503
02504
02505
02506
02507
02508
02509
02510
02511
02512 int NI_stream_readcheck( NI_stream_type *ns , int msec )
02513 {
02514 int ii ;
02515
02516 if( ns == NULL || ns->bad == MARKED_FOR_DEATH ) return -1 ;
02517
02518 switch( ns->type ){
02519
02520 #ifndef DONT_USE_SHM
02521 case NI_SHM_TYPE:
02522 ii = SHM_readcheck( ns->shmioc , msec ) ;
02523 if( ii > 0 ) ii = 1 ;
02524 return ii ;
02525 #endif
02526
02527
02528
02529 case NI_TCP_TYPE:
02530 ii = NI_stream_goodcheck(ns,0) ;
02531 if( ii == -1 ) return -1 ;
02532 if( ii == 0 ){
02533 ii = NI_stream_goodcheck(ns,msec) ;
02534 if( ii != 1 ) return ii ;
02535 }
02536 ii = tcp_alivecheck( ns->sd ) ;
02537 if( !ii ) return -1 ;
02538 ii = tcp_readcheck( ns->sd , msec ) ;
02539 return ii ;
02540
02541
02542
02543 case NI_FD_TYPE:
02544 ii = tcp_readcheck( fileno(ns->fp) , msec ) ;
02545 return ii ;
02546
02547
02548
02549 case NI_FILE_TYPE:{
02550 long f_len , f_pos ;
02551
02552 if( ns->fp == NULL ||
02553 ns->io_mode == NI_OUTPUT_MODE ) return -1 ;
02554
02555 f_len = ns->fsize ;
02556 if( f_len < 0 ) return -1 ;
02557
02558 f_pos = ftell( ns->fp ) ;
02559 if( f_pos < 0 ) return -1 ;
02560
02561 return (f_pos < f_len) ? 1 : -1 ;
02562
02563 }
02564
02565
02566
02567 case NI_REMOTE_TYPE:
02568 case NI_STRING_TYPE:{
02569 if( ns->io_mode == NI_OUTPUT_MODE ) return -1 ;
02570
02571 return (ns->npos < ns->nbuf) ? 1 : -1 ;
02572 }
02573 }
02574
02575 return -1 ;
02576 }
02577
02578
02579
02580
02581
02582
02583
02584
02585
02586
02587
02588
02589
02590 int NI_stream_writecheck( NI_stream_type *ns , int msec )
02591 {
02592 int ii ;
02593
02594 if( !NI_stream_writeable(ns) ) return -1 ;
02595
02596 switch( ns->type ){
02597
02598 #ifndef DONT_USE_SHM
02599 case NI_SHM_TYPE:
02600 ii = SHM_writecheck( ns->shmioc , msec ) ;
02601 if( ii > 0 ) ii = 1 ;
02602 return ii ;
02603 #endif
02604
02605
02606
02607 case NI_TCP_TYPE:
02608 if( ns->bad ){
02609 ii = NI_stream_goodcheck(ns,0) ;
02610 if( ii == -1 ) return -1 ;
02611 if( ii == 0 ){
02612 ii = NI_stream_goodcheck(ns,msec);
02613 if( ii != 1 ) return ii ;
02614 }
02615 }
02616
02617 return tcp_writecheck(ns->sd,msec) ;
02618
02619
02620
02621 case NI_FD_TYPE:
02622 return tcp_writecheck( fileno(ns->fp) , msec ) ;
02623
02624
02625
02626 case NI_FILE_TYPE:
02627 return ( (ns->fp != NULL && ns->io_mode == NI_OUTPUT_MODE) ? 1
02628 : -1 ) ;
02629
02630
02631
02632 case NI_STRING_TYPE:
02633 return ( (ns->io_mode == NI_OUTPUT_MODE) ? 1
02634 : -1 ) ;
02635
02636
02637 case NI_REMOTE_TYPE:
02638 return -1 ;
02639 }
02640
02641 return -1 ;
02642 }
02643
02644
02645
02646
02647
02648 int NI_stream_writestring( NI_stream_type *ns , char *str )
02649 {
02650 if( str == NULL ) return -1 ;
02651 return NI_stream_write( ns , str , strlen(str) ) ;
02652 }
02653
02654
02655
02656
02657
02658
02659
02660
02661
02662
02663
02664
02665
02666
02667
02668
02669
02670
02671
02672
02673
02674 int NI_stream_write( NI_stream_type *ns , char *buffer , int nbytes )
02675 {
02676 int ii , nsent ;
02677
02678
02679
02680 if( ns == NULL || ns->bad ||
02681 buffer == NULL || nbytes < 0 || ns->bad == MARKED_FOR_DEATH ) return -1;
02682
02683 if( nbytes == 0 ) return 0 ;
02684
02685 #ifdef NIML_DEBUG
02686 NI_dpr("ENTER NI_stream_write\n") ;
02687 #endif
02688
02689 if( ns->type != NI_TCP_TYPE ){
02690 ii = NI_stream_writecheck(ns,66) ;
02691 if( ii < 0 ) return ii ;
02692 }
02693
02694 switch( ns->type ){
02695
02696 #ifndef DONT_USE_SHM
02697 case NI_SHM_TYPE:
02698 return SHM_sendall( ns->shmioc , buffer , nbytes ) ;
02699 #endif
02700
02701
02702
02703 case NI_TCP_TYPE:
02704
02705 if( ns->bad ) return 0 ;
02706
02707
02708
02709
02710 if( !nosigpipe ){ signal(SIGPIPE,SIG_IGN); nosigpipe = 1; }
02711
02712 #if 0
02713
02714 do{ ii=tcp_writecheck(ns->sd,1) ; } while(ii==0) ;
02715 if( ii < 0 ) return -1 ;
02716 #endif
02717
02718 errno = 0 ;
02719 nsent = tcp_send( ns->sd , buffer , nbytes , 0 ) ;
02720 if( nsent < nbytes || errno != 0 ) PERROR("NI_stream_write(send)") ;
02721 if( nsent == 0 ){ fprintf(stderr,"tcp send: 0/%d\n",nbytes); nsent=-1; }
02722 return nsent ;
02723
02724
02725
02726 case NI_FD_TYPE:
02727 case NI_FILE_TYPE:
02728 #ifdef NIML_DEBUG
02729 NI_dpr(" file: about to write %d bytes\n",nbytes) ;
02730 #endif
02731 nsent = fwrite( buffer , 1 , nbytes , ns->fp ) ;
02732 if( nsent < nbytes ) PERROR("NI_stream_write(fwrite)") ;
02733 #ifdef NIML_DEBUG
02734 NI_dpr(" file: actually wrote %d bytes\n",nsent) ;
02735 #endif
02736 if( nsent == 0 ) nsent = -1 ;
02737 fflush(ns->fp) ;
02738 return nsent ;
02739
02740
02741
02742 case NI_STRING_TYPE:
02743 #ifdef NIML_DEBUG
02744 NI_dpr("NI_stream_write str: input=%s\n",ns->buf) ;
02745 #endif
02746 ns->buf = NI_realloc( ns->buf , char , ns->bufsize+nbytes ) ;
02747 memcpy( ns->buf+ns->nbuf , buffer , nbytes ) ;
02748 ns->nbuf += nbytes ; ns->buf[ns->nbuf] = '\0' ;
02749 ns->bufsize += nbytes ;
02750 #ifdef NIML_DEBUG
02751 NI_dpr("NI_stream_write str: output=%s\n",ns->buf) ;
02752 #endif
02753 return nbytes ;
02754
02755
02756
02757 case NI_REMOTE_TYPE:
02758 return -1 ;
02759 }
02760
02761 return -1 ;
02762 }
02763
02764
02765
02766
02767
02768
02769
02770
02771
02772
02773
02774
02775
02776
02777
02778
02779
02780
02781
02782
02783 int NI_stream_read( NI_stream_type *ns , char *buffer , int nbytes )
02784 {
02785 int ii ;
02786
02787
02788
02789 if( ns == NULL || ns->bad || buffer == NULL || nbytes < 0 ) return -1 ;
02790
02791 if( nbytes == 0 ) return 0 ;
02792
02793 #ifdef NIML_DEBUG
02794 NI_dpr("ENTER NI_stream_read\n") ;
02795 #endif
02796
02797 switch( ns->type ){
02798
02799 #ifndef DONT_USE_SHM
02800 case NI_SHM_TYPE:
02801 return SHM_recv( ns->shmioc , buffer , nbytes ) ;
02802 #endif
02803
02804
02805
02806 case NI_TCP_TYPE:
02807 ii = NI_stream_goodcheck(ns,1) ; if( ii != 1 ) return ii ;
02808 #if 0
02809
02810 do{ ii=tcp_readcheck(ns->sd,1); } while( ii==0 ) ;
02811 if( ii < 0 ) return -1 ;
02812 #endif
02813 errno = 0 ;
02814 ii = tcp_recv( ns->sd , buffer , nbytes , 0 ) ;
02815 if( ii == -1 || errno != 0 ) PERROR("NI_stream_read(recv)") ;
02816 #ifdef NIML_DEBUG
02817 NI_dpr(" tcp: got %d/%d bytes ***\n",ii,nbytes) ;
02818 #endif
02819 return ii ;
02820
02821
02822
02823 case NI_FD_TYPE:
02824 case NI_FILE_TYPE:
02825 if( ns->fp == NULL || ns->io_mode == NI_OUTPUT_MODE ) return -1 ;
02826 ii = fread( buffer , 1 , nbytes , ns->fp ) ;
02827 return ii ;
02828
02829
02830
02831 case NI_REMOTE_TYPE:
02832 case NI_STRING_TYPE:
02833 if( ns->io_mode == NI_OUTPUT_MODE ) return -1 ;
02834 ii = ns->nbuf - ns->npos ;
02835 if( ii <= 0 ) return -1 ;
02836 if( ii > nbytes ) ii = nbytes ;
02837 memcpy( buffer , ns->buf+ns->npos , ii ) ;
02838 ns->npos += ii ;
02839 return ii ;
02840 }
02841
02842 return -1 ;
02843 }
02844
02845
02846
02847
02848
02849
02850
02851
02852
02853
02854
02855
02856
02857
02858
02859
02860
02861
02862
02863
02864
02865 int NI_stream_fillbuf( NI_stream_type *ns, int minread, int msec )
02866 {
02867 int nn , ii , ntot=0 , ngood=0 , mwait=0 ;
02868 int start_msec = NI_clock_time() ;
02869
02870 if( NI_stream_goodcheck(ns,0) < 0 ) return -1 ;
02871
02872 if( ns->type == NI_STRING_TYPE ) return -1 ;
02873 if( ns->type == NI_REMOTE_TYPE ) return -1 ;
02874
02875 if( ns->nbuf >= ns->bufsize ) return 0 ;
02876
02877 if( msec < 0 ) msec = 999999999 ;
02878
02879
02880
02881 while(1){
02882
02883 ngood = NI_stream_readcheck(ns,mwait);
02884
02885 if( ngood < 0 ) break ;
02886
02887 if( ngood > 0 ){
02888
02889
02890 ii = NI_stream_read( ns, ns->buf+ns->nbuf, ns->bufsize-ns->nbuf ) ;
02891
02892 if( ii > 0 ){
02893 ns->nbuf += ii ;
02894 ntot += ii ;
02895
02896
02897
02898
02899 if( ns->nbuf >= ns->bufsize || ntot >= minread ) break ;
02900
02901 } else if( ii < 0 ){
02902 ngood = -1 ; break ;
02903 }
02904 }
02905
02906
02907
02908 if( minread <= 0 ) break ;
02909
02910
02911
02912 if( NI_clock_time()-start_msec >= msec ) break ;
02913
02914
02915
02916 if( mwait < 9 ) mwait++ ;
02917 }
02918
02919
02920
02921
02922 if( ntot == 0 && ngood < 0 ) ntot = -1 ;
02923
02924 return ntot ;
02925 }
02926
02927
02928
02929
02930
02931
02932
02933
02934
02935
02936
02937 int NI_stream_readbuf( NI_stream_type *ns , char *buffer , int nbytes )
02938 {
02939 int ii , jj , bs , nout=0 ;
02940
02941
02942
02943 if( nbytes == 0 ) return 0;
02944 if( buffer == NULL || nbytes < 0 ) return -1;
02945 if( ns->buf == NULL || ns->bufsize == 0 ) return -1;
02946 if( !NI_stream_readable(ns) ) return -1;
02947
02948
02949
02950 ii = ns->nbuf - ns->npos ;
02951
02952 if( ii >= nbytes ){
02953 memcpy( buffer , ns->buf + ns->npos , nbytes ) ;
02954 ns->npos += nbytes ;
02955 if( ns->npos == ns->nbuf ) ns->nbuf = ns->npos = 0 ;
02956 return nbytes ;
02957 }
02958
02959
02960
02961 if( ii > 0 ){
02962 memcpy( buffer , ns->buf + ns->npos , ii ) ; nout = ii ;
02963 }
02964 ns->nbuf = ns->npos = 0 ;
02965
02966
02967
02968 if( ns->type == NI_REMOTE_TYPE || ns->type == NI_STRING_TYPE )
02969 return (nout > 0) ? nout : -1 ;
02970
02971
02972
02973 bs = ns->bufsize ;
02974
02975 while( nout < nbytes ){
02976
02977 jj = MIN( bs , nbytes-nout ) ;
02978 ii = NI_stream_fillbuf( ns,jj,1666 ) ;
02979
02980 if( ii > 0 ){
02981 ii = ns->nbuf ;
02982 if( ii > nbytes-nout ) ii = nbytes-nout ;
02983 memcpy( buffer+nout , ns->buf , ii ) ; nout += ii ;
02984 ns->npos += ii ; NI_reset_buffer( ns ) ;
02985 } else {
02986 break ;
02987 }
02988 }
02989
02990 if( nout == 0 && ii < 0 ) nout = -1 ;
02991 return nout ;
02992 }
02993
02994
02995
02996
02997
02998
02999
03000
03001
03002
03003
03004 int NI_stream_readbuf64( NI_stream_type *ns , char *buffer , int nbytes )
03005 {
03006 int ii , jj , bs , nout=0 ;
03007 byte a ,b ,c , w,x,y,z ;
03008 byte ag,bg,cg ;
03009 int num_reread , bpos ;
03010
03011
03012
03013 if( nbytes == 0 ) return 0;
03014 if( buffer == NULL || nbytes < 0 ) return -1;
03015 if( ns->buf == NULL || ns->bufsize == 0 ) return -1;
03016 if( !NI_stream_readable(ns) ) return -1;
03017
03018
03019
03020
03021 if( ns->b64_numleft > 0 ){
03022
03023 if( ns->b64_numleft >= nbytes ){
03024 memcpy( buffer , ns->b64_left , nbytes ) ;
03025 ns->b64_numleft -= nbytes ;
03026 if( ns->b64_numleft > 0 )
03027 memmove( ns->b64_left , ns->b64_left + nbytes , ns->b64_numleft ) ;
03028 return nbytes ;
03029 }
03030
03031
03032
03033 memcpy( buffer , ns->b64_left , ns->b64_numleft ) ;
03034 nout = ns->b64_numleft ;
03035 ns->b64_numleft = 0 ;
03036 }
03037
03038
03039
03040
03041
03042 load_decode_table() ;
03043
03044
03045
03046 num_reread = 0 ;
03047 Base64Reread:
03048 ag = bg = cg = 0 ;
03049 num_reread++ ; if( num_reread > 5 ) goto Base64Done ;
03050
03051
03052
03053 if( num_reread > 1 || ns->nbuf - ns->npos < 4 ){
03054 NI_reset_buffer(ns) ;
03055 ii = 5 - ns->nbuf ; if( ii <= 1 ) ii = 2 ;
03056 ii = NI_stream_fillbuf( ns , ii , 1666 ) ;
03057 if( ns->nbuf < 4 ) goto Base64Done ;
03058 }
03059
03060
03061
03062
03063
03064
03065
03066
03067
03068
03069
03070 while( 1 ){
03071 ag = bg = cg = 0 ;
03072 bpos = ns->npos ;
03073
03074
03075
03076
03077
03078
03079 w = ns->buf[bpos++] ;
03080 while( !B64_goodchar(w) && w != '<' && bpos < ns->nbuf )
03081 w = ns->buf[bpos++] ;
03082 ns->npos = bpos-1 ;
03083 if( w == '<' ) goto Base64Done;
03084 if( bpos == ns->nbuf ) goto Base64Reread;
03085
03086
03087
03088 x = ns->buf[bpos++] ;
03089 while( !B64_goodchar(x) && x != '<' && bpos < ns->nbuf )
03090 x = ns->buf[bpos++] ;
03091 if( x == '<' ){ ns->npos = bpos-1; goto Base64Done; }
03092 if( bpos == ns->nbuf ) goto Base64Reread;
03093
03094
03095
03096 y = ns->buf[bpos++] ;
03097 while( !B64_goodchar(y) && y != '<' && bpos < ns->nbuf )
03098 y = ns->buf[bpos++] ;
03099 if( y == '<' ){ ns->npos = bpos-1; goto Base64Done; }
03100 if( bpos == ns->nbuf ) goto Base64Reread;
03101
03102
03103
03104 z = ns->buf[bpos++] ;
03105 while( !B64_goodchar(z) && z != '<' && bpos < ns->nbuf )
03106 z = ns->buf[bpos++] ;
03107 if( z == '<' ){ ns->npos = bpos-1; goto Base64Done; }
03108
03109
03110
03111 ns->npos = bpos ;
03112
03113 B64_decode4(w,x,y,z,a,b,c) ;
03114
03115 if( z == '=' ){
03116 int nn = B64_decode_count(w,x,y,z) ;
03117 ag = (nn > 0) ;
03118 bg = (nn > 1) ;
03119 cg = 0 ;
03120
03121
03122
03123
03124 if( ag ){ buffer[nout++]=a; ag=0; if(nout >= nbytes) goto Base64Done; }
03125 if( bg ){ buffer[nout++]=b; bg=0; if(nout >= nbytes) goto Base64Done; }
03126 goto Base64Done ;
03127 }
03128
03129
03130
03131
03132 ag = bg = cg = 1 ;
03133 buffer[nout++]=a; ag=0; if(nout >= nbytes) goto Base64Done;
03134 buffer[nout++]=b; bg=0; if(nout >= nbytes) goto Base64Done;
03135 buffer[nout++]=c; cg=0; if(nout >= nbytes) goto Base64Done;
03136
03137
03138
03139
03140
03141 num_reread = 1 ;
03142 if( ns->nbuf - ns->npos < 4 ) goto Base64Reread ;
03143
03144 }
03145
03146
03147
03148
03149
03150
03151 Base64Done:
03152 ns->b64_numleft = 0 ;
03153 if( ag ) ns->b64_left[ ns->b64_numleft++ ] = a ;
03154 if( bg ) ns->b64_left[ ns->b64_numleft++ ] = b ;
03155 if( cg ) ns->b64_left[ ns->b64_numleft++ ] = c ;
03156
03157 return nout ;
03158 }