1 /*- 2 * Copyright (C) 2009 Gabor Kovesdan <gabor@FreeBSD.org> 3 * Copyright (C) 2012 Oleg Moskalenko <mom040267@gmail.com> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25 * SUCH DAMAGE. 
*/

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/queue.h>

#include <err.h>
#include <fcntl.h>
#if defined(SORT_THREADS)
#include <pthread.h>
#endif
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <wchar.h>
#include <wctype.h>

#include "coll.h"
#include "file.h"
#include "radixsort.h"

/*
 * Memory budget values.  procfile() spills the in-memory list to a
 * temporary file once its memsize reaches available_free_memory.
 */
unsigned long long free_memory = 1000000;
unsigned long long available_free_memory = 1000000;

/* When true, file_reader_init() tries to mmap() regular input files. */
bool use_mmap;

/* Directory in which new_tmp_file_name() places temporary files. */
const char *tmpdir = "/var/tmp";
/* If non-NULL, temporary files are piped through this program by openfile(). */
const char *compress_program;

/* Upper bound used by shrink_file_list() when merging files in batches. */
size_t max_open_files = 16;

/*
 * How much space we read from file at once
 */
#define READ_CHUNK (4096)

/*
 * File reader structure
 */
struct file_reader
{
	struct reader_buffer	 rb;		/* scratch state handed to bwsfgetln() for stdin */
	FILE			*file;		/* stdio stream (unused when the file is mmapped) */
	char			*fname;		/* private copy of the source file name */
	unsigned char		*buffer;	/* read buffer for the stdio path */
	unsigned char		*mmapaddr;	/* start of the mapping, NULL if not mmapped */
	unsigned char		*mmapptr;	/* next unread byte inside the mapping */
	size_t			 bsz;		/* number of valid bytes in buffer */
	size_t			 cbsz;		/* allocated capacity of buffer */
	size_t			 mmapsize;	/* length of the mapping */
	size_t			 strbeg;	/* offset in buffer where the next line starts */
	int			 fd;		/* descriptor backing the mapping */
	char			 elsymb;	/* line terminator: '\n', or 0 when zflag is set */
};

/*
 * Structure to be used in file merge process.
 */
struct file_header
{
	struct file_reader		*fr;		/* NULL once the file is exhausted */
	struct sort_list_item		*si;		/* current top line */
	size_t				 file_pos;	/* position of the file in the merge list (tie-breaker) */
};

/*
 * List elements of "cleanable" files list.
 */
struct CLEANABLE_FILE
{
	char				*fn;	/* temporary file name */
	LIST_ENTRY(CLEANABLE_FILE)	 files;	/* list linkage */
};

/*
 * List header of "cleanable" files list.
 */
static LIST_HEAD(CLEANABLE_FILES,CLEANABLE_FILE) tmp_files;

/*
 * Semaphore to protect the tmp file list.
 * We use semaphore here because it is signal-safe, according to POSIX.
 * And semaphore does not require pthread library.
115 */ 116 static sem_t tmp_files_sem; 117 118 static void mt_sort(struct sort_list *list, 119 int (*sort_func)(void *, size_t, size_t, 120 int (*)(const void *, const void *)), const char* fn); 121 122 /* 123 * Init tmp files list 124 */ 125 void 126 init_tmp_files(void) 127 { 128 129 LIST_INIT(&tmp_files); 130 sem_init(&tmp_files_sem, 0, 1); 131 } 132 133 /* 134 * Save name of a tmp file for signal cleanup 135 */ 136 void 137 tmp_file_atexit(const char *tmp_file) 138 { 139 140 if (tmp_file) { 141 sem_wait(&tmp_files_sem); 142 struct CLEANABLE_FILE *item = 143 sort_malloc(sizeof(struct CLEANABLE_FILE)); 144 item->fn = sort_strdup(tmp_file); 145 LIST_INSERT_HEAD(&tmp_files, item, files); 146 sem_post(&tmp_files_sem); 147 } 148 } 149 150 /* 151 * Clear tmp files 152 */ 153 void 154 clear_tmp_files(void) 155 { 156 struct CLEANABLE_FILE *item; 157 158 sem_wait(&tmp_files_sem); 159 LIST_FOREACH(item,&tmp_files,files) { 160 if ((item) && (item->fn)) 161 unlink(item->fn); 162 } 163 sem_post(&tmp_files_sem); 164 } 165 166 /* 167 * Check whether a file is a temporary file 168 */ 169 static bool 170 file_is_tmp(const char* fn) 171 { 172 struct CLEANABLE_FILE *item; 173 bool ret = false; 174 175 if (fn) { 176 sem_wait(&tmp_files_sem); 177 LIST_FOREACH(item,&tmp_files,files) { 178 if ((item) && (item->fn)) 179 if (strcmp(item->fn, fn) == 0) { 180 ret = true; 181 break; 182 } 183 } 184 sem_post(&tmp_files_sem); 185 } 186 187 return (ret); 188 } 189 190 /* 191 * Generate new temporary file name 192 */ 193 char * 194 new_tmp_file_name(void) 195 { 196 static size_t tfcounter = 0; 197 static const char *fn = ".bsdsort."; 198 char *ret; 199 size_t sz; 200 201 sz = strlen(tmpdir) + 1 + strlen(fn) + 32 + 1; 202 ret = sort_malloc(sz); 203 204 sprintf(ret, "%s/%s%d.%lu", tmpdir, fn, (int) getpid(), (unsigned long)(tfcounter++)); 205 tmp_file_atexit(ret); 206 return (ret); 207 } 208 209 /* 210 * Initialize file list 211 */ 212 void 213 file_list_init(struct file_list *fl, bool tmp) 214 
{ 215 216 if (fl) { 217 fl->count = 0; 218 fl->sz = 0; 219 fl->fns = NULL; 220 fl->tmp = tmp; 221 } 222 } 223 224 /* 225 * Add a file name to the list 226 */ 227 void 228 file_list_add(struct file_list *fl, char *fn, bool allocate) 229 { 230 231 if (fl && fn) { 232 if (fl->count >= fl->sz || (fl->fns == NULL)) { 233 fl->sz = (fl->sz) * 2 + 1; 234 fl->fns = sort_realloc(fl->fns, fl->sz * 235 sizeof(char *)); 236 } 237 fl->fns[fl->count] = allocate ? sort_strdup(fn) : fn; 238 fl->count += 1; 239 } 240 } 241 242 /* 243 * Populate file list from array of file names 244 */ 245 void 246 file_list_populate(struct file_list *fl, int argc, char **argv, bool allocate) 247 { 248 249 if (fl && argv) { 250 int i; 251 252 for (i = 0; i < argc; i++) 253 file_list_add(fl, argv[i], allocate); 254 } 255 } 256 257 /* 258 * Clean file list data and delete the files, 259 * if this is a list of temporary files 260 */ 261 void 262 file_list_clean(struct file_list *fl) 263 { 264 265 if (fl) { 266 if (fl->fns) { 267 size_t i; 268 269 for (i = 0; i < fl->count; i++) { 270 if (fl->fns[i]) { 271 if (fl->tmp) 272 unlink(fl->fns[i]); 273 sort_free(fl->fns[i]); 274 fl->fns[i] = 0; 275 } 276 } 277 sort_free(fl->fns); 278 fl->fns = NULL; 279 } 280 fl->sz = 0; 281 fl->count = 0; 282 fl->tmp = false; 283 } 284 } 285 286 /* 287 * Init sort list 288 */ 289 void 290 sort_list_init(struct sort_list *l) 291 { 292 293 if (l) { 294 l->count = 0; 295 l->size = 0; 296 l->memsize = sizeof(struct sort_list); 297 l->list = NULL; 298 } 299 } 300 301 /* 302 * Add string to sort list 303 */ 304 void 305 sort_list_add(struct sort_list *l, struct bwstring *str) 306 { 307 308 if (l && str) { 309 size_t indx = l->count; 310 311 if ((l->list == NULL) || (indx >= l->size)) { 312 size_t newsize = (l->size + 1) + 1024; 313 314 l->list = sort_realloc(l->list, 315 sizeof(struct sort_list_item*) * newsize); 316 l->memsize += (newsize - l->size) * 317 sizeof(struct sort_list_item*); 318 l->size = newsize; 319 } 320 
l->list[indx] = sort_list_item_alloc(); 321 sort_list_item_set(l->list[indx], str); 322 l->memsize += sort_list_item_size(l->list[indx]); 323 l->count += 1; 324 } 325 } 326 327 /* 328 * Clean sort list data 329 */ 330 void 331 sort_list_clean(struct sort_list *l) 332 { 333 334 if (l) { 335 if (l->list) { 336 size_t i; 337 338 for (i = 0; i < l->count; i++) { 339 struct sort_list_item *item; 340 341 item = l->list[i]; 342 343 if (item) { 344 sort_list_item_clean(item); 345 sort_free(item); 346 l->list[i] = NULL; 347 } 348 } 349 sort_free(l->list); 350 l->list = NULL; 351 } 352 l->count = 0; 353 l->size = 0; 354 l->memsize = sizeof(struct sort_list); 355 } 356 } 357 358 /* 359 * Write sort list to file 360 */ 361 void 362 sort_list_dump(struct sort_list *l, const char *fn) 363 { 364 365 if (l && fn) { 366 FILE *f; 367 368 f = openfile(fn, "w"); 369 if (f == NULL) 370 err(2, NULL); 371 372 if (l->list) { 373 size_t i; 374 if (!(sort_opts_vals.uflag)) { 375 for (i = 0; i < l->count; ++i) 376 bwsfwrite(l->list[i]->str, f, 377 sort_opts_vals.zflag); 378 } else { 379 struct sort_list_item *last_printed_item = NULL; 380 struct sort_list_item *item; 381 for (i = 0; i < l->count; ++i) { 382 item = l->list[i]; 383 if ((last_printed_item == NULL) || 384 list_coll(&last_printed_item, &item)) { 385 bwsfwrite(item->str, f, sort_opts_vals.zflag); 386 last_printed_item = item; 387 } 388 } 389 } 390 } 391 392 closefile(f, fn); 393 } 394 } 395 396 /* 397 * Checks if the given file is sorted. Stops at the first disorder, 398 * prints the disordered line and returns 1. 
399 */ 400 int 401 check(const char *fn) 402 { 403 struct bwstring *s1, *s2, *s1disorder, *s2disorder; 404 struct file_reader *fr; 405 struct keys_array *ka1, *ka2; 406 int res; 407 size_t pos, posdisorder; 408 409 s1 = s2 = s1disorder = s2disorder = NULL; 410 ka1 = ka2 = NULL; 411 412 fr = file_reader_init(fn); 413 414 res = 0; 415 pos = 1; 416 posdisorder = 1; 417 418 if (fr == NULL) { 419 err(2, NULL); 420 goto end; 421 } 422 423 s1 = file_reader_readline(fr); 424 if (s1 == NULL) 425 goto end; 426 427 ka1 = keys_array_alloc(); 428 preproc(s1, ka1); 429 430 s2 = file_reader_readline(fr); 431 if (s2 == NULL) 432 goto end; 433 434 ka2 = keys_array_alloc(); 435 preproc(s2, ka2); 436 437 for (;;) { 438 439 if (debug_sort) { 440 bwsprintf(stdout, s2, "s1=<", ">"); 441 bwsprintf(stdout, s1, "s2=<", ">"); 442 } 443 int cmp = key_coll(ka2, ka1, 0); 444 if (debug_sort) 445 printf("; cmp1=%d", cmp); 446 447 if (!cmp && sort_opts_vals.complex_sort && 448 !(sort_opts_vals.uflag) && !(sort_opts_vals.sflag)) { 449 cmp = top_level_str_coll(s2, s1); 450 if (debug_sort) 451 printf("; cmp2=%d", cmp); 452 } 453 if (debug_sort) 454 printf("\n"); 455 456 if ((sort_opts_vals.uflag && (cmp <= 0)) || (cmp < 0)) { 457 if (!(sort_opts_vals.csilentflag)) { 458 s2disorder = bwsdup(s2); 459 posdisorder = pos; 460 if (debug_sort) 461 s1disorder = bwsdup(s1); 462 } 463 res = 1; 464 goto end; 465 } 466 467 pos++; 468 469 clean_keys_array(s1, ka1); 470 sort_free(ka1); 471 ka1 = ka2; 472 ka2 = NULL; 473 474 bwsfree(s1); 475 s1 = s2; 476 477 s2 = file_reader_readline(fr); 478 if (s2 == NULL) 479 goto end; 480 481 ka2 = keys_array_alloc(); 482 preproc(s2, ka2); 483 } 484 485 end: 486 if (ka1) { 487 clean_keys_array(s1, ka1); 488 sort_free(ka1); 489 } 490 491 if (s1) 492 bwsfree(s1); 493 494 if (ka2) { 495 clean_keys_array(s2, ka2); 496 sort_free(ka2); 497 } 498 499 if (s2) 500 bwsfree(s2); 501 502 if ((fn == NULL) || (*fn == 0) || (strcmp(fn, "-") == 0)) { 503 for (;;) { 504 s2 = 
file_reader_readline(fr); 505 if (s2 == NULL) 506 break; 507 bwsfree(s2); 508 } 509 } 510 511 file_reader_free(fr); 512 513 if (s2disorder) { 514 bws_disorder_warnx(s2disorder, fn, posdisorder); 515 if (s1disorder) { 516 bws_disorder_warnx(s1disorder, fn, posdisorder); 517 if (s1disorder != s2disorder) 518 bwsfree(s1disorder); 519 } 520 bwsfree(s2disorder); 521 s1disorder = NULL; 522 s2disorder = NULL; 523 } 524 525 if (res) 526 exit(res); 527 528 return (0); 529 } 530 531 /* 532 * Opens a file. If the given filename is "-", stdout will be 533 * opened. 534 */ 535 FILE * 536 openfile(const char *fn, const char *mode) 537 { 538 FILE *file; 539 540 if (strcmp(fn, "-") == 0) { 541 return ((mode && mode[0] == 'r') ? stdin : stdout); 542 } else { 543 mode_t orig_file_mask = 0; 544 int is_tmp = file_is_tmp(fn); 545 546 if (is_tmp && (mode[0] == 'w')) 547 orig_file_mask = umask(S_IWGRP | S_IWOTH | 548 S_IRGRP | S_IROTH); 549 550 if (is_tmp && (compress_program != NULL)) { 551 char *cmd; 552 size_t cmdsz; 553 554 cmdsz = strlen(fn) + 128; 555 cmd = sort_malloc(cmdsz); 556 557 fflush(stdout); 558 559 if (mode[0] == 'r') 560 snprintf(cmd, cmdsz - 1, "cat %s | %s -d", 561 fn, compress_program); 562 else if (mode[0] == 'w') 563 snprintf(cmd, cmdsz - 1, "%s > %s", 564 compress_program, fn); 565 else 566 err(2, "%s", getstr(7)); 567 568 if ((file = popen(cmd, mode)) == NULL) 569 err(2, NULL); 570 571 sort_free(cmd); 572 573 } else 574 if ((file = fopen(fn, mode)) == NULL) 575 err(2, NULL); 576 577 if (is_tmp && (mode[0] == 'w')) 578 umask(orig_file_mask); 579 } 580 581 return (file); 582 } 583 584 /* 585 * Close file 586 */ 587 void 588 closefile(FILE *f, const char *fn) 589 { 590 if (f == NULL) { 591 ; 592 } else if (f == stdin) { 593 ; 594 } else if (f == stdout) { 595 fflush(f); 596 } else { 597 if (file_is_tmp(fn) && compress_program != NULL) { 598 if(pclose(f)<0) 599 err(2,NULL); 600 } else 601 fclose(f); 602 } 603 } 604 605 /* 606 * Reads a file into the internal buffer. 
607 */ 608 struct file_reader * 609 file_reader_init(const char *fsrc) 610 { 611 struct file_reader *ret; 612 613 if (fsrc == NULL) 614 fsrc = "-"; 615 616 ret = sort_malloc(sizeof(struct file_reader)); 617 memset(ret, 0, sizeof(struct file_reader)); 618 619 ret->elsymb = '\n'; 620 if (sort_opts_vals.zflag) 621 ret->elsymb = 0; 622 623 ret->fname = sort_strdup(fsrc); 624 625 if (strcmp(fsrc, "-") && (compress_program == NULL) && use_mmap) { 626 627 do { 628 struct stat stat_buf; 629 void *addr; 630 size_t sz = 0; 631 int fd, flags; 632 633 flags = MAP_NOCORE | MAP_NOSYNC; 634 635 fd = open(fsrc, O_RDONLY); 636 if (fd < 0) 637 err(2, NULL); 638 639 if (fstat(fd, &stat_buf) < 0) { 640 close(fd); 641 break; 642 } 643 644 sz = stat_buf.st_size; 645 646 #if defined(MAP_PREFAULT_READ) 647 flags |= MAP_PREFAULT_READ; 648 #endif 649 650 addr = mmap(NULL, sz, PROT_READ, flags, fd, 0); 651 if (addr == MAP_FAILED) { 652 close(fd); 653 break; 654 } 655 656 ret->fd = fd; 657 ret->mmapaddr = addr; 658 ret->mmapsize = sz; 659 ret->mmapptr = ret->mmapaddr; 660 661 } while (0); 662 } 663 664 if (ret->mmapaddr == NULL) { 665 ret->file = openfile(fsrc, "r"); 666 if (ret->file == NULL) 667 err(2, NULL); 668 669 if (strcmp(fsrc, "-")) { 670 ret->cbsz = READ_CHUNK; 671 ret->buffer = sort_malloc(ret->cbsz); 672 ret->bsz = 0; 673 ret->strbeg = 0; 674 675 ret->bsz = fread(ret->buffer, 1, ret->cbsz, ret->file); 676 if (ret->bsz == 0) { 677 if (ferror(ret->file)) 678 err(2, NULL); 679 } 680 } 681 } 682 683 return (ret); 684 } 685 686 struct bwstring * 687 file_reader_readline(struct file_reader *fr) 688 { 689 struct bwstring *ret = NULL; 690 691 if (fr->mmapaddr) { 692 unsigned char *mmapend; 693 694 mmapend = fr->mmapaddr + fr->mmapsize; 695 if (fr->mmapptr >= mmapend) 696 return (NULL); 697 else { 698 unsigned char *strend; 699 size_t sz; 700 701 sz = mmapend - fr->mmapptr; 702 strend = memchr(fr->mmapptr, fr->elsymb, sz); 703 704 if (strend == NULL) { 705 ret = bwscsbdup(fr->mmapptr, sz); 
706 fr->mmapptr = mmapend; 707 } else { 708 ret = bwscsbdup(fr->mmapptr, strend - 709 fr->mmapptr); 710 fr->mmapptr = strend + 1; 711 } 712 } 713 714 } else if (fr->file != stdin) { 715 unsigned char *strend; 716 size_t bsz1, remsz, search_start; 717 718 search_start = 0; 719 remsz = 0; 720 strend = NULL; 721 722 if (fr->bsz > fr->strbeg) 723 remsz = fr->bsz - fr->strbeg; 724 725 /* line read cycle */ 726 for (;;) { 727 if (remsz > search_start) 728 strend = memchr(fr->buffer + fr->strbeg + 729 search_start, fr->elsymb, remsz - 730 search_start); 731 else 732 strend = NULL; 733 734 if (strend) 735 break; 736 if (feof(fr->file)) 737 break; 738 739 if (fr->bsz != fr->cbsz) 740 /* NOTREACHED */ 741 err(2, "File read software error 1"); 742 743 if (remsz > (READ_CHUNK >> 1)) { 744 search_start = fr->cbsz - fr->strbeg; 745 fr->cbsz += READ_CHUNK; 746 fr->buffer = sort_realloc(fr->buffer, 747 fr->cbsz); 748 bsz1 = fread(fr->buffer + fr->bsz, 1, 749 READ_CHUNK, fr->file); 750 if (bsz1 == 0) { 751 if (ferror(fr->file)) 752 err(2, NULL); 753 break; 754 } 755 fr->bsz += bsz1; 756 remsz += bsz1; 757 } else { 758 if (remsz > 0 && fr->strbeg>0) 759 bcopy(fr->buffer + fr->strbeg, 760 fr->buffer, remsz); 761 762 fr->strbeg = 0; 763 search_start = remsz; 764 bsz1 = fread(fr->buffer + remsz, 1, 765 fr->cbsz - remsz, fr->file); 766 if (bsz1 == 0) { 767 if (ferror(fr->file)) 768 err(2, NULL); 769 break; 770 } 771 fr->bsz = remsz + bsz1; 772 remsz = fr->bsz; 773 } 774 } 775 776 if (strend == NULL) 777 strend = fr->buffer + fr->bsz; 778 779 if ((fr->buffer + fr->strbeg <= strend) && 780 (fr->strbeg < fr->bsz) && (remsz>0)) 781 ret = bwscsbdup(fr->buffer + fr->strbeg, strend - 782 fr->buffer - fr->strbeg); 783 784 fr->strbeg = (strend - fr->buffer) + 1; 785 786 } else { 787 size_t len = 0; 788 789 ret = bwsfgetln(fr->file, &len, sort_opts_vals.zflag, 790 &(fr->rb)); 791 } 792 793 return (ret); 794 } 795 796 static void 797 file_reader_clean(struct file_reader *fr) 798 { 799 800 if (fr) 
{ 801 if (fr->mmapaddr) 802 munmap(fr->mmapaddr, fr->mmapsize); 803 804 if (fr->fd) 805 close(fr->fd); 806 807 if (fr->buffer) 808 sort_free(fr->buffer); 809 810 if (fr->file) 811 if (fr->file != stdin) 812 closefile(fr->file, fr->fname); 813 814 if(fr->fname) 815 sort_free(fr->fname); 816 817 memset(fr, 0, sizeof(struct file_reader)); 818 } 819 } 820 821 void 822 file_reader_free(struct file_reader *fr) 823 { 824 825 if (fr) { 826 file_reader_clean(fr); 827 sort_free(fr); 828 } 829 } 830 831 int 832 procfile(const char *fsrc, struct sort_list *list, struct file_list *fl) 833 { 834 struct file_reader *fr; 835 836 fr = file_reader_init(fsrc); 837 if (fr == NULL) 838 err(2, NULL); 839 840 /* file browse cycle */ 841 for (;;) { 842 struct bwstring *bws; 843 844 bws = file_reader_readline(fr); 845 846 if (bws == NULL) 847 break; 848 849 sort_list_add(list, bws); 850 851 if (list->memsize >= available_free_memory) { 852 char *fn; 853 854 fn = new_tmp_file_name(); 855 sort_list_to_file(list, fn); 856 file_list_add(fl, fn, false); 857 sort_list_clean(list); 858 } 859 } 860 861 file_reader_free(fr); 862 863 return (0); 864 } 865 866 /* 867 * Compare file headers. Files with EOF always go to the end of the list. 868 */ 869 static int 870 file_header_cmp(struct file_header *f1, struct file_header *f2) 871 { 872 873 if (f1 == f2) 874 return (0); 875 else { 876 if (f1->fr == NULL) { 877 return ((f2->fr == NULL) ? 0 : +1); 878 } else if (f2->fr == NULL) 879 return (-1); 880 else { 881 int ret; 882 883 ret = list_coll(&(f1->si), &(f2->si)); 884 if (!ret) 885 return ((f1->file_pos < f2->file_pos) ? 
-1 : +1); 886 return (ret); 887 } 888 } 889 } 890 891 /* 892 * Allocate and init file header structure 893 */ 894 static void 895 file_header_init(struct file_header **fh, const char *fn, size_t file_pos) 896 { 897 898 if (fh && fn) { 899 struct bwstring *line; 900 901 *fh = sort_malloc(sizeof(struct file_header)); 902 (*fh)->file_pos = file_pos; 903 (*fh)->fr = file_reader_init(fn); 904 if ((*fh)->fr == NULL) { 905 perror(fn); 906 err(2, "%s", getstr(8)); 907 } 908 line = file_reader_readline((*fh)->fr); 909 if (line == NULL) { 910 file_reader_free((*fh)->fr); 911 (*fh)->fr = NULL; 912 (*fh)->si = NULL; 913 } else { 914 (*fh)->si = sort_list_item_alloc(); 915 sort_list_item_set((*fh)->si, line); 916 } 917 } 918 } 919 920 /* 921 * Close file 922 */ 923 static void 924 file_header_close(struct file_header **fh) 925 { 926 927 if (fh && *fh) { 928 if ((*fh)->fr) { 929 file_reader_free((*fh)->fr); 930 (*fh)->fr = NULL; 931 } 932 if ((*fh)->si) { 933 sort_list_item_clean((*fh)->si); 934 sort_free((*fh)->si); 935 (*fh)->si = NULL; 936 } 937 sort_free(*fh); 938 *fh = NULL; 939 } 940 } 941 942 /* 943 * Swap two array elements 944 */ 945 static void 946 file_header_swap(struct file_header **fh, size_t i1, size_t i2) 947 { 948 struct file_header *tmp; 949 950 tmp = fh[i1]; 951 fh[i1] = fh[i2]; 952 fh[i2] = tmp; 953 } 954 955 /* heap algorithm ==>> */ 956 957 /* 958 * See heap sort algorithm 959 * "Raises" last element to its right place 960 */ 961 static void 962 file_header_heap_swim(struct file_header **fh, size_t indx) 963 { 964 965 if (indx > 0) { 966 size_t parent_index; 967 968 parent_index = (indx - 1) >> 1; 969 970 if (file_header_cmp(fh[indx], fh[parent_index]) < 0) { 971 /* swap child and parent and continue */ 972 file_header_swap(fh, indx, parent_index); 973 file_header_heap_swim(fh, parent_index); 974 } 975 } 976 } 977 978 /* 979 * Sink the top element to its correct position 980 */ 981 static void 982 file_header_heap_sink(struct file_header **fh, size_t indx, 
size_t size) 983 { 984 size_t left_child_index; 985 size_t right_child_index; 986 987 left_child_index = indx + indx + 1; 988 right_child_index = left_child_index + 1; 989 990 if (left_child_index < size) { 991 size_t min_child_index; 992 993 min_child_index = left_child_index; 994 995 if ((right_child_index < size) && 996 (file_header_cmp(fh[left_child_index], 997 fh[right_child_index]) > 0)) 998 min_child_index = right_child_index; 999 if (file_header_cmp(fh[indx], fh[min_child_index]) > 0) { 1000 file_header_swap(fh, indx, min_child_index); 1001 file_header_heap_sink(fh, min_child_index, size); 1002 } 1003 } 1004 } 1005 1006 /* <<== heap algorithm */ 1007 1008 /* 1009 * Adds element to the "left" end 1010 */ 1011 static void 1012 file_header_list_rearrange_from_header(struct file_header **fh, size_t size) 1013 { 1014 1015 file_header_heap_sink(fh, 0, size); 1016 } 1017 1018 /* 1019 * Adds element to the "right" end 1020 */ 1021 static void 1022 file_header_list_push(struct file_header *f, struct file_header **fh, size_t size) 1023 { 1024 1025 fh[size++] = f; 1026 file_header_heap_swim(fh, size - 1); 1027 } 1028 1029 struct last_printed 1030 { 1031 struct bwstring *str; 1032 }; 1033 1034 /* 1035 * Prints the current line of the file 1036 */ 1037 static void 1038 file_header_print(struct file_header *fh, FILE *f_out, struct last_printed *lp) 1039 { 1040 1041 if (fh && fh->fr && f_out && fh->si && fh->si->str) { 1042 if (sort_opts_vals.uflag) { 1043 if ((lp->str == NULL) || (str_list_coll(lp->str, &(fh->si)))) { 1044 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 1045 if (lp->str) 1046 bwsfree(lp->str); 1047 lp->str = bwsdup(fh->si->str); 1048 } 1049 } else 1050 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 1051 } 1052 } 1053 1054 /* 1055 * Read next line 1056 */ 1057 static void 1058 file_header_read_next(struct file_header *fh) 1059 { 1060 1061 if (fh && fh->fr) { 1062 struct bwstring *tmp; 1063 1064 tmp = file_reader_readline(fh->fr); 1065 if (tmp 
== NULL) { 1066 file_reader_free(fh->fr); 1067 fh->fr = NULL; 1068 if (fh->si) { 1069 sort_list_item_clean(fh->si); 1070 sort_free(fh->si); 1071 fh->si = NULL; 1072 } 1073 } else { 1074 if (fh->si == NULL) 1075 fh->si = sort_list_item_alloc(); 1076 sort_list_item_set(fh->si, tmp); 1077 } 1078 } 1079 } 1080 1081 /* 1082 * Merge array of "files headers" 1083 */ 1084 static void 1085 file_headers_merge(size_t fnum, struct file_header **fh, FILE *f_out) 1086 { 1087 struct last_printed lp; 1088 size_t i; 1089 1090 memset(&lp, 0, sizeof(lp)); 1091 1092 /* 1093 * construct the initial sort structure 1094 */ 1095 for (i = 0; i < fnum; i++) 1096 file_header_list_push(fh[i], fh, i); 1097 1098 while (fh[0]->fr) { /* unfinished files are always in front */ 1099 /* output the smallest line: */ 1100 file_header_print(fh[0], f_out, &lp); 1101 /* read a new line, if possible: */ 1102 file_header_read_next(fh[0]); 1103 /* re-arrange the list: */ 1104 file_header_list_rearrange_from_header(fh, fnum); 1105 } 1106 1107 if (lp.str) 1108 bwsfree(lp.str); 1109 } 1110 1111 /* 1112 * Merges the given files into the output file, which can be 1113 * stdout. 
1114 */ 1115 static void 1116 merge_files_array(size_t argc, char **argv, const char *fn_out) 1117 { 1118 1119 if (argv && fn_out) { 1120 struct file_header **fh; 1121 FILE *f_out; 1122 size_t i; 1123 1124 f_out = openfile(fn_out, "w"); 1125 1126 if (f_out == NULL) 1127 err(2, NULL); 1128 1129 fh = sort_malloc((argc + 1) * sizeof(struct file_header *)); 1130 1131 for (i = 0; i < argc; i++) 1132 file_header_init(fh + i, argv[i], (size_t) i); 1133 1134 file_headers_merge(argc, fh, f_out); 1135 1136 for (i = 0; i < argc; i++) 1137 file_header_close(fh + i); 1138 1139 sort_free(fh); 1140 1141 closefile(f_out, fn_out); 1142 } 1143 } 1144 1145 /* 1146 * Shrinks the file list until its size smaller than max number of opened files 1147 */ 1148 static int 1149 shrink_file_list(struct file_list *fl) 1150 { 1151 1152 if ((fl == NULL) || (size_t) (fl->count) < max_open_files) 1153 return (0); 1154 else { 1155 struct file_list new_fl; 1156 size_t indx = 0; 1157 1158 file_list_init(&new_fl, true); 1159 while (indx < fl->count) { 1160 char *fnew; 1161 size_t num; 1162 1163 num = fl->count - indx; 1164 fnew = new_tmp_file_name(); 1165 1166 if ((size_t) num >= max_open_files) 1167 num = max_open_files - 1; 1168 merge_files_array(num, fl->fns + indx, fnew); 1169 if (fl->tmp) { 1170 size_t i; 1171 1172 for (i = 0; i < num; i++) 1173 unlink(fl->fns[indx + i]); 1174 } 1175 file_list_add(&new_fl, fnew, false); 1176 indx += num; 1177 } 1178 fl->tmp = false; /* already taken care of */ 1179 file_list_clean(fl); 1180 1181 fl->count = new_fl.count; 1182 fl->fns = new_fl.fns; 1183 fl->sz = new_fl.sz; 1184 fl->tmp = new_fl.tmp; 1185 1186 return (1); 1187 } 1188 } 1189 1190 /* 1191 * Merge list of files 1192 */ 1193 void 1194 merge_files(struct file_list *fl, const char *fn_out) 1195 { 1196 1197 if (fl && fn_out) { 1198 while (shrink_file_list(fl)); 1199 1200 merge_files_array(fl->count, fl->fns, fn_out); 1201 } 1202 } 1203 1204 static const char * 1205 get_sort_method_name(int sm) 1206 { 1207 
1208 if (sm == SORT_MERGESORT) 1209 return "mergesort"; 1210 else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1211 return "radixsort"; 1212 else if (sort_opts_vals.sort_method == SORT_HEAPSORT) 1213 return "heapsort"; 1214 else 1215 return "quicksort"; 1216 } 1217 1218 /* 1219 * Wrapper for qsort 1220 */ 1221 static int sort_qsort(void *list, size_t count, size_t elem_size, 1222 int (*cmp_func)(const void *, const void *)) 1223 { 1224 1225 qsort(list, count, elem_size, cmp_func); 1226 return (0); 1227 } 1228 1229 /* 1230 * Sort list of lines and writes it to the file 1231 */ 1232 void 1233 sort_list_to_file(struct sort_list *list, const char *outfile) 1234 { 1235 struct sort_mods *sm = &(keys[0].sm); 1236 1237 if (!(sm->Mflag) && !(sm->Rflag) && !(sm->Vflag) && !(sm->Vflag) && 1238 !(sm->gflag) && !(sm->hflag) && !(sm->nflag)) { 1239 if ((sort_opts_vals.sort_method == SORT_DEFAULT) && byte_sort) 1240 sort_opts_vals.sort_method = SORT_RADIXSORT; 1241 1242 } else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1243 err(2, "%s", getstr(9)); 1244 1245 /* 1246 * to handle stable sort and the unique cases in the 1247 * right order, we need stable basic algorithm 1248 */ 1249 if (sort_opts_vals.sflag) { 1250 switch (sort_opts_vals.sort_method){ 1251 case SORT_MERGESORT: 1252 break; 1253 case SORT_RADIXSORT: 1254 break; 1255 case SORT_DEFAULT: 1256 sort_opts_vals.sort_method = SORT_MERGESORT; 1257 break; 1258 default: 1259 errx(2, "%s", getstr(10)); 1260 } 1261 } 1262 1263 if (sort_opts_vals.sort_method == SORT_DEFAULT) 1264 sort_opts_vals.sort_method = DEFAULT_SORT_ALGORITHM; 1265 1266 if (debug_sort) 1267 printf("sort_method=%s\n", 1268 get_sort_method_name(sort_opts_vals.sort_method)); 1269 1270 switch (sort_opts_vals.sort_method){ 1271 case SORT_RADIXSORT: 1272 rxsort(list->list, list->count); 1273 sort_list_dump(list, outfile); 1274 break; 1275 case SORT_MERGESORT: 1276 mt_sort(list, mergesort, outfile); 1277 break; 1278 case SORT_HEAPSORT: 1279 mt_sort(list, 
heapsort, outfile); 1280 break; 1281 case SORT_QSORT: 1282 mt_sort(list, sort_qsort, outfile); 1283 break; 1284 default: 1285 mt_sort(list, DEFAULT_SORT_FUNC, outfile); 1286 break; 1287 } 1288 } 1289 1290 /******************* MT SORT ************************/ 1291 1292 #if defined(SORT_THREADS) 1293 /* semaphore to count threads */ 1294 static sem_t mtsem; 1295 1296 /* current system sort function */ 1297 static int (*g_sort_func)(void *, size_t, size_t, 1298 int(*)(const void *, const void *)); 1299 1300 /* 1301 * Sort cycle thread (in multi-threaded mode) 1302 */ 1303 static void* 1304 mt_sort_thread(void* arg) 1305 { 1306 struct sort_list *list = arg; 1307 1308 g_sort_func(list->list, list->count, sizeof(struct sort_list_item *), 1309 (int(*)(const void *, const void *)) list_coll); 1310 1311 sem_post(&mtsem); 1312 1313 return (arg); 1314 } 1315 1316 /* 1317 * Compare sub-lists. Empty sub-lists always go to the end of the list. 1318 */ 1319 static int 1320 sub_list_cmp(struct sort_list *l1, struct sort_list *l2) 1321 { 1322 1323 if (l1 == l2) 1324 return (0); 1325 else { 1326 if (l1->count == 0) { 1327 return ((l2->count == 0) ? 0 : +1); 1328 } else if (l2->count == 0) { 1329 return (-1); 1330 } else { 1331 int ret; 1332 1333 ret = list_coll(&(l1->list[0]), &(l2->list[0])); 1334 if (!ret) 1335 return ((l1->sub_list_pos < l2->sub_list_pos) ? 
1336 -1 : +1); 1337 return (ret); 1338 } 1339 } 1340 } 1341 1342 /* 1343 * Swap two array elements 1344 */ 1345 static void 1346 sub_list_swap(struct sort_list **sl, size_t i1, size_t i2) 1347 { 1348 struct sort_list *tmp; 1349 1350 tmp = sl[i1]; 1351 sl[i1] = sl[i2]; 1352 sl[i2] = tmp; 1353 } 1354 1355 /* heap algorithm ==>> */ 1356 1357 /* 1358 * See heap sort algorithm 1359 * "Raises" last element to its right place 1360 */ 1361 static void 1362 sub_list_swim(struct sort_list **sl, size_t indx) 1363 { 1364 1365 if (indx > 0) { 1366 size_t parent_index; 1367 1368 parent_index = (indx - 1) >> 1; 1369 1370 if (sub_list_cmp(sl[indx], sl[parent_index]) < 0) { 1371 /* swap child and parent and continue */ 1372 sub_list_swap(sl, indx, parent_index); 1373 sub_list_swim(sl, parent_index); 1374 } 1375 } 1376 } 1377 1378 /* 1379 * Sink the top element to its correct position 1380 */ 1381 static void 1382 sub_list_sink(struct sort_list **sl, size_t indx, size_t size) 1383 { 1384 size_t left_child_index; 1385 size_t right_child_index; 1386 1387 left_child_index = indx + indx + 1; 1388 right_child_index = left_child_index + 1; 1389 1390 if (left_child_index < size) { 1391 size_t min_child_index; 1392 1393 min_child_index = left_child_index; 1394 1395 if ((right_child_index < size) && 1396 (sub_list_cmp(sl[left_child_index], 1397 sl[right_child_index]) > 0)) 1398 min_child_index = right_child_index; 1399 if (sub_list_cmp(sl[indx], sl[min_child_index]) > 0) { 1400 sub_list_swap(sl, indx, min_child_index); 1401 sub_list_sink(sl, min_child_index, size); 1402 } 1403 } 1404 } 1405 1406 /* <<== heap algorithm */ 1407 1408 /* 1409 * Adds element to the "right" end 1410 */ 1411 static void 1412 sub_list_push(struct sort_list *s, struct sort_list **sl, size_t size) 1413 { 1414 1415 sl[size++] = s; 1416 sub_list_swim(sl, size - 1); 1417 } 1418 1419 struct last_printed_item 1420 { 1421 struct sort_list_item *item; 1422 }; 1423 1424 /* 1425 * Prints the current line of the file 1426 */ 
1427 static void 1428 sub_list_header_print(struct sort_list *sl, FILE *f_out, 1429 struct last_printed_item *lp) 1430 { 1431 1432 if (sl && sl->count && f_out && sl->list[0]->str) { 1433 if (sort_opts_vals.uflag) { 1434 if ((lp->item == NULL) || (list_coll(&(lp->item), 1435 &(sl->list[0])))) { 1436 bwsfwrite(sl->list[0]->str, f_out, 1437 sort_opts_vals.zflag); 1438 lp->item = sl->list[0]; 1439 } 1440 } else 1441 bwsfwrite(sl->list[0]->str, f_out, 1442 sort_opts_vals.zflag); 1443 } 1444 } 1445 1446 /* 1447 * Read next line 1448 */ 1449 static void 1450 sub_list_next(struct sort_list *sl) 1451 { 1452 1453 if (sl && sl->count) { 1454 sl->list += 1; 1455 sl->count -= 1; 1456 } 1457 } 1458 1459 /* 1460 * Merge sub-lists to a file 1461 */ 1462 static void 1463 merge_sub_lists(struct sort_list **sl, size_t n, FILE* f_out) 1464 { 1465 struct last_printed_item lp; 1466 size_t i; 1467 1468 memset(&lp,0,sizeof(lp)); 1469 1470 /* construct the initial list: */ 1471 for (i = 0; i < n; i++) 1472 sub_list_push(sl[i], sl, i); 1473 1474 while (sl[0]->count) { /* unfinished lists are always in front */ 1475 /* output the smallest line: */ 1476 sub_list_header_print(sl[0], f_out, &lp); 1477 /* move to a new line, if possible: */ 1478 sub_list_next(sl[0]); 1479 /* re-arrange the list: */ 1480 sub_list_sink(sl, 0, n); 1481 } 1482 } 1483 1484 /* 1485 * Merge sub-lists to a file 1486 */ 1487 static void 1488 merge_list_parts(struct sort_list **parts, size_t n, const char *fn) 1489 { 1490 FILE* f_out; 1491 1492 f_out = openfile(fn,"w"); 1493 1494 merge_sub_lists(parts, n, f_out); 1495 1496 closefile(f_out, fn); 1497 } 1498 1499 #endif /* defined(SORT_THREADS) */ 1500 /* 1501 * Multi-threaded sort algorithm "driver" 1502 */ 1503 static void 1504 mt_sort(struct sort_list *list, 1505 int(*sort_func)(void *, size_t, size_t, int(*)(const void *, const void *)), 1506 const char* fn) 1507 { 1508 #if defined(SORT_THREADS) 1509 if (nthreads < 2 || list->count < MT_SORT_THRESHOLD) { 1510 size_t 
nthreads_save = nthreads; 1511 nthreads = 1; 1512 #endif 1513 /* if single thread or small data, do simple sort */ 1514 sort_func(list->list, list->count, 1515 sizeof(struct sort_list_item *), 1516 (int(*)(const void *, const void *)) list_coll); 1517 sort_list_dump(list, fn); 1518 #if defined(SORT_THREADS) 1519 nthreads = nthreads_save; 1520 } else { 1521 /* multi-threaded sort */ 1522 struct sort_list **parts; 1523 size_t avgsize, cstart, i; 1524 1525 /* array of sub-lists */ 1526 parts = sort_malloc(sizeof(struct sort_list*) * nthreads); 1527 cstart = 0; 1528 avgsize = list->count / nthreads; 1529 1530 /* set global system sort function */ 1531 g_sort_func = sort_func; 1532 1533 /* set sublists */ 1534 for (i = 0; i < nthreads; ++i) { 1535 size_t sz = 0; 1536 1537 parts[i] = sort_malloc(sizeof(struct sort_list)); 1538 parts[i]->list = list->list + cstart; 1539 parts[i]->memsize = 0; 1540 parts[i]->sub_list_pos = i; 1541 1542 sz = (i == nthreads - 1) ? list->count - cstart : 1543 avgsize; 1544 1545 parts[i]->count = sz; 1546 1547 parts[i]->size = parts[i]->count; 1548 1549 cstart += sz; 1550 } 1551 1552 /* init threads counting semaphore */ 1553 sem_init(&mtsem, 0, 0); 1554 1555 /* start threads */ 1556 for (i = 0; i < nthreads; ++i) { 1557 pthread_t pth; 1558 pthread_attr_t attr; 1559 1560 pthread_attr_init(&attr); 1561 pthread_attr_setdetachstate(&attr, PTHREAD_DETACHED); 1562 1563 for (;;) { 1564 int res = pthread_create(&pth, &attr, 1565 mt_sort_thread, parts[i]); 1566 1567 if (res >= 0) 1568 break; 1569 if (errno == EAGAIN) { 1570 pthread_yield(); 1571 continue; 1572 } 1573 err(2, NULL); 1574 } 1575 1576 pthread_attr_destroy(&attr); 1577 } 1578 1579 /* wait for threads completion */ 1580 for (i = 0; i < nthreads; ++i) { 1581 sem_wait(&mtsem); 1582 } 1583 /* destroy the semaphore - we do not need it anymore */ 1584 sem_destroy(&mtsem); 1585 1586 /* merge sorted sub-lists to the file */ 1587 merge_list_parts(parts, nthreads, fn); 1588 1589 /* free sub-lists 
data */ 1590 for (i = 0; i < nthreads; ++i) { 1591 sort_free(parts[i]); 1592 } 1593 sort_free(parts); 1594 } 1595 #endif /* defined(SORT_THREADS) */ 1596 } 1597