xref: /freebsd/usr.sbin/fifolog/lib/fifolog_write_poll.c (revision d876124d6ae9d56da5b4ff4c6015efd1d0c9222a)
1 /*-
2  * Copyright (c) 2005-2008 Poul-Henning Kamp
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  *
26  * $FreeBSD$
27  */
28 
29 #include <assert.h>
30 #include <stdio.h>
31 #include <string.h>
32 #include <stdlib.h>
33 #include <unistd.h>
34 #include <time.h>
35 #include <sys/endian.h>
36 
37 #include <zlib.h>
38 
39 #include "fifolog.h"
40 #include "libfifolog.h"
41 #include "libfifolog_int.h"
42 #include "fifolog_write.h"
43 #include "miniobj.h"
44 
/*
 * Allocate `size' zero-filled bytes and store the address through `ptr'.
 * An allocation failure trips the assert().
 */
#define ALLOC(ptr, size)					\
	do {							\
		*(ptr) = calloc((size), 1);			\
		assert(*(ptr) != NULL);				\
	} while (0)
49 
50 
51 const char *fifolog_write_statnames[] = {
52 [FIFOLOG_PT_BYTES_PRE] =	"Bytes before compression",
53 [FIFOLOG_PT_BYTES_POST] =	"Bytes after compression",
54 [FIFOLOG_PT_WRITES] =		"Writes",
55 [FIFOLOG_PT_FLUSH] =		"Flushes",
56 [FIFOLOG_PT_SYNC] =		"Syncs",
57 [FIFOLOG_PT_RUNTIME] =		"Runtime"
58 };
59 
60 /*
61  * Check that everything is all right
62  */
63 static void
64 fifolog_write_assert(const struct fifolog_writer *f)
65 {
66 
67 	CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
68 	assert(f->iptr == f->ff->zs->next_in + f->ff->zs->avail_in);
69 	assert(f->ff->zs->next_out + f->ff->zs->avail_out == \
70 	    f->ff->recbuf + f->ff->recsize);
71 }
72 
73 struct fifolog_writer *
74 fifolog_write_new(void)
75 {
76 	struct fifolog_writer *f;
77 
78 	ALLOC(&f, sizeof *f);
79 	f->magic = FIFOLOG_WRITER_MAGIC;
80 	return (f);
81 }
82 
83 void
84 fifolog_write_destroy(struct fifolog_writer *f)
85 {
86 	CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
87 	free(f);
88 }
89 
90 void
91 fifolog_write_close(struct fifolog_writer *f)
92 {
93 
94 	CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
95 	fifolog_int_close(&f->ff);
96 	free(f->ff);
97 	if (f->ibuf != NULL)
98 		free(f->ibuf);
99 	free(f);
100 }
101 
102 static void
103 fifo_prepobuf(struct fifolog_writer *f, time_t now, int flag)
104 {
105 
106 	memset(f->ff->recbuf, 0, f->ff->recsize);
107 	f->ff->zs->next_out = f->ff->recbuf + 5;
108 	f->ff->zs->avail_out = f->ff->recsize - 5;
109 	if (f->recno == 0 && f->seq == 0) {
110 		srandomdev();
111 		do {
112 			f->seq = random();
113 		} while (f->seq == 0);
114 	}
115 	be32enc(f->ff->recbuf, f->seq++);
116 	f->ff->recbuf[4] = f->flag;
117 	f->flag = 0;
118 	if (flag) {
119 		f->ff->recbuf[4] |= FIFOLOG_FLG_SYNC;
120 		be32enc(f->ff->recbuf + 5, (u_int)now);
121 		f->ff->zs->next_out += 4;
122 		f->ff->zs->avail_out -= 4;
123 	}
124 	fifolog_write_assert(f);
125 }
126 
127 const char *
128 fifolog_write_open(struct fifolog_writer *f, const char *fn, unsigned writerate, unsigned syncrate, int compression)
129 {
130 	const char *es;
131 	int i;
132 	time_t now;
133 	off_t o;
134 
135 	CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
136 
137 	/* Check for legal compression value */
138 	if (compression < Z_DEFAULT_COMPRESSION ||
139 	    compression > Z_BEST_COMPRESSION)
140 		return ("Illegal compression value");
141 
142 	f->writerate = writerate;
143 	f->syncrate = syncrate;
144 	f->compression = compression;
145 
146 	/* Reset statistics */
147 	memset(f->cnt, 0, sizeof f->cnt);
148 
149 	es = fifolog_int_open(&f->ff, fn, 1);
150 	if (es != NULL)
151 		return (es);
152 	es = fifolog_int_findend(f->ff, &o);
153 	if (es != NULL)
154 		return (es);
155 	if (o == 0) {
156 		f->seq = 0;
157 		f->recno = 0;
158 	} else {
159 		i = fifolog_int_read(f->ff, o);
160 		if (i)
161 			return ("Read error, looking for seq");
162 		f->seq = be32dec(f->ff->recbuf) + 1;
163 		f->recno = o + 1;
164 	}
165 
166 	f->ibufsize = 32768;
167 	ALLOC(&f->ibuf, f->ibufsize);
168 	f->iptr = f->ibuf;
169 	f->ff->zs->next_in = f->iptr;
170 	i = deflateInit(f->ff->zs, (int)f->compression);
171 	assert(i == Z_OK);
172 
173 	f->flag |= FIFOLOG_FLG_RESTART;
174 
175 	time(&now);
176 	fifo_prepobuf(f, now, 1);
177 	f->starttime = now;
178 
179 	fifolog_write_assert(f);
180 	return (NULL);
181 }
182 
183 static void
184 fifo_writerec(struct fifolog_writer *f)
185 {
186 	int i;
187 	time_t t;
188 
189 	fifolog_write_assert(f);
190 	f->writes_since_sync++;
191 
192 	assert(f->recno < f->ff->logsize);
193 	f->cnt[FIFOLOG_PT_BYTES_POST] += f->ff->recsize - f->ff->zs->avail_out;
194 	if (f->ff->zs->avail_out == 0) {
195 		/* nothing */
196 	} else if (f->ff->zs->avail_out <= 255) {
197 		f->ff->recbuf[f->ff->recsize - 1] =
198 		    (u_char)f->ff->zs->avail_out;
199 		f->ff->recbuf[4] |= FIFOLOG_FLG_1BYTE;
200 	} else {
201 		be32enc(f->ff->recbuf + f->ff->recsize - 4,
202 		    f->ff->zs->avail_out);
203 		f->ff->recbuf[4] |= FIFOLOG_FLG_4BYTE;
204 	}
205 	i = pwrite(f->ff->fd, f->ff->recbuf, f->ff->recsize,
206 		(f->recno + 1) * f->ff->recsize);
207 	assert (i == (int)f->ff->recsize);
208 	if (++f->recno == f->ff->logsize)
209 		f->recno = 0;
210 	f->cnt[FIFOLOG_PT_WRITES]++;
211 	time(&t);
212 	f->cnt[FIFOLOG_PT_RUNTIME] = t - f->starttime; /*lint !e776 */
213 	fifolog_write_assert(f);
214 }
215 
216 int
217 fifolog_write_poll(struct fifolog_writer *f, time_t now)
218 {
219 	int i, fl, bo, bf;
220 
221 	if (now == 0)
222 		time(&now);
223 
224 	fifolog_write_assert(f);
225 	if (f->cleanup || now >= (int)(f->lastsync + f->syncrate)) {
226 		/*
227 		 * We always check the sync timer, otherwise a flood of data
228 		 * would not get any sync records at all
229 		 */
230 		f->cleanup = 0;
231 		fl = Z_FINISH;
232 		f->lastsync = now;
233 		f->lastwrite = now;
234 		f->cnt[FIFOLOG_PT_SYNC]++;
235 	} else if (f->ff->zs->avail_in == 0 &&
236 	    now >= (int)(f->lastwrite + f->writerate)) {
237 		/*
238 		 * We only check for writerate timeouts when the input
239 		 * buffer is empty.  It would be silly to force a write if
240 		 * pending input could cause it to happen on its own.
241 		 */
242 		fl = Z_SYNC_FLUSH;
243 		f->lastwrite = now;
244 		f->cnt[FIFOLOG_PT_FLUSH]++;
245 	} else if (f->ff->zs->avail_in == 0)
246 		return (0);			/* nothing to do */
247 	else
248 		fl = Z_NO_FLUSH;
249 
250 	for (;;) {
251 		assert(f->ff->zs->avail_out > 0);
252 
253 		bf = f->ff->zs->avail_out;
254 
255 		i = deflate(f->ff->zs, fl);
256 		assert (i == Z_OK || i == Z_BUF_ERROR || i == Z_STREAM_END);
257 
258 		bo = f->ff->zs->avail_out;
259 
260 		/* If we have output space and not in a hurry.. */
261 		if (bo > 0 && fl == Z_NO_FLUSH)
262 			break;
263 
264 		/* Write output buffer, if anything in it */
265 		if (bo != bf)
266 			fifo_writerec(f);
267 
268 		/* If the buffer were full, we need to check again */
269 		if (bo == 0) {
270 			fifo_prepobuf(f, now, 0);
271 			continue;
272 		}
273 
274 		if (fl == Z_FINISH) {
275 			/* Make next record a SYNC record */
276 			fifo_prepobuf(f, now, 1);
277 			/* And reset the zlib engine */
278 			i = deflateReset(f->ff->zs);
279 			assert(i == Z_OK);
280 			f->writes_since_sync = 0;
281 		} else {
282 			fifo_prepobuf(f, now, 0);
283 		}
284 		break;
285 	}
286 
287 	if (f->ff->zs->avail_in == 0) {
288 		/* Reset input buffer when empty */
289 		f->iptr = f->ibuf;
290 		f->ff->zs->next_in = f->iptr;
291 	}
292 
293 	fifolog_write_assert(f);
294 	return (1);
295 }
296 
297 static void
298 fifolog_acct(struct fifolog_writer *f, unsigned bytes)
299 {
300 
301 	f->ff->zs->avail_in += bytes;
302 	f->iptr += bytes;
303 	f->cnt[FIFOLOG_PT_BYTES_PRE] += bytes;
304 }
305 
306 /*
307  * Attempt to write an entry.
308  * Return zero if there is no space, one otherwise
309  */
310 
311 int
312 fifolog_write_bytes(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len)
313 {
314 	u_int l;
315 	const unsigned char *p;
316 
317 	fifolog_write_assert(f);
318 	assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH)));
319 	assert(ptr != NULL);
320 
321 	p = ptr;
322 	if (len == 0) {
323 		len = strlen(ptr) + 1;
324 		l = 4 + len;		/* id */
325 	} else {
326 		assert(len <= 255);
327 		id |= FIFOLOG_LENGTH;
328 		l = 5 + len;		/* id + len */
329 	}
330 
331 	l += 4; 		/* A timestamp may be necessary */
332 
333 	/* Now do timestamp, if needed */
334 	if (now == 0)
335 		time(&now);
336 
337 	assert(l < f->ibufsize);
338 
339 	/* Return if there is not enough space */
340 	if (f->iptr + l > f->ibuf + f->ibufsize)
341 		return (0);
342 
343 	if (now != f->last) {
344 		id |= FIFOLOG_TIMESTAMP;
345 		f->last = now;
346 	}
347 
348 	/* Emit instance+flag and length */
349 	be32enc(f->iptr, id);
350 	fifolog_acct(f, 4);
351 
352 	if (id & FIFOLOG_TIMESTAMP) {
353 		be32enc(f->iptr, (uint32_t)f->last);
354 		fifolog_acct(f, 4);
355 	}
356 	if (id & FIFOLOG_LENGTH) {
357 		f->iptr[0] = (u_char)len;
358 		fifolog_acct(f, 1);
359 	}
360 
361 	assert (len > 0);
362 	memcpy(f->iptr, p, len);
363 	fifolog_acct(f, len);
364 	fifolog_write_assert(f);
365 	return (1);
366 }
367 
368 /*
369  * Write an entry, polling until success.
370  * Long binary entries are broken into 255 byte chunks.
371  */
372 
373 void
374 fifolog_write_bytes_poll(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len)
375 {
376 	u_int l;
377 	const unsigned char *p;
378 
379 	fifolog_write_assert(f);
380 
381 	assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH)));
382 	assert(ptr != NULL);
383 
384 	if (len == 0) {
385 		while (!fifolog_write_bytes(f, id, now, ptr, len)) {
386 			(void)fifolog_write_poll(f, now);
387 			(void)usleep(10000);
388 		}
389 	} else {
390 		p = ptr;
391 		for (p = ptr; len > 0; len -= l, p += l) {
392 			l = len;
393 			if (l > 255)
394 				l = 255;
395 			while (!fifolog_write_bytes(f, id, now, p, l)) {
396 				(void)fifolog_write_poll(f, now);
397 				(void)usleep(10000);
398 			}
399 		}
400 	}
401 	fifolog_write_assert(f);
402 }
403 
404 int
405 fifolog_write_flush(struct fifolog_writer *f)
406 {
407 	int i;
408 
409 	fifolog_write_assert(f);
410 
411 	f->cleanup = 1;
412 	for (i = 0; fifolog_write_poll(f, 0); i = 1)
413 		continue;
414 	fifolog_write_assert(f);
415 	return (i);
416 }
417