Skip to content

AFNI/NIfTI Server

Sections
Personal tools
You are here: Home » AFNI » Documentation

Doxygen Source Code Documentation


Main Page   Alphabetical List   Data Structures   File List   Data Fields   Globals   Search  

thd_iochan.c File Reference

#include "thd_iochan.h"
#include "Amalloc.h"
#include <errno.h>
#include <signal.h>
#include <time.h>

Go to the source code of this file.


Defines

#define PERROR(x)   do{ if(pron) perror(x); } while(0)
#define STATUS(x)
#define USE_SHUTDOWN
#define CLOSEDOWN(ss)   ( shutdown((ss),2) , close((ss)) )
#define SOCKET_BUFSIZE   (31*1024)
#define QBUF   1024
#define MBUF   1048576

Functions

char * iochan_error_string (void)
void iochan_enable_perror (int q)
int tcp_readcheck (int sd, int msec)
int tcp_writecheck (int sd, int msec)
void tcp_set_cutoff (int sd)
int tcp_alivecheck (int sd)
int tcp_connect (char *host, int port)
int tcp_listen (int port)
int tcp_accept (int sd, char **hostname, char **hostaddr)
key_t string_to_key (char *key_string)
int shm_accept (char *key_string)
int shm_create (char *key_string, int size)
char * shm_attach (int shmid)
int shm_size (int shmid)
int shm_nattach (int shmid)
IOCHAN * iochan_init (char *name, char *mode)
int shm_alivecheck (int shmid)
int iochan_goodcheck (IOCHAN *ioc, int msec)
void iochan_close (IOCHAN *ioc)
void iochan_set_cutoff (IOCHAN *ioc)
int iochan_readcheck (IOCHAN *ioc, int msec)
int iochan_force_clear (IOCHAN *ioc)
int iochan_clearcheck (IOCHAN *ioc, int msec)
int iochan_writecheck (IOCHAN *ioc, int msec)
int iochan_send (IOCHAN *ioc, char *buffer, int nbytes)
int iochan_sendall (IOCHAN *ioc, char *buffer, int nbytes)
int iochan_recv (IOCHAN *ioc, char *buffer, int nbytes)
int iochan_recvloop (IOCHAN *ioc, char *buffer, int nbytes)
int iochan_recvall (IOCHAN *ioc, char *buffer, int nbytes)
void iochan_sleep (int msec)
int iochan_ctl (IOCHAN *ioc, int cmd, int arg)
void iochan_fork_sigfunc (int sig)
pid_t iochan_fork_relay (char *name_in, char *name_out)
double COX_clock_time (void)
double COX_cpu_time (void)

Variables

char * error_string = NULL
int shm_RMID_delay = 0
int pron = 1
int nosigpipe = 0
IOCHAN * ioc_kill_1 = NULL
IOCHAN * ioc_kill_2 = NULL

Define Documentation

#define CLOSEDOWN(ss)   ( shutdown((ss),2) , close((ss)) )
 

Definition at line 82 of file thd_iochan.c.

#define MBUF   1048576
 

#define PERROR(x)   do{ if(pron) perror(x); } while(0)
 

Definition at line 72 of file thd_iochan.c.

#define QBUF   1024
 

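Compute the number of readable bytes into nread. This routine should be called by the "reading" process. It will then be waiting until the "writing" process increments ioc->bend. (Note: this text appears to have been picked up by Doxygen from a comment inside iochan_readcheck(); QBUF itself is the size of the scratch buffer that iochan_force_clear() reads discarded data into.)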

Definition at line 1031 of file thd_iochan.c.

Referenced by iochan_force_clear().

#define SOCKET_BUFSIZE   (31*1024)
 

This is used to set the send/receive buffer size for sockets.

Definition at line 89 of file thd_iochan.c.

Referenced by tcp_connect(), and tcp_listen().

#define STATUS(x)
 

Definition at line 73 of file thd_iochan.c.

#define USE_SHUTDOWN
 

Definition at line 80 of file thd_iochan.c.


Function Documentation

double COX_clock_time (void)
 

Definition at line 1695 of file thd_iochan.c.

Referenced by AFNI_splashdown(), AFNI_start_io(), calculate_results(), get_surf_data(), main(), and MAIN_workprocess().

/* Return the wall-clock time, in seconds, elapsed since the FIRST call to
   this function.  The first call only records the reference time and
   returns 0.0; every later call returns (now - reference). */
01696 {
01697    struct timeval  new_tval ;
01698    struct timezone tzone ;
01699    static struct timeval old_tval ;   /* reference time, captured on the first call */
01700    static int first = 1 ;             /* nonzero until the first call has been made */
01701 
01702    gettimeofday( &new_tval , &tzone ) ;
01703 
01704    if( first ){
01705       old_tval = new_tval ;
01706       first    = 0 ;
01707       return 0.0 ;
01708    }
01709 
01710    if( old_tval.tv_usec > new_tval.tv_usec ){   /* borrow 1 s so the usec difference below is >= 0 */
01711       new_tval.tv_usec += 1000000 ;
01712       new_tval.tv_sec -- ;
01713    }
01714 
01715    return (double)( (new_tval.tv_sec  - old_tval.tv_sec )
01716                    +(new_tval.tv_usec - old_tval.tv_usec)*1.0e-6 ) ;
01717 }

double COX_cpu_time (void)
 

Definition at line 1723 of file thd_iochan.c.

01725 {
01726    struct tms ttt ;
01727 
01728    (void) times( &ttt ) ;
01729    return (  (double) (ttt.tms_utime /* + ttt.tms_stime */ )
01730            / (double) CLK_TCK ) ;
01731 }
01732 #else
01733 { return 0.0 ; }

int iochan_clearcheck (IOCHAN *ioc, int msec)
 

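Loop until readcheck says no data available. (Note: Doxygen appears to have taken this sentence from the comment that closes the loop in iochan_force_clear(). iochan_clearcheck() itself reports whether the channel has no unread data: 1 if it is clear, 0 if data is still pending after waiting up to msec milliseconds, -1 on error.)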

Definition at line 1061 of file thd_iochan.c.

References IOCHAN::bend, IOCHAN::bstart, IOCHAN::bufsize, error_string, IOCHAN::id, iochan_goodcheck(), iochan_sleep(), MIN, NEXTDMS, SHM_IOCHAN, SHMIOC_WRITE, TCP_IOCHAN, tcp_readcheck(), and IOCHAN::type.

Referenced by AFNI_start_io(), AFNI_start_version_check(), ART_start_io(), main(), and RT_acquire_info().

/* Report whether ioc currently holds no unread data.
     ioc  = channel to test
     msec = maximum time to wait, in milliseconds; for shm channels a
            negative value is replaced by 999999999
   Returns  1 if no data is pending (also when the channel is not yet good),
            0 if data is still pending,
           -1 on error (goodcheck failure, or unknown channel type). */
01062 {
01063    int ii ;
01064 
01065    /** check if the IOCHAN is good **/
01066 
01067    error_string = NULL ;
01068 
01069    ii = iochan_goodcheck(ioc,0) ;
01070    if( ii == -1 ) return -1 ;            /* some error */
01071    if( ii == 0  ) return  1 ;            /* not good yet, so can be no data */
01072 
01073    /** tcp: ==> use the Unix "select" mechanism **/
01074 
01075    if( ioc->type == TCP_IOCHAN ) return ( tcp_readcheck(ioc->id,msec) == 0 ) ;   /* 1 only when readcheck reports 0 */
01076 
01077    /** shm: ==> must loop and wait ourselves **/
01078 
01079    if( ioc->type == SHM_IOCHAN ){
01080       int nread , dms=0 , ms ;
01081 
01082       if( msec < 0 ) msec = 999999999 ;      /* a long time (11+ days) */
01083 
01084       ioc = SHMIOC_WRITE(ioc) ;  /* 24 June 1997 */   /* NOTE(review): presumably selects the sub-channel this process writes to -- confirm in thd_iochan.h */
01085 
01086       for( ms=0 ; ms < msec ; ms += dms ){   /* poll, sleeping dms milliseconds between checks */
01087          nread = (*(ioc->bend) - *(ioc->bstart) + ioc->bufsize + 1) % (ioc->bufsize) ;   /* bytes still unread in the buffer */
01088          if( nread == 0 ) return 1 ;
01089          dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; iochan_sleep(dms) ;   /* never sleep past the msec deadline */
01090          ii = iochan_goodcheck(ioc,0) ; if( ii == -1 ) return -1 ;
01091       }
01092       nread = (*(ioc->bend) - *(ioc->bstart) + ioc->bufsize + 1) % (ioc->bufsize) ;   /* one final check after the timeout */
01093       return (nread == 0) ;
01094    }
01095 
01096    return -1 ;  /* should never be reached */
01097 }

void iochan_close (IOCHAN *ioc)
 

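If there is a second channel, check it too. (Note: Doxygen appears to have taken this sentence from a comment near the end of iochan_goodcheck(). iochan_close() itself closes the channel, first closing any second channel ioc->ioc2, and then frees the IOCHAN struct.)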

Definition at line 931 of file thd_iochan.c.

References IOCHAN::bstart, CLOSEDOWN, free, IOCHAN::id, IOCHAN::ioc2, SHM_IOCHAN, shm_nattach(), shm_RMID_delay, TCP_IOCHAN, and IOCHAN::type.

Referenced by AFNI_exit(), AFNI_plugout_exit(), ART_exit(), iochan_fork_relay(), iochan_fork_sigfunc(), iochan_init(), RT_exit(), and vc_exit().

/* Close an IOCHAN and free its struct.  A NULL ioc is ignored.
   After this call the pointer ioc must not be used (or freed) again,
   since the struct itself is released with free() here. */
00932 {
00933    if( ioc == NULL ) return ;
00934 
00935    if( ioc->ioc2 != NULL ) iochan_close(ioc->ioc2) ;   /* close (and free) the second channel first */
00936 
00937    if( ioc->type == TCP_IOCHAN ){
00938       if( ioc->id >= 0 ) CLOSEDOWN(ioc->id) ;          /* shutdown() + close() the socket */
00939    }
00940 
00941    else if( ioc->type == SHM_IOCHAN ){
00942 #ifndef DONT_USE_SHM
00943       if( ioc->id >= 0 ){
00944          shmdt( (char *) ioc->bstart ) ;       /* detach */
00945                                                /* then kill */
00946          if( !shm_RMID_delay || shm_nattach(ioc->id) < 1 )   /* with shm_RMID_delay set, remove only when nobody is attached */
00947            shmctl( ioc->id , IPC_RMID , NULL ) ;
00948       }
00949 #endif
00950    }
00951 
00952    free( ioc ) ; return ;
00953 }

int iochan_ctl (IOCHAN *ioc, int cmd, int arg)
 

Definition at line 1470 of file thd_iochan.c.

References arg, IOC_TCP_SENDSIZE, and IOCHAN::sendsize.

/* Set a control parameter on an IOCHAN.
     cmd = command code; only IOC_TCP_SENDSIZE is handled here
     arg = value for the command
   Returns 0 on success, -1 if ioc is NULL, cmd is not recognized,
   or arg is negative. */
01471 {
01472    if( ioc == NULL ) return -1 ;
01473 
01474    switch( cmd ){
01475 
01476       case IOC_TCP_SENDSIZE:
01477          if( arg >= 0 ){ ioc->sendsize = arg ; return  0 ; }   /* store the new sendsize */
01478          else                                  return -1 ;
01479       break ;   /* not reached: both branches above return */
01480 
01481    }
01482    return -1 ;   /* unrecognized cmd */
01483 }

void iochan_enable_perror (int q)
 

Definition at line 65 of file thd_iochan.c.

References pron, and q.

Referenced by AFNI_start_version_check().

00065 { pron = q; }   /* store q in pron; the PERROR() macro calls perror() only when pron is nonzero */

char * iochan_error_string (void)
 

Definition at line 13 of file thd_iochan.c.

References error_string.

Referenced by afni_io(), AFNI_process_plugout(), and iochan_fork_relay().

00013 { return error_string; }   /* current error message pointer; NULL when no error message is set */

int iochan_force_clear (IOCHAN *ioc)
 

Definition at line 1033 of file thd_iochan.c.

References iochan_readcheck(), iochan_recv(), and QBUF.

/* Read and discard everything currently available on ioc.
   Returns the total number of bytes discarded, or -1 if
   iochan_readcheck() or iochan_recv() reports an error. */
01034 {
01035    int ii , ntot = 0 ;   /* ntot = running count of discarded bytes */
01036    char qbuf[QBUF] ;     /* scratch buffer; its contents are never used */
01037 
01038    do{
01039       ii = iochan_readcheck(ioc,0) ;   /* poll without waiting */
01040       if( ii == -1 ) return -1 ;
01041       if( ii ==  0 ) return  ntot ;    /* nothing left to read: done */
01042 
01043       ii = iochan_recv( ioc , qbuf , QBUF ) ;   /* pull in at most QBUF bytes and drop them */
01044       if( ii == -1 ) return -1 ;
01045       ntot += ii ;
01046 
01047    } while( 1 ) ;  /** loop until readcheck says no data available **/
01048 
01049    return -1 ;  /* should not be reached */
01050 }

pid_t iochan_fork_relay (char *name_in, char *name_out)
 

Definition at line 1566 of file thd_iochan.c.

References AFMALL, iochan_close(), iochan_error_string(), iochan_fork_sigfunc(), iochan_goodcheck(), iochan_init(), iochan_readcheck(), iochan_recvloop(), iochan_sendall(), iochan_sleep(), and iochan_writecheck().

Referenced by AFNI_start_io().

/* Fork a child process that relays data from one IOCHAN to another.
     name_in  = IOCHAN name the child opens in "accept" mode (input)
     name_out = IOCHAN name the child opens in "create" mode (output)
   In the parent: returns the child's pid, or (pid_t)(-1) if either name
   is NULL, fork() fails, or the child is found to have exited already
   about 5 ms after the fork.
   In the child: never returns; it copies data from input to output until
   an error occurs, then closes both channels and calls _exit(1).
   Change from the original listing: the "can't malloc" diagnostic now
   ends with a newline, like every other message printed here. */
01567 {
01568    pid_t ppid = (pid_t)(-1) ;
01569    int jj , kk , nbuf ;
01570 #define MBUF 1048576           /* 1 Megabyte */
01571    char * buf , *sss ;
01572    IOCHAN *ioc_in, *ioc_out ;
01573 
01574    if( name_in == NULL || name_out == NULL ) return ppid ;
01575 
01576    /*-- fork into two processes --*/
01577 
01578    ppid = fork() ;
01579    if( ppid == (pid_t)(-1) ){
01580       perror("iochan_fork failed") ;
01581       return ppid ;
01582    }
01583 
01584    if( ppid != 0 ){      /* the parent process */
01585       pid_t qpid ;
01586       iochan_sleep(5) ;                         /* wait a little bit */
01587       qpid = waitpid( ppid , NULL , WNOHANG ) ; /* see if child died */
01588       if( qpid == ppid ) ppid = (pid_t)(-1) ;   /* if it did, return error */
01589       return ppid ;
01590    }
01591 
01592    /*--- from here on is the child process, which never returns ---*/
01593 
01594    ioc_in = iochan_init( name_in , "accept" ) ;  /* open input */
01595    if( ioc_in == NULL ) _exit(1) ;               /* failed?   */
01596 
01597    ioc_out = iochan_init( name_out , "create" ) ; /* open output */
01598    if( ioc_out == NULL ){                         /* failed?    */
01599       iochan_close(ioc_in) ; _exit(1) ;
01600    }
01601 
01602    /* set signal handler to deal with sudden death situations */
01603 
01604    ioc_kill_1 = ioc_in  ;   /* file-scope copies, so the handler can close them */
01605    ioc_kill_2 = ioc_out ;
01606    signal( SIGTERM , iochan_fork_sigfunc ) ;
01607    signal( SIGSEGV , iochan_fork_sigfunc ) ;
01608 
01609    fprintf(stderr,"forked process for shm->tcp started\n") ;
01610 
01611    do{  /* loop until both iochans are ready */
01612 
01613       jj = iochan_goodcheck(ioc_in ,1) ;
01614       kk = iochan_goodcheck(ioc_out,1) ;
01615       if( jj < 0 || kk < 0 ){
01616          iochan_close(ioc_in) ; iochan_close(ioc_out) ; _exit(1) ;
01617       }
01618 
01619    } while( jj == 0 || kk == 0 ) ;
01620 
01621    fprintf(stderr,"forked process fully connected\n") ;
01622 
01623    buf = AFMALL(char, MBUF) ; /* workspace for transfers */
01624    if( buf == NULL ){
01625       fprintf(stderr,"forked process can't malloc I/O buffer\n") ;
01626       iochan_close(ioc_in) ; iochan_close(ioc_out) ; _exit(1) ;
01627    }
01628 
01629    while(1){  /* loop, waiting for data */
01630 
01631       errno = 0 ;   /* cleared so the error report below can tell whether a system call failed */
01632       jj = iochan_readcheck( ioc_in , 20 ) ;          /* any input? */
01633       if( jj < 0 ){                                   /* bad news?  */
01634          if( errno ) perror( "forked readcheck" ) ;
01635          else        fprintf(stderr,"forked readcheck abort: jj=%d!\n",jj) ;
01636          sss = iochan_error_string() ;
01637          if( sss != NULL ) fprintf(stderr," ** %s\n",sss) ;
01638          break ;
01639       }
01640       if( jj == 0 ) continue ;                        /* no news    */
01641 
01642       nbuf = iochan_recvloop( ioc_in , buf , MBUF ) ; /* get input! */
01643       if( nbuf <= 0 ) continue ;                      /* too weird! */
01644 
01645 #if 0
01646       fprintf(stderr,"forked process read %d bytes\n",nbuf) ;
01647 #endif
01648 
01649       errno = 0 ;
01650       kk = iochan_writecheck( ioc_out , 1 ) ;         /* check      */
01651       if( kk == 0 ){                                  /* not writable yet: retry up to 1000 times, 2 ms each */
01652          int qq ;
01653          fprintf(stderr,"forked writecheck repeat:") ;
01654          for( qq=0 ; qq < 1000 ; qq++ ){
01655            if( qq%50 == 0 ) fprintf(stderr," %d",qq+1) ;   /* progress marker every 50 tries */
01656            kk = iochan_writecheck( ioc_out , 2 ) ;
01657            if( kk != 0 ) break ;
01658          }
01659          fprintf(stderr,"\n") ;
01660       }
01661       if( kk <= 0 ){                                  /* error, or still not writable */
01662          if( errno ) perror( "forked writecheck" ) ;
01663          else        fprintf(stderr,"forked writecheck abort: kk=%d!\n",kk) ;
01664          sss = iochan_error_string() ;
01665          if( sss != NULL ) fprintf(stderr," ** %s\n",sss) ;
01666          break ;
01667       }
01668       kk = iochan_sendall( ioc_out , buf , nbuf ) ;   /* send data! */
01669       if( kk < 0 ){                                   /* bad news?  */
01670          if( errno ) perror( "forked sendall" ) ;
01671          else        fprintf(stderr,"forked sendall abort: kk=%d!\n",kk) ;
01672          sss = iochan_error_string() ;
01673          if( sss != NULL ) fprintf(stderr," ** %s\n",sss) ;
01674          break ;
01675       }
01676 
01677 #if 0
01678       fprintf(stderr,"forked process wrote %d bytes\n",nbuf) ;
01679 #endif
01680    }
01681 
01682    /* bad news ==> shut down child operations */
01683 
01684    fprintf(stderr,"forked process fails!\n") ;
01685 
01686    iochan_close(ioc_in) ; iochan_close(ioc_out) ; _exit(1) ;
01687 }

void iochan_fork_sigfunc (int sig) [static]
 

Definition at line 1546 of file thd_iochan.c.

References iochan_close().

Referenced by iochan_fork_relay().

/* Signal handler installed by iochan_fork_relay() for SIGTERM and SIGSEGV:
   closes the two channels recorded in ioc_kill_1 / ioc_kill_2, prints a
   message to stderr, and terminates the process with _exit(1).
   Any other signal number falls through the switch and returns.
   NOTE(review): iochan_close() calls free(), and fprintf()/fflush() are
   used here; these are not on the POSIX async-signal-safe list -- confirm
   whether that is acceptable for this handler. */
01547 {
01548    switch( sig ){
01549       case SIGTERM:
01550         if( ioc_kill_1 != NULL ) iochan_close(ioc_kill_1) ;
01551         if( ioc_kill_2 != NULL ) iochan_close(ioc_kill_2) ;
01552         fprintf(stderr,"\n*** iochan_fork received SIGTERM signal\n");
01553         fflush(stderr) ;
01554         _exit(1) ;   /* does not return, so there is no fall-through into the next case */
01555       case SIGSEGV:
01556         if( ioc_kill_1 != NULL ) iochan_close(ioc_kill_1) ;
01557         if( ioc_kill_2 != NULL ) iochan_close(ioc_kill_2) ;
01558         fprintf(stderr,"\n*** iochan_fork received SIGSEGV signal\n");
01559         fflush(stderr) ;
01560         _exit(1) ;
01561    }
01562 }

int iochan_goodcheck (IOCHAN *ioc, int msec)
 

Definition at line 810 of file thd_iochan.c.

References IOCHAN::bad, IOCHAN::bend, IOCHAN::bstart, IOCHAN::buf, IOCHAN::bufsize, CLOSEDOWN, error_string, free, IOCHAN::id, IOCHAN::ioc2, IOC_BAD, iochan_sleep(), MIN, IOCHAN::name, NEXTDMS, IOCHAN::port, shm_accept(), shm_alivecheck(), shm_attach(), SHM_IOCHAN, shm_nattach(), shm_size(), STATUS, tcp_accept(), tcp_alivecheck(), tcp_connect(), TCP_IOCHAN, tcp_readcheck(), and IOCHAN::type.

Referenced by afni_io(), AFNI_plugout_workproc(), AFNI_process_plugout(), iochan_clearcheck(), iochan_fork_relay(), iochan_readcheck(), iochan_recv(), iochan_recvloop(), iochan_send(), iochan_writecheck(), main(), new_RT_input(), RT_check_listen(), and RT_worker().

00811 {
00812    int ii , jj ;
00813    char * bbb ;
00814 
00815    /** check inputs for OK-osity **/
00816 
00817    error_string = NULL ;
00818 
00819    if( ioc == NULL ){
00820       error_string = "iochan_goodcheck: bad input" ; return -1 ;
00821    }
00822 
00823    /** if it was good before, then check if it is still good **/
00824 
00825    if( IOC_BAD(ioc) == 0 ){
00826       int ich = 1 ;
00827 
00828       if( ioc->type == TCP_IOCHAN ){
00829          ich = tcp_alivecheck(ioc->id) ;
00830       } else if( ioc->type == SHM_IOCHAN ){
00831          ich = shm_alivecheck(ioc->id) ;
00832          if( ich && ioc->ioc2 != NULL )
00833             ich = shm_alivecheck(ioc->ioc2->id) ;
00834       }
00835 
00836       if( ich == 0 ){
00837          error_string = "iochan_goodcheck: no longer alive" ; return -1 ;
00838       }
00839       else
00840          return 1 ;
00841    }
00842 
00843    /** wasn't good before, so check if that condition has changed **/
00844 
00845    /** TCP/IP waiting to accept call from another host **/
00846 
00847    if( ioc->bad == TCP_WAIT_ACCEPT ){
00848       ii = tcp_readcheck(ioc->id,msec) ;               /* see if ready      */
00849       if( ii > 0 ){                                    /* if socket  ready  */
00850          STATUS("iochan_goodcheck: try to accept tcp");
00851          jj = tcp_accept( ioc->id , NULL,&bbb ) ;      /* accept connection */
00852          if( jj >= 0 ){                                /* if accept worked  */
00853             STATUS("iochan_goodcheck: accept worked!") ;
00854             CLOSEDOWN( ioc->id ) ;                     /* close old socket  */
00855             strcpy( ioc->name , bbb ) ;                /* put IP into name  */
00856             free(bbb) ; ioc->bad = 0 ; ioc->id = jj ;  /* and ready to go!  */
00857          } else {
00858            STATUS("iochan_goodcheck: accept failed!") ;
00859          }
00860       }
00861    }
00862 
00863    /** TCP/IP waiting to connect call to another host **/
00864 
00865    else if( ioc->bad == TCP_WAIT_CONNECT ){
00866       int dms=0 , ms ;
00867 
00868       if( msec < 0 ) msec = 999999999 ;      /* a long time (11+ days) */
00869       for( ms=0 ; ms < msec ; ms += dms ){
00870          ioc->id  = tcp_connect( ioc->name , ioc->port ) ; /* try to connect to host */
00871          if( ioc->id >= 0 ) break ;                        /* worked? break out      */
00872          dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; iochan_sleep(dms) ;
00873       }
00874       if( ioc->id < 0 )                                    /* one last try?          */
00875          ioc->id  = tcp_connect( ioc->name , ioc->port ) ;
00876 
00877       if( ioc->id >= 0 ) ioc->bad = 0 ;                    /* succeeded?             */
00878    }
00879 
00880    /** shmem segment waiting for creation (by someone else) **/
00881 
00882    else if( ioc->bad == SHM_WAIT_CREATE ){
00883       int dms=0 , ms ;
00884 
00885       if( msec < 0 ) msec = 999999999 ;      /* a long time (11+ days) */
00886       for( ms=0 ; ms < msec ; ms += dms ){
00887          ioc->id = shm_accept( ioc->name ) ;  /* try to attach to shmem segment */
00888          if( ioc->id >= 0 ) break ;           /* works? break out               */
00889          dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; iochan_sleep(dms) ;
00890       }
00891       if( ioc->id < 0 )                       /* one last try?                  */
00892          ioc->id = shm_accept( ioc->name ) ;
00893 
00894       if( ioc->id >= 0 ){                                     /* found it?     */
00895          char * bbb ;
00896          bbb          = shm_attach( ioc->id ) ;               /* attach it     */
00897          ioc->bstart  = (int *) bbb ;                         /* set start,    */
00898          ioc->bend    = (int *) (bbb + sizeof(int)) ;         /* end markers   */
00899          ioc->buf     = bbb + 2*sizeof(int) ;                 /* after markers */
00900          ioc->bufsize = shm_size(ioc->id) - 2*sizeof(int) ;   /* get its size  */
00901          ioc->bad     = 0 ;                                   /* mark ready    */
00902       }
00903    }
00904 
00905    /** shmem segment we created waiting for someone else to attach **/
00906 
00907    else if( ioc->bad == SHM_WAIT_ACCEPT ){
00908       int dms=0 , ms ;
00909 
00910       if( msec < 0 ) msec = 999999999 ;      /* a long time (11+ days) */
00911       for( ms=0 ; ms < msec ; ms += dms ){
00912          if( shm_nattach(ioc->id) > 1 ){ ioc->bad = 0 ; break ; }
00913          dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; iochan_sleep(dms) ;
00914       }
00915       if( ioc->bad && shm_nattach(ioc->id) > 1 ) ioc->bad = 0 ;
00916    }
00917 
00918    /** if there is a second channel, check it too **/
00919 
00920    if( ioc->ioc2 != NULL && ioc->ioc2->bad != 0 )
00921       iochan_goodcheck( ioc->ioc2 , msec ) ;
00922 
00923    return ( IOC_BAD(ioc) == 0 ) ;
00924 }

IOCHAN * iochan_init (char *name, char *mode)
 

Definition at line 546 of file thd_iochan.c.

References ACCEPTOR, IOCHAN::bad, IOCHAN::bend, IOCHAN::bstart, IOCHAN::buf, IOCHAN::bufsize, CLOSEDOWN, CREATOR, error_string, free, getenv(), IOCHAN::id, IOCHAN::ioc2, iochan_close(), iochan_sleep(), key, malloc, name, IOCHAN::name, IOCHAN::port, IOCHAN::sendsize, shm_accept(), shm_attach(), shm_create(), SHM_IOCHAN, shm_nattach(), shm_RMID_delay, shm_size(), tcp_accept(), tcp_connect(), TCP_IOCHAN, tcp_listen(), tcp_readcheck(), IOCHAN::type, and IOCHAN::whoami.

Referenced by afni_io(), AFNI_plugout_workproc(), AFNI_start_io(), AFNI_start_version_check(), ART_start_io(), iochan_fork_relay(), main(), new_PLUGOUT_spec(), new_RT_input(), open_URL_hpf(), RT_acquire_info(), RT_check_listen(), and RT_start_child().

/* Create a new IOCHAN.
     name = channel specification, 6..127 characters long:
              "tcp:host:port"                      for a TCP/IP socket
              "shm:key:size[K|M][+size2[K|M]]"     for shared memory; the
                                                   "+size2" part requests a
                                                   second channel (ioc2)
     mode = "create" or "w" to place the call / make the segment,
            "accept" or "r" to listen / attach to an existing segment
   Returns a malloc-ed IOCHAN (release it with iochan_close()), or NULL on
   failure, in which case error_string describes the problem (except when
   DONT_USE_SHM is defined and a "shm:" name is given, or the name has
   neither prefix).  The returned channel may not be connected yet;
   ioc->bad records what it is still waiting for.
   Changes from the original listing:
     - both malloc() results are checked before use;
     - ioc->bstart is set to NULL right after allocation, so the error paths
       that call iochan_close() before attaching never hand an
       uninitialized pointer to shmdt();
     - the shm_attach failure path in "create" mode no longer calls
       free(ioc) after iochan_close(ioc), which already frees it. */
00547 {
00548    IOCHAN * ioc ;
00549    int do_create , do_accept ;
00550 
00551    /** 12 Dec 2002: check if shm_RMID_delay needs to be set **/
00552 
00553 #ifndef DONT_USE_SHM
00554    { static int first=1 ;
00555      if( first ){
00556        char *eee = getenv("IOCHAN_DELAY_RMID") ;
00557        shm_RMID_delay = ( eee != NULL && (*eee=='Y' || *eee=='y') ) ;
00558        first = 0 ;
00559      }
00560    }
00561 #endif
00562 
00563    /** check if inputs are reasonable **/
00564 
00565    error_string = NULL ;
00566 
00567    if( name == NULL || strlen(name) < 6 || strlen(name) > 127 ){
00568       error_string = "iochan_init: bad name" ; return NULL ;
00569    }
00570 
00571    if( mode == NULL ){
00572       error_string = "iochan_init: bad mode" ; return NULL ;
00573    }
00574 
00575    do_create = (strcmp(mode,"create") == 0 || strcmp(mode,"w") == 0) ;
00576    do_accept = (strcmp(mode,"accept") == 0 || strcmp(mode,"r") == 0) ;
00577 
00578    if( !do_create && !do_accept ){
00579       error_string = "iochan_init: bad mode" ; return NULL ;
00580    }
00581 
00582 #ifdef DEBUG
00583 fprintf(stderr,"iochan_init: name=%s  mode=%s\n",name,mode) ;
00584 #endif
00585 
00586    /***** deal with TCP/IP sockets *****/
00587 
00588    if( strncmp(name,"tcp:",4) == 0 ){
00589       char host[128] , * hend ;
00590       int  port=-1 , ii , jj ;
00591 
00592       /** find "host" substring **/
00593 
00594       hend = strstr( name+4 , ":" ) ;
00595       if( hend == NULL || hend-name > 128 ){
00596          error_string = "iochan_init: bad name" ; return NULL ;
00597       }
00598       for( ii=4 ; name[ii] != ':' ; ii++ ) host[ii-4] = name[ii] ;
00599       host[ii-4] = '\0' ;
00600 
00601       /** get "port" number **/
00602 
00603       port = strtol( name+ii+1 , NULL , 10 ) ;
00604       if( port <= 0 ){
00605          error_string = "iochan_init: bad port" ; return NULL ;
00606       }
00607 
00608       /** initialize IOCHAN **/
00609 
00610       ioc = (IOCHAN *) malloc( sizeof(IOCHAN) ) ;
            if( ioc == NULL ){                                  /* out of memory? quit */
               error_string = "iochan_init: malloc fails" ; return NULL ;
            }
00611 
00612       ioc->type     = TCP_IOCHAN ;   /* what kind is this? */
00613       ioc->port     = port ;         /* save the port #    */
00614       ioc->bufsize  = 0 ;            /* TCP has no buffer  */
00615       ioc->buf      = NULL ;
00616       ioc->sendsize = 0 ;            /* no upper limit */
00617       ioc->ioc2     = NULL ;         /* TCP has no second channel */
00618 
00619       /** attach to incoming call **/
00620 
00621       if( do_accept ){
00622          ioc->whoami = ACCEPTOR ;                         /* 24 June 1997 */
00623          ioc->id = tcp_listen( port ) ;                   /* set up to listen  */
00624          if( ioc->id < 0 ){                               /* error? must die!  */
00625             error_string = "iochan_init: tcp_listen fails" ;
00626             free(ioc) ; return NULL ;
00627          }
00628          ioc->bad = TCP_WAIT_ACCEPT ;                     /* not connected yet */
00629          ii = tcp_readcheck(ioc->id,1) ;                  /* see if ready      */
00630          if( ii > 0 ){                                    /* if socket  ready  */
00631             jj = tcp_accept( ioc->id , NULL,&hend ) ;     /* accept connection */
00632             if( jj >= 0 ){                                /* if accept worked  */
00633                CLOSEDOWN( ioc->id ) ;                     /* close old socket  */
00634                strcpy( ioc->name , hend ) ;               /* put IP into name  */
00635                free(hend) ; ioc->bad = 0 ; ioc->id = jj ; /* and ready to go!  */
00636             }
00637          }
00638          return ioc ;
00639       }
00640 
00641       /** place an outgoing call **/
00642 
00643       if( do_create ){
00644          struct hostent * hostp ;
00645          ioc->whoami = CREATOR ;                           /* 24 June 1997 */
00646          hostp = gethostbyname(host) ;                     /* lookup host on net */
00647          if( hostp == NULL ){                              /* fails? must die!   */
00648              error_string = "iochan_init: gethostbyname fails" ;
00649              free(ioc) ; return NULL ;
00650          }
00651          ioc->id  = tcp_connect( host , port ) ;           /* connect to host    */
00652          ioc->bad = (ioc->id < 0) ? TCP_WAIT_CONNECT : 0 ; /* fails? must wait   */
00653          strcpy( ioc->name , host ) ;                      /* save the host name */
00654          return ioc ;
00655       }
00656       return NULL ;  /* should never be reached */
00657    }
00658 
00659    /***** deal with shared memory segments *****/
00660 
00661    if( strncmp(name,"shm:",4) == 0 ){
00662       char key[128] , * kend , shm2[256] ;
00663       int  size=-1 , ii , jj , size2=-1 ;
00664 
00665 #ifdef DONT_USE_SHM
00666       return NULL ;        /* 18 Dec 2002 */
00667 #endif
00668 
00669       /** get keystring **/
00670 
00671       kend = strstr( name+4 , ":" ) ;
00672       if( kend == NULL || kend-name > 128 ){
00673          error_string = "iochan_init: bad name" ; return NULL ;
00674       }
00675       for( ii=4 ; name[ii] != ':' ; ii++ ) key[ii-4] = name[ii] ;
00676       key[ii-4] = '\0' ;
00677 
00678       /** get size **/
00679 
00680       size = strtol( name+ii+1 , &kend , 10 ) ;
00681       if( size < 0 || (size == 0 && do_create) ){
00682          error_string = "iochan_init: bad size" ; return NULL ;
00683       }
00684            if( *kend == 'K' || *kend == 'k' ){ size *= 1024      ; kend++ ; }
00685       else if( *kend == 'M' || *kend == 'm' ){ size *= 1024*1024 ; kend++ ; }
00686 
00687       /** 24 June 1997: get second size **/
00688 
00689       if( *kend == '+' ){
00690          size2 = strtol( kend+1 , &kend , 10 ) ;
00691          if( size2 < 0 || (size2 == 0 && do_create) ){
00692             error_string = "iochan_init: bad size2" ; return NULL ;
00693          }
00694               if( *kend == 'K' || *kend == 'k' ){ size2 *= 1024      ; kend++ ; }
00695          else if( *kend == 'M' || *kend == 'm' ){ size2 *= 1024*1024 ; kend++ ; }
00696 
00697          sprintf(shm2,"shm:%s++:%d",key,size2) ;  /* second channel spec */
00698       } else {
00699          shm2[0] = '\0' ;                         /* no second channel */
00700       }
00701 
00702       /** initialize IOCHAN **/
00703 
00704       ioc = (IOCHAN *) malloc( sizeof(IOCHAN) ) ;
            if( ioc == NULL ){                                  /* out of memory? quit */
               error_string = "iochan_init: malloc fails" ; return NULL ;
            }
00705 
00706       ioc->type = SHM_IOCHAN ;     /* what type is this? */
00707       strcpy( ioc->name , key ) ;  /* save the key name  */
00708       ioc->ioc2 = NULL ;           /* maybe reset below? */
            ioc->bstart = NULL ;         /* not attached yet; iochan_close() passes this to shmdt() */
00709 
00710       /** open the second channel, if any **/
00711 
00712       if( shm2[0] != '\0' ){
00713          ioc->ioc2 = iochan_init( shm2 , mode ) ;
00714          if( ioc->ioc2 == NULL ){
00715             error_string = "iochan_init: can't open shm2" ;
00716             free(ioc) ; return NULL ;
00717          }
00718 #ifdef DEBUG
00719          fprintf(stderr,"iochan_init: input=%s shm2=%s\n",name,shm2) ;
00720 #endif
00721       }
00722 
00723       /** attach to existing shmem segment **/
00724 
00725       if( do_accept ){
00726          ioc->whoami = ACCEPTOR ;          /* 24 June 1997 */
00727          for( ii=0 ; ii < 2 ; ii++ ){      /* try to find segment */
00728             ioc->id = shm_accept( key ) ;  /* several times       */
00729             if( ioc->id >= 0 ) break ;     /* works? break out    */
00730             iochan_sleep(1) ;              /* wait 1 millisecond  */
00731          }
00732          if( ioc->id < 0 ) ioc->id = shm_accept( key ) ; /* 1 last try? */
00733 
00734          if( ioc->id < 0 ){                       /* failed to find segment? */
00735             ioc->bad = SHM_WAIT_CREATE ;          /* mark for waiting        */
00736 
00737          } else {                                                /* found it?     */
00738             char * bbb ;
00739             bbb = shm_attach( ioc->id ) ;                        /* attach it     */
00740             if( bbb == NULL ){                                   /* can't? quit   */
00741                error_string = "iochan_init: shm_attach fails" ;
00742                iochan_close(ioc) ; return NULL ;
00743             }
00744             ioc->bstart  = (int *) bbb ;                         /* set start,    */
00745             ioc->bend    = (int *) (bbb + sizeof(int)) ;         /* end markers   */
00746             ioc->buf     = bbb + 2*sizeof(int) ;                 /* after markers */
00747             ioc->bufsize = shm_size(ioc->id) - 2*sizeof(int) ;   /* get its size  */
00748             if( ioc->bufsize <= 0 ){                             /* can't? quit   */
00749                error_string = "iochan_init: bufsize < 0" ;
00750                iochan_close(ioc) ; return NULL ;
00751             }
00752             ioc->bad = 0 ;                                       /* mark ready    */
00753          }
00754          return ioc ;
00755       }
00756 
00757       /** make a new shmem segment **/
00758 
00759       if( do_create ){
00760          char * bbb ;
00761          ioc->whoami = CREATOR ;                             /* 24 June 1997*/
00762          size    = size + 1 ;                                /* extra byte  */
00763          ioc->id = shm_create( key , size+2*sizeof(int) ) ;  /* create it   */
00764          if( ioc->id < 0 ){                                  /* can't? quit */
00765             error_string = "iochan_init: shm_create fails" ;
00766             iochan_close(ioc->ioc2) ; free(ioc) ; return NULL ;
00767          }
00768          bbb = shm_attach( ioc->id ) ;                       /* attach it   */
00769          if( bbb == NULL ){                                  /* can't? quit */
00770             error_string = "iochan_init: shm_attach fails" ;
00771             iochan_close(ioc) ; return NULL ;                /* iochan_close() already frees ioc (and ioc2) */
00772          }
00773          ioc->bstart    = (int *) bbb ;                      /* init start, */
00774          ioc->bend      = (int *) (bbb + sizeof(int)) ;      /* end markers */
00775          *(ioc->bstart) = 0 ;
00776          *(ioc->bend)   = size-1 ;
00777          ioc->buf       = bbb + 2*sizeof(int) ;              /* I/O buffer  */
00778          ioc->bufsize   = size ;                             /* buffer size */
00779          ioc->bad       = (shm_nattach(ioc->id) < 2)         /* ready if    */
00780                           ?  SHM_WAIT_ACCEPT                 /* both are    */
00781                           :  0 ;                             /* attached    */
00782          return ioc ;
00783       }
00784       return NULL ;  /* should never be reached */
00785    }
00786 
00787    return NULL ;  /* should never be reached */
00788 }

int iochan_readcheck (IOCHAN *ioc, int msec)
 

Definition at line 974 of file thd_iochan.c.

References IOCHAN::bend, IOCHAN::bstart, IOCHAN::bufsize, error_string, IOCHAN::id, iochan_goodcheck(), iochan_sleep(), MIN, NEXTDMS, SHM_IOCHAN, SHMIOC_READ, tcp_alivecheck(), TCP_IOCHAN, tcp_readcheck(), and IOCHAN::type.

Referenced by afni_io(), AFNI_process_plugout(), AFNI_version_check(), iochan_force_clear(), iochan_fork_relay(), main(), new_RT_input(), open_URL_hpf(), read_URL_http(), RT_check_listen(), RT_process_data(), and RT_worker().

00975 {
         /* Check whether data can be read from ioc, waiting up to msec
            milliseconds.  Returns a positive value if readable (for shm,
            the number of readable bytes), 0 if nothing is readable yet,
            or -1 on error. */
00976    int ii ;
00977 
00978    /** check if the IOCHAN is good **/
00979 
00980    error_string = NULL ;
00981 
00982    ii = iochan_goodcheck(ioc,0) ;
00983    if( ii == -1 ) return -1 ;            /* some error */
00984    if( ii == 0  ){                       /* not good yet */
00985       ii = iochan_goodcheck(ioc,msec) ;  /* so wait for it to get good */
00986       if( ii != 1 ) return 0 ;           /* if still not good, exit */
            /* NOTE(review): a -1 from the second check is reported as 0 here,
               whereas iochan_writecheck() returns it unchanged -- confirm intent */
00987    }
00988 
00989    /** tcp: ==> just use the Unix "select" mechanism **/
00990 
00991    if( ioc->type == TCP_IOCHAN ){
            /* a dead socket returns -1 without setting error_string */
00992       ii = tcp_alivecheck( ioc->id ) ; if( !ii ) return -1 ;
00993       ii = tcp_readcheck( ioc->id , msec ) ;
00994       if( ii < 0 ) error_string = "iochan_readcheck: socket is bad" ;
00995       return ii ;
00996    }
00997 
00998    /** shm: ==> must loop and wait ourselves **/
00999 
01000    if( ioc->type == SHM_IOCHAN ){
01001       int nread , dms=0 , ms ;
01002 
01003       if( msec < 0 ) msec = 999999999 ;      /* a long time (11+ days) */
01004 
01005       /** Compute the number of readable bytes into nread.  This routine
01006           should be called by the "reading" process.  It will then
01007           be waiting until the "writing" process increments ioc->bend.   **/
01008 
01009       ioc = SHMIOC_READ(ioc) ;  /* 24 June 1997 */
01010 
            /* poll: ms accumulates the time slept so far; each sleep is
               capped so the total never exceeds msec */
01011       for( ms=0 ; ms < msec ; ms += dms ){
               /* bend = index of last byte written, bstart = index of next byte
                  to read; the buffer is empty when bend+1 == bstart (mod bufsize) */
01012          nread = (*(ioc->bend) - *(ioc->bstart) + ioc->bufsize + 1) % (ioc->bufsize) ;
01013          if( nread > 0 ) return nread ;
01014          dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; iochan_sleep(dms) ;
01015          ii = iochan_goodcheck(ioc,0) ; if( ii == -1 ) return -1 ;
01016       }
            /* one last look after the time has run out */
01017       nread = (*(ioc->bend) - *(ioc->bstart) + ioc->bufsize + 1) % (ioc->bufsize) ;
01018       if( nread > 0 ) return nread ;
01019       return 0 ;
01020    }
01021 
01022    return -1 ;  /* should never be reached */
01023 }

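int iochan_recv (IOCHAN *ioc, char *buffer, int nbytes)


Read up to nbytes bytes from the IOCHAN into buffer. Returns the number of bytes actually read (possibly fewer than nbytes; 0 if nbytes is 0 or the shared-memory buffer is empty), or -1 on bad inputs or error.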

Definition at line 1319 of file thd_iochan.c.

References BCOPY, IOCHAN::bend, IOCHAN::bstart, IOCHAN::buf, IOCHAN::bufsize, error_string, IOCHAN::id, IOC_BAD, iochan_goodcheck(), PERROR, SHM_IOCHAN, SHMIOC_READ, TCP_IOCHAN, tcp_recv, and IOCHAN::type.

Referenced by afni_io(), AFNI_plugout_workproc(), AFNI_process_plugout(), AFNI_version_check(), iochan_force_clear(), iochan_recvall(), iochan_recvloop(), main(), new_RT_input(), read_URL_http(), and RT_worker().

01320 {
         /* Read up to nbytes bytes from ioc into buffer.  Returns the number
            of bytes read (may be less than nbytes, or 0), or -1 on error. */
01321    /** check for reasonable inputs **/
01322 
01323    error_string = NULL ;
01324 
01325    if( ioc    == NULL || IOC_BAD(ioc) != 0 ||
01326        buffer == NULL || nbytes < 0          ){
01327 
01328       error_string = "iochan_recv: bad inputs" ; return -1 ;
01329    }
01330 
01331    if( nbytes == 0 ) return 0 ;
01332    if( iochan_goodcheck(ioc,0) != 1 ) return -1 ;
01333 
01334    /** tcp: just use recv **/
01335 
01336    if( ioc->type == TCP_IOCHAN ){
01337       int ii = tcp_recv( ioc->id , buffer , nbytes , 0 ) ;
01338       if( ii == -1 ){
01339          PERROR("Can't read from socket? tcp[recv]") ;
01340          error_string = "iochan_recv: tcp recv fails" ;
01341       }
01342       return ii ;
01343    }
01344 
01345    /** shm: read from the circular buffer, starting at bstart **/
01346 
01347    if( ioc->type == SHM_IOCHAN ){
01348       int nread, bend,bstart , size , sbot,stop ;
01349 
01350       ioc = SHMIOC_READ(ioc) ;  /* 24 June 1997 */
01351 
            /* snapshot the shared markers once, then work from the local copies */
01352       bend  = *(ioc->bend) ; bstart = *(ioc->bstart) ; size = ioc->bufsize ;
01353       nread = ( bend - bstart + size + 1 ) % size ;    /* readable amount */
01354       if( nread <= 0 ) return 0 ;                      /* nothing!?       */
01355       if( nread > nbytes ) nread = nbytes ;            /* amount to read  */
01356 
01357       sbot = bstart ; stop = sbot + nread-1 ;          /* from sbot to stop */
01358 
            /* stop >= size means the data wraps past the end of the buffer */
01359       if( stop < size ){                             /* 1 piece to copy */
01360          BCOPY( buffer, ioc->buf + sbot, nread ) ;   /* copy the data   */
01361          *(ioc->bstart) = (stop+1) % size ;          /* move bstart up  */
01362 #ifdef DEBUG
01363 fprintf(stderr,"iochan_recv: get 1 piece:  %d to %d\n",sbot,stop) ;
01364 #endif
01365 
01366       } else {                                             /* 2 pieces to copy */
01367          int nn = size - sbot ;                            /* size of piece 1  */
01368          BCOPY( buffer   , ioc->buf + sbot, nn        ) ;  /* copy piece 1     */
01369          BCOPY( buffer+nn, ioc->buf       , nread-nn  ) ;  /* copy piece 2     */
01370          *(ioc->bstart) = nread-nn ;                       /* move bstart up   */
01371 #ifdef DEBUG
01372 fprintf(stderr,"iochan_recv: get 2 pieces: %d to %d AND %d to %d\n",
01373         sbot,sbot+nn-1,0,nread-nn-1) ;
01374 #endif
01375 
01376       }
01377       return nread ;
01378    }
01379 
01380    return -1 ;  /* should not be reached */
01381 }

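int iochan_recvall (IOCHAN *ioc, char *buffer, int nbytes)


Read exactly nbytes bytes into buffer by calling iochan_recv() repeatedly, sleeping between attempts, until all of the data has arrived. Returns nbytes on success, or -1 on bad inputs or error.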

Definition at line 1425 of file thd_iochan.c.

References error_string, IOC_BAD, iochan_recv(), iochan_sleep(), and NEXTDMS.

Referenced by afni_io(), main(), and RT_read_image().

01426 {
         /* Keep calling iochan_recv() until exactly nbytes bytes are in
            buffer.  Returns nbytes, or -1 on bad inputs or a receive error.
            There is no timeout: the loop ends only on completion or error. */
01427    int ii , ntot=0 , dms=0 ;
01428 
01429    /** check for reasonable inputs **/
01430 
01431    error_string = NULL ;
01432 
01433    if( ioc    == NULL || IOC_BAD(ioc) != 0 ||
01434        buffer == NULL || nbytes < 0          ){
01435 
01436       error_string = "iochan_recvall: bad inputs" ; return -1 ;
01437    }
01438 
01439    if( nbytes == 0 ) return 0 ;
01440 
01441    while(1){
01442       ii = iochan_recv( ioc , buffer+ntot , nbytes-ntot ) ;  /* get what's left */
01443       if( ii == -1 ) return -1 ;                             /* an error!?      */
01444       ntot += ii ;                                           /* total so far    */
01445       if( ntot == nbytes ) return nbytes ;                   /* all done!?      */
01446       dms = NEXTDMS(dms) ; iochan_sleep(dms) ;               /* wait a while    */
01447    }
01448    return -1 ;   /* should never be reached */
01449 }

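int iochan_recvloop (IOCHAN *ioc, char *buffer, int nbytes)


Read into buffer by calling iochan_recv() repeatedly until a call returns no data or nbytes bytes have been collected. Returns the number of bytes read, or -1 if the inputs are bad or the channel is not good.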

Definition at line 1388 of file thd_iochan.c.

References error_string, IOC_BAD, iochan_goodcheck(), iochan_recv(), and iochan_sleep().

Referenced by iochan_fork_relay().

01389 {
         /* Collect whatever data is currently arriving, up to nbytes bytes.
            Returns the byte count gathered (possibly 0), or -1 if the inputs
            are bad or the channel is not good. */
01390    int jj , nbuf=0 ;
01391 
01392    error_string = NULL ;
01393 
01394    /** check for reasonable inputs **/
01395 
01396    if( ioc    == NULL || IOC_BAD(ioc) != 0 ||
01397        buffer == NULL || nbytes < 0          ){
01398 
01399       error_string = "iochan_recvloop: bad inputs" ; return -1 ;
01400    }
01401 
01402    if( iochan_goodcheck(ioc,0) != 1 ) return -1 ;
01403 
01404    if( nbytes == 0 ) return 0 ;
01405 
01406    while(1){
01407       jj = iochan_recv( ioc , buffer+nbuf , nbytes-nbuf ) ;
            /* jj < 1 covers both "no data" (0) and "error" (-1); either way the
               bytes already gathered are returned rather than an error code */
01408       if( jj < 1 ) break ;  /* stop if nothing more comes in */
01409       nbuf += jj ;
01410       if( nbuf >= nbytes ) break ;  /* stop if overflow */
01411       iochan_sleep(1) ;
01412    }
01413 
01414    return nbuf ;
01415 }

int iochan_send (IOCHAN *ioc, char *buffer, int nbytes)


Send up to nbytes bytes from buffer over the IOCHAN. Returns the number of bytes actually sent (possibly fewer than nbytes), 0 if nothing can be written right now, or -1 on error.

Definition at line 1176 of file thd_iochan.c.

References BCOPY, IOCHAN::bend, IOCHAN::bstart, IOCHAN::buf, IOCHAN::bufsize, error_string, IOCHAN::id, IOC_BAD, iochan_goodcheck(), iochan_writecheck(), MIN, nosigpipe, PERROR, send, IOCHAN::sendsize, SHM_IOCHAN, SHMIOC_WRITE, TCP_IOCHAN, tcp_writecheck(), and IOCHAN::type.

Referenced by iochan_sendall().

01177 {
         /* Send up to nbytes bytes from buffer over ioc.  Returns the number
            of bytes sent (may be less than nbytes), 0 if the channel cannot
            accept data right now, or -1 on error. */
01178    int ii ;
01179 
01180    /** check for reasonable inputs **/
01181 
01182    error_string = NULL ;
01183 
01184    if( ioc    == NULL || IOC_BAD(ioc) != 0 ||
01185        buffer == NULL || nbytes < 0          ){
01186 
01187      error_string = "iochan_send: bad inputs" ; return -1 ;
01188    }
01189 
01190    if( nbytes == 0 ) return 0 ;
01191 
01192    ii = iochan_goodcheck(ioc,0) ;
01193    if( ii != 1 ){
01194       if( error_string == NULL )
01195          error_string = "iochan_send: iochan_goodcheck fails" ;
01196       return ii ;
01197    }
01198 
         /* wait at most 1 msec for the channel to become writable */
01199    ii = iochan_writecheck(ioc,1) ;
01200    if( ii <= 0 ){
01201       if( error_string == NULL )
01202          error_string = "iochan_send: iochan_writecheck fails" ;
01203       return ii ;
01204    }
01205 
01206    /** tcp: ==> just use send **/
01207 
01208    if( ioc->type == TCP_IOCHAN ){
            /* ignore SIGPIPE (done once per process) so a closed peer makes
               send() fail instead of raising the signal */
01209       if( !nosigpipe ){ signal( SIGPIPE , SIG_IGN ) ; nosigpipe = 1 ; }
01210 
            /* sendsize <= 0 means no chunking: hand everything to one send() */
01211       if( ioc->sendsize <= 0 || nbytes <= ioc->sendsize ){
01212          int nsent = send( ioc->id , buffer , nbytes , 0 ) ;
01213          if( nsent == -1 ) PERROR("Can't use socket? tcp[send]") ;
01214          if( nsent < 0 ) error_string = "iochan_send: tcp send fails" ;
01215          return nsent ;
01216       } else {
               /* send in pieces of at most sendsize bytes each */
01217          int nsent , ntosend , ntot = 0 ;
01218          do{
01219             while( tcp_writecheck(ioc->id,1) == 0 ) ;      /* spin */
01220             ntosend = MIN( ioc->sendsize , nbytes-ntot ) ;
01221             nsent   = send( ioc->id , buffer+ntot , ntosend , 0 ) ;
01222             if( nsent == -1 ) PERROR("Can't use socket? tcp[send]") ;
01223             if( nsent <= 0 ){
01224                error_string = "iochan_send: tcp send fails" ;
                     /* report a partial count if some pieces already went out */
01225                return ((ntot>0) ? ntot : nsent) ;
01226             }
01227             ntot += nsent ;
01228          } while( ntot < nbytes ) ;
01229          return ntot ;
01230       }
01231    }
01232 
01233    /** shm: ==> write into the circular buffer, past "bend" **/
01234 
01235    if( ioc->type == SHM_IOCHAN ){
01236       int nread,nwrite , bend,bstart , ebot,etop , size ;
01237 
01238       ioc = SHMIOC_WRITE(ioc) ;  /* 24 June 1997 */
01239 
01240       bend   = *(ioc->bend) ; bstart = *(ioc->bstart) ; size = ioc->bufsize ;
01241       nread  = ( bend - bstart + size + 1 ) % size ;  /* amount readable  */
            /* one byte is always left unused, so at most size-1 can be stored */
01242       nwrite = size - 1 - nread ;                     /* amount writeable */
01243       if( nwrite <= 0 ) return 0 ;                    /* can't write!     */
01244 
01245       if( nwrite > nbytes ) nwrite = nbytes ;         /* how much to write */
01246 
01247       ebot = bend+1 ; if( ebot >= size ) ebot = 0 ;   /* start at ebot */
01248       etop = ebot+nwrite-1 ;                          /* end at etop */
01249 
01250       if( etop < size ){                              /* 1 piece to copy */
01251          BCOPY( ioc->buf + ebot, buffer, nwrite ) ;   /* copy data       */
01252          *(ioc->bend) = etop ;                        /* change bend     */
01253 #ifdef DEBUG
01254 fprintf(stderr,"iochan_send: shm 1 piece:  %d to %d\n",ebot,etop) ;
01255 #endif
01256 
01257       } else {                                             /* 2 pieces to copy */
01258          int nn = size - ebot ;                            /* size of piece 1  */
01259          BCOPY( ioc->buf + ebot, buffer   , nn        ) ;  /* copy piece 1     */
01260          BCOPY( ioc->buf       , buffer+nn, nwrite-nn ) ;  /* copy piece 2     */
01261          *(ioc->bend) = nwrite-nn-1 ;                      /* change bend      */
01262 #ifdef DEBUG
01263 fprintf(stderr,"iochan_send: shm 2 pieces: %d to %d AND %d to %d\n",
01264         ebot,ebot+nn-1,0,nwrite-nn-1) ;
01265 #endif
01266 
01267       }
01268       return nwrite ;
01269    }
01270 
01271    return -1 ;  /* should not be reached */
01272 }

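int iochan_sendall (IOCHAN *ioc, char *buffer, int nbytes)


Send exactly nbytes bytes from buffer by calling iochan_send() repeatedly, sleeping between attempts, until everything has been sent. Returns nbytes on success, or -1 on bad inputs or error.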

Definition at line 1282 of file thd_iochan.c.

References error_string, IOC_BAD, iochan_send(), iochan_sleep(), and NEXTDMS.

Referenced by afni_io(), AFNI_send_image(), AFNI_start_io(), AFNI_start_version_check(), ART_send_control_info(), ART_send_end_of_run(), ART_send_volume(), ART_start_io(), iochan_fork_relay(), main(), open_URL_hpf(), and RT_acquire_info().

01283 {
         /* Keep calling iochan_send() until all nbytes bytes of buffer have
            been sent.  Returns nbytes, or -1 on bad inputs or a send error.
            There is no timeout: the loop ends only on completion or error. */
01284    int ii , ntot=0 , dms=0 ;
01285 
01286    error_string = NULL ;
01287 
01288    /** check for reasonable inputs **/
01289 
01290    if( ioc    == NULL || IOC_BAD(ioc) != 0 ||
01291        buffer == NULL || nbytes < 0          ){
01292 
01293       error_string = "iochan_sendall: bad inputs" ; return -1 ;
01294    }
01295 
01296    if( nbytes == 0 ) return 0 ;
01297 
01298    while(1){
01299       ii = iochan_send( ioc , buffer+ntot , nbytes-ntot ); /* send what's left  */
01300       if( ii == -1 ){                                      /* an error!?        */
               /* keep the more specific message if iochan_send() set one */
01301          if( error_string == NULL )
01302             error_string = "iochan_sendall: iochan_send fails" ;
01303          return -1 ;
01304       }
01305       ntot += ii ;                                         /* total sent so far */
01306       if( ntot == nbytes ) return nbytes ;                 /* all done!?        */
01307       dms = NEXTDMS(dms) ; iochan_sleep(dms) ;             /* wait a while      */
01308    }
01309    return -1 ;   /* should never be reached */
01310 }

void iochan_set_cutoff (IOCHAN *ioc)
 

Definition at line 955 of file thd_iochan.c.

References IOCHAN::id, TCP_IOCHAN, tcp_set_cutoff(), and IOCHAN::type.

Referenced by afni_io(), AFNI_plugout_exit(), AFNI_plugout_workproc(), and open_URL_hpf().

00956 {
         /* Apply tcp_set_cutoff() to the socket of a TCP IOCHAN; does nothing
            for a NULL channel, a non-TCP channel, or a negative socket id. */
00957    if( ioc == NULL ) return ;
00958 
00959    if( ioc->type == TCP_IOCHAN && ioc->id >= 0 ) tcp_set_cutoff( ioc->id ) ;
00960    return ;
00961 }

void iochan_sleep (int msec)


Sleep for msec milliseconds, using select() as the timer. Returns immediately if msec <= 0.

Definition at line 1455 of file thd_iochan.c.

Referenced by AFNI_exit(), afni_io(), AFNI_plugout_workproc(), AFNI_send_image(), AFNI_start_io(), AFNI_start_version_check(), AFNI_startup_timeout_CB(), ART_open_afni_link(), ART_send_control_info(), ART_send_end_of_run(), ART_start_io(), calculate_results(), find_more_volumes(), iochan_clearcheck(), iochan_fork_relay(), iochan_goodcheck(), iochan_init(), iochan_readcheck(), iochan_recvall(), iochan_recvloop(), iochan_sendall(), iochan_writecheck(), main(), MAIN_workprocess(), new_PLUGOUT_spec(), new_RT_input(), RT_acquire_info(), RT_worker(), THD_fetch_dataset(), and THD_fetch_many_datasets().

01456 {
         /* Sleep for msec milliseconds; a no-op when msec <= 0. */
01457    struct timeval tv ;
01458    if( msec <= 0 ) return ;
01459    tv.tv_sec  = msec/1000 ;               /* whole seconds            */
01460    tv.tv_usec = (msec%1000)*1000 ;        /* remainder, microseconds  */
01461    select( 1 , NULL,NULL,NULL , &tv ) ;   /* no fd sets: just a timer */
01462    return ;
01463 }

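int iochan_writecheck (IOCHAN *ioc, int msec)


Check whether the IOCHAN can be written to, waiting up to msec milliseconds. Returns a positive value if it is writable (for shm, the number of bytes that can be written), 0 if it is not, or -1 on error.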

Definition at line 1110 of file thd_iochan.c.

References IOCHAN::bend, IOCHAN::bstart, IOCHAN::bufsize, error_string, IOCHAN::id, iochan_goodcheck(), iochan_sleep(), MIN, NEXTDMS, SHM_IOCHAN, SHMIOC_WRITE, TCP_IOCHAN, tcp_writecheck(), and IOCHAN::type.

Referenced by afni_io(), AFNI_send_image(), AFNI_start_io(), AFNI_start_version_check(), ART_start_io(), iochan_fork_relay(), iochan_send(), main(), open_URL_hpf(), and RT_acquire_info().

01111 {
         /* Check whether ioc can be written to, waiting up to msec
            milliseconds.  Returns a positive value if writable (for shm, the
            number of writable bytes), 0 if not writable yet, or -1 on error. */
01112    int ii ;
01113 
01114    /** check if the IOCHAN is good **/
01115 
01116    error_string = NULL ;
01117 
01118    ii = iochan_goodcheck(ioc,0) ;
01119    if( ii == -1 ) return -1 ;            /* some error */
01120    if( ii == 0  ){                       /* not good yet */
01121       ii = iochan_goodcheck(ioc,msec) ;  /* so wait for it to get good */
01122       if( ii != 1 ) return ii ;          /* if still not good, exit */
01123    }
01124 
01125    /** tcp: ==> just use the Unix "select" mechanism **/
01126 
01127    if( ioc->type == TCP_IOCHAN ){
01128       ii = tcp_writecheck( ioc->id , msec ) ;
01129       if( ii == -1 ) error_string = "iochan_writecheck: socket not ready" ;
01130       return ii ;
01131    }
01132 
01133    /** shm: ==> must loop and wait ourselves **/
01134 
01135    if( ioc->type == SHM_IOCHAN ){
01136       int nread , dms=0 , ms , nwrite ;
01137 
01138       if( msec < 0 ) msec = 999999999 ;      /* a long time (11+ days) */
01139 
01140       ioc = SHMIOC_WRITE(ioc) ;  /* 24 June 1997 */
01141 
01142       /** This routine is called by the "writing" process.  It will
01143           wait until the reading process increments ioc->bstart.    **/
01144 
            /* poll: ms accumulates the time slept so far; each sleep is
               capped so the total never exceeds msec */
01145       for( ms=0 ; ms < msec ; ms += dms ){
01146          nread = (*(ioc->bend) - *(ioc->bstart) + ioc->bufsize + 1) % (ioc->bufsize) ;
               /* free space = capacity (bufsize-1) minus bytes still unread */
01147          nwrite = ioc->bufsize - 1 - nread ;
01148          if( nwrite > 0 ) return nwrite ;
01149          dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; iochan_sleep(dms) ;
01150          ii = iochan_goodcheck(ioc,0) ; if( ii == -1 ) return -1 ;
01151       }
            /* one last look after the time has run out */
01152       nread = (*(ioc->bend) - *(ioc->bstart) + ioc->bufsize + 1) % (ioc->bufsize) ;
01153       nwrite = ioc->bufsize - 1 - nread ;
01154       if( nwrite > 0 ) return nwrite ;
01155       return 0 ;
01156    }
01157 
01158    return -1 ;  /* should never be reached */
01159 }

int shm_accept (char *key_string)
 

Definition at line 410 of file thd_iochan.c.

References key, and string_to_key().

Referenced by iochan_goodcheck(), and iochan_init().

00411 {
         /* Look up the shared memory segment whose key is derived from
            key_string.  IPC_CREAT is not passed, so nothing is created here.
            Returns the value of shmget(). */
00412    key_t key ;
00413    int   shmid ;
00414 
00415    key   = string_to_key( key_string ) ;
00416    shmid = shmget( key , 0 , 0777 ) ;
00417    return shmid ;
00418 }

int shm_alivecheck (int shmid)


Return 1 if the shared memory segment shmid has at least two attachments, 0 otherwise (including when shmid is negative).

Definition at line 795 of file thd_iochan.c.

References shm_nattach().

Referenced by iochan_goodcheck().

00796 {
         /* Return 1 if segment shmid has at least 2 attachments, else 0. */
00797    if( shmid < 0 ) return 0 ;
00798    return (shm_nattach(shmid) >= 2) ;
00799 }

char * shm_attach (int shmid)
 

Definition at line 446 of file thd_iochan.c.

References PERROR.

Referenced by iochan_goodcheck(), iochan_init(), proc_finalize_shm_volumes(), and THD_alloc_datablock().

00447 {
         /* Attach segment shmid to this process with shmat().  Returns the
            address of the segment, or NULL if the attach fails. */
00448    char * adr ;
00449    adr = (char *) shmat( shmid , NULL , 0 ) ;
00450    if( adr == (char *) -1 ){             /* shmat() signals failure with (void *)-1 */
00451      adr = NULL ; PERROR("Can't attach? shm_attach") ;
00452    }
00453    return adr ;
00454 }

int shm_create (char *key_string, int size)
 

Definition at line 425 of file thd_iochan.c.

References key, PERROR, and string_to_key().

Referenced by iochan_init(), proc_finalize_shm_volumes(), and THD_alloc_datablock().

00426 {
         /* Get (creating if needed, via IPC_CREAT) a shared memory segment of
            size bytes whose key is derived from key_string.  Returns the
            shmget() result: the segment id, or a negative value on failure. */
00427    key_t key ;
00428    int   shmid ;
00429 
00430    key   = string_to_key( key_string ) ;
00431    shmid = shmget( key , size , 0777 | IPC_CREAT ) ;
00432    if( shmid < 0 ){
00433      PERROR("Can't create? shm_create") ;
           /* extra diagnostics, printed only when perror output is enabled */
00434      if( pron ) fprintf(stderr,"key_string=%s key=%d size=%d\n",
00435                         key_string , (int)key , size ) ;
00436    }
00437    return shmid ;
00438 }

int shm_nattach (int shmid)
 

Definition at line 477 of file thd_iochan.c.

References PERROR, and STATUS.

Referenced by iochan_close(), iochan_goodcheck(), iochan_init(), and shm_alivecheck().

00478 {
         /* Return the attach count (shm_nattch) of segment shmid as reported
            by shmctl(IPC_STAT), or -1 if shmid is negative or the query fails. */
00479    int ii ;
00480    struct shmid_ds buf ;
00481 
00482    if( shmid < 0 ){ STATUS("shm_nattach: illegal shmid") ; return -1 ; }
00483    errno = 0 ;
00484    ii = shmctl( shmid , IPC_STAT , &buf ) ;
00485    if( ii < 0 ){
00486      PERROR("Has shared memory buffer gone bad? shm_nattach") ;
00487      return -1 ;
00488    }
00489    return buf.shm_nattch ;
00490 }

int shm_size (int shmid)
 

Definition at line 461 of file thd_iochan.c.

References PERROR.

Referenced by iochan_goodcheck(), and iochan_init().

00462 {
         /* Return the size in bytes (shm_segsz) of segment shmid as reported
            by shmctl(IPC_STAT), or -1 if shmid is negative or the query fails. */
00463    int ii ;
00464    struct shmid_ds buf ;
00465 
00466    if( shmid < 0 ) return -1 ;
00467    ii = shmctl( shmid , IPC_STAT , &buf ) ;
00468    if( ii < 0 ){ PERROR("Can't check? shm_size");  return -1; }
00469    return buf.shm_segsz ;
00470 }

key_t string_to_key (char *key_string)


Convert a string into a positive integer key (key_t) for use with shmget(). A NULL string yields the key 666.

Definition at line 388 of file thd_iochan.c.

Referenced by shm_accept(), and shm_create().

00389 {
         /* Hash key_string into a positive key_t: start from 666 and add each
            character shifted left by 0, 8 or 16 bits (cycling with position).
            A NULL string gives 666.  The sum is accumulated in unsigned
            arithmetic so that negative chars and very long strings wrap
            around instead of invoking undefined behavior (left shift of a
            negative value, signed overflow, negation of INT_MIN); for all
            inputs the old signed code handled without UB, the key returned
            is unchanged. */
00390    int ii ; unsigned int usum , imax = (~0u) >> 1 ;  /* imax = largest int value */
00391 
00392    usum = 666 ;
00393    if( key_string == NULL ) return (key_t) usum ;
00394 
00395    for( ii=0 ; key_string[ii] != '\0' ; ii++ )
00396       usum += ((unsigned int)(int)key_string[ii]) << ((ii%3)*8) ;
00397 
00398    if( usum > imax ) usum = 0u - usum ;               /* negative as an int: negate */
00399    if( usum > imax || usum == 0 ) usum = 314159265 ;  /* never return <= 0          */
00400 
00401    return (key_t) usum ;
00402 }

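int tcp_accept (int sd, char **hostname, char **hostaddr)


Accept a connection on the listening socket sd. Returns the new socket descriptor, or -1 on failure. If hostname and/or hostaddr are not NULL, they receive malloc()-ed strings holding the connecting host's name and its dotted-decimal IP address.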

Definition at line 338 of file thd_iochan.c.

References malloc, and PERROR.

Referenced by iochan_goodcheck(), iochan_init(), NI_stream_goodcheck(), and NI_stream_open().

00339 {
         /* Accept a connection on listening socket sd.  Returns the new
            socket descriptor, or -1 if accept() fails.  If hostname is not
            NULL, *hostname receives a malloc()-ed copy of the peer's host
            name ("UNKNOWN" if the lookup fails); if hostaddr is not NULL,
            *hostaddr receives a malloc()-ed copy of the peer's dotted
            address.  Either string is NULL if malloc() fails; the caller
            owns (and should free) both. */
00340    struct sockaddr_in pin ;
00341    socklen_t addrlen ; int sd_new ;   /* accept() wants a socklen_t *, not an int * */
00342    struct hostent * hostp ;
00343    char * sout , * str ;
00344 
00345    /** accept the connection **/
00346 
00347    /** STATUS("tcp_accept: about to call accept") ; **/
00348 
00349    addrlen = sizeof(pin) ;
00350    sd_new = accept( sd , (struct sockaddr *)&pin , &addrlen ) ;
00351    if( sd_new == -1 ){ PERROR("Can't accept? tcp_accept"); return -1; }
00352 
00353    /** get name of connector **/
00354 
00355    if( hostname != NULL ){
00356       hostp = gethostbyaddr( (char *) (&pin.sin_addr) ,
00357                              sizeof(struct in_addr) , AF_INET ) ;
00358       if( hostp != NULL ){
00359          sout = (char *) malloc( strlen(hostp->h_name)+1 ) ;
00360          if( sout != NULL ) strcpy(sout,hostp->h_name) ;  /* don't copy into NULL */
00361       } else {
00362          sout = (char *) malloc( strlen("UNKNOWN")+1 ) ;
00363          if( sout != NULL ) strcpy(sout,"UNKNOWN") ;
00364       }
00365       *hostname = sout ;
00366    }
00367 
00368    /** get address of connector **/
00369 
00370    if( hostaddr != NULL ){
00371       str = inet_ntoa( pin.sin_addr ) ;
00372       sout = (char *) malloc( strlen(str)+1 ) ;
00373       if( sout != NULL ) strcpy(sout,str) ;
00374       *hostaddr = sout ;
00375    }
00376 
00377    return sd_new ;
00378 }

int tcp_alivecheck (int sd)
 

Definition at line 203 of file thd_iochan.c.

References PERROR, tcp_readcheck(), and tcp_recv.

Referenced by iochan_goodcheck(), iochan_readcheck(), NI_stream_goodcheck(), and NI_stream_readcheck().

00204 {
         /* Return 1 if socket sd appears to be alive, 0 if not.  A socket that
            select() reports as readable but that yields no byte on a
            MSG_PEEK read is treated as dead. */
00205    int ii ;
00206    char bbb[4] ;
00207 
00208    ii = tcp_readcheck(sd,0) ;                 /* can I read?          */
00209    if( ii == 0 ) return 1 ;                   /* can't read is OK     */
00210    if( ii <  0 ) return 0 ;                   /* some error is bad    */
00211    errno = 0 ;
         /* MSG_PEEK leaves the byte in the socket for the real reader */
00212    ii = tcp_recv( sd , bbb , 1 , MSG_PEEK ) ; /* try to read one byte */
00213    if( ii == 1 ) return 1 ;                   /* if we get it, good   */
00214    if( errno ) PERROR("Socket gone bad? tcp_alivecheck") ;
00215    return 0 ;                                 /* no data ==> death!   */
00216 }

int tcp_connect (char *host, int port)
 

Definition at line 223 of file thd_iochan.c.

References CLOSEDOWN, l, PERROR, socket, SOCKET_BUFSIZE, and STATUS.

Referenced by iochan_goodcheck(), iochan_init(), NI_stream_goodcheck(), and NI_stream_open().

00224 {
         /* Open a TCP socket and connect it to the given port on host.
            Returns the connected socket descriptor, or -1 on bad inputs or
            any failure (the socket is closed again before returning -1). */
00225    int sd , l ;
00226    struct sockaddr_in sin ;
00227    struct hostent *   hostp ;
00228 
00229    if( host == NULL || port < 1 ){ STATUS("tcp_connect: illegal inputs") ; return -1 ; }
00230 
00231    /** open a socket **/
00232 
00233    sd = socket( AF_INET , SOCK_STREAM , 0 ) ;
00234    if( sd == -1 ){ PERROR("Can't create? tcp_connect[socket]"); return -1; }
00235 
00236    /** set socket options (no delays, large buffers) **/
00237 
00238 #if 0
00239    l = 1;
00240    setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (void *)&l, sizeof(int)) ;
00241 #endif
         /* the TCP_NODELAY part above is compiled out; only the buffer sizes
            are set, and the setsockopt() results are not checked */
00242    l = SOCKET_BUFSIZE ;
00243    setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (void *)&l, sizeof(int)) ;
00244    setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void *)&l, sizeof(int)) ;
00245 
00246    /** set port on remote computer **/
00247 
00248    memset( &sin , 0 , sizeof(sin) ) ;
00249    sin.sin_family = AF_INET ;
00250    sin.sin_port   = htons(port) ;
00251 
00252    /** set remote computer **/
00253 
00254    hostp = gethostbyname(host) ;
00255    if( hostp == NULL ){
00256       PERROR("Can't lookup? tcp_connect[gethostbyname]"); CLOSEDOWN(sd); return -1;
00257    }
00258    sin.sin_addr.s_addr = ((struct in_addr *)(hostp->h_addr))->s_addr ;
00259 
00260    if( connect(sd , (struct sockaddr *)&sin , sizeof(sin)) == -1 ){
00261       PERROR("Can't connect? tcp_connect[connect]") ; CLOSEDOWN(sd); return -1;
00262    }
00263 
00264    return sd ;
00265 }

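int tcp_listen (int port)


Open a TCP socket bound to the given port on any local address and start listening on it. Returns the socket descriptor, or -1 on failure.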

Definition at line 276 of file thd_iochan.c.

References CLOSEDOWN, l, PERROR, socket, SOCKET_BUFSIZE, and STATUS.

Referenced by iochan_init(), and NI_stream_open().

00277 {
         /* Open a TCP socket, bind it to the given port on any local address
            (INADDR_ANY) and start listening with a backlog of 1.  Returns the
            listening socket descriptor, or -1 on a bad port or any failure
            (the socket is closed again before returning -1). */
00278    int sd , l ;
00279    struct sockaddr_in sin ;
00280 
00281    if( port < 1 ){ STATUS("tcp_listen: illegal port") ; return -1 ; }
00282 
00283    /** open a socket **/
00284 
00285    sd = socket( AF_INET , SOCK_STREAM , 0 ) ;
00286    if( sd == -1 ){ PERROR("Can't create? tcp_listen[socket]"); return -1; }
00287 
00288    /** set socket options (no delays, large buffers) **/
00289 
00290 #if 0
00291    l = 1;
00292    setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (void *)&l, sizeof(int)) ;
00293 #endif
         /* the TCP_NODELAY part above is compiled out; only the buffer sizes
            are set, and the setsockopt() results are not checked */
00294    l = SOCKET_BUFSIZE ;
00295    setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (void *)&l, sizeof(int)) ;
00296    setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void *)&l, sizeof(int)) ;
00297 
00298    /** set the local port to listen on **/
00299 
00300    memset( &sin , 0 , sizeof(sin) ) ;
00301    sin.sin_family      = AF_INET ;
00302    sin.sin_port        = htons(port) ;
00303    sin.sin_addr.s_addr = INADDR_ANY ;  /* reader reads from anybody */
00304 
00305    if( bind(sd , (struct sockaddr *)&sin , sizeof(sin)) == -1 ){
00306       PERROR("Can't bind? tcp_listen[bind]"); CLOSEDOWN(sd); return -1;
00307    }
00308 
00309    if( listen(sd,1) == -1 ){
00310       PERROR("Can't listen? tcp_listen[listen]"); CLOSEDOWN(sd); return -1;
00311    }
00312 
00313    return sd ;
00314 }

int tcp_readcheck (int sd, int msec)
 

Definition at line 107 of file thd_iochan.c.

References PERROR, and STATUS.

Referenced by iochan_clearcheck(), iochan_goodcheck(), iochan_init(), iochan_readcheck(), NI_stream_goodcheck(), NI_stream_open(), NI_stream_read(), NI_stream_readcheck(), and tcp_alivecheck().

00108 {
         /* Use select() to check whether socket sd is readable, waiting up to
            msec milliseconds (msec < 0 waits indefinitely).  Returns the
            select() result: 1 if readable, 0 if not, -1 on error.  Also
            returns -1 if sd is negative or too large to fit in an fd_set,
            since FD_SET() on such a descriptor is undefined. */
00109    int ii ;
00110    fd_set rfds ;
00111    struct timeval tv , * tvp ;
00112 
00113    if( sd < 0 || sd >= FD_SETSIZE ){ STATUS("tcp_readcheck: illegal sd") ; return -1 ; } /* bad socket id */
00114 
00115    FD_ZERO(&rfds) ; FD_SET(sd, &rfds) ;         /* check only sd */
00116 
00117    if( msec >= 0 ){                             /* set timer */
00118       tv.tv_sec  = msec/1000 ;
00119       tv.tv_usec = (msec%1000)*1000 ;
00120       tvp        = &tv ;
00121    } else {
00122       tvp        = NULL ;                       /* forever */
00123    }
00124 
00125    /** STATUS("tcp_readcheck: call select") ; **/
00126 
00127    ii = select(sd+1, &rfds, NULL, NULL, tvp) ;  /* check it */
00128    if( ii == -1 ) PERROR( "Socket gone bad? tcp_readcheck[select]" ) ;
00129    return ii ;
00130 }

void tcp_set_cutoff (int sd)


Set the SO_LINGER (enabled, with a linger time of 0) and SO_REUSEADDR options on socket sd, where those options are available.

Definition at line 179 of file thd_iochan.c.

Referenced by iochan_set_cutoff(), tcp_accept(), tcp_connect(), and tcp_listen().

00180 {
         /* Set SO_LINGER (on, linger time 0) and SO_REUSEADDR on socket sd.
            Each option is attempted only if it is defined at compile time;
            the setsockopt() results are not checked. */
00181 #ifdef SO_LINGER
00182    { struct linger lg ;
00183      lg.l_onoff  = 1 ;
00184      lg.l_linger = 0 ;
00185      setsockopt(sd, SOL_SOCKET, SO_LINGER, (void *)&lg, sizeof(struct linger)) ;
00186    }
00187 #endif
00188 #ifdef SO_REUSEADDR
00189    { int optval = 1;
00190      setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&optval, sizeof(optval)) ;
00191    }
00192 #endif
00193    return ;
00194 }

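int tcp_writecheck (int sd, int msec)


Use select() to check whether socket sd is writable, waiting up to msec milliseconds (msec < 0 waits indefinitely). Returns the select() result: 1 if writable, 0 if not, -1 on error.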

Definition at line 132 of file thd_iochan.c.

References PERROR, and STATUS.

Referenced by iochan_send(), iochan_writecheck(), NI_stream_write(), NI_stream_writecheck(), RT_mp_comm_close(), and RT_mp_comm_send_data().

00133 {
         /* Use select() to check whether socket sd is writable, waiting up to
            msec milliseconds (msec < 0 waits indefinitely).  Returns the
            select() result: 1 if writable, 0 if not, -1 on error.  Also
            returns -1 if sd is negative or too large to fit in an fd_set,
            since FD_SET() on such a descriptor is undefined. */
00134    int ii ;
00135    fd_set wfds ;
00136    struct timeval tv , * tvp ;
00137 
00138    if( sd < 0 || sd >= FD_SETSIZE ){ STATUS("tcp_writecheck: illegal sd") ; return -1 ; } /* bad socket id */
00139 
00140    FD_ZERO(&wfds) ; FD_SET(sd, &wfds) ;         /* check only sd */
00141 
00142    if( msec >= 0 ){                             /* set timer */
00143       tv.tv_sec  = msec/1000 ;
00144       tv.tv_usec = (msec%1000)*1000 ;
00145       tvp        = &tv ;
00146    } else {
00147       tvp        = NULL ;                       /* forever */
00148    }
00149 
00150    /** STATUS("tcp_writecheck: call select") ; **/
00151 
00152    ii = select(sd+1, NULL , &wfds, NULL, tvp) ;  /* check it */
00153    if( ii == -1 ) PERROR( "Socket gone bad? tcp_writecheck[select]" ) ;
00154    return ii ;
00155 }

Variable Documentation

char* error_string = NULL [static]
 

Definition at line 11 of file thd_iochan.c.

Referenced by iochan_clearcheck(), iochan_error_string(), iochan_goodcheck(), iochan_init(), iochan_readcheck(), iochan_recv(), iochan_recvall(), iochan_recvloop(), iochan_send(), iochan_sendall(), and iochan_writecheck().

IOCHAN* ioc_kill_1 = NULL [static]
 

Definition at line 1543 of file thd_iochan.c.

IOCHAN* ioc_kill_2 = NULL [static]
 

Definition at line 1544 of file thd_iochan.c.

int nosigpipe = 0 [static]
 

Definition at line 78 of file thd_iochan.c.

Referenced by iochan_send().

int pron = 1 [static]
 

Definition at line 64 of file thd_iochan.c.

Referenced by iochan_enable_perror().

int shm_RMID_delay = 0 [static]
 

Definition at line 16 of file thd_iochan.c.

Referenced by iochan_close(), and iochan_init().

 

Powered by Plone

This site conforms to the following standards: