1 /*- 2 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD 3 * 4 * Copyright (C) 2009 Gabor Kovesdan <gabor@FreeBSD.org> 5 * Copyright (C) 2012 Oleg Moskalenko <mom040267@gmail.com> 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27 * SUCH DAMAGE. 28 */ 29 30 #include <sys/cdefs.h> 31 __FBSDID("$FreeBSD$"); 32 33 #include <sys/mman.h> 34 #include <sys/stat.h> 35 #include <sys/types.h> 36 #include <sys/queue.h> 37 38 #include <err.h> 39 #include <fcntl.h> 40 #if defined(SORT_THREADS) 41 #include <pthread.h> 42 #endif 43 #include <semaphore.h> 44 #include <stdio.h> 45 #include <stdlib.h> 46 #include <string.h> 47 #include <unistd.h> 48 #include <wchar.h> 49 #include <wctype.h> 50 51 #include "coll.h" 52 #include "file.h" 53 #include "radixsort.h" 54 55 unsigned long long free_memory = 1000000; 56 unsigned long long available_free_memory = 1000000; 57 58 bool use_mmap; 59 60 const char *tmpdir = "/var/tmp"; 61 const char *compress_program; 62 63 size_t max_open_files = 16; 64 65 /* 66 * File reader structure 67 */ 68 struct file_reader 69 { 70 FILE *file; 71 char *fname; 72 char *buffer; 73 unsigned char *mmapaddr; 74 unsigned char *mmapptr; 75 size_t bsz; 76 size_t mmapsize; 77 int fd; 78 char elsymb; 79 }; 80 81 /* 82 * Structure to be used in file merge process. 83 */ 84 struct file_header 85 { 86 struct file_reader *fr; 87 struct sort_list_item *si; /* current top line */ 88 size_t file_pos; 89 }; 90 91 /* 92 * List elements of "cleanable" files list. 93 */ 94 struct CLEANABLE_FILE 95 { 96 char *fn; 97 LIST_ENTRY(CLEANABLE_FILE) files; 98 }; 99 100 /* 101 * List header of "cleanable" files list. 102 */ 103 static LIST_HEAD(CLEANABLE_FILES,CLEANABLE_FILE) tmp_files; 104 105 /* 106 * Semaphore to protect the tmp file list. 107 * We use semaphore here because it is signal-safe, according to POSIX. 108 * And semaphore does not require pthread library. 109 */ 110 static sem_t tmp_files_sem; 111 112 static void mt_sort(struct sort_list *list, 113 int (*sort_func)(void *, size_t, size_t, 114 int (*)(const void *, const void *)), const char* fn); 115 116 /* 117 * Init tmp files list 118 */ 119 void 120 init_tmp_files(void) 121 { 122 123 LIST_INIT(&tmp_files); 124 sem_init(&tmp_files_sem, 0, 1); 125 } 126 127 /* 128 * Save name of a tmp file for signal cleanup 129 */ 130 void 131 tmp_file_atexit(const char *tmp_file) 132 { 133 134 if (tmp_file) { 135 sem_wait(&tmp_files_sem); 136 struct CLEANABLE_FILE *item = 137 sort_malloc(sizeof(struct CLEANABLE_FILE)); 138 item->fn = sort_strdup(tmp_file); 139 LIST_INSERT_HEAD(&tmp_files, item, files); 140 sem_post(&tmp_files_sem); 141 } 142 } 143 144 /* 145 * Clear tmp files 146 */ 147 void 148 clear_tmp_files(void) 149 { 150 struct CLEANABLE_FILE *item; 151 152 sem_wait(&tmp_files_sem); 153 LIST_FOREACH(item,&tmp_files,files) { 154 if ((item) && (item->fn)) 155 unlink(item->fn); 156 } 157 sem_post(&tmp_files_sem); 158 } 159 160 /* 161 * Check whether a file is a temporary file 162 */ 163 static bool 164 file_is_tmp(const char* fn) 165 { 166 struct CLEANABLE_FILE *item; 167 bool ret = false; 168 169 if (fn) { 170 sem_wait(&tmp_files_sem); 171 LIST_FOREACH(item,&tmp_files,files) { 172 if ((item) && (item->fn)) 173 if (strcmp(item->fn, fn) == 0) { 174 ret = true; 175 break; 176 } 177 } 178 sem_post(&tmp_files_sem); 179 } 180 181 return (ret); 182 } 183 184 /* 185 * Generate new temporary file name 186 */ 187 char * 188 new_tmp_file_name(void) 189 { 190 char *ret; 191 int fd; 192 193 if (asprintf(&ret, "%s/.bsdsort.XXXXXXXXXX", tmpdir) == -1) 194 err(2, "asprintf()"); 195 if ((fd = mkstemp(ret)) == -1) 196 err(2, "mkstemp()"); 197 close(fd); 198 199 tmp_file_atexit(ret); 200 return (ret); 201 } 202 203 /* 204 * Initialize file list 205 */ 206 void 207 file_list_init(struct file_list *fl, bool tmp) 208 { 209 210 if (fl) { 211 memset(fl, 0, sizeof(*fl)); 212 fl->tmp = tmp; 213 } 214 } 215 216 /* 217 * Add a file name to the list 218 */ 219 void 220 file_list_add(struct file_list *fl, const char *fn, bool allocate) 221 { 222 223 if (fl && fn) { 224 if (fl->count >= fl->sz || (fl->fns == NULL)) { 225 fl->sz = (fl->sz) * 2 + 1; 226 fl->fns = sort_realloc(fl->fns, fl->sz * 227 sizeof(char *)); 228 } 229 fl->fns[fl->count] = allocate ? sort_strdup(fn) : fn; 230 fl->count += 1; 231 } 232 } 233 234 /* 235 * Populate file list from array of file names 236 */ 237 void 238 file_list_populate(struct file_list *fl, int argc, char **argv, bool allocate) 239 { 240 241 if (fl && argv) { 242 int i; 243 244 for (i = 0; i < argc; i++) 245 file_list_add(fl, argv[i], allocate); 246 } 247 } 248 249 /* 250 * Clean file list data and delete the files, 251 * if this is a list of temporary files 252 */ 253 void 254 file_list_clean(struct file_list *fl) 255 { 256 257 if (fl) { 258 if (fl->fns) { 259 size_t i; 260 261 for (i = 0; i < fl->count; i++) { 262 if (fl->fns[i]) { 263 if (fl->tmp) 264 unlink(fl->fns[i]); 265 sort_free(fl->fns[i]); 266 fl->fns[i] = 0; 267 } 268 } 269 sort_free(fl->fns); 270 fl->fns = NULL; 271 } 272 fl->sz = 0; 273 fl->count = 0; 274 fl->tmp = false; 275 } 276 } 277 278 /* 279 * Init sort list 280 */ 281 void 282 sort_list_init(struct sort_list *l) 283 { 284 285 if (l) { 286 memset(l, 0, sizeof(*l)); 287 l->memsize = sizeof(struct sort_list); 288 } 289 } 290 291 /* 292 * Add string to sort list 293 */ 294 void 295 sort_list_add(struct sort_list *l, struct bwstring *str) 296 { 297 298 if (l && str) { 299 size_t indx = l->count; 300 301 if ((l->list == NULL) || (indx >= l->size)) { 302 size_t newsize = (l->size + 1) + 1024; 303 304 l->list = sort_realloc(l->list, 305 sizeof(struct sort_list_item*) * newsize); 306 l->memsize += (newsize - l->size) * 307 sizeof(struct sort_list_item*); 308 l->size = newsize; 309 } 310 l->list[indx] = sort_list_item_alloc(); 311 sort_list_item_set(l->list[indx], str); 312 l->memsize += sort_list_item_size(l->list[indx]); 313 l->count += 1; 314 } 315 } 316 317 /* 318 * Clean sort list data 319 */ 320 void 321 sort_list_clean(struct sort_list *l) 322 { 323 324 if (l) { 325 if (l->list) { 326 size_t i; 327 328 for (i = 0; i < l->count; i++) { 329 struct sort_list_item *item; 330 331 item = l->list[i]; 332 333 if (item) { 334 sort_list_item_clean(item); 335 sort_free(item); 336 l->list[i] = NULL; 337 } 338 } 339 sort_free(l->list); 340 l->list = NULL; 341 } 342 l->count = 0; 343 l->size = 0; 344 l->memsize = sizeof(struct sort_list); 345 } 346 } 347 348 /* 349 * Write sort list to file 350 */ 351 void 352 sort_list_dump(struct sort_list *l, const char *fn) 353 { 354 355 if (l && fn) { 356 FILE *f; 357 358 f = openfile(fn, "w"); 359 if (f == NULL) 360 err(2, NULL); 361 362 if (l->list) { 363 size_t i; 364 if (!(sort_opts_vals.uflag)) { 365 for (i = 0; i < l->count; ++i) 366 bwsfwrite(l->list[i]->str, f, 367 sort_opts_vals.zflag); 368 } else { 369 struct sort_list_item *last_printed_item = NULL; 370 struct sort_list_item *item; 371 for (i = 0; i < l->count; ++i) { 372 item = l->list[i]; 373 if ((last_printed_item == NULL) || 374 list_coll(&last_printed_item, &item)) { 375 bwsfwrite(item->str, f, sort_opts_vals.zflag); 376 last_printed_item = item; 377 } 378 } 379 } 380 } 381 382 closefile(f, fn); 383 } 384 } 385 386 /* 387 * Checks if the given file is sorted. Stops at the first disorder, 388 * prints the disordered line and returns 1. 389 */ 390 int 391 check(const char *fn) 392 { 393 struct bwstring *s1, *s2, *s1disorder, *s2disorder; 394 struct file_reader *fr; 395 struct keys_array *ka1, *ka2; 396 int res; 397 size_t pos, posdisorder; 398 399 s1 = s2 = s1disorder = s2disorder = NULL; 400 ka1 = ka2 = NULL; 401 402 fr = file_reader_init(fn); 403 404 res = 0; 405 pos = 1; 406 posdisorder = 1; 407 408 if (fr == NULL) { 409 err(2, NULL); 410 goto end; 411 } 412 413 s1 = file_reader_readline(fr); 414 if (s1 == NULL) 415 goto end; 416 417 ka1 = keys_array_alloc(); 418 preproc(s1, ka1); 419 420 s2 = file_reader_readline(fr); 421 if (s2 == NULL) 422 goto end; 423 424 ka2 = keys_array_alloc(); 425 preproc(s2, ka2); 426 427 for (;;) { 428 429 if (debug_sort) { 430 bwsprintf(stdout, s2, "s1=<", ">"); 431 bwsprintf(stdout, s1, "s2=<", ">"); 432 } 433 int cmp = key_coll(ka2, ka1, 0); 434 if (debug_sort) 435 printf("; cmp1=%d", cmp); 436 437 if (!cmp && sort_opts_vals.complex_sort && 438 !(sort_opts_vals.uflag) && !(sort_opts_vals.sflag)) { 439 cmp = top_level_str_coll(s2, s1); 440 if (debug_sort) 441 printf("; cmp2=%d", cmp); 442 } 443 if (debug_sort) 444 printf("\n"); 445 446 if ((sort_opts_vals.uflag && (cmp <= 0)) || (cmp < 0)) { 447 if (!(sort_opts_vals.csilentflag)) { 448 s2disorder = bwsdup(s2); 449 posdisorder = pos; 450 if (debug_sort) 451 s1disorder = bwsdup(s1); 452 } 453 res = 1; 454 goto end; 455 } 456 457 pos++; 458 459 clean_keys_array(s1, ka1); 460 sort_free(ka1); 461 ka1 = ka2; 462 ka2 = NULL; 463 464 bwsfree(s1); 465 s1 = s2; 466 467 s2 = file_reader_readline(fr); 468 if (s2 == NULL) 469 goto end; 470 471 ka2 = keys_array_alloc(); 472 preproc(s2, ka2); 473 } 474 475 end: 476 if (ka1) { 477 clean_keys_array(s1, ka1); 478 sort_free(ka1); 479 } 480 481 if (s1) 482 bwsfree(s1); 483 484 if (ka2) { 485 clean_keys_array(s2, ka2); 486 sort_free(ka2); 487 } 488 489 if (s2) 490 bwsfree(s2); 491 492 if ((fn == NULL) || (*fn == 0) || (strcmp(fn, "-") == 0)) { 493 for (;;) { 494 s2 = file_reader_readline(fr); 495 if (s2 == NULL) 496 break; 497 bwsfree(s2); 498 } 499 } 500 501 file_reader_free(fr); 502 503 if (s2disorder) { 504 bws_disorder_warnx(s2disorder, fn, posdisorder); 505 if (s1disorder) { 506 bws_disorder_warnx(s1disorder, fn, posdisorder); 507 if (s1disorder != s2disorder) 508 bwsfree(s1disorder); 509 } 510 bwsfree(s2disorder); 511 s1disorder = NULL; 512 s2disorder = NULL; 513 } 514 515 if (res) 516 exit(res); 517 518 return (0); 519 } 520 521 /* 522 * Opens a file. If the given filename is "-", stdout will be 523 * opened. 524 */ 525 FILE * 526 openfile(const char *fn, const char *mode) 527 { 528 FILE *file; 529 530 if (strcmp(fn, "-") == 0) 531 return ((mode && mode[0] == 'r') ? stdin : stdout); 532 533 mode_t orig_file_mask = 0; 534 int is_tmp = file_is_tmp(fn); 535 536 if (is_tmp && (mode[0] == 'w')) 537 orig_file_mask = umask(S_IWGRP | S_IWOTH | 538 S_IRGRP | S_IROTH); 539 540 if (is_tmp && (compress_program != NULL)) { 541 int r; 542 char *cmd; 543 544 fflush(stdout); 545 546 if (mode[0] == 'r') 547 r = asprintf(&cmd, "cat %s | %s -d", 548 fn, compress_program); 549 else if (mode[0] == 'w') 550 r = asprintf(&cmd, "%s > %s", 551 compress_program, fn); 552 else 553 err(2, "%s", getstr(7)); 554 555 if (r == -1) 556 err(2, "aspritnf()"); 557 558 if ((file = popen(cmd, mode)) == NULL) 559 err(2, NULL); 560 free(cmd); 561 } else 562 if ((file = fopen(fn, mode)) == NULL) 563 err(2, NULL); 564 565 if (is_tmp && (mode[0] == 'w')) 566 umask(orig_file_mask); 567 568 return (file); 569 } 570 571 /* 572 * Close file 573 */ 574 void 575 closefile(FILE *f, const char *fn) 576 { 577 if (f == NULL || f == stdin) 578 return; 579 if (f == stdout) { 580 fflush(f); 581 return; 582 } 583 if (file_is_tmp(fn) && compress_program != NULL) { 584 if(pclose(f)<0) 585 err(2,NULL); 586 } else 587 fclose(f); 588 } 589 590 /* 591 * Reads a file into the internal buffer. 592 */ 593 struct file_reader * 594 file_reader_init(const char *fsrc) 595 { 596 struct file_reader *ret; 597 598 if (fsrc == NULL) 599 fsrc = "-"; 600 601 ret = sort_calloc(1, sizeof(struct file_reader)); 602 603 ret->elsymb = sort_opts_vals.zflag ? '\0' : '\n'; 604 ret->fname = sort_strdup(fsrc); 605 606 if (strcmp(fsrc, "-") && (compress_program == NULL) && use_mmap) { 607 608 do { 609 struct stat stat_buf; 610 void *addr; 611 size_t sz = 0; 612 int fd, flags; 613 614 flags = MAP_NOCORE | MAP_NOSYNC; 615 616 fd = open(fsrc, O_RDONLY); 617 if (fd < 0) 618 err(2, NULL); 619 620 if (fstat(fd, &stat_buf) < 0) { 621 close(fd); 622 break; 623 } 624 625 sz = stat_buf.st_size; 626 627 #if defined(MAP_PREFAULT_READ) 628 flags |= MAP_PREFAULT_READ; 629 #endif 630 631 addr = mmap(NULL, sz, PROT_READ, flags, fd, 0); 632 if (addr == MAP_FAILED) { 633 close(fd); 634 break; 635 } 636 637 ret->fd = fd; 638 ret->mmapaddr = addr; 639 ret->mmapsize = sz; 640 ret->mmapptr = ret->mmapaddr; 641 642 } while (0); 643 } 644 645 if (ret->mmapaddr == NULL) { 646 ret->file = openfile(fsrc, "r"); 647 if (ret->file == NULL) 648 err(2, NULL); 649 } 650 651 return (ret); 652 } 653 654 struct bwstring * 655 file_reader_readline(struct file_reader *fr) 656 { 657 struct bwstring *ret = NULL; 658 659 if (fr->mmapaddr) { 660 unsigned char *mmapend; 661 662 mmapend = fr->mmapaddr + fr->mmapsize; 663 if (fr->mmapptr >= mmapend) 664 return (NULL); 665 else { 666 unsigned char *strend; 667 size_t sz; 668 669 sz = mmapend - fr->mmapptr; 670 strend = memchr(fr->mmapptr, fr->elsymb, sz); 671 672 if (strend == NULL) { 673 ret = bwscsbdup(fr->mmapptr, sz); 674 fr->mmapptr = mmapend; 675 } else { 676 ret = bwscsbdup(fr->mmapptr, strend - 677 fr->mmapptr); 678 fr->mmapptr = strend + 1; 679 } 680 } 681 } else { 682 ssize_t len; 683 684 len = getdelim(&fr->buffer, &fr->bsz, fr->elsymb, fr->file); 685 if (len < 0) { 686 if (!feof(fr->file)) 687 err(2, NULL); 688 return (NULL); 689 } 690 if (len > 0 && fr->buffer[len - 1] == fr->elsymb) 691 len--; 692 ret = bwscsbdup(fr->buffer, len); 693 } 694 695 return (ret); 696 } 697 698 static void 699 file_reader_clean(struct file_reader *fr) 700 { 701 702 if (fr == NULL) 703 return; 704 705 if (fr->mmapaddr) 706 munmap(fr->mmapaddr, fr->mmapsize); 707 if (fr->fd) 708 close(fr->fd); 709 710 free(fr->buffer); 711 closefile(fr->file, fr->fname); 712 free(fr->fname); 713 memset(fr, 0, sizeof(struct file_reader)); 714 } 715 716 void 717 file_reader_free(struct file_reader *fr) 718 { 719 720 if (fr == NULL) 721 return; 722 file_reader_clean(fr); 723 free(fr); 724 } 725 726 int 727 procfile(const char *fsrc, struct sort_list *list, struct file_list *fl) 728 { 729 struct file_reader *fr; 730 731 fr = file_reader_init(fsrc); 732 if (fr == NULL) 733 err(2, NULL); 734 735 /* file browse cycle */ 736 for (;;) { 737 struct bwstring *bws; 738 739 bws = file_reader_readline(fr); 740 741 if (bws == NULL) 742 break; 743 744 sort_list_add(list, bws); 745 746 if (list->memsize >= available_free_memory) { 747 char *fn; 748 749 fn = new_tmp_file_name(); 750 sort_list_to_file(list, fn); 751 file_list_add(fl, fn, false); 752 sort_list_clean(list); 753 } 754 } 755 756 file_reader_free(fr); 757 758 return (0); 759 } 760 761 /* 762 * Compare file headers. Files with EOF always go to the end of the list. 763 */ 764 static int 765 file_header_cmp(struct file_header *f1, struct file_header *f2) 766 { 767 768 if (f1 == f2) 769 return (0); 770 else { 771 if (f1->fr == NULL) { 772 return ((f2->fr == NULL) ? 0 : +1); 773 } else if (f2->fr == NULL) 774 return (-1); 775 else { 776 int ret; 777 778 ret = list_coll(&(f1->si), &(f2->si)); 779 if (!ret) 780 return ((f1->file_pos < f2->file_pos) ? -1 : +1); 781 return (ret); 782 } 783 } 784 } 785 786 /* 787 * Allocate and init file header structure 788 */ 789 static void 790 file_header_init(struct file_header **fh, const char *fn, size_t file_pos) 791 { 792 793 if (fh && fn) { 794 struct bwstring *line; 795 796 *fh = sort_malloc(sizeof(struct file_header)); 797 (*fh)->file_pos = file_pos; 798 (*fh)->fr = file_reader_init(fn); 799 if ((*fh)->fr == NULL) { 800 perror(fn); 801 err(2, "%s", getstr(8)); 802 } 803 line = file_reader_readline((*fh)->fr); 804 if (line == NULL) { 805 file_reader_free((*fh)->fr); 806 (*fh)->fr = NULL; 807 (*fh)->si = NULL; 808 } else { 809 (*fh)->si = sort_list_item_alloc(); 810 sort_list_item_set((*fh)->si, line); 811 } 812 } 813 } 814 815 /* 816 * Close file 817 */ 818 static void 819 file_header_close(struct file_header **fh) 820 { 821 822 if (fh && *fh) { 823 file_reader_free((*fh)->fr); 824 (*fh)->fr = NULL; 825 if ((*fh)->si) { 826 sort_list_item_clean((*fh)->si); 827 sort_free((*fh)->si); 828 (*fh)->si = NULL; 829 } 830 sort_free(*fh); 831 *fh = NULL; 832 } 833 } 834 835 /* 836 * Swap two array elements 837 */ 838 static void 839 file_header_swap(struct file_header **fh, size_t i1, size_t i2) 840 { 841 struct file_header *tmp; 842 843 tmp = fh[i1]; 844 fh[i1] = fh[i2]; 845 fh[i2] = tmp; 846 } 847 848 /* heap algorithm ==>> */ 849 850 /* 851 * See heap sort algorithm 852 * "Raises" last element to its right place 853 */ 854 static void 855 file_header_heap_swim(struct file_header **fh, size_t indx) 856 { 857 858 if (indx > 0) { 859 size_t parent_index; 860 861 parent_index = (indx - 1) >> 1; 862 863 if (file_header_cmp(fh[indx], fh[parent_index]) < 0) { 864 /* swap child and parent and continue */ 865 file_header_swap(fh, indx, parent_index); 866 file_header_heap_swim(fh, parent_index); 867 } 868 } 869 } 870 871 /* 872 * Sink the top element to its correct position 873 */ 874 static void 875 file_header_heap_sink(struct file_header **fh, size_t indx, size_t size) 876 { 877 size_t left_child_index; 878 size_t right_child_index; 879 880 left_child_index = indx + indx + 1; 881 right_child_index = left_child_index + 1; 882 883 if (left_child_index < size) { 884 size_t min_child_index; 885 886 min_child_index = left_child_index; 887 888 if ((right_child_index < size) && 889 (file_header_cmp(fh[left_child_index], 890 fh[right_child_index]) > 0)) 891 min_child_index = right_child_index; 892 if (file_header_cmp(fh[indx], fh[min_child_index]) > 0) { 893 file_header_swap(fh, indx, min_child_index); 894 file_header_heap_sink(fh, min_child_index, size); 895 } 896 } 897 } 898 899 /* <<== heap algorithm */ 900 901 /* 902 * Adds element to the "left" end 903 */ 904 static void 905 file_header_list_rearrange_from_header(struct file_header **fh, size_t size) 906 { 907 908 file_header_heap_sink(fh, 0, size); 909 } 910 911 /* 912 * Adds element to the "right" end 913 */ 914 static void 915 file_header_list_push(struct file_header *f, struct file_header **fh, size_t size) 916 { 917 918 fh[size++] = f; 919 file_header_heap_swim(fh, size - 1); 920 } 921 922 struct last_printed 923 { 924 struct bwstring *str; 925 }; 926 927 /* 928 * Prints the current line of the file 929 */ 930 static void 931 file_header_print(struct file_header *fh, FILE *f_out, struct last_printed *lp) 932 { 933 934 if (fh && fh->fr && f_out && fh->si && fh->si->str) { 935 if (sort_opts_vals.uflag) { 936 if ((lp->str == NULL) || (str_list_coll(lp->str, &(fh->si)))) { 937 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 938 if (lp->str) 939 bwsfree(lp->str); 940 lp->str = bwsdup(fh->si->str); 941 } 942 } else 943 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 944 } 945 } 946 947 /* 948 * Read next line 949 */ 950 static void 951 file_header_read_next(struct file_header *fh) 952 { 953 954 if (fh && fh->fr) { 955 struct bwstring *tmp; 956 957 tmp = file_reader_readline(fh->fr); 958 if (tmp == NULL) { 959 file_reader_free(fh->fr); 960 fh->fr = NULL; 961 if (fh->si) { 962 sort_list_item_clean(fh->si); 963 sort_free(fh->si); 964 fh->si = NULL; 965 } 966 } else { 967 if (fh->si == NULL) 968 fh->si = sort_list_item_alloc(); 969 sort_list_item_set(fh->si, tmp); 970 } 971 } 972 } 973 974 /* 975 * Merge array of "files headers" 976 */ 977 static void 978 file_headers_merge(size_t fnum, struct file_header **fh, FILE *f_out) 979 { 980 struct last_printed lp; 981 size_t i; 982 983 memset(&lp, 0, sizeof(lp)); 984 985 /* 986 * construct the initial sort structure 987 */ 988 for (i = 0; i < fnum; i++) 989 file_header_list_push(fh[i], fh, i); 990 991 while (fh[0]->fr) { /* unfinished files are always in front */ 992 /* output the smallest line: */ 993 file_header_print(fh[0], f_out, &lp); 994 /* read a new line, if possible: */ 995 file_header_read_next(fh[0]); 996 /* re-arrange the list: */ 997 file_header_list_rearrange_from_header(fh, fnum); 998 } 999 1000 if (lp.str) 1001 bwsfree(lp.str); 1002 } 1003 1004 /* 1005 * Merges the given files into the output file, which can be 1006 * stdout. 1007 */ 1008 static void 1009 merge_files_array(size_t argc, const char **argv, const char *fn_out) 1010 { 1011 1012 if (argv && fn_out) { 1013 struct file_header **fh; 1014 FILE *f_out; 1015 size_t i; 1016 1017 f_out = openfile(fn_out, "w"); 1018 1019 if (f_out == NULL) 1020 err(2, NULL); 1021 1022 fh = sort_malloc((argc + 1) * sizeof(struct file_header *)); 1023 1024 for (i = 0; i < argc; i++) 1025 file_header_init(fh + i, argv[i], (size_t) i); 1026 1027 file_headers_merge(argc, fh, f_out); 1028 1029 for (i = 0; i < argc; i++) 1030 file_header_close(fh + i); 1031 1032 sort_free(fh); 1033 1034 closefile(f_out, fn_out); 1035 } 1036 } 1037 1038 /* 1039 * Shrinks the file list until its size smaller than max number of opened files 1040 */ 1041 static int 1042 shrink_file_list(struct file_list *fl) 1043 { 1044 1045 if ((fl == NULL) || (size_t) (fl->count) < max_open_files) 1046 return (0); 1047 else { 1048 struct file_list new_fl; 1049 size_t indx = 0; 1050 1051 file_list_init(&new_fl, true); 1052 while (indx < fl->count) { 1053 char *fnew; 1054 size_t num; 1055 1056 num = fl->count - indx; 1057 fnew = new_tmp_file_name(); 1058 1059 if ((size_t) num >= max_open_files) 1060 num = max_open_files - 1; 1061 merge_files_array(num, fl->fns + indx, fnew); 1062 if (fl->tmp) { 1063 size_t i; 1064 1065 for (i = 0; i < num; i++) 1066 unlink(fl->fns[indx + i]); 1067 } 1068 file_list_add(&new_fl, fnew, false); 1069 indx += num; 1070 } 1071 fl->tmp = false; /* already taken care of */ 1072 file_list_clean(fl); 1073 1074 fl->count = new_fl.count; 1075 fl->fns = new_fl.fns; 1076 fl->sz = new_fl.sz; 1077 fl->tmp = new_fl.tmp; 1078 1079 return (1); 1080 } 1081 } 1082 1083 /* 1084 * Merge list of files 1085 */ 1086 void 1087 merge_files(struct file_list *fl, const char *fn_out) 1088 { 1089 1090 if (fl && fn_out) { 1091 while (shrink_file_list(fl)); 1092 1093 merge_files_array(fl->count, fl->fns, fn_out); 1094 } 1095 } 1096 1097 static const char * 1098 get_sort_method_name(int sm) 1099 { 1100 1101 if (sm == SORT_MERGESORT) 1102 return "mergesort"; 1103 else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1104 return "radixsort"; 1105 else if (sort_opts_vals.sort_method == SORT_HEAPSORT) 1106 return "heapsort"; 1107 else 1108 return "quicksort"; 1109 } 1110 1111 /* 1112 * Wrapper for qsort 1113 */ 1114 static int sort_qsort(void *list, size_t count, size_t elem_size, 1115 int (*cmp_func)(const void *, const void *)) 1116 { 1117 1118 qsort(list, count, elem_size, cmp_func); 1119 return (0); 1120 } 1121 1122 /* 1123 * Sort list of lines and writes it to the file 1124 */ 1125 void 1126 sort_list_to_file(struct sort_list *list, const char *outfile) 1127 { 1128 struct sort_mods *sm = &(keys[0].sm); 1129 1130 if (!(sm->Mflag) && !(sm->Rflag) && !(sm->Vflag) && 1131 !(sm->gflag) && !(sm->hflag) && !(sm->nflag)) { 1132 if ((sort_opts_vals.sort_method == SORT_DEFAULT) && byte_sort) 1133 sort_opts_vals.sort_method = SORT_RADIXSORT; 1134 1135 } else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1136 err(2, "%s", getstr(9)); 1137 1138 /* 1139 * to handle stable sort and the unique cases in the 1140 * right order, we need stable basic algorithm 1141 */ 1142 if (sort_opts_vals.sflag) { 1143 switch (sort_opts_vals.sort_method){ 1144 case SORT_MERGESORT: 1145 break; 1146 case SORT_RADIXSORT: 1147 break; 1148 case SORT_DEFAULT: 1149 sort_opts_vals.sort_method = SORT_MERGESORT; 1150 break; 1151 default: 1152 errx(2, "%s", getstr(10)); 1153 } 1154 } 1155 1156 if (sort_opts_vals.sort_method == SORT_DEFAULT) 1157 sort_opts_vals.sort_method = DEFAULT_SORT_ALGORITHM; 1158 1159 if (debug_sort) 1160 printf("sort_method=%s\n", 1161 get_sort_method_name(sort_opts_vals.sort_method)); 1162 1163 switch (sort_opts_vals.sort_method){ 1164 case SORT_RADIXSORT: 1165 rxsort(list->list, list->count); 1166 sort_list_dump(list, outfile); 1167 break; 1168 case SORT_MERGESORT: 1169 mt_sort(list, mergesort, outfile); 1170 break; 1171 case SORT_HEAPSORT: 1172 mt_sort(list, heapsort, outfile); 1173 break; 1174 case SORT_QSORT: 1175 mt_sort(list, sort_qsort, outfile); 1176 break; 1177 default: 1178 mt_sort(list, DEFAULT_SORT_FUNC, outfile); 1179 break; 1180 } 1181 } 1182 1183 /******************* MT SORT ************************/ 1184 1185 #if defined(SORT_THREADS) 1186 /* semaphore to count threads */ 1187 static sem_t mtsem; 1188 1189 /* current system sort function */ 1190 static int (*g_sort_func)(void *, size_t, size_t, 1191 int(*)(const void *, const void *)); 1192 1193 /* 1194 * Sort cycle thread (in multi-threaded mode) 1195 */ 1196 static void* 1197 mt_sort_thread(void* arg) 1198 { 1199 struct sort_list *list = arg; 1200 1201 g_sort_func(list->list, list->count, sizeof(struct sort_list_item *), 1202 (int(*)(const void *, const void *)) list_coll); 1203 1204 sem_post(&mtsem); 1205 1206 return (arg); 1207 } 1208 1209 /* 1210 * Compare sub-lists. Empty sub-lists always go to the end of the list. 1211 */ 1212 static int 1213 sub_list_cmp(struct sort_list *l1, struct sort_list *l2) 1214 { 1215 1216 if (l1 == l2) 1217 return (0); 1218 else { 1219 if (l1->count == 0) { 1220 return ((l2->count == 0) ? 0 : +1); 1221 } else if (l2->count == 0) { 1222 return (-1); 1223 } else { 1224 int ret; 1225 1226 ret = list_coll(&(l1->list[0]), &(l2->list[0])); 1227 if (!ret) 1228 return ((l1->sub_list_pos < l2->sub_list_pos) ? 1229 -1 : +1); 1230 return (ret); 1231 } 1232 } 1233 } 1234 1235 /* 1236 * Swap two array elements 1237 */ 1238 static void 1239 sub_list_swap(struct sort_list **sl, size_t i1, size_t i2) 1240 { 1241 struct sort_list *tmp; 1242 1243 tmp = sl[i1]; 1244 sl[i1] = sl[i2]; 1245 sl[i2] = tmp; 1246 } 1247 1248 /* heap algorithm ==>> */ 1249 1250 /* 1251 * See heap sort algorithm 1252 * "Raises" last element to its right place 1253 */ 1254 static void 1255 sub_list_swim(struct sort_list **sl, size_t indx) 1256 { 1257 1258 if (indx > 0) { 1259 size_t parent_index; 1260 1261 parent_index = (indx - 1) >> 1; 1262 1263 if (sub_list_cmp(sl[indx], sl[parent_index]) < 0) { 1264 /* swap child and parent and continue */ 1265 sub_list_swap(sl, indx, parent_index); 1266 sub_list_swim(sl, parent_index); 1267 } 1268 } 1269 } 1270 1271 /* 1272 * Sink the top element to its correct position 1273 */ 1274 static void 1275 sub_list_sink(struct sort_list **sl, size_t indx, size_t size) 1276 { 1277 size_t left_child_index; 1278 size_t right_child_index; 1279 1280 left_child_index = indx + indx + 1; 1281 right_child_index = left_child_index + 1; 1282 1283 if (left_child_index < size) { 1284 size_t min_child_index; 1285 1286 min_child_index = left_child_index; 1287 1288 if ((right_child_index < size) && 1289 (sub_list_cmp(sl[left_child_index], 1290 sl[right_child_index]) > 0)) 1291 min_child_index = right_child_index; 1292 if (sub_list_cmp(sl[indx], sl[min_child_index]) > 0) { 1293 sub_list_swap(sl, indx, min_child_index); 1294 sub_list_sink(sl, min_child_index, size); 1295 } 1296 } 1297 } 1298 1299 /* <<== heap algorithm */ 1300 1301 /* 1302 * Adds element to the "right" end 1303 */ 1304 static void 1305 sub_list_push(struct sort_list *s, struct sort_list **sl, size_t size) 1306 { 1307 1308 sl[size++] = s; 1309 sub_list_swim(sl, size - 1); 1310 } 1311 1312 struct last_printed_item 1313 { 1314 struct sort_list_item *item; 1315 }; 1316 1317 /* 1318 * Prints the current line of the file 1319 */ 1320 static void 1321 sub_list_header_print(struct sort_list *sl, FILE *f_out, 1322 struct last_printed_item *lp) 1323 { 1324 1325 if (sl && sl->count && f_out && sl->list[0]->str) { 1326 if (sort_opts_vals.uflag) { 1327 if ((lp->item == NULL) || (list_coll(&(lp->item), 1328 &(sl->list[0])))) { 1329 bwsfwrite(sl->list[0]->str, f_out, 1330 sort_opts_vals.zflag); 1331 lp->item = sl->list[0]; 1332 } 1333 } else 1334 bwsfwrite(sl->list[0]->str, f_out, 1335 sort_opts_vals.zflag); 1336 } 1337 } 1338 1339 /* 1340 * Read next line 1341 */ 1342 static void 1343 sub_list_next(struct sort_list *sl) 1344 { 1345 1346 if (sl && sl->count) { 1347 sl->list += 1; 1348 sl->count -= 1; 1349 } 1350 } 1351 1352 /* 1353 * Merge sub-lists to a file 1354 */ 1355 static void 1356 merge_sub_lists(struct sort_list **sl, size_t n, FILE* f_out) 1357 { 1358 struct last_printed_item lp; 1359 size_t i; 1360 1361 memset(&lp,0,sizeof(lp)); 1362 1363 /* construct the initial list: */ 1364 for (i = 0; i < n; i++) 1365 sub_list_push(sl[i], sl, i); 1366 1367 while (sl[0]->count) { /* unfinished lists are always in front */ 1368 /* output the smallest line: */ 1369 sub_list_header_print(sl[0], f_out, &lp); 1370 /* move to a new line, if possible: */ 1371 sub_list_next(sl[0]); 1372 /* re-arrange the list: */ 1373 sub_list_sink(sl, 0, n); 1374 } 1375 } 1376 1377 /* 1378 * Merge sub-lists to a file 1379 */ 1380 static void 1381 merge_list_parts(struct sort_list **parts, size_t n, const char *fn) 1382 { 1383 FILE* f_out; 1384 1385 f_out = openfile(fn,"w"); 1386 1387 merge_sub_lists(parts, n, f_out); 1388 1389 closefile(f_out, fn); 1390 } 1391 1392 #endif /* defined(SORT_THREADS) */ 1393 /* 1394 * Multi-threaded sort algorithm "driver" 1395 */ 1396 static void 1397 mt_sort(struct sort_list *list, 1398 int(*sort_func)(void *, size_t, size_t, int(*)(const void *, const void *)), 1399 const char* fn) 1400 { 1401 #if defined(SORT_THREADS) 1402 if (nthreads < 2 || list->count < MT_SORT_THRESHOLD) { 1403 size_t nthreads_save = nthreads; 1404 nthreads = 1; 1405 #endif 1406 /* if single thread or small data, do simple sort */ 1407 sort_func(list->list, list->count, 1408 sizeof(struct sort_list_item *), 1409 (int(*)(const void *, const void *)) list_coll); 1410 sort_list_dump(list, fn); 1411 #if defined(SORT_THREADS) 1412 nthreads = nthreads_save; 1413 } else { 1414 /* multi-threaded sort */ 1415 struct sort_list **parts; 1416 size_t avgsize, cstart, i; 1417 1418 /* array of sub-lists */ 1419 parts = sort_malloc(sizeof(struct sort_list*) * nthreads); 1420 cstart = 0; 1421 avgsize = list->count / nthreads; 1422 1423 /* set global system sort function */ 1424 g_sort_func = sort_func; 1425 1426 /* set sublists */ 1427 for (i = 0; i < nthreads; ++i) { 1428 size_t sz = 0; 1429 1430 parts[i] = sort_malloc(sizeof(struct sort_list)); 1431 parts[i]->list = list->list + cstart; 1432 parts[i]->memsize = 0; 1433 parts[i]->sub_list_pos = i; 1434 1435 sz = (i == nthreads - 1) ? list->count - cstart : 1436 avgsize; 1437 1438 parts[i]->count = sz; 1439 1440 parts[i]->size = parts[i]->count; 1441 1442 cstart += sz; 1443 } 1444 1445 /* init threads counting semaphore */ 1446 sem_init(&mtsem, 0, 0); 1447 1448 /* start threads */ 1449 for (i = 0; i < nthreads; ++i) { 1450 pthread_t pth; 1451 pthread_attr_t attr; 1452 1453 pthread_attr_init(&attr); 1454 pthread_attr_setdetachstate(&attr, PTHREAD_DETACHED); 1455 1456 for (;;) { 1457 int res = pthread_create(&pth, &attr, 1458 mt_sort_thread, parts[i]); 1459 1460 if (res >= 0) 1461 break; 1462 if (errno == EAGAIN) { 1463 pthread_yield(); 1464 continue; 1465 } 1466 err(2, NULL); 1467 } 1468 1469 pthread_attr_destroy(&attr); 1470 } 1471 1472 /* wait for threads completion */ 1473 for (i = 0; i < nthreads; ++i) { 1474 sem_wait(&mtsem); 1475 } 1476 /* destroy the semaphore - we do not need it anymore */ 1477 sem_destroy(&mtsem); 1478 1479 /* merge sorted sub-lists to the file */ 1480 merge_list_parts(parts, nthreads, fn); 1481 1482 /* free sub-lists data */ 1483 for (i = 0; i < nthreads; ++i) { 1484 sort_free(parts[i]); 1485 } 1486 sort_free(parts); 1487 } 1488 #endif /* defined(SORT_THREADS) */ 1489 } 1490