1 /* SPDX-License-Identifier: GPL-2.0 */ 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/types.h> 5 #include <linux/percpu_counter.h> 6 #include <linux/math64.h> 7 #include <linux/ratelimit.h> 8 9 #include <linux/ceph/decode.h> 10 11 #include "metric.h" 12 #include "mds_client.h" 13 14 static bool metrics_disable_warned; 15 16 static inline u32 ceph_subvolume_entry_payload_len(void) 17 { 18 return sizeof(struct ceph_subvolume_metric_entry_wire); 19 } 20 21 static inline u32 ceph_subvolume_entry_encoded_len(void) 22 { 23 return CEPH_ENCODING_START_BLK_LEN + 24 ceph_subvolume_entry_payload_len(); 25 } 26 27 static inline u32 ceph_subvolume_outer_payload_len(u32 nr_subvols) 28 { 29 /* count is encoded as le64 (size_t on wire) to match FUSE client */ 30 return sizeof(__le64) + 31 nr_subvols * ceph_subvolume_entry_encoded_len(); 32 } 33 34 static inline u32 ceph_subvolume_metric_data_len(u32 nr_subvols) 35 { 36 return CEPH_ENCODING_START_BLK_LEN + 37 ceph_subvolume_outer_payload_len(nr_subvols); 38 } 39 40 static inline u32 ceph_subvolume_clamp_u32(u64 val) 41 { 42 return val > U32_MAX ? U32_MAX : (u32)val; 43 } 44 45 static void ceph_init_subvolume_wire_entry( 46 struct ceph_subvolume_metric_entry_wire *dst, 47 const struct ceph_subvol_metric_snapshot *src) 48 { 49 dst->subvolume_id = cpu_to_le64(src->subvolume_id); 50 dst->read_ops = cpu_to_le32(ceph_subvolume_clamp_u32(src->read_ops)); 51 dst->write_ops = cpu_to_le32(ceph_subvolume_clamp_u32(src->write_ops)); 52 dst->read_bytes = cpu_to_le64(src->read_bytes); 53 dst->write_bytes = cpu_to_le64(src->write_bytes); 54 dst->read_latency_us = cpu_to_le64(src->read_latency_us); 55 dst->write_latency_us = cpu_to_le64(src->write_latency_us); 56 dst->time_stamp = 0; 57 } 58 59 static int ceph_encode_subvolume_metrics(void **p, void *end, 60 struct ceph_subvol_metric_snapshot *subvols, 61 u32 nr_subvols) 62 { 63 u32 i; 64 65 ceph_start_encoding(p, 1, 1, 66 ceph_subvolume_outer_payload_len(nr_subvols)); 67 /* count is encoded as le64 (size_t on wire) to match FUSE client */ 68 ceph_encode_64_safe(p, end, (u64)nr_subvols, enc_err); 69 70 for (i = 0; i < nr_subvols; i++) { 71 struct ceph_subvolume_metric_entry_wire wire_entry; 72 73 ceph_init_subvolume_wire_entry(&wire_entry, &subvols[i]); 74 ceph_start_encoding(p, 1, 1, 75 ceph_subvolume_entry_payload_len()); 76 ceph_encode_copy_safe(p, end, &wire_entry, 77 sizeof(wire_entry), enc_err); 78 } 79 80 return 0; 81 enc_err: 82 return -ERANGE; 83 } 84 85 static void ktime_to_ceph_timespec(struct ceph_timespec *ts, ktime_t val) 86 { 87 struct timespec64 t = ktime_to_timespec64(val); 88 ceph_encode_timespec64(ts, &t); 89 } 90 91 static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, 92 struct ceph_mds_session *s) 93 { 94 struct ceph_metric_head *head; 95 struct ceph_metric_cap *cap; 96 struct ceph_metric_read_latency *read; 97 struct ceph_metric_write_latency *write; 98 struct ceph_metric_metadata_latency *meta; 99 struct ceph_metric_dlease *dlease; 100 struct ceph_opened_files *files; 101 struct ceph_pinned_icaps *icaps; 102 struct ceph_opened_inodes *inodes; 103 struct ceph_read_io_size *rsize; 104 struct ceph_write_io_size *wsize; 105 struct ceph_client_metric *m = &mdsc->metric; 106 struct ceph_subvol_metric_snapshot *subvols = NULL; 107 u64 nr_caps = atomic64_read(&m->total_caps); 108 u32 header_len = sizeof(struct ceph_metric_header); 109 struct ceph_client *cl = mdsc->fsc->client; 110 struct ceph_msg *msg; 111 u32 nr_subvols = 0; 112 size_t subvol_len = 0; 113 void *cursor; 114 s64 sum; 115 s32 items = 0; 116 s32 len; 117 118 /* Do not send the metrics until the MDS rank is ready */ 119 mutex_lock(&mdsc->mutex); 120 if (ceph_mdsmap_get_state(mdsc->mdsmap, s->s_mds) != CEPH_MDS_STATE_ACTIVE) { 121 mutex_unlock(&mdsc->mutex); 122 return false; 123 } 124 mutex_unlock(&mdsc->mutex); 125 126 if (ceph_subvolume_metrics_enabled(&mdsc->subvol_metrics) && 127 test_bit(CEPHFS_FEATURE_SUBVOLUME_METRICS, &s->s_features)) { 128 int ret; 129 130 ret = ceph_subvolume_metrics_snapshot(&mdsc->subvol_metrics, 131 &subvols, &nr_subvols, 132 true); 133 if (ret) { 134 pr_warn_client(cl, "failed to snapshot subvolume metrics: %d\n", 135 ret); 136 /* 137 * On error, ceph_subvolume_metrics_snapshot() guarantees 138 * *out = NULL and *nr = 0 at function entry, so subvols 139 * is already NULL here - no cleanup needed. 140 */ 141 nr_subvols = 0; 142 subvols = NULL; 143 } 144 } 145 146 if (nr_subvols) { 147 /* type (le32) + ENCODE_START payload - no metric header */ 148 subvol_len = sizeof(__le32) + 149 ceph_subvolume_metric_data_len(nr_subvols); 150 } 151 152 len = sizeof(*head) + sizeof(*cap) + sizeof(*read) + sizeof(*write) 153 + sizeof(*meta) + sizeof(*dlease) + sizeof(*files) 154 + sizeof(*icaps) + sizeof(*inodes) + sizeof(*rsize) 155 + sizeof(*wsize) + subvol_len; 156 157 msg = ceph_msg_new(CEPH_MSG_CLIENT_METRICS, len, GFP_NOFS, true); 158 if (!msg) { 159 pr_err_client(cl, "to mds%d, failed to allocate message\n", 160 s->s_mds); 161 kfree(subvols); 162 return false; 163 } 164 165 head = msg->front.iov_base; 166 167 /* encode the cap metric */ 168 cap = (struct ceph_metric_cap *)(head + 1); 169 cap->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_CAP_INFO); 170 cap->header.ver = 1; 171 cap->header.compat = 1; 172 cap->header.data_len = cpu_to_le32(sizeof(*cap) - header_len); 173 cap->hit = cpu_to_le64(percpu_counter_sum(&m->i_caps_hit)); 174 cap->mis = cpu_to_le64(percpu_counter_sum(&m->i_caps_mis)); 175 cap->total = cpu_to_le64(nr_caps); 176 items++; 177 178 /* encode the read latency metric */ 179 read = (struct ceph_metric_read_latency *)(cap + 1); 180 read->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_LATENCY); 181 read->header.ver = 2; 182 read->header.compat = 1; 183 read->header.data_len = cpu_to_le32(sizeof(*read) - header_len); 184 sum = m->metric[METRIC_READ].latency_sum; 185 ktime_to_ceph_timespec(&read->lat, sum); 186 ktime_to_ceph_timespec(&read->avg, m->metric[METRIC_READ].latency_avg); 187 read->sq_sum = cpu_to_le64(m->metric[METRIC_READ].latency_sq_sum); 188 read->count = cpu_to_le64(m->metric[METRIC_READ].total); 189 items++; 190 191 /* encode the write latency metric */ 192 write = (struct ceph_metric_write_latency *)(read + 1); 193 write->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_LATENCY); 194 write->header.ver = 2; 195 write->header.compat = 1; 196 write->header.data_len = cpu_to_le32(sizeof(*write) - header_len); 197 sum = m->metric[METRIC_WRITE].latency_sum; 198 ktime_to_ceph_timespec(&write->lat, sum); 199 ktime_to_ceph_timespec(&write->avg, m->metric[METRIC_WRITE].latency_avg); 200 write->sq_sum = cpu_to_le64(m->metric[METRIC_WRITE].latency_sq_sum); 201 write->count = cpu_to_le64(m->metric[METRIC_WRITE].total); 202 items++; 203 204 /* encode the metadata latency metric */ 205 meta = (struct ceph_metric_metadata_latency *)(write + 1); 206 meta->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_METADATA_LATENCY); 207 meta->header.ver = 2; 208 meta->header.compat = 1; 209 meta->header.data_len = cpu_to_le32(sizeof(*meta) - header_len); 210 sum = m->metric[METRIC_METADATA].latency_sum; 211 ktime_to_ceph_timespec(&meta->lat, sum); 212 ktime_to_ceph_timespec(&meta->avg, m->metric[METRIC_METADATA].latency_avg); 213 meta->sq_sum = cpu_to_le64(m->metric[METRIC_METADATA].latency_sq_sum); 214 meta->count = cpu_to_le64(m->metric[METRIC_METADATA].total); 215 items++; 216 217 /* encode the dentry lease metric */ 218 dlease = (struct ceph_metric_dlease *)(meta + 1); 219 dlease->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_DENTRY_LEASE); 220 dlease->header.ver = 1; 221 dlease->header.compat = 1; 222 dlease->header.data_len = cpu_to_le32(sizeof(*dlease) - header_len); 223 dlease->hit = cpu_to_le64(percpu_counter_sum(&m->d_lease_hit)); 224 dlease->mis = cpu_to_le64(percpu_counter_sum(&m->d_lease_mis)); 225 dlease->total = cpu_to_le64(atomic64_read(&m->total_dentries)); 226 items++; 227 228 sum = percpu_counter_sum(&m->total_inodes); 229 230 /* encode the opened files metric */ 231 files = (struct ceph_opened_files *)(dlease + 1); 232 files->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_FILES); 233 files->header.ver = 1; 234 files->header.compat = 1; 235 files->header.data_len = cpu_to_le32(sizeof(*files) - header_len); 236 files->opened_files = cpu_to_le64(atomic64_read(&m->opened_files)); 237 files->total = cpu_to_le64(sum); 238 items++; 239 240 /* encode the pinned icaps metric */ 241 icaps = (struct ceph_pinned_icaps *)(files + 1); 242 icaps->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_PINNED_ICAPS); 243 icaps->header.ver = 1; 244 icaps->header.compat = 1; 245 icaps->header.data_len = cpu_to_le32(sizeof(*icaps) - header_len); 246 icaps->pinned_icaps = cpu_to_le64(nr_caps); 247 icaps->total = cpu_to_le64(sum); 248 items++; 249 250 /* encode the opened inodes metric */ 251 inodes = (struct ceph_opened_inodes *)(icaps + 1); 252 inodes->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_INODES); 253 inodes->header.ver = 1; 254 inodes->header.compat = 1; 255 inodes->header.data_len = cpu_to_le32(sizeof(*inodes) - header_len); 256 inodes->opened_inodes = cpu_to_le64(percpu_counter_sum(&m->opened_inodes)); 257 inodes->total = cpu_to_le64(sum); 258 items++; 259 260 /* encode the read io size metric */ 261 rsize = (struct ceph_read_io_size *)(inodes + 1); 262 rsize->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_IO_SIZES); 263 rsize->header.ver = 1; 264 rsize->header.compat = 1; 265 rsize->header.data_len = cpu_to_le32(sizeof(*rsize) - header_len); 266 rsize->total_ops = cpu_to_le64(m->metric[METRIC_READ].total); 267 rsize->total_size = cpu_to_le64(m->metric[METRIC_READ].size_sum); 268 items++; 269 270 /* encode the write io size metric */ 271 wsize = (struct ceph_write_io_size *)(rsize + 1); 272 wsize->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_IO_SIZES); 273 wsize->header.ver = 1; 274 wsize->header.compat = 1; 275 wsize->header.data_len = cpu_to_le32(sizeof(*wsize) - header_len); 276 wsize->total_ops = cpu_to_le64(m->metric[METRIC_WRITE].total); 277 wsize->total_size = cpu_to_le64(m->metric[METRIC_WRITE].size_sum); 278 items++; 279 280 cursor = wsize + 1; 281 282 if (nr_subvols) { 283 void *payload; 284 void *payload_end; 285 int ret; 286 287 /* Emit only the type (le32), no ver/compat/data_len */ 288 ceph_encode_32(&cursor, CLIENT_METRIC_TYPE_SUBVOLUME_METRICS); 289 items++; 290 291 payload = cursor; 292 payload_end = (char *)payload + 293 ceph_subvolume_metric_data_len(nr_subvols); 294 295 ret = ceph_encode_subvolume_metrics(&payload, payload_end, 296 subvols, nr_subvols); 297 if (ret) { 298 pr_warn_client(cl, 299 "failed to encode subvolume metrics\n"); 300 kfree(subvols); 301 ceph_msg_put(msg); 302 return false; 303 } 304 305 WARN_ON(payload != payload_end); 306 cursor = payload; 307 } 308 309 put_unaligned_le32(items, &head->num); 310 msg->front.iov_len = (char *)cursor - (char *)head; 311 msg->hdr.version = cpu_to_le16(1); 312 msg->hdr.compat_version = cpu_to_le16(1); 313 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 314 315 ceph_con_send(&s->s_con, msg); 316 317 if (nr_subvols) { 318 mutex_lock(&mdsc->subvol_metrics_last_mutex); 319 kfree(mdsc->subvol_metrics_last); 320 mdsc->subvol_metrics_last = subvols; 321 mdsc->subvol_metrics_last_nr = nr_subvols; 322 mdsc->subvol_metrics_sent += nr_subvols; 323 mdsc->subvol_metrics_nonzero_sends++; 324 mutex_unlock(&mdsc->subvol_metrics_last_mutex); 325 326 subvols = NULL; 327 } 328 kfree(subvols); 329 330 return true; 331 } 332 333 334 static void metric_get_session(struct ceph_mds_client *mdsc) 335 { 336 struct ceph_mds_session *s; 337 int i; 338 339 mutex_lock(&mdsc->mutex); 340 for (i = 0; i < mdsc->max_sessions; i++) { 341 s = __ceph_lookup_mds_session(mdsc, i); 342 if (!s) 343 continue; 344 345 /* 346 * Skip it if MDS doesn't support the metric collection, 347 * or the MDS will close the session's socket connection 348 * directly when it get this message. 349 * 350 * Also skip sessions that don't support SUBVOLUME_METRICS 351 * when subvolume metrics collection is enabled. This ensures 352 * we only send subvolume metrics to MDSs that understand them. 353 * If no session supports the feature, metrics won't be sent. 354 */ 355 if (check_session_state(s) && 356 test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &s->s_features)) { 357 if (ceph_subvolume_metrics_enabled(&mdsc->subvol_metrics) && 358 !test_bit(CEPHFS_FEATURE_SUBVOLUME_METRICS, 359 &s->s_features)) { 360 ceph_put_mds_session(s); 361 continue; 362 } 363 mdsc->metric.session = s; 364 break; 365 } 366 367 ceph_put_mds_session(s); 368 } 369 mutex_unlock(&mdsc->mutex); 370 } 371 372 static void metric_delayed_work(struct work_struct *work) 373 { 374 struct ceph_client_metric *m = 375 container_of(work, struct ceph_client_metric, delayed_work.work); 376 struct ceph_mds_client *mdsc = 377 container_of(m, struct ceph_mds_client, metric); 378 379 if (mdsc->stopping) 380 return; 381 382 if (disable_send_metrics) { 383 if (!metrics_disable_warned) { 384 pr_info("ceph: metrics sending disabled via module parameter\n"); 385 metrics_disable_warned = true; 386 } 387 return; 388 } 389 metrics_disable_warned = false; 390 391 if (!m->session || !check_session_state(m->session)) { 392 if (m->session) { 393 ceph_put_mds_session(m->session); 394 m->session = NULL; 395 } 396 metric_get_session(mdsc); 397 } 398 399 if (m->session) 400 ceph_mdsc_send_metrics(mdsc, m->session); 401 else 402 pr_warn_ratelimited("ceph: metrics worker has no MDS session\n"); 403 404 metric_schedule_delayed(m); 405 } 406 407 int ceph_metric_init(struct ceph_client_metric *m) 408 { 409 struct ceph_metric *metric; 410 int ret, i; 411 412 if (!m) 413 return -EINVAL; 414 415 atomic64_set(&m->total_dentries, 0); 416 ret = percpu_counter_init(&m->d_lease_hit, 0, GFP_KERNEL); 417 if (ret) 418 return ret; 419 420 ret = percpu_counter_init(&m->d_lease_mis, 0, GFP_KERNEL); 421 if (ret) 422 goto err_d_lease_mis; 423 424 atomic64_set(&m->total_caps, 0); 425 ret = percpu_counter_init(&m->i_caps_hit, 0, GFP_KERNEL); 426 if (ret) 427 goto err_i_caps_hit; 428 429 ret = percpu_counter_init(&m->i_caps_mis, 0, GFP_KERNEL); 430 if (ret) 431 goto err_i_caps_mis; 432 433 for (i = 0; i < METRIC_MAX; i++) { 434 metric = &m->metric[i]; 435 spin_lock_init(&metric->lock); 436 metric->size_sum = 0; 437 metric->size_min = U64_MAX; 438 metric->size_max = 0; 439 metric->total = 0; 440 metric->latency_sum = 0; 441 metric->latency_avg = 0; 442 metric->latency_sq_sum = 0; 443 metric->latency_min = KTIME_MAX; 444 metric->latency_max = 0; 445 } 446 447 atomic64_set(&m->opened_files, 0); 448 ret = percpu_counter_init(&m->opened_inodes, 0, GFP_KERNEL); 449 if (ret) 450 goto err_opened_inodes; 451 ret = percpu_counter_init(&m->total_inodes, 0, GFP_KERNEL); 452 if (ret) 453 goto err_total_inodes; 454 455 m->session = NULL; 456 INIT_DELAYED_WORK(&m->delayed_work, metric_delayed_work); 457 458 return 0; 459 460 err_total_inodes: 461 percpu_counter_destroy(&m->opened_inodes); 462 err_opened_inodes: 463 percpu_counter_destroy(&m->i_caps_mis); 464 err_i_caps_mis: 465 percpu_counter_destroy(&m->i_caps_hit); 466 err_i_caps_hit: 467 percpu_counter_destroy(&m->d_lease_mis); 468 err_d_lease_mis: 469 percpu_counter_destroy(&m->d_lease_hit); 470 471 return ret; 472 } 473 474 void ceph_metric_destroy(struct ceph_client_metric *m) 475 { 476 if (!m) 477 return; 478 479 cancel_delayed_work_sync(&m->delayed_work); 480 481 percpu_counter_destroy(&m->total_inodes); 482 percpu_counter_destroy(&m->opened_inodes); 483 percpu_counter_destroy(&m->i_caps_mis); 484 percpu_counter_destroy(&m->i_caps_hit); 485 percpu_counter_destroy(&m->d_lease_mis); 486 percpu_counter_destroy(&m->d_lease_hit); 487 488 ceph_put_mds_session(m->session); 489 } 490 491 #define METRIC_UPDATE_MIN_MAX(min, max, new) \ 492 { \ 493 if (unlikely(new < min)) \ 494 min = new; \ 495 if (unlikely(new > max)) \ 496 max = new; \ 497 } 498 499 static inline void __update_mean_and_stdev(ktime_t total, ktime_t *lavg, 500 ktime_t *sq_sump, ktime_t lat) 501 { 502 ktime_t avg; 503 504 if (unlikely(total == 1)) { 505 *lavg = lat; 506 } else { 507 /* the sq is (lat - old_avg) * (lat - new_avg) */ 508 avg = *lavg + div64_s64(lat - *lavg, total); 509 *sq_sump += (lat - *lavg)*(lat - avg); 510 *lavg = avg; 511 } 512 } 513 514 void ceph_update_metrics(struct ceph_metric *m, 515 ktime_t r_start, ktime_t r_end, 516 unsigned int size, int rc) 517 { 518 ktime_t lat = ktime_sub(r_end, r_start); 519 ktime_t total; 520 521 if (unlikely(rc < 0 && rc != -ENOENT && rc != -ETIMEDOUT)) 522 return; 523 524 spin_lock(&m->lock); 525 total = ++m->total; 526 m->size_sum += size; 527 METRIC_UPDATE_MIN_MAX(m->size_min, m->size_max, size); 528 m->latency_sum += lat; 529 METRIC_UPDATE_MIN_MAX(m->latency_min, m->latency_max, lat); 530 __update_mean_and_stdev(total, &m->latency_avg, &m->latency_sq_sum, 531 lat); 532 spin_unlock(&m->lock); 533 } 534