vx32

Local 9vx git repository for patches.
git clone git://r-36.net/vx32
Log | Files | Refs

devpipe.c (5881B)


      1 #include	"u.h"
      2 #include	"lib.h"
      3 #include	"mem.h"
      4 #include	"dat.h"
      5 #include	"fns.h"
      6 #include	"error.h"
      7 
      8 #include	"netif.h"
      9 
     10 typedef struct Pipe	Pipe;
     11 struct Pipe
     12 {
     13 	QLock lk;
     14 	Pipe	*next;
     15 	int	ref;
     16 	ulong	path;
     17 	Queue	*q[2];
     18 	int	qref[2];
     19 };
     20 
     21 struct
     22 {
     23 	Lock lk;
     24 	ulong	path;
     25 } pipealloc;
     26 
     27 enum
     28 {
     29 	Qdir,
     30 	Qdata0,
     31 	Qdata1,
     32 };
     33 
     34 Dirtab pipedir[] =
     35 {
     36 	".",		{Qdir,0,QTDIR},	0,		DMDIR|0500,
     37 	"data",		{Qdata0},	0,		0600,
     38 	"data1",	{Qdata1},	0,		0600,
     39 };
     40 #define NPIPEDIR 3
     41 
     42 static void
     43 pipeinit(void)
     44 {
     45 	if(conf.pipeqsize == 0){
     46 		if(conf.nmach > 1)
     47 			conf.pipeqsize = 256*1024;
     48 		else
     49 			conf.pipeqsize = 32*1024;
     50 	}
     51 }
     52 
     53 /*
     54  *  create a pipe, no streams are created until an open
     55  */
     56 static Chan*
     57 pipeattach(char *spec)
     58 {
     59 	Pipe *p;
     60 	Chan *c;
     61 
     62 	c = devattach('|', spec);
     63 	p = malloc(sizeof(Pipe));
     64 	if(p == 0)
     65 		exhausted("memory");
     66 	p->ref = 1;
     67 
     68 	p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
     69 	if(p->q[0] == 0){
     70 		free(p);
     71 		exhausted("memory");
     72 	}
     73 	p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
     74 	if(p->q[1] == 0){
     75 		free(p->q[0]);
     76 		free(p);
     77 		exhausted("memory");
     78 	}
     79 
     80 	lock(&pipealloc.lk);
     81 	p->path = ++pipealloc.path;
     82 	unlock(&pipealloc.lk);
     83 
     84 	mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR);
     85 	c->aux = p;
     86 	c->dev = 0;
     87 	return c;
     88 }
     89 
     90 static int
     91 pipegen(Chan *c, char *name, Dirtab *tab, int ntab, int i, Dir *dp)
     92 {
     93 	Qid q;
     94 	int len;
     95 	Pipe *p;
     96 
     97 	if(i == DEVDOTDOT){
     98 		devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
     99 		return 1;
    100 	}
    101 	i++;	/* skip . */
    102 	if(tab==0 || i>=ntab)
    103 		return -1;
    104 
    105 	tab += i;
    106 	p = c->aux;
    107 	switch((ulong)tab->qid.path){
    108 	case Qdata0:
    109 		len = qlen(p->q[0]);
    110 		break;
    111 	case Qdata1:
    112 		len = qlen(p->q[1]);
    113 		break;
    114 	default:
    115 		len = tab->length;
    116 		break;
    117 	}
    118 	mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE);
    119 	devdir(c, q, tab->name, len, eve, tab->perm, dp);
    120 	return 1;
    121 }
    122 
    123 
    124 static Walkqid*
    125 pipewalk(Chan *c, Chan *nc, char **name, int nname)
    126 {
    127 	Walkqid *wq;
    128 	Pipe *p;
    129 
    130 	wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
    131 	if(wq != nil && wq->clone != nil && wq->clone != c){
    132 		p = c->aux;
    133 		qlock(&p->lk);
    134 		p->ref++;
    135 		if(c->flag & COPEN){
    136 			print("channel open in pipewalk\n");
    137 			switch(NETTYPE(c->qid.path)){
    138 			case Qdata0:
    139 				p->qref[0]++;
    140 				break;
    141 			case Qdata1:
    142 				p->qref[1]++;
    143 				break;
    144 			}
    145 		}
    146 		qunlock(&p->lk);
    147 	}
    148 	return wq;
    149 }
    150 
    151 static int
    152 pipestat(Chan *c, uchar *db, int n)
    153 {
    154 	Pipe *p;
    155 	Dir dir;
    156 
    157 	p = c->aux;
    158 
    159 	switch(NETTYPE(c->qid.path)){
    160 	case Qdir:
    161 		devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
    162 		break;
    163 	case Qdata0:
    164 		devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir);
    165 		break;
    166 	case Qdata1:
    167 		devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir);
    168 		break;
    169 	default:
    170 		panic("pipestat");
    171 	}
    172 	n = convD2M(&dir, db, n);
    173 	if(n < BIT16SZ)
    174 		error(Eshortstat);
    175 	return n;
    176 }
    177 
    178 /*
    179  *  if the stream doesn't exist, create it
    180  */
    181 static Chan*
    182 pipeopen(Chan *c, int omode)
    183 {
    184 	Pipe *p;
    185 
    186 	if(c->qid.type & QTDIR){
    187 		if(omode != OREAD)
    188 			error(Ebadarg);
    189 		c->mode = omode;
    190 		c->flag |= COPEN;
    191 		c->offset = 0;
    192 		return c;
    193 	}
    194 
    195 	p = c->aux;
    196 	qlock(&p->lk);
    197 	switch(NETTYPE(c->qid.path)){
    198 	case Qdata0:
    199 		p->qref[0]++;
    200 		break;
    201 	case Qdata1:
    202 		p->qref[1]++;
    203 		break;
    204 	}
    205 	qunlock(&p->lk);
    206 
    207 	c->mode = openmode(omode);
    208 	c->flag |= COPEN;
    209 	c->offset = 0;
    210 	c->iounit = qiomaxatomic;
    211 	return c;
    212 }
    213 
    214 static void
    215 pipeclose(Chan *c)
    216 {
    217 	Pipe *p;
    218 
    219 	p = c->aux;
    220 	qlock(&p->lk);
    221 
    222 	if(c->flag & COPEN){
    223 		/*
    224 		 *  closing either side hangs up the stream
    225 		 */
    226 		switch(NETTYPE(c->qid.path)){
    227 		case Qdata0:
    228 			p->qref[0]--;
    229 			if(p->qref[0] == 0){
    230 				qhangup(p->q[1], 0);
    231 				qclose(p->q[0]);
    232 			}
    233 			break;
    234 		case Qdata1:
    235 			p->qref[1]--;
    236 			if(p->qref[1] == 0){
    237 				qhangup(p->q[0], 0);
    238 				qclose(p->q[1]);
    239 			}
    240 			break;
    241 		}
    242 	}
    243 
    244 
    245 	/*
    246 	 *  if both sides are closed, they are reusable
    247 	 */
    248 	if(p->qref[0] == 0 && p->qref[1] == 0){
    249 		qreopen(p->q[0]);
    250 		qreopen(p->q[1]);
    251 	}
    252 
    253 	/*
    254 	 *  free the structure on last close
    255 	 */
    256 	p->ref--;
    257 	if(p->ref == 0){
    258 		qunlock(&p->lk);
    259 		free(p->q[0]);
    260 		free(p->q[1]);
    261 		free(p);
    262 	} else
    263 		qunlock(&p->lk);
    264 }
    265 
    266 static long
    267 piperead(Chan *c, void *va, long n, vlong offset)
    268 {
    269 	Pipe *p;
    270 
    271 	p = c->aux;
    272 
    273 	switch(NETTYPE(c->qid.path)){
    274 	case Qdir:
    275 		return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
    276 	case Qdata0:
    277 		return qread(p->q[0], va, n);
    278 	case Qdata1:
    279 		return qread(p->q[1], va, n);
    280 	default:
    281 		panic("piperead");
    282 	}
    283 	return -1;	/* not reached */
    284 }
    285 
    286 static Block*
    287 pipebread(Chan *c, long n, ulong offset)
    288 {
    289 	Pipe *p;
    290 
    291 	p = c->aux;
    292 
    293 	switch(NETTYPE(c->qid.path)){
    294 	case Qdata0:
    295 		return qbread(p->q[0], n);
    296 	case Qdata1:
    297 		return qbread(p->q[1], n);
    298 	}
    299 
    300 	return devbread(c, n, offset);
    301 }
    302 
    303 /*
    304  *  a write to a closed pipe causes a note to be sent to
    305  *  the process.
    306  */
    307 static long
    308 pipewrite(Chan *c, void *va, long n, vlong offset)
    309 {
    310 	Pipe *p;
    311 
    312 	if(!islo())
    313 		print("pipewrite hi %#p\n", getcallerpc(&c));
    314 	if(waserror()) {
    315 		/* avoid notes when pipe is a mounted queue */
    316 		if((c->flag & CMSG) == 0)
    317 			postnote(up, 1, "sys: write on closed pipe", NUser);
    318 		nexterror();
    319 	}
    320 
    321 	p = c->aux;
    322 
    323 	switch(NETTYPE(c->qid.path)){
    324 	case Qdata0:
    325 		n = qwrite(p->q[1], va, n);
    326 		break;
    327 
    328 	case Qdata1:
    329 		n = qwrite(p->q[0], va, n);
    330 		break;
    331 
    332 	default:
    333 		panic("pipewrite");
    334 	}
    335 
    336 	poperror();
    337 	return n;
    338 }
    339 
    340 static long
    341 pipebwrite(Chan *c, Block *bp, ulong offset)
    342 {
    343 	long n;
    344 	Pipe *p;
    345 
    346 	if(waserror()) {
    347 		/* avoid notes when pipe is a mounted queue */
    348 		if((c->flag & CMSG) == 0)
    349 			postnote(up, 1, "sys: write on closed pipe", NUser);
    350 		nexterror();
    351 	}
    352 
    353 	p = c->aux;
    354 	switch(NETTYPE(c->qid.path)){
    355 	case Qdata0:
    356 		n = qbwrite(p->q[1], bp);
    357 		break;
    358 
    359 	case Qdata1:
    360 		n = qbwrite(p->q[0], bp);
    361 		break;
    362 
    363 	default:
    364 		n = 0;
    365 		panic("pipebwrite");
    366 	}
    367 
    368 	poperror();
    369 	return n;
    370 }
    371 
    372 Dev pipedevtab = {
    373 	'|',
    374 	"pipe",
    375 
    376 	devreset,
    377 	pipeinit,
    378 	devshutdown,
    379 	pipeattach,
    380 	pipewalk,
    381 	pipestat,
    382 	pipeopen,
    383 	devcreate,
    384 	pipeclose,
    385 	piperead,
    386 	pipebread,
    387 	pipewrite,
    388 	pipebwrite,
    389 	devremove,
    390 	devwstat,
    391 };