1 /*- 2 * SPDX-License-Identifier: BSD-2-Clause 3 * 4 * Copyright (C) 2009 Gabor Kovesdan <gabor@FreeBSD.org> 5 * Copyright (C) 2012 Oleg Moskalenko <mom040267@gmail.com> 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27 * SUCH DAMAGE. 
 */

#include <sys/cdefs.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/queue.h>

#include <err.h>
#include <fcntl.h>
#if defined(SORT_THREADS)
#include <pthread.h>
#endif
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <wchar.h>
#include <wctype.h>

#include "coll.h"
#include "file.h"
#include "radixsort.h"

unsigned long long free_memory = 1000000;
/* procfile() spills the in-memory list to a tmp file at this size. */
unsigned long long available_free_memory = 1000000;

/* When true, file_reader_init() tries to mmap() regular input files. */
bool use_mmap;

/* Directory in which new_tmp_file_name() creates temporary files. */
const char *tmpdir = "/var/tmp";
/* If non-NULL, openfile() pipes temporary files through this program. */
const char *compress_program;

/* Upper bound used by shrink_file_list() when merging files in batches. */
size_t max_open_files = 16;

/*
 * File reader structure
 */
struct file_reader
{
	FILE *file;			/* stdio stream; used when not mmap'ed */
	char *fname;			/* private copy of the source name */
	char *buffer;			/* line buffer owned by getdelim() */
	unsigned char *mmapaddr;	/* start of the mapping, or NULL */
	unsigned char *mmapptr;		/* read cursor inside the mapping */
	size_t bsz;			/* capacity of buffer (getdelim) */
	size_t mmapsize;		/* length of the mapping in bytes */
	int fd;				/* descriptor backing the mapping */
	char elsymb;			/* record terminator: '\n' or '\0' */
};

/*
 * Structure to be used in file merge process.
 */
struct file_header
{
	struct file_reader *fr;		/* NULL once the file hit EOF */
	struct sort_list_item *si;	/* current top line */
	size_t file_pos;		/* input order; tie-breaker in merge */
};

/*
 * List elements of "cleanable" files list.
 */
struct CLEANABLE_FILE
{
	char *fn;			/* temporary file name */
	LIST_ENTRY(CLEANABLE_FILE) files;
};

/*
 * List header of "cleanable" files list.
 */
static LIST_HEAD(CLEANABLE_FILES,CLEANABLE_FILE) tmp_files;

/*
 * Semaphore to protect the tmp file list.
 * We use semaphore here because it is signal-safe, according to POSIX.
 * And semaphore does not require pthread library.
107 */ 108 static sem_t tmp_files_sem; 109 110 static void mt_sort(struct sort_list *list, 111 int (*sort_func)(void *, size_t, size_t, 112 int (*)(const void *, const void *)), const char* fn); 113 114 /* 115 * Init tmp files list 116 */ 117 void 118 init_tmp_files(void) 119 { 120 121 LIST_INIT(&tmp_files); 122 sem_init(&tmp_files_sem, 0, 1); 123 } 124 125 /* 126 * Save name of a tmp file for signal cleanup 127 */ 128 void 129 tmp_file_atexit(const char *tmp_file) 130 { 131 132 if (tmp_file) { 133 sem_wait(&tmp_files_sem); 134 struct CLEANABLE_FILE *item = 135 sort_malloc(sizeof(struct CLEANABLE_FILE)); 136 item->fn = sort_strdup(tmp_file); 137 LIST_INSERT_HEAD(&tmp_files, item, files); 138 sem_post(&tmp_files_sem); 139 } 140 } 141 142 /* 143 * Clear tmp files 144 */ 145 void 146 clear_tmp_files(void) 147 { 148 struct CLEANABLE_FILE *item; 149 150 sem_wait(&tmp_files_sem); 151 LIST_FOREACH(item,&tmp_files,files) { 152 if ((item) && (item->fn)) 153 unlink(item->fn); 154 } 155 sem_post(&tmp_files_sem); 156 } 157 158 /* 159 * Check whether a file is a temporary file 160 */ 161 static bool 162 file_is_tmp(const char* fn) 163 { 164 struct CLEANABLE_FILE *item; 165 bool ret = false; 166 167 if (fn) { 168 sem_wait(&tmp_files_sem); 169 LIST_FOREACH(item,&tmp_files,files) { 170 if ((item) && (item->fn)) 171 if (strcmp(item->fn, fn) == 0) { 172 ret = true; 173 break; 174 } 175 } 176 sem_post(&tmp_files_sem); 177 } 178 179 return (ret); 180 } 181 182 /* 183 * Generate new temporary file name 184 */ 185 char * 186 new_tmp_file_name(void) 187 { 188 char *ret; 189 int fd; 190 191 if (asprintf(&ret, "%s/.bsdsort.XXXXXXXXXX", tmpdir) == -1) 192 err(2, "asprintf()"); 193 if ((fd = mkstemp(ret)) == -1) 194 err(2, "mkstemp()"); 195 close(fd); 196 197 tmp_file_atexit(ret); 198 return (ret); 199 } 200 201 /* 202 * Initialize file list 203 */ 204 void 205 file_list_init(struct file_list *fl, bool tmp) 206 { 207 208 if (fl) { 209 memset(fl, 0, sizeof(*fl)); 210 fl->tmp = tmp; 211 } 212 
} 213 214 /* 215 * Add a file name to the list 216 */ 217 void 218 file_list_add(struct file_list *fl, const char *fn, bool allocate) 219 { 220 221 if (fl && fn) { 222 if (fl->count >= fl->sz || (fl->fns == NULL)) { 223 fl->sz = (fl->sz) * 2 + 1; 224 fl->fns = sort_realloc(fl->fns, fl->sz * 225 sizeof(char *)); 226 } 227 fl->fns[fl->count] = allocate ? sort_strdup(fn) : fn; 228 fl->count += 1; 229 } 230 } 231 232 /* 233 * Populate file list from array of file names 234 */ 235 void 236 file_list_populate(struct file_list *fl, int argc, char **argv, bool allocate) 237 { 238 239 if (fl && argv) { 240 int i; 241 242 for (i = 0; i < argc; i++) 243 file_list_add(fl, argv[i], allocate); 244 } 245 } 246 247 /* 248 * Clean file list data and delete the files, 249 * if this is a list of temporary files 250 */ 251 void 252 file_list_clean(struct file_list *fl) 253 { 254 255 if (fl) { 256 if (fl->fns) { 257 size_t i; 258 259 for (i = 0; i < fl->count; i++) { 260 if (fl->fns[i]) { 261 if (fl->tmp) 262 unlink(fl->fns[i]); 263 sort_free(fl->fns[i]); 264 fl->fns[i] = 0; 265 } 266 } 267 sort_free(fl->fns); 268 fl->fns = NULL; 269 } 270 fl->sz = 0; 271 fl->count = 0; 272 fl->tmp = false; 273 } 274 } 275 276 /* 277 * Init sort list 278 */ 279 void 280 sort_list_init(struct sort_list *l) 281 { 282 283 if (l) { 284 memset(l, 0, sizeof(*l)); 285 l->memsize = sizeof(struct sort_list); 286 } 287 } 288 289 /* 290 * Add string to sort list 291 */ 292 void 293 sort_list_add(struct sort_list *l, struct bwstring *str) 294 { 295 296 if (l && str) { 297 size_t indx = l->count; 298 299 if ((l->list == NULL) || (indx >= l->size)) { 300 size_t newsize = (l->size + 1) + 1024; 301 302 l->list = sort_realloc(l->list, 303 sizeof(struct sort_list_item*) * newsize); 304 l->memsize += (newsize - l->size) * 305 sizeof(struct sort_list_item*); 306 l->size = newsize; 307 } 308 l->list[indx] = sort_list_item_alloc(); 309 sort_list_item_set(l->list[indx], str); 310 l->memsize += 
sort_list_item_size(l->list[indx]); 311 l->count += 1; 312 } 313 } 314 315 /* 316 * Clean sort list data 317 */ 318 void 319 sort_list_clean(struct sort_list *l) 320 { 321 322 if (l) { 323 if (l->list) { 324 size_t i; 325 326 for (i = 0; i < l->count; i++) { 327 struct sort_list_item *item; 328 329 item = l->list[i]; 330 331 if (item) { 332 sort_list_item_clean(item); 333 sort_free(item); 334 l->list[i] = NULL; 335 } 336 } 337 sort_free(l->list); 338 l->list = NULL; 339 } 340 l->count = 0; 341 l->size = 0; 342 l->memsize = sizeof(struct sort_list); 343 } 344 } 345 346 /* 347 * Write sort list to file 348 */ 349 void 350 sort_list_dump(struct sort_list *l, const char *fn) 351 { 352 353 if (l && fn) { 354 FILE *f; 355 356 f = openfile(fn, "w"); 357 if (f == NULL) 358 err(2, NULL); 359 360 if (l->list) { 361 size_t i; 362 if (!(sort_opts_vals.uflag)) { 363 for (i = 0; i < l->count; ++i) 364 bwsfwrite(l->list[i]->str, f, 365 sort_opts_vals.zflag); 366 } else { 367 struct sort_list_item *last_printed_item = NULL; 368 struct sort_list_item *item; 369 for (i = 0; i < l->count; ++i) { 370 item = l->list[i]; 371 if ((last_printed_item == NULL) || 372 list_coll(&last_printed_item, &item)) { 373 bwsfwrite(item->str, f, sort_opts_vals.zflag); 374 last_printed_item = item; 375 } 376 } 377 } 378 } 379 380 closefile(f, fn); 381 } 382 } 383 384 /* 385 * Checks if the given file is sorted. Stops at the first disorder, 386 * prints the disordered line and returns 1. 
387 */ 388 int 389 check(const char *fn) 390 { 391 struct bwstring *s1, *s2, *s1disorder, *s2disorder; 392 struct file_reader *fr; 393 struct keys_array *ka1, *ka2; 394 int res; 395 size_t pos, posdisorder; 396 397 s1 = s2 = s1disorder = s2disorder = NULL; 398 ka1 = ka2 = NULL; 399 400 fr = file_reader_init(fn); 401 402 res = 0; 403 pos = 1; 404 posdisorder = 1; 405 406 if (fr == NULL) { 407 err(2, NULL); 408 goto end; 409 } 410 411 s1 = file_reader_readline(fr); 412 if (s1 == NULL) 413 goto end; 414 415 ka1 = keys_array_alloc(); 416 preproc(s1, ka1); 417 418 s2 = file_reader_readline(fr); 419 if (s2 == NULL) 420 goto end; 421 422 ka2 = keys_array_alloc(); 423 preproc(s2, ka2); 424 425 for (;;) { 426 427 if (debug_sort) { 428 bwsprintf(stdout, s2, "s1=<", ">"); 429 bwsprintf(stdout, s1, "s2=<", ">"); 430 } 431 int cmp = key_coll(ka2, ka1, 0); 432 if (debug_sort) 433 printf("; cmp1=%d", cmp); 434 435 if (!cmp && sort_opts_vals.complex_sort && 436 !(sort_opts_vals.uflag) && !(sort_opts_vals.sflag)) { 437 cmp = top_level_str_coll(s2, s1); 438 if (debug_sort) 439 printf("; cmp2=%d", cmp); 440 } 441 if (debug_sort) 442 printf("\n"); 443 444 if ((sort_opts_vals.uflag && (cmp <= 0)) || (cmp < 0)) { 445 if (!(sort_opts_vals.csilentflag)) { 446 s2disorder = bwsdup(s2); 447 posdisorder = pos; 448 if (debug_sort) 449 s1disorder = bwsdup(s1); 450 } 451 res = 1; 452 goto end; 453 } 454 455 pos++; 456 457 clean_keys_array(s1, ka1); 458 sort_free(ka1); 459 ka1 = ka2; 460 ka2 = NULL; 461 462 bwsfree(s1); 463 s1 = s2; 464 465 s2 = file_reader_readline(fr); 466 if (s2 == NULL) 467 goto end; 468 469 ka2 = keys_array_alloc(); 470 preproc(s2, ka2); 471 } 472 473 end: 474 if (ka1) { 475 clean_keys_array(s1, ka1); 476 sort_free(ka1); 477 } 478 479 if (s1) 480 bwsfree(s1); 481 482 if (ka2) { 483 clean_keys_array(s2, ka2); 484 sort_free(ka2); 485 } 486 487 if (s2) 488 bwsfree(s2); 489 490 if ((fn == NULL) || (*fn == 0) || (strcmp(fn, "-") == 0)) { 491 for (;;) { 492 s2 = 
file_reader_readline(fr); 493 if (s2 == NULL) 494 break; 495 bwsfree(s2); 496 } 497 } 498 499 file_reader_free(fr); 500 501 if (s2disorder) { 502 bws_disorder_warnx(s2disorder, fn, posdisorder); 503 if (s1disorder) { 504 bws_disorder_warnx(s1disorder, fn, posdisorder); 505 if (s1disorder != s2disorder) 506 bwsfree(s1disorder); 507 } 508 bwsfree(s2disorder); 509 s1disorder = NULL; 510 s2disorder = NULL; 511 } 512 513 if (res) 514 exit(res); 515 516 return (0); 517 } 518 519 /* 520 * Opens a file. If the given filename is "-", stdout will be 521 * opened. 522 */ 523 FILE * 524 openfile(const char *fn, const char *mode) 525 { 526 FILE *file; 527 528 if (strcmp(fn, "-") == 0) 529 return ((mode && mode[0] == 'r') ? stdin : stdout); 530 531 mode_t orig_file_mask = 0; 532 int is_tmp = file_is_tmp(fn); 533 534 if (is_tmp && (mode[0] == 'w')) 535 orig_file_mask = umask(S_IWGRP | S_IWOTH | 536 S_IRGRP | S_IROTH); 537 538 if (is_tmp && (compress_program != NULL)) { 539 int r; 540 char *cmd; 541 542 fflush(stdout); 543 544 if (mode[0] == 'r') 545 r = asprintf(&cmd, "cat %s | %s -d", 546 fn, compress_program); 547 else if (mode[0] == 'w') 548 r = asprintf(&cmd, "%s > %s", 549 compress_program, fn); 550 else 551 err(2, "%s", getstr(7)); 552 553 if (r == -1) 554 err(2, "aspritnf()"); 555 556 if ((file = popen(cmd, mode)) == NULL) 557 err(2, NULL); 558 free(cmd); 559 } else 560 if ((file = fopen(fn, mode)) == NULL) 561 err(2, NULL); 562 563 if (is_tmp && (mode[0] == 'w')) 564 umask(orig_file_mask); 565 566 return (file); 567 } 568 569 /* 570 * Close file 571 */ 572 void 573 closefile(FILE *f, const char *fn) 574 { 575 if (f == NULL || f == stdin) 576 return; 577 if (f == stdout) { 578 fflush(f); 579 return; 580 } 581 if (file_is_tmp(fn) && compress_program != NULL) { 582 if(pclose(f)<0) 583 err(2,NULL); 584 } else 585 fclose(f); 586 } 587 588 /* 589 * Reads a file into the internal buffer. 
590 */ 591 struct file_reader * 592 file_reader_init(const char *fsrc) 593 { 594 struct file_reader *ret; 595 596 if (fsrc == NULL) 597 fsrc = "-"; 598 599 ret = sort_calloc(1, sizeof(struct file_reader)); 600 601 ret->elsymb = sort_opts_vals.zflag ? '\0' : '\n'; 602 ret->fname = sort_strdup(fsrc); 603 604 if (strcmp(fsrc, "-") && (compress_program == NULL) && use_mmap) { 605 606 do { 607 struct stat stat_buf; 608 void *addr; 609 size_t sz = 0; 610 int fd, flags; 611 612 flags = MAP_NOCORE | MAP_NOSYNC; 613 614 fd = open(fsrc, O_RDONLY); 615 if (fd < 0) 616 err(2, NULL); 617 618 if (fstat(fd, &stat_buf) < 0) { 619 close(fd); 620 break; 621 } 622 623 sz = stat_buf.st_size; 624 625 #if defined(MAP_PREFAULT_READ) 626 flags |= MAP_PREFAULT_READ; 627 #endif 628 629 addr = mmap(NULL, sz, PROT_READ, flags, fd, 0); 630 if (addr == MAP_FAILED) { 631 close(fd); 632 break; 633 } 634 635 ret->fd = fd; 636 ret->mmapaddr = addr; 637 ret->mmapsize = sz; 638 ret->mmapptr = ret->mmapaddr; 639 640 } while (0); 641 } 642 643 if (ret->mmapaddr == NULL) { 644 ret->file = openfile(fsrc, "r"); 645 if (ret->file == NULL) 646 err(2, NULL); 647 } 648 649 return (ret); 650 } 651 652 struct bwstring * 653 file_reader_readline(struct file_reader *fr) 654 { 655 struct bwstring *ret = NULL; 656 657 if (fr->mmapaddr) { 658 unsigned char *mmapend; 659 660 mmapend = fr->mmapaddr + fr->mmapsize; 661 if (fr->mmapptr >= mmapend) 662 return (NULL); 663 else { 664 unsigned char *strend; 665 size_t sz; 666 667 sz = mmapend - fr->mmapptr; 668 strend = memchr(fr->mmapptr, fr->elsymb, sz); 669 670 if (strend == NULL) { 671 ret = bwscsbdup(fr->mmapptr, sz); 672 fr->mmapptr = mmapend; 673 } else { 674 ret = bwscsbdup(fr->mmapptr, strend - 675 fr->mmapptr); 676 fr->mmapptr = strend + 1; 677 } 678 } 679 } else { 680 ssize_t len; 681 682 len = getdelim(&fr->buffer, &fr->bsz, fr->elsymb, fr->file); 683 if (len < 0) { 684 if (!feof(fr->file)) 685 err(2, NULL); 686 return (NULL); 687 } 688 if (len > 0 && 
fr->buffer[len - 1] == fr->elsymb) 689 len--; 690 ret = bwscsbdup(fr->buffer, len); 691 } 692 693 return (ret); 694 } 695 696 static void 697 file_reader_clean(struct file_reader *fr) 698 { 699 700 if (fr == NULL) 701 return; 702 703 if (fr->mmapaddr) 704 munmap(fr->mmapaddr, fr->mmapsize); 705 if (fr->fd) 706 close(fr->fd); 707 708 free(fr->buffer); 709 closefile(fr->file, fr->fname); 710 free(fr->fname); 711 memset(fr, 0, sizeof(struct file_reader)); 712 } 713 714 void 715 file_reader_free(struct file_reader *fr) 716 { 717 718 if (fr == NULL) 719 return; 720 file_reader_clean(fr); 721 free(fr); 722 } 723 724 int 725 procfile(const char *fsrc, struct sort_list *list, struct file_list *fl) 726 { 727 struct file_reader *fr; 728 729 fr = file_reader_init(fsrc); 730 if (fr == NULL) 731 err(2, NULL); 732 733 /* file browse cycle */ 734 for (;;) { 735 struct bwstring *bws; 736 737 bws = file_reader_readline(fr); 738 739 if (bws == NULL) 740 break; 741 742 sort_list_add(list, bws); 743 744 if (list->memsize >= available_free_memory) { 745 char *fn; 746 747 fn = new_tmp_file_name(); 748 sort_list_to_file(list, fn); 749 file_list_add(fl, fn, false); 750 sort_list_clean(list); 751 } 752 } 753 754 file_reader_free(fr); 755 756 return (0); 757 } 758 759 /* 760 * Compare file headers. Files with EOF always go to the end of the list. 761 */ 762 static int 763 file_header_cmp(struct file_header *f1, struct file_header *f2) 764 { 765 766 if (f1 == f2) 767 return (0); 768 else { 769 if (f1->fr == NULL) { 770 return ((f2->fr == NULL) ? 0 : +1); 771 } else if (f2->fr == NULL) 772 return (-1); 773 else { 774 int ret; 775 776 ret = list_coll(&(f1->si), &(f2->si)); 777 if (!ret) 778 return ((f1->file_pos < f2->file_pos) ? 
-1 : +1); 779 return (ret); 780 } 781 } 782 } 783 784 /* 785 * Allocate and init file header structure 786 */ 787 static void 788 file_header_init(struct file_header **fh, const char *fn, size_t file_pos) 789 { 790 791 if (fh && fn) { 792 struct bwstring *line; 793 794 *fh = sort_malloc(sizeof(struct file_header)); 795 (*fh)->file_pos = file_pos; 796 (*fh)->fr = file_reader_init(fn); 797 if ((*fh)->fr == NULL) { 798 perror(fn); 799 err(2, "%s", getstr(8)); 800 } 801 line = file_reader_readline((*fh)->fr); 802 if (line == NULL) { 803 file_reader_free((*fh)->fr); 804 (*fh)->fr = NULL; 805 (*fh)->si = NULL; 806 } else { 807 (*fh)->si = sort_list_item_alloc(); 808 sort_list_item_set((*fh)->si, line); 809 } 810 } 811 } 812 813 /* 814 * Close file 815 */ 816 static void 817 file_header_close(struct file_header **fh) 818 { 819 820 if (fh && *fh) { 821 file_reader_free((*fh)->fr); 822 (*fh)->fr = NULL; 823 if ((*fh)->si) { 824 sort_list_item_clean((*fh)->si); 825 sort_free((*fh)->si); 826 (*fh)->si = NULL; 827 } 828 sort_free(*fh); 829 *fh = NULL; 830 } 831 } 832 833 /* 834 * Swap two array elements 835 */ 836 static void 837 file_header_swap(struct file_header **fh, size_t i1, size_t i2) 838 { 839 struct file_header *tmp; 840 841 tmp = fh[i1]; 842 fh[i1] = fh[i2]; 843 fh[i2] = tmp; 844 } 845 846 /* heap algorithm ==>> */ 847 848 /* 849 * See heap sort algorithm 850 * "Raises" last element to its right place 851 */ 852 static void 853 file_header_heap_swim(struct file_header **fh, size_t indx) 854 { 855 856 if (indx > 0) { 857 size_t parent_index; 858 859 parent_index = (indx - 1) >> 1; 860 861 if (file_header_cmp(fh[indx], fh[parent_index]) < 0) { 862 /* swap child and parent and continue */ 863 file_header_swap(fh, indx, parent_index); 864 file_header_heap_swim(fh, parent_index); 865 } 866 } 867 } 868 869 /* 870 * Sink the top element to its correct position 871 */ 872 static void 873 file_header_heap_sink(struct file_header **fh, size_t indx, size_t size) 874 { 875 
size_t left_child_index; 876 size_t right_child_index; 877 878 left_child_index = indx + indx + 1; 879 right_child_index = left_child_index + 1; 880 881 if (left_child_index < size) { 882 size_t min_child_index; 883 884 min_child_index = left_child_index; 885 886 if ((right_child_index < size) && 887 (file_header_cmp(fh[left_child_index], 888 fh[right_child_index]) > 0)) 889 min_child_index = right_child_index; 890 if (file_header_cmp(fh[indx], fh[min_child_index]) > 0) { 891 file_header_swap(fh, indx, min_child_index); 892 file_header_heap_sink(fh, min_child_index, size); 893 } 894 } 895 } 896 897 /* <<== heap algorithm */ 898 899 /* 900 * Adds element to the "left" end 901 */ 902 static void 903 file_header_list_rearrange_from_header(struct file_header **fh, size_t size) 904 { 905 906 file_header_heap_sink(fh, 0, size); 907 } 908 909 /* 910 * Adds element to the "right" end 911 */ 912 static void 913 file_header_list_push(struct file_header *f, struct file_header **fh, size_t size) 914 { 915 916 fh[size++] = f; 917 file_header_heap_swim(fh, size - 1); 918 } 919 920 struct last_printed 921 { 922 struct bwstring *str; 923 }; 924 925 /* 926 * Prints the current line of the file 927 */ 928 static void 929 file_header_print(struct file_header *fh, FILE *f_out, struct last_printed *lp) 930 { 931 932 if (fh && fh->fr && f_out && fh->si && fh->si->str) { 933 if (sort_opts_vals.uflag) { 934 if ((lp->str == NULL) || (str_list_coll(lp->str, &(fh->si)))) { 935 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 936 if (lp->str) 937 bwsfree(lp->str); 938 lp->str = bwsdup(fh->si->str); 939 } 940 } else 941 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 942 } 943 } 944 945 /* 946 * Read next line 947 */ 948 static void 949 file_header_read_next(struct file_header *fh) 950 { 951 952 if (fh && fh->fr) { 953 struct bwstring *tmp; 954 955 tmp = file_reader_readline(fh->fr); 956 if (tmp == NULL) { 957 file_reader_free(fh->fr); 958 fh->fr = NULL; 959 if (fh->si) { 960 
sort_list_item_clean(fh->si); 961 sort_free(fh->si); 962 fh->si = NULL; 963 } 964 } else { 965 if (fh->si == NULL) 966 fh->si = sort_list_item_alloc(); 967 sort_list_item_set(fh->si, tmp); 968 } 969 } 970 } 971 972 /* 973 * Merge array of "files headers" 974 */ 975 static void 976 file_headers_merge(size_t fnum, struct file_header **fh, FILE *f_out) 977 { 978 struct last_printed lp; 979 size_t i; 980 981 memset(&lp, 0, sizeof(lp)); 982 983 /* 984 * construct the initial sort structure 985 */ 986 for (i = 0; i < fnum; i++) 987 file_header_list_push(fh[i], fh, i); 988 989 while (fh[0]->fr) { /* unfinished files are always in front */ 990 /* output the smallest line: */ 991 file_header_print(fh[0], f_out, &lp); 992 /* read a new line, if possible: */ 993 file_header_read_next(fh[0]); 994 /* re-arrange the list: */ 995 file_header_list_rearrange_from_header(fh, fnum); 996 } 997 998 if (lp.str) 999 bwsfree(lp.str); 1000 } 1001 1002 /* 1003 * Merges the given files into the output file, which can be 1004 * stdout. 
1005 */ 1006 static void 1007 merge_files_array(size_t argc, const char **argv, const char *fn_out) 1008 { 1009 1010 if (argv && fn_out) { 1011 struct file_header **fh; 1012 FILE *f_out; 1013 size_t i; 1014 1015 f_out = openfile(fn_out, "w"); 1016 1017 if (f_out == NULL) 1018 err(2, NULL); 1019 1020 fh = sort_malloc((argc + 1) * sizeof(struct file_header *)); 1021 1022 for (i = 0; i < argc; i++) 1023 file_header_init(fh + i, argv[i], (size_t) i); 1024 1025 file_headers_merge(argc, fh, f_out); 1026 1027 for (i = 0; i < argc; i++) 1028 file_header_close(fh + i); 1029 1030 sort_free(fh); 1031 1032 closefile(f_out, fn_out); 1033 } 1034 } 1035 1036 /* 1037 * Shrinks the file list until its size smaller than max number of opened files 1038 */ 1039 static int 1040 shrink_file_list(struct file_list *fl) 1041 { 1042 1043 if ((fl == NULL) || (size_t) (fl->count) < max_open_files) 1044 return (0); 1045 else { 1046 struct file_list new_fl; 1047 size_t indx = 0; 1048 1049 file_list_init(&new_fl, true); 1050 while (indx < fl->count) { 1051 char *fnew; 1052 size_t num; 1053 1054 num = fl->count - indx; 1055 fnew = new_tmp_file_name(); 1056 1057 if ((size_t) num >= max_open_files) 1058 num = max_open_files - 1; 1059 merge_files_array(num, fl->fns + indx, fnew); 1060 if (fl->tmp) { 1061 size_t i; 1062 1063 for (i = 0; i < num; i++) 1064 unlink(fl->fns[indx + i]); 1065 } 1066 file_list_add(&new_fl, fnew, false); 1067 indx += num; 1068 } 1069 fl->tmp = false; /* already taken care of */ 1070 file_list_clean(fl); 1071 1072 fl->count = new_fl.count; 1073 fl->fns = new_fl.fns; 1074 fl->sz = new_fl.sz; 1075 fl->tmp = new_fl.tmp; 1076 1077 return (1); 1078 } 1079 } 1080 1081 /* 1082 * Merge list of files 1083 */ 1084 void 1085 merge_files(struct file_list *fl, const char *fn_out) 1086 { 1087 1088 if (fl && fn_out) { 1089 while (shrink_file_list(fl)); 1090 1091 merge_files_array(fl->count, fl->fns, fn_out); 1092 } 1093 } 1094 1095 static const char * 1096 get_sort_method_name(int sm) 1097 
{ 1098 1099 if (sm == SORT_MERGESORT) 1100 return "mergesort"; 1101 else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1102 return "radixsort"; 1103 else if (sort_opts_vals.sort_method == SORT_HEAPSORT) 1104 return "heapsort"; 1105 else 1106 return "quicksort"; 1107 } 1108 1109 /* 1110 * Wrapper for qsort 1111 */ 1112 static int sort_qsort(void *list, size_t count, size_t elem_size, 1113 int (*cmp_func)(const void *, const void *)) 1114 { 1115 1116 qsort(list, count, elem_size, cmp_func); 1117 return (0); 1118 } 1119 1120 /* 1121 * Sort list of lines and writes it to the file 1122 */ 1123 void 1124 sort_list_to_file(struct sort_list *list, const char *outfile) 1125 { 1126 struct sort_mods *sm = &(keys[0].sm); 1127 1128 if (!(sm->Mflag) && !(sm->Rflag) && !(sm->Vflag) && 1129 !(sm->gflag) && !(sm->hflag) && !(sm->nflag)) { 1130 if ((sort_opts_vals.sort_method == SORT_DEFAULT) && byte_sort) 1131 sort_opts_vals.sort_method = SORT_RADIXSORT; 1132 1133 } else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1134 err(2, "%s", getstr(9)); 1135 1136 /* 1137 * to handle stable sort and the unique cases in the 1138 * right order, we need stable basic algorithm 1139 */ 1140 if (sort_opts_vals.sflag) { 1141 switch (sort_opts_vals.sort_method){ 1142 case SORT_MERGESORT: 1143 break; 1144 case SORT_RADIXSORT: 1145 break; 1146 case SORT_DEFAULT: 1147 sort_opts_vals.sort_method = SORT_MERGESORT; 1148 break; 1149 default: 1150 errx(2, "%s", getstr(10)); 1151 } 1152 } 1153 1154 if (sort_opts_vals.sort_method == SORT_DEFAULT) 1155 sort_opts_vals.sort_method = DEFAULT_SORT_ALGORITHM; 1156 1157 if (debug_sort) 1158 printf("sort_method=%s\n", 1159 get_sort_method_name(sort_opts_vals.sort_method)); 1160 1161 switch (sort_opts_vals.sort_method){ 1162 case SORT_RADIXSORT: 1163 rxsort(list->list, list->count); 1164 sort_list_dump(list, outfile); 1165 break; 1166 case SORT_MERGESORT: 1167 mt_sort(list, mergesort, outfile); 1168 break; 1169 case SORT_HEAPSORT: 1170 mt_sort(list, heapsort, 
outfile); 1171 break; 1172 case SORT_QSORT: 1173 mt_sort(list, sort_qsort, outfile); 1174 break; 1175 default: 1176 mt_sort(list, DEFAULT_SORT_FUNC, outfile); 1177 break; 1178 } 1179 } 1180 1181 /******************* MT SORT ************************/ 1182 1183 #if defined(SORT_THREADS) 1184 /* semaphore to count threads */ 1185 static sem_t mtsem; 1186 1187 /* current system sort function */ 1188 static int (*g_sort_func)(void *, size_t, size_t, 1189 int(*)(const void *, const void *)); 1190 1191 /* 1192 * Sort cycle thread (in multi-threaded mode) 1193 */ 1194 static void* 1195 mt_sort_thread(void* arg) 1196 { 1197 struct sort_list *list = arg; 1198 1199 g_sort_func(list->list, list->count, sizeof(struct sort_list_item *), 1200 (int(*)(const void *, const void *)) list_coll); 1201 1202 sem_post(&mtsem); 1203 1204 return (arg); 1205 } 1206 1207 /* 1208 * Compare sub-lists. Empty sub-lists always go to the end of the list. 1209 */ 1210 static int 1211 sub_list_cmp(struct sort_list *l1, struct sort_list *l2) 1212 { 1213 1214 if (l1 == l2) 1215 return (0); 1216 else { 1217 if (l1->count == 0) { 1218 return ((l2->count == 0) ? 0 : +1); 1219 } else if (l2->count == 0) { 1220 return (-1); 1221 } else { 1222 int ret; 1223 1224 ret = list_coll(&(l1->list[0]), &(l2->list[0])); 1225 if (!ret) 1226 return ((l1->sub_list_pos < l2->sub_list_pos) ? 
1227 -1 : +1); 1228 return (ret); 1229 } 1230 } 1231 } 1232 1233 /* 1234 * Swap two array elements 1235 */ 1236 static void 1237 sub_list_swap(struct sort_list **sl, size_t i1, size_t i2) 1238 { 1239 struct sort_list *tmp; 1240 1241 tmp = sl[i1]; 1242 sl[i1] = sl[i2]; 1243 sl[i2] = tmp; 1244 } 1245 1246 /* heap algorithm ==>> */ 1247 1248 /* 1249 * See heap sort algorithm 1250 * "Raises" last element to its right place 1251 */ 1252 static void 1253 sub_list_swim(struct sort_list **sl, size_t indx) 1254 { 1255 1256 if (indx > 0) { 1257 size_t parent_index; 1258 1259 parent_index = (indx - 1) >> 1; 1260 1261 if (sub_list_cmp(sl[indx], sl[parent_index]) < 0) { 1262 /* swap child and parent and continue */ 1263 sub_list_swap(sl, indx, parent_index); 1264 sub_list_swim(sl, parent_index); 1265 } 1266 } 1267 } 1268 1269 /* 1270 * Sink the top element to its correct position 1271 */ 1272 static void 1273 sub_list_sink(struct sort_list **sl, size_t indx, size_t size) 1274 { 1275 size_t left_child_index; 1276 size_t right_child_index; 1277 1278 left_child_index = indx + indx + 1; 1279 right_child_index = left_child_index + 1; 1280 1281 if (left_child_index < size) { 1282 size_t min_child_index; 1283 1284 min_child_index = left_child_index; 1285 1286 if ((right_child_index < size) && 1287 (sub_list_cmp(sl[left_child_index], 1288 sl[right_child_index]) > 0)) 1289 min_child_index = right_child_index; 1290 if (sub_list_cmp(sl[indx], sl[min_child_index]) > 0) { 1291 sub_list_swap(sl, indx, min_child_index); 1292 sub_list_sink(sl, min_child_index, size); 1293 } 1294 } 1295 } 1296 1297 /* <<== heap algorithm */ 1298 1299 /* 1300 * Adds element to the "right" end 1301 */ 1302 static void 1303 sub_list_push(struct sort_list *s, struct sort_list **sl, size_t size) 1304 { 1305 1306 sl[size++] = s; 1307 sub_list_swim(sl, size - 1); 1308 } 1309 1310 struct last_printed_item 1311 { 1312 struct sort_list_item *item; 1313 }; 1314 1315 /* 1316 * Prints the current line of the file 1317 */ 
1318 static void 1319 sub_list_header_print(struct sort_list *sl, FILE *f_out, 1320 struct last_printed_item *lp) 1321 { 1322 1323 if (sl && sl->count && f_out && sl->list[0]->str) { 1324 if (sort_opts_vals.uflag) { 1325 if ((lp->item == NULL) || (list_coll(&(lp->item), 1326 &(sl->list[0])))) { 1327 bwsfwrite(sl->list[0]->str, f_out, 1328 sort_opts_vals.zflag); 1329 lp->item = sl->list[0]; 1330 } 1331 } else 1332 bwsfwrite(sl->list[0]->str, f_out, 1333 sort_opts_vals.zflag); 1334 } 1335 } 1336 1337 /* 1338 * Read next line 1339 */ 1340 static void 1341 sub_list_next(struct sort_list *sl) 1342 { 1343 1344 if (sl && sl->count) { 1345 sl->list += 1; 1346 sl->count -= 1; 1347 } 1348 } 1349 1350 /* 1351 * Merge sub-lists to a file 1352 */ 1353 static void 1354 merge_sub_lists(struct sort_list **sl, size_t n, FILE* f_out) 1355 { 1356 struct last_printed_item lp; 1357 size_t i; 1358 1359 memset(&lp,0,sizeof(lp)); 1360 1361 /* construct the initial list: */ 1362 for (i = 0; i < n; i++) 1363 sub_list_push(sl[i], sl, i); 1364 1365 while (sl[0]->count) { /* unfinished lists are always in front */ 1366 /* output the smallest line: */ 1367 sub_list_header_print(sl[0], f_out, &lp); 1368 /* move to a new line, if possible: */ 1369 sub_list_next(sl[0]); 1370 /* re-arrange the list: */ 1371 sub_list_sink(sl, 0, n); 1372 } 1373 } 1374 1375 /* 1376 * Merge sub-lists to a file 1377 */ 1378 static void 1379 merge_list_parts(struct sort_list **parts, size_t n, const char *fn) 1380 { 1381 FILE* f_out; 1382 1383 f_out = openfile(fn,"w"); 1384 1385 merge_sub_lists(parts, n, f_out); 1386 1387 closefile(f_out, fn); 1388 } 1389 1390 #endif /* defined(SORT_THREADS) */ 1391 /* 1392 * Multi-threaded sort algorithm "driver" 1393 */ 1394 static void 1395 mt_sort(struct sort_list *list, 1396 int(*sort_func)(void *, size_t, size_t, int(*)(const void *, const void *)), 1397 const char* fn) 1398 { 1399 #if defined(SORT_THREADS) 1400 if (nthreads < 2 || list->count < MT_SORT_THRESHOLD) { 1401 size_t 
nthreads_save = nthreads; 1402 nthreads = 1; 1403 #endif 1404 /* if single thread or small data, do simple sort */ 1405 sort_func(list->list, list->count, 1406 sizeof(struct sort_list_item *), 1407 (int(*)(const void *, const void *)) list_coll); 1408 sort_list_dump(list, fn); 1409 #if defined(SORT_THREADS) 1410 nthreads = nthreads_save; 1411 } else { 1412 /* multi-threaded sort */ 1413 struct sort_list **parts; 1414 size_t avgsize, cstart, i; 1415 1416 /* array of sub-lists */ 1417 parts = sort_malloc(sizeof(struct sort_list*) * nthreads); 1418 cstart = 0; 1419 avgsize = list->count / nthreads; 1420 1421 /* set global system sort function */ 1422 g_sort_func = sort_func; 1423 1424 /* set sublists */ 1425 for (i = 0; i < nthreads; ++i) { 1426 size_t sz = 0; 1427 1428 parts[i] = sort_malloc(sizeof(struct sort_list)); 1429 parts[i]->list = list->list + cstart; 1430 parts[i]->memsize = 0; 1431 parts[i]->sub_list_pos = i; 1432 1433 sz = (i == nthreads - 1) ? list->count - cstart : 1434 avgsize; 1435 1436 parts[i]->count = sz; 1437 1438 parts[i]->size = parts[i]->count; 1439 1440 cstart += sz; 1441 } 1442 1443 /* init threads counting semaphore */ 1444 sem_init(&mtsem, 0, 0); 1445 1446 /* start threads */ 1447 for (i = 0; i < nthreads; ++i) { 1448 pthread_t pth; 1449 pthread_attr_t attr; 1450 1451 pthread_attr_init(&attr); 1452 pthread_attr_setdetachstate(&attr, PTHREAD_DETACHED); 1453 1454 for (;;) { 1455 int res = pthread_create(&pth, &attr, 1456 mt_sort_thread, parts[i]); 1457 1458 if (res >= 0) 1459 break; 1460 if (errno == EAGAIN) { 1461 pthread_yield(); 1462 continue; 1463 } 1464 err(2, NULL); 1465 } 1466 1467 pthread_attr_destroy(&attr); 1468 } 1469 1470 /* wait for threads completion */ 1471 for (i = 0; i < nthreads; ++i) { 1472 sem_wait(&mtsem); 1473 } 1474 /* destroy the semaphore - we do not need it anymore */ 1475 sem_destroy(&mtsem); 1476 1477 /* merge sorted sub-lists to the file */ 1478 merge_list_parts(parts, nthreads, fn); 1479 1480 /* free sub-lists 
data */ 1481 for (i = 0; i < nthreads; ++i) { 1482 sort_free(parts[i]); 1483 } 1484 sort_free(parts); 1485 } 1486 #endif /* defined(SORT_THREADS) */ 1487 } 1488