diff options
| author | rsc <devnull@localhost> | 2003-12-06 18:08:52 +0000 |
|---|---|---|
| committer | rsc <devnull@localhost> | 2003-12-06 18:08:52 +0000 |
| commit | d3df308747ee4d1fcc063a348dcf1146b390bda7 (patch) | |
| tree | a204b027256ec29b110caaa86100cbd701808b5b /src/libmux/io.c | |
| parent | e97ceade5e1bba5787e39429384336fa37797906 (diff) | |
| download | plan9port-d3df308747ee4d1fcc063a348dcf1146b390bda7.tar.gz plan9port-d3df308747ee4d1fcc063a348dcf1146b390bda7.zip | |
File system stuff.
Diffstat (limited to 'src/libmux/io.c')
| -rw-r--r-- | src/libmux/io.c | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/src/libmux/io.c b/src/libmux/io.c new file mode 100644 index 00000000..3d932b1a --- /dev/null +++ b/src/libmux/io.c @@ -0,0 +1,136 @@ +/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */ +/* See COPYRIGHT */ + +#include <u.h> +#include <libc.h> +#include <mux.h> + +/* + * If you fork off two procs running muxrecvproc and muxsendproc, + * then muxrecv/muxsend (and thus muxrpc) will never block except on + * rendevouses, which is nice when it's running in one thread of many. + */ +void +_muxrecvproc(void *v) +{ + void *p; + Mux *mux; + Muxqueue *q; + + mux = v; + q = _muxqalloc(); + + qlock(&mux->lk); + mux->readq = q; + qlock(&mux->inlk); + rwakeup(&mux->rpcfork); + qunlock(&mux->lk); + + while((p = mux->recv(mux)) != nil) + if(_muxqsend(q, p) < 0){ + free(p); + break; + } + qunlock(&mux->inlk); + qlock(&mux->lk); + _muxqhangup(q); + while((p = _muxnbqrecv(q)) != nil) + free(p); + free(q); + mux->readq = nil; + rwakeup(&mux->rpcfork); + qunlock(&mux->lk); +} + +void +_muxsendproc(void *v) +{ + Muxqueue *q; + void *p; + Mux *mux; + + mux = v; + q = _muxqalloc(); + + qlock(&mux->lk); + mux->writeq = q; + qlock(&mux->outlk); + rwakeup(&mux->rpcfork); + qunlock(&mux->lk); + + while((p = _muxqrecv(q)) != nil) + if(mux->send(mux, p) < 0) + break; + qunlock(&mux->outlk); + qlock(&mux->lk); + _muxqhangup(q); + while((p = _muxnbqrecv(q)) != nil) + free(p); + free(q); + mux->writeq = nil; + rwakeup(&mux->rpcfork); + qunlock(&mux->lk); + return; +} + +void* +_muxrecv(Mux *mux) +{ + void *p; + + qlock(&mux->lk); +/* + if(mux->state != VtStateConnected){ + werrstr("not connected"); + qunlock(&mux->lk); + return nil; + } +*/ + if(mux->readq){ + qunlock(&mux->lk); + return _muxqrecv(mux->readq); + } + + qlock(&mux->inlk); + qunlock(&mux->lk); + p = mux->recv(mux); + qunlock(&mux->inlk); +/* + if(!p) + vthangup(mux); +*/ + return p; +} + +int +_muxsend(Mux *mux, void *p) +{ + qlock(&mux->lk); +/* + if(mux->state != VtStateConnected){ + packetfree(p); + werrstr("not connected"); + qunlock(&mux->lk); + return -1; + } +*/ + if(mux->writeq){ + qunlock(&mux->lk); + if(_muxqsend(mux->writeq, p) < 0){ + free(p); + return -1; + } + return 0; + } + + qlock(&mux->outlk); + qunlock(&mux->lk); + if(mux->send(mux, p) < 0){ + qunlock(&mux->outlk); + /* vthangup(mux); */ + return -1; + } + qunlock(&mux->outlk); + return 0; +} + |
