/* ******************************************************************************* * Copyright (c) 1996 Martin Poole * ******************************************************************************* ** ** WARNING !! WARNING !! WARNING !! WARNING !! WARNING !! WARNING !! ** ** Any changes to be made to this file should first be checked with ** mplib1 source control for library integrity. ** ** mplib1 source control can be reached at mplib1@quatermass.co.uk ** * * $Source: /home/cvs/cvsroot/onelan/onelan/src/mplib1/libsrc/bpo_queue.c,v $ * $Author: mpoole $ * $Date: 2002/10/07 09:37:37 $ * $Revision: 1.2 $ * ******************************************************************************* * * Change History * * $Log: bpo_queue.c,v $ * Revision 1.2 2002/10/07 09:37:37 mpoole * Initial checkin of mplib1-3.1.0 * * Revision 1.1 2002/10/07 09:36:54 mpoole * Initial checkin of mplib1-3.1.0 * * ******************************************************************************* */ #ident "$Header: /home/cvs/cvsroot/onelan/onelan/src/mplib1/libsrc/bpo_queue.c,v 1.2 2002/10/07 09:37:37 mpoole Exp $" /* ------------------------------------------------------------------ Include files ------------------------------------------------------------------ */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "include/bpo_q_internal.h" /* ------------------------------------------------------------------ defines ------------------------------------------------------------------ */ struct q_work { int notify; int rv; int ierrno; const char *q_name; void *r_ptr; struct bpo_Q *qp; }; static int bpo_queue_debug=0; static char queue_debug_str[]="BPO_QUEUE_DEBUG"; static char q_res_name[]="Queues"; static dl_List_t segments; static int seg_init=0; /* ------------------------------------------------------------------ Code starts here ------------------------------------------------------------------ */ static void seginit( void ) { if (seg_init==0) { dl_Init_List( &segments, LN_IGNORECASE ); seg_init=1; } return; } static void Get_Q_pid( struct bpo_Q *qp ) { char tstr[12]; struct bpo_Q_pid *qpp; gen_pid_str( tstr, getpid() ); /* Check the list for the pid_q for us, if not found then create */ qpp = bpo_Find_Item_By_Name( &qp->Q_pid_r_list, tstr ); if (qpp==NULL) { qpp = shalloc( qp, sizeof(struct bpo_Q_pid) ); if (qpp) { if (bpo_queue_debug) fprintfile( stderr, "about to init new q_pid(%p)\n", qpp ); Sstrcpy( qpp->Q_pidstr, tstr ); qpp->Q_pid = getpid(); if (bpo_queue_debug) fprintfile( stderr, "about to init q_pid node(%p)\n", &qpp->Q_pid_Node ); bpo_Init_Node( &qpp->Q_pid_Node, qpp->Q_pidstr, qpp ); if (bpo_queue_debug) fprintfile( stderr, "about to add_tail\n" ); bpo_Add_Tail( &qp->Q_pid_r_list, &qpp->Q_pid_Node ); } } return; } /* Now some internal routines work keeping the cache in sync */ static void free_private_q( struct bpo_private_q *pqp ) { dl_Remove_Node( &pqp->q_Node ); free( pqp ); return; } static struct bpo_private_q * alloc_private_q( struct bpo_Q *qp ) { struct bpo_private_q *pqp; pqp = malloc( sizeof(struct bpo_private_q) + Sstrlen( qp->Q_name ) ); if (pqp) { Sstrcpy( pqp->Q_name, qp->Q_name ); pqp->qp = qp; pqp->Q_max_count = qp->Q_max_count; pqp->Q_Count = qp->Q_List.lno_count; pqp->Q_notify = qp->Q_notify; pqp->Q_flags = qp->Q_flags; pqp->Q_sole_pid = qp->Q_sole_pid; pqp->pid_ser_nbr = 0L; dl_Init_Node( &pqp->q_Node, pqp->Q_name, pqp ); dl_Init_List( &pqp->private_pids, LN_IGNORECASE ); dl_Init_List( &pqp->old_pids, LN_IGNORECASE ); } return(pqp); } static void get_pids( struct bpo_Q_pid *qpp, struct bpo_private_q *pqp ) { struct bpo_private_pid *ppp; if (qpp->Q_pid && is_pid_dead(qpp->Q_pid)) { /* Q pid is defunct */ bpo_Remove_Node( &qpp->Q_pid_Node ); shfree( qpp ); }else { ppp = dl_Find_Item_By_Name( &pqp->old_pids, qpp->Q_pidstr ); if (ppp) { dl_Remove_Node( &ppp->q_Node ); }else { ppp = malloc( sizeof(struct bpo_private_pid) ); if (ppp) { Sstrcpy( ppp->pid_str, qpp->Q_pidstr ); ppp->pid = qpp->Q_pid; dl_Init_Node( &ppp->q_Node, ppp->pid_str, ppp ); } } if (ppp) dl_Add_Tail( &pqp->private_pids, &ppp->q_Node ); } return; } static int pid_transfer( bpo_List_t *blp, struct bpo_private_q *pqp ) { /* Move all the current pids onto the old pid list */ dl_Transfer_Lists( &pqp->private_pids, &pqp->old_pids ); bpo_Walk_List( blp, (bpo_Walk_List_t)get_pids, pqp ); return(0); } static int add_q_to_cache( struct bpo_Q *qp, struct cache_queue_list *qpl ) { struct bpo_private_q *pqp; pqp = dl_Find_Item_By_Name( &qpl->old_q_list, qp->Q_name ); if (pqp) { /* Well details may be correct so transfer them */ dl_Remove_Node( &pqp->q_Node ); }else { /* Need a pqp */ pqp = alloc_private_q( qp ); } if (pqp) { dl_Add_Tail( &qpl->q_list, &pqp->q_Node ); bpo_May_Work_List( &qp->Q_pid_r_list, &pqp->pid_ser_nbr, (bpo_Work_List_t)pid_transfer, pqp ); } return(0); } static int cache_q_list( bpo_List_t *blp, struct cache_queue_list *qpl, char *q_name ) { dl_Transfer_Lists( &qpl->q_list, &qpl->old_q_list ); bpo_Walk_List( blp, (bpo_Walk_List_t)add_q_to_cache, qpl ); return(0); } static void free_private_pid( struct bpo_private_pid *ppp ) { dl_Remove_Node( &ppp->q_Node ); free( ppp ); return; } static void free_old_pids( struct bpo_private_q *pqp ) { dl_Walk_List( &pqp->old_pids, (dl_Walk_List_t)free_private_pid, NULL ); return; } static void free_all_pids( struct bpo_private_q *pqp ) { dl_Walk_List( &pqp->old_pids, (dl_Walk_List_t)free_private_pid, NULL ); dl_Walk_List( &pqp->private_pids, (dl_Walk_List_t)free_private_pid, NULL ); return; } void free_this_private_q( struct bpo_private_q *pqp ) { free_all_pids( pqp ); free_private_q( pqp ); return; } static void old_cleanup( struct cache_queue_list *qpl ) { dl_Walk_List( &qpl->q_list, (dl_Walk_List_t)free_old_pids, NULL ); dl_Walk_List( &qpl->old_q_list, (dl_Walk_List_t)free_all_pids, NULL ); dl_Walk_List( &qpl->old_q_list, (dl_Walk_List_t)free_private_q, NULL ); return; } struct cache_queue_list * cache_queues( struct Q_Head *qh, const char *q_name ) { char tbuf[40]; struct cache_queue_list *qpl; bpo_queue_debug = get_config_flag(queue_debug_str); if (seg_init==0) seginit(); /* either cache the requested q_name (if given) or all queues */ sprintf( tbuf, "%p", qh ); qpl=dl_Find_Item_By_Name( &segments, tbuf ); if ( qpl==NULL && (qpl = malloc( sizeof(struct cache_queue_list) )) ) { /* Never heard of this segment before ? */ if (bpo_queue_debug) fprintfile(stderr,"cache_queues: Now caching %s\n", tbuf ); Sstrcpy( qpl->ptr_str, tbuf ); qpl->serial_nbr = 0L; qpl->qh = qh; dl_Init_Node( &qpl->lnk_node, qpl->ptr_str, qpl ); dl_Init_List( &qpl->q_list, LN_IGNORECASE ); dl_Init_List( &qpl->old_q_list, LN_IGNORECASE ); dl_Add_Tail( &segments, &qpl->lnk_node ); } if (qpl) { bpo_May_Work_List2( &qh->Qh_List, &qpl->serial_nbr, (bpo_Work_List2_t)cache_q_list, qpl, q_name ); old_cleanup( qpl ); } return(qpl); } static void inform_this_pid( struct bpo_private_pid *ppp, const void *hint ) { Inform_Process_Str( hint, ppp->pid_str ); return; } static int inform_a_queue( struct Q_Head *qh, const char *q_name ) { struct cache_queue_list *qpl; struct bpo_private_q *pqp; if ( (qpl = cache_queues( qh, q_name )) && (pqp = dl_Find_Item_By_Name( &qpl->q_list, q_name )) ) { dl_Walk_List( &pqp->private_pids, (dl_Walk_List_t)inform_this_pid, qh ); } return(0); } struct Q_Head * get_Q_resource( const void *hint ) { off_t qro,*ofp; struct Q_Head *qh; qh = Find_SODB_Resource( hint, q_res_name ); if (bpo_queue_debug) fprintfile(stderr,"Find Q Resource returned: %p\n", qh ); if (qh==NULL) { ofp = shalloc( hint, sizeof(struct Q_Head) + sizeof(off_t) ); if (ofp) { *ofp = (off_t)0; qh = (struct Q_Head *)(ofp+1); if (bpo_queue_debug) fprintfile( stderr, "Now to init Q resource\n"); /* Init the structure before adding it to the resources */ qh->lno_me = Get_SODB_Offset( qh ); bpo_Init_List( &qh->Qh_List, LN_IGNORECASE ); qh->Qh_next_id=1; b_Init( &qh->Qh_id_lock ); qro = Add_SODB_Resource( qh, q_res_name ); if (qro == 0) { if (bpo_queue_debug) fprintfile( stderr, "Unable to add Q resource\n"); shfree( ofp ); qh = Find_SODB_Resource( hint, q_res_name ); } } else if (bpo_queue_debug) fprintfile( stderr, "Unable to allocate Q Head\n"); } return(qh); } static int check_q_id( struct bpo_Q *qp, struct q_work *qwp ) { if ( qp->Q_id == qwp->ierrno ) { /* found it, make a note, and set return code */ qwp->qp = qp; qwp->rv=1; } return qwp->rv; } static int alloc_q_id( struct Q_Head *qh ) { int rv; b_Lock( &qh->Qh_id_lock ); rv = qh->Qh_next_id++; b_Unlock( &qh->Qh_id_lock ); return rv; } static struct bpo_Q * New_Q( struct Q_Head *qh, const char *q_name, int q_max, int q_flags, int q_mech ) { struct bpo_Q *nqp; off_t *ofp; ofp = shalloc( qh, sizeof(struct bpo_Q) + sizeof(off_t) + Sstrlen(q_name) ); if (ofp) { nqp = (struct bpo_Q *)(ofp+1); Sstrcpy( nqp->Q_name, q_name ); bpo_Init_Node( &nqp->Q_Node, nqp->Q_name, nqp ); bpo_Init_List( &nqp->Q_List, 0 ); nqp->Q_id = alloc_q_id( qh ); nqp->Q_Count = 0; nqp->Q_max_count = q_max; nqp->Q_flags = q_flags; bpo_Init_List( &nqp->Q_pid_r_list, 0 ); nqp->Q_sole_pid = getpid(); if (q_mech==Q_NM_PIPE) { nqp->Q_notify = q_mech; Get_Q_pid( nqp ); }else { nqp->Q_notify = Q_NM_NONE; } }else nqp = NULL; return(nqp); } static int sole_reuse( bpo_List_t *blp, struct bpo_Q *qp, struct bpo_Q *nqp ) { int rv=0; /* Does the read still exist ? */ if ( qp->Q_sole_pid==(pid_t)0 || ( qp->Q_sole_pid != getpid() && is_pid_dead( qp->Q_sole_pid ) ) ) { if (bpo_queue_debug) fprintfile(stderr, "About to re-use (%p)\n", qp ); bpo_Remove_Node( &qp->Q_Node ); rv = 1; bpo_Transfer_Lists( blp, &nqp->Q_List ); nqp->Q_Count = nqp->Q_List.lno_count; }else { /* It's in use by somebody else, ho, hum.... */ if (bpo_queue_debug) fprintfile(stderr, "Queue currently in use\n" ); } return(rv); } void free_q_stuff( struct bpo_Q *qp ) { off_t *ofp; bpo_Remove_Node( &qp->Q_Node ); bpo_free_list_contents( &qp->Q_List ); bpo_free_list_contents( &qp->Q_pid_r_list ); ofp = ((off_t *)qp)-1; shfree( ofp ); /* shfree( qp );*/ return; } static int maybe_add_queue( bpo_List_t *blp, struct bpo_Q *nqp, struct bpo_Q **rv ) { struct bpo_Q *qp; qp = bpo_Find_Item_By_Name( blp, nqp->Q_name ); if (qp) { struct bpo_Q_pid *qpp; /* Q exists. re-use? */ if (bpo_queue_debug) fprintfile(stderr, "Does existing queue (%p) warrant reuse\n", qp ); /* already exists, does the new requirement conflict ? */ if ( qp->Q_flags & Q_SOLE_READER ) { if (bpo_Work_List2( &qp->Q_List, (bpo_Work_List2_t)sole_reuse, qp, nqp )) { /* Old queue dropped if favour of new one */ *rv = nqp; free_q_stuff( qp ); bpo_Add_Tail( blp, &nqp->Q_Node ); }else { *rv = NULL; } }else { /* Add myself as a reader */ if ( qp->Q_notify == Q_NM_PIPE ) { if (bpo_queue_debug) fprintfile(stderr,"About to add myself as another reader\n" ); qpp = bpo_Remove_Head_Item( &nqp->Q_pid_r_list ); bpo_Add_Tail( &qp->Q_pid_r_list, &qpp->Q_pid_Node ); /* Now touch the list so other programs refresh their cached list of pids */ (void)bpo_Touch_List( blp ); }else { if (bpo_queue_debug) fprintfile(stderr,"Queue being re-used.\n" ); } *rv = qp; } }else { /* Just add the beast I've created */ bpo_Add_Tail( blp, &nqp->Q_Node ); *rv = nqp; } return(0); } struct bpo_Q * mpCreateQ( const void *hint, const char *q_name, int q_max, int q_flags, int q_mech ) { struct Q_Head *qh; struct bpo_Q *qp,*nqp; char tname[20]; char tbuf[12]; bpo_queue_debug = (q_flags & Q_DEBUG) || get_config_flag(queue_debug_str); qh = get_Q_resource( hint ); if (bpo_queue_debug) fprintfile(stderr,"get_Q_resource returned: %p\n", qh ); if (qh==0) return(NULL); if (q_name==NULL || *q_name=='\0') { /* Invent a Q name (this is probably a reply Q */ gen_pid_str( tbuf, getpid() ); sprintf( tname, "reply.%s", tbuf ); if (bpo_queue_debug) fprintfile(stderr,"Inventing Q name\n" ); q_name = tname; } if (bpo_queue_debug) fprintfile(stderr,"Time to lock the q list and search for <%s>\n", q_name ); /* New strategy, create the new Q structure, a q_pid structure then call a locked routine to either add the Q structure and pid, or the pid only. */ nqp = New_Q( qh, q_name, q_max, q_flags, q_mech ); Register_Process_Details( hint ); bpo_Work_List2( &qh->Qh_List, (bpo_Work_List2_t)maybe_add_queue, nqp, &qp ); if (qp != nqp) free_q_stuff(nqp); cache_queues( qh, q_name ); return(qp); } static int add_this_q_item( bpo_List_t *blp, bpo_Node_t *q_node, struct q_work *other ) { /* Try something */ struct bpo_Q *qp; void *tv; int noq; qp = other->qp; if (bpo_queue_debug) fprintfile( stderr, "Found Q (%s) (%p)\n", qp->Q_name, qp ); noq = qp->Q_List.lno_count; if (qp->Q_max_count!=0 && noq >= qp->Q_max_count) { if (bpo_queue_debug) fprintfile( stderr, "Q has reached max count (%d)\n", qp->Q_max_count ); other->rv = 0; }else { tv = bpo_Add_Tail( &qp->Q_List, q_node ); qp->Q_Count = qp->Q_List.lno_count; if (bpo_queue_debug) fprintfile( stderr, "Up count and add (%p) to Q = %p\n", q_node, tv ); /* Call notification mechanism */ if ( qp->Q_notify == Q_NM_PIPE ) { if ( (qp->Q_flags & Q_SOLE_READER) && (qp->Q_flags & Q_PERSISTENT)==0 && is_pid_dead( qp->Q_sole_pid ) != 0 ) { if (bpo_queue_debug) fprintfile( stderr, "Q reader for (%s) has gone\n", other->q_name); /* Problem ! */ other->rv = -1; }else { other->notify = 1; other->rv = 0; } }else other->rv = 1; } return(other->rv); } static int add_q_item( bpo_List_t *blp, bpo_Node_t *q_node, struct q_work *other ) { /* Try something */ struct bpo_Q *qp; qp = bpo_Find_Item_By_Name( blp, other->q_name ); if (qp) { bpo_queue_debug |= (qp->Q_flags & Q_DEBUG); if ((qp->Q_flags & Q_DELETED)==0) { other->qp = qp; bpo_Work_List2( &qp->Q_List, (bpo_Work_List2_t)add_this_q_item, q_node, other ); }else { if (bpo_queue_debug) fprintfile( stderr, "Q exists(%p) but is marked deleted\n", qp ); other->ierrno = ENOENT; /* No such file or directory */ other->rv = -1; } }else { if (bpo_queue_debug) fprintfile( stderr, "Unable to find Q (%s)\n", other->q_name); other->ierrno = ENOENT; /* No such file or directory */ other->rv = -1; } return(other->rv); } int mpPutMsg( const char *q_name, bpo_Node_t *q_node ) { struct Q_Head *qh; struct q_work other; /* struct bpo_Q *qp;*/ bpo_queue_debug = get_config_flag(queue_debug_str); qh = get_Q_resource( q_node ); if (qh==NULL) { errno = ENOSYS; /* Unsupported file system operation (aka, no resource of this name in this segment */ return(-1); } other.notify=0; other.rv= -1; other.ierrno=0; other.qp = NULL; other.q_name = q_name; bpo_Remove_Node( q_node ); bpo_Work_List2( &qh->Qh_List, (bpo_Work_List2_t)add_q_item, q_node, &other ); if (other.rv==0 && other.notify) { inform_a_queue( qh, q_name ); other.rv=1; } errno=other.ierrno; return(other.rv); } int mpPostMsg( const char *q_name, struct bpo_Q_Mesg *mesp, struct bpo_Q *reply_q ) { char *seg_base; struct Q_Head *qh; bpo_queue_debug = get_config_flag(queue_debug_str); seg_base = Get_SODB_Base( reply_q ); if (seg_base==NULL) { if (bpo_queue_debug) fprintfile(stderr, "Can't find base for reply Q\n" ); errno = ENOSYS; return( -1 ); } qh = get_Q_resource( seg_base ); if (qh) { /* does the reply queue really exist */ if (bpo_Find_Node_By_Item( &qh->Qh_List, reply_q )==NULL) { if (bpo_queue_debug) fprintfile(stderr, "reply Q is not a real queue\n" ); errno = ENOENT; return(-1); } } /* mesp->reply_Q = GOFF( seg_base, reply_q ); */ mesp->Q_Reply_id = reply_q->Q_id; return( mpPutMsg( q_name, &mesp->Q_Node ) ); } static int do_a_reply( bpo_List_t *blp, bpo_Node_t *q_node, struct q_work *other ) { if ( bpo_Find_Node_By_Item( blp, other->qp ) != NULL ) { bpo_queue_debug |= (other->qp->Q_flags & Q_DEBUG); bpo_Work_List2( &other->qp->Q_List, (bpo_Work_List2_t)add_this_q_item, q_node, other ); }else { other->rv = -1; other->ierrno= ENOSYS; } return(other->rv); } int mpReplyMsg( struct bpo_Q_Mesg *mesp ) { struct Q_Head *qh; struct q_work other; bpo_queue_debug = get_config_flag(queue_debug_str); qh = get_Q_resource( mesp ); if (qh==NULL) { errno = ENOSYS; /* Unsupported file system operation (aka, no resource of this name in this segment */ return(-1); } /* find the q based on the ID */ other.notify=0; other.rv= 0; other.ierrno=mesp->Q_Reply_id;; other.qp = NULL; other.q_name = NULL; if ( ! bpo_Walk_List2( &qh->Qh_List, (bpo_Walk_List2_t)check_q_id, &other ) ) { /* found it */ errno = EEXIST; /* Queue has gone */ return(-1); } /* Now transfer the message */ other.notify=0; other.rv= -1; other.ierrno=0; other.q_name = NULL; bpo_Remove_Node( &mesp->Q_Node ); bpo_Work_List2( &qh->Qh_List, (bpo_Work_List2_t)do_a_reply, &mesp->Q_Node, &other ); if (other.rv==0 && other.notify) { inform_a_queue( qh, other.qp->Q_name ); other.rv=1; } errno=other.ierrno; return(other.rv); } static int rem_this_q_item( bpo_List_t *blp, struct bpo_Q *qp, struct q_work *other ) { bpo_Node_t *nptr; int nroq; nptr = bpo_Remove_Head( &qp->Q_List ); if (nptr) { other->r_ptr = GRPTR( nptr, nptr->lno_Item ); Add_This_Pid_Resource( qp, nptr ); } nroq = qp->Q_List.lno_count; qp->Q_Count = nroq; if (bpo_queue_debug) fprintfile(stderr, "GetMsg on Q (%s) gives (%p) with (%d) left\n", other->q_name, other->r_ptr, nroq ); return(other->rv); } static int rem_q_item( bpo_List_t *blp, struct q_work *other ) { /* Try something */ struct bpo_Q *qp; qp = bpo_Find_Item_By_Name( blp, other->q_name ); if (qp) { bpo_queue_debug |= (qp->Q_flags & Q_DEBUG); if ((qp->Q_flags & Q_DELETED)==0) { other->qp = qp; bpo_Work_List2( &qp->Q_List, (bpo_Work_List2_t)rem_this_q_item, qp, other ); }else { if (bpo_queue_debug) fprintfile( stderr, "Q exists(%p) but is marked deleted\n", qp ); other->ierrno = ENOENT; /* No such file or directory */ other->rv = -1; other->r_ptr = NULL; } }else { if (bpo_queue_debug) fprintfile( stderr, "Unable to find Q (%s)\n", other->q_name); other->ierrno = ENOENT; /* No such file or directory */ other->rv = -1; other->r_ptr = NULL; } return(other->rv); } void * mpGetMsg( const char *q_name, const void *hint ) { struct Q_Head *qh; struct q_work other; bpo_queue_debug = get_config_flag(queue_debug_str); qh = get_Q_resource( hint ); if (qh==NULL) { errno = ENOSYS; return(NULL); } other.notify=0; other.rv= -1; other.r_ptr=NULL; other.ierrno=0; other.qp = NULL; other.q_name = q_name; bpo_Work_List( &qh->Qh_List, (bpo_Work_List_t)rem_q_item, &other ); errno = other.ierrno; return(other.r_ptr); } void * mpFindQ( const char *q_name, const void *hint ) { struct Q_Head *qh; struct bpo_Q *qp=NULL; struct cache_queue_list *qpl; struct bpo_private_q *pqp; bpo_queue_debug = get_config_flag(queue_debug_str); qh = get_Q_resource( hint ); if (qh==NULL) { errno = ENOSYS; return(NULL); } qpl = cache_queues( qh, q_name ); if ( qpl && (pqp = dl_Find_Item_By_Name( &qpl->q_list, q_name )) ) { qp = pqp->qp; }else errno=ENOENT; return((void *)qp); } static int mpCheckQ_Internal( const char *q_name, struct Q_Head *qh ) { int rv = -1; struct cache_queue_list *qpl; struct bpo_private_q *pqp; errno = 0; qpl = cache_queues( qh, q_name ); if ( qpl && (pqp = dl_Find_Item_By_Name( &qpl->q_list, q_name )) ) { rv = pqp->qp->Q_List.lno_count; }else { errno = ENOENT; /* No such file or directory */ rv = -1; } return(rv); } int mpCheckQ( const char *q_name, const void *hint ) { struct Q_Head *qh; bpo_queue_debug = get_config_flag(queue_debug_str); qh = get_Q_resource( hint ); if (qh==NULL) { errno = ENOSYS; return(-1); } return( mpCheckQ_Internal( q_name, qh ) ); } int mpWaitQ( const char *name, const void *hint ) { int rv = 0; struct Q_Head *qh; struct cache_queue_list *qpl; struct bpo_private_q *pqp; bpo_queue_debug = get_config_flag(queue_debug_str); qh = get_Q_resource( hint ); if (qh==NULL) { errno = ENOSYS; return(-1); } qpl = cache_queues( qh, name ); if ( qpl && (pqp = dl_Find_Item_By_Name( &qpl->q_list, name )) ) { rv = pqp->qp->Q_List.lno_count; /* Call notification mechanism */ if (rv == 0 && pqp->Q_notify == Q_NM_PIPE ) { if (bpo_queue_debug) fprintfile( stderr, "About to Wait_Process_Pipe (O_RDONLY)\n"); Wait_Process_Pipe( ); rv = mpCheckQ_Internal( name, qh ); }else { if (bpo_queue_debug) fprintfile( stderr, "Already %d items on Q (%s)\n", rv, name ); errno = 0; } }else { if (bpo_queue_debug) fprintfile( stderr, "Unable to find Q (%s) to wait on.\n", name ); errno = ENOENT; /* No such file or directory */ rv = -1; } return(rv); } /* -- End of File -- */