xref: /illumos-gate/usr/src/uts/common/fs/zfs/bqueue.c (revision a2cdcdd2)
1*a2cdcdd2SPaul Dagnelie /*
2*a2cdcdd2SPaul Dagnelie  * CDDL HEADER START
3*a2cdcdd2SPaul Dagnelie  *
4*a2cdcdd2SPaul Dagnelie  * This file and its contents are supplied under the terms of the
5*a2cdcdd2SPaul Dagnelie  * Common Development and Distribution License ("CDDL"), version 1.0.
6*a2cdcdd2SPaul Dagnelie  * You may only use this file in accordance with the terms of version
7*a2cdcdd2SPaul Dagnelie  * 1.0 of the CDDL.
8*a2cdcdd2SPaul Dagnelie  *
9*a2cdcdd2SPaul Dagnelie  * A full copy of the text of the CDDL should have accompanied this
10*a2cdcdd2SPaul Dagnelie  * source.  A copy of the CDDL is also available via the Internet at
11*a2cdcdd2SPaul Dagnelie  * http://www.illumos.org/license/CDDL.
12*a2cdcdd2SPaul Dagnelie  *
13*a2cdcdd2SPaul Dagnelie  * CDDL HEADER END
14*a2cdcdd2SPaul Dagnelie  */
15*a2cdcdd2SPaul Dagnelie /*
16*a2cdcdd2SPaul Dagnelie  * Copyright (c) 2014 by Delphix. All rights reserved.
17*a2cdcdd2SPaul Dagnelie  */
18*a2cdcdd2SPaul Dagnelie 
19*a2cdcdd2SPaul Dagnelie #include	<sys/bqueue.h>
20*a2cdcdd2SPaul Dagnelie #include	<sys/zfs_context.h>
21*a2cdcdd2SPaul Dagnelie 
22*a2cdcdd2SPaul Dagnelie static inline bqueue_node_t *
obj2node(bqueue_t * q,void * data)23*a2cdcdd2SPaul Dagnelie obj2node(bqueue_t *q, void *data)
24*a2cdcdd2SPaul Dagnelie {
25*a2cdcdd2SPaul Dagnelie 	return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
26*a2cdcdd2SPaul Dagnelie }
27*a2cdcdd2SPaul Dagnelie 
28*a2cdcdd2SPaul Dagnelie /*
29*a2cdcdd2SPaul Dagnelie  * Initialize a blocking queue  The maximum capacity of the queue is set to
30*a2cdcdd2SPaul Dagnelie  * size.  Types that want to be stored in a bqueue must contain a bqueue_node_t,
31*a2cdcdd2SPaul Dagnelie  * and offset should give its offset from the start of the struct.  Return 0 on
32*a2cdcdd2SPaul Dagnelie  * success, or -1 on failure.
33*a2cdcdd2SPaul Dagnelie  */
34*a2cdcdd2SPaul Dagnelie int
bqueue_init(bqueue_t * q,uint64_t size,size_t node_offset)35*a2cdcdd2SPaul Dagnelie bqueue_init(bqueue_t *q, uint64_t size, size_t node_offset)
36*a2cdcdd2SPaul Dagnelie {
37*a2cdcdd2SPaul Dagnelie 	list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
38*a2cdcdd2SPaul Dagnelie 	    node_offset + offsetof(bqueue_node_t, bqn_node));
39*a2cdcdd2SPaul Dagnelie 	cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
40*a2cdcdd2SPaul Dagnelie 	cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
41*a2cdcdd2SPaul Dagnelie 	mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
42*a2cdcdd2SPaul Dagnelie 	q->bq_node_offset = node_offset;
43*a2cdcdd2SPaul Dagnelie 	q->bq_size = 0;
44*a2cdcdd2SPaul Dagnelie 	q->bq_maxsize = size;
45*a2cdcdd2SPaul Dagnelie 	return (0);
46*a2cdcdd2SPaul Dagnelie }
47*a2cdcdd2SPaul Dagnelie 
48*a2cdcdd2SPaul Dagnelie /*
49*a2cdcdd2SPaul Dagnelie  * Destroy a blocking queue.  This function asserts that there are no
50*a2cdcdd2SPaul Dagnelie  * elements in the queue, and no one is blocked on the condition
51*a2cdcdd2SPaul Dagnelie  * variables.
52*a2cdcdd2SPaul Dagnelie  */
53*a2cdcdd2SPaul Dagnelie void
bqueue_destroy(bqueue_t * q)54*a2cdcdd2SPaul Dagnelie bqueue_destroy(bqueue_t *q)
55*a2cdcdd2SPaul Dagnelie {
56*a2cdcdd2SPaul Dagnelie 	ASSERT0(q->bq_size);
57*a2cdcdd2SPaul Dagnelie 	cv_destroy(&q->bq_add_cv);
58*a2cdcdd2SPaul Dagnelie 	cv_destroy(&q->bq_pop_cv);
59*a2cdcdd2SPaul Dagnelie 	mutex_destroy(&q->bq_lock);
60*a2cdcdd2SPaul Dagnelie 	list_destroy(&q->bq_list);
61*a2cdcdd2SPaul Dagnelie }
62*a2cdcdd2SPaul Dagnelie 
63*a2cdcdd2SPaul Dagnelie /*
64*a2cdcdd2SPaul Dagnelie  * Add data to q, consuming size units of capacity.  If there is insufficient
65*a2cdcdd2SPaul Dagnelie  * capacity to consume size units, block until capacity exists.  Asserts size is
66*a2cdcdd2SPaul Dagnelie  * > 0.
67*a2cdcdd2SPaul Dagnelie  */
68*a2cdcdd2SPaul Dagnelie void
bqueue_enqueue(bqueue_t * q,void * data,uint64_t item_size)69*a2cdcdd2SPaul Dagnelie bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size)
70*a2cdcdd2SPaul Dagnelie {
71*a2cdcdd2SPaul Dagnelie 	ASSERT3U(item_size, >, 0);
72*a2cdcdd2SPaul Dagnelie 	ASSERT3U(item_size, <, q->bq_maxsize);
73*a2cdcdd2SPaul Dagnelie 	mutex_enter(&q->bq_lock);
74*a2cdcdd2SPaul Dagnelie 	obj2node(q, data)->bqn_size = item_size;
75*a2cdcdd2SPaul Dagnelie 	while (q->bq_size + item_size > q->bq_maxsize) {
76*a2cdcdd2SPaul Dagnelie 		cv_wait(&q->bq_add_cv, &q->bq_lock);
77*a2cdcdd2SPaul Dagnelie 	}
78*a2cdcdd2SPaul Dagnelie 	q->bq_size += item_size;
79*a2cdcdd2SPaul Dagnelie 	list_insert_tail(&q->bq_list, data);
80*a2cdcdd2SPaul Dagnelie 	cv_signal(&q->bq_pop_cv);
81*a2cdcdd2SPaul Dagnelie 	mutex_exit(&q->bq_lock);
82*a2cdcdd2SPaul Dagnelie }
83*a2cdcdd2SPaul Dagnelie /*
84*a2cdcdd2SPaul Dagnelie  * Take the first element off of q.  If there are no elements on the queue, wait
85*a2cdcdd2SPaul Dagnelie  * until one is put there.  Return the removed element.
86*a2cdcdd2SPaul Dagnelie  */
87*a2cdcdd2SPaul Dagnelie void *
bqueue_dequeue(bqueue_t * q)88*a2cdcdd2SPaul Dagnelie bqueue_dequeue(bqueue_t *q)
89*a2cdcdd2SPaul Dagnelie {
90*a2cdcdd2SPaul Dagnelie 	void *ret;
91*a2cdcdd2SPaul Dagnelie 	uint64_t item_size;
92*a2cdcdd2SPaul Dagnelie 	mutex_enter(&q->bq_lock);
93*a2cdcdd2SPaul Dagnelie 	while (q->bq_size == 0) {
94*a2cdcdd2SPaul Dagnelie 		cv_wait(&q->bq_pop_cv, &q->bq_lock);
95*a2cdcdd2SPaul Dagnelie 	}
96*a2cdcdd2SPaul Dagnelie 	ret = list_remove_head(&q->bq_list);
97*a2cdcdd2SPaul Dagnelie 	item_size = obj2node(q, ret)->bqn_size;
98*a2cdcdd2SPaul Dagnelie 	q->bq_size -= item_size;
99*a2cdcdd2SPaul Dagnelie 	mutex_exit(&q->bq_lock);
100*a2cdcdd2SPaul Dagnelie 	cv_signal(&q->bq_add_cv);
101*a2cdcdd2SPaul Dagnelie 	return (ret);
102*a2cdcdd2SPaul Dagnelie }
103*a2cdcdd2SPaul Dagnelie 
104*a2cdcdd2SPaul Dagnelie /*
105*a2cdcdd2SPaul Dagnelie  * Returns true if the space used is 0.
106*a2cdcdd2SPaul Dagnelie  */
107*a2cdcdd2SPaul Dagnelie boolean_t
bqueue_empty(bqueue_t * q)108*a2cdcdd2SPaul Dagnelie bqueue_empty(bqueue_t *q)
109*a2cdcdd2SPaul Dagnelie {
110*a2cdcdd2SPaul Dagnelie 	return (q->bq_size == 0);
111*a2cdcdd2SPaul Dagnelie }
112