1 /* 2 * Copyright (c) 2004-2016 Maxim Sobolev <sobomax@FreeBSD.org> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 */ 26 27 #include <sys/cdefs.h> 28 #include <err.h> 29 #include <pthread.h> 30 #include <stdint.h> 31 #include <stdlib.h> 32 33 #if defined(MKUZ_DEBUG) 34 # include <assert.h> 35 #endif 36 37 #include "mkuzip.h" 38 #include "mkuz_fqueue.h" 39 #include "mkuz_conveyor.h" 40 #include "mkuz_blk.h" 41 #include "mkuz_blk_chain.h" 42 43 struct mkuz_fifo_queue * 44 mkuz_fqueue_ctor(int wakeup_len) 45 { 46 struct mkuz_fifo_queue *fqp; 47 48 fqp = mkuz_safe_zmalloc(sizeof(struct mkuz_fifo_queue)); 49 fqp->wakeup_len = wakeup_len; 50 if (pthread_mutex_init(&fqp->mtx, NULL) != 0) { 51 errx(1, "pthread_mutex_init() failed"); 52 } 53 if (pthread_cond_init(&fqp->cvar, NULL) != 0) { 54 errx(1, "pthread_cond_init() failed"); 55 } 56 return (fqp); 57 } 58 59 void 60 mkuz_fqueue_enq(struct mkuz_fifo_queue *fqp, struct mkuz_blk *bp) 61 { 62 struct mkuz_bchain_link *ip; 63 64 ip = mkuz_safe_zmalloc(sizeof(struct mkuz_bchain_link)); 65 ip->this = bp; 66 67 pthread_mutex_lock(&fqp->mtx); 68 if (fqp->first != NULL) { 69 fqp->first->prev = ip; 70 } else { 71 fqp->last = ip; 72 } 73 fqp->first = ip; 74 fqp->length += 1; 75 if (fqp->length >= fqp->wakeup_len) { 76 pthread_cond_signal(&fqp->cvar); 77 } 78 pthread_mutex_unlock(&fqp->mtx); 79 } 80 81 #if defined(NOTYET) 82 int 83 mkuz_fqueue_enq_all(struct mkuz_fifo_queue *fqp, struct mkuz_bchain_link *cip_f, 84 struct mkuz_bchain_link *cip_l, int clen) 85 { 86 int rval; 87 88 pthread_mutex_lock(&fqp->mtx); 89 if (fqp->first != NULL) { 90 fqp->first->prev = cip_l; 91 } else { 92 fqp->last = cip_l; 93 } 94 fqp->first = cip_f; 95 fqp->length += clen; 96 rval = fqp->length; 97 if (fqp->length >= fqp->wakeup_len) { 98 pthread_cond_signal(&fqp->cvar); 99 } 100 pthread_mutex_unlock(&fqp->mtx); 101 return (rval); 102 } 103 #endif 104 105 static int 106 mkuz_fqueue_check(struct mkuz_fifo_queue *fqp, cmp_cb_t cmp_cb, void *cap) 107 { 108 struct mkuz_bchain_link *ip; 109 110 for (ip = fqp->last; ip != NULL; ip = ip->prev) { 111 if (cmp_cb(ip->this, cap)) { 112 return (1); 113 } 114 } 115 return (0); 116 } 117 118 struct mkuz_blk * 119 mkuz_fqueue_deq_when(struct mkuz_fifo_queue *fqp, cmp_cb_t cmp_cb, void *cap) 120 { 121 struct mkuz_bchain_link *ip, *newlast, *newfirst, *mip; 122 struct mkuz_blk *bp; 123 124 pthread_mutex_lock(&fqp->mtx); 125 while (fqp->last == NULL || !mkuz_fqueue_check(fqp, cmp_cb, cap)) { 126 pthread_cond_wait(&fqp->cvar, &fqp->mtx); 127 } 128 if (cmp_cb(fqp->last->this, cap)) { 129 mip = fqp->last; 130 fqp->last = mip->prev; 131 if (fqp->last == NULL) { 132 #if defined(MKUZ_DEBUG) 133 assert(fqp->length == 1); 134 #endif 135 fqp->first = NULL; 136 } 137 } else { 138 #if defined(MKUZ_DEBUG) 139 assert(fqp->length > 1); 140 #endif 141 newfirst = newlast = fqp->last; 142 mip = NULL; 143 for (ip = fqp->last->prev; ip != NULL; ip = ip->prev) { 144 if (cmp_cb(ip->this, cap)) { 145 mip = ip; 146 continue; 147 } 148 newfirst->prev = ip; 149 newfirst = ip; 150 } 151 newfirst->prev = NULL; 152 fqp->first = newfirst; 153 fqp->last = newlast; 154 } 155 fqp->length -= 1; 156 pthread_mutex_unlock(&fqp->mtx); 157 bp = mip->this; 158 free(mip); 159 160 return bp; 161 } 162 163 struct mkuz_blk * 164 mkuz_fqueue_deq(struct mkuz_fifo_queue *fqp) 165 { 166 struct mkuz_bchain_link *ip; 167 struct mkuz_blk *bp; 168 169 pthread_mutex_lock(&fqp->mtx); 170 while (fqp->last == NULL) { 171 pthread_cond_wait(&fqp->cvar, &fqp->mtx); 172 } 173 #if defined(MKUZ_DEBUG) 174 assert(fqp->length > 0); 175 #endif 176 ip = fqp->last; 177 fqp->last = ip->prev; 178 if (fqp->last == NULL) { 179 #if defined(MKUZ_DEBUG) 180 assert(fqp->length == 1); 181 #endif 182 fqp->first = NULL; 183 } 184 fqp->length -= 1; 185 pthread_mutex_unlock(&fqp->mtx); 186 bp = ip->this; 187 free(ip); 188 189 return bp; 190 } 191 192 #if defined(NOTYET) 193 struct mkuz_bchain_link * 194 mkuz_fqueue_deq_all(struct mkuz_fifo_queue *fqp, int *rclen) 195 { 196 struct mkuz_bchain_link *rchain; 197 198 pthread_mutex_lock(&fqp->mtx); 199 while (fqp->last == NULL) { 200 pthread_cond_wait(&fqp->cvar, &fqp->mtx); 201 } 202 #if defined(MKUZ_DEBUG) 203 assert(fqp->length > 0); 204 #endif 205 rchain = fqp->last; 206 fqp->first = fqp->last = NULL; 207 *rclen = fqp->length; 208 fqp->length = 0; 209 pthread_mutex_unlock(&fqp->mtx); 210 return (rchain); 211 } 212 #endif 213