/* devpipe.c: pipe device ('|') */
1 #include "u.h" 2 #include "lib.h" 3 #include "mem.h" 4 #include "dat.h" 5 #include "fns.h" 6 #include "error.h" 7 8 #include "netif.h" 9 10 typedef struct Pipe Pipe; 11 struct Pipe 12 { 13 QLock lk; 14 Pipe *next; 15 int ref; 16 ulong path; 17 Queue *q[2]; 18 int qref[2]; 19 }; 20 21 struct 22 { 23 Lock lk; 24 ulong path; 25 } pipealloc; 26 27 enum 28 { 29 Qdir, 30 Qdata0, 31 Qdata1, 32 }; 33 34 Dirtab pipedir[] = 35 { 36 ".", {Qdir,0,QTDIR}, 0, DMDIR|0500, 37 "data", {Qdata0}, 0, 0600, 38 "data1", {Qdata1}, 0, 0600, 39 }; 40 #define NPIPEDIR 3 41 42 static void 43 pipeinit(void) 44 { 45 if(conf.pipeqsize == 0){ 46 if(conf.nmach > 1) 47 conf.pipeqsize = 256*1024; 48 else 49 conf.pipeqsize = 32*1024; 50 } 51 } 52 53 /* 54 * create a pipe, no streams are created until an open 55 */ 56 static Chan* 57 pipeattach(char *spec) 58 { 59 Pipe *p; 60 Chan *c; 61 62 c = devattach('|', spec); 63 p = malloc(sizeof(Pipe)); 64 if(p == 0) 65 exhausted("memory"); 66 p->ref = 1; 67 68 p->q[0] = qopen(conf.pipeqsize, 0, 0, 0); 69 if(p->q[0] == 0){ 70 free(p); 71 exhausted("memory"); 72 } 73 p->q[1] = qopen(conf.pipeqsize, 0, 0, 0); 74 if(p->q[1] == 0){ 75 free(p->q[0]); 76 free(p); 77 exhausted("memory"); 78 } 79 80 lock(&pipealloc.lk); 81 p->path = ++pipealloc.path; 82 unlock(&pipealloc.lk); 83 84 mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR); 85 c->aux = p; 86 c->dev = 0; 87 return c; 88 } 89 90 static int 91 pipegen(Chan *c, char *name, Dirtab *tab, int ntab, int i, Dir *dp) 92 { 93 Qid q; 94 int len; 95 Pipe *p; 96 97 if(i == DEVDOTDOT){ 98 devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp); 99 return 1; 100 } 101 i++; /* skip . 
*/ 102 if(tab==0 || i>=ntab) 103 return -1; 104 105 tab += i; 106 p = c->aux; 107 switch((ulong)tab->qid.path){ 108 case Qdata0: 109 len = qlen(p->q[0]); 110 break; 111 case Qdata1: 112 len = qlen(p->q[1]); 113 break; 114 default: 115 len = tab->length; 116 break; 117 } 118 mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE); 119 devdir(c, q, tab->name, len, eve, tab->perm, dp); 120 return 1; 121 } 122 123 124 static Walkqid* 125 pipewalk(Chan *c, Chan *nc, char **name, int nname) 126 { 127 Walkqid *wq; 128 Pipe *p; 129 130 wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen); 131 if(wq != nil && wq->clone != nil && wq->clone != c){ 132 p = c->aux; 133 qlock(&p->lk); 134 p->ref++; 135 if(c->flag & COPEN){ 136 print("channel open in pipewalk\n"); 137 switch(NETTYPE(c->qid.path)){ 138 case Qdata0: 139 p->qref[0]++; 140 break; 141 case Qdata1: 142 p->qref[1]++; 143 break; 144 } 145 } 146 qunlock(&p->lk); 147 } 148 return wq; 149 } 150 151 static int 152 pipestat(Chan *c, uchar *db, int n) 153 { 154 Pipe *p; 155 Dir dir; 156 157 p = c->aux; 158 159 switch(NETTYPE(c->qid.path)){ 160 case Qdir: 161 devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir); 162 break; 163 case Qdata0: 164 devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir); 165 break; 166 case Qdata1: 167 devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir); 168 break; 169 default: 170 panic("pipestat"); 171 } 172 n = convD2M(&dir, db, n); 173 if(n < BIT16SZ) 174 error(Eshortstat); 175 return n; 176 } 177 178 /* 179 * if the stream doesn't exist, create it 180 */ 181 static Chan* 182 pipeopen(Chan *c, int omode) 183 { 184 Pipe *p; 185 186 if(c->qid.type & QTDIR){ 187 if(omode != OREAD) 188 error(Ebadarg); 189 c->mode = omode; 190 c->flag |= COPEN; 191 c->offset = 0; 192 return c; 193 } 194 195 p = c->aux; 196 qlock(&p->lk); 197 switch(NETTYPE(c->qid.path)){ 198 case Qdata0: 199 p->qref[0]++; 200 break; 201 case Qdata1: 202 p->qref[1]++; 203 break; 204 } 205 qunlock(&p->lk); 206 207 
c->mode = openmode(omode); 208 c->flag |= COPEN; 209 c->offset = 0; 210 c->iounit = qiomaxatomic; 211 return c; 212 } 213 214 static void 215 pipeclose(Chan *c) 216 { 217 Pipe *p; 218 219 p = c->aux; 220 qlock(&p->lk); 221 222 if(c->flag & COPEN){ 223 /* 224 * closing either side hangs up the stream 225 */ 226 switch(NETTYPE(c->qid.path)){ 227 case Qdata0: 228 p->qref[0]--; 229 if(p->qref[0] == 0){ 230 qhangup(p->q[1], 0); 231 qclose(p->q[0]); 232 } 233 break; 234 case Qdata1: 235 p->qref[1]--; 236 if(p->qref[1] == 0){ 237 qhangup(p->q[0], 0); 238 qclose(p->q[1]); 239 } 240 break; 241 } 242 } 243 244 245 /* 246 * if both sides are closed, they are reusable 247 */ 248 if(p->qref[0] == 0 && p->qref[1] == 0){ 249 qreopen(p->q[0]); 250 qreopen(p->q[1]); 251 } 252 253 /* 254 * free the structure on last close 255 */ 256 p->ref--; 257 if(p->ref == 0){ 258 qunlock(&p->lk); 259 free(p->q[0]); 260 free(p->q[1]); 261 free(p); 262 } else 263 qunlock(&p->lk); 264 } 265 266 static long 267 piperead(Chan *c, void *va, long n, vlong offset) 268 { 269 Pipe *p; 270 271 p = c->aux; 272 273 switch(NETTYPE(c->qid.path)){ 274 case Qdir: 275 return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen); 276 case Qdata0: 277 return qread(p->q[0], va, n); 278 case Qdata1: 279 return qread(p->q[1], va, n); 280 default: 281 panic("piperead"); 282 } 283 return -1; /* not reached */ 284 } 285 286 static Block* 287 pipebread(Chan *c, long n, ulong offset) 288 { 289 Pipe *p; 290 291 p = c->aux; 292 293 switch(NETTYPE(c->qid.path)){ 294 case Qdata0: 295 return qbread(p->q[0], n); 296 case Qdata1: 297 return qbread(p->q[1], n); 298 } 299 300 return devbread(c, n, offset); 301 } 302 303 /* 304 * a write to a closed pipe causes a note to be sent to 305 * the process. 
306 */ 307 static long 308 pipewrite(Chan *c, void *va, long n, vlong offset) 309 { 310 Pipe *p; 311 312 if(!islo()) 313 print("pipewrite hi %#p\n", getcallerpc(&c)); 314 if(waserror()) { 315 /* avoid notes when pipe is a mounted queue */ 316 if((c->flag & CMSG) == 0) 317 postnote(up, 1, "sys: write on closed pipe", NUser); 318 nexterror(); 319 } 320 321 p = c->aux; 322 323 switch(NETTYPE(c->qid.path)){ 324 case Qdata0: 325 n = qwrite(p->q[1], va, n); 326 break; 327 328 case Qdata1: 329 n = qwrite(p->q[0], va, n); 330 break; 331 332 default: 333 panic("pipewrite"); 334 } 335 336 poperror(); 337 return n; 338 } 339 340 static long 341 pipebwrite(Chan *c, Block *bp, ulong offset) 342 { 343 long n; 344 Pipe *p; 345 346 if(waserror()) { 347 /* avoid notes when pipe is a mounted queue */ 348 if((c->flag & CMSG) == 0) 349 postnote(up, 1, "sys: write on closed pipe", NUser); 350 nexterror(); 351 } 352 353 p = c->aux; 354 switch(NETTYPE(c->qid.path)){ 355 case Qdata0: 356 n = qbwrite(p->q[1], bp); 357 break; 358 359 case Qdata1: 360 n = qbwrite(p->q[0], bp); 361 break; 362 363 default: 364 n = 0; 365 panic("pipebwrite"); 366 } 367 368 poperror(); 369 return n; 370 } 371 372 Dev pipedevtab = { 373 '|', 374 "pipe", 375 376 devreset, 377 pipeinit, 378 devshutdown, 379 pipeattach, 380 pipewalk, 381 pipestat, 382 pipeopen, 383 devcreate, 384 pipeclose, 385 piperead, 386 pipebread, 387 pipewrite, 388 pipebwrite, 389 devremove, 390 devwstat, 391 };