1 /*- 2 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD 3 * 4 * Copyright (C) 2009 Gabor Kovesdan <gabor@FreeBSD.org> 5 * Copyright (C) 2012 Oleg Moskalenko <mom040267@gmail.com> 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27 * SUCH DAMAGE. 
*/

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/queue.h>

#include <err.h>
#include <fcntl.h>
#if defined(SORT_THREADS)
#include <pthread.h>
#endif
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <wchar.h>
#include <wctype.h>

#include "coll.h"
#include "file.h"
#include "radixsort.h"

unsigned long long free_memory = 1000000;
/*
 * procfile() spills the in-memory list to a temporary file once the
 * list's memsize reaches this value.
 */
unsigned long long available_free_memory = 1000000;

/* When set, file_reader_init() tries to mmap() named input files. */
bool use_mmap;

/* Directory in which new_tmp_file_name() places temporary files. */
const char *tmpdir = "/var/tmp";
/*
 * When non-NULL, temporary files are written and read through a
 * popen() pipeline that runs this program (see openfile()).
 */
const char *compress_program;

/* shrink_file_list() merges at most (max_open_files - 1) files at once. */
size_t max_open_files = 16;

/*
 * How much space we read from file at once
 */
#define READ_CHUNK (4096)

/*
 * File reader structure
 *
 * Exactly one of three input paths is used per reader: an mmap()ed
 * region (mmapaddr != NULL), a private buffer filled with fread()
 * (named files without mmap), or bwsfgetln() on stdin.
 */
struct file_reader
{
	struct reader_buffer	 rb;		/* scratch state handed to bwsfgetln() (stdin path) */
	FILE			*file;		/* stream from openfile(); NULL on the mmap path */
	char			*fname;		/* private copy of the source name ("-" for stdin) */
	unsigned char		*buffer;	/* fread() buffer (buffered path only) */
	unsigned char		*mmapaddr;	/* start of the mapping, NULL if not mapped */
	unsigned char		*mmapptr;	/* next unread byte inside the mapping */
	size_t			 bsz;		/* number of valid bytes in buffer */
	size_t			 cbsz;		/* allocated size of buffer */
	size_t			 mmapsize;	/* length of the mapping */
	size_t			 strbeg;	/* offset in buffer of the next unread record */
	int			 fd;		/* descriptor behind the mapping; 0 means "none" */
	char			 elsymb;	/* record terminator: '\n', or 0 with zflag */
};

/*
 * Structure to be used in file merge process.
 */
struct file_header
{
	struct file_reader		*fr;	/* NULL once the file is exhausted */
	struct sort_list_item		*si;	/* current top line */
	size_t				 file_pos; /* input order; tie-breaker in file_header_cmp() */
};

/*
 * List elements of "cleanable" files list.
 */
struct CLEANABLE_FILE
{
	char				*fn;	/* private copy of the temporary file name */
	LIST_ENTRY(CLEANABLE_FILE)	 files;
};

/*
 * List header of "cleanable" files list.
 */
static LIST_HEAD(CLEANABLE_FILES,CLEANABLE_FILE) tmp_files;

/*
 * Semaphore to protect the tmp file list.
 * We use semaphore here because it is signal-safe, according to POSIX.
 * And semaphore does not require pthread library.
117 */ 118 static sem_t tmp_files_sem; 119 120 static void mt_sort(struct sort_list *list, 121 int (*sort_func)(void *, size_t, size_t, 122 int (*)(const void *, const void *)), const char* fn); 123 124 /* 125 * Init tmp files list 126 */ 127 void 128 init_tmp_files(void) 129 { 130 131 LIST_INIT(&tmp_files); 132 sem_init(&tmp_files_sem, 0, 1); 133 } 134 135 /* 136 * Save name of a tmp file for signal cleanup 137 */ 138 void 139 tmp_file_atexit(const char *tmp_file) 140 { 141 142 if (tmp_file) { 143 sem_wait(&tmp_files_sem); 144 struct CLEANABLE_FILE *item = 145 sort_malloc(sizeof(struct CLEANABLE_FILE)); 146 item->fn = sort_strdup(tmp_file); 147 LIST_INSERT_HEAD(&tmp_files, item, files); 148 sem_post(&tmp_files_sem); 149 } 150 } 151 152 /* 153 * Clear tmp files 154 */ 155 void 156 clear_tmp_files(void) 157 { 158 struct CLEANABLE_FILE *item; 159 160 sem_wait(&tmp_files_sem); 161 LIST_FOREACH(item,&tmp_files,files) { 162 if ((item) && (item->fn)) 163 unlink(item->fn); 164 } 165 sem_post(&tmp_files_sem); 166 } 167 168 /* 169 * Check whether a file is a temporary file 170 */ 171 static bool 172 file_is_tmp(const char* fn) 173 { 174 struct CLEANABLE_FILE *item; 175 bool ret = false; 176 177 if (fn) { 178 sem_wait(&tmp_files_sem); 179 LIST_FOREACH(item,&tmp_files,files) { 180 if ((item) && (item->fn)) 181 if (strcmp(item->fn, fn) == 0) { 182 ret = true; 183 break; 184 } 185 } 186 sem_post(&tmp_files_sem); 187 } 188 189 return (ret); 190 } 191 192 /* 193 * Generate new temporary file name 194 */ 195 char * 196 new_tmp_file_name(void) 197 { 198 static size_t tfcounter = 0; 199 static const char *fn = ".bsdsort."; 200 char *ret; 201 size_t sz; 202 203 sz = strlen(tmpdir) + 1 + strlen(fn) + 32 + 1; 204 ret = sort_malloc(sz); 205 206 sprintf(ret, "%s/%s%d.%lu", tmpdir, fn, (int) getpid(), (unsigned long)(tfcounter++)); 207 tmp_file_atexit(ret); 208 return (ret); 209 } 210 211 /* 212 * Initialize file list 213 */ 214 void 215 file_list_init(struct file_list *fl, bool tmp) 216 
{ 217 218 if (fl) { 219 fl->count = 0; 220 fl->sz = 0; 221 fl->fns = NULL; 222 fl->tmp = tmp; 223 } 224 } 225 226 /* 227 * Add a file name to the list 228 */ 229 void 230 file_list_add(struct file_list *fl, const char *fn, bool allocate) 231 { 232 233 if (fl && fn) { 234 if (fl->count >= fl->sz || (fl->fns == NULL)) { 235 fl->sz = (fl->sz) * 2 + 1; 236 fl->fns = sort_realloc(fl->fns, fl->sz * 237 sizeof(char *)); 238 } 239 fl->fns[fl->count] = allocate ? sort_strdup(fn) : fn; 240 fl->count += 1; 241 } 242 } 243 244 /* 245 * Populate file list from array of file names 246 */ 247 void 248 file_list_populate(struct file_list *fl, int argc, char **argv, bool allocate) 249 { 250 251 if (fl && argv) { 252 int i; 253 254 for (i = 0; i < argc; i++) 255 file_list_add(fl, argv[i], allocate); 256 } 257 } 258 259 /* 260 * Clean file list data and delete the files, 261 * if this is a list of temporary files 262 */ 263 void 264 file_list_clean(struct file_list *fl) 265 { 266 267 if (fl) { 268 if (fl->fns) { 269 size_t i; 270 271 for (i = 0; i < fl->count; i++) { 272 if (fl->fns[i]) { 273 if (fl->tmp) 274 unlink(fl->fns[i]); 275 sort_free(fl->fns[i]); 276 fl->fns[i] = 0; 277 } 278 } 279 sort_free(fl->fns); 280 fl->fns = NULL; 281 } 282 fl->sz = 0; 283 fl->count = 0; 284 fl->tmp = false; 285 } 286 } 287 288 /* 289 * Init sort list 290 */ 291 void 292 sort_list_init(struct sort_list *l) 293 { 294 295 if (l) { 296 l->count = 0; 297 l->size = 0; 298 l->memsize = sizeof(struct sort_list); 299 l->list = NULL; 300 } 301 } 302 303 /* 304 * Add string to sort list 305 */ 306 void 307 sort_list_add(struct sort_list *l, struct bwstring *str) 308 { 309 310 if (l && str) { 311 size_t indx = l->count; 312 313 if ((l->list == NULL) || (indx >= l->size)) { 314 size_t newsize = (l->size + 1) + 1024; 315 316 l->list = sort_realloc(l->list, 317 sizeof(struct sort_list_item*) * newsize); 318 l->memsize += (newsize - l->size) * 319 sizeof(struct sort_list_item*); 320 l->size = newsize; 321 } 322 
l->list[indx] = sort_list_item_alloc(); 323 sort_list_item_set(l->list[indx], str); 324 l->memsize += sort_list_item_size(l->list[indx]); 325 l->count += 1; 326 } 327 } 328 329 /* 330 * Clean sort list data 331 */ 332 void 333 sort_list_clean(struct sort_list *l) 334 { 335 336 if (l) { 337 if (l->list) { 338 size_t i; 339 340 for (i = 0; i < l->count; i++) { 341 struct sort_list_item *item; 342 343 item = l->list[i]; 344 345 if (item) { 346 sort_list_item_clean(item); 347 sort_free(item); 348 l->list[i] = NULL; 349 } 350 } 351 sort_free(l->list); 352 l->list = NULL; 353 } 354 l->count = 0; 355 l->size = 0; 356 l->memsize = sizeof(struct sort_list); 357 } 358 } 359 360 /* 361 * Write sort list to file 362 */ 363 void 364 sort_list_dump(struct sort_list *l, const char *fn) 365 { 366 367 if (l && fn) { 368 FILE *f; 369 370 f = openfile(fn, "w"); 371 if (f == NULL) 372 err(2, NULL); 373 374 if (l->list) { 375 size_t i; 376 if (!(sort_opts_vals.uflag)) { 377 for (i = 0; i < l->count; ++i) 378 bwsfwrite(l->list[i]->str, f, 379 sort_opts_vals.zflag); 380 } else { 381 struct sort_list_item *last_printed_item = NULL; 382 struct sort_list_item *item; 383 for (i = 0; i < l->count; ++i) { 384 item = l->list[i]; 385 if ((last_printed_item == NULL) || 386 list_coll(&last_printed_item, &item)) { 387 bwsfwrite(item->str, f, sort_opts_vals.zflag); 388 last_printed_item = item; 389 } 390 } 391 } 392 } 393 394 closefile(f, fn); 395 } 396 } 397 398 /* 399 * Checks if the given file is sorted. Stops at the first disorder, 400 * prints the disordered line and returns 1. 
401 */ 402 int 403 check(const char *fn) 404 { 405 struct bwstring *s1, *s2, *s1disorder, *s2disorder; 406 struct file_reader *fr; 407 struct keys_array *ka1, *ka2; 408 int res; 409 size_t pos, posdisorder; 410 411 s1 = s2 = s1disorder = s2disorder = NULL; 412 ka1 = ka2 = NULL; 413 414 fr = file_reader_init(fn); 415 416 res = 0; 417 pos = 1; 418 posdisorder = 1; 419 420 if (fr == NULL) { 421 err(2, NULL); 422 goto end; 423 } 424 425 s1 = file_reader_readline(fr); 426 if (s1 == NULL) 427 goto end; 428 429 ka1 = keys_array_alloc(); 430 preproc(s1, ka1); 431 432 s2 = file_reader_readline(fr); 433 if (s2 == NULL) 434 goto end; 435 436 ka2 = keys_array_alloc(); 437 preproc(s2, ka2); 438 439 for (;;) { 440 441 if (debug_sort) { 442 bwsprintf(stdout, s2, "s1=<", ">"); 443 bwsprintf(stdout, s1, "s2=<", ">"); 444 } 445 int cmp = key_coll(ka2, ka1, 0); 446 if (debug_sort) 447 printf("; cmp1=%d", cmp); 448 449 if (!cmp && sort_opts_vals.complex_sort && 450 !(sort_opts_vals.uflag) && !(sort_opts_vals.sflag)) { 451 cmp = top_level_str_coll(s2, s1); 452 if (debug_sort) 453 printf("; cmp2=%d", cmp); 454 } 455 if (debug_sort) 456 printf("\n"); 457 458 if ((sort_opts_vals.uflag && (cmp <= 0)) || (cmp < 0)) { 459 if (!(sort_opts_vals.csilentflag)) { 460 s2disorder = bwsdup(s2); 461 posdisorder = pos; 462 if (debug_sort) 463 s1disorder = bwsdup(s1); 464 } 465 res = 1; 466 goto end; 467 } 468 469 pos++; 470 471 clean_keys_array(s1, ka1); 472 sort_free(ka1); 473 ka1 = ka2; 474 ka2 = NULL; 475 476 bwsfree(s1); 477 s1 = s2; 478 479 s2 = file_reader_readline(fr); 480 if (s2 == NULL) 481 goto end; 482 483 ka2 = keys_array_alloc(); 484 preproc(s2, ka2); 485 } 486 487 end: 488 if (ka1) { 489 clean_keys_array(s1, ka1); 490 sort_free(ka1); 491 } 492 493 if (s1) 494 bwsfree(s1); 495 496 if (ka2) { 497 clean_keys_array(s2, ka2); 498 sort_free(ka2); 499 } 500 501 if (s2) 502 bwsfree(s2); 503 504 if ((fn == NULL) || (*fn == 0) || (strcmp(fn, "-") == 0)) { 505 for (;;) { 506 s2 = 
file_reader_readline(fr); 507 if (s2 == NULL) 508 break; 509 bwsfree(s2); 510 } 511 } 512 513 file_reader_free(fr); 514 515 if (s2disorder) { 516 bws_disorder_warnx(s2disorder, fn, posdisorder); 517 if (s1disorder) { 518 bws_disorder_warnx(s1disorder, fn, posdisorder); 519 if (s1disorder != s2disorder) 520 bwsfree(s1disorder); 521 } 522 bwsfree(s2disorder); 523 s1disorder = NULL; 524 s2disorder = NULL; 525 } 526 527 if (res) 528 exit(res); 529 530 return (0); 531 } 532 533 /* 534 * Opens a file. If the given filename is "-", stdout will be 535 * opened. 536 */ 537 FILE * 538 openfile(const char *fn, const char *mode) 539 { 540 FILE *file; 541 542 if (strcmp(fn, "-") == 0) { 543 return ((mode && mode[0] == 'r') ? stdin : stdout); 544 } else { 545 mode_t orig_file_mask = 0; 546 int is_tmp = file_is_tmp(fn); 547 548 if (is_tmp && (mode[0] == 'w')) 549 orig_file_mask = umask(S_IWGRP | S_IWOTH | 550 S_IRGRP | S_IROTH); 551 552 if (is_tmp && (compress_program != NULL)) { 553 char *cmd; 554 size_t cmdsz; 555 556 cmdsz = strlen(fn) + 128; 557 cmd = sort_malloc(cmdsz); 558 559 fflush(stdout); 560 561 if (mode[0] == 'r') 562 snprintf(cmd, cmdsz - 1, "cat %s | %s -d", 563 fn, compress_program); 564 else if (mode[0] == 'w') 565 snprintf(cmd, cmdsz - 1, "%s > %s", 566 compress_program, fn); 567 else 568 err(2, "%s", getstr(7)); 569 570 if ((file = popen(cmd, mode)) == NULL) 571 err(2, NULL); 572 573 sort_free(cmd); 574 575 } else 576 if ((file = fopen(fn, mode)) == NULL) 577 err(2, NULL); 578 579 if (is_tmp && (mode[0] == 'w')) 580 umask(orig_file_mask); 581 } 582 583 return (file); 584 } 585 586 /* 587 * Close file 588 */ 589 void 590 closefile(FILE *f, const char *fn) 591 { 592 if (f == NULL) { 593 ; 594 } else if (f == stdin) { 595 ; 596 } else if (f == stdout) { 597 fflush(f); 598 } else { 599 if (file_is_tmp(fn) && compress_program != NULL) { 600 if(pclose(f)<0) 601 err(2,NULL); 602 } else 603 fclose(f); 604 } 605 } 606 607 /* 608 * Reads a file into the internal buffer. 
609 */ 610 struct file_reader * 611 file_reader_init(const char *fsrc) 612 { 613 struct file_reader *ret; 614 615 if (fsrc == NULL) 616 fsrc = "-"; 617 618 ret = sort_malloc(sizeof(struct file_reader)); 619 memset(ret, 0, sizeof(struct file_reader)); 620 621 ret->elsymb = '\n'; 622 if (sort_opts_vals.zflag) 623 ret->elsymb = 0; 624 625 ret->fname = sort_strdup(fsrc); 626 627 if (strcmp(fsrc, "-") && (compress_program == NULL) && use_mmap) { 628 629 do { 630 struct stat stat_buf; 631 void *addr; 632 size_t sz = 0; 633 int fd, flags; 634 635 flags = MAP_NOCORE | MAP_NOSYNC; 636 637 fd = open(fsrc, O_RDONLY); 638 if (fd < 0) 639 err(2, NULL); 640 641 if (fstat(fd, &stat_buf) < 0) { 642 close(fd); 643 break; 644 } 645 646 sz = stat_buf.st_size; 647 648 #if defined(MAP_PREFAULT_READ) 649 flags |= MAP_PREFAULT_READ; 650 #endif 651 652 addr = mmap(NULL, sz, PROT_READ, flags, fd, 0); 653 if (addr == MAP_FAILED) { 654 close(fd); 655 break; 656 } 657 658 ret->fd = fd; 659 ret->mmapaddr = addr; 660 ret->mmapsize = sz; 661 ret->mmapptr = ret->mmapaddr; 662 663 } while (0); 664 } 665 666 if (ret->mmapaddr == NULL) { 667 ret->file = openfile(fsrc, "r"); 668 if (ret->file == NULL) 669 err(2, NULL); 670 671 if (strcmp(fsrc, "-")) { 672 ret->cbsz = READ_CHUNK; 673 ret->buffer = sort_malloc(ret->cbsz); 674 ret->bsz = 0; 675 ret->strbeg = 0; 676 677 ret->bsz = fread(ret->buffer, 1, ret->cbsz, ret->file); 678 if (ret->bsz == 0) { 679 if (ferror(ret->file)) 680 err(2, NULL); 681 } 682 } 683 } 684 685 return (ret); 686 } 687 688 struct bwstring * 689 file_reader_readline(struct file_reader *fr) 690 { 691 struct bwstring *ret = NULL; 692 693 if (fr->mmapaddr) { 694 unsigned char *mmapend; 695 696 mmapend = fr->mmapaddr + fr->mmapsize; 697 if (fr->mmapptr >= mmapend) 698 return (NULL); 699 else { 700 unsigned char *strend; 701 size_t sz; 702 703 sz = mmapend - fr->mmapptr; 704 strend = memchr(fr->mmapptr, fr->elsymb, sz); 705 706 if (strend == NULL) { 707 ret = bwscsbdup(fr->mmapptr, sz); 
708 fr->mmapptr = mmapend; 709 } else { 710 ret = bwscsbdup(fr->mmapptr, strend - 711 fr->mmapptr); 712 fr->mmapptr = strend + 1; 713 } 714 } 715 716 } else if (fr->file != stdin) { 717 unsigned char *strend; 718 size_t bsz1, remsz, search_start; 719 720 search_start = 0; 721 remsz = 0; 722 strend = NULL; 723 724 if (fr->bsz > fr->strbeg) 725 remsz = fr->bsz - fr->strbeg; 726 727 /* line read cycle */ 728 for (;;) { 729 if (remsz > search_start) 730 strend = memchr(fr->buffer + fr->strbeg + 731 search_start, fr->elsymb, remsz - 732 search_start); 733 else 734 strend = NULL; 735 736 if (strend) 737 break; 738 if (feof(fr->file)) 739 break; 740 741 if (fr->bsz != fr->cbsz) 742 /* NOTREACHED */ 743 err(2, "File read software error 1"); 744 745 if (remsz > (READ_CHUNK >> 1)) { 746 search_start = fr->cbsz - fr->strbeg; 747 fr->cbsz += READ_CHUNK; 748 fr->buffer = sort_realloc(fr->buffer, 749 fr->cbsz); 750 bsz1 = fread(fr->buffer + fr->bsz, 1, 751 READ_CHUNK, fr->file); 752 if (bsz1 == 0) { 753 if (ferror(fr->file)) 754 err(2, NULL); 755 break; 756 } 757 fr->bsz += bsz1; 758 remsz += bsz1; 759 } else { 760 if (remsz > 0 && fr->strbeg>0) 761 bcopy(fr->buffer + fr->strbeg, 762 fr->buffer, remsz); 763 764 fr->strbeg = 0; 765 search_start = remsz; 766 bsz1 = fread(fr->buffer + remsz, 1, 767 fr->cbsz - remsz, fr->file); 768 if (bsz1 == 0) { 769 if (ferror(fr->file)) 770 err(2, NULL); 771 break; 772 } 773 fr->bsz = remsz + bsz1; 774 remsz = fr->bsz; 775 } 776 } 777 778 if (strend == NULL) 779 strend = fr->buffer + fr->bsz; 780 781 if ((fr->buffer + fr->strbeg <= strend) && 782 (fr->strbeg < fr->bsz) && (remsz>0)) 783 ret = bwscsbdup(fr->buffer + fr->strbeg, strend - 784 fr->buffer - fr->strbeg); 785 786 fr->strbeg = (strend - fr->buffer) + 1; 787 788 } else { 789 size_t len = 0; 790 791 ret = bwsfgetln(fr->file, &len, sort_opts_vals.zflag, 792 &(fr->rb)); 793 } 794 795 return (ret); 796 } 797 798 static void 799 file_reader_clean(struct file_reader *fr) 800 { 801 802 if (fr) 
{ 803 if (fr->mmapaddr) 804 munmap(fr->mmapaddr, fr->mmapsize); 805 806 if (fr->fd) 807 close(fr->fd); 808 809 if (fr->buffer) 810 sort_free(fr->buffer); 811 812 if (fr->file) 813 if (fr->file != stdin) 814 closefile(fr->file, fr->fname); 815 816 if(fr->fname) 817 sort_free(fr->fname); 818 819 memset(fr, 0, sizeof(struct file_reader)); 820 } 821 } 822 823 void 824 file_reader_free(struct file_reader *fr) 825 { 826 827 if (fr) { 828 file_reader_clean(fr); 829 sort_free(fr); 830 } 831 } 832 833 int 834 procfile(const char *fsrc, struct sort_list *list, struct file_list *fl) 835 { 836 struct file_reader *fr; 837 838 fr = file_reader_init(fsrc); 839 if (fr == NULL) 840 err(2, NULL); 841 842 /* file browse cycle */ 843 for (;;) { 844 struct bwstring *bws; 845 846 bws = file_reader_readline(fr); 847 848 if (bws == NULL) 849 break; 850 851 sort_list_add(list, bws); 852 853 if (list->memsize >= available_free_memory) { 854 char *fn; 855 856 fn = new_tmp_file_name(); 857 sort_list_to_file(list, fn); 858 file_list_add(fl, fn, false); 859 sort_list_clean(list); 860 } 861 } 862 863 file_reader_free(fr); 864 865 return (0); 866 } 867 868 /* 869 * Compare file headers. Files with EOF always go to the end of the list. 870 */ 871 static int 872 file_header_cmp(struct file_header *f1, struct file_header *f2) 873 { 874 875 if (f1 == f2) 876 return (0); 877 else { 878 if (f1->fr == NULL) { 879 return ((f2->fr == NULL) ? 0 : +1); 880 } else if (f2->fr == NULL) 881 return (-1); 882 else { 883 int ret; 884 885 ret = list_coll(&(f1->si), &(f2->si)); 886 if (!ret) 887 return ((f1->file_pos < f2->file_pos) ? 
-1 : +1); 888 return (ret); 889 } 890 } 891 } 892 893 /* 894 * Allocate and init file header structure 895 */ 896 static void 897 file_header_init(struct file_header **fh, const char *fn, size_t file_pos) 898 { 899 900 if (fh && fn) { 901 struct bwstring *line; 902 903 *fh = sort_malloc(sizeof(struct file_header)); 904 (*fh)->file_pos = file_pos; 905 (*fh)->fr = file_reader_init(fn); 906 if ((*fh)->fr == NULL) { 907 perror(fn); 908 err(2, "%s", getstr(8)); 909 } 910 line = file_reader_readline((*fh)->fr); 911 if (line == NULL) { 912 file_reader_free((*fh)->fr); 913 (*fh)->fr = NULL; 914 (*fh)->si = NULL; 915 } else { 916 (*fh)->si = sort_list_item_alloc(); 917 sort_list_item_set((*fh)->si, line); 918 } 919 } 920 } 921 922 /* 923 * Close file 924 */ 925 static void 926 file_header_close(struct file_header **fh) 927 { 928 929 if (fh && *fh) { 930 if ((*fh)->fr) { 931 file_reader_free((*fh)->fr); 932 (*fh)->fr = NULL; 933 } 934 if ((*fh)->si) { 935 sort_list_item_clean((*fh)->si); 936 sort_free((*fh)->si); 937 (*fh)->si = NULL; 938 } 939 sort_free(*fh); 940 *fh = NULL; 941 } 942 } 943 944 /* 945 * Swap two array elements 946 */ 947 static void 948 file_header_swap(struct file_header **fh, size_t i1, size_t i2) 949 { 950 struct file_header *tmp; 951 952 tmp = fh[i1]; 953 fh[i1] = fh[i2]; 954 fh[i2] = tmp; 955 } 956 957 /* heap algorithm ==>> */ 958 959 /* 960 * See heap sort algorithm 961 * "Raises" last element to its right place 962 */ 963 static void 964 file_header_heap_swim(struct file_header **fh, size_t indx) 965 { 966 967 if (indx > 0) { 968 size_t parent_index; 969 970 parent_index = (indx - 1) >> 1; 971 972 if (file_header_cmp(fh[indx], fh[parent_index]) < 0) { 973 /* swap child and parent and continue */ 974 file_header_swap(fh, indx, parent_index); 975 file_header_heap_swim(fh, parent_index); 976 } 977 } 978 } 979 980 /* 981 * Sink the top element to its correct position 982 */ 983 static void 984 file_header_heap_sink(struct file_header **fh, size_t indx, 
size_t size) 985 { 986 size_t left_child_index; 987 size_t right_child_index; 988 989 left_child_index = indx + indx + 1; 990 right_child_index = left_child_index + 1; 991 992 if (left_child_index < size) { 993 size_t min_child_index; 994 995 min_child_index = left_child_index; 996 997 if ((right_child_index < size) && 998 (file_header_cmp(fh[left_child_index], 999 fh[right_child_index]) > 0)) 1000 min_child_index = right_child_index; 1001 if (file_header_cmp(fh[indx], fh[min_child_index]) > 0) { 1002 file_header_swap(fh, indx, min_child_index); 1003 file_header_heap_sink(fh, min_child_index, size); 1004 } 1005 } 1006 } 1007 1008 /* <<== heap algorithm */ 1009 1010 /* 1011 * Adds element to the "left" end 1012 */ 1013 static void 1014 file_header_list_rearrange_from_header(struct file_header **fh, size_t size) 1015 { 1016 1017 file_header_heap_sink(fh, 0, size); 1018 } 1019 1020 /* 1021 * Adds element to the "right" end 1022 */ 1023 static void 1024 file_header_list_push(struct file_header *f, struct file_header **fh, size_t size) 1025 { 1026 1027 fh[size++] = f; 1028 file_header_heap_swim(fh, size - 1); 1029 } 1030 1031 struct last_printed 1032 { 1033 struct bwstring *str; 1034 }; 1035 1036 /* 1037 * Prints the current line of the file 1038 */ 1039 static void 1040 file_header_print(struct file_header *fh, FILE *f_out, struct last_printed *lp) 1041 { 1042 1043 if (fh && fh->fr && f_out && fh->si && fh->si->str) { 1044 if (sort_opts_vals.uflag) { 1045 if ((lp->str == NULL) || (str_list_coll(lp->str, &(fh->si)))) { 1046 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 1047 if (lp->str) 1048 bwsfree(lp->str); 1049 lp->str = bwsdup(fh->si->str); 1050 } 1051 } else 1052 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 1053 } 1054 } 1055 1056 /* 1057 * Read next line 1058 */ 1059 static void 1060 file_header_read_next(struct file_header *fh) 1061 { 1062 1063 if (fh && fh->fr) { 1064 struct bwstring *tmp; 1065 1066 tmp = file_reader_readline(fh->fr); 1067 if (tmp 
== NULL) { 1068 file_reader_free(fh->fr); 1069 fh->fr = NULL; 1070 if (fh->si) { 1071 sort_list_item_clean(fh->si); 1072 sort_free(fh->si); 1073 fh->si = NULL; 1074 } 1075 } else { 1076 if (fh->si == NULL) 1077 fh->si = sort_list_item_alloc(); 1078 sort_list_item_set(fh->si, tmp); 1079 } 1080 } 1081 } 1082 1083 /* 1084 * Merge array of "files headers" 1085 */ 1086 static void 1087 file_headers_merge(size_t fnum, struct file_header **fh, FILE *f_out) 1088 { 1089 struct last_printed lp; 1090 size_t i; 1091 1092 memset(&lp, 0, sizeof(lp)); 1093 1094 /* 1095 * construct the initial sort structure 1096 */ 1097 for (i = 0; i < fnum; i++) 1098 file_header_list_push(fh[i], fh, i); 1099 1100 while (fh[0]->fr) { /* unfinished files are always in front */ 1101 /* output the smallest line: */ 1102 file_header_print(fh[0], f_out, &lp); 1103 /* read a new line, if possible: */ 1104 file_header_read_next(fh[0]); 1105 /* re-arrange the list: */ 1106 file_header_list_rearrange_from_header(fh, fnum); 1107 } 1108 1109 if (lp.str) 1110 bwsfree(lp.str); 1111 } 1112 1113 /* 1114 * Merges the given files into the output file, which can be 1115 * stdout. 
1116 */ 1117 static void 1118 merge_files_array(size_t argc, const char **argv, const char *fn_out) 1119 { 1120 1121 if (argv && fn_out) { 1122 struct file_header **fh; 1123 FILE *f_out; 1124 size_t i; 1125 1126 f_out = openfile(fn_out, "w"); 1127 1128 if (f_out == NULL) 1129 err(2, NULL); 1130 1131 fh = sort_malloc((argc + 1) * sizeof(struct file_header *)); 1132 1133 for (i = 0; i < argc; i++) 1134 file_header_init(fh + i, argv[i], (size_t) i); 1135 1136 file_headers_merge(argc, fh, f_out); 1137 1138 for (i = 0; i < argc; i++) 1139 file_header_close(fh + i); 1140 1141 sort_free(fh); 1142 1143 closefile(f_out, fn_out); 1144 } 1145 } 1146 1147 /* 1148 * Shrinks the file list until its size smaller than max number of opened files 1149 */ 1150 static int 1151 shrink_file_list(struct file_list *fl) 1152 { 1153 1154 if ((fl == NULL) || (size_t) (fl->count) < max_open_files) 1155 return (0); 1156 else { 1157 struct file_list new_fl; 1158 size_t indx = 0; 1159 1160 file_list_init(&new_fl, true); 1161 while (indx < fl->count) { 1162 char *fnew; 1163 size_t num; 1164 1165 num = fl->count - indx; 1166 fnew = new_tmp_file_name(); 1167 1168 if ((size_t) num >= max_open_files) 1169 num = max_open_files - 1; 1170 merge_files_array(num, fl->fns + indx, fnew); 1171 if (fl->tmp) { 1172 size_t i; 1173 1174 for (i = 0; i < num; i++) 1175 unlink(fl->fns[indx + i]); 1176 } 1177 file_list_add(&new_fl, fnew, false); 1178 indx += num; 1179 } 1180 fl->tmp = false; /* already taken care of */ 1181 file_list_clean(fl); 1182 1183 fl->count = new_fl.count; 1184 fl->fns = new_fl.fns; 1185 fl->sz = new_fl.sz; 1186 fl->tmp = new_fl.tmp; 1187 1188 return (1); 1189 } 1190 } 1191 1192 /* 1193 * Merge list of files 1194 */ 1195 void 1196 merge_files(struct file_list *fl, const char *fn_out) 1197 { 1198 1199 if (fl && fn_out) { 1200 while (shrink_file_list(fl)); 1201 1202 merge_files_array(fl->count, fl->fns, fn_out); 1203 } 1204 } 1205 1206 static const char * 1207 get_sort_method_name(int sm) 1208 
{ 1209 1210 if (sm == SORT_MERGESORT) 1211 return "mergesort"; 1212 else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1213 return "radixsort"; 1214 else if (sort_opts_vals.sort_method == SORT_HEAPSORT) 1215 return "heapsort"; 1216 else 1217 return "quicksort"; 1218 } 1219 1220 /* 1221 * Wrapper for qsort 1222 */ 1223 static int sort_qsort(void *list, size_t count, size_t elem_size, 1224 int (*cmp_func)(const void *, const void *)) 1225 { 1226 1227 qsort(list, count, elem_size, cmp_func); 1228 return (0); 1229 } 1230 1231 /* 1232 * Sort list of lines and writes it to the file 1233 */ 1234 void 1235 sort_list_to_file(struct sort_list *list, const char *outfile) 1236 { 1237 struct sort_mods *sm = &(keys[0].sm); 1238 1239 if (!(sm->Mflag) && !(sm->Rflag) && !(sm->Vflag) && 1240 !(sm->gflag) && !(sm->hflag) && !(sm->nflag)) { 1241 if ((sort_opts_vals.sort_method == SORT_DEFAULT) && byte_sort) 1242 sort_opts_vals.sort_method = SORT_RADIXSORT; 1243 1244 } else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1245 err(2, "%s", getstr(9)); 1246 1247 /* 1248 * to handle stable sort and the unique cases in the 1249 * right order, we need stable basic algorithm 1250 */ 1251 if (sort_opts_vals.sflag) { 1252 switch (sort_opts_vals.sort_method){ 1253 case SORT_MERGESORT: 1254 break; 1255 case SORT_RADIXSORT: 1256 break; 1257 case SORT_DEFAULT: 1258 sort_opts_vals.sort_method = SORT_MERGESORT; 1259 break; 1260 default: 1261 errx(2, "%s", getstr(10)); 1262 } 1263 } 1264 1265 if (sort_opts_vals.sort_method == SORT_DEFAULT) 1266 sort_opts_vals.sort_method = DEFAULT_SORT_ALGORITHM; 1267 1268 if (debug_sort) 1269 printf("sort_method=%s\n", 1270 get_sort_method_name(sort_opts_vals.sort_method)); 1271 1272 switch (sort_opts_vals.sort_method){ 1273 case SORT_RADIXSORT: 1274 rxsort(list->list, list->count); 1275 sort_list_dump(list, outfile); 1276 break; 1277 case SORT_MERGESORT: 1278 mt_sort(list, mergesort, outfile); 1279 break; 1280 case SORT_HEAPSORT: 1281 mt_sort(list, heapsort, 
outfile); 1282 break; 1283 case SORT_QSORT: 1284 mt_sort(list, sort_qsort, outfile); 1285 break; 1286 default: 1287 mt_sort(list, DEFAULT_SORT_FUNC, outfile); 1288 break; 1289 } 1290 } 1291 1292 /******************* MT SORT ************************/ 1293 1294 #if defined(SORT_THREADS) 1295 /* semaphore to count threads */ 1296 static sem_t mtsem; 1297 1298 /* current system sort function */ 1299 static int (*g_sort_func)(void *, size_t, size_t, 1300 int(*)(const void *, const void *)); 1301 1302 /* 1303 * Sort cycle thread (in multi-threaded mode) 1304 */ 1305 static void* 1306 mt_sort_thread(void* arg) 1307 { 1308 struct sort_list *list = arg; 1309 1310 g_sort_func(list->list, list->count, sizeof(struct sort_list_item *), 1311 (int(*)(const void *, const void *)) list_coll); 1312 1313 sem_post(&mtsem); 1314 1315 return (arg); 1316 } 1317 1318 /* 1319 * Compare sub-lists. Empty sub-lists always go to the end of the list. 1320 */ 1321 static int 1322 sub_list_cmp(struct sort_list *l1, struct sort_list *l2) 1323 { 1324 1325 if (l1 == l2) 1326 return (0); 1327 else { 1328 if (l1->count == 0) { 1329 return ((l2->count == 0) ? 0 : +1); 1330 } else if (l2->count == 0) { 1331 return (-1); 1332 } else { 1333 int ret; 1334 1335 ret = list_coll(&(l1->list[0]), &(l2->list[0])); 1336 if (!ret) 1337 return ((l1->sub_list_pos < l2->sub_list_pos) ? 
1338 -1 : +1); 1339 return (ret); 1340 } 1341 } 1342 } 1343 1344 /* 1345 * Swap two array elements 1346 */ 1347 static void 1348 sub_list_swap(struct sort_list **sl, size_t i1, size_t i2) 1349 { 1350 struct sort_list *tmp; 1351 1352 tmp = sl[i1]; 1353 sl[i1] = sl[i2]; 1354 sl[i2] = tmp; 1355 } 1356 1357 /* heap algorithm ==>> */ 1358 1359 /* 1360 * See heap sort algorithm 1361 * "Raises" last element to its right place 1362 */ 1363 static void 1364 sub_list_swim(struct sort_list **sl, size_t indx) 1365 { 1366 1367 if (indx > 0) { 1368 size_t parent_index; 1369 1370 parent_index = (indx - 1) >> 1; 1371 1372 if (sub_list_cmp(sl[indx], sl[parent_index]) < 0) { 1373 /* swap child and parent and continue */ 1374 sub_list_swap(sl, indx, parent_index); 1375 sub_list_swim(sl, parent_index); 1376 } 1377 } 1378 } 1379 1380 /* 1381 * Sink the top element to its correct position 1382 */ 1383 static void 1384 sub_list_sink(struct sort_list **sl, size_t indx, size_t size) 1385 { 1386 size_t left_child_index; 1387 size_t right_child_index; 1388 1389 left_child_index = indx + indx + 1; 1390 right_child_index = left_child_index + 1; 1391 1392 if (left_child_index < size) { 1393 size_t min_child_index; 1394 1395 min_child_index = left_child_index; 1396 1397 if ((right_child_index < size) && 1398 (sub_list_cmp(sl[left_child_index], 1399 sl[right_child_index]) > 0)) 1400 min_child_index = right_child_index; 1401 if (sub_list_cmp(sl[indx], sl[min_child_index]) > 0) { 1402 sub_list_swap(sl, indx, min_child_index); 1403 sub_list_sink(sl, min_child_index, size); 1404 } 1405 } 1406 } 1407 1408 /* <<== heap algorithm */ 1409 1410 /* 1411 * Adds element to the "right" end 1412 */ 1413 static void 1414 sub_list_push(struct sort_list *s, struct sort_list **sl, size_t size) 1415 { 1416 1417 sl[size++] = s; 1418 sub_list_swim(sl, size - 1); 1419 } 1420 1421 struct last_printed_item 1422 { 1423 struct sort_list_item *item; 1424 }; 1425 1426 /* 1427 * Prints the current line of the file 1428 */ 
1429 static void 1430 sub_list_header_print(struct sort_list *sl, FILE *f_out, 1431 struct last_printed_item *lp) 1432 { 1433 1434 if (sl && sl->count && f_out && sl->list[0]->str) { 1435 if (sort_opts_vals.uflag) { 1436 if ((lp->item == NULL) || (list_coll(&(lp->item), 1437 &(sl->list[0])))) { 1438 bwsfwrite(sl->list[0]->str, f_out, 1439 sort_opts_vals.zflag); 1440 lp->item = sl->list[0]; 1441 } 1442 } else 1443 bwsfwrite(sl->list[0]->str, f_out, 1444 sort_opts_vals.zflag); 1445 } 1446 } 1447 1448 /* 1449 * Read next line 1450 */ 1451 static void 1452 sub_list_next(struct sort_list *sl) 1453 { 1454 1455 if (sl && sl->count) { 1456 sl->list += 1; 1457 sl->count -= 1; 1458 } 1459 } 1460 1461 /* 1462 * Merge sub-lists to a file 1463 */ 1464 static void 1465 merge_sub_lists(struct sort_list **sl, size_t n, FILE* f_out) 1466 { 1467 struct last_printed_item lp; 1468 size_t i; 1469 1470 memset(&lp,0,sizeof(lp)); 1471 1472 /* construct the initial list: */ 1473 for (i = 0; i < n; i++) 1474 sub_list_push(sl[i], sl, i); 1475 1476 while (sl[0]->count) { /* unfinished lists are always in front */ 1477 /* output the smallest line: */ 1478 sub_list_header_print(sl[0], f_out, &lp); 1479 /* move to a new line, if possible: */ 1480 sub_list_next(sl[0]); 1481 /* re-arrange the list: */ 1482 sub_list_sink(sl, 0, n); 1483 } 1484 } 1485 1486 /* 1487 * Merge sub-lists to a file 1488 */ 1489 static void 1490 merge_list_parts(struct sort_list **parts, size_t n, const char *fn) 1491 { 1492 FILE* f_out; 1493 1494 f_out = openfile(fn,"w"); 1495 1496 merge_sub_lists(parts, n, f_out); 1497 1498 closefile(f_out, fn); 1499 } 1500 1501 #endif /* defined(SORT_THREADS) */ 1502 /* 1503 * Multi-threaded sort algorithm "driver" 1504 */ 1505 static void 1506 mt_sort(struct sort_list *list, 1507 int(*sort_func)(void *, size_t, size_t, int(*)(const void *, const void *)), 1508 const char* fn) 1509 { 1510 #if defined(SORT_THREADS) 1511 if (nthreads < 2 || list->count < MT_SORT_THRESHOLD) { 1512 size_t 
nthreads_save = nthreads; 1513 nthreads = 1; 1514 #endif 1515 /* if single thread or small data, do simple sort */ 1516 sort_func(list->list, list->count, 1517 sizeof(struct sort_list_item *), 1518 (int(*)(const void *, const void *)) list_coll); 1519 sort_list_dump(list, fn); 1520 #if defined(SORT_THREADS) 1521 nthreads = nthreads_save; 1522 } else { 1523 /* multi-threaded sort */ 1524 struct sort_list **parts; 1525 size_t avgsize, cstart, i; 1526 1527 /* array of sub-lists */ 1528 parts = sort_malloc(sizeof(struct sort_list*) * nthreads); 1529 cstart = 0; 1530 avgsize = list->count / nthreads; 1531 1532 /* set global system sort function */ 1533 g_sort_func = sort_func; 1534 1535 /* set sublists */ 1536 for (i = 0; i < nthreads; ++i) { 1537 size_t sz = 0; 1538 1539 parts[i] = sort_malloc(sizeof(struct sort_list)); 1540 parts[i]->list = list->list + cstart; 1541 parts[i]->memsize = 0; 1542 parts[i]->sub_list_pos = i; 1543 1544 sz = (i == nthreads - 1) ? list->count - cstart : 1545 avgsize; 1546 1547 parts[i]->count = sz; 1548 1549 parts[i]->size = parts[i]->count; 1550 1551 cstart += sz; 1552 } 1553 1554 /* init threads counting semaphore */ 1555 sem_init(&mtsem, 0, 0); 1556 1557 /* start threads */ 1558 for (i = 0; i < nthreads; ++i) { 1559 pthread_t pth; 1560 pthread_attr_t attr; 1561 1562 pthread_attr_init(&attr); 1563 pthread_attr_setdetachstate(&attr, PTHREAD_DETACHED); 1564 1565 for (;;) { 1566 int res = pthread_create(&pth, &attr, 1567 mt_sort_thread, parts[i]); 1568 1569 if (res >= 0) 1570 break; 1571 if (errno == EAGAIN) { 1572 pthread_yield(); 1573 continue; 1574 } 1575 err(2, NULL); 1576 } 1577 1578 pthread_attr_destroy(&attr); 1579 } 1580 1581 /* wait for threads completion */ 1582 for (i = 0; i < nthreads; ++i) { 1583 sem_wait(&mtsem); 1584 } 1585 /* destroy the semaphore - we do not need it anymore */ 1586 sem_destroy(&mtsem); 1587 1588 /* merge sorted sub-lists to the file */ 1589 merge_list_parts(parts, nthreads, fn); 1590 1591 /* free sub-lists 
data */ 1592 for (i = 0; i < nthreads; ++i) { 1593 sort_free(parts[i]); 1594 } 1595 sort_free(parts); 1596 } 1597 #endif /* defined(SORT_THREADS) */ 1598 } 1599