1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/math64.h> 5 #include <linux/slab.h> 6 #include <linux/seq_file.h> 7 8 #include "subvolume_metrics.h" 9 #include "mds_client.h" 10 #include "super.h" 11 12 /** 13 * struct ceph_subvol_metric_rb_entry - Per-subvolume I/O metrics node 14 * @node: Red-black tree linkage for tracker->tree 15 * @subvolume_id: Subvolume identifier (key for rb-tree lookup) 16 * @read_ops: Accumulated read operation count since last snapshot 17 * @write_ops: Accumulated write operation count since last snapshot 18 * @read_bytes: Accumulated bytes read since last snapshot 19 * @write_bytes: Accumulated bytes written since last snapshot 20 * @read_latency_us: Sum of read latencies in microseconds 21 * @write_latency_us: Sum of write latencies in microseconds 22 */ 23 struct ceph_subvol_metric_rb_entry { 24 struct rb_node node; 25 u64 subvolume_id; 26 u64 read_ops; 27 u64 write_ops; 28 u64 read_bytes; 29 u64 write_bytes; 30 u64 read_latency_us; 31 u64 write_latency_us; 32 }; 33 34 static struct kmem_cache *ceph_subvol_metric_entry_cachep; 35 36 void ceph_subvolume_metrics_init(struct ceph_subvolume_metrics_tracker *tracker) 37 { 38 spin_lock_init(&tracker->lock); 39 tracker->tree = RB_ROOT_CACHED; 40 tracker->nr_entries = 0; 41 tracker->enabled = false; 42 atomic64_set(&tracker->snapshot_attempts, 0); 43 atomic64_set(&tracker->snapshot_empty, 0); 44 atomic64_set(&tracker->snapshot_failures, 0); 45 atomic64_set(&tracker->record_calls, 0); 46 atomic64_set(&tracker->record_disabled, 0); 47 atomic64_set(&tracker->record_no_subvol, 0); 48 atomic64_set(&tracker->total_read_ops, 0); 49 atomic64_set(&tracker->total_read_bytes, 0); 50 atomic64_set(&tracker->total_write_ops, 0); 51 atomic64_set(&tracker->total_write_bytes, 0); 52 } 53 54 static struct ceph_subvol_metric_rb_entry * 55 __lookup_entry(struct ceph_subvolume_metrics_tracker *tracker, u64 subvol_id) 56 { 57 struct rb_node *node; 58 59 node = tracker->tree.rb_root.rb_node; 60 while (node) { 61 struct ceph_subvol_metric_rb_entry *entry = 62 rb_entry(node, struct ceph_subvol_metric_rb_entry, node); 63 64 if (subvol_id < entry->subvolume_id) 65 node = node->rb_left; 66 else if (subvol_id > entry->subvolume_id) 67 node = node->rb_right; 68 else 69 return entry; 70 } 71 72 return NULL; 73 } 74 75 static struct ceph_subvol_metric_rb_entry * 76 __insert_entry(struct ceph_subvolume_metrics_tracker *tracker, 77 struct ceph_subvol_metric_rb_entry *entry) 78 { 79 struct rb_node **link = &tracker->tree.rb_root.rb_node; 80 struct rb_node *parent = NULL; 81 bool leftmost = true; 82 83 while (*link) { 84 struct ceph_subvol_metric_rb_entry *cur = 85 rb_entry(*link, struct ceph_subvol_metric_rb_entry, node); 86 87 parent = *link; 88 if (entry->subvolume_id < cur->subvolume_id) 89 link = &(*link)->rb_left; 90 else if (entry->subvolume_id > cur->subvolume_id) { 91 link = &(*link)->rb_right; 92 leftmost = false; 93 } else 94 return cur; 95 } 96 97 rb_link_node(&entry->node, parent, link); 98 rb_insert_color_cached(&entry->node, &tracker->tree, leftmost); 99 tracker->nr_entries++; 100 return entry; 101 } 102 103 static void ceph_subvolume_metrics_clear_locked( 104 struct ceph_subvolume_metrics_tracker *tracker) 105 { 106 struct rb_node *node = rb_first_cached(&tracker->tree); 107 108 while (node) { 109 struct ceph_subvol_metric_rb_entry *entry = 110 rb_entry(node, struct ceph_subvol_metric_rb_entry, node); 111 struct rb_node *next = rb_next(node); 112 113 rb_erase_cached(&entry->node, &tracker->tree); 114 tracker->nr_entries--; 115 kmem_cache_free(ceph_subvol_metric_entry_cachep, entry); 116 node = next; 117 } 118 119 tracker->tree = RB_ROOT_CACHED; 120 } 121 122 void ceph_subvolume_metrics_destroy(struct ceph_subvolume_metrics_tracker *tracker) 123 { 124 spin_lock(&tracker->lock); 125 ceph_subvolume_metrics_clear_locked(tracker); 126 tracker->enabled = false; 127 spin_unlock(&tracker->lock); 128 } 129 130 void ceph_subvolume_metrics_enable(struct ceph_subvolume_metrics_tracker *tracker, 131 bool enable) 132 { 133 spin_lock(&tracker->lock); 134 if (enable) { 135 tracker->enabled = true; 136 } else { 137 tracker->enabled = false; 138 ceph_subvolume_metrics_clear_locked(tracker); 139 } 140 spin_unlock(&tracker->lock); 141 } 142 143 void ceph_subvolume_metrics_record(struct ceph_subvolume_metrics_tracker *tracker, 144 u64 subvol_id, bool is_write, 145 size_t size, u64 latency_us) 146 { 147 struct ceph_subvol_metric_rb_entry *entry, *new_entry = NULL; 148 bool retry = false; 149 150 /* CEPH_SUBVOLUME_ID_NONE (0) means unknown/unset subvolume */ 151 if (!READ_ONCE(tracker->enabled) || 152 subvol_id == CEPH_SUBVOLUME_ID_NONE || !size || !latency_us) 153 return; 154 155 /* 156 * Retry loop for lock-free allocation pattern: 157 * 1. First iteration: lookup under lock, if miss -> drop lock, alloc, retry 158 * 2. Second iteration: lookup again (may have been inserted), insert if still missing 159 * 3. On race (another thread inserted same key): free our alloc, retry 160 * All successful paths exit via return, so retry flag doesn't need reset. 161 */ 162 do { 163 spin_lock(&tracker->lock); 164 if (!tracker->enabled) { 165 spin_unlock(&tracker->lock); 166 if (new_entry) 167 kmem_cache_free(ceph_subvol_metric_entry_cachep, new_entry); 168 return; 169 } 170 171 entry = __lookup_entry(tracker, subvol_id); 172 if (!entry) { 173 if (!new_entry) { 174 spin_unlock(&tracker->lock); 175 new_entry = kmem_cache_zalloc(ceph_subvol_metric_entry_cachep, 176 GFP_NOFS); 177 if (!new_entry) 178 return; 179 new_entry->subvolume_id = subvol_id; 180 retry = true; 181 continue; 182 } 183 entry = __insert_entry(tracker, new_entry); 184 if (entry != new_entry) { 185 /* raced with another insert */ 186 spin_unlock(&tracker->lock); 187 kmem_cache_free(ceph_subvol_metric_entry_cachep, new_entry); 188 new_entry = NULL; 189 retry = true; 190 continue; 191 } 192 new_entry = NULL; 193 } 194 195 if (is_write) { 196 entry->write_ops++; 197 entry->write_bytes += size; 198 entry->write_latency_us += latency_us; 199 atomic64_inc(&tracker->total_write_ops); 200 atomic64_add(size, &tracker->total_write_bytes); 201 } else { 202 entry->read_ops++; 203 entry->read_bytes += size; 204 entry->read_latency_us += latency_us; 205 atomic64_inc(&tracker->total_read_ops); 206 atomic64_add(size, &tracker->total_read_bytes); 207 } 208 spin_unlock(&tracker->lock); 209 if (new_entry) 210 kmem_cache_free(ceph_subvol_metric_entry_cachep, new_entry); 211 return; 212 } while (retry); 213 } 214 215 int ceph_subvolume_metrics_snapshot(struct ceph_subvolume_metrics_tracker *tracker, 216 struct ceph_subvol_metric_snapshot **out, 217 u32 *nr, bool consume) 218 { 219 struct ceph_subvol_metric_snapshot *snap = NULL; 220 struct rb_node *node; 221 u32 count = 0, idx = 0; 222 int ret = 0; 223 224 *out = NULL; 225 *nr = 0; 226 227 if (!READ_ONCE(tracker->enabled)) 228 return 0; 229 230 atomic64_inc(&tracker->snapshot_attempts); 231 232 spin_lock(&tracker->lock); 233 for (node = rb_first_cached(&tracker->tree); node; node = rb_next(node)) { 234 struct ceph_subvol_metric_rb_entry *entry = 235 rb_entry(node, struct ceph_subvol_metric_rb_entry, node); 236 237 /* Include entries with ANY I/O activity (read OR write) */ 238 if (entry->read_ops || entry->write_ops) 239 count++; 240 } 241 spin_unlock(&tracker->lock); 242 243 if (!count) { 244 atomic64_inc(&tracker->snapshot_empty); 245 return 0; 246 } 247 248 snap = kcalloc(count, sizeof(*snap), GFP_NOFS); 249 if (!snap) { 250 atomic64_inc(&tracker->snapshot_failures); 251 return -ENOMEM; 252 } 253 254 spin_lock(&tracker->lock); 255 node = rb_first_cached(&tracker->tree); 256 while (node) { 257 struct ceph_subvol_metric_rb_entry *entry = 258 rb_entry(node, struct ceph_subvol_metric_rb_entry, node); 259 struct rb_node *next = rb_next(node); 260 261 /* Skip entries with NO I/O activity at all */ 262 if (!entry->read_ops && !entry->write_ops) { 263 rb_erase_cached(&entry->node, &tracker->tree); 264 tracker->nr_entries--; 265 kmem_cache_free(ceph_subvol_metric_entry_cachep, entry); 266 node = next; 267 continue; 268 } 269 270 if (idx >= count) { 271 pr_warn("ceph: subvol metrics snapshot race (idx=%u count=%u)\n", 272 idx, count); 273 break; 274 } 275 276 snap[idx].subvolume_id = entry->subvolume_id; 277 snap[idx].read_ops = entry->read_ops; 278 snap[idx].write_ops = entry->write_ops; 279 snap[idx].read_bytes = entry->read_bytes; 280 snap[idx].write_bytes = entry->write_bytes; 281 snap[idx].read_latency_us = entry->read_latency_us; 282 snap[idx].write_latency_us = entry->write_latency_us; 283 idx++; 284 285 if (consume) { 286 entry->read_ops = 0; 287 entry->write_ops = 0; 288 entry->read_bytes = 0; 289 entry->write_bytes = 0; 290 entry->read_latency_us = 0; 291 entry->write_latency_us = 0; 292 rb_erase_cached(&entry->node, &tracker->tree); 293 tracker->nr_entries--; 294 kmem_cache_free(ceph_subvol_metric_entry_cachep, entry); 295 } 296 node = next; 297 } 298 spin_unlock(&tracker->lock); 299 300 if (!idx) { 301 kfree(snap); 302 snap = NULL; 303 ret = 0; 304 } else { 305 *nr = idx; 306 *out = snap; 307 } 308 309 return ret; 310 } 311 312 void ceph_subvolume_metrics_free_snapshot(struct ceph_subvol_metric_snapshot *snapshot) 313 { 314 kfree(snapshot); 315 } 316 317 /* 318 * Dump subvolume metrics to a seq_file for debugfs. 319 * 320 * Iterates the rb-tree directly under spinlock to avoid allocation. 321 * The lock hold time is minimal since we're only doing seq_printf calls. 322 */ 323 void ceph_subvolume_metrics_dump(struct ceph_subvolume_metrics_tracker *tracker, 324 struct seq_file *s) 325 { 326 struct rb_node *node; 327 bool found = false; 328 329 spin_lock(&tracker->lock); 330 if (!tracker->enabled) { 331 spin_unlock(&tracker->lock); 332 seq_puts(s, "subvolume metrics disabled\n"); 333 return; 334 } 335 336 for (node = rb_first_cached(&tracker->tree); node; node = rb_next(node)) { 337 struct ceph_subvol_metric_rb_entry *entry = 338 rb_entry(node, struct ceph_subvol_metric_rb_entry, node); 339 u64 avg_rd_lat, avg_wr_lat; 340 341 if (!entry->read_ops && !entry->write_ops) 342 continue; 343 344 if (!found) { 345 seq_puts(s, "subvol_id rd_ops rd_bytes rd_avg_lat_us wr_ops wr_bytes wr_avg_lat_us\n"); 346 seq_puts(s, "------------------------------------------------------------------------------------------------\n"); 347 found = true; 348 } 349 350 avg_rd_lat = entry->read_ops ? 351 div64_u64(entry->read_latency_us, entry->read_ops) : 0; 352 avg_wr_lat = entry->write_ops ? 353 div64_u64(entry->write_latency_us, entry->write_ops) : 0; 354 355 seq_printf(s, "%-15llu%-10llu%-12llu%-16llu%-10llu%-12llu%-16llu\n", 356 entry->subvolume_id, 357 entry->read_ops, 358 entry->read_bytes, 359 avg_rd_lat, 360 entry->write_ops, 361 entry->write_bytes, 362 avg_wr_lat); 363 } 364 spin_unlock(&tracker->lock); 365 366 if (!found) 367 seq_puts(s, "(no subvolume metrics collected)\n"); 368 } 369 370 void ceph_subvolume_metrics_record_io(struct ceph_mds_client *mdsc, 371 struct ceph_inode_info *ci, 372 bool is_write, size_t bytes, 373 ktime_t start, ktime_t end) 374 { 375 struct ceph_subvolume_metrics_tracker *tracker; 376 u64 subvol_id; 377 s64 delta_us; 378 379 if (!mdsc || !ci || !bytes) 380 return; 381 382 tracker = &mdsc->subvol_metrics; 383 atomic64_inc(&tracker->record_calls); 384 385 if (!ceph_subvolume_metrics_enabled(tracker)) { 386 atomic64_inc(&tracker->record_disabled); 387 return; 388 } 389 390 subvol_id = READ_ONCE(ci->i_subvolume_id); 391 if (subvol_id == CEPH_SUBVOLUME_ID_NONE) { 392 atomic64_inc(&tracker->record_no_subvol); 393 return; 394 } 395 396 delta_us = ktime_to_us(ktime_sub(end, start)); 397 if (delta_us <= 0) 398 delta_us = 1; 399 400 ceph_subvolume_metrics_record(tracker, subvol_id, is_write, 401 bytes, (u64)delta_us); 402 } 403 404 int __init ceph_subvolume_metrics_cache_init(void) 405 { 406 ceph_subvol_metric_entry_cachep = KMEM_CACHE(ceph_subvol_metric_rb_entry, 407 SLAB_RECLAIM_ACCOUNT); 408 if (!ceph_subvol_metric_entry_cachep) 409 return -ENOMEM; 410 return 0; 411 } 412 413 void ceph_subvolume_metrics_cache_destroy(void) 414 { 415 kmem_cache_destroy(ceph_subvol_metric_entry_cachep); 416 } 417