vx32

Local 9vx git repository for patches.
git clone git://r-36.net/vx32
Log | Files | Refs

qio.c (23670B)


      1 #include	"u.h"
      2 #include	"lib.h"
      3 #include	"mem.h"
      4 #include	"dat.h"
      5 #include	"fns.h"
      6 #include	"error.h"
      7 
      8 static ulong padblockcnt;
      9 static ulong concatblockcnt;
     10 static ulong pullupblockcnt;
     11 static ulong copyblockcnt;
     12 static ulong consumecnt;
     13 static ulong producecnt;
     14 static ulong qcopycnt;
     15 
     16 static int debugging;
     17 
     18 #define QDEBUG	if(0)
     19 
     20 /*
     21  *  IO queues
     22  */
     23 
     24 struct Queue
     25 {
     26 	Lock lk;
     27 
     28 	Block*	bfirst;		/* buffer */
     29 	Block*	blast;
     30 
     31 	int	len;		/* bytes allocated to queue */
     32 	int	dlen;		/* data bytes in queue */
     33 	int	limit;		/* max bytes in queue */
     34 	int	inilim;		/* initial limit */
     35 	int	state;
     36 	int	noblock;	/* true if writes return immediately when q full */
     37 	int	eof;		/* number of eofs read by user */
     38 
     39 	void	(*kick)(void*);	/* restart output */
     40 	void	(*bypass)(void*, Block*);	/* bypass queue altogether */
     41 	void*	arg;		/* argument to kick */
     42 
     43 	QLock	rlock;		/* mutex for reading processes */
     44 	Rendez	rr;		/* process waiting to read */
     45 	QLock	wlock;		/* mutex for writing processes */
     46 	Rendez	wr;		/* process waiting to write */
     47 
     48 	char	err[ERRMAX];
     49 };
     50 
     51 enum
     52 {
     53 	Maxatomic	= 64*1024,
     54 };
     55 
     56 uint	qiomaxatomic = Maxatomic;
     57 
     58 void
     59 ixsummary(void)
     60 {
     61 	debugging ^= 1;
     62 	iallocsummary();
     63 	print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
     64 		padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
     65 	print("consume %lud, produce %lud, qcopy %lud\n",
     66 		consumecnt, producecnt, qcopycnt);
     67 }
     68 
     69 /*
     70  *  free a list of blocks
     71  */
     72 void
     73 freeblist(Block *b)
     74 {
     75 	Block *next;
     76 
     77 	for(; b != 0; b = next){
     78 		next = b->next;
     79 		if(b->ref == 1)
     80 			b->next = nil;
     81 		freeb(b);
     82 	}
     83 }
     84 
     85 /*
     86  *  pad a block to the front (or the back if size is negative)
     87  */
     88 Block*
     89 padblock(Block *bp, int size)
     90 {
     91 	int n;
     92 	Block *nbp;
     93 
     94 	QDEBUG checkb(bp, "padblock 1");
     95 	if(size >= 0){
     96 		if(bp->rp - bp->base >= size){
     97 			bp->rp -= size;
     98 			return bp;
     99 		}
    100 
    101 		if(bp->next)
    102 			panic("padblock %#p", getcallerpc(&bp));
    103 		n = BLEN(bp);
    104 		padblockcnt++;
    105 		nbp = allocb(size+n);
    106 		nbp->rp += size;
    107 		nbp->wp = nbp->rp;
    108 		memmove(nbp->wp, bp->rp, n);
    109 		nbp->wp += n;
    110 		freeb(bp);
    111 		nbp->rp -= size;
    112 	} else {
    113 		size = -size;
    114 
    115 		if(bp->next)
    116 			panic("padblock %#p", getcallerpc(&bp));
    117 
    118 		if(bp->lim - bp->wp >= size)
    119 			return bp;
    120 
    121 		n = BLEN(bp);
    122 		padblockcnt++;
    123 		nbp = allocb(size+n);
    124 		memmove(nbp->wp, bp->rp, n);
    125 		nbp->wp += n;
    126 		freeb(bp);
    127 	}
    128 	QDEBUG checkb(nbp, "padblock 1");
    129 	return nbp;
    130 }
    131 
    132 /*
    133  *  return count of bytes in a string of blocks
    134  */
    135 int
    136 blocklen(Block *bp)
    137 {
    138 	int len;
    139 
    140 	len = 0;
    141 	while(bp) {
    142 		len += BLEN(bp);
    143 		bp = bp->next;
    144 	}
    145 	return len;
    146 }
    147 
    148 /*
    149  * return count of space in blocks
    150  */
    151 int
    152 blockalloclen(Block *bp)
    153 {
    154 	int len;
    155 
    156 	len = 0;
    157 	while(bp) {
    158 		len += BALLOC(bp);
    159 		bp = bp->next;
    160 	}
    161 	return len;
    162 }
    163 
    164 /*
    165  *  copy the  string of blocks into
    166  *  a single block and free the string
    167  */
    168 Block*
    169 concatblock(Block *bp)
    170 {
    171 	int len;
    172 	Block *nb, *f;
    173 
    174 	if(bp->next == 0)
    175 		return bp;
    176 
    177 	nb = allocb(blocklen(bp));
    178 	for(f = bp; f; f = f->next) {
    179 		len = BLEN(f);
    180 		memmove(nb->wp, f->rp, len);
    181 		nb->wp += len;
    182 	}
    183 	concatblockcnt += BLEN(nb);
    184 	freeblist(bp);
    185 	QDEBUG checkb(nb, "concatblock 1");
    186 	return nb;
    187 }
    188 
    189 /*
    190  *  make sure the first block has at least n bytes
    191  */
    192 Block*
    193 pullupblock(Block *bp, int n)
    194 {
    195 	int i;
    196 	Block *nbp;
    197 
    198 	/*
    199 	 *  this should almost always be true, it's
    200 	 *  just to avoid every caller checking.
    201 	 */
    202 	if(BLEN(bp) >= n)
    203 		return bp;
    204 
    205 	/*
    206 	 *  if not enough room in the first block,
    207 	 *  add another to the front of the list.
    208 	 */
    209 	if(bp->lim - bp->rp < n){
    210 		nbp = allocb(n);
    211 		nbp->next = bp;
    212 		bp = nbp;
    213 	}
    214 
    215 	/*
    216 	 *  copy bytes from the trailing blocks into the first
    217 	 */
    218 	n -= BLEN(bp);
    219 	while((nbp = bp->next)){
    220 		i = BLEN(nbp);
    221 		if(i > n) {
    222 			memmove(bp->wp, nbp->rp, n);
    223 			pullupblockcnt++;
    224 			bp->wp += n;
    225 			nbp->rp += n;
    226 			QDEBUG checkb(bp, "pullupblock 1");
    227 			return bp;
    228 		} else {
    229 			/* shouldn't happen but why crash if it does */
    230 			if(i < 0){
    231 				print("pullup negative length packet, called from %#p\n",
    232 					getcallerpc(&bp));
    233 				i = 0;
    234 			}
    235 			memmove(bp->wp, nbp->rp, i);
    236 			pullupblockcnt++;
    237 			bp->wp += i;
    238 			bp->next = nbp->next;
    239 			nbp->next = 0;
    240 			freeb(nbp);
    241 			n -= i;
    242 			if(n == 0){
    243 				QDEBUG checkb(bp, "pullupblock 2");
    244 				return bp;
    245 			}
    246 		}
    247 	}
    248 	freeb(bp);
    249 	return 0;
    250 }
    251 
    252 /*
    253  *  make sure the first block has at least n bytes
    254  */
    255 Block*
    256 pullupqueue(Queue *q, int n)
    257 {
    258 	Block *b;
    259 
    260 	if(BLEN(q->bfirst) >= n)
    261 		return q->bfirst;
    262 	q->bfirst = pullupblock(q->bfirst, n);
    263 	for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
    264 		;
    265 	q->blast = b;
    266 	return q->bfirst;
    267 }
    268 
    269 /*
    270  *  trim to len bytes starting at offset
    271  */
    272 Block *
    273 trimblock(Block *bp, int offset, int len)
    274 {
    275 	ulong l;
    276 	Block *nb, *startb;
    277 
    278 	QDEBUG checkb(bp, "trimblock 1");
    279 	if(blocklen(bp) < offset+len) {
    280 		freeblist(bp);
    281 		return nil;
    282 	}
    283 
    284 	while((l = BLEN(bp)) < offset) {
    285 		offset -= l;
    286 		nb = bp->next;
    287 		bp->next = nil;
    288 		freeb(bp);
    289 		bp = nb;
    290 	}
    291 
    292 	startb = bp;
    293 	bp->rp += offset;
    294 
    295 	while((l = BLEN(bp)) < len) {
    296 		len -= l;
    297 		bp = bp->next;
    298 	}
    299 
    300 	bp->wp -= (BLEN(bp) - len);
    301 
    302 	if(bp->next) {
    303 		freeblist(bp->next);
    304 		bp->next = nil;
    305 	}
    306 
    307 	return startb;
    308 }
    309 
    310 /*
    311  *  copy 'count' bytes into a new block
    312  */
    313 Block*
    314 copyblock(Block *bp, int count)
    315 {
    316 	int l;
    317 	Block *nbp;
    318 
    319 	QDEBUG checkb(bp, "copyblock 0");
    320 	nbp = allocb(count);
    321 	for(; count > 0 && bp != 0; bp = bp->next){
    322 		l = BLEN(bp);
    323 		if(l > count)
    324 			l = count;
    325 		memmove(nbp->wp, bp->rp, l);
    326 		nbp->wp += l;
    327 		count -= l;
    328 	}
    329 	if(count > 0){
    330 		memset(nbp->wp, 0, count);
    331 		nbp->wp += count;
    332 	}
    333 	copyblockcnt++;
    334 	QDEBUG checkb(nbp, "copyblock 1");
    335 
    336 	return nbp;
    337 }
    338 
    339 Block*
    340 adjustblock(Block* bp, int len)
    341 {
    342 	int n;
    343 	Block *nbp;
    344 
    345 	if(len < 0){
    346 		freeb(bp);
    347 		return nil;
    348 	}
    349 
    350 	if(bp->rp+len > bp->lim){
    351 		nbp = copyblock(bp, len);
    352 		freeblist(bp);
    353 		QDEBUG checkb(nbp, "adjustblock 1");
    354 
    355 		return nbp;
    356 	}
    357 
    358 	n = BLEN(bp);
    359 	if(len > n)
    360 		memset(bp->wp, 0, len-n);
    361 	bp->wp = bp->rp+len;
    362 	QDEBUG checkb(bp, "adjustblock 2");
    363 
    364 	return bp;
    365 }
    366 
    367 
    368 /*
    369  *  throw away up to count bytes from a
    370  *  list of blocks.  Return count of bytes
    371  *  thrown away.
    372  */
    373 int
    374 pullblock(Block **bph, int count)
    375 {
    376 	Block *bp;
    377 	int n, bytes;
    378 
    379 	bytes = 0;
    380 	if(bph == nil)
    381 		return 0;
    382 
    383 	while(*bph != nil && count != 0) {
    384 		bp = *bph;
    385 		n = BLEN(bp);
    386 		if(count < n)
    387 			n = count;
    388 		bytes += n;
    389 		count -= n;
    390 		bp->rp += n;
    391 		QDEBUG checkb(bp, "pullblock ");
    392 		if(BLEN(bp) == 0) {
    393 			*bph = bp->next;
    394 			bp->next = nil;
    395 			freeb(bp);
    396 		}
    397 	}
    398 	return bytes;
    399 }
    400 
    401 /*
    402  *  get next block from a queue, return null if nothing there
    403  */
    404 Block*
    405 qget(Queue *q)
    406 {
    407 	int dowakeup;
    408 	Block *b;
    409 
    410 	/* sync with qwrite */
    411 	ilock(&q->lk);
    412 
    413 	b = q->bfirst;
    414 	if(b == nil){
    415 		q->state |= Qstarve;
    416 		iunlock(&q->lk);
    417 		return nil;
    418 	}
    419 	q->bfirst = b->next;
    420 	b->next = 0;
    421 	q->len -= BALLOC(b);
    422 	q->dlen -= BLEN(b);
    423 	QDEBUG checkb(b, "qget");
    424 
    425 	/* if writer flow controlled, restart */
    426 	if((q->state & Qflow) && q->len < q->limit/2){
    427 		q->state &= ~Qflow;
    428 		dowakeup = 1;
    429 	} else
    430 		dowakeup = 0;
    431 
    432 	iunlock(&q->lk);
    433 
    434 	if(dowakeup)
    435 		wakeup(&q->wr);
    436 
    437 	return b;
    438 }
    439 
    440 /*
    441  *  throw away the next 'len' bytes in the queue
    442  */
    443 int
    444 qdiscard(Queue *q, int len)
    445 {
    446 	Block *b;
    447 	int dowakeup, n, sofar;
    448 
    449 	ilock(&q->lk);
    450 	for(sofar = 0; sofar < len; sofar += n){
    451 		b = q->bfirst;
    452 		if(b == nil)
    453 			break;
    454 		QDEBUG checkb(b, "qdiscard");
    455 		n = BLEN(b);
    456 		if(n <= len - sofar){
    457 			q->bfirst = b->next;
    458 			b->next = 0;
    459 			q->len -= BALLOC(b);
    460 			q->dlen -= BLEN(b);
    461 			freeb(b);
    462 		} else {
    463 			n = len - sofar;
    464 			b->rp += n;
    465 			q->dlen -= n;
    466 		}
    467 	}
    468 
    469 	/*
    470 	 *  if writer flow controlled, restart
    471 	 *
    472 	 *  This used to be
    473 	 *	q->len < q->limit/2
    474 	 *  but it slows down tcp too much for certain write sizes.
    475 	 *  I really don't understand it completely.  It may be
    476 	 *  due to the queue draining so fast that the transmission
    477 	 *  stalls waiting for the app to produce more data.  - presotto
    478 	 */
    479 	if((q->state & Qflow) && q->len < q->limit){
    480 		q->state &= ~Qflow;
    481 		dowakeup = 1;
    482 	} else
    483 		dowakeup = 0;
    484 
    485 	iunlock(&q->lk);
    486 
    487 	if(dowakeup)
    488 		wakeup(&q->wr);
    489 
    490 	return sofar;
    491 }
    492 
    493 /*
    494  *  Interrupt level copy out of a queue, return # bytes copied.
    495  */
    496 int
    497 qconsume(Queue *q, void *vp, int len)
    498 {
    499 	Block *b;
    500 	int n, dowakeup;
    501 	uchar *p = vp;
    502 	Block *tofree = nil;
    503 
    504 	/* sync with qwrite */
    505 	ilock(&q->lk);
    506 
    507 	for(;;) {
    508 		b = q->bfirst;
    509 		if(b == 0){
    510 			q->state |= Qstarve;
    511 			iunlock(&q->lk);
    512 			return -1;
    513 		}
    514 		QDEBUG checkb(b, "qconsume 1");
    515 
    516 		n = BLEN(b);
    517 		if(n > 0)
    518 			break;
    519 		q->bfirst = b->next;
    520 		q->len -= BALLOC(b);
    521 
    522 		/* remember to free this */
    523 		b->next = tofree;
    524 		tofree = b;
    525 	};
    526 
    527 	if(n < len)
    528 		len = n;
    529 	memmove(p, b->rp, len);
    530 	consumecnt += n;
    531 	b->rp += len;
    532 	q->dlen -= len;
    533 
    534 	/* discard the block if we're done with it */
    535 	if((q->state & Qmsg) || len == n){
    536 		q->bfirst = b->next;
    537 		b->next = 0;
    538 		q->len -= BALLOC(b);
    539 		q->dlen -= BLEN(b);
    540 
    541 		/* remember to free this */
    542 		b->next = tofree;
    543 		tofree = b;
    544 	}
    545 
    546 	/* if writer flow controlled, restart */
    547 	if((q->state & Qflow) && q->len < q->limit/2){
    548 		q->state &= ~Qflow;
    549 		dowakeup = 1;
    550 	} else
    551 		dowakeup = 0;
    552 
    553 	iunlock(&q->lk);
    554 
    555 	if(dowakeup)
    556 		wakeup(&q->wr);
    557 
    558 	if(tofree != nil)
    559 		freeblist(tofree);
    560 
    561 	return len;
    562 }
    563 
    564 int
    565 qpass(Queue *q, Block *b)
    566 {
    567 	int dlen, len, dowakeup;
    568 
    569 	/* sync with qread */
    570 	dowakeup = 0;
    571 	ilock(&q->lk);
    572 	if(q->len >= q->limit){
    573 		freeblist(b);
    574 		iunlock(&q->lk);
    575 		return -1;
    576 	}
    577 	if(q->state & Qclosed){
    578 		len = BALLOC(b);
    579 		freeblist(b);
    580 		iunlock(&q->lk);
    581 		return len;
    582 	}
    583 
    584 	/* add buffer to queue */
    585 	if(q->bfirst)
    586 		q->blast->next = b;
    587 	else
    588 		q->bfirst = b;
    589 	len = BALLOC(b);
    590 	dlen = BLEN(b);
    591 	QDEBUG checkb(b, "qpass");
    592 	while(b->next){
    593 		b = b->next;
    594 		QDEBUG checkb(b, "qpass");
    595 		len += BALLOC(b);
    596 		dlen += BLEN(b);
    597 	}
    598 	q->blast = b;
    599 	q->len += len;
    600 	q->dlen += dlen;
    601 
    602 	if(q->len >= q->limit/2)
    603 		q->state |= Qflow;
    604 
    605 	if(q->state & Qstarve){
    606 		q->state &= ~Qstarve;
    607 		dowakeup = 1;
    608 	}
    609 	iunlock(&q->lk);
    610 
    611 	if(dowakeup)
    612 		wakeup(&q->rr);
    613 
    614 	return len;
    615 }
    616 
    617 int
    618 qpassnolim(Queue *q, Block *b)
    619 {
    620 	int dlen, len, dowakeup;
    621 
    622 	/* sync with qread */
    623 	dowakeup = 0;
    624 	ilock(&q->lk);
    625 
    626 	if(q->state & Qclosed){
    627 		freeblist(b);
    628 		iunlock(&q->lk);
    629 		return BALLOC(b);
    630 	}
    631 
    632 	/* add buffer to queue */
    633 	if(q->bfirst)
    634 		q->blast->next = b;
    635 	else
    636 		q->bfirst = b;
    637 	len = BALLOC(b);
    638 	dlen = BLEN(b);
    639 	QDEBUG checkb(b, "qpass");
    640 	while(b->next){
    641 		b = b->next;
    642 		QDEBUG checkb(b, "qpass");
    643 		len += BALLOC(b);
    644 		dlen += BLEN(b);
    645 	}
    646 	q->blast = b;
    647 	q->len += len;
    648 	q->dlen += dlen;
    649 
    650 	if(q->len >= q->limit/2)
    651 		q->state |= Qflow;
    652 
    653 	if(q->state & Qstarve){
    654 		q->state &= ~Qstarve;
    655 		dowakeup = 1;
    656 	}
    657 	iunlock(&q->lk);
    658 
    659 	if(dowakeup)
    660 		wakeup(&q->rr);
    661 
    662 	return len;
    663 }
    664 
    665 /*
    666  *  if the allocated space is way out of line with the used
    667  *  space, reallocate to a smaller block
    668  */
    669 Block*
    670 packblock(Block *bp)
    671 {
    672 	Block **l, *nbp;
    673 	int n;
    674 
    675 	for(l = &bp; *l; l = &(*l)->next){
    676 		nbp = *l;
    677 		n = BLEN(nbp);
    678 		if((n<<2) < BALLOC(nbp)){
    679 			*l = allocb(n);
    680 			memmove((*l)->wp, nbp->rp, n);
    681 			(*l)->wp += n;
    682 			(*l)->next = nbp->next;
    683 			freeb(nbp);
    684 		}
    685 	}
    686 
    687 	return bp;
    688 }
    689 
    690 int
    691 qproduce(Queue *q, void *vp, int len)
    692 {
    693 	Block *b;
    694 	int dowakeup;
    695 	uchar *p = vp;
    696 
    697 	/* sync with qread */
    698 	dowakeup = 0;
    699 	ilock(&q->lk);
    700 
    701 	/* no waiting receivers, room in buffer? */
    702 	if(q->len >= q->limit){
    703 		q->state |= Qflow;
    704 		iunlock(&q->lk);
    705 		return -1;
    706 	}
    707 
    708 	/* save in buffer */
    709 	b = iallocb(len);
    710 	if(b == 0){
    711 		iunlock(&q->lk);
    712 		return 0;
    713 	}
    714 	memmove(b->wp, p, len);
    715 	producecnt += len;
    716 	b->wp += len;
    717 	if(q->bfirst)
    718 		q->blast->next = b;
    719 	else
    720 		q->bfirst = b;
    721 	q->blast = b;
    722 	/* b->next = 0; done by iallocb() */
    723 	q->len += BALLOC(b);
    724 	q->dlen += BLEN(b);
    725 	QDEBUG checkb(b, "qproduce");
    726 
    727 	if(q->state & Qstarve){
    728 		q->state &= ~Qstarve;
    729 		dowakeup = 1;
    730 	}
    731 
    732 	if(q->len >= q->limit)
    733 		q->state |= Qflow;
    734 	iunlock(&q->lk);
    735 
    736 	if(dowakeup)
    737 		wakeup(&q->rr);
    738 
    739 	return len;
    740 }
    741 
    742 /*
    743  *  copy from offset in the queue
    744  */
    745 Block*
    746 qcopy(Queue *q, int len, ulong offset)
    747 {
    748 	int sofar;
    749 	int n;
    750 	Block *b, *nb;
    751 	uchar *p;
    752 
    753 	nb = allocb(len);
    754 
    755 	ilock(&q->lk);
    756 
    757 	/* go to offset */
    758 	b = q->bfirst;
    759 	for(sofar = 0; ; sofar += n){
    760 		if(b == nil){
    761 			iunlock(&q->lk);
    762 			return nb;
    763 		}
    764 		n = BLEN(b);
    765 		if(sofar + n > offset){
    766 			p = b->rp + offset - sofar;
    767 			n -= offset - sofar;
    768 			break;
    769 		}
    770 		QDEBUG checkb(b, "qcopy");
    771 		b = b->next;
    772 	}
    773 
    774 	/* copy bytes from there */
    775 	for(sofar = 0; sofar < len;){
    776 		if(n > len - sofar)
    777 			n = len - sofar;
    778 		memmove(nb->wp, p, n);
    779 		qcopycnt += n;
    780 		sofar += n;
    781 		nb->wp += n;
    782 		b = b->next;
    783 		if(b == nil)
    784 			break;
    785 		n = BLEN(b);
    786 		p = b->rp;
    787 	}
    788 	iunlock(&q->lk);
    789 
    790 	return nb;
    791 }
    792 
    793 /*
    794  *  called by non-interrupt code
    795  */
    796 Queue*
    797 qopen(int limit, int msg, void (*kick)(void*), void *arg)
    798 {
    799 	Queue *q;
    800 
    801 	q = malloc(sizeof(Queue));
    802 	if(q == 0)
    803 		return 0;
    804 
    805 	q->limit = q->inilim = limit;
    806 	q->kick = kick;
    807 	q->arg = arg;
    808 	q->state = msg;
    809 	
    810 	q->state |= Qstarve;
    811 	q->eof = 0;
    812 	q->noblock = 0;
    813 
    814 	return q;
    815 }
    816 
    817 /* open a queue to be bypassed */
    818 Queue*
    819 qbypass(void (*bypass)(void*, Block*), void *arg)
    820 {
    821 	Queue *q;
    822 
    823 	q = malloc(sizeof(Queue));
    824 	if(q == 0)
    825 		return 0;
    826 
    827 	q->limit = 0;
    828 	q->arg = arg;
    829 	q->bypass = bypass;
    830 	q->state = 0;
    831 
    832 	return q;
    833 }
    834 
    835 static int
    836 notempty(void *a)
    837 {
    838 	Queue *q = a;
    839 
    840 	return (q->state & Qclosed) || q->bfirst != 0;
    841 }
    842 
    843 /*
    844  *  wait for the queue to be non-empty or closed.
    845  *  called with q ilocked.
    846  */
    847 static int
    848 qwait(Queue *q)
    849 {
    850 	/* wait for data */
    851 	for(;;){
    852 		if(q->bfirst != nil)
    853 			break;
    854 
    855 		if(q->state & Qclosed){
    856 			if(++q->eof > 3)
    857 				return -1;
    858 			if(*q->err && strcmp(q->err, Ehungup) != 0)
    859 				return -1;
    860 			return 0;
    861 		}
    862 
    863 		q->state |= Qstarve;	/* flag requesting producer to wake me */
    864 		iunlock(&q->lk);
    865 		sleep(&q->rr, notempty, q);
    866 		ilock(&q->lk);
    867 	}
    868 	return 1;
    869 }
    870 
    871 /*
    872  * add a block list to a queue
    873  */
    874 void
    875 qaddlist(Queue *q, Block *b)
    876 {
    877 	/* queue the block */
    878 	if(q->bfirst)
    879 		q->blast->next = b;
    880 	else
    881 		q->bfirst = b;
    882 	q->len += blockalloclen(b);
    883 	q->dlen += blocklen(b);
    884 	while(b->next)
    885 		b = b->next;
    886 	q->blast = b;
    887 }
    888 
    889 /*
    890  *  called with q ilocked
    891  */
    892 Block*
    893 qremove(Queue *q)
    894 {
    895 	Block *b;
    896 
    897 	b = q->bfirst;
    898 	if(b == nil)
    899 		return nil;
    900 	q->bfirst = b->next;
    901 	b->next = nil;
    902 	q->dlen -= BLEN(b);
    903 	q->len -= BALLOC(b);
    904 	QDEBUG checkb(b, "qremove");
    905 	return b;
    906 }
    907 
    908 /*
    909  *  copy the contents of a string of blocks into
    910  *  memory.  emptied blocks are freed.  return
    911  *  pointer to first unconsumed block.
    912  */
    913 Block*
    914 bl2mem(uchar *p, Block *b, int n)
    915 {
    916 	int i;
    917 	Block *next;
    918 
    919 	for(; b != nil; b = next){
    920 		i = BLEN(b);
    921 		if(i > n){
    922 			memmove(p, b->rp, n);
    923 			b->rp += n;
    924 			return b;
    925 		}
    926 		memmove(p, b->rp, i);
    927 		n -= i;
    928 		p += i;
    929 		b->rp += i;
    930 		next = b->next;
    931 		freeb(b);
    932 	}
    933 	return nil;
    934 }
    935 
    936 /*
    937  *  copy the contents of memory into a string of blocks.
    938  *  return nil on error.
    939  */
    940 Block*
    941 mem2bl(uchar *p, int len)
    942 {
    943 	int n;
    944 	Block *b, *first, **l;
    945 
    946 	first = nil;
    947 	l = &first;
    948 	if(waserror()){
    949 		freeblist(first);
    950 		nexterror();
    951 	}
    952 	do {
    953 		n = len;
    954 		if(n > Maxatomic)
    955 			n = Maxatomic;
    956 
    957 		*l = b = allocb(n);
    958 		memmove(b->wp, p, n);
    959 		b->wp += n;
    960 		p += n;
    961 		len -= n;
    962 		l = &b->next;
    963 	} while(len > 0);
    964 	poperror();
    965 
    966 	return first;
    967 }
    968 
    969 /*
    970  *  put a block back to the front of the queue
    971  *  called with q ilocked
    972  */
    973 void
    974 qputback(Queue *q, Block *b)
    975 {
    976 	b->next = q->bfirst;
    977 	if(q->bfirst == nil)
    978 		q->blast = b;
    979 	q->bfirst = b;
    980 	q->len += BALLOC(b);
    981 	q->dlen += BLEN(b);
    982 }
    983 
    984 /*
    985  *  flow control, get producer going again
    986  *  called with q ilocked
    987  */
    988 static void
    989 qwakeup_iunlock(Queue *q)
    990 {
    991 	int dowakeup = 0;
    992 
    993 	/* if writer flow controlled, restart */
    994 	if((q->state & Qflow) && q->len < q->limit/2){
    995 		q->state &= ~Qflow;
    996 		dowakeup = 1;
    997 	}
    998 
    999 	iunlock(&q->lk);
   1000 
   1001 	/* wakeup flow controlled writers */
   1002 	if(dowakeup){
   1003 		if(q->kick)
   1004 			q->kick(q->arg);
   1005 		wakeup(&q->wr);
   1006 	}
   1007 }
   1008 
   1009 /*
   1010  *  get next block from a queue (up to a limit)
   1011  */
   1012 Block*
   1013 qbread(Queue *q, int len)
   1014 {
   1015 	Block *b, *nb;
   1016 	int n;
   1017 
   1018 	qlock(&q->rlock);
   1019 	if(waserror()){
   1020 		qunlock(&q->rlock);
   1021 		nexterror();
   1022 	}
   1023 
   1024 	ilock(&q->lk);
   1025 	switch(qwait(q)){
   1026 	case 0:
   1027 		/* queue closed */
   1028 		iunlock(&q->lk);
   1029 		qunlock(&q->rlock);
   1030 		poperror();
   1031 		return nil;
   1032 	case -1:
   1033 		/* multiple reads on a closed queue */
   1034 		iunlock(&q->lk);
   1035 		error(q->err);
   1036 	}
   1037 
   1038 	/* if we get here, there's at least one block in the queue */
   1039 	b = qremove(q);
   1040 	n = BLEN(b);
   1041 
   1042 	/* split block if it's too big and this is not a message queue */
   1043 	nb = b;
   1044 	if(n > len){
   1045 		if((q->state&Qmsg) == 0){
   1046 			n -= len;
   1047 			b = allocb(n);
   1048 			memmove(b->wp, nb->rp+len, n);
   1049 			b->wp += n;
   1050 			qputback(q, b);
   1051 		}
   1052 		nb->wp = nb->rp + len;
   1053 	}
   1054 
   1055 	/* restart producer */
   1056 	qwakeup_iunlock(q);
   1057 
   1058 	poperror();
   1059 	qunlock(&q->rlock);
   1060 	return nb;
   1061 }
   1062 
   1063 /*
   1064  *  read a queue.  if no data is queued, post a Block
   1065  *  and wait on its Rendez.
   1066  */
   1067 long
   1068 qread(Queue *q, void *vp, int len)
   1069 {
   1070 	Block *b, *first, **l;
   1071 	int m, n;
   1072 
   1073 	qlock(&q->rlock);
   1074 	if(waserror()){
   1075 		qunlock(&q->rlock);
   1076 		nexterror();
   1077 	}
   1078 
   1079 	ilock(&q->lk);
   1080 again:
   1081 	switch(qwait(q)){
   1082 	case 0:
   1083 		/* queue closed */
   1084 		iunlock(&q->lk);
   1085 		qunlock(&q->rlock);
   1086 		poperror();
   1087 		return 0;
   1088 	case -1:
   1089 		/* multiple reads on a closed queue */
   1090 		iunlock(&q->lk);
   1091 		error(q->err);
   1092 	}
   1093 
   1094 	/* if we get here, there's at least one block in the queue */
   1095 	if(q->state & Qcoalesce){
   1096 		/* when coalescing, 0 length blocks just go away */
   1097 		b = q->bfirst;
   1098 		if(BLEN(b) <= 0){
   1099 			freeb(qremove(q));
   1100 			goto again;
   1101 		}
   1102 
   1103 		/*  grab the first block plus as many
   1104 		 *  following blocks as will completely
   1105 		 *  fit in the read.
   1106 		 */
   1107 		n = 0;
   1108 		l = &first;
   1109 		m = BLEN(b);
   1110 		for(;;) {
   1111 			*l = qremove(q);
   1112 			l = &b->next;
   1113 			n += m;
   1114 
   1115 			b = q->bfirst;
   1116 			if(b == nil)
   1117 				break;
   1118 			m = BLEN(b);
   1119 			if(n+m > len)
   1120 				break;
   1121 		}
   1122 	} else {
   1123 		first = qremove(q);
   1124 		n = BLEN(first);
   1125 	}
   1126 
   1127 	/* copy to user space outside of the ilock */
   1128 	iunlock(&q->lk);
   1129 	b = bl2mem(vp, first, len);
   1130 	ilock(&q->lk);
   1131 
   1132 	/* take care of any left over partial block */
   1133 	if(b != nil){
   1134 		n -= BLEN(b);
   1135 		if(q->state & Qmsg)
   1136 			freeb(b);
   1137 		else
   1138 			qputback(q, b);
   1139 	}
   1140 
   1141 	/* restart producer */
   1142 	qwakeup_iunlock(q);
   1143 
   1144 	poperror();
   1145 	qunlock(&q->rlock);
   1146 	return n;
   1147 }
   1148 
   1149 static int
   1150 qnotfull(void *a)
   1151 {
   1152 	Queue *q = a;
   1153 
   1154 	return q->len < q->limit || (q->state & Qclosed);
   1155 }
   1156 
   1157 ulong noblockcnt;
   1158 
   1159 /*
   1160  *  add a block to a queue obeying flow control
   1161  */
   1162 long
   1163 qbwrite(Queue *q, Block *b)
   1164 {
   1165 	int n, dowakeup;
   1166 	Proc *p;
   1167 
   1168 	n = BLEN(b);
   1169 
   1170 	if(q->bypass){
   1171 		(*q->bypass)(q->arg, b);
   1172 		return n;
   1173 	}
   1174 
   1175 	dowakeup = 0;
   1176 	qlock(&q->wlock);
   1177 	if(waserror()){
   1178 		if(b != nil)
   1179 			freeb(b);
   1180 		qunlock(&q->wlock);
   1181 		nexterror();
   1182 	}
   1183 
   1184 	ilock(&q->lk);
   1185 
   1186 	/* give up if the queue is closed */
   1187 	if(q->state & Qclosed){
   1188 		iunlock(&q->lk);
   1189 		error(q->err);
   1190 	}
   1191 
   1192 	/* if nonblocking, don't queue over the limit */
   1193 	if(q->len >= q->limit){
   1194 		if(q->noblock){
   1195 			iunlock(&q->lk);
   1196 			freeb(b);
   1197 			noblockcnt += n;
   1198 			qunlock(&q->wlock);
   1199 			poperror();
   1200 			return n;
   1201 		}
   1202 	}
   1203 
   1204 	/* queue the block */
   1205 	if(q->bfirst)
   1206 		q->blast->next = b;
   1207 	else
   1208 		q->bfirst = b;
   1209 	q->blast = b;
   1210 	b->next = 0;
   1211 	q->len += BALLOC(b);
   1212 	q->dlen += n;
   1213 	QDEBUG checkb(b, "qbwrite");
   1214 	b = nil;
   1215 
   1216 	/* make sure other end gets awakened */
   1217 	if(q->state & Qstarve){
   1218 		q->state &= ~Qstarve;
   1219 		dowakeup = 1;
   1220 	}
   1221 	iunlock(&q->lk);
   1222 
   1223 	/*  get output going again */
   1224 	if(q->kick && (dowakeup || (q->state&Qkick)))
   1225 		q->kick(q->arg);
   1226 
   1227 	/* wakeup anyone consuming at the other end */
   1228 	if(dowakeup){
   1229 		p = wakeup(&q->rr);
   1230 
   1231 		/* if we just wokeup a higher priority process, let it run */
   1232 		if(p != nil && p->priority > up->priority)
   1233 			sched();
   1234 	}
   1235 
   1236 	/*
   1237 	 *  flow control, wait for queue to get below the limit
   1238 	 *  before allowing the process to continue and queue
   1239 	 *  more.  We do this here so that postnote can only
   1240 	 *  interrupt us after the data has been queued.  This
   1241 	 *  means that things like 9p flushes and ssl messages
   1242 	 *  will not be disrupted by software interrupts.
   1243 	 *
   1244 	 *  Note - this is moderately dangerous since a process
   1245 	 *  that keeps getting interrupted and rewriting will
   1246 	 *  queue infinite crud.
   1247 	 */
   1248 	for(;;){
   1249 		if(q->noblock || qnotfull(q))
   1250 			break;
   1251 
   1252 		ilock(&q->lk);
   1253 		q->state |= Qflow;
   1254 		iunlock(&q->lk);
   1255 		sleep(&q->wr, qnotfull, q);
   1256 	}
   1257 	USED(b);
   1258 
   1259 	qunlock(&q->wlock);
   1260 	poperror();
   1261 	return n;
   1262 }
   1263 
   1264 /*
   1265  *  write to a queue.  only Maxatomic bytes at a time is atomic.
   1266  */
   1267 int
   1268 qwrite(Queue *q, void *vp, int len)
   1269 {
   1270 	int n, sofar;
   1271 	Block *b;
   1272 	uchar *p = vp;
   1273 
   1274 	QDEBUG if(!islo())
   1275 		print("qwrite hi %#p\n", getcallerpc(&q));
   1276 
   1277 	sofar = 0;
   1278 	do {
   1279 		n = len-sofar;
   1280 		if(n > Maxatomic)
   1281 			n = Maxatomic;
   1282 
   1283 		b = allocb(n);
   1284 		if(waserror()){
   1285 			freeb(b);
   1286 			nexterror();
   1287 		}
   1288 		memmove(b->wp, p+sofar, n);
   1289 		poperror();
   1290 		b->wp += n;
   1291 
   1292 		qbwrite(q, b);
   1293 
   1294 		sofar += n;
   1295 	} while(sofar < len && (q->state & Qmsg) == 0);
   1296 
   1297 	return len;
   1298 }
   1299 
   1300 /*
   1301  *  used by print() to write to a queue.  Since we may be splhi or not in
   1302  *  a process, don't qlock.
   1303  *
   1304  *  this routine merges adjacent blocks if block n+1 will fit into
   1305  *  the free space of block n.
   1306  */
   1307 int
   1308 qiwrite(Queue *q, void *vp, int len)
   1309 {
   1310 	int n, sofar, dowakeup;
   1311 	Block *b;
   1312 	uchar *p = vp;
   1313 
   1314 	dowakeup = 0;
   1315 
   1316 	sofar = 0;
   1317 	do {
   1318 		n = len-sofar;
   1319 		if(n > Maxatomic)
   1320 			n = Maxatomic;
   1321 
   1322 		b = iallocb(n);
   1323 		if(b == nil)
   1324 			break;
   1325 		memmove(b->wp, p+sofar, n);
   1326 		b->wp += n;
   1327 
   1328 		ilock(&q->lk);
   1329 
   1330 		/* we use an artificially high limit for kernel prints since anything
   1331 		 * over the limit gets dropped
   1332 		 */
   1333 		if(q->dlen >= 16*1024){
   1334 			iunlock(&q->lk);
   1335 			freeb(b);
   1336 			break;
   1337 		}
   1338 
   1339 		QDEBUG checkb(b, "qiwrite");
   1340 		if(q->bfirst)
   1341 			q->blast->next = b;
   1342 		else
   1343 			q->bfirst = b;
   1344 		q->blast = b;
   1345 		q->len += BALLOC(b);
   1346 		q->dlen += n;
   1347 
   1348 		if(q->state & Qstarve){
   1349 			q->state &= ~Qstarve;
   1350 			dowakeup = 1;
   1351 		}
   1352 
   1353 		iunlock(&q->lk);
   1354 
   1355 		if(dowakeup){
   1356 			if(q->kick)
   1357 				q->kick(q->arg);
   1358 			wakeup(&q->rr);
   1359 		}
   1360 
   1361 		sofar += n;
   1362 	} while(sofar < len && (q->state & Qmsg) == 0);
   1363 
   1364 	return sofar;
   1365 }
   1366 
   1367 /*
   1368  *  be extremely careful when calling this,
   1369  *  as there is no reference accounting
   1370  */
   1371 void
   1372 qfree(Queue *q)
   1373 {
   1374 	qclose(q);
   1375 	free(q);
   1376 }
   1377 
   1378 /*
   1379  *  Mark a queue as closed.  No further IO is permitted.
   1380  *  All blocks are released.
   1381  */
   1382 void
   1383 qclose(Queue *q)
   1384 {
   1385 	Block *bfirst;
   1386 
   1387 	if(q == nil)
   1388 		return;
   1389 
   1390 	/* mark it */
   1391 	ilock(&q->lk);
   1392 	q->state |= Qclosed;
   1393 	q->state &= ~(Qflow|Qstarve);
   1394 	strcpy(q->err, Ehungup);
   1395 	bfirst = q->bfirst;
   1396 	q->bfirst = 0;
   1397 	q->len = 0;
   1398 	q->dlen = 0;
   1399 	q->noblock = 0;
   1400 	iunlock(&q->lk);
   1401 
   1402 	/* free queued blocks */
   1403 	freeblist(bfirst);
   1404 
   1405 	/* wake up readers/writers */
   1406 	wakeup(&q->rr);
   1407 	wakeup(&q->wr);
   1408 }
   1409 
   1410 /*
   1411  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
   1412  *  blocks.
   1413  */
   1414 void
   1415 qhangup(Queue *q, char *msg)
   1416 {
   1417 	/* mark it */
   1418 	ilock(&q->lk);
   1419 	q->state |= Qclosed;
   1420 	if(msg == 0 || *msg == 0)
   1421 		strcpy(q->err, Ehungup);
   1422 	else
   1423 		strncpy(q->err, msg, ERRMAX-1);
   1424 	iunlock(&q->lk);
   1425 
   1426 	/* wake up readers/writers */
   1427 	wakeup(&q->rr);
   1428 	wakeup(&q->wr);
   1429 }
   1430 
   1431 /*
   1432  *  return non-zero if the q is hungup
   1433  */
   1434 int
   1435 qisclosed(Queue *q)
   1436 {
   1437 	return q->state & Qclosed;
   1438 }
   1439 
   1440 /*
   1441  *  mark a queue as no longer hung up
   1442  */
   1443 void
   1444 qreopen(Queue *q)
   1445 {
   1446 	ilock(&q->lk);
   1447 	q->state &= ~Qclosed;
   1448 	q->state |= Qstarve;
   1449 	q->eof = 0;
   1450 	q->limit = q->inilim;
   1451 	iunlock(&q->lk);
   1452 }
   1453 
   1454 /*
   1455  *  return bytes queued
   1456  */
   1457 int
   1458 qlen(Queue *q)
   1459 {
   1460 	return q->dlen;
   1461 }
   1462 
   1463 /*
   1464  * return space remaining before flow control
   1465  */
   1466 int
   1467 qwindow(Queue *q)
   1468 {
   1469 	int l;
   1470 
   1471 	l = q->limit - q->len;
   1472 	if(l < 0)
   1473 		l = 0;
   1474 	return l;
   1475 }
   1476 
   1477 /*
   1478  *  return true if we can read without blocking
   1479  */
   1480 int
   1481 qcanread(Queue *q)
   1482 {
   1483 	return q->bfirst!=0;
   1484 }
   1485 
   1486 /*
   1487  *  change queue limit
   1488  */
   1489 void
   1490 qsetlimit(Queue *q, int limit)
   1491 {
   1492 	q->limit = limit;
   1493 }
   1494 
   1495 /*
   1496  *  set blocking/nonblocking
   1497  */
   1498 void
   1499 qnoblock(Queue *q, int onoff)
   1500 {
   1501 	q->noblock = onoff;
   1502 }
   1503 
   1504 /*
   1505  *  flush the output queue
   1506  */
   1507 void
   1508 qflush(Queue *q)
   1509 {
   1510 	Block *bfirst;
   1511 
   1512 	/* mark it */
   1513 	ilock(&q->lk);
   1514 	bfirst = q->bfirst;
   1515 	q->bfirst = 0;
   1516 	q->len = 0;
   1517 	q->dlen = 0;
   1518 	iunlock(&q->lk);
   1519 
   1520 	/* free queued blocks */
   1521 	freeblist(bfirst);
   1522 
   1523 	/* wake up readers/writers */
   1524 	wakeup(&q->wr);
   1525 }
   1526 
   1527 int
   1528 qfull(Queue *q)
   1529 {
   1530 	return q->state & Qflow;
   1531 }
   1532 
   1533 int
   1534 qstate(Queue *q)
   1535 {
   1536 	return q->state;
   1537 }