1 /*- 2 * Copyright (c) 2005-2008 Poul-Henning Kamp 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 * $FreeBSD$ 27 */ 28 29 #include <assert.h> 30 #include <stdio.h> 31 #include <string.h> 32 #include <stdlib.h> 33 #include <unistd.h> 34 #include <time.h> 35 #include <sys/endian.h> 36 37 #include <zlib.h> 38 39 #include "fifolog.h" 40 #include "libfifolog.h" 41 #include "libfifolog_int.h" 42 #include "fifolog_write.h" 43 #include "miniobj.h" 44 45 #define ALLOC(ptr, size) do { \ 46 (*(ptr)) = calloc(size, 1); \ 47 assert(*(ptr) != NULL); \ 48 } while (0) 49 50 51 const char *fifolog_write_statnames[] = { 52 [FIFOLOG_PT_BYTES_PRE] = "Bytes before compression", 53 [FIFOLOG_PT_BYTES_POST] = "Bytes after compression", 54 [FIFOLOG_PT_WRITES] = "Writes", 55 [FIFOLOG_PT_FLUSH] = "Flushes", 56 [FIFOLOG_PT_SYNC] = "Syncs", 57 [FIFOLOG_PT_RUNTIME] = "Runtime" 58 }; 59 60 /* 61 * Check that everything is all right 62 */ 63 static void 64 fifolog_write_assert(const struct fifolog_writer *f) 65 { 66 67 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 68 assert(f->iptr == f->ff->zs->next_in + f->ff->zs->avail_in); 69 assert(f->ff->zs->next_out + f->ff->zs->avail_out == \ 70 f->ff->recbuf + f->ff->recsize); 71 } 72 73 struct fifolog_writer * 74 fifolog_write_new(void) 75 { 76 struct fifolog_writer *f; 77 78 ALLOC(&f, sizeof *f); 79 f->magic = FIFOLOG_WRITER_MAGIC; 80 return (f); 81 } 82 83 void 84 fifolog_write_destroy(struct fifolog_writer *f) 85 { 86 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 87 free(f); 88 } 89 90 void 91 fifolog_write_close(struct fifolog_writer *f) 92 { 93 94 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 95 fifolog_int_close(&f->ff); 96 free(f->ff); 97 if (f->ibuf != NULL) 98 free(f->ibuf); 99 free(f); 100 } 101 102 static void 103 fifo_prepobuf(struct fifolog_writer *f, time_t now, int flag) 104 { 105 106 memset(f->ff->recbuf, 0, f->ff->recsize); 107 f->ff->zs->next_out = f->ff->recbuf + 5; 108 f->ff->zs->avail_out = f->ff->recsize - 5; 109 if (f->recno == 0 && f->seq == 0) { 110 srandomdev(); 111 do { 112 f->seq = random(); 113 } while (f->seq == 0); 114 } 115 be32enc(f->ff->recbuf, f->seq++); 116 f->ff->recbuf[4] = f->flag; 117 f->flag = 0; 118 if (flag) { 119 f->ff->recbuf[4] |= FIFOLOG_FLG_SYNC; 120 be32enc(f->ff->recbuf + 5, (u_int)now); 121 f->ff->zs->next_out += 4; 122 f->ff->zs->avail_out -= 4; 123 } 124 fifolog_write_assert(f); 125 } 126 127 const char * 128 fifolog_write_open(struct fifolog_writer *f, const char *fn, unsigned writerate, unsigned syncrate, int compression) 129 { 130 const char *es; 131 int i; 132 time_t now; 133 off_t o; 134 135 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC); 136 137 /* Check for legal compression value */ 138 if (compression < Z_DEFAULT_COMPRESSION || 139 compression > Z_BEST_COMPRESSION) 140 return ("Illegal compression value"); 141 142 f->writerate = writerate; 143 f->syncrate = syncrate; 144 f->compression = compression; 145 146 /* Reset statistics */ 147 memset(f->cnt, 0, sizeof f->cnt); 148 149 es = fifolog_int_open(&f->ff, fn, 1); 150 if (es != NULL) 151 return (es); 152 es = fifolog_int_findend(f->ff, &o); 153 if (es != NULL) 154 return (es); 155 if (o == 0) { 156 f->seq = 0; 157 f->recno = 0; 158 } else { 159 i = fifolog_int_read(f->ff, o); 160 if (i) 161 return ("Read error, looking for seq"); 162 f->seq = be32dec(f->ff->recbuf) + 1; 163 f->recno = o + 1; 164 } 165 166 f->ibufsize = 32768; 167 ALLOC(&f->ibuf, f->ibufsize); 168 f->iptr = f->ibuf; 169 f->ff->zs->next_in = f->iptr; 170 i = deflateInit(f->ff->zs, (int)f->compression); 171 assert(i == Z_OK); 172 173 f->flag |= FIFOLOG_FLG_RESTART; 174 175 time(&now); 176 fifo_prepobuf(f, now, 1); 177 f->starttime = now; 178 179 fifolog_write_assert(f); 180 return (NULL); 181 } 182 183 static void 184 fifo_writerec(struct fifolog_writer *f) 185 { 186 int i; 187 time_t t; 188 189 fifolog_write_assert(f); 190 f->writes_since_sync++; 191 192 assert(f->recno < f->ff->logsize); 193 f->cnt[FIFOLOG_PT_BYTES_POST] += f->ff->recsize - f->ff->zs->avail_out; 194 if (f->ff->zs->avail_out == 0) { 195 /* nothing */ 196 } else if (f->ff->zs->avail_out <= 255) { 197 f->ff->recbuf[f->ff->recsize - 1] = 198 (u_char)f->ff->zs->avail_out; 199 f->ff->recbuf[4] |= FIFOLOG_FLG_1BYTE; 200 } else { 201 be32enc(f->ff->recbuf + f->ff->recsize - 4, 202 f->ff->zs->avail_out); 203 f->ff->recbuf[4] |= FIFOLOG_FLG_4BYTE; 204 } 205 i = pwrite(f->ff->fd, f->ff->recbuf, f->ff->recsize, 206 (f->recno + 1) * f->ff->recsize); 207 assert (i == (int)f->ff->recsize); 208 if (++f->recno == f->ff->logsize) 209 f->recno = 0; 210 f->cnt[FIFOLOG_PT_WRITES]++; 211 time(&t); 212 f->cnt[FIFOLOG_PT_RUNTIME] = t - f->starttime; /*lint !e776 */ 213 fifolog_write_assert(f); 214 } 215 216 int 217 fifolog_write_poll(struct fifolog_writer *f, time_t now) 218 { 219 int i, fl, bo, bf; 220 221 if (now == 0) 222 time(&now); 223 224 fifolog_write_assert(f); 225 if (f->cleanup || now >= (int)(f->lastsync + f->syncrate)) { 226 /* 227 * We always check the sync timer, otherwise a flood of data 228 * would not get any sync records at all 229 */ 230 f->cleanup = 0; 231 fl = Z_FINISH; 232 f->lastsync = now; 233 f->lastwrite = now; 234 f->cnt[FIFOLOG_PT_SYNC]++; 235 } else if (f->ff->zs->avail_in == 0 && 236 now >= (int)(f->lastwrite + f->writerate)) { 237 /* 238 * We only check for writerate timeouts when the input 239 * buffer is empty. It would be silly to force a write if 240 * pending input could cause it to happen on its own. 241 */ 242 fl = Z_SYNC_FLUSH; 243 f->lastwrite = now; 244 f->cnt[FIFOLOG_PT_FLUSH]++; 245 } else if (f->ff->zs->avail_in == 0) 246 return (0); /* nothing to do */ 247 else 248 fl = Z_NO_FLUSH; 249 250 for (;;) { 251 assert(f->ff->zs->avail_out > 0); 252 253 bf = f->ff->zs->avail_out; 254 255 i = deflate(f->ff->zs, fl); 256 assert (i == Z_OK || i == Z_BUF_ERROR || i == Z_STREAM_END); 257 258 bo = f->ff->zs->avail_out; 259 260 /* If we have output space and not in a hurry.. */ 261 if (bo > 0 && fl == Z_NO_FLUSH) 262 break; 263 264 /* Write output buffer, if anything in it */ 265 if (bo != bf) 266 fifo_writerec(f); 267 268 /* If the buffer were full, we need to check again */ 269 if (bo == 0) { 270 fifo_prepobuf(f, now, 0); 271 continue; 272 } 273 274 if (fl == Z_FINISH) { 275 /* Make next record a SYNC record */ 276 fifo_prepobuf(f, now, 1); 277 /* And reset the zlib engine */ 278 i = deflateReset(f->ff->zs); 279 assert(i == Z_OK); 280 f->writes_since_sync = 0; 281 } else { 282 fifo_prepobuf(f, now, 0); 283 } 284 break; 285 } 286 287 if (f->ff->zs->avail_in == 0) { 288 /* Reset input buffer when empty */ 289 f->iptr = f->ibuf; 290 f->ff->zs->next_in = f->iptr; 291 } 292 293 fifolog_write_assert(f); 294 return (1); 295 } 296 297 static void 298 fifolog_acct(struct fifolog_writer *f, unsigned bytes) 299 { 300 301 f->ff->zs->avail_in += bytes; 302 f->iptr += bytes; 303 f->cnt[FIFOLOG_PT_BYTES_PRE] += bytes; 304 } 305 306 /* 307 * Attempt to write an entry. 308 * Return zero if there is no space, one otherwise 309 */ 310 311 int 312 fifolog_write_bytes(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len) 313 { 314 u_int l; 315 const unsigned char *p; 316 317 fifolog_write_assert(f); 318 assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH))); 319 assert(ptr != NULL); 320 321 p = ptr; 322 if (len == 0) { 323 len = strlen(ptr) + 1; 324 l = 4 + len; /* id */ 325 } else { 326 assert(len <= 255); 327 id |= FIFOLOG_LENGTH; 328 l = 5 + len; /* id + len */ 329 } 330 331 l += 4; /* A timestamp may be necessary */ 332 333 /* Now do timestamp, if needed */ 334 if (now == 0) 335 time(&now); 336 337 assert(l < f->ibufsize); 338 339 /* Return if there is not enough space */ 340 if (f->iptr + l > f->ibuf + f->ibufsize) 341 return (0); 342 343 if (now != f->last) { 344 id |= FIFOLOG_TIMESTAMP; 345 f->last = now; 346 } 347 348 /* Emit instance+flag and length */ 349 be32enc(f->iptr, id); 350 fifolog_acct(f, 4); 351 352 if (id & FIFOLOG_TIMESTAMP) { 353 be32enc(f->iptr, (uint32_t)f->last); 354 fifolog_acct(f, 4); 355 } 356 if (id & FIFOLOG_LENGTH) { 357 f->iptr[0] = (u_char)len; 358 fifolog_acct(f, 1); 359 } 360 361 assert (len > 0); 362 memcpy(f->iptr, p, len); 363 fifolog_acct(f, len); 364 fifolog_write_assert(f); 365 return (1); 366 } 367 368 /* 369 * Write an entry, polling until success. 370 * Long binary entries are broken into 255 byte chunks. 371 */ 372 373 void 374 fifolog_write_bytes_poll(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len) 375 { 376 u_int l; 377 const unsigned char *p; 378 379 fifolog_write_assert(f); 380 381 assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH))); 382 assert(ptr != NULL); 383 384 if (len == 0) { 385 while (!fifolog_write_bytes(f, id, now, ptr, len)) { 386 (void)fifolog_write_poll(f, now); 387 (void)usleep(10000); 388 } 389 } else { 390 p = ptr; 391 for (p = ptr; len > 0; len -= l, p += l) { 392 l = len; 393 if (l > 255) 394 l = 255; 395 while (!fifolog_write_bytes(f, id, now, p, l)) { 396 (void)fifolog_write_poll(f, now); 397 (void)usleep(10000); 398 } 399 } 400 } 401 fifolog_write_assert(f); 402 } 403 404 int 405 fifolog_write_flush(struct fifolog_writer *f) 406 { 407 int i; 408 409 fifolog_write_assert(f); 410 411 f->cleanup = 1; 412 for (i = 0; fifolog_write_poll(f, 0); i = 1) 413 continue; 414 fifolog_write_assert(f); 415 return (i); 416 } 417