1 /*
2 * Copyright (c) 2004-2016 Maxim Sobolev <sobomax@FreeBSD.org>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 */
26
27 #include <sys/cdefs.h>
28 #include <err.h>
29 #include <pthread.h>
30 #include <stdint.h>
31 #include <stdlib.h>
32
33 #if defined(MKUZ_DEBUG)
34 # include <assert.h>
35 #endif
36
37 #include "mkuzip.h"
38 #include "mkuz_fqueue.h"
39 #include "mkuz_conveyor.h"
40 #include "mkuz_blk.h"
41 #include "mkuz_blk_chain.h"
42
43 struct mkuz_fifo_queue *
mkuz_fqueue_ctor(int wakeup_len)44 mkuz_fqueue_ctor(int wakeup_len)
45 {
46 struct mkuz_fifo_queue *fqp;
47
48 fqp = mkuz_safe_zmalloc(sizeof(struct mkuz_fifo_queue));
49 fqp->wakeup_len = wakeup_len;
50 if (pthread_mutex_init(&fqp->mtx, NULL) != 0) {
51 errx(1, "pthread_mutex_init() failed");
52 }
53 if (pthread_cond_init(&fqp->cvar, NULL) != 0) {
54 errx(1, "pthread_cond_init() failed");
55 }
56 return (fqp);
57 }
58
59 void
mkuz_fqueue_enq(struct mkuz_fifo_queue * fqp,struct mkuz_blk * bp)60 mkuz_fqueue_enq(struct mkuz_fifo_queue *fqp, struct mkuz_blk *bp)
61 {
62 struct mkuz_bchain_link *ip;
63
64 ip = mkuz_safe_zmalloc(sizeof(struct mkuz_bchain_link));
65 ip->this = bp;
66
67 pthread_mutex_lock(&fqp->mtx);
68 if (fqp->first != NULL) {
69 fqp->first->prev = ip;
70 } else {
71 fqp->last = ip;
72 }
73 fqp->first = ip;
74 fqp->length += 1;
75 if (fqp->length >= fqp->wakeup_len) {
76 pthread_cond_signal(&fqp->cvar);
77 }
78 pthread_mutex_unlock(&fqp->mtx);
79 }
80
81 #if defined(NOTYET)
82 int
mkuz_fqueue_enq_all(struct mkuz_fifo_queue * fqp,struct mkuz_bchain_link * cip_f,struct mkuz_bchain_link * cip_l,int clen)83 mkuz_fqueue_enq_all(struct mkuz_fifo_queue *fqp, struct mkuz_bchain_link *cip_f,
84 struct mkuz_bchain_link *cip_l, int clen)
85 {
86 int rval;
87
88 pthread_mutex_lock(&fqp->mtx);
89 if (fqp->first != NULL) {
90 fqp->first->prev = cip_l;
91 } else {
92 fqp->last = cip_l;
93 }
94 fqp->first = cip_f;
95 fqp->length += clen;
96 rval = fqp->length;
97 if (fqp->length >= fqp->wakeup_len) {
98 pthread_cond_signal(&fqp->cvar);
99 }
100 pthread_mutex_unlock(&fqp->mtx);
101 return (rval);
102 }
103 #endif
104
105 static int
mkuz_fqueue_check(struct mkuz_fifo_queue * fqp,cmp_cb_t cmp_cb,void * cap)106 mkuz_fqueue_check(struct mkuz_fifo_queue *fqp, cmp_cb_t cmp_cb, void *cap)
107 {
108 struct mkuz_bchain_link *ip;
109
110 for (ip = fqp->last; ip != NULL; ip = ip->prev) {
111 if (cmp_cb(ip->this, cap)) {
112 return (1);
113 }
114 }
115 return (0);
116 }
117
118 struct mkuz_blk *
mkuz_fqueue_deq_when(struct mkuz_fifo_queue * fqp,cmp_cb_t cmp_cb,void * cap)119 mkuz_fqueue_deq_when(struct mkuz_fifo_queue *fqp, cmp_cb_t cmp_cb, void *cap)
120 {
121 struct mkuz_bchain_link *ip, *newlast, *newfirst, *mip;
122 struct mkuz_blk *bp;
123
124 pthread_mutex_lock(&fqp->mtx);
125 while (fqp->last == NULL || !mkuz_fqueue_check(fqp, cmp_cb, cap)) {
126 pthread_cond_wait(&fqp->cvar, &fqp->mtx);
127 }
128 if (cmp_cb(fqp->last->this, cap)) {
129 mip = fqp->last;
130 fqp->last = mip->prev;
131 if (fqp->last == NULL) {
132 #if defined(MKUZ_DEBUG)
133 assert(fqp->length == 1);
134 #endif
135 fqp->first = NULL;
136 }
137 } else {
138 #if defined(MKUZ_DEBUG)
139 assert(fqp->length > 1);
140 #endif
141 newfirst = newlast = fqp->last;
142 mip = NULL;
143 for (ip = fqp->last->prev; ip != NULL; ip = ip->prev) {
144 if (cmp_cb(ip->this, cap)) {
145 mip = ip;
146 continue;
147 }
148 newfirst->prev = ip;
149 newfirst = ip;
150 }
151 newfirst->prev = NULL;
152 fqp->first = newfirst;
153 fqp->last = newlast;
154 }
155 fqp->length -= 1;
156 pthread_mutex_unlock(&fqp->mtx);
157 bp = mip->this;
158 free(mip);
159
160 return bp;
161 }
162
163 struct mkuz_blk *
mkuz_fqueue_deq(struct mkuz_fifo_queue * fqp)164 mkuz_fqueue_deq(struct mkuz_fifo_queue *fqp)
165 {
166 struct mkuz_bchain_link *ip;
167 struct mkuz_blk *bp;
168
169 pthread_mutex_lock(&fqp->mtx);
170 while (fqp->last == NULL) {
171 pthread_cond_wait(&fqp->cvar, &fqp->mtx);
172 }
173 #if defined(MKUZ_DEBUG)
174 assert(fqp->length > 0);
175 #endif
176 ip = fqp->last;
177 fqp->last = ip->prev;
178 if (fqp->last == NULL) {
179 #if defined(MKUZ_DEBUG)
180 assert(fqp->length == 1);
181 #endif
182 fqp->first = NULL;
183 }
184 fqp->length -= 1;
185 pthread_mutex_unlock(&fqp->mtx);
186 bp = ip->this;
187 free(ip);
188
189 return bp;
190 }
191
192 #if defined(NOTYET)
193 struct mkuz_bchain_link *
mkuz_fqueue_deq_all(struct mkuz_fifo_queue * fqp,int * rclen)194 mkuz_fqueue_deq_all(struct mkuz_fifo_queue *fqp, int *rclen)
195 {
196 struct mkuz_bchain_link *rchain;
197
198 pthread_mutex_lock(&fqp->mtx);
199 while (fqp->last == NULL) {
200 pthread_cond_wait(&fqp->cvar, &fqp->mtx);
201 }
202 #if defined(MKUZ_DEBUG)
203 assert(fqp->length > 0);
204 #endif
205 rchain = fqp->last;
206 fqp->first = fqp->last = NULL;
207 *rclen = fqp->length;
208 fqp->length = 0;
209 pthread_mutex_unlock(&fqp->mtx);
210 return (rchain);
211 }
212 #endif
213