1 /*- 2 * Copyright (c) 2005-2008 Poul-Henning Kamp 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 * $FreeBSD$ 27 */ 28 29 #include <assert.h> 30 #include <stdio.h> 31 #include <string.h> 32 #include <stdlib.h> 33 #include <unistd.h> 34 #include <time.h> 35 #include <sys/endian.h> 36 37 #include <zlib.h> 38 39 #include "fifolog.h" 40 #include "libfifolog.h" 41 #include "libfifolog_int.h" 42 #include "fifolog_write.h" 43 #include "miniobj.h" 44 45 #define ALLOC(ptr, size) do { \ 46 (*(ptr)) = calloc(size, 1); \ 47 assert(*(ptr) != NULL); \ 48 } while (0) 49 50 51 const char *fifolog_write_statnames[] = { 52 [FIFOLOG_PT_BYTES_PRE] = "Bytes before compression", 53 [FIFOLOG_PT_BYTES_POST] = "Bytes after compression", 54 [FIFOLOG_PT_WRITES] = "Writes", 55 [FIFOLOG_PT_FLUSH] = "Flushes", 56 [FIFOLOG_PT_SYNC] = "Syncs", 57 [FIFOLOG_PT_RUNTIME] = "Runtime" 58 }; 59 60 /* 61 * Check that everything is all right 62 */ 63 static void 64 fifolog_write_assert(const struct fifolog_writer *f) 65 { 66 67 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 68 assert(f->iptr == f->ff->zs->next_in + f->ff->zs->avail_in); 69 assert(f->ff->zs->next_out + f->ff->zs->avail_out == \ 70 f->ff->recbuf + f->ff->recsize); 71 } 72 73 struct fifolog_writer * 74 fifolog_write_new(void) 75 { 76 struct fifolog_writer *f; 77 78 ALLOC(&f, sizeof *f); 79 f->magic = FIFOLOG_WRITER_MAGIC; 80 return (f); 81 } 82 83 void 84 fifolog_write_destroy(struct fifolog_writer *f) 85 { 86 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 87 free(f); 88 } 89 90 void 91 fifolog_write_close(struct fifolog_writer *f) 92 { 93 94 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 95 fifolog_int_close(&f->ff); 96 free(f->ff); 97 if (f->ibuf != NULL) 98 free(f->ibuf); 99 free(f); 100 } 101 102 static void 103 fifo_prepobuf(struct fifolog_writer *f, time_t now, int flag) 104 { 105 106 memset(f->ff->recbuf, 0, f->ff->recsize); 107 f->ff->zs->next_out = f->ff->recbuf + 5; 108 f->ff->zs->avail_out = f->ff->recsize - 5; 109 if (f->recno == 0 && f->seq == 0) { 110 srandomdev(); 111 do { 112 f->seq = random(); 113 } while (f->seq == 0); 114 } 115 be32enc(f->ff->recbuf, f->seq++); 116 f->ff->recbuf[4] = f->flag; 117 f->flag = 0; 118 if (flag) { 119 f->ff->recbuf[4] |= FIFOLOG_FLG_SYNC; 120 be32enc(f->ff->recbuf + 5, (u_int)now); 121 f->ff->zs->next_out += 4; 122 f->ff->zs->avail_out -= 4; 123 } 124 fifolog_write_assert(f); 125 } 126 127 const char * 128 fifolog_write_open(struct fifolog_writer *f, const char *fn, unsigned writerate, unsigned syncrate, int compression) 129 { 130 const char *es; 131 int i; 132 time_t now; 133 off_t o; 134 135 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 136 137 /* Check for legal compression value */ 138 if (compression < Z_DEFAULT_COMPRESSION || 139 compression > Z_BEST_COMPRESSION) 140 return ("Illegal compression value"); 141 142 f->writerate = writerate; 143 f->syncrate = syncrate; 144 f->compression = compression; 145 146 /* Reset statistics */ 147 memset(f->cnt, 0, sizeof f->cnt); 148 149 es = fifolog_int_open(&f->ff, fn, 1); 150 if (es != NULL) 151 return (es); 152 es = fifolog_int_findend(f->ff, &o); 153 if (es != NULL) 154 return (es); 155 i = fifolog_int_read(f->ff, o); 156 if (i) 157 return ("Read error, looking for seq"); 158 f->seq = be32dec(f->ff->recbuf); 159 if (f->seq == 0) { 160 /* Empty fifolog */ 161 f->seq = random(); 162 } else { 163 f->recno = o + 1; 164 f->seq++; 165 } 166 167 f->ibufsize = 32768; 168 ALLOC(&f->ibuf, f->ibufsize); 169 f->iptr = f->ibuf; 170 f->ff->zs->next_in = f->iptr; 171 i = deflateInit(f->ff->zs, (int)f->compression); 172 assert(i == Z_OK); 173 174 f->flag |= FIFOLOG_FLG_RESTART; 175 176 time(&now); 177 fifo_prepobuf(f, now, 1); 178 f->starttime = now; 179 180 fifolog_write_assert(f); 181 return (NULL); 182 } 183 184 static void 185 fifo_writerec(struct fifolog_writer *f) 186 { 187 int i; 188 time_t t; 189 190 fifolog_write_assert(f); 191 f->writes_since_sync++; 192 193 assert(f->recno < f->ff->logsize); 194 f->cnt[FIFOLOG_PT_BYTES_POST] += f->ff->recsize - f->ff->zs->avail_out; 195 if (f->ff->zs->avail_out == 0) { 196 /* nothing */ 197 } else if (f->ff->zs->avail_out <= 255) { 198 f->ff->recbuf[f->ff->recsize - 1] = 199 (u_char)f->ff->zs->avail_out; 200 f->ff->recbuf[4] |= FIFOLOG_FLG_1BYTE; 201 } else { 202 be32enc(f->ff->recbuf + f->ff->recsize - 4, 203 f->ff->zs->avail_out); 204 f->ff->recbuf[4] |= FIFOLOG_FLG_4BYTE; 205 } 206 i = pwrite(f->ff->fd, f->ff->recbuf, f->ff->recsize, 207 (f->recno + 1) * f->ff->recsize); 208 assert (i == (int)f->ff->recsize); 209 if (++f->recno == f->ff->logsize) 210 f->recno = 0; 211 f->cnt[FIFOLOG_PT_WRITES]++; 212 time(&t); 213 f->cnt[FIFOLOG_PT_RUNTIME] = t - f->starttime; /*lint !e776 */ 214 fifolog_write_assert(f); 215 } 216 217 int 218 fifolog_write_poll(struct fifolog_writer *f, time_t now) 219 { 220 int i, fl, bo, bf; 221 222 if (now == 0) 223 time(&now); 224 225 fifolog_write_assert(f); 226 if (f->cleanup || now >= (int)(f->lastsync + f->syncrate)) { 227 /* 228 * We always check the sync timer, otherwise a flood of data 229 * would not get any sync records at all 230 */ 231 f->cleanup = 0; 232 fl = Z_FINISH; 233 f->lastsync = now; 234 f->lastwrite = now; 235 f->cnt[FIFOLOG_PT_SYNC]++; 236 } else if (f->ff->zs->avail_in == 0 && 237 now >= (int)(f->lastwrite + f->writerate)) { 238 /* 239 * We only check for writerate timeouts when the input 240 * buffer is empty. It would be silly to force a write if 241 * pending input could cause it to happen on its own. 242 */ 243 fl = Z_SYNC_FLUSH; 244 f->lastwrite = now; 245 f->cnt[FIFOLOG_PT_FLUSH]++; 246 } else if (f->ff->zs->avail_in == 0) 247 return (0); /* nothing to do */ 248 else 249 fl = Z_NO_FLUSH; 250 251 for (;;) { 252 assert(f->ff->zs->avail_out > 0); 253 254 bf = f->ff->zs->avail_out; 255 256 i = deflate(f->ff->zs, fl); 257 assert (i == Z_OK || i == Z_BUF_ERROR || i == Z_STREAM_END); 258 259 bo = f->ff->zs->avail_out; 260 261 /* If we have output space and not in a hurry.. */ 262 if (bo > 0 && fl == Z_NO_FLUSH) 263 break; 264 265 /* Write output buffer, if anything in it */ 266 if (bo != bf) 267 fifo_writerec(f); 268 269 /* If the buffer were full, we need to check again */ 270 if (bo == 0) { 271 fifo_prepobuf(f, now, 0); 272 continue; 273 } 274 275 if (fl == Z_FINISH) { 276 /* Make next record a SYNC record */ 277 fifo_prepobuf(f, now, 1); 278 /* And reset the zlib engine */ 279 i = deflateReset(f->ff->zs); 280 assert(i == Z_OK); 281 f->writes_since_sync = 0; 282 } else { 283 fifo_prepobuf(f, now, 0); 284 } 285 break; 286 } 287 288 if (f->ff->zs->avail_in == 0) { 289 /* Reset input buffer when empty */ 290 f->iptr = f->ibuf; 291 f->ff->zs->next_in = f->iptr; 292 } 293 294 fifolog_write_assert(f); 295 return (1); 296 } 297 298 static void 299 fifolog_acct(struct fifolog_writer *f, unsigned bytes) 300 { 301 302 f->ff->zs->avail_in += bytes; 303 f->iptr += bytes; 304 f->cnt[FIFOLOG_PT_BYTES_PRE] += bytes; 305 } 306 307 /* 308 * Attempt to write an entry. 309 * Return zero if there is no space, one otherwise 310 */ 311 312 int 313 fifolog_write_bytes(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len) 314 { 315 u_int l; 316 const unsigned char *p; 317 318 fifolog_write_assert(f); 319 assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH))); 320 assert(ptr != NULL); 321 322 p = ptr; 323 if (len == 0) { 324 len = strlen(ptr) + 1; 325 l = 4 + len; /* id */ 326 } else { 327 assert(len <= 255); 328 id |= FIFOLOG_LENGTH; 329 l = 5 + len; /* id + len */ 330 } 331 332 l += 4; /* A timestamp may be necessary */ 333 334 /* Now do timestamp, if needed */ 335 if (now == 0) 336 time(&now); 337 338 assert(l < f->ibufsize); 339 340 /* Return if there is not enough space */ 341 if (f->iptr + l > f->ibuf + f->ibufsize) 342 return (0); 343 344 if (now != f->last) { 345 id |= FIFOLOG_TIMESTAMP; 346 f->last = now; 347 } 348 349 /* Emit instance+flag and length */ 350 be32enc(f->iptr, id); 351 fifolog_acct(f, 4); 352 353 if (id & FIFOLOG_TIMESTAMP) { 354 be32enc(f->iptr, (uint32_t)f->last); 355 fifolog_acct(f, 4); 356 } 357 if (id & FIFOLOG_LENGTH) { 358 f->iptr[0] = (u_char)len; 359 fifolog_acct(f, 1); 360 } 361 362 assert (len > 0); 363 memcpy(f->iptr, p, len); 364 fifolog_acct(f, len); 365 fifolog_write_assert(f); 366 return (1); 367 } 368 369 /* 370 * Write an entry, polling until success. 371 * Long binary entries are broken into 255 byte chunks. 372 */ 373 374 void 375 fifolog_write_bytes_poll(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len) 376 { 377 u_int l; 378 const unsigned char *p; 379 380 fifolog_write_assert(f); 381 382 assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH))); 383 assert(ptr != NULL); 384 385 if (len == 0) { 386 while (!fifolog_write_bytes(f, id, now, ptr, len)) { 387 (void)fifolog_write_poll(f, now); 388 (void)usleep(10000); 389 } 390 } else { 391 p = ptr; 392 for (p = ptr; len > 0; len -= l, p += l) { 393 l = len; 394 if (l > 255) 395 l = 255; 396 while (!fifolog_write_bytes(f, id, now, p, l)) { 397 (void)fifolog_write_poll(f, now); 398 (void)usleep(10000); 399 } 400 } 401 } 402 fifolog_write_assert(f); 403 } 404 405 int 406 fifolog_write_flush(struct fifolog_writer *f) 407 { 408 int i; 409 410 fifolog_write_assert(f); 411 412 f->cleanup = 1; 413 for (i = 0; fifolog_write_poll(f, 0); i = 1) 414 continue; 415 fifolog_write_assert(f); 416 return (i); 417 } 418