1 /* 2 * Multipath support for RPC 3 * 4 * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved. 5 * 6 * Trond Myklebust <trond.myklebust@primarydata.com> 7 * 8 */ 9 #include <linux/types.h> 10 #include <linux/kref.h> 11 #include <linux/list.h> 12 #include <linux/rcupdate.h> 13 #include <linux/rculist.h> 14 #include <linux/slab.h> 15 #include <asm/cmpxchg.h> 16 #include <linux/spinlock.h> 17 #include <linux/sunrpc/xprt.h> 18 #include <linux/sunrpc/addr.h> 19 #include <linux/sunrpc/xprtmultipath.h> 20 21 typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head, 22 const struct rpc_xprt *cur); 23 24 static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular; 25 static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin; 26 static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall; 27 28 static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps, 29 struct rpc_xprt *xprt) 30 { 31 if (unlikely(xprt_get(xprt) == NULL)) 32 return; 33 list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list); 34 smp_wmb(); 35 if (xps->xps_nxprts == 0) 36 xps->xps_net = xprt->xprt_net; 37 xps->xps_nxprts++; 38 } 39 40 /** 41 * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch 42 * @xps: pointer to struct rpc_xprt_switch 43 * @xprt: pointer to struct rpc_xprt 44 * 45 * Adds xprt to the end of the list of struct rpc_xprt in xps. 46 */ 47 void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps, 48 struct rpc_xprt *xprt) 49 { 50 if (xprt == NULL) 51 return; 52 spin_lock(&xps->xps_lock); 53 if ((xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) && 54 !rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr)) 55 xprt_switch_add_xprt_locked(xps, xprt); 56 spin_unlock(&xps->xps_lock); 57 } 58 59 static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps, 60 struct rpc_xprt *xprt) 61 { 62 if (unlikely(xprt == NULL)) 63 return; 64 xps->xps_nxprts--; 65 if (xps->xps_nxprts == 0) 66 xps->xps_net = NULL; 67 smp_wmb(); 68 list_del_rcu(&xprt->xprt_switch); 69 } 70 71 /** 72 * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch 73 * @xps: pointer to struct rpc_xprt_switch 74 * @xprt: pointer to struct rpc_xprt 75 * 76 * Removes xprt from the list of struct rpc_xprt in xps. 77 */ 78 void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps, 79 struct rpc_xprt *xprt) 80 { 81 spin_lock(&xps->xps_lock); 82 xprt_switch_remove_xprt_locked(xps, xprt); 83 spin_unlock(&xps->xps_lock); 84 xprt_put(xprt); 85 } 86 87 /** 88 * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch 89 * @xprt: pointer to struct rpc_xprt 90 * @gfp_flags: allocation flags 91 * 92 * On success, returns an initialised struct rpc_xprt_switch, containing 93 * the entry xprt. Returns NULL on failure. 94 */ 95 struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt, 96 gfp_t gfp_flags) 97 { 98 struct rpc_xprt_switch *xps; 99 100 xps = kmalloc(sizeof(*xps), gfp_flags); 101 if (xps != NULL) { 102 spin_lock_init(&xps->xps_lock); 103 kref_init(&xps->xps_kref); 104 xps->xps_nxprts = 0; 105 INIT_LIST_HEAD(&xps->xps_xprt_list); 106 xps->xps_iter_ops = &rpc_xprt_iter_singular; 107 xprt_switch_add_xprt_locked(xps, xprt); 108 } 109 110 return xps; 111 } 112 113 static void xprt_switch_free_entries(struct rpc_xprt_switch *xps) 114 { 115 spin_lock(&xps->xps_lock); 116 while (!list_empty(&xps->xps_xprt_list)) { 117 struct rpc_xprt *xprt; 118 119 xprt = list_first_entry(&xps->xps_xprt_list, 120 struct rpc_xprt, xprt_switch); 121 xprt_switch_remove_xprt_locked(xps, xprt); 122 spin_unlock(&xps->xps_lock); 123 xprt_put(xprt); 124 spin_lock(&xps->xps_lock); 125 } 126 spin_unlock(&xps->xps_lock); 127 } 128 129 static void xprt_switch_free(struct kref *kref) 130 { 131 struct rpc_xprt_switch *xps = container_of(kref, 132 struct rpc_xprt_switch, xps_kref); 133 134 xprt_switch_free_entries(xps); 135 kfree_rcu(xps, xps_rcu); 136 } 137 138 /** 139 * xprt_switch_get - Return a reference to a rpc_xprt_switch 140 * @xps: pointer to struct rpc_xprt_switch 141 * 142 * Returns a reference to xps unless the refcount is already zero. 143 */ 144 struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps) 145 { 146 if (xps != NULL && kref_get_unless_zero(&xps->xps_kref)) 147 return xps; 148 return NULL; 149 } 150 151 /** 152 * xprt_switch_put - Release a reference to a rpc_xprt_switch 153 * @xps: pointer to struct rpc_xprt_switch 154 * 155 * Release the reference to xps, and free it once the refcount is zero. 156 */ 157 void xprt_switch_put(struct rpc_xprt_switch *xps) 158 { 159 if (xps != NULL) 160 kref_put(&xps->xps_kref, xprt_switch_free); 161 } 162 163 /** 164 * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch 165 * @xps: pointer to struct rpc_xprt_switch 166 * 167 * Sets a round-robin default policy for iterators acting on xps. 168 */ 169 void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps) 170 { 171 if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin) 172 WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin); 173 } 174 175 static 176 const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi) 177 { 178 if (xpi->xpi_ops != NULL) 179 return xpi->xpi_ops; 180 return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops; 181 } 182 183 static 184 void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi) 185 { 186 } 187 188 static 189 void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi) 190 { 191 WRITE_ONCE(xpi->xpi_cursor, NULL); 192 } 193 194 static 195 struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head) 196 { 197 return list_first_or_null_rcu(head, struct rpc_xprt, xprt_switch); 198 } 199 200 static 201 struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi) 202 { 203 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 204 205 if (xps == NULL) 206 return NULL; 207 return xprt_switch_find_first_entry(&xps->xps_xprt_list); 208 } 209 210 static 211 struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head, 212 const struct rpc_xprt *cur) 213 { 214 struct rpc_xprt *pos; 215 216 list_for_each_entry_rcu(pos, head, xprt_switch) { 217 if (cur == pos) 218 return pos; 219 } 220 return NULL; 221 } 222 223 static 224 struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi) 225 { 226 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 227 struct list_head *head; 228 229 if (xps == NULL) 230 return NULL; 231 head = &xps->xps_xprt_list; 232 if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2) 233 return xprt_switch_find_first_entry(head); 234 return xprt_switch_find_current_entry(head, xpi->xpi_cursor); 235 } 236 237 bool rpc_xprt_switch_has_addr(struct rpc_xprt_switch *xps, 238 const struct sockaddr *sap) 239 { 240 struct list_head *head; 241 struct rpc_xprt *pos; 242 243 if (xps == NULL || sap == NULL) 244 return false; 245 246 head = &xps->xps_xprt_list; 247 list_for_each_entry_rcu(pos, head, xprt_switch) { 248 if (rpc_cmp_addr_port(sap, (struct sockaddr *)&pos->addr)) { 249 pr_info("RPC: addr %s already in xprt switch\n", 250 pos->address_strings[RPC_DISPLAY_ADDR]); 251 return true; 252 } 253 } 254 return false; 255 } 256 257 static 258 struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head, 259 const struct rpc_xprt *cur) 260 { 261 struct rpc_xprt *pos, *prev = NULL; 262 263 list_for_each_entry_rcu(pos, head, xprt_switch) { 264 if (cur == prev) 265 return pos; 266 prev = pos; 267 } 268 return NULL; 269 } 270 271 static 272 struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head, 273 struct rpc_xprt **cursor, 274 xprt_switch_find_xprt_t find_next) 275 { 276 struct rpc_xprt *cur, *pos, *old; 277 278 cur = READ_ONCE(*cursor); 279 for (;;) { 280 old = cur; 281 pos = find_next(head, old); 282 if (pos == NULL) 283 break; 284 cur = cmpxchg_relaxed(cursor, old, pos); 285 if (cur == old) 286 break; 287 } 288 return pos; 289 } 290 291 static 292 struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi, 293 xprt_switch_find_xprt_t find_next) 294 { 295 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 296 297 if (xps == NULL) 298 return NULL; 299 return xprt_switch_set_next_cursor(&xps->xps_xprt_list, 300 &xpi->xpi_cursor, 301 find_next); 302 } 303 304 static 305 struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head, 306 const struct rpc_xprt *cur) 307 { 308 struct rpc_xprt *ret; 309 310 ret = xprt_switch_find_next_entry(head, cur); 311 if (ret != NULL) 312 return ret; 313 return xprt_switch_find_first_entry(head); 314 } 315 316 static 317 struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) 318 { 319 return xprt_iter_next_entry_multiple(xpi, 320 xprt_switch_find_next_entry_roundrobin); 321 } 322 323 static 324 struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi) 325 { 326 return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry); 327 } 328 329 /* 330 * xprt_iter_rewind - Resets the xprt iterator 331 * @xpi: pointer to rpc_xprt_iter 332 * 333 * Resets xpi to ensure that it points to the first entry in the list 334 * of transports. 335 */ 336 static 337 void xprt_iter_rewind(struct rpc_xprt_iter *xpi) 338 { 339 rcu_read_lock(); 340 xprt_iter_ops(xpi)->xpi_rewind(xpi); 341 rcu_read_unlock(); 342 } 343 344 static void __xprt_iter_init(struct rpc_xprt_iter *xpi, 345 struct rpc_xprt_switch *xps, 346 const struct rpc_xprt_iter_ops *ops) 347 { 348 rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps)); 349 xpi->xpi_cursor = NULL; 350 xpi->xpi_ops = ops; 351 } 352 353 /** 354 * xprt_iter_init - Initialise an xprt iterator 355 * @xpi: pointer to rpc_xprt_iter 356 * @xps: pointer to rpc_xprt_switch 357 * 358 * Initialises the iterator to use the default iterator ops 359 * as set in xps. This function is mainly intended for internal 360 * use in the rpc_client. 361 */ 362 void xprt_iter_init(struct rpc_xprt_iter *xpi, 363 struct rpc_xprt_switch *xps) 364 { 365 __xprt_iter_init(xpi, xps, NULL); 366 } 367 368 /** 369 * xprt_iter_init_listall - Initialise an xprt iterator 370 * @xpi: pointer to rpc_xprt_iter 371 * @xps: pointer to rpc_xprt_switch 372 * 373 * Initialises the iterator to iterate once through the entire list 374 * of entries in xps. 375 */ 376 void xprt_iter_init_listall(struct rpc_xprt_iter *xpi, 377 struct rpc_xprt_switch *xps) 378 { 379 __xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall); 380 } 381 382 /** 383 * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch 384 * @xpi: pointer to rpc_xprt_iter 385 * @xps: pointer to a new rpc_xprt_switch or NULL 386 * 387 * Swaps out the existing xpi->xpi_xpswitch with a new value. 388 */ 389 struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi, 390 struct rpc_xprt_switch *newswitch) 391 { 392 struct rpc_xprt_switch __rcu *oldswitch; 393 394 /* Atomically swap out the old xpswitch */ 395 oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch)); 396 if (newswitch != NULL) 397 xprt_iter_rewind(xpi); 398 return rcu_dereference_protected(oldswitch, true); 399 } 400 401 /** 402 * xprt_iter_destroy - Destroys the xprt iterator 403 * @xpi pointer to rpc_xprt_iter 404 */ 405 void xprt_iter_destroy(struct rpc_xprt_iter *xpi) 406 { 407 xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL)); 408 } 409 410 /** 411 * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor 412 * @xpi: pointer to rpc_xprt_iter 413 * 414 * Returns a pointer to the struct rpc_xprt that is currently 415 * pointed to by the cursor. 416 * Caller must be holding rcu_read_lock(). 417 */ 418 struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi) 419 { 420 WARN_ON_ONCE(!rcu_read_lock_held()); 421 return xprt_iter_ops(xpi)->xpi_xprt(xpi); 422 } 423 424 static 425 struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi, 426 struct rpc_xprt *(*fn)(struct rpc_xprt_iter *)) 427 { 428 struct rpc_xprt *ret; 429 430 do { 431 ret = fn(xpi); 432 if (ret == NULL) 433 break; 434 ret = xprt_get(ret); 435 } while (ret == NULL); 436 return ret; 437 } 438 439 /** 440 * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor 441 * @xpi: pointer to rpc_xprt_iter 442 * 443 * Returns a reference to the struct rpc_xprt that is currently 444 * pointed to by the cursor. 445 */ 446 struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi) 447 { 448 struct rpc_xprt *xprt; 449 450 rcu_read_lock(); 451 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt); 452 rcu_read_unlock(); 453 return xprt; 454 } 455 456 /** 457 * xprt_iter_get_next - Returns the next rpc_xprt following the cursor 458 * @xpi: pointer to rpc_xprt_iter 459 * 460 * Returns a reference to the struct rpc_xprt that immediately follows the 461 * entry pointed to by the cursor. 462 */ 463 struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi) 464 { 465 struct rpc_xprt *xprt; 466 467 rcu_read_lock(); 468 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next); 469 rcu_read_unlock(); 470 return xprt; 471 } 472 473 /* Policy for always returning the first entry in the rpc_xprt_switch */ 474 static 475 const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = { 476 .xpi_rewind = xprt_iter_no_rewind, 477 .xpi_xprt = xprt_iter_first_entry, 478 .xpi_next = xprt_iter_first_entry, 479 }; 480 481 /* Policy for round-robin iteration of entries in the rpc_xprt_switch */ 482 static 483 const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = { 484 .xpi_rewind = xprt_iter_default_rewind, 485 .xpi_xprt = xprt_iter_current_entry, 486 .xpi_next = xprt_iter_next_entry_roundrobin, 487 }; 488 489 /* Policy for once-through iteration of entries in the rpc_xprt_switch */ 490 static 491 const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = { 492 .xpi_rewind = xprt_iter_default_rewind, 493 .xpi_xprt = xprt_iter_current_entry, 494 .xpi_next = xprt_iter_next_entry_all, 495 }; 496