1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * 4 * sched-messaging.c 5 * 6 * messaging: Benchmark for scheduler and IPC mechanisms 7 * 8 * Based on hackbench by Rusty Russell <rusty@rustcorp.com.au> 9 * Ported to perf by Hitoshi Mitake <mitake@dcl.info.waseda.ac.jp> 10 * 11 */ 12 13 #include <subcmd/parse-options.h> 14 #include "bench.h" 15 16 /* Test groups of 20 processes spraying to 20 receivers */ 17 #include <pthread.h> 18 #include <stdio.h> 19 #include <stdlib.h> 20 #include <string.h> 21 #include <errno.h> 22 #include <unistd.h> 23 #include <sys/types.h> 24 #include <sys/socket.h> 25 #include <sys/wait.h> 26 #include <sys/time.h> 27 #include <poll.h> 28 #include <limits.h> 29 #include <err.h> 30 #include <linux/list.h> 31 #include <linux/time64.h> 32 33 #define DATASIZE 100 34 35 static bool use_pipes = false; 36 static unsigned int nr_loops = 100; 37 static bool thread_mode = false; 38 static unsigned int num_groups = 10; 39 static unsigned int total_children = 0; 40 static struct list_head sender_contexts = LIST_HEAD_INIT(sender_contexts); 41 static struct list_head receiver_contexts = LIST_HEAD_INIT(receiver_contexts); 42 43 struct sender_context { 44 struct list_head list; 45 unsigned int num_fds; 46 int ready_out; 47 int wakefd; 48 int out_fds[]; 49 }; 50 51 struct receiver_context { 52 struct list_head list; 53 unsigned int num_packets; 54 int in_fds[2]; 55 int ready_out; 56 int wakefd; 57 }; 58 59 union messaging_worker { 60 pthread_t thread; 61 pid_t pid; 62 }; 63 64 static union messaging_worker *worker_tab; 65 66 static void fdpair(int fds[2]) 67 { 68 if (use_pipes) { 69 if (pipe(fds) == 0) 70 return; 71 } else { 72 if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0) 73 return; 74 } 75 76 err(EXIT_FAILURE, use_pipes ? "pipe()" : "socketpair()"); 77 } 78 79 /* Block until we're ready to go */ 80 static void ready(int ready_out, int wakefd) 81 { 82 struct pollfd pollfd = { .fd = wakefd, .events = POLLIN }; 83 84 /* Tell them we're ready. */ 85 if (write(ready_out, "R", 1) != 1) 86 err(EXIT_FAILURE, "CLIENT: ready write"); 87 88 /* Wait for "GO" signal */ 89 if (poll(&pollfd, 1, -1) != 1) 90 err(EXIT_FAILURE, "poll"); 91 } 92 93 /* Sender sprays nr_loops messages down each file descriptor */ 94 static void *sender(struct sender_context *ctx) 95 { 96 char data[DATASIZE]; 97 unsigned int i, j; 98 99 ready(ctx->ready_out, ctx->wakefd); 100 memset(data, 'S', sizeof(data)); 101 102 /* Now pump to every receiver. */ 103 for (i = 0; i < nr_loops; i++) { 104 for (j = 0; j < ctx->num_fds; j++) { 105 int ret, done = 0; 106 107 again: 108 ret = write(ctx->out_fds[j], data + done, 109 sizeof(data) - done); 110 if (ret < 0) 111 err(EXIT_FAILURE, "SENDER: write"); 112 done += ret; 113 if (done < DATASIZE) 114 goto again; 115 } 116 } 117 118 return NULL; 119 } 120 121 122 /* One receiver per fd */ 123 static void *receiver(struct receiver_context* ctx) 124 { 125 unsigned int i; 126 127 if (!thread_mode) 128 close(ctx->in_fds[1]); 129 130 /* Wait for start... */ 131 ready(ctx->ready_out, ctx->wakefd); 132 133 /* Receive them all */ 134 for (i = 0; i < ctx->num_packets; i++) { 135 char data[DATASIZE]; 136 int ret, done = 0; 137 138 again: 139 ret = read(ctx->in_fds[0], data + done, DATASIZE - done); 140 if (ret < 0) 141 err(EXIT_FAILURE, "SERVER: read"); 142 done += ret; 143 if (done < DATASIZE) 144 goto again; 145 } 146 147 return NULL; 148 } 149 150 static void create_thread_worker(union messaging_worker *worker, 151 void *ctx, void *(*func)(void *)) 152 { 153 pthread_attr_t attr; 154 int ret; 155 156 if (pthread_attr_init(&attr) != 0) 157 err(EXIT_FAILURE, "pthread_attr_init:"); 158 159 #ifndef __ia64__ 160 if (pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN) != 0) 161 err(EXIT_FAILURE, "pthread_attr_setstacksize"); 162 #endif 163 164 ret = pthread_create(&worker->thread, &attr, func, ctx); 165 if (ret != 0) 166 err(EXIT_FAILURE, "pthread_create failed"); 167 168 pthread_attr_destroy(&attr); 169 } 170 171 static void create_process_worker(union messaging_worker *worker, 172 void *ctx, void *(*func)(void *)) 173 { 174 /* Fork the receiver. */ 175 worker->pid = fork(); 176 177 if (worker->pid == -1) { 178 err(EXIT_FAILURE, "fork()"); 179 } else if (worker->pid == 0) { 180 (*func) (ctx); 181 exit(0); 182 } 183 } 184 185 static void create_worker(union messaging_worker *worker, 186 void *ctx, void *(*func)(void *)) 187 { 188 if (!thread_mode) 189 return create_process_worker(worker, ctx, func); 190 else 191 return create_thread_worker(worker, ctx, func); 192 } 193 194 static void reap_worker(union messaging_worker *worker) 195 { 196 int proc_status; 197 void *thread_status; 198 199 if (!thread_mode) { 200 /* process mode */ 201 wait(&proc_status); 202 if (!WIFEXITED(proc_status)) 203 exit(1); 204 } else { 205 pthread_join(worker->thread, &thread_status); 206 } 207 } 208 209 /* One group of senders and receivers */ 210 static unsigned int group(union messaging_worker *worker, 211 unsigned int num_fds, 212 int ready_out, 213 int wakefd) 214 { 215 unsigned int i; 216 struct sender_context *snd_ctx = malloc(sizeof(struct sender_context) + 217 num_fds * sizeof(int)); 218 219 if (!snd_ctx) 220 err(EXIT_FAILURE, "malloc()"); 221 222 list_add(&snd_ctx->list, &sender_contexts); 223 for (i = 0; i < num_fds; i++) { 224 int fds[2]; 225 struct receiver_context *ctx = malloc(sizeof(*ctx)); 226 227 if (!ctx) 228 err(EXIT_FAILURE, "malloc()"); 229 230 list_add(&ctx->list, &receiver_contexts); 231 232 /* Create the pipe between client and server */ 233 fdpair(fds); 234 235 ctx->num_packets = num_fds * nr_loops; 236 ctx->in_fds[0] = fds[0]; 237 ctx->in_fds[1] = fds[1]; 238 ctx->ready_out = ready_out; 239 ctx->wakefd = wakefd; 240 241 create_worker(worker + i, ctx, (void *)receiver); 242 243 snd_ctx->out_fds[i] = fds[1]; 244 if (!thread_mode) 245 close(fds[0]); 246 } 247 248 /* Now we have all the fds, fork the senders */ 249 for (i = 0; i < num_fds; i++) { 250 snd_ctx->ready_out = ready_out; 251 snd_ctx->wakefd = wakefd; 252 snd_ctx->num_fds = num_fds; 253 254 create_worker(worker + num_fds + i, snd_ctx, (void *)sender); 255 } 256 257 /* Close the fds we have left */ 258 if (!thread_mode) 259 for (i = 0; i < num_fds; i++) 260 close(snd_ctx->out_fds[i]); 261 262 /* Return number of children to reap */ 263 return num_fds * 2; 264 } 265 266 static void sig_handler(int sig __maybe_unused) 267 { 268 unsigned int i; 269 270 /* 271 * When exit abnormally, kill all forked child processes. 272 */ 273 for (i = 0; i < total_children; i++) 274 kill(worker_tab[i].pid, SIGKILL); 275 } 276 277 static const struct option options[] = { 278 OPT_BOOLEAN('p', "pipe", &use_pipes, 279 "Use pipe() instead of socketpair()"), 280 OPT_BOOLEAN('t', "thread", &thread_mode, 281 "Be multi thread instead of multi process"), 282 OPT_UINTEGER('g', "group", &num_groups, "Specify number of groups"), 283 OPT_UINTEGER('l', "nr_loops", &nr_loops, "Specify the number of loops to run (default: 100)"), 284 OPT_END() 285 }; 286 287 static const char * const bench_sched_message_usage[] = { 288 "perf bench sched messaging <options>", 289 NULL 290 }; 291 292 int bench_sched_messaging(int argc, const char **argv) 293 { 294 unsigned int i; 295 struct timeval start, stop, diff; 296 unsigned int num_fds = 20; 297 int readyfds[2], wakefds[2]; 298 char dummy; 299 struct sender_context *pos, *n; 300 301 argc = parse_options(argc, argv, options, 302 bench_sched_message_usage, 0); 303 304 worker_tab = malloc(num_fds * 2 * num_groups * sizeof(union messaging_worker)); 305 if (!worker_tab) 306 err(EXIT_FAILURE, "main:malloc()"); 307 308 fdpair(readyfds); 309 fdpair(wakefds); 310 311 if (!thread_mode) { 312 signal(SIGINT, sig_handler); 313 signal(SIGTERM, sig_handler); 314 } 315 316 for (i = 0; i < num_groups; i++) 317 total_children += group(worker_tab + total_children, num_fds, 318 readyfds[1], wakefds[0]); 319 320 /* Wait for everyone to be ready */ 321 for (i = 0; i < total_children; i++) 322 if (read(readyfds[0], &dummy, 1) != 1) 323 err(EXIT_FAILURE, "Reading for readyfds"); 324 325 gettimeofday(&start, NULL); 326 327 /* Kick them off */ 328 if (write(wakefds[1], &dummy, 1) != 1) 329 err(EXIT_FAILURE, "Writing to start them"); 330 331 /* Reap them all */ 332 for (i = 0; i < total_children; i++) 333 reap_worker(worker_tab + i); 334 335 gettimeofday(&stop, NULL); 336 337 timersub(&stop, &start, &diff); 338 339 switch (bench_format) { 340 case BENCH_FORMAT_DEFAULT: 341 printf("# %d sender and receiver %s per group\n", 342 num_fds, thread_mode ? "threads" : "processes"); 343 printf("# %d groups == %d %s run\n\n", 344 num_groups, num_groups * 2 * num_fds, 345 thread_mode ? "threads" : "processes"); 346 printf(" %14s: %lu.%03lu [sec]\n", "Total time", 347 (unsigned long) diff.tv_sec, 348 (unsigned long) (diff.tv_usec / USEC_PER_MSEC)); 349 break; 350 case BENCH_FORMAT_SIMPLE: 351 printf("%lu.%03lu\n", (unsigned long) diff.tv_sec, 352 (unsigned long) (diff.tv_usec / USEC_PER_MSEC)); 353 break; 354 default: 355 /* reaching here is something disaster */ 356 fprintf(stderr, "Unknown format:%d\n", bench_format); 357 exit(1); 358 break; 359 } 360 361 free(worker_tab); 362 list_for_each_entry_safe(pos, n, &sender_contexts, list) { 363 list_del_init(&pos->list); 364 free(pos); 365 } 366 list_for_each_entry_safe(pos, n, &receiver_contexts, list) { 367 list_del_init(&pos->list); 368 free(pos); 369 } 370 return 0; 371 } 372