summaryrefslogtreecommitdiffstats
path: root/src/libmux/io.c
diff options
context:
space:
mode:
authorrsc <devnull@localhost>2003-12-06 18:08:52 +0000
committerrsc <devnull@localhost>2003-12-06 18:08:52 +0000
commitd3df308747ee4d1fcc063a348dcf1146b390bda7 (patch)
treea204b027256ec29b110caaa86100cbd701808b5b /src/libmux/io.c
parente97ceade5e1bba5787e39429384336fa37797906 (diff)
downloadplan9port-d3df308747ee4d1fcc063a348dcf1146b390bda7.tar.gz
plan9port-d3df308747ee4d1fcc063a348dcf1146b390bda7.zip
File system stuff.
Diffstat (limited to 'src/libmux/io.c')
-rw-r--r--src/libmux/io.c136
1 files changed, 136 insertions, 0 deletions
diff --git a/src/libmux/io.c b/src/libmux/io.c
new file mode 100644
index 00000000..3d932b1a
--- /dev/null
+++ b/src/libmux/io.c
@@ -0,0 +1,136 @@
+/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
+/* See COPYRIGHT */
+
+#include <u.h>
+#include <libc.h>
+#include <mux.h>
+
+/*
+ * If you fork off two procs running muxrecvproc and muxsendproc,
+ * then muxrecv/muxsend (and thus muxrpc) will never block except on
+ * rendevouses, which is nice when it's running in one thread of many.
+ */
+void
+_muxrecvproc(void *v)
+{
+ void *p;
+ Mux *mux;
+ Muxqueue *q;
+
+ mux = v;
+ q = _muxqalloc();
+
+ qlock(&mux->lk);
+ mux->readq = q;
+ qlock(&mux->inlk);
+ rwakeup(&mux->rpcfork);
+ qunlock(&mux->lk);
+
+ while((p = mux->recv(mux)) != nil)
+ if(_muxqsend(q, p) < 0){
+ free(p);
+ break;
+ }
+ qunlock(&mux->inlk);
+ qlock(&mux->lk);
+ _muxqhangup(q);
+ while((p = _muxnbqrecv(q)) != nil)
+ free(p);
+ free(q);
+ mux->readq = nil;
+ rwakeup(&mux->rpcfork);
+ qunlock(&mux->lk);
+}
+
+void
+_muxsendproc(void *v)
+{
+ Muxqueue *q;
+ void *p;
+ Mux *mux;
+
+ mux = v;
+ q = _muxqalloc();
+
+ qlock(&mux->lk);
+ mux->writeq = q;
+ qlock(&mux->outlk);
+ rwakeup(&mux->rpcfork);
+ qunlock(&mux->lk);
+
+ while((p = _muxqrecv(q)) != nil)
+ if(mux->send(mux, p) < 0)
+ break;
+ qunlock(&mux->outlk);
+ qlock(&mux->lk);
+ _muxqhangup(q);
+ while((p = _muxnbqrecv(q)) != nil)
+ free(p);
+ free(q);
+ mux->writeq = nil;
+ rwakeup(&mux->rpcfork);
+ qunlock(&mux->lk);
+ return;
+}
+
+void*
+_muxrecv(Mux *mux)
+{
+ void *p;
+
+ qlock(&mux->lk);
+/*
+ if(mux->state != VtStateConnected){
+ werrstr("not connected");
+ qunlock(&mux->lk);
+ return nil;
+ }
+*/
+ if(mux->readq){
+ qunlock(&mux->lk);
+ return _muxqrecv(mux->readq);
+ }
+
+ qlock(&mux->inlk);
+ qunlock(&mux->lk);
+ p = mux->recv(mux);
+ qunlock(&mux->inlk);
+/*
+ if(!p)
+ vthangup(mux);
+*/
+ return p;
+}
+
+int
+_muxsend(Mux *mux, void *p)
+{
+ qlock(&mux->lk);
+/*
+ if(mux->state != VtStateConnected){
+ packetfree(p);
+ werrstr("not connected");
+ qunlock(&mux->lk);
+ return -1;
+ }
+*/
+ if(mux->writeq){
+ qunlock(&mux->lk);
+ if(_muxqsend(mux->writeq, p) < 0){
+ free(p);
+ return -1;
+ }
+ return 0;
+ }
+
+ qlock(&mux->outlk);
+ qunlock(&mux->lk);
+ if(mux->send(mux, p) < 0){
+ qunlock(&mux->outlk);
+ /* vthangup(mux); */
+ return -1;
+ }
+ qunlock(&mux->outlk);
+ return 0;
+}
+