xref: /illumos-gate/usr/src/uts/common/fs/zfs/bqueue.c (revision a2cdcdd2)
1 /*
2  * CDDL HEADER START
3  *
4  * This file and its contents are supplied under the terms of the
5  * Common Development and Distribution License ("CDDL"), version 1.0.
6  * You may only use this file in accordance with the terms of version
7  * 1.0 of the CDDL.
8  *
9  * A full copy of the text of the CDDL should have accompanied this
10  * source.  A copy of the CDDL is also available via the Internet at
11  * http://www.illumos.org/license/CDDL.
12  *
13  * CDDL HEADER END
14  */
15 /*
16  * Copyright (c) 2014 by Delphix. All rights reserved.
17  */
18 
19 #include	<sys/bqueue.h>
20 #include	<sys/zfs_context.h>
21 
22 static inline bqueue_node_t *
obj2node(bqueue_t * q,void * data)23 obj2node(bqueue_t *q, void *data)
24 {
25 	return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
26 }
27 
28 /*
29  * Initialize a blocking queue  The maximum capacity of the queue is set to
30  * size.  Types that want to be stored in a bqueue must contain a bqueue_node_t,
31  * and offset should give its offset from the start of the struct.  Return 0 on
32  * success, or -1 on failure.
33  */
34 int
bqueue_init(bqueue_t * q,uint64_t size,size_t node_offset)35 bqueue_init(bqueue_t *q, uint64_t size, size_t node_offset)
36 {
37 	list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
38 	    node_offset + offsetof(bqueue_node_t, bqn_node));
39 	cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
40 	cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
41 	mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
42 	q->bq_node_offset = node_offset;
43 	q->bq_size = 0;
44 	q->bq_maxsize = size;
45 	return (0);
46 }
47 
48 /*
49  * Destroy a blocking queue.  This function asserts that there are no
50  * elements in the queue, and no one is blocked on the condition
51  * variables.
52  */
53 void
bqueue_destroy(bqueue_t * q)54 bqueue_destroy(bqueue_t *q)
55 {
56 	ASSERT0(q->bq_size);
57 	cv_destroy(&q->bq_add_cv);
58 	cv_destroy(&q->bq_pop_cv);
59 	mutex_destroy(&q->bq_lock);
60 	list_destroy(&q->bq_list);
61 }
62 
63 /*
64  * Add data to q, consuming size units of capacity.  If there is insufficient
65  * capacity to consume size units, block until capacity exists.  Asserts size is
66  * > 0.
67  */
68 void
bqueue_enqueue(bqueue_t * q,void * data,uint64_t item_size)69 bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size)
70 {
71 	ASSERT3U(item_size, >, 0);
72 	ASSERT3U(item_size, <, q->bq_maxsize);
73 	mutex_enter(&q->bq_lock);
74 	obj2node(q, data)->bqn_size = item_size;
75 	while (q->bq_size + item_size > q->bq_maxsize) {
76 		cv_wait(&q->bq_add_cv, &q->bq_lock);
77 	}
78 	q->bq_size += item_size;
79 	list_insert_tail(&q->bq_list, data);
80 	cv_signal(&q->bq_pop_cv);
81 	mutex_exit(&q->bq_lock);
82 }
83 /*
84  * Take the first element off of q.  If there are no elements on the queue, wait
85  * until one is put there.  Return the removed element.
86  */
87 void *
bqueue_dequeue(bqueue_t * q)88 bqueue_dequeue(bqueue_t *q)
89 {
90 	void *ret;
91 	uint64_t item_size;
92 	mutex_enter(&q->bq_lock);
93 	while (q->bq_size == 0) {
94 		cv_wait(&q->bq_pop_cv, &q->bq_lock);
95 	}
96 	ret = list_remove_head(&q->bq_list);
97 	item_size = obj2node(q, ret)->bqn_size;
98 	q->bq_size -= item_size;
99 	mutex_exit(&q->bq_lock);
100 	cv_signal(&q->bq_add_cv);
101 	return (ret);
102 }
103 
104 /*
105  * Returns true if the space used is 0.
106  */
107 boolean_t
bqueue_empty(bqueue_t * q)108 bqueue_empty(bqueue_t *q)
109 {
110 	return (q->bq_size == 0);
111 }
112