1 /*- 2 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD 3 * 4 * Copyright (C) 2009 Gabor Kovesdan <gabor@FreeBSD.org> 5 * Copyright (C) 2012 Oleg Moskalenko <mom040267@gmail.com> 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27 * SUCH DAMAGE. 
*/

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/queue.h>

#include <err.h>
#include <fcntl.h>
#if defined(SORT_THREADS)
#include <pthread.h>
#endif
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <wchar.h>
#include <wctype.h>

#include "coll.h"
#include "file.h"
#include "radixsort.h"

/*
 * Memory budget values.  procfile() spills the in-memory list to a
 * temporary file once the list's memsize reaches available_free_memory.
 * NOTE(review): free_memory is not read in this file; presumably it is
 * used by other translation units - confirm.
 */
unsigned long long free_memory = 1000000;
unsigned long long available_free_memory = 1000000;

/* When true, file_reader_init() tries to mmap() regular input files. */
bool use_mmap;

/* Directory in which new_tmp_file_name() creates temporary files. */
const char *tmpdir = "/var/tmp";
/* If non-NULL, openfile() pipes temporary files through this program. */
const char *compress_program;

/* Upper bound used by shrink_file_list() to size each merge pass. */
size_t max_open_files = 16;

/*
 * File reader structure
 */
struct file_reader
{
	FILE			*file;		/* stdio stream (non-mmap mode) */
	char			*fname;		/* private copy of the file name */
	char			*buffer;	/* getdelim() line buffer */
	unsigned char		*mmapaddr;	/* start of mapping, or NULL */
	unsigned char		*mmapptr;	/* next unread byte in mapping */
	size_t			 bsz;		/* capacity of buffer */
	size_t			 mmapsize;	/* length of the mapping */
	int			 fd;		/* descriptor behind the mapping */
	char			 elsymb;	/* record terminator: '\n' or 0 */
};

/*
 * Structure to be used in file merge process.
 */
struct file_header
{
	struct file_reader		*fr;	/* NULL once the file is exhausted */
	struct sort_list_item		*si;	/* current top line */
	size_t				 file_pos; /* input index; merge tie-breaker */
};

/*
 * List elements of "cleanable" files list.
 */
struct CLEANABLE_FILE
{
	char				*fn;	/* name of the temporary file */
	LIST_ENTRY(CLEANABLE_FILE)	 files;	/* list linkage */
};

/*
 * List header of "cleanable" files list.
 */
static LIST_HEAD(CLEANABLE_FILES,CLEANABLE_FILE) tmp_files;

/*
 * Semaphore to protect the tmp file list.
 * We use semaphore here because it is signal-safe, according to POSIX.
 * And semaphore does not require pthread library.
109 */ 110 static sem_t tmp_files_sem; 111 112 static void mt_sort(struct sort_list *list, 113 int (*sort_func)(void *, size_t, size_t, 114 int (*)(const void *, const void *)), const char* fn); 115 116 /* 117 * Init tmp files list 118 */ 119 void 120 init_tmp_files(void) 121 { 122 123 LIST_INIT(&tmp_files); 124 sem_init(&tmp_files_sem, 0, 1); 125 } 126 127 /* 128 * Save name of a tmp file for signal cleanup 129 */ 130 void 131 tmp_file_atexit(const char *tmp_file) 132 { 133 134 if (tmp_file) { 135 sem_wait(&tmp_files_sem); 136 struct CLEANABLE_FILE *item = 137 sort_malloc(sizeof(struct CLEANABLE_FILE)); 138 item->fn = sort_strdup(tmp_file); 139 LIST_INSERT_HEAD(&tmp_files, item, files); 140 sem_post(&tmp_files_sem); 141 } 142 } 143 144 /* 145 * Clear tmp files 146 */ 147 void 148 clear_tmp_files(void) 149 { 150 struct CLEANABLE_FILE *item; 151 152 sem_wait(&tmp_files_sem); 153 LIST_FOREACH(item,&tmp_files,files) { 154 if ((item) && (item->fn)) 155 unlink(item->fn); 156 } 157 sem_post(&tmp_files_sem); 158 } 159 160 /* 161 * Check whether a file is a temporary file 162 */ 163 static bool 164 file_is_tmp(const char* fn) 165 { 166 struct CLEANABLE_FILE *item; 167 bool ret = false; 168 169 if (fn) { 170 sem_wait(&tmp_files_sem); 171 LIST_FOREACH(item,&tmp_files,files) { 172 if ((item) && (item->fn)) 173 if (strcmp(item->fn, fn) == 0) { 174 ret = true; 175 break; 176 } 177 } 178 sem_post(&tmp_files_sem); 179 } 180 181 return (ret); 182 } 183 184 /* 185 * Generate new temporary file name 186 */ 187 char * 188 new_tmp_file_name(void) 189 { 190 char *ret; 191 int fd; 192 193 if (asprintf(&ret, "%s/.bsdsort.XXXXXXXXXX", tmpdir) == -1) 194 err(2, "asprintf()"); 195 if ((fd = mkstemp(ret)) == -1) 196 err(2, "mkstemp()"); 197 close(fd); 198 199 tmp_file_atexit(ret); 200 return (ret); 201 } 202 203 /* 204 * Initialize file list 205 */ 206 void 207 file_list_init(struct file_list *fl, bool tmp) 208 { 209 210 if (fl) { 211 memset(fl, 0, sizeof(*fl)); 212 fl->tmp = tmp; 213 } 214 
} 215 216 /* 217 * Add a file name to the list 218 */ 219 void 220 file_list_add(struct file_list *fl, const char *fn, bool allocate) 221 { 222 223 if (fl && fn) { 224 if (fl->count >= fl->sz || (fl->fns == NULL)) { 225 fl->sz = (fl->sz) * 2 + 1; 226 fl->fns = sort_realloc(fl->fns, fl->sz * 227 sizeof(char *)); 228 } 229 fl->fns[fl->count] = allocate ? sort_strdup(fn) : fn; 230 fl->count += 1; 231 } 232 } 233 234 /* 235 * Populate file list from array of file names 236 */ 237 void 238 file_list_populate(struct file_list *fl, int argc, char **argv, bool allocate) 239 { 240 241 if (fl && argv) { 242 int i; 243 244 for (i = 0; i < argc; i++) 245 file_list_add(fl, argv[i], allocate); 246 } 247 } 248 249 /* 250 * Clean file list data and delete the files, 251 * if this is a list of temporary files 252 */ 253 void 254 file_list_clean(struct file_list *fl) 255 { 256 257 if (fl) { 258 if (fl->fns) { 259 size_t i; 260 261 for (i = 0; i < fl->count; i++) { 262 if (fl->fns[i]) { 263 if (fl->tmp) 264 unlink(fl->fns[i]); 265 sort_free(fl->fns[i]); 266 fl->fns[i] = 0; 267 } 268 } 269 sort_free(fl->fns); 270 fl->fns = NULL; 271 } 272 fl->sz = 0; 273 fl->count = 0; 274 fl->tmp = false; 275 } 276 } 277 278 /* 279 * Init sort list 280 */ 281 void 282 sort_list_init(struct sort_list *l) 283 { 284 285 if (l) { 286 memset(l, 0, sizeof(*l)); 287 l->memsize = sizeof(struct sort_list); 288 } 289 } 290 291 /* 292 * Add string to sort list 293 */ 294 void 295 sort_list_add(struct sort_list *l, struct bwstring *str) 296 { 297 298 if (l && str) { 299 size_t indx = l->count; 300 301 if ((l->list == NULL) || (indx >= l->size)) { 302 size_t newsize = (l->size + 1) + 1024; 303 304 l->list = sort_realloc(l->list, 305 sizeof(struct sort_list_item*) * newsize); 306 l->memsize += (newsize - l->size) * 307 sizeof(struct sort_list_item*); 308 l->size = newsize; 309 } 310 l->list[indx] = sort_list_item_alloc(); 311 sort_list_item_set(l->list[indx], str); 312 l->memsize += 
sort_list_item_size(l->list[indx]); 313 l->count += 1; 314 } 315 } 316 317 /* 318 * Clean sort list data 319 */ 320 void 321 sort_list_clean(struct sort_list *l) 322 { 323 324 if (l) { 325 if (l->list) { 326 size_t i; 327 328 for (i = 0; i < l->count; i++) { 329 struct sort_list_item *item; 330 331 item = l->list[i]; 332 333 if (item) { 334 sort_list_item_clean(item); 335 sort_free(item); 336 l->list[i] = NULL; 337 } 338 } 339 sort_free(l->list); 340 l->list = NULL; 341 } 342 l->count = 0; 343 l->size = 0; 344 l->memsize = sizeof(struct sort_list); 345 } 346 } 347 348 /* 349 * Write sort list to file 350 */ 351 void 352 sort_list_dump(struct sort_list *l, const char *fn) 353 { 354 355 if (l && fn) { 356 FILE *f; 357 358 f = openfile(fn, "w"); 359 if (f == NULL) 360 err(2, NULL); 361 362 if (l->list) { 363 size_t i; 364 if (!(sort_opts_vals.uflag)) { 365 for (i = 0; i < l->count; ++i) 366 bwsfwrite(l->list[i]->str, f, 367 sort_opts_vals.zflag); 368 } else { 369 struct sort_list_item *last_printed_item = NULL; 370 struct sort_list_item *item; 371 for (i = 0; i < l->count; ++i) { 372 item = l->list[i]; 373 if ((last_printed_item == NULL) || 374 list_coll(&last_printed_item, &item)) { 375 bwsfwrite(item->str, f, sort_opts_vals.zflag); 376 last_printed_item = item; 377 } 378 } 379 } 380 } 381 382 closefile(f, fn); 383 } 384 } 385 386 /* 387 * Checks if the given file is sorted. Stops at the first disorder, 388 * prints the disordered line and returns 1. 
389 */ 390 int 391 check(const char *fn) 392 { 393 struct bwstring *s1, *s2, *s1disorder, *s2disorder; 394 struct file_reader *fr; 395 struct keys_array *ka1, *ka2; 396 int res; 397 size_t pos, posdisorder; 398 399 s1 = s2 = s1disorder = s2disorder = NULL; 400 ka1 = ka2 = NULL; 401 402 fr = file_reader_init(fn); 403 404 res = 0; 405 pos = 1; 406 posdisorder = 1; 407 408 if (fr == NULL) { 409 err(2, NULL); 410 goto end; 411 } 412 413 s1 = file_reader_readline(fr); 414 if (s1 == NULL) 415 goto end; 416 417 ka1 = keys_array_alloc(); 418 preproc(s1, ka1); 419 420 s2 = file_reader_readline(fr); 421 if (s2 == NULL) 422 goto end; 423 424 ka2 = keys_array_alloc(); 425 preproc(s2, ka2); 426 427 for (;;) { 428 429 if (debug_sort) { 430 bwsprintf(stdout, s2, "s1=<", ">"); 431 bwsprintf(stdout, s1, "s2=<", ">"); 432 } 433 int cmp = key_coll(ka2, ka1, 0); 434 if (debug_sort) 435 printf("; cmp1=%d", cmp); 436 437 if (!cmp && sort_opts_vals.complex_sort && 438 !(sort_opts_vals.uflag) && !(sort_opts_vals.sflag)) { 439 cmp = top_level_str_coll(s2, s1); 440 if (debug_sort) 441 printf("; cmp2=%d", cmp); 442 } 443 if (debug_sort) 444 printf("\n"); 445 446 if ((sort_opts_vals.uflag && (cmp <= 0)) || (cmp < 0)) { 447 if (!(sort_opts_vals.csilentflag)) { 448 s2disorder = bwsdup(s2); 449 posdisorder = pos; 450 if (debug_sort) 451 s1disorder = bwsdup(s1); 452 } 453 res = 1; 454 goto end; 455 } 456 457 pos++; 458 459 clean_keys_array(s1, ka1); 460 sort_free(ka1); 461 ka1 = ka2; 462 ka2 = NULL; 463 464 bwsfree(s1); 465 s1 = s2; 466 467 s2 = file_reader_readline(fr); 468 if (s2 == NULL) 469 goto end; 470 471 ka2 = keys_array_alloc(); 472 preproc(s2, ka2); 473 } 474 475 end: 476 if (ka1) { 477 clean_keys_array(s1, ka1); 478 sort_free(ka1); 479 } 480 481 if (s1) 482 bwsfree(s1); 483 484 if (ka2) { 485 clean_keys_array(s2, ka2); 486 sort_free(ka2); 487 } 488 489 if (s2) 490 bwsfree(s2); 491 492 if ((fn == NULL) || (*fn == 0) || (strcmp(fn, "-") == 0)) { 493 for (;;) { 494 s2 = 
file_reader_readline(fr); 495 if (s2 == NULL) 496 break; 497 bwsfree(s2); 498 } 499 } 500 501 file_reader_free(fr); 502 503 if (s2disorder) { 504 bws_disorder_warnx(s2disorder, fn, posdisorder); 505 if (s1disorder) { 506 bws_disorder_warnx(s1disorder, fn, posdisorder); 507 if (s1disorder != s2disorder) 508 bwsfree(s1disorder); 509 } 510 bwsfree(s2disorder); 511 s1disorder = NULL; 512 s2disorder = NULL; 513 } 514 515 if (res) 516 exit(res); 517 518 return (0); 519 } 520 521 /* 522 * Opens a file. If the given filename is "-", stdout will be 523 * opened. 524 */ 525 FILE * 526 openfile(const char *fn, const char *mode) 527 { 528 FILE *file; 529 530 if (strcmp(fn, "-") == 0) { 531 return ((mode && mode[0] == 'r') ? stdin : stdout); 532 } else { 533 mode_t orig_file_mask = 0; 534 int is_tmp = file_is_tmp(fn); 535 536 if (is_tmp && (mode[0] == 'w')) 537 orig_file_mask = umask(S_IWGRP | S_IWOTH | 538 S_IRGRP | S_IROTH); 539 540 if (is_tmp && (compress_program != NULL)) { 541 char *cmd; 542 size_t cmdsz; 543 544 cmdsz = strlen(fn) + 128; 545 cmd = sort_malloc(cmdsz); 546 547 fflush(stdout); 548 549 if (mode[0] == 'r') 550 snprintf(cmd, cmdsz - 1, "cat %s | %s -d", 551 fn, compress_program); 552 else if (mode[0] == 'w') 553 snprintf(cmd, cmdsz - 1, "%s > %s", 554 compress_program, fn); 555 else 556 err(2, "%s", getstr(7)); 557 558 if ((file = popen(cmd, mode)) == NULL) 559 err(2, NULL); 560 561 sort_free(cmd); 562 563 } else 564 if ((file = fopen(fn, mode)) == NULL) 565 err(2, NULL); 566 567 if (is_tmp && (mode[0] == 'w')) 568 umask(orig_file_mask); 569 } 570 571 return (file); 572 } 573 574 /* 575 * Close file 576 */ 577 void 578 closefile(FILE *f, const char *fn) 579 { 580 if (f == NULL) { 581 ; 582 } else if (f == stdin) { 583 ; 584 } else if (f == stdout) { 585 fflush(f); 586 } else { 587 if (file_is_tmp(fn) && compress_program != NULL) { 588 if(pclose(f)<0) 589 err(2,NULL); 590 } else 591 fclose(f); 592 } 593 } 594 595 /* 596 * Reads a file into the internal buffer. 
597 */ 598 struct file_reader * 599 file_reader_init(const char *fsrc) 600 { 601 struct file_reader *ret; 602 603 if (fsrc == NULL) 604 fsrc = "-"; 605 606 ret = sort_calloc(1, sizeof(struct file_reader)); 607 608 ret->elsymb = '\n'; 609 if (sort_opts_vals.zflag) 610 ret->elsymb = 0; 611 612 ret->fname = sort_strdup(fsrc); 613 614 if (strcmp(fsrc, "-") && (compress_program == NULL) && use_mmap) { 615 616 do { 617 struct stat stat_buf; 618 void *addr; 619 size_t sz = 0; 620 int fd, flags; 621 622 flags = MAP_NOCORE | MAP_NOSYNC; 623 624 fd = open(fsrc, O_RDONLY); 625 if (fd < 0) 626 err(2, NULL); 627 628 if (fstat(fd, &stat_buf) < 0) { 629 close(fd); 630 break; 631 } 632 633 sz = stat_buf.st_size; 634 635 #if defined(MAP_PREFAULT_READ) 636 flags |= MAP_PREFAULT_READ; 637 #endif 638 639 addr = mmap(NULL, sz, PROT_READ, flags, fd, 0); 640 if (addr == MAP_FAILED) { 641 close(fd); 642 break; 643 } 644 645 ret->fd = fd; 646 ret->mmapaddr = addr; 647 ret->mmapsize = sz; 648 ret->mmapptr = ret->mmapaddr; 649 650 } while (0); 651 } 652 653 if (ret->mmapaddr == NULL) { 654 ret->file = openfile(fsrc, "r"); 655 if (ret->file == NULL) 656 err(2, NULL); 657 } 658 659 return (ret); 660 } 661 662 struct bwstring * 663 file_reader_readline(struct file_reader *fr) 664 { 665 struct bwstring *ret = NULL; 666 667 if (fr->mmapaddr) { 668 unsigned char *mmapend; 669 670 mmapend = fr->mmapaddr + fr->mmapsize; 671 if (fr->mmapptr >= mmapend) 672 return (NULL); 673 else { 674 unsigned char *strend; 675 size_t sz; 676 677 sz = mmapend - fr->mmapptr; 678 strend = memchr(fr->mmapptr, fr->elsymb, sz); 679 680 if (strend == NULL) { 681 ret = bwscsbdup(fr->mmapptr, sz); 682 fr->mmapptr = mmapend; 683 } else { 684 ret = bwscsbdup(fr->mmapptr, strend - 685 fr->mmapptr); 686 fr->mmapptr = strend + 1; 687 } 688 } 689 } else { 690 int delim = sort_opts_vals.zflag ? 
'\0' : '\n'; 691 ssize_t len = getdelim(&fr->buffer, &fr->bsz, delim, fr->file); 692 if (len < 0) { 693 if (!feof(fr->file)) 694 err(2, NULL); 695 return (NULL); 696 } 697 if (len > 0 && fr->buffer[len - 1] == delim) 698 len--; 699 ret = bwscsbdup(fr->buffer, len); 700 } 701 702 return (ret); 703 } 704 705 static void 706 file_reader_clean(struct file_reader *fr) 707 { 708 709 if (fr) { 710 if (fr->mmapaddr) 711 munmap(fr->mmapaddr, fr->mmapsize); 712 713 if (fr->fd) 714 close(fr->fd); 715 716 if (fr->buffer) 717 sort_free(fr->buffer); 718 719 if (fr->file) 720 if (fr->file != stdin) 721 closefile(fr->file, fr->fname); 722 723 if(fr->fname) 724 sort_free(fr->fname); 725 726 memset(fr, 0, sizeof(struct file_reader)); 727 } 728 } 729 730 void 731 file_reader_free(struct file_reader *fr) 732 { 733 734 if (fr) { 735 file_reader_clean(fr); 736 sort_free(fr); 737 } 738 } 739 740 int 741 procfile(const char *fsrc, struct sort_list *list, struct file_list *fl) 742 { 743 struct file_reader *fr; 744 745 fr = file_reader_init(fsrc); 746 if (fr == NULL) 747 err(2, NULL); 748 749 /* file browse cycle */ 750 for (;;) { 751 struct bwstring *bws; 752 753 bws = file_reader_readline(fr); 754 755 if (bws == NULL) 756 break; 757 758 sort_list_add(list, bws); 759 760 if (list->memsize >= available_free_memory) { 761 char *fn; 762 763 fn = new_tmp_file_name(); 764 sort_list_to_file(list, fn); 765 file_list_add(fl, fn, false); 766 sort_list_clean(list); 767 } 768 } 769 770 file_reader_free(fr); 771 772 return (0); 773 } 774 775 /* 776 * Compare file headers. Files with EOF always go to the end of the list. 777 */ 778 static int 779 file_header_cmp(struct file_header *f1, struct file_header *f2) 780 { 781 782 if (f1 == f2) 783 return (0); 784 else { 785 if (f1->fr == NULL) { 786 return ((f2->fr == NULL) ? 0 : +1); 787 } else if (f2->fr == NULL) 788 return (-1); 789 else { 790 int ret; 791 792 ret = list_coll(&(f1->si), &(f2->si)); 793 if (!ret) 794 return ((f1->file_pos < f2->file_pos) ? 
-1 : +1); 795 return (ret); 796 } 797 } 798 } 799 800 /* 801 * Allocate and init file header structure 802 */ 803 static void 804 file_header_init(struct file_header **fh, const char *fn, size_t file_pos) 805 { 806 807 if (fh && fn) { 808 struct bwstring *line; 809 810 *fh = sort_malloc(sizeof(struct file_header)); 811 (*fh)->file_pos = file_pos; 812 (*fh)->fr = file_reader_init(fn); 813 if ((*fh)->fr == NULL) { 814 perror(fn); 815 err(2, "%s", getstr(8)); 816 } 817 line = file_reader_readline((*fh)->fr); 818 if (line == NULL) { 819 file_reader_free((*fh)->fr); 820 (*fh)->fr = NULL; 821 (*fh)->si = NULL; 822 } else { 823 (*fh)->si = sort_list_item_alloc(); 824 sort_list_item_set((*fh)->si, line); 825 } 826 } 827 } 828 829 /* 830 * Close file 831 */ 832 static void 833 file_header_close(struct file_header **fh) 834 { 835 836 if (fh && *fh) { 837 if ((*fh)->fr) { 838 file_reader_free((*fh)->fr); 839 (*fh)->fr = NULL; 840 } 841 if ((*fh)->si) { 842 sort_list_item_clean((*fh)->si); 843 sort_free((*fh)->si); 844 (*fh)->si = NULL; 845 } 846 sort_free(*fh); 847 *fh = NULL; 848 } 849 } 850 851 /* 852 * Swap two array elements 853 */ 854 static void 855 file_header_swap(struct file_header **fh, size_t i1, size_t i2) 856 { 857 struct file_header *tmp; 858 859 tmp = fh[i1]; 860 fh[i1] = fh[i2]; 861 fh[i2] = tmp; 862 } 863 864 /* heap algorithm ==>> */ 865 866 /* 867 * See heap sort algorithm 868 * "Raises" last element to its right place 869 */ 870 static void 871 file_header_heap_swim(struct file_header **fh, size_t indx) 872 { 873 874 if (indx > 0) { 875 size_t parent_index; 876 877 parent_index = (indx - 1) >> 1; 878 879 if (file_header_cmp(fh[indx], fh[parent_index]) < 0) { 880 /* swap child and parent and continue */ 881 file_header_swap(fh, indx, parent_index); 882 file_header_heap_swim(fh, parent_index); 883 } 884 } 885 } 886 887 /* 888 * Sink the top element to its correct position 889 */ 890 static void 891 file_header_heap_sink(struct file_header **fh, size_t indx, 
size_t size) 892 { 893 size_t left_child_index; 894 size_t right_child_index; 895 896 left_child_index = indx + indx + 1; 897 right_child_index = left_child_index + 1; 898 899 if (left_child_index < size) { 900 size_t min_child_index; 901 902 min_child_index = left_child_index; 903 904 if ((right_child_index < size) && 905 (file_header_cmp(fh[left_child_index], 906 fh[right_child_index]) > 0)) 907 min_child_index = right_child_index; 908 if (file_header_cmp(fh[indx], fh[min_child_index]) > 0) { 909 file_header_swap(fh, indx, min_child_index); 910 file_header_heap_sink(fh, min_child_index, size); 911 } 912 } 913 } 914 915 /* <<== heap algorithm */ 916 917 /* 918 * Adds element to the "left" end 919 */ 920 static void 921 file_header_list_rearrange_from_header(struct file_header **fh, size_t size) 922 { 923 924 file_header_heap_sink(fh, 0, size); 925 } 926 927 /* 928 * Adds element to the "right" end 929 */ 930 static void 931 file_header_list_push(struct file_header *f, struct file_header **fh, size_t size) 932 { 933 934 fh[size++] = f; 935 file_header_heap_swim(fh, size - 1); 936 } 937 938 struct last_printed 939 { 940 struct bwstring *str; 941 }; 942 943 /* 944 * Prints the current line of the file 945 */ 946 static void 947 file_header_print(struct file_header *fh, FILE *f_out, struct last_printed *lp) 948 { 949 950 if (fh && fh->fr && f_out && fh->si && fh->si->str) { 951 if (sort_opts_vals.uflag) { 952 if ((lp->str == NULL) || (str_list_coll(lp->str, &(fh->si)))) { 953 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 954 if (lp->str) 955 bwsfree(lp->str); 956 lp->str = bwsdup(fh->si->str); 957 } 958 } else 959 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 960 } 961 } 962 963 /* 964 * Read next line 965 */ 966 static void 967 file_header_read_next(struct file_header *fh) 968 { 969 970 if (fh && fh->fr) { 971 struct bwstring *tmp; 972 973 tmp = file_reader_readline(fh->fr); 974 if (tmp == NULL) { 975 file_reader_free(fh->fr); 976 fh->fr = NULL; 977 if 
(fh->si) { 978 sort_list_item_clean(fh->si); 979 sort_free(fh->si); 980 fh->si = NULL; 981 } 982 } else { 983 if (fh->si == NULL) 984 fh->si = sort_list_item_alloc(); 985 sort_list_item_set(fh->si, tmp); 986 } 987 } 988 } 989 990 /* 991 * Merge array of "files headers" 992 */ 993 static void 994 file_headers_merge(size_t fnum, struct file_header **fh, FILE *f_out) 995 { 996 struct last_printed lp; 997 size_t i; 998 999 memset(&lp, 0, sizeof(lp)); 1000 1001 /* 1002 * construct the initial sort structure 1003 */ 1004 for (i = 0; i < fnum; i++) 1005 file_header_list_push(fh[i], fh, i); 1006 1007 while (fh[0]->fr) { /* unfinished files are always in front */ 1008 /* output the smallest line: */ 1009 file_header_print(fh[0], f_out, &lp); 1010 /* read a new line, if possible: */ 1011 file_header_read_next(fh[0]); 1012 /* re-arrange the list: */ 1013 file_header_list_rearrange_from_header(fh, fnum); 1014 } 1015 1016 if (lp.str) 1017 bwsfree(lp.str); 1018 } 1019 1020 /* 1021 * Merges the given files into the output file, which can be 1022 * stdout. 
1023 */ 1024 static void 1025 merge_files_array(size_t argc, const char **argv, const char *fn_out) 1026 { 1027 1028 if (argv && fn_out) { 1029 struct file_header **fh; 1030 FILE *f_out; 1031 size_t i; 1032 1033 f_out = openfile(fn_out, "w"); 1034 1035 if (f_out == NULL) 1036 err(2, NULL); 1037 1038 fh = sort_malloc((argc + 1) * sizeof(struct file_header *)); 1039 1040 for (i = 0; i < argc; i++) 1041 file_header_init(fh + i, argv[i], (size_t) i); 1042 1043 file_headers_merge(argc, fh, f_out); 1044 1045 for (i = 0; i < argc; i++) 1046 file_header_close(fh + i); 1047 1048 sort_free(fh); 1049 1050 closefile(f_out, fn_out); 1051 } 1052 } 1053 1054 /* 1055 * Shrinks the file list until its size smaller than max number of opened files 1056 */ 1057 static int 1058 shrink_file_list(struct file_list *fl) 1059 { 1060 1061 if ((fl == NULL) || (size_t) (fl->count) < max_open_files) 1062 return (0); 1063 else { 1064 struct file_list new_fl; 1065 size_t indx = 0; 1066 1067 file_list_init(&new_fl, true); 1068 while (indx < fl->count) { 1069 char *fnew; 1070 size_t num; 1071 1072 num = fl->count - indx; 1073 fnew = new_tmp_file_name(); 1074 1075 if ((size_t) num >= max_open_files) 1076 num = max_open_files - 1; 1077 merge_files_array(num, fl->fns + indx, fnew); 1078 if (fl->tmp) { 1079 size_t i; 1080 1081 for (i = 0; i < num; i++) 1082 unlink(fl->fns[indx + i]); 1083 } 1084 file_list_add(&new_fl, fnew, false); 1085 indx += num; 1086 } 1087 fl->tmp = false; /* already taken care of */ 1088 file_list_clean(fl); 1089 1090 fl->count = new_fl.count; 1091 fl->fns = new_fl.fns; 1092 fl->sz = new_fl.sz; 1093 fl->tmp = new_fl.tmp; 1094 1095 return (1); 1096 } 1097 } 1098 1099 /* 1100 * Merge list of files 1101 */ 1102 void 1103 merge_files(struct file_list *fl, const char *fn_out) 1104 { 1105 1106 if (fl && fn_out) { 1107 while (shrink_file_list(fl)); 1108 1109 merge_files_array(fl->count, fl->fns, fn_out); 1110 } 1111 } 1112 1113 static const char * 1114 get_sort_method_name(int sm) 1115 
{ 1116 1117 if (sm == SORT_MERGESORT) 1118 return "mergesort"; 1119 else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1120 return "radixsort"; 1121 else if (sort_opts_vals.sort_method == SORT_HEAPSORT) 1122 return "heapsort"; 1123 else 1124 return "quicksort"; 1125 } 1126 1127 /* 1128 * Wrapper for qsort 1129 */ 1130 static int sort_qsort(void *list, size_t count, size_t elem_size, 1131 int (*cmp_func)(const void *, const void *)) 1132 { 1133 1134 qsort(list, count, elem_size, cmp_func); 1135 return (0); 1136 } 1137 1138 /* 1139 * Sort list of lines and writes it to the file 1140 */ 1141 void 1142 sort_list_to_file(struct sort_list *list, const char *outfile) 1143 { 1144 struct sort_mods *sm = &(keys[0].sm); 1145 1146 if (!(sm->Mflag) && !(sm->Rflag) && !(sm->Vflag) && 1147 !(sm->gflag) && !(sm->hflag) && !(sm->nflag)) { 1148 if ((sort_opts_vals.sort_method == SORT_DEFAULT) && byte_sort) 1149 sort_opts_vals.sort_method = SORT_RADIXSORT; 1150 1151 } else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1152 err(2, "%s", getstr(9)); 1153 1154 /* 1155 * to handle stable sort and the unique cases in the 1156 * right order, we need stable basic algorithm 1157 */ 1158 if (sort_opts_vals.sflag) { 1159 switch (sort_opts_vals.sort_method){ 1160 case SORT_MERGESORT: 1161 break; 1162 case SORT_RADIXSORT: 1163 break; 1164 case SORT_DEFAULT: 1165 sort_opts_vals.sort_method = SORT_MERGESORT; 1166 break; 1167 default: 1168 errx(2, "%s", getstr(10)); 1169 } 1170 } 1171 1172 if (sort_opts_vals.sort_method == SORT_DEFAULT) 1173 sort_opts_vals.sort_method = DEFAULT_SORT_ALGORITHM; 1174 1175 if (debug_sort) 1176 printf("sort_method=%s\n", 1177 get_sort_method_name(sort_opts_vals.sort_method)); 1178 1179 switch (sort_opts_vals.sort_method){ 1180 case SORT_RADIXSORT: 1181 rxsort(list->list, list->count); 1182 sort_list_dump(list, outfile); 1183 break; 1184 case SORT_MERGESORT: 1185 mt_sort(list, mergesort, outfile); 1186 break; 1187 case SORT_HEAPSORT: 1188 mt_sort(list, heapsort, 
outfile); 1189 break; 1190 case SORT_QSORT: 1191 mt_sort(list, sort_qsort, outfile); 1192 break; 1193 default: 1194 mt_sort(list, DEFAULT_SORT_FUNC, outfile); 1195 break; 1196 } 1197 } 1198 1199 /******************* MT SORT ************************/ 1200 1201 #if defined(SORT_THREADS) 1202 /* semaphore to count threads */ 1203 static sem_t mtsem; 1204 1205 /* current system sort function */ 1206 static int (*g_sort_func)(void *, size_t, size_t, 1207 int(*)(const void *, const void *)); 1208 1209 /* 1210 * Sort cycle thread (in multi-threaded mode) 1211 */ 1212 static void* 1213 mt_sort_thread(void* arg) 1214 { 1215 struct sort_list *list = arg; 1216 1217 g_sort_func(list->list, list->count, sizeof(struct sort_list_item *), 1218 (int(*)(const void *, const void *)) list_coll); 1219 1220 sem_post(&mtsem); 1221 1222 return (arg); 1223 } 1224 1225 /* 1226 * Compare sub-lists. Empty sub-lists always go to the end of the list. 1227 */ 1228 static int 1229 sub_list_cmp(struct sort_list *l1, struct sort_list *l2) 1230 { 1231 1232 if (l1 == l2) 1233 return (0); 1234 else { 1235 if (l1->count == 0) { 1236 return ((l2->count == 0) ? 0 : +1); 1237 } else if (l2->count == 0) { 1238 return (-1); 1239 } else { 1240 int ret; 1241 1242 ret = list_coll(&(l1->list[0]), &(l2->list[0])); 1243 if (!ret) 1244 return ((l1->sub_list_pos < l2->sub_list_pos) ? 
1245 -1 : +1); 1246 return (ret); 1247 } 1248 } 1249 } 1250 1251 /* 1252 * Swap two array elements 1253 */ 1254 static void 1255 sub_list_swap(struct sort_list **sl, size_t i1, size_t i2) 1256 { 1257 struct sort_list *tmp; 1258 1259 tmp = sl[i1]; 1260 sl[i1] = sl[i2]; 1261 sl[i2] = tmp; 1262 } 1263 1264 /* heap algorithm ==>> */ 1265 1266 /* 1267 * See heap sort algorithm 1268 * "Raises" last element to its right place 1269 */ 1270 static void 1271 sub_list_swim(struct sort_list **sl, size_t indx) 1272 { 1273 1274 if (indx > 0) { 1275 size_t parent_index; 1276 1277 parent_index = (indx - 1) >> 1; 1278 1279 if (sub_list_cmp(sl[indx], sl[parent_index]) < 0) { 1280 /* swap child and parent and continue */ 1281 sub_list_swap(sl, indx, parent_index); 1282 sub_list_swim(sl, parent_index); 1283 } 1284 } 1285 } 1286 1287 /* 1288 * Sink the top element to its correct position 1289 */ 1290 static void 1291 sub_list_sink(struct sort_list **sl, size_t indx, size_t size) 1292 { 1293 size_t left_child_index; 1294 size_t right_child_index; 1295 1296 left_child_index = indx + indx + 1; 1297 right_child_index = left_child_index + 1; 1298 1299 if (left_child_index < size) { 1300 size_t min_child_index; 1301 1302 min_child_index = left_child_index; 1303 1304 if ((right_child_index < size) && 1305 (sub_list_cmp(sl[left_child_index], 1306 sl[right_child_index]) > 0)) 1307 min_child_index = right_child_index; 1308 if (sub_list_cmp(sl[indx], sl[min_child_index]) > 0) { 1309 sub_list_swap(sl, indx, min_child_index); 1310 sub_list_sink(sl, min_child_index, size); 1311 } 1312 } 1313 } 1314 1315 /* <<== heap algorithm */ 1316 1317 /* 1318 * Adds element to the "right" end 1319 */ 1320 static void 1321 sub_list_push(struct sort_list *s, struct sort_list **sl, size_t size) 1322 { 1323 1324 sl[size++] = s; 1325 sub_list_swim(sl, size - 1); 1326 } 1327 1328 struct last_printed_item 1329 { 1330 struct sort_list_item *item; 1331 }; 1332 1333 /* 1334 * Prints the current line of the file 1335 */ 
1336 static void 1337 sub_list_header_print(struct sort_list *sl, FILE *f_out, 1338 struct last_printed_item *lp) 1339 { 1340 1341 if (sl && sl->count && f_out && sl->list[0]->str) { 1342 if (sort_opts_vals.uflag) { 1343 if ((lp->item == NULL) || (list_coll(&(lp->item), 1344 &(sl->list[0])))) { 1345 bwsfwrite(sl->list[0]->str, f_out, 1346 sort_opts_vals.zflag); 1347 lp->item = sl->list[0]; 1348 } 1349 } else 1350 bwsfwrite(sl->list[0]->str, f_out, 1351 sort_opts_vals.zflag); 1352 } 1353 } 1354 1355 /* 1356 * Read next line 1357 */ 1358 static void 1359 sub_list_next(struct sort_list *sl) 1360 { 1361 1362 if (sl && sl->count) { 1363 sl->list += 1; 1364 sl->count -= 1; 1365 } 1366 } 1367 1368 /* 1369 * Merge sub-lists to a file 1370 */ 1371 static void 1372 merge_sub_lists(struct sort_list **sl, size_t n, FILE* f_out) 1373 { 1374 struct last_printed_item lp; 1375 size_t i; 1376 1377 memset(&lp,0,sizeof(lp)); 1378 1379 /* construct the initial list: */ 1380 for (i = 0; i < n; i++) 1381 sub_list_push(sl[i], sl, i); 1382 1383 while (sl[0]->count) { /* unfinished lists are always in front */ 1384 /* output the smallest line: */ 1385 sub_list_header_print(sl[0], f_out, &lp); 1386 /* move to a new line, if possible: */ 1387 sub_list_next(sl[0]); 1388 /* re-arrange the list: */ 1389 sub_list_sink(sl, 0, n); 1390 } 1391 } 1392 1393 /* 1394 * Merge sub-lists to a file 1395 */ 1396 static void 1397 merge_list_parts(struct sort_list **parts, size_t n, const char *fn) 1398 { 1399 FILE* f_out; 1400 1401 f_out = openfile(fn,"w"); 1402 1403 merge_sub_lists(parts, n, f_out); 1404 1405 closefile(f_out, fn); 1406 } 1407 1408 #endif /* defined(SORT_THREADS) */ 1409 /* 1410 * Multi-threaded sort algorithm "driver" 1411 */ 1412 static void 1413 mt_sort(struct sort_list *list, 1414 int(*sort_func)(void *, size_t, size_t, int(*)(const void *, const void *)), 1415 const char* fn) 1416 { 1417 #if defined(SORT_THREADS) 1418 if (nthreads < 2 || list->count < MT_SORT_THRESHOLD) { 1419 size_t 
nthreads_save = nthreads; 1420 nthreads = 1; 1421 #endif 1422 /* if single thread or small data, do simple sort */ 1423 sort_func(list->list, list->count, 1424 sizeof(struct sort_list_item *), 1425 (int(*)(const void *, const void *)) list_coll); 1426 sort_list_dump(list, fn); 1427 #if defined(SORT_THREADS) 1428 nthreads = nthreads_save; 1429 } else { 1430 /* multi-threaded sort */ 1431 struct sort_list **parts; 1432 size_t avgsize, cstart, i; 1433 1434 /* array of sub-lists */ 1435 parts = sort_malloc(sizeof(struct sort_list*) * nthreads); 1436 cstart = 0; 1437 avgsize = list->count / nthreads; 1438 1439 /* set global system sort function */ 1440 g_sort_func = sort_func; 1441 1442 /* set sublists */ 1443 for (i = 0; i < nthreads; ++i) { 1444 size_t sz = 0; 1445 1446 parts[i] = sort_malloc(sizeof(struct sort_list)); 1447 parts[i]->list = list->list + cstart; 1448 parts[i]->memsize = 0; 1449 parts[i]->sub_list_pos = i; 1450 1451 sz = (i == nthreads - 1) ? list->count - cstart : 1452 avgsize; 1453 1454 parts[i]->count = sz; 1455 1456 parts[i]->size = parts[i]->count; 1457 1458 cstart += sz; 1459 } 1460 1461 /* init threads counting semaphore */ 1462 sem_init(&mtsem, 0, 0); 1463 1464 /* start threads */ 1465 for (i = 0; i < nthreads; ++i) { 1466 pthread_t pth; 1467 pthread_attr_t attr; 1468 1469 pthread_attr_init(&attr); 1470 pthread_attr_setdetachstate(&attr, PTHREAD_DETACHED); 1471 1472 for (;;) { 1473 int res = pthread_create(&pth, &attr, 1474 mt_sort_thread, parts[i]); 1475 1476 if (res >= 0) 1477 break; 1478 if (errno == EAGAIN) { 1479 pthread_yield(); 1480 continue; 1481 } 1482 err(2, NULL); 1483 } 1484 1485 pthread_attr_destroy(&attr); 1486 } 1487 1488 /* wait for threads completion */ 1489 for (i = 0; i < nthreads; ++i) { 1490 sem_wait(&mtsem); 1491 } 1492 /* destroy the semaphore - we do not need it anymore */ 1493 sem_destroy(&mtsem); 1494 1495 /* merge sorted sub-lists to the file */ 1496 merge_list_parts(parts, nthreads, fn); 1497 1498 /* free sub-lists 
data */ 1499 for (i = 0; i < nthreads; ++i) { 1500 sort_free(parts[i]); 1501 } 1502 sort_free(parts); 1503 } 1504 #endif /* defined(SORT_THREADS) */ 1505 } 1506