1 /* 2 * Multipath support for RPC 3 * 4 * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved. 5 * 6 * Trond Myklebust <trond.myklebust@primarydata.com> 7 * 8 */ 9 #include <linux/types.h> 10 #include <linux/kref.h> 11 #include <linux/list.h> 12 #include <linux/rcupdate.h> 13 #include <linux/rculist.h> 14 #include <linux/slab.h> 15 #include <asm/cmpxchg.h> 16 #include <linux/spinlock.h> 17 #include <linux/sunrpc/xprt.h> 18 #include <linux/sunrpc/xprtmultipath.h> 19 20 typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head, 21 const struct rpc_xprt *cur); 22 23 static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular; 24 static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin; 25 static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall; 26 27 static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps, 28 struct rpc_xprt *xprt) 29 { 30 if (unlikely(xprt_get(xprt) == NULL)) 31 return; 32 list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list); 33 smp_wmb(); 34 if (xps->xps_nxprts == 0) 35 xps->xps_net = xprt->xprt_net; 36 xps->xps_nxprts++; 37 } 38 39 /** 40 * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch 41 * @xps: pointer to struct rpc_xprt_switch 42 * @xprt: pointer to struct rpc_xprt 43 * 44 * Adds xprt to the end of the list of struct rpc_xprt in xps. 45 */ 46 void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps, 47 struct rpc_xprt *xprt) 48 { 49 if (xprt == NULL) 50 return; 51 spin_lock(&xps->xps_lock); 52 if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) 53 xprt_switch_add_xprt_locked(xps, xprt); 54 spin_unlock(&xps->xps_lock); 55 } 56 57 static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps, 58 struct rpc_xprt *xprt) 59 { 60 if (unlikely(xprt == NULL)) 61 return; 62 xps->xps_nxprts--; 63 if (xps->xps_nxprts == 0) 64 xps->xps_net = NULL; 65 smp_wmb(); 66 list_del_rcu(&xprt->xprt_switch); 67 } 68 69 /** 70 * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch 71 * @xps: pointer to struct rpc_xprt_switch 72 * @xprt: pointer to struct rpc_xprt 73 * 74 * Removes xprt from the list of struct rpc_xprt in xps. 75 */ 76 void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps, 77 struct rpc_xprt *xprt) 78 { 79 spin_lock(&xps->xps_lock); 80 xprt_switch_remove_xprt_locked(xps, xprt); 81 spin_unlock(&xps->xps_lock); 82 xprt_put(xprt); 83 } 84 85 /** 86 * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch 87 * @xprt: pointer to struct rpc_xprt 88 * @gfp_flags: allocation flags 89 * 90 * On success, returns an initialised struct rpc_xprt_switch, containing 91 * the entry xprt. Returns NULL on failure. 92 */ 93 struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt, 94 gfp_t gfp_flags) 95 { 96 struct rpc_xprt_switch *xps; 97 98 xps = kmalloc(sizeof(*xps), gfp_flags); 99 if (xps != NULL) { 100 spin_lock_init(&xps->xps_lock); 101 kref_init(&xps->xps_kref); 102 xps->xps_nxprts = 0; 103 INIT_LIST_HEAD(&xps->xps_xprt_list); 104 xps->xps_iter_ops = &rpc_xprt_iter_singular; 105 xprt_switch_add_xprt_locked(xps, xprt); 106 } 107 108 return xps; 109 } 110 111 static void xprt_switch_free_entries(struct rpc_xprt_switch *xps) 112 { 113 spin_lock(&xps->xps_lock); 114 while (!list_empty(&xps->xps_xprt_list)) { 115 struct rpc_xprt *xprt; 116 117 xprt = list_first_entry(&xps->xps_xprt_list, 118 struct rpc_xprt, xprt_switch); 119 xprt_switch_remove_xprt_locked(xps, xprt); 120 spin_unlock(&xps->xps_lock); 121 xprt_put(xprt); 122 spin_lock(&xps->xps_lock); 123 } 124 spin_unlock(&xps->xps_lock); 125 } 126 127 static void xprt_switch_free(struct kref *kref) 128 { 129 struct rpc_xprt_switch *xps = container_of(kref, 130 struct rpc_xprt_switch, xps_kref); 131 132 xprt_switch_free_entries(xps); 133 kfree_rcu(xps, xps_rcu); 134 } 135 136 /** 137 * xprt_switch_get - Return a reference to a rpc_xprt_switch 138 * @xps: pointer to struct rpc_xprt_switch 139 * 140 * Returns a reference to xps unless the refcount is already zero. 141 */ 142 struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps) 143 { 144 if (xps != NULL && kref_get_unless_zero(&xps->xps_kref)) 145 return xps; 146 return NULL; 147 } 148 149 /** 150 * xprt_switch_put - Release a reference to a rpc_xprt_switch 151 * @xps: pointer to struct rpc_xprt_switch 152 * 153 * Release the reference to xps, and free it once the refcount is zero. 154 */ 155 void xprt_switch_put(struct rpc_xprt_switch *xps) 156 { 157 if (xps != NULL) 158 kref_put(&xps->xps_kref, xprt_switch_free); 159 } 160 161 /** 162 * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch 163 * @xps: pointer to struct rpc_xprt_switch 164 * 165 * Sets a round-robin default policy for iterators acting on xps. 166 */ 167 void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps) 168 { 169 if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin) 170 WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin); 171 } 172 173 static 174 const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi) 175 { 176 if (xpi->xpi_ops != NULL) 177 return xpi->xpi_ops; 178 return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops; 179 } 180 181 static 182 void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi) 183 { 184 } 185 186 static 187 void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi) 188 { 189 WRITE_ONCE(xpi->xpi_cursor, NULL); 190 } 191 192 static 193 struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head) 194 { 195 return list_first_or_null_rcu(head, struct rpc_xprt, xprt_switch); 196 } 197 198 static 199 struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi) 200 { 201 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 202 203 if (xps == NULL) 204 return NULL; 205 return xprt_switch_find_first_entry(&xps->xps_xprt_list); 206 } 207 208 static 209 struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head, 210 const struct rpc_xprt *cur) 211 { 212 struct rpc_xprt *pos; 213 214 list_for_each_entry_rcu(pos, head, xprt_switch) { 215 if (cur == pos) 216 return pos; 217 } 218 return NULL; 219 } 220 221 static 222 struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi) 223 { 224 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 225 struct list_head *head; 226 227 if (xps == NULL) 228 return NULL; 229 head = &xps->xps_xprt_list; 230 if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2) 231 return xprt_switch_find_first_entry(head); 232 return xprt_switch_find_current_entry(head, xpi->xpi_cursor); 233 } 234 235 static 236 struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head, 237 const struct rpc_xprt *cur) 238 { 239 struct rpc_xprt *pos, *prev = NULL; 240 241 list_for_each_entry_rcu(pos, head, xprt_switch) { 242 if (cur == prev) 243 return pos; 244 prev = pos; 245 } 246 return NULL; 247 } 248 249 static 250 struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head, 251 struct rpc_xprt **cursor, 252 xprt_switch_find_xprt_t find_next) 253 { 254 struct rpc_xprt *cur, *pos, *old; 255 256 cur = READ_ONCE(*cursor); 257 for (;;) { 258 old = cur; 259 pos = find_next(head, old); 260 if (pos == NULL) 261 break; 262 cur = cmpxchg_relaxed(cursor, old, pos); 263 if (cur == old) 264 break; 265 } 266 return pos; 267 } 268 269 static 270 struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi, 271 xprt_switch_find_xprt_t find_next) 272 { 273 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 274 struct list_head *head; 275 276 if (xps == NULL) 277 return NULL; 278 head = &xps->xps_xprt_list; 279 if (xps->xps_nxprts < 2) 280 return xprt_switch_find_first_entry(head); 281 return xprt_switch_set_next_cursor(head, &xpi->xpi_cursor, find_next); 282 } 283 284 static 285 struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head, 286 const struct rpc_xprt *cur) 287 { 288 struct rpc_xprt *ret; 289 290 ret = xprt_switch_find_next_entry(head, cur); 291 if (ret != NULL) 292 return ret; 293 return xprt_switch_find_first_entry(head); 294 } 295 296 static 297 struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) 298 { 299 return xprt_iter_next_entry_multiple(xpi, 300 xprt_switch_find_next_entry_roundrobin); 301 } 302 303 static 304 struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi) 305 { 306 return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry); 307 } 308 309 /* 310 * xprt_iter_rewind - Resets the xprt iterator 311 * @xpi: pointer to rpc_xprt_iter 312 * 313 * Resets xpi to ensure that it points to the first entry in the list 314 * of transports. 315 */ 316 static 317 void xprt_iter_rewind(struct rpc_xprt_iter *xpi) 318 { 319 rcu_read_lock(); 320 xprt_iter_ops(xpi)->xpi_rewind(xpi); 321 rcu_read_unlock(); 322 } 323 324 static void __xprt_iter_init(struct rpc_xprt_iter *xpi, 325 struct rpc_xprt_switch *xps, 326 const struct rpc_xprt_iter_ops *ops) 327 { 328 rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps)); 329 xpi->xpi_cursor = NULL; 330 xpi->xpi_ops = ops; 331 } 332 333 /** 334 * xprt_iter_init - Initialise an xprt iterator 335 * @xpi: pointer to rpc_xprt_iter 336 * @xps: pointer to rpc_xprt_switch 337 * 338 * Initialises the iterator to use the default iterator ops 339 * as set in xps. This function is mainly intended for internal 340 * use in the rpc_client. 341 */ 342 void xprt_iter_init(struct rpc_xprt_iter *xpi, 343 struct rpc_xprt_switch *xps) 344 { 345 __xprt_iter_init(xpi, xps, NULL); 346 } 347 348 /** 349 * xprt_iter_init_listall - Initialise an xprt iterator 350 * @xpi: pointer to rpc_xprt_iter 351 * @xps: pointer to rpc_xprt_switch 352 * 353 * Initialises the iterator to iterate once through the entire list 354 * of entries in xps. 355 */ 356 void xprt_iter_init_listall(struct rpc_xprt_iter *xpi, 357 struct rpc_xprt_switch *xps) 358 { 359 __xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall); 360 } 361 362 /** 363 * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch 364 * @xpi: pointer to rpc_xprt_iter 365 * @xps: pointer to a new rpc_xprt_switch or NULL 366 * 367 * Swaps out the existing xpi->xpi_xpswitch with a new value. 368 */ 369 struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi, 370 struct rpc_xprt_switch *newswitch) 371 { 372 struct rpc_xprt_switch __rcu *oldswitch; 373 374 /* Atomically swap out the old xpswitch */ 375 oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch)); 376 if (newswitch != NULL) 377 xprt_iter_rewind(xpi); 378 return rcu_dereference_protected(oldswitch, true); 379 } 380 381 /** 382 * xprt_iter_destroy - Destroys the xprt iterator 383 * @xpi pointer to rpc_xprt_iter 384 */ 385 void xprt_iter_destroy(struct rpc_xprt_iter *xpi) 386 { 387 xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL)); 388 } 389 390 /** 391 * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor 392 * @xpi: pointer to rpc_xprt_iter 393 * 394 * Returns a pointer to the struct rpc_xprt that is currently 395 * pointed to by the cursor. 396 * Caller must be holding rcu_read_lock(). 397 */ 398 struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi) 399 { 400 WARN_ON_ONCE(!rcu_read_lock_held()); 401 return xprt_iter_ops(xpi)->xpi_xprt(xpi); 402 } 403 404 static 405 struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi, 406 struct rpc_xprt *(*fn)(struct rpc_xprt_iter *)) 407 { 408 struct rpc_xprt *ret; 409 410 do { 411 ret = fn(xpi); 412 if (ret == NULL) 413 break; 414 ret = xprt_get(ret); 415 } while (ret == NULL); 416 return ret; 417 } 418 419 /** 420 * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor 421 * @xpi: pointer to rpc_xprt_iter 422 * 423 * Returns a reference to the struct rpc_xprt that is currently 424 * pointed to by the cursor. 425 */ 426 struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi) 427 { 428 struct rpc_xprt *xprt; 429 430 rcu_read_lock(); 431 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt); 432 rcu_read_unlock(); 433 return xprt; 434 } 435 436 /** 437 * xprt_iter_get_next - Returns the next rpc_xprt following the cursor 438 * @xpi: pointer to rpc_xprt_iter 439 * 440 * Returns a reference to the struct rpc_xprt that immediately follows the 441 * entry pointed to by the cursor. 442 */ 443 struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi) 444 { 445 struct rpc_xprt *xprt; 446 447 rcu_read_lock(); 448 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next); 449 rcu_read_unlock(); 450 return xprt; 451 } 452 453 /* Policy for always returning the first entry in the rpc_xprt_switch */ 454 static 455 const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = { 456 .xpi_rewind = xprt_iter_no_rewind, 457 .xpi_xprt = xprt_iter_first_entry, 458 .xpi_next = xprt_iter_first_entry, 459 }; 460 461 /* Policy for round-robin iteration of entries in the rpc_xprt_switch */ 462 static 463 const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = { 464 .xpi_rewind = xprt_iter_default_rewind, 465 .xpi_xprt = xprt_iter_current_entry, 466 .xpi_next = xprt_iter_next_entry_roundrobin, 467 }; 468 469 /* Policy for once-through iteration of entries in the rpc_xprt_switch */ 470 static 471 const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = { 472 .xpi_rewind = xprt_iter_default_rewind, 473 .xpi_xprt = xprt_iter_current_entry, 474 .xpi_next = xprt_iter_next_entry_all, 475 }; 476