xref: /linux/net/sunrpc/svc_xprt.c (revision 2277ab4a1df50e05bc732fe9488d4e902bb8399a)
1 /*
2  * linux/net/sunrpc/svc_xprt.c
3  *
4  * Author: Tom Tucker <tom@opengridcomputing.com>
5  */
6 
7 #include <linux/sched.h>
8 #include <linux/smp_lock.h>
9 #include <linux/errno.h>
10 #include <linux/freezer.h>
11 #include <linux/kthread.h>
12 #include <net/sock.h>
13 #include <linux/sunrpc/stats.h>
14 #include <linux/sunrpc/svc_xprt.h>
15 #include <linux/sunrpc/svcsock.h>
16 
17 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
18 
19 #define SVC_MAX_WAKING 5
20 
21 static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
22 static int svc_deferred_recv(struct svc_rqst *rqstp);
23 static struct cache_deferred_req *svc_defer(struct cache_req *req);
24 static void svc_age_temp_xprts(unsigned long closure);
25 
26 /* Apparently the "standard" is that clients close
27  * idle connections after 5 minutes and servers
28  * after 6 minutes:
29  *   http://www.connectathon.org/talks96/nfstcp.pdf
30  */
31 static int svc_conn_age_period = 6*60;
32 
33 /* List of registered transport classes */
34 static DEFINE_SPINLOCK(svc_xprt_class_lock);
35 static LIST_HEAD(svc_xprt_class_list);
36 
37 /* SMP locking strategy:
38  *
39  *	svc_pool->sp_lock protects most of the fields of that pool.
40  *	svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
41  *	when both need to be taken (rare), svc_serv->sv_lock is first.
42  *	BKL protects svc_serv->sv_nrthreads.
43  *	svc_xprt->xpt_lock protects the svc_xprt->xpt_deferred list
44  *             and the ->xpt_auth_cache.
45  *
46  *	The XPT_BUSY bit in xprt->xpt_flags prevents a transport being
47  *	enqueued multiple times. During normal transport processing this bit
48  *	is set by svc_xprt_enqueue and cleared by svc_xprt_received.
49  *	Providers should not manipulate this bit directly.
50  *
51  *	Some flags can be set to certain values at any time
52  *	providing that certain rules are followed:
53  *
54  *	XPT_CONN, XPT_DATA:
55  *		- Can be set or cleared at any time.
56  *		- After a set, svc_xprt_enqueue must be called to enqueue
57  *		  the transport for processing.
58  *		- After a clear, the transport must be read/accepted.
59  *		  If this succeeds, it must be set again.
60  *	XPT_CLOSE:
61  *		- Can be set at any time. It is never cleared.
62  *	XPT_DEAD:
63  *		- Can only be set while XPT_BUSY is held, which ensures
64  *		  that no other thread will be using the transport or will
65  *		  try to set XPT_DEAD.
66  */
67 
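/*
 * Illustrative sketch (not part of the upstream file): how a transport
 * provider is expected to follow the flag protocol described above.  The
 * callback name is hypothetical; only set_bit(), svc_xprt_enqueue() and
 * svc_xprt_received() reflect the interface implemented in this file.
 */
#if 0
static void example_data_ready(struct svc_xprt *xprt)
{
	/* New data arrived: mark the transport and queue it for a thread. */
	set_bit(XPT_DATA, &xprt->xpt_flags);
	svc_xprt_enqueue(xprt);
}

/*
 * Once xpo_recvfrom() has consumed the data, the generic code calls
 * svc_xprt_received(), which clears XPT_BUSY and re-enqueues the
 * transport if more work (XPT_DATA, XPT_DEFERRED, ...) is still pending.
 */
#endif
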
68 int svc_reg_xprt_class(struct svc_xprt_class *xcl)
69 {
70 	struct svc_xprt_class *cl;
71 	int res = -EEXIST;
72 
73 	dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name);
74 
75 	INIT_LIST_HEAD(&xcl->xcl_list);
76 	spin_lock(&svc_xprt_class_lock);
77 	/* Make sure there isn't already a class with the same name */
78 	list_for_each_entry(cl, &svc_xprt_class_list, xcl_list) {
79 		if (strcmp(xcl->xcl_name, cl->xcl_name) == 0)
80 			goto out;
81 	}
82 	list_add_tail(&xcl->xcl_list, &svc_xprt_class_list);
83 	res = 0;
84 out:
85 	spin_unlock(&svc_xprt_class_lock);
86 	return res;
87 }
88 EXPORT_SYMBOL_GPL(svc_reg_xprt_class);
89 
90 void svc_unreg_xprt_class(struct svc_xprt_class *xcl)
91 {
92 	dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name);
93 	spin_lock(&svc_xprt_class_lock);
94 	list_del_init(&xcl->xcl_list);
95 	spin_unlock(&svc_xprt_class_lock);
96 }
97 EXPORT_SYMBOL_GPL(svc_unreg_xprt_class);
98 
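/*
 * Illustrative sketch (not part of the upstream file): a transport module
 * would normally register its class on load and unregister it on unload.
 * All "example_*" names are hypothetical; only the svc_xprt_class fields
 * and the register/unregister calls mirror the API above.
 */
#if 0
static struct svc_xprt_class example_xprt_class = {
	.xcl_name	 = "example",
	.xcl_owner	 = THIS_MODULE,
	.xcl_ops	 = &example_xprt_ops,	/* provider-supplied svc_xprt_ops */
	.xcl_max_payload = 1 << 16,
};

static int __init example_xprt_register(void)
{
	return svc_reg_xprt_class(&example_xprt_class);
}

static void __exit example_xprt_unregister(void)
{
	svc_unreg_xprt_class(&example_xprt_class);
}
#endif
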
99 /*
100  * Format the transport list for printing
101  */
102 int svc_print_xprts(char *buf, int maxlen)
103 {
104 	struct list_head *le;
105 	char tmpstr[80];
106 	int len = 0;
107 	buf[0] = '\0';
108 
109 	spin_lock(&svc_xprt_class_lock);
110 	list_for_each(le, &svc_xprt_class_list) {
111 		int slen;
112 		struct svc_xprt_class *xcl =
113 			list_entry(le, struct svc_xprt_class, xcl_list);
114 
115 		sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload);
116 		slen = strlen(tmpstr);
117 		if (len + slen > maxlen)
118 			break;
119 		len += slen;
120 		strcat(buf, tmpstr);
121 	}
122 	spin_unlock(&svc_xprt_class_lock);
123 
124 	return len;
125 }
126 
127 static void svc_xprt_free(struct kref *kref)
128 {
129 	struct svc_xprt *xprt =
130 		container_of(kref, struct svc_xprt, xpt_ref);
131 	struct module *owner = xprt->xpt_class->xcl_owner;
132 	if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags)
133 	    && xprt->xpt_auth_cache != NULL)
134 		svcauth_unix_info_release(xprt->xpt_auth_cache);
135 	xprt->xpt_ops->xpo_free(xprt);
136 	module_put(owner);
137 }
138 
139 void svc_xprt_put(struct svc_xprt *xprt)
140 {
141 	kref_put(&xprt->xpt_ref, svc_xprt_free);
142 }
143 EXPORT_SYMBOL_GPL(svc_xprt_put);
144 
145 /*
146  * Called by transport drivers to initialize the transport independent
147  * portion of the transport instance.
148  */
149 void svc_xprt_init(struct svc_xprt_class *xcl, struct svc_xprt *xprt,
150 		   struct svc_serv *serv)
151 {
152 	memset(xprt, 0, sizeof(*xprt));
153 	xprt->xpt_class = xcl;
154 	xprt->xpt_ops = xcl->xcl_ops;
155 	kref_init(&xprt->xpt_ref);
156 	xprt->xpt_server = serv;
157 	INIT_LIST_HEAD(&xprt->xpt_list);
158 	INIT_LIST_HEAD(&xprt->xpt_ready);
159 	INIT_LIST_HEAD(&xprt->xpt_deferred);
160 	mutex_init(&xprt->xpt_mutex);
161 	spin_lock_init(&xprt->xpt_lock);
162 	set_bit(XPT_BUSY, &xprt->xpt_flags);
163 }
164 EXPORT_SYMBOL_GPL(svc_xprt_init);
165 
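/*
 * Illustrative sketch (not part of the upstream file): a provider's
 * xpo_create method typically embeds a struct svc_xprt in its own
 * per-transport structure and hands it to svc_xprt_init() before doing
 * any protocol-specific setup.  All "example_*" names are hypothetical;
 * example_xprt_class is the class from the registration sketch above.
 */
#if 0
struct example_xprt {
	struct svc_xprt	ex_xprt;
	/* provider-private state would follow */
};

static struct svc_xprt *example_create(struct svc_serv *serv,
				       struct sockaddr *sa, int salen,
				       int flags)
{
	struct example_xprt *ex;

	ex = kzalloc(sizeof(*ex), GFP_KERNEL);
	if (!ex)
		return ERR_PTR(-ENOMEM);

	/* Fill in the generic fields; note this leaves XPT_BUSY set. */
	svc_xprt_init(&example_xprt_class, &ex->ex_xprt, serv);

	/* ... bind/listen or other protocol setup would go here ... */
	return &ex->ex_xprt;
}
#endif
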
166 static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl,
167 					 struct svc_serv *serv,
168 					 const int family,
169 					 const unsigned short port,
170 					 int flags)
171 {
172 	struct sockaddr_in sin = {
173 		.sin_family		= AF_INET,
174 		.sin_addr.s_addr	= htonl(INADDR_ANY),
175 		.sin_port		= htons(port),
176 	};
177 	struct sockaddr_in6 sin6 = {
178 		.sin6_family		= AF_INET6,
179 		.sin6_addr		= IN6ADDR_ANY_INIT,
180 		.sin6_port		= htons(port),
181 	};
182 	struct sockaddr *sap;
183 	size_t len;
184 
185 	switch (family) {
186 	case PF_INET:
187 		sap = (struct sockaddr *)&sin;
188 		len = sizeof(sin);
189 		break;
190 	case PF_INET6:
191 		sap = (struct sockaddr *)&sin6;
192 		len = sizeof(sin6);
193 		break;
194 	default:
195 		return ERR_PTR(-EAFNOSUPPORT);
196 	}
197 
198 	return xcl->xcl_ops->xpo_create(serv, sap, len, flags);
199 }
200 
201 int svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
202 		    const int family, const unsigned short port,
203 		    int flags)
204 {
205 	struct svc_xprt_class *xcl;
206 
207 	dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
208 	spin_lock(&svc_xprt_class_lock);
209 	list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
210 		struct svc_xprt *newxprt;
211 
212 		if (strcmp(xprt_name, xcl->xcl_name))
213 			continue;
214 
215 		if (!try_module_get(xcl->xcl_owner))
216 			goto err;
217 
218 		spin_unlock(&svc_xprt_class_lock);
219 		newxprt = __svc_xpo_create(xcl, serv, family, port, flags);
220 		if (IS_ERR(newxprt)) {
221 			module_put(xcl->xcl_owner);
222 			return PTR_ERR(newxprt);
223 		}
224 
225 		clear_bit(XPT_TEMP, &newxprt->xpt_flags);
226 		spin_lock_bh(&serv->sv_lock);
227 		list_add(&newxprt->xpt_list, &serv->sv_permsocks);
228 		spin_unlock_bh(&serv->sv_lock);
229 		clear_bit(XPT_BUSY, &newxprt->xpt_flags);
230 		return svc_xprt_local_port(newxprt);
231 	}
232  err:
233 	spin_unlock(&svc_xprt_class_lock);
234 	dprintk("svc: transport %s not found\n", xprt_name);
235 	return -ENOENT;
236 }
237 EXPORT_SYMBOL_GPL(svc_create_xprt);
238 
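/*
 * Illustrative sketch (not part of the upstream file): a service such as
 * nfsd or lockd creates its listeners by transport class name.  The
 * SVC_SOCK_DEFAULTS flag is what existing callers pass; the helper and
 * its error handling are hypothetical.
 */
#if 0
static int example_create_listeners(struct svc_serv *serv, unsigned short port)
{
	int err;

	err = svc_create_xprt(serv, "udp", PF_INET, port, SVC_SOCK_DEFAULTS);
	if (err < 0)
		return err;

	err = svc_create_xprt(serv, "tcp", PF_INET, port, SVC_SOCK_DEFAULTS);
	if (err < 0)
		return err;

	/* On success each call returned the bound local port number. */
	return 0;
}
#endif
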
239 /*
240  * Copy the local and remote xprt addresses to the rqstp structure
241  */
242 void svc_xprt_copy_addrs(struct svc_rqst *rqstp, struct svc_xprt *xprt)
243 {
244 	struct sockaddr *sin;
245 
246 	memcpy(&rqstp->rq_addr, &xprt->xpt_remote, xprt->xpt_remotelen);
247 	rqstp->rq_addrlen = xprt->xpt_remotelen;
248 
249 	/*
250 	 * Destination address in request is needed for binding the
251 	 * source address in RPC replies/callbacks later.
252 	 */
253 	sin = (struct sockaddr *)&xprt->xpt_local;
254 	switch (sin->sa_family) {
255 	case AF_INET:
256 		rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr;
257 		break;
258 	case AF_INET6:
259 		rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr;
260 		break;
261 	}
262 }
263 EXPORT_SYMBOL_GPL(svc_xprt_copy_addrs);
264 
265 /**
266  * svc_print_addr - Format rq_addr field for printing
267  * @rqstp: svc_rqst struct containing address to print
268  * @buf: target buffer for formatted address
269  * @len: length of target buffer
270  *
271  */
272 char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
273 {
274 	return __svc_print_addr(svc_addr(rqstp), buf, len);
275 }
276 EXPORT_SYMBOL_GPL(svc_print_addr);
277 
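/*
 * Illustrative sketch (not part of the upstream file): services commonly
 * use svc_print_addr() when logging the peer of a request.  The buffer
 * size constant RPC_MAX_ADDRBUFLEN comes from the sunrpc headers.
 */
#if 0
static void example_log_peer(struct svc_rqst *rqstp)
{
	char buf[RPC_MAX_ADDRBUFLEN];

	printk(KERN_INFO "request from %s\n",
	       svc_print_addr(rqstp, buf, sizeof(buf)));
}
#endif
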
278 /*
279  * Queue up an idle server thread.  Must have pool->sp_lock held.
280  * Note: this is really a stack rather than a queue, so that we only
281  * use as many different threads as we need, and the rest don't pollute
282  * the cache.
283  */
284 static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
285 {
286 	list_add(&rqstp->rq_list, &pool->sp_threads);
287 }
288 
289 /*
290  * Dequeue an nfsd thread.  Must have pool->sp_lock held.
291  */
292 static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
293 {
294 	list_del(&rqstp->rq_list);
295 }
296 
297 /*
298  * Queue up a transport with data pending. If there are idle nfsd
299  * processes, wake 'em up.
300  *
301  */
302 void svc_xprt_enqueue(struct svc_xprt *xprt)
303 {
304 	struct svc_serv	*serv = xprt->xpt_server;
305 	struct svc_pool *pool;
306 	struct svc_rqst	*rqstp;
307 	int cpu;
308 	int thread_avail;
309 
310 	if (!(xprt->xpt_flags &
311 	      ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
312 		return;
313 
314 	cpu = get_cpu();
315 	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
316 	put_cpu();
317 
318 	spin_lock_bh(&pool->sp_lock);
319 
320 	if (test_bit(XPT_DEAD, &xprt->xpt_flags)) {
321 		/* Don't enqueue dead transports */
322 		dprintk("svc: transport %p is dead, not enqueued\n", xprt);
323 		goto out_unlock;
324 	}
325 
326 	pool->sp_stats.packets++;
327 
328 	/* Mark transport as busy. It will remain in this state until
329 	 * the provider calls svc_xprt_received. We update XPT_BUSY
330 	 * atomically because it also guards against trying to enqueue
331 	 * the transport twice.
332 	 */
333 	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
334 		/* Don't enqueue transport while already enqueued */
335 		dprintk("svc: transport %p busy, not enqueued\n", xprt);
336 		goto out_unlock;
337 	}
338 	BUG_ON(xprt->xpt_pool != NULL);
339 	xprt->xpt_pool = pool;
340 
341 	/* Handle pending connection */
342 	if (test_bit(XPT_CONN, &xprt->xpt_flags))
343 		goto process;
344 
345 	/* Handle close in-progress */
346 	if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
347 		goto process;
348 
349 	/* Check if we have space to reply to a request */
350 	if (!xprt->xpt_ops->xpo_has_wspace(xprt)) {
351 		/* Don't enqueue while not enough space for reply */
352 		dprintk("svc: no write space, transport %p  not enqueued\n",
353 			xprt);
354 		xprt->xpt_pool = NULL;
355 		clear_bit(XPT_BUSY, &xprt->xpt_flags);
356 		goto out_unlock;
357 	}
358 
359  process:
360 	/* Work out whether threads are available */
361 	thread_avail = !list_empty(&pool->sp_threads);	/* threads are asleep */
362 	if (pool->sp_nwaking >= SVC_MAX_WAKING) {
363 		/* too many threads are runnable and trying to wake up */
364 		thread_avail = 0;
365 		pool->sp_stats.overloads_avoided++;
366 	}
367 
368 	if (thread_avail) {
369 		rqstp = list_entry(pool->sp_threads.next,
370 				   struct svc_rqst,
371 				   rq_list);
372 		dprintk("svc: transport %p served by daemon %p\n",
373 			xprt, rqstp);
374 		svc_thread_dequeue(pool, rqstp);
375 		if (rqstp->rq_xprt)
376 			printk(KERN_ERR
377 				"svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
378 				rqstp, rqstp->rq_xprt);
379 		rqstp->rq_xprt = xprt;
380 		svc_xprt_get(xprt);
381 		rqstp->rq_reserved = serv->sv_max_mesg;
382 		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
383 		rqstp->rq_waking = 1;
384 		pool->sp_nwaking++;
385 		pool->sp_stats.threads_woken++;
386 		BUG_ON(xprt->xpt_pool != pool);
387 		wake_up(&rqstp->rq_wait);
388 	} else {
389 		dprintk("svc: transport %p put into queue\n", xprt);
390 		list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
391 		pool->sp_stats.sockets_queued++;
392 		BUG_ON(xprt->xpt_pool != pool);
393 	}
394 
395 out_unlock:
396 	spin_unlock_bh(&pool->sp_lock);
397 }
398 EXPORT_SYMBOL_GPL(svc_xprt_enqueue);
399 
400 /*
401  * Dequeue the first transport.  Must be called with the pool->sp_lock held.
402  */
403 static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
404 {
405 	struct svc_xprt	*xprt;
406 
407 	if (list_empty(&pool->sp_sockets))
408 		return NULL;
409 
410 	xprt = list_entry(pool->sp_sockets.next,
411 			  struct svc_xprt, xpt_ready);
412 	list_del_init(&xprt->xpt_ready);
413 
414 	dprintk("svc: transport %p dequeued, inuse=%d\n",
415 		xprt, atomic_read(&xprt->xpt_ref.refcount));
416 
417 	return xprt;
418 }
419 
420 /*
421  * svc_xprt_received conditionally queues the transport for processing
422  * by another thread. The caller must hold the XPT_BUSY bit and must
423  * not thereafter touch transport data.
424  *
425  * Note: XPT_DATA only gets cleared when a read-attempt finds no (or
426  * insufficient) data.
427  */
428 void svc_xprt_received(struct svc_xprt *xprt)
429 {
430 	BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags));
431 	xprt->xpt_pool = NULL;
432 	clear_bit(XPT_BUSY, &xprt->xpt_flags);
433 	svc_xprt_enqueue(xprt);
434 }
435 EXPORT_SYMBOL_GPL(svc_xprt_received);
436 
437 /**
438  * svc_reserve - change the space reserved for the reply to a request.
439  * @rqstp:  The request in question
440  * @space: new max space to reserve
441  *
442  * Each request reserves some space on the output queue of the transport
443  * to make sure the reply fits.  This function reduces that reserved
444  * space to be the amount of space used already, plus @space.
445  *
446  */
447 void svc_reserve(struct svc_rqst *rqstp, int space)
448 {
449 	space += rqstp->rq_res.head[0].iov_len;
450 
451 	if (space < rqstp->rq_reserved) {
452 		struct svc_xprt *xprt = rqstp->rq_xprt;
453 		atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
454 		rqstp->rq_reserved = space;
455 
456 		svc_xprt_enqueue(xprt);
457 	}
458 }
459 EXPORT_SYMBOL_GPL(svc_reserve);
460 
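/*
 * Illustrative sketch (not part of the upstream file): a service that
 * knows a particular reply will be small can hand most of its reservation
 * back early, which shrinks xpt_reserved and so helps the transport pass
 * the xpo_has_wspace() check in svc_xprt_enqueue().  The 512-byte figure
 * is an arbitrary example value.
 */
#if 0
static void example_trim_reservation(struct svc_rqst *rqstp)
{
	/* Keep what is already in the reply head plus ~512 more bytes. */
	svc_reserve(rqstp, 512);
}
#endif
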
461 static void svc_xprt_release(struct svc_rqst *rqstp)
462 {
463 	struct svc_xprt	*xprt = rqstp->rq_xprt;
464 
465 	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);
466 
467 	kfree(rqstp->rq_deferred);
468 	rqstp->rq_deferred = NULL;
469 
470 	svc_free_res_pages(rqstp);
471 	rqstp->rq_res.page_len = 0;
472 	rqstp->rq_res.page_base = 0;
473 
474 	/* Reset response buffer and release
475 	 * the reservation.
476 	 * But first, check that enough space was reserved
477 	 * for the reply, otherwise we have a bug!
478 	 */
479 	if ((rqstp->rq_res.len) >  rqstp->rq_reserved)
480 		printk(KERN_ERR "RPC request reserved %d but used %d\n",
481 		       rqstp->rq_reserved,
482 		       rqstp->rq_res.len);
483 
484 	rqstp->rq_res.head[0].iov_len = 0;
485 	svc_reserve(rqstp, 0);
486 	rqstp->rq_xprt = NULL;
487 
488 	svc_xprt_put(xprt);
489 }
490 
491 /*
492  * External function to wake up a server waiting for data.
493  * This really only makes sense for services like lockd
494  * which have exactly one thread anyway.
495  */
496 void svc_wake_up(struct svc_serv *serv)
497 {
498 	struct svc_rqst	*rqstp;
499 	unsigned int i;
500 	struct svc_pool *pool;
501 
502 	for (i = 0; i < serv->sv_nrpools; i++) {
503 		pool = &serv->sv_pools[i];
504 
505 		spin_lock_bh(&pool->sp_lock);
506 		if (!list_empty(&pool->sp_threads)) {
507 			rqstp = list_entry(pool->sp_threads.next,
508 					   struct svc_rqst,
509 					   rq_list);
510 			dprintk("svc: daemon %p woken up.\n", rqstp);
511 			/*
512 			svc_thread_dequeue(pool, rqstp);
513 			rqstp->rq_xprt = NULL;
514 			 */
515 			wake_up(&rqstp->rq_wait);
516 		}
517 		spin_unlock_bh(&pool->sp_lock);
518 	}
519 }
520 EXPORT_SYMBOL_GPL(svc_wake_up);
521 
522 int svc_port_is_privileged(struct sockaddr *sin)
523 {
524 	switch (sin->sa_family) {
525 	case AF_INET:
526 		return ntohs(((struct sockaddr_in *)sin)->sin_port)
527 			< PROT_SOCK;
528 	case AF_INET6:
529 		return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
530 			< PROT_SOCK;
531 	default:
532 		return 0;
533 	}
534 }
535 
536 /*
537  * Make sure that we don't have too many active connections. If we have,
538  * something must be dropped. It's not clear what will happen if we allow
539  * "too many" connections, but when dealing with network-facing software,
540  * we have to code defensively. Here we do that by imposing hard limits.
541  *
542  * There's no point in trying to do random drop here for DoS
543  * prevention. An NFS client does one reconnect every 15 seconds;
544  * an attacker can easily beat that.
545  *
546  * The only somewhat efficient mechanism would be to drop old
547  * connections from the same IP first. But right now we don't even
548  * record the client IP in svc_sock.
549  *
550  * Single-threaded services that expect a lot of clients will probably
551  * need to set sv_maxconn to override the default value, which is based
552  * on the number of threads.
553  */
554 static void svc_check_conn_limits(struct svc_serv *serv)
555 {
556 	unsigned int limit = serv->sv_maxconn ? serv->sv_maxconn :
557 				(serv->sv_nrthreads+3) * 20;
558 
559 	if (serv->sv_tmpcnt > limit) {
560 		struct svc_xprt *xprt = NULL;
561 		spin_lock_bh(&serv->sv_lock);
562 		if (!list_empty(&serv->sv_tempsocks)) {
563 			if (net_ratelimit()) {
564 				/* Try to help the admin */
565 				printk(KERN_NOTICE "%s: too many open  "
566 				       "connections, consider increasing %s\n",
567 				       serv->sv_name, serv->sv_maxconn ?
568 				       "the max number of connections." :
569 				       "the number of threads.");
570 			}
571 			/*
572 			 * Always select the oldest connection. It's not fair,
573 			 * but neither is life.
574 			 */
575 			xprt = list_entry(serv->sv_tempsocks.prev,
576 					  struct svc_xprt,
577 					  xpt_list);
578 			set_bit(XPT_CLOSE, &xprt->xpt_flags);
579 			svc_xprt_get(xprt);
580 		}
581 		spin_unlock_bh(&serv->sv_lock);
582 
583 		if (xprt) {
584 			svc_xprt_enqueue(xprt);
585 			svc_xprt_put(xprt);
586 		}
587 	}
588 }
589 
590 /*
591  * Receive the next request on any transport.  This code is carefully
592  * organised not to touch any cachelines in the shared svc_serv
593  * structure, only cachelines in the local svc_pool.
594  */
595 int svc_recv(struct svc_rqst *rqstp, long timeout)
596 {
597 	struct svc_xprt		*xprt = NULL;
598 	struct svc_serv		*serv = rqstp->rq_server;
599 	struct svc_pool		*pool = rqstp->rq_pool;
600 	int			len, i;
601 	int			pages;
602 	struct xdr_buf		*arg;
603 	DECLARE_WAITQUEUE(wait, current);
604 	long			time_left;
605 
606 	dprintk("svc: server %p waiting for data (to = %ld)\n",
607 		rqstp, timeout);
608 
609 	if (rqstp->rq_xprt)
610 		printk(KERN_ERR
611 			"svc_recv: service %p, transport not NULL!\n",
612 			 rqstp);
613 	if (waitqueue_active(&rqstp->rq_wait))
614 		printk(KERN_ERR
615 			"svc_recv: service %p, wait queue active!\n",
616 			 rqstp);
617 
618 	/* now allocate needed pages.  If we get a failure, sleep briefly */
619 	pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
620 	for (i = 0; i < pages ; i++)
621 		while (rqstp->rq_pages[i] == NULL) {
622 			struct page *p = alloc_page(GFP_KERNEL);
623 			if (!p) {
624 				set_current_state(TASK_INTERRUPTIBLE);
625 				if (signalled() || kthread_should_stop()) {
626 					set_current_state(TASK_RUNNING);
627 					return -EINTR;
628 				}
629 				schedule_timeout(msecs_to_jiffies(500));
630 			}
631 			rqstp->rq_pages[i] = p;
632 		}
633 	rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
634 	BUG_ON(pages >= RPCSVC_MAXPAGES);
635 
636 	/* Make arg->head point to first page and arg->pages point to rest */
637 	arg = &rqstp->rq_arg;
638 	arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
639 	arg->head[0].iov_len = PAGE_SIZE;
640 	arg->pages = rqstp->rq_pages + 1;
641 	arg->page_base = 0;
642 	/* save at least one page for response */
643 	arg->page_len = (pages-2)*PAGE_SIZE;
644 	arg->len = (pages-1)*PAGE_SIZE;
645 	arg->tail[0].iov_len = 0;
646 
647 	try_to_freeze();
648 	cond_resched();
649 	if (signalled() || kthread_should_stop())
650 		return -EINTR;
651 
652 	spin_lock_bh(&pool->sp_lock);
653 	if (rqstp->rq_waking) {
654 		rqstp->rq_waking = 0;
655 		pool->sp_nwaking--;
656 		BUG_ON(pool->sp_nwaking < 0);
657 	}
658 	xprt = svc_xprt_dequeue(pool);
659 	if (xprt) {
660 		rqstp->rq_xprt = xprt;
661 		svc_xprt_get(xprt);
662 		rqstp->rq_reserved = serv->sv_max_mesg;
663 		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
664 	} else {
665 		/* No data pending. Go to sleep */
666 		svc_thread_enqueue(pool, rqstp);
667 
668 		/*
669 		 * We have to be able to interrupt this wait
670 		 * to bring down the daemons ...
671 		 */
672 		set_current_state(TASK_INTERRUPTIBLE);
673 
674 		/*
675 		 * checking kthread_should_stop() here allows us to avoid
676 		 * locking and signalling when stopping kthreads that call
677 		 * svc_recv. If the thread has already been woken up, then
678 		 * we can exit here without sleeping. If not, then it
679 		 * will be woken up quickly during the schedule_timeout.
680 		 */
681 		if (kthread_should_stop()) {
682 			set_current_state(TASK_RUNNING);
683 			spin_unlock_bh(&pool->sp_lock);
684 			return -EINTR;
685 		}
686 
687 		add_wait_queue(&rqstp->rq_wait, &wait);
688 		spin_unlock_bh(&pool->sp_lock);
689 
690 		time_left = schedule_timeout(timeout);
691 
692 		try_to_freeze();
693 
694 		spin_lock_bh(&pool->sp_lock);
695 		remove_wait_queue(&rqstp->rq_wait, &wait);
696 		if (!time_left)
697 			pool->sp_stats.threads_timedout++;
698 
699 		xprt = rqstp->rq_xprt;
700 		if (!xprt) {
701 			svc_thread_dequeue(pool, rqstp);
702 			spin_unlock_bh(&pool->sp_lock);
703 			dprintk("svc: server %p, no data yet\n", rqstp);
704 			if (signalled() || kthread_should_stop())
705 				return -EINTR;
706 			else
707 				return -EAGAIN;
708 		}
709 	}
710 	spin_unlock_bh(&pool->sp_lock);
711 
712 	len = 0;
713 	if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
714 		dprintk("svc_recv: found XPT_CLOSE\n");
715 		svc_delete_xprt(xprt);
716 	} else if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
717 		struct svc_xprt *newxpt;
718 		newxpt = xprt->xpt_ops->xpo_accept(xprt);
719 		if (newxpt) {
720 			/*
721 			 * We know this module_get will succeed because the
722 			 * listener holds a reference too
723 			 */
724 			__module_get(newxpt->xpt_class->xcl_owner);
725 			svc_check_conn_limits(xprt->xpt_server);
726 			spin_lock_bh(&serv->sv_lock);
727 			set_bit(XPT_TEMP, &newxpt->xpt_flags);
728 			list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
729 			serv->sv_tmpcnt++;
730 			if (serv->sv_temptimer.function == NULL) {
731 				/* setup timer to age temp transports */
732 				setup_timer(&serv->sv_temptimer,
733 					    svc_age_temp_xprts,
734 					    (unsigned long)serv);
735 				mod_timer(&serv->sv_temptimer,
736 					  jiffies + svc_conn_age_period * HZ);
737 			}
738 			spin_unlock_bh(&serv->sv_lock);
739 			svc_xprt_received(newxpt);
740 		}
741 		svc_xprt_received(xprt);
742 	} else {
743 		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
744 			rqstp, pool->sp_id, xprt,
745 			atomic_read(&xprt->xpt_ref.refcount));
746 		rqstp->rq_deferred = svc_deferred_dequeue(xprt);
747 		if (rqstp->rq_deferred) {
748 			svc_xprt_received(xprt);
749 			len = svc_deferred_recv(rqstp);
750 		} else
751 			len = xprt->xpt_ops->xpo_recvfrom(rqstp);
752 		dprintk("svc: got len=%d\n", len);
753 	}
754 
755 	/* No data, incomplete (TCP) read, or accept() */
756 	if (len == 0 || len == -EAGAIN) {
757 		rqstp->rq_res.len = 0;
758 		svc_xprt_release(rqstp);
759 		return -EAGAIN;
760 	}
761 	clear_bit(XPT_OLD, &xprt->xpt_flags);
762 
763 	rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
764 	rqstp->rq_chandle.defer = svc_defer;
765 
766 	if (serv->sv_stats)
767 		serv->sv_stats->netcnt++;
768 	return len;
769 }
770 EXPORT_SYMBOL_GPL(svc_recv);
771 
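/*
 * Illustrative sketch (not part of the upstream file): the shape of a
 * typical service thread built around svc_recv().  Real services (nfsd,
 * lockd) add their own signal and shutdown handling; svc_process() lives
 * in net/sunrpc/svc.c and ends up calling svc_send() below.
 */
#if 0
static int example_service_thread(void *data)
{
	struct svc_rqst *rqstp = data;
	int err;

	while (!kthread_should_stop()) {
		err = svc_recv(rqstp, 60 * 60 * HZ);
		if (err == -EAGAIN)
			continue;	/* nothing usable arrived; try again */
		if (err == -EINTR)
			break;		/* signalled or asked to stop */
		svc_process(rqstp);	/* dispatch the RPC and send the reply */
	}
	return 0;
}
#endif
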
772 /*
773  * Drop request
774  */
775 void svc_drop(struct svc_rqst *rqstp)
776 {
777 	dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt);
778 	svc_xprt_release(rqstp);
779 }
780 EXPORT_SYMBOL_GPL(svc_drop);
781 
782 /*
783  * Return reply to client.
784  */
785 int svc_send(struct svc_rqst *rqstp)
786 {
787 	struct svc_xprt	*xprt;
788 	int		len;
789 	struct xdr_buf	*xb;
790 
791 	xprt = rqstp->rq_xprt;
792 	if (!xprt)
793 		return -EFAULT;
794 
795 	/* release the receive skb before sending the reply */
796 	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);
797 
798 	/* calculate overall length */
799 	xb = &rqstp->rq_res;
800 	xb->len = xb->head[0].iov_len +
801 		xb->page_len +
802 		xb->tail[0].iov_len;
803 
804 	/* Grab mutex to serialize outgoing data. */
805 	mutex_lock(&xprt->xpt_mutex);
806 	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
807 		len = -ENOTCONN;
808 	else
809 		len = xprt->xpt_ops->xpo_sendto(rqstp);
810 	mutex_unlock(&xprt->xpt_mutex);
811 	svc_xprt_release(rqstp);
812 
813 	if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
814 		return 0;
815 	return len;
816 }
817 
818 /*
819  * Timer function to close old temporary transports, using
820  * a mark-and-sweep algorithm.
821  */
822 static void svc_age_temp_xprts(unsigned long closure)
823 {
824 	struct svc_serv *serv = (struct svc_serv *)closure;
825 	struct svc_xprt *xprt;
826 	struct list_head *le, *next;
827 	LIST_HEAD(to_be_aged);
828 
829 	dprintk("svc_age_temp_xprts\n");
830 
831 	if (!spin_trylock_bh(&serv->sv_lock)) {
832 		/* busy, try again 1 sec later */
833 		dprintk("svc_age_temp_xprts: busy\n");
834 		mod_timer(&serv->sv_temptimer, jiffies + HZ);
835 		return;
836 	}
837 
838 	list_for_each_safe(le, next, &serv->sv_tempsocks) {
839 		xprt = list_entry(le, struct svc_xprt, xpt_list);
840 
841 		/* First time through, just mark it OLD. Second time
842 		 * through, close it. */
843 		if (!test_and_set_bit(XPT_OLD, &xprt->xpt_flags))
844 			continue;
845 		if (atomic_read(&xprt->xpt_ref.refcount) > 1
846 		    || test_bit(XPT_BUSY, &xprt->xpt_flags))
847 			continue;
848 		svc_xprt_get(xprt);
849 		list_move(le, &to_be_aged);
850 		set_bit(XPT_CLOSE, &xprt->xpt_flags);
851 		set_bit(XPT_DETACHED, &xprt->xpt_flags);
852 	}
853 	spin_unlock_bh(&serv->sv_lock);
854 
855 	while (!list_empty(&to_be_aged)) {
856 		le = to_be_aged.next;
857 		/* fiddling the xpt_list node is safe 'cos we're XPT_DETACHED */
858 		list_del_init(le);
859 		xprt = list_entry(le, struct svc_xprt, xpt_list);
860 
861 		dprintk("queuing xprt %p for closing\n", xprt);
862 
863 		/* a thread will dequeue and close it soon */
864 		svc_xprt_enqueue(xprt);
865 		svc_xprt_put(xprt);
866 	}
867 
868 	mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
869 }
870 
871 /*
872  * Remove a dead transport
873  */
874 void svc_delete_xprt(struct svc_xprt *xprt)
875 {
876 	struct svc_serv	*serv = xprt->xpt_server;
877 	struct svc_deferred_req *dr;
878 
879 	/* Only do this once */
880 	if (test_and_set_bit(XPT_DEAD, &xprt->xpt_flags))
881 		return;
882 
883 	dprintk("svc: svc_delete_xprt(%p)\n", xprt);
884 	xprt->xpt_ops->xpo_detach(xprt);
885 
886 	spin_lock_bh(&serv->sv_lock);
887 	if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
888 		list_del_init(&xprt->xpt_list);
889 	/*
890 	 * We used to delete the transport from whichever list
890 	 * its sk_xprt.xpt_ready node was on, but we don't actually
891 	 * need to.  This is because the only time we're called
892 	 * while still attached to a queue is when the queue itself
893 	 * is about to be destroyed (in svc_destroy).
895 	 */
896 	if (test_bit(XPT_TEMP, &xprt->xpt_flags))
897 		serv->sv_tmpcnt--;
898 
899 	for (dr = svc_deferred_dequeue(xprt); dr;
900 	     dr = svc_deferred_dequeue(xprt)) {
901 		svc_xprt_put(xprt);
902 		kfree(dr);
903 	}
904 
905 	svc_xprt_put(xprt);
906 	spin_unlock_bh(&serv->sv_lock);
907 }
908 
909 void svc_close_xprt(struct svc_xprt *xprt)
910 {
911 	set_bit(XPT_CLOSE, &xprt->xpt_flags);
912 	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
913 		/* someone else will have to effect the close */
914 		return;
915 
916 	svc_xprt_get(xprt);
917 	svc_delete_xprt(xprt);
918 	clear_bit(XPT_BUSY, &xprt->xpt_flags);
919 	svc_xprt_put(xprt);
920 }
921 EXPORT_SYMBOL_GPL(svc_close_xprt);
922 
923 void svc_close_all(struct list_head *xprt_list)
924 {
925 	struct svc_xprt *xprt;
926 	struct svc_xprt *tmp;
927 
928 	list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) {
929 		set_bit(XPT_CLOSE, &xprt->xpt_flags);
930 		if (test_bit(XPT_BUSY, &xprt->xpt_flags)) {
931 			/* Waiting to be processed, but no threads left,
932 			 * so just remove it from the waiting list.
933 			 */
934 			list_del_init(&xprt->xpt_ready);
935 			clear_bit(XPT_BUSY, &xprt->xpt_flags);
936 		}
937 		svc_close_xprt(xprt);
938 	}
939 }
940 
941 /*
942  * Handle defer and revisit of requests
943  */
944 
945 static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
946 {
947 	struct svc_deferred_req *dr =
948 		container_of(dreq, struct svc_deferred_req, handle);
949 	struct svc_xprt *xprt = dr->xprt;
950 
951 	spin_lock(&xprt->xpt_lock);
952 	set_bit(XPT_DEFERRED, &xprt->xpt_flags);
953 	if (too_many || test_bit(XPT_DEAD, &xprt->xpt_flags)) {
954 		spin_unlock(&xprt->xpt_lock);
955 		dprintk("revisit canceled\n");
956 		svc_xprt_put(xprt);
957 		kfree(dr);
958 		return;
959 	}
960 	dprintk("revisit queued\n");
961 	dr->xprt = NULL;
962 	list_add(&dr->handle.recent, &xprt->xpt_deferred);
963 	spin_unlock(&xprt->xpt_lock);
964 	svc_xprt_enqueue(xprt);
965 	svc_xprt_put(xprt);
966 }
967 
968 /*
969  * Save the request off for later processing. The request buffer looks
970  * like this:
971  *
972  * <xprt-header><rpc-header><rpc-pagelist><rpc-tail>
973  *
974  * This code can only handle requests that consist of an xprt-header
975  * and rpc-header.
976  */
977 static struct cache_deferred_req *svc_defer(struct cache_req *req)
978 {
979 	struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
980 	struct svc_deferred_req *dr;
981 
982 	if (rqstp->rq_arg.page_len || !rqstp->rq_usedeferral)
983 		return NULL; /* if more than a page, give up FIXME */
984 	if (rqstp->rq_deferred) {
985 		dr = rqstp->rq_deferred;
986 		rqstp->rq_deferred = NULL;
987 	} else {
988 		size_t skip;
989 		size_t size;
990 		/* FIXME maybe discard if size too large */
991 		size = sizeof(struct svc_deferred_req) + rqstp->rq_arg.len;
992 		dr = kmalloc(size, GFP_KERNEL);
993 		if (dr == NULL)
994 			return NULL;
995 
996 		dr->handle.owner = rqstp->rq_server;
997 		dr->prot = rqstp->rq_prot;
998 		memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
999 		dr->addrlen = rqstp->rq_addrlen;
1000 		dr->daddr = rqstp->rq_daddr;
1001 		dr->argslen = rqstp->rq_arg.len >> 2;
1002 		dr->xprt_hlen = rqstp->rq_xprt_hlen;
1003 
1004 		/* back up head to the start of the buffer and copy */
1005 		skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
1006 		memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
1007 		       dr->argslen << 2);
1008 	}
1009 	svc_xprt_get(rqstp->rq_xprt);
1010 	dr->xprt = rqstp->rq_xprt;
1011 
1012 	dr->handle.revisit = svc_revisit;
1013 	return &dr->handle;
1014 }
1015 
1016 /*
1017  * Receive data from a deferred request into an active one
1018  */
1019 static int svc_deferred_recv(struct svc_rqst *rqstp)
1020 {
1021 	struct svc_deferred_req *dr = rqstp->rq_deferred;
1022 
1023 	/* setup iov_base past transport header */
1024 	rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2);
1025 	/* The iov_len does not include the transport header bytes */
1026 	rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen;
1027 	rqstp->rq_arg.page_len = 0;
1028 	/* The rq_arg.len includes the transport header bytes */
1029 	rqstp->rq_arg.len     = dr->argslen<<2;
1030 	rqstp->rq_prot        = dr->prot;
1031 	memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
1032 	rqstp->rq_addrlen     = dr->addrlen;
1033 	/* Save off transport header len in case we get deferred again */
1034 	rqstp->rq_xprt_hlen   = dr->xprt_hlen;
1035 	rqstp->rq_daddr       = dr->daddr;
1036 	rqstp->rq_respages    = rqstp->rq_pages;
1037 	return (dr->argslen<<2) - dr->xprt_hlen;
1038 }
1039 
1040 
1041 static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
1042 {
1043 	struct svc_deferred_req *dr = NULL;
1044 
1045 	if (!test_bit(XPT_DEFERRED, &xprt->xpt_flags))
1046 		return NULL;
1047 	spin_lock(&xprt->xpt_lock);
1048 	clear_bit(XPT_DEFERRED, &xprt->xpt_flags);
1049 	if (!list_empty(&xprt->xpt_deferred)) {
1050 		dr = list_entry(xprt->xpt_deferred.next,
1051 				struct svc_deferred_req,
1052 				handle.recent);
1053 		list_del_init(&dr->handle.recent);
1054 		set_bit(XPT_DEFERRED, &xprt->xpt_flags);
1055 	}
1056 	spin_unlock(&xprt->xpt_lock);
1057 	return dr;
1058 }
1059 
1060 /**
1061  * svc_find_xprt - find an RPC transport instance
1062  * @serv: pointer to svc_serv to search
1063  * @xcl_name: C string containing transport's class name
1064  * @af: Address family of transport's local address
1065  * @port: transport's IP port number
1066  *
1067  * Return the transport instance pointer for the endpoint accepting
1068  * connections/peer traffic from the specified transport class,
1069  * address family and port.
1070  *
1071  * Specifying 0 for the address family or port is effectively a
1072  * wild-card, and will result in matching the first transport in the
1073  * service's list that has a matching class name.
1074  */
1075 struct svc_xprt *svc_find_xprt(struct svc_serv *serv, const char *xcl_name,
1076 			       const sa_family_t af, const unsigned short port)
1077 {
1078 	struct svc_xprt *xprt;
1079 	struct svc_xprt *found = NULL;
1080 
1081 	/* Sanity check the args */
1082 	if (serv == NULL || xcl_name == NULL)
1083 		return found;
1084 
1085 	spin_lock_bh(&serv->sv_lock);
1086 	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
1087 		if (strcmp(xprt->xpt_class->xcl_name, xcl_name))
1088 			continue;
1089 		if (af != AF_UNSPEC && af != xprt->xpt_local.ss_family)
1090 			continue;
1091 		if (port != 0 && port != svc_xprt_local_port(xprt))
1092 			continue;
1093 		found = xprt;
1094 		svc_xprt_get(xprt);
1095 		break;
1096 	}
1097 	spin_unlock_bh(&serv->sv_lock);
1098 	return found;
1099 }
1100 EXPORT_SYMBOL_GPL(svc_find_xprt);
1101 
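/*
 * Illustrative sketch (not part of the upstream file): looking up the
 * port of a service's UDP listener.  svc_find_xprt() returns a referenced
 * transport, so the caller owes an svc_xprt_put() when done.
 */
#if 0
static unsigned short example_udp_port(struct svc_serv *serv)
{
	struct svc_xprt *xprt;
	unsigned short port = 0;

	xprt = svc_find_xprt(serv, "udp", AF_UNSPEC, 0);
	if (xprt) {
		port = svc_xprt_local_port(xprt);
		svc_xprt_put(xprt);	/* drop the reference taken for us */
	}
	return port;
}
#endif
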
1102 static int svc_one_xprt_name(const struct svc_xprt *xprt,
1103 			     char *pos, int remaining)
1104 {
1105 	int len;
1106 
1107 	len = snprintf(pos, remaining, "%s %u\n",
1108 			xprt->xpt_class->xcl_name,
1109 			svc_xprt_local_port(xprt));
1110 	if (len >= remaining)
1111 		return -ENAMETOOLONG;
1112 	return len;
1113 }
1114 
1115 /**
1116  * svc_xprt_names - format a buffer with a list of transport names
1117  * @serv: pointer to an RPC service
1118  * @buf: pointer to a buffer to be filled in
1119  * @buflen: length of buffer to be filled in
1120  *
1121  * Fills in @buf with a string containing a list of transport names,
1122  * each name terminated with '\n'.
1123  *
1124  * Returns positive length of the filled-in string on success; otherwise
1125  * a negative errno value is returned if an error occurs.
1126  */
1127 int svc_xprt_names(struct svc_serv *serv, char *buf, const int buflen)
1128 {
1129 	struct svc_xprt *xprt;
1130 	int len, totlen;
1131 	char *pos;
1132 
1133 	/* Sanity check args */
1134 	if (!serv)
1135 		return 0;
1136 
1137 	spin_lock_bh(&serv->sv_lock);
1138 
1139 	pos = buf;
1140 	totlen = 0;
1141 	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
1142 		len = svc_one_xprt_name(xprt, pos, buflen - totlen);
1143 		if (len < 0) {
1144 			*buf = '\0';
1145 			totlen = len;
1146 		}
1147 		if (len <= 0)
1148 			break;
1149 
1150 		pos += len;
1151 		totlen += len;
1152 	}
1153 
1154 	spin_unlock_bh(&serv->sv_lock);
1155 	return totlen;
1156 }
1157 EXPORT_SYMBOL_GPL(svc_xprt_names);
1158 
1159 
1160 /*----------------------------------------------------------------------------*/
1161 
1162 static void *svc_pool_stats_start(struct seq_file *m, loff_t *pos)
1163 {
1164 	unsigned int pidx = (unsigned int)*pos;
1165 	struct svc_serv *serv = m->private;
1166 
1167 	dprintk("svc_pool_stats_start, *pidx=%u\n", pidx);
1168 
1169 	lock_kernel();
1170 	/* bump up the pseudo refcount while traversing */
1171 	svc_get(serv);
1172 	unlock_kernel();
1173 
1174 	if (!pidx)
1175 		return SEQ_START_TOKEN;
1176 	return (pidx > serv->sv_nrpools ? NULL : &serv->sv_pools[pidx-1]);
1177 }
1178 
1179 static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos)
1180 {
1181 	struct svc_pool *pool = p;
1182 	struct svc_serv *serv = m->private;
1183 
1184 	dprintk("svc_pool_stats_next, *pos=%llu\n", *pos);
1185 
1186 	if (p == SEQ_START_TOKEN) {
1187 		pool = &serv->sv_pools[0];
1188 	} else {
1189 		unsigned int pidx = (pool - &serv->sv_pools[0]);
1190 		if (pidx < serv->sv_nrpools-1)
1191 			pool = &serv->sv_pools[pidx+1];
1192 		else
1193 			pool = NULL;
1194 	}
1195 	++*pos;
1196 	return pool;
1197 }
1198 
1199 static void svc_pool_stats_stop(struct seq_file *m, void *p)
1200 {
1201 	struct svc_serv *serv = m->private;
1202 
1203 	lock_kernel();
1204 	/* this function really, really should have been called svc_put() */
1205 	svc_destroy(serv);
1206 	unlock_kernel();
1207 }
1208 
1209 static int svc_pool_stats_show(struct seq_file *m, void *p)
1210 {
1211 	struct svc_pool *pool = p;
1212 
1213 	if (p == SEQ_START_TOKEN) {
1214 		seq_puts(m, "# pool packets-arrived sockets-enqueued threads-woken overloads-avoided threads-timedout\n");
1215 		return 0;
1216 	}
1217 
1218 	seq_printf(m, "%u %lu %lu %lu %lu %lu\n",
1219 		pool->sp_id,
1220 		pool->sp_stats.packets,
1221 		pool->sp_stats.sockets_queued,
1222 		pool->sp_stats.threads_woken,
1223 		pool->sp_stats.overloads_avoided,
1224 		pool->sp_stats.threads_timedout);
1225 
1226 	return 0;
1227 }
1228 
1229 static const struct seq_operations svc_pool_stats_seq_ops = {
1230 	.start	= svc_pool_stats_start,
1231 	.next	= svc_pool_stats_next,
1232 	.stop	= svc_pool_stats_stop,
1233 	.show	= svc_pool_stats_show,
1234 };
1235 
1236 int svc_pool_stats_open(struct svc_serv *serv, struct file *file)
1237 {
1238 	int err;
1239 
1240 	err = seq_open(file, &svc_pool_stats_seq_ops);
1241 	if (!err)
1242 		((struct seq_file *) file->private_data)->private = serv;
1243 	return err;
1244 }
1245 EXPORT_SYMBOL(svc_pool_stats_open);
1246 
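/*
 * Illustrative sketch (not part of the upstream file): how a service
 * might expose the pool statistics through a procfs or debugfs file.
 * example_serv is a hypothetical pointer to the service's svc_serv;
 * nfsd, for instance, keeps its serv in a global guarded by a mutex.
 */
#if 0
static struct svc_serv *example_serv;

static int example_pool_stats_open(struct inode *inode, struct file *file)
{
	return svc_pool_stats_open(example_serv, file);
}

static const struct file_operations example_pool_stats_fops = {
	.owner		= THIS_MODULE,
	.open		= example_pool_stats_open,
	.read		= seq_read,
	.llseek		= seq_lseek,
	.release	= seq_release,
};
#endif
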
1247 /*----------------------------------------------------------------------------*/
1248