xref: /freebsd/contrib/unbound/cachedb/redis.c (revision be771a7b7f4580a30d99e41a5bb1b93a385a119d)
1 /*
2  * cachedb/redis.c - cachedb redis module
3  *
4  * Copyright (c) 2018, NLnet Labs. All rights reserved.
5  *
6  * This software is open source.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * Redistributions of source code must retain the above copyright notice,
13  * this list of conditions and the following disclaimer.
14  *
15  * Redistributions in binary form must reproduce the above copyright notice,
16  * this list of conditions and the following disclaimer in the documentation
17  * and/or other materials provided with the distribution.
18  *
19  * Neither the name of the NLNET LABS nor the names of its contributors may
20  * be used to endorse or promote products derived from this software without
21  * specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 /**
37  * \file
38  *
39  * This file contains a module that uses the redis database to cache
40  * dns responses.
41  */
42 
43 #include "config.h"
44 #ifdef USE_CACHEDB
45 #include "cachedb/redis.h"
46 #include "cachedb/cachedb.h"
47 #include "util/alloc.h"
48 #include "util/config_file.h"
49 #include "sldns/sbuffer.h"
50 
51 #ifdef USE_REDIS
52 #include "hiredis/hiredis.h"
53 
/**
 * State of the redis cachedb backend.
 * Holds one connection slot per thread for the primary server and, when a
 * replica is configured, one slot per thread for the replica, together with
 * the parameters needed to (re)establish those connections.
 */
struct redis_moddata {
	/* thread-specific redis contexts, indexed by the thread number; an
	 * entry is NULL while that thread has no usable connection */
	redisContext** ctxs;
	/* same as ctxs, for the replica server; the array pointer itself
	 * stays NULL when no replica host or path is configured */
	redisContext** replica_ctxs;
	/* number of ctx entries (in ctxs and, if allocated, replica_ctxs) */
	int numctxs;
	/* server's IP address or host name */
	const char* server_host;
	const char* replica_server_host;
	/* server's TCP port */
	int server_port;
	int replica_server_port;
	/* server's unix path, or "", NULL if unused; a non-empty path takes
	 * precedence over host and port when connecting */
	const char* server_path;
	const char* replica_server_path;
	/* server's AUTH password, or "", NULL if unused */
	const char* server_password;
	const char* replica_server_password;
	/* timeout for commands */
	struct timeval command_timeout;
	struct timeval replica_command_timeout;
	/* timeout for connection setup */
	struct timeval connect_timeout;
	struct timeval replica_connect_timeout;
	/* the redis logical database to use; only selected explicitly when
	 * the value is greater than zero */
	int logical_db;
	int replica_logical_db;
	/* if the SET with EX command is supported; set to 1 by redis_init
	 * after a successful probe, otherwise left at 0 */
	int set_with_ex_available;
};
84 
/* Forward declaration: redis_init() sends a probe command through
 * redis_command(), which is defined further down in this file. */
static redisReply* redis_command(struct module_env*, struct cachedb_env*,
	const char*, const uint8_t*, size_t, int);
87 
88 static void
89 moddata_clean(struct redis_moddata** moddata) {
90 	if(!moddata || !*moddata)
91 		return;
92 	if((*moddata)->ctxs) {
93 		int i;
94 		for(i = 0; i < (*moddata)->numctxs; i++) {
95 			if((*moddata)->ctxs[i])
96 				redisFree((*moddata)->ctxs[i]);
97 		}
98 		free((*moddata)->ctxs);
99 	}
100 	if((*moddata)->replica_ctxs) {
101 		int i;
102 		for(i = 0; i < (*moddata)->numctxs; i++) {
103 			if((*moddata)->replica_ctxs[i])
104 				redisFree((*moddata)->replica_ctxs[i]);
105 		}
106 		free((*moddata)->replica_ctxs);
107 	}
108 	free(*moddata);
109 	*moddata = NULL;
110 }
111 
112 static redisContext*
113 redis_connect(const char* host, int port, const char* path,
114 	const char* password, int logical_db,
115 	const struct timeval connect_timeout,
116 	const struct timeval command_timeout)
117 {
118 	redisContext* ctx;
119 
120 	if(path && path[0]!=0) {
121 		ctx = redisConnectUnixWithTimeout(path, connect_timeout);
122 	} else {
123 		ctx = redisConnectWithTimeout(host, port, connect_timeout);
124 	}
125 	if(!ctx || ctx->err) {
126 		const char *errstr = "out of memory";
127 		if(ctx)
128 			errstr = ctx->errstr;
129 		log_err("failed to connect to redis server: %s", errstr);
130 		goto fail;
131 	}
132 	if(redisSetTimeout(ctx, command_timeout) != REDIS_OK) {
133 		log_err("failed to set redis timeout, %s", ctx->errstr);
134 		goto fail;
135 	}
136 	if(password && password[0]!=0) {
137 		redisReply* rep;
138 		rep = redisCommand(ctx, "AUTH %s", password);
139 		if(!rep || rep->type == REDIS_REPLY_ERROR) {
140 			log_err("failed to authenticate with password");
141 			freeReplyObject(rep);
142 			goto fail;
143 		}
144 		freeReplyObject(rep);
145 	}
146 	if(logical_db > 0) {
147 		redisReply* rep;
148 		rep = redisCommand(ctx, "SELECT %d", logical_db);
149 		if(!rep || rep->type == REDIS_REPLY_ERROR) {
150 			log_err("failed to set logical database (%d)",
151 				logical_db);
152 			freeReplyObject(rep);
153 			goto fail;
154 		}
155 		freeReplyObject(rep);
156 	}
157 	if(verbosity >= VERB_OPS) {
158 		char port_str[6+1];
159 		port_str[0] = ' ';
160 		(void)snprintf(port_str+1, sizeof(port_str)-1, "%d", port);
161 		verbose(VERB_OPS, "Connection to Redis established (%s%s)",
162 			path&&path[0]!=0?path:host,
163 			path&&path[0]!=0?"":port_str);
164 	}
165 	return ctx;
166 
167 fail:
168 	if(ctx)
169 		redisFree(ctx);
170 	return NULL;
171 }
172 
/**
 * Fill in a timeval from a timeout expressed in milliseconds.
 * @param timeout: the timeval to write.
 * @param value: the generic timeout in milliseconds, used as fallback.
 * @param explicit_value: the specific timeout in milliseconds; when it is
 *	nonzero it overrides 'value'.
 */
static void
set_timeout(struct timeval* timeout, int value, int explicit_value)
{
	int msec = value;

	if(explicit_value != 0)
		msec = explicit_value;
	/* split into whole seconds and the remainder in microseconds */
	timeout->tv_sec = msec / 1000;
	timeout->tv_usec = (msec % 1000) * 1000;
}
180 
181 static int
182 redis_init(struct module_env* env, struct cachedb_env* cachedb_env)
183 {
184 	int i;
185 	struct redis_moddata* moddata = NULL;
186 
187 	verbose(VERB_OPS, "Redis initialization");
188 
189 	moddata = calloc(1, sizeof(struct redis_moddata));
190 	if(!moddata) {
191 		log_err("out of memory");
192 		goto fail;
193 	}
194 	moddata->numctxs = env->cfg->num_threads;
195 	/* note: server_host and similar string configuration options are
196 	 * shallow references to configured strings; we don't have to free them
197 	 * in this module. */
198 	moddata->server_host = env->cfg->redis_server_host;
199 	moddata->replica_server_host = env->cfg->redis_replica_server_host;
200 
201 	moddata->server_port = env->cfg->redis_server_port;
202 	moddata->replica_server_port = env->cfg->redis_replica_server_port;
203 
204 	moddata->server_path = env->cfg->redis_server_path;
205 	moddata->replica_server_path = env->cfg->redis_replica_server_path;
206 
207 	moddata->server_password = env->cfg->redis_server_password;
208 	moddata->replica_server_password = env->cfg->redis_replica_server_password;
209 
210 	set_timeout(&moddata->command_timeout,
211 		env->cfg->redis_timeout,
212 		env->cfg->redis_command_timeout);
213 	set_timeout(&moddata->replica_command_timeout,
214 		env->cfg->redis_replica_timeout,
215 		env->cfg->redis_replica_command_timeout);
216 	set_timeout(&moddata->connect_timeout,
217 		env->cfg->redis_timeout,
218 		env->cfg->redis_connect_timeout);
219 	set_timeout(&moddata->replica_connect_timeout,
220 		env->cfg->redis_replica_timeout,
221 		env->cfg->redis_replica_connect_timeout);
222 
223 	moddata->logical_db = env->cfg->redis_logical_db;
224 	moddata->replica_logical_db = env->cfg->redis_replica_logical_db;
225 
226 	moddata->ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*));
227 	if(!moddata->ctxs) {
228 		log_err("out of memory");
229 		goto fail;
230 	}
231 	if((moddata->replica_server_host && moddata->replica_server_host[0]!=0)
232 		|| (moddata->replica_server_path && moddata->replica_server_path[0]!=0)) {
233 		/* There is a replica configured, allocate ctxs */
234 		moddata->replica_ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*));
235 		if(!moddata->replica_ctxs) {
236 			log_err("out of memory");
237 			goto fail;
238 		}
239 	}
240 	for(i = 0; i < moddata->numctxs; i++) {
241 		redisContext* ctx = redis_connect(
242 			moddata->server_host,
243 			moddata->server_port,
244 			moddata->server_path,
245 			moddata->server_password,
246 			moddata->logical_db,
247 			moddata->connect_timeout,
248 			moddata->command_timeout);
249 		if(!ctx) {
250 			log_err("redis_init: failed to init redis "
251 				"(for thread %d)", i);
252 			/* And continue, the context can be established
253 			 * later, just like after a disconnect. */
254 		}
255 		moddata->ctxs[i] = ctx;
256 	}
257 	if(moddata->replica_ctxs) {
258 		for(i = 0; i < moddata->numctxs; i++) {
259 			redisContext* ctx = redis_connect(
260 				moddata->replica_server_host,
261 				moddata->replica_server_port,
262 				moddata->replica_server_path,
263 				moddata->replica_server_password,
264 				moddata->replica_logical_db,
265 				moddata->replica_connect_timeout,
266 				moddata->replica_command_timeout);
267 			if(!ctx) {
268 				log_err("redis_init: failed to init redis "
269 					"replica (for thread %d)", i);
270 				/* And continue, the context can be established
271 				* later, just like after a disconnect. */
272 			}
273 			moddata->replica_ctxs[i] = ctx;
274 		}
275 	}
276 	cachedb_env->backend_data = moddata;
277 	if(env->cfg->redis_expire_records &&
278 		moddata->ctxs[env->alloc->thread_num] != NULL) {
279 		redisReply* rep = NULL;
280 		int redis_reply_type = 0;
281 		/** check if set with ex command is supported */
282 		rep = redis_command(env, cachedb_env,
283 			"SET __UNBOUND_REDIS_CHECK__ none EX 1", NULL, 0, 1);
284 		if(!rep) {
285 			/** init failed, no response from redis server*/
286 			goto set_with_ex_fail;
287 		}
288 		redis_reply_type = rep->type;
289 		freeReplyObject(rep);
290 		switch(redis_reply_type) {
291 		case REDIS_REPLY_STATUS:
292 			break;
293 		default:
294 			/** init failed, set_with_ex command not supported */
295 			goto set_with_ex_fail;
296 		}
297 		moddata->set_with_ex_available = 1;
298 	}
299 	return 1;
300 
301 set_with_ex_fail:
302 	log_err("redis_init: failure during redis_init, the "
303 		"redis-expire-records option requires the SET with EX command "
304 		"(redis >= 2.6.2)");
305 	return 1;
306 fail:
307 	moddata_clean(&moddata);
308 	return 0;
309 }
310 
311 static void
312 redis_deinit(struct module_env* env, struct cachedb_env* cachedb_env)
313 {
314 	struct redis_moddata* moddata = (struct redis_moddata*)
315 		cachedb_env->backend_data;
316 	(void)env;
317 
318 	verbose(VERB_OPS, "Redis deinitialization");
319 	moddata_clean(&moddata);
320 }
321 
322 /*
323  * Send a redis command and get a reply.  Unified so that it can be used for
324  * both SET and GET.  If 'data' is non-NULL the command is supposed to be
325  * SET and GET otherwise, but the implementation of this function is agnostic
326  * about the semantics (except for logging): 'command', 'data', and 'data_len'
327  * are opaquely passed to redisCommand().
328  * This function first checks whether a connection with a redis server has
329  * been established; if not it tries to set up a new one.
330  * It returns redisReply returned from redisCommand() or NULL if some low
331  * level error happens.  The caller is responsible to check the return value,
332  * if it's non-NULL, it has to free it with freeReplyObject().
333  */
334 static redisReply*
335 redis_command(struct module_env* env, struct cachedb_env* cachedb_env,
336 	const char* command, const uint8_t* data, size_t data_len, int write)
337 {
338 	redisContext* ctx, **ctx_selector;
339 	redisReply* rep;
340 	struct redis_moddata* d = (struct redis_moddata*)
341 		cachedb_env->backend_data;
342 
343 	/* We assume env->alloc->thread_num is a unique ID for each thread
344 	 * in [0, num-of-threads).  We could treat it as an error condition
345 	 * if the assumption didn't hold, but it seems to be a fundamental
346 	 * assumption throughout the unbound architecture, so we simply assert
347 	 * it. */
348 	log_assert(env->alloc->thread_num < d->numctxs);
349 
350 	ctx_selector = !write && d->replica_ctxs
351 		?d->replica_ctxs
352 		:d->ctxs;
353 	ctx = ctx_selector[env->alloc->thread_num];
354 
355 	/* If we've not established a connection to the server or we've closed
356 	 * it on a failure, try to re-establish a new one.   Failures will be
357 	 * logged in redis_connect(). */
358 	if(!ctx) {
359 		if(!write && d->replica_ctxs) {
360 			ctx = redis_connect(
361 				d->replica_server_host,
362 				d->replica_server_port,
363 				d->replica_server_path,
364 				d->replica_server_password,
365 				d->replica_logical_db,
366 				d->replica_connect_timeout,
367 				d->replica_command_timeout);
368 		} else {
369 			ctx = redis_connect(
370 				d->server_host,
371 				d->server_port,
372 				d->server_path,
373 				d->server_password,
374 				d->logical_db,
375 				d->connect_timeout,
376 				d->command_timeout);
377 		}
378 		ctx_selector[env->alloc->thread_num] = ctx;
379 	}
380 	if(!ctx) return NULL;
381 
382 	/* Send the command and get a reply, synchronously. */
383 	rep = (redisReply*)redisCommand(ctx, command, data, data_len);
384 	if(!rep) {
385 		/* Once an error as a NULL-reply is returned the context cannot
386 		 * be reused and we'll need to set up a new connection. */
387 		log_err("redis_command: failed to receive a reply, "
388 			"closing connection: %s", ctx->errstr);
389 		redisFree(ctx);
390 		ctx_selector[env->alloc->thread_num] = NULL;
391 		return NULL;
392 	}
393 
394 	/* Check error in reply to unify logging in that case.
395 	 * The caller may perform context-dependent checks and logging. */
396 	if(rep->type == REDIS_REPLY_ERROR)
397 		log_err("redis: %s resulted in an error: %s",
398 			data ? "set" : "get", rep->str);
399 
400 	return rep;
401 }
402 
403 static int
404 redis_lookup(struct module_env* env, struct cachedb_env* cachedb_env,
405 	char* key, struct sldns_buffer* result_buffer)
406 {
407 	redisReply* rep;
408 	char cmdbuf[4+(CACHEDB_HASHSIZE/8)*2+1]; /* "GET " + key */
409 	int n;
410 	int ret = 0;
411 
412 	verbose(VERB_ALGO, "redis_lookup of %s", key);
413 
414 	n = snprintf(cmdbuf, sizeof(cmdbuf), "GET %s", key);
415 	if(n < 0 || n >= (int)sizeof(cmdbuf)) {
416 		log_err("redis_lookup: unexpected failure to build command");
417 		return 0;
418 	}
419 
420 	rep = redis_command(env, cachedb_env, cmdbuf, NULL, 0, 0);
421 	if(!rep)
422 		return 0;
423 	switch(rep->type) {
424 	case REDIS_REPLY_NIL:
425 		verbose(VERB_ALGO, "redis_lookup: no data cached");
426 		break;
427 	case REDIS_REPLY_STRING:
428 		verbose(VERB_ALGO, "redis_lookup found %d bytes",
429 			(int)rep->len);
430 		if((size_t)rep->len > sldns_buffer_capacity(result_buffer)) {
431 			log_err("redis_lookup: replied data too long: %lu",
432 				(size_t)rep->len);
433 			break;
434 		}
435 		sldns_buffer_clear(result_buffer);
436 		sldns_buffer_write(result_buffer, rep->str, rep->len);
437 		sldns_buffer_flip(result_buffer);
438 		ret = 1;
439 		break;
440 	case REDIS_REPLY_ERROR:
441 		break;		/* already logged */
442 	default:
443 		log_err("redis_lookup: unexpected type of reply for (%d)",
444 			rep->type);
445 		break;
446 	}
447 	freeReplyObject(rep);
448 	return ret;
449 }
450 
451 static void
452 redis_store(struct module_env* env, struct cachedb_env* cachedb_env,
453 	char* key, uint8_t* data, size_t data_len, time_t ttl)
454 {
455 	redisReply* rep;
456 	int n;
457 	struct redis_moddata* moddata = (struct redis_moddata*)
458 		cachedb_env->backend_data;
459 	int set_ttl = (moddata->set_with_ex_available &&
460 		env->cfg->redis_expire_records &&
461 		(!env->cfg->serve_expired || env->cfg->serve_expired_ttl > 0));
462 	/* Supported commands:
463 	 * - "SET " + key + " %b"
464 	 * - "SET " + key + " %b EX " + ttl
465 	 *   older redis 2.0.0 was "SETEX " + key + " " + ttl + " %b"
466 	 * - "EXPIRE " + key + " 0"
467 	 */
468 	char cmdbuf[6+(CACHEDB_HASHSIZE/8)*2+11+3+1];
469 
470 	if (!set_ttl) {
471 		verbose(VERB_ALGO, "redis_store %s (%d bytes)", key, (int)data_len);
472 		/* build command to set to a binary safe string */
473 		n = snprintf(cmdbuf, sizeof(cmdbuf), "SET %s %%b", key);
474 	} else if(ttl == 0) {
475 		/* use the EXPIRE command, SET with EX 0 is an invalid time. */
476 		/* Replies with REDIS_REPLY_INTEGER of 1. */
477 		verbose(VERB_ALGO, "redis_store expire %s (%d bytes)",
478 			key, (int)data_len);
479 		n = snprintf(cmdbuf, sizeof(cmdbuf), "EXPIRE %s 0", key);
480 		data = NULL;
481 		data_len = 0;
482 	} else {
483 		/* add expired ttl time to redis ttl to avoid premature eviction of key */
484 		ttl += env->cfg->serve_expired_ttl;
485 		verbose(VERB_ALGO, "redis_store %s (%d bytes) with ttl %u",
486 			key, (int)data_len, (unsigned)(uint32_t)ttl);
487 		/* build command to set to a binary safe string */
488 		n = snprintf(cmdbuf, sizeof(cmdbuf), "SET %s %%b EX %u", key,
489 			(unsigned)(uint32_t)ttl);
490 	}
491 
492 
493 	if(n < 0 || n >= (int)sizeof(cmdbuf)) {
494 		log_err("redis_store: unexpected failure to build command");
495 		return;
496 	}
497 
498 	rep = redis_command(env, cachedb_env, cmdbuf, data, data_len, 1);
499 	if(rep) {
500 		verbose(VERB_ALGO, "redis_store set completed");
501 		if(rep->type != REDIS_REPLY_STATUS &&
502 			rep->type != REDIS_REPLY_ERROR &&
503 			rep->type != REDIS_REPLY_INTEGER) {
504 			log_err("redis_store: unexpected type of reply (%d)",
505 				rep->type);
506 		}
507 		freeReplyObject(rep);
508 	}
509 }
510 
/* The redis backend descriptor for the cachedb module: the backend name
 * followed by its init, deinit, lookup and store functions (positional
 * initializer, so the order has to match struct cachedb_backend). */
struct cachedb_backend redis_backend = { "redis",
	redis_init, redis_deinit, redis_lookup, redis_store
};
514 #endif	/* USE_REDIS */
515 #endif /* USE_CACHEDB */
516