1 /*- 2 * Copyright (C) 2009 Gabor Kovesdan <gabor@FreeBSD.org> 3 * Copyright (C) 2012 Oleg Moskalenko <mom040267@gmail.com> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25 * SUCH DAMAGE. 26 */ 27 28 #include <sys/cdefs.h> 29 __FBSDID("$FreeBSD$"); 30 31 #include <sys/mman.h> 32 #include <sys/stat.h> 33 #include <sys/types.h> 34 #include <sys/queue.h> 35 36 #include <err.h> 37 #include <fcntl.h> 38 #if defined(SORT_THREADS) 39 #include <pthread.h> 40 #endif 41 #include <semaphore.h> 42 #include <stdio.h> 43 #include <stdlib.h> 44 #include <string.h> 45 #include <unistd.h> 46 #include <wchar.h> 47 #include <wctype.h> 48 49 #include "coll.h" 50 #include "file.h" 51 #include "radixsort.h" 52 53 unsigned long long free_memory = 1000000; 54 unsigned long long available_free_memory = 1000000; 55 56 bool use_mmap; 57 58 const char *tmpdir = "/var/tmp"; 59 const char *compress_program; 60 61 size_t max_open_files = 16; 62 63 /* 64 * How much space we read from file at once 65 */ 66 #define READ_CHUNK (4096) 67 68 /* 69 * File reader structure 70 */ 71 struct file_reader 72 { 73 struct reader_buffer rb; 74 FILE *file; 75 char *fname; 76 unsigned char *buffer; 77 unsigned char *mmapaddr; 78 unsigned char *mmapptr; 79 size_t bsz; 80 size_t cbsz; 81 size_t mmapsize; 82 size_t strbeg; 83 int fd; 84 char elsymb; 85 }; 86 87 /* 88 * Structure to be used in file merge process. 89 */ 90 struct file_header 91 { 92 struct file_reader *fr; 93 struct sort_list_item *si; /* current top line */ 94 size_t file_pos; 95 }; 96 97 /* 98 * List elements of "cleanable" files list. 99 */ 100 struct CLEANABLE_FILE 101 { 102 char *fn; 103 LIST_ENTRY(CLEANABLE_FILE) files; 104 }; 105 106 /* 107 * List header of "cleanable" files list. 108 */ 109 static LIST_HEAD(CLEANABLE_FILES,CLEANABLE_FILE) tmp_files; 110 111 /* 112 * Semaphore to protect the tmp file list. 113 * We use semaphore here because it is signal-safe, according to POSIX. 114 * And semaphore does not require pthread library. 115 */ 116 static sem_t tmp_files_sem; 117 118 static void mt_sort(struct sort_list *list, 119 int (*sort_func)(void *, size_t, size_t, 120 int (*)(const void *, const void *)), const char* fn); 121 122 /* 123 * Init tmp files list 124 */ 125 void 126 init_tmp_files(void) 127 { 128 129 LIST_INIT(&tmp_files); 130 sem_init(&tmp_files_sem, 0, 1); 131 } 132 133 /* 134 * Save name of a tmp file for signal cleanup 135 */ 136 void 137 tmp_file_atexit(const char *tmp_file) 138 { 139 140 if (tmp_file) { 141 sem_wait(&tmp_files_sem); 142 struct CLEANABLE_FILE *item = 143 sort_malloc(sizeof(struct CLEANABLE_FILE)); 144 item->fn = sort_strdup(tmp_file); 145 LIST_INSERT_HEAD(&tmp_files, item, files); 146 sem_post(&tmp_files_sem); 147 } 148 } 149 150 /* 151 * Clear tmp files 152 */ 153 void 154 clear_tmp_files(void) 155 { 156 struct CLEANABLE_FILE *item; 157 158 sem_wait(&tmp_files_sem); 159 LIST_FOREACH(item,&tmp_files,files) { 160 if ((item) && (item->fn)) 161 unlink(item->fn); 162 } 163 sem_post(&tmp_files_sem); 164 } 165 166 /* 167 * Check whether a file is a temporary file 168 */ 169 static bool 170 file_is_tmp(const char* fn) 171 { 172 struct CLEANABLE_FILE *item; 173 bool ret = false; 174 175 if (fn) { 176 sem_wait(&tmp_files_sem); 177 LIST_FOREACH(item,&tmp_files,files) { 178 if ((item) && (item->fn)) 179 if (strcmp(item->fn, fn) == 0) { 180 ret = true; 181 break; 182 } 183 } 184 sem_post(&tmp_files_sem); 185 } 186 187 return (ret); 188 } 189 190 /* 191 * Generate new temporary file name 192 */ 193 char * 194 new_tmp_file_name(void) 195 { 196 static size_t tfcounter = 0; 197 static const char *fn = ".bsdsort."; 198 char *ret; 199 size_t sz; 200 201 sz = strlen(tmpdir) + 1 + strlen(fn) + 32 + 1; 202 ret = sort_malloc(sz); 203 204 sprintf(ret, "%s/%s%d.%lu", tmpdir, fn, (int) getpid(), (unsigned long)(tfcounter++)); 205 tmp_file_atexit(ret); 206 return (ret); 207 } 208 209 /* 210 * Initialize file list 211 */ 212 void 213 file_list_init(struct file_list *fl, bool tmp) 214 { 215 216 if (fl) { 217 fl->count = 0; 218 fl->sz = 0; 219 fl->fns = NULL; 220 fl->tmp = tmp; 221 } 222 } 223 224 /* 225 * Add a file name to the list 226 */ 227 void 228 file_list_add(struct file_list *fl, char *fn, bool allocate) 229 { 230 231 if (fl && fn) { 232 if (fl->count >= fl->sz || (fl->fns == NULL)) { 233 fl->sz = (fl->sz) * 2 + 1; 234 fl->fns = sort_realloc(fl->fns, fl->sz * 235 sizeof(char *)); 236 } 237 fl->fns[fl->count] = allocate ? sort_strdup(fn) : fn; 238 fl->count += 1; 239 } 240 } 241 242 /* 243 * Populate file list from array of file names 244 */ 245 void 246 file_list_populate(struct file_list *fl, int argc, char **argv, bool allocate) 247 { 248 249 if (fl && argv) { 250 int i; 251 252 for (i = 0; i < argc; i++) 253 file_list_add(fl, argv[i], allocate); 254 } 255 } 256 257 /* 258 * Clean file list data and delete the files, 259 * if this is a list of temporary files 260 */ 261 void 262 file_list_clean(struct file_list *fl) 263 { 264 265 if (fl) { 266 if (fl->fns) { 267 size_t i; 268 269 for (i = 0; i < fl->count; i++) { 270 if (fl->fns[i]) { 271 if (fl->tmp) 272 unlink(fl->fns[i]); 273 sort_free(fl->fns[i]); 274 fl->fns[i] = 0; 275 } 276 } 277 sort_free(fl->fns); 278 fl->fns = NULL; 279 } 280 fl->sz = 0; 281 fl->count = 0; 282 fl->tmp = false; 283 } 284 } 285 286 /* 287 * Init sort list 288 */ 289 void 290 sort_list_init(struct sort_list *l) 291 { 292 293 if (l) { 294 l->count = 0; 295 l->size = 0; 296 l->memsize = sizeof(struct sort_list); 297 l->list = NULL; 298 } 299 } 300 301 /* 302 * Add string to sort list 303 */ 304 void 305 sort_list_add(struct sort_list *l, struct bwstring *str) 306 { 307 308 if (l && str) { 309 size_t indx = l->count; 310 311 if ((l->list == NULL) || (indx >= l->size)) { 312 size_t newsize = (l->size + 1) + 1024; 313 314 l->list = sort_realloc(l->list, 315 sizeof(struct sort_list_item*) * newsize); 316 l->memsize += (newsize - l->size) * 317 sizeof(struct sort_list_item*); 318 l->size = newsize; 319 } 320 l->list[indx] = sort_list_item_alloc(); 321 sort_list_item_set(l->list[indx], str); 322 l->memsize += sort_list_item_size(l->list[indx]); 323 l->count += 1; 324 } 325 } 326 327 /* 328 * Clean sort list data 329 */ 330 void 331 sort_list_clean(struct sort_list *l) 332 { 333 334 if (l) { 335 if (l->list) { 336 size_t i; 337 338 for (i = 0; i < l->count; i++) { 339 struct sort_list_item *item; 340 341 item = l->list[i]; 342 343 if (item) { 344 sort_list_item_clean(item); 345 sort_free(item); 346 l->list[i] = NULL; 347 } 348 } 349 sort_free(l->list); 350 l->list = NULL; 351 } 352 l->count = 0; 353 l->size = 0; 354 l->memsize = sizeof(struct sort_list); 355 } 356 } 357 358 /* 359 * Write sort list to file 360 */ 361 void 362 sort_list_dump(struct sort_list *l, const char *fn) 363 { 364 365 if (l && fn) { 366 FILE *f; 367 368 f = openfile(fn, "w"); 369 if (f == NULL) 370 err(2, NULL); 371 372 if (l->list) { 373 size_t i; 374 if (!(sort_opts_vals.uflag)) { 375 for (i = 0; i < l->count; ++i) 376 bwsfwrite(l->list[i]->str, f, 377 sort_opts_vals.zflag); 378 } else { 379 struct sort_list_item *last_printed_item = NULL; 380 struct sort_list_item *item; 381 for (i = 0; i < l->count; ++i) { 382 item = l->list[i]; 383 if ((last_printed_item == NULL) || 384 list_coll(&last_printed_item, &item)) { 385 bwsfwrite(item->str, f, sort_opts_vals.zflag); 386 last_printed_item = item; 387 } 388 } 389 } 390 } 391 392 closefile(f, fn); 393 } 394 } 395 396 /* 397 * Checks if the given file is sorted. Stops at the first disorder, 398 * prints the disordered line and returns 1. 399 */ 400 int 401 check(const char *fn) 402 { 403 struct bwstring *s1, *s2, *s1disorder, *s2disorder; 404 struct file_reader *fr; 405 struct keys_array *ka1, *ka2; 406 int res; 407 size_t pos, posdisorder; 408 409 s1 = s2 = s1disorder = s2disorder = NULL; 410 ka1 = ka2 = NULL; 411 412 fr = file_reader_init(fn); 413 414 res = 0; 415 pos = 1; 416 posdisorder = 1; 417 418 if (fr == NULL) { 419 err(2, NULL); 420 goto end; 421 } 422 423 s1 = file_reader_readline(fr); 424 if (s1 == NULL) 425 goto end; 426 427 ka1 = keys_array_alloc(); 428 preproc(s1, ka1); 429 430 s2 = file_reader_readline(fr); 431 if (s2 == NULL) 432 goto end; 433 434 ka2 = keys_array_alloc(); 435 preproc(s2, ka2); 436 437 for (;;) { 438 439 if (debug_sort) { 440 bwsprintf(stdout, s2, "s1=<", ">"); 441 bwsprintf(stdout, s1, "s2=<", ">"); 442 } 443 int cmp = key_coll(ka2, ka1, 0); 444 if (debug_sort) 445 printf("; cmp1=%d", cmp); 446 447 if (!cmp && sort_opts_vals.complex_sort && 448 !(sort_opts_vals.uflag) && !(sort_opts_vals.sflag)) { 449 cmp = top_level_str_coll(s2, s1); 450 if (debug_sort) 451 printf("; cmp2=%d", cmp); 452 } 453 if (debug_sort) 454 printf("\n"); 455 456 if ((sort_opts_vals.uflag && (cmp <= 0)) || (cmp < 0)) { 457 if (!(sort_opts_vals.csilentflag)) { 458 s2disorder = bwsdup(s2); 459 posdisorder = pos; 460 if (debug_sort) 461 s1disorder = bwsdup(s1); 462 } 463 res = 1; 464 goto end; 465 } 466 467 pos++; 468 469 clean_keys_array(s1, ka1); 470 sort_free(ka1); 471 ka1 = ka2; 472 ka2 = NULL; 473 474 bwsfree(s1); 475 s1 = s2; 476 477 s2 = file_reader_readline(fr); 478 if (s2 == NULL) 479 goto end; 480 481 ka2 = keys_array_alloc(); 482 preproc(s2, ka2); 483 } 484 485 end: 486 if (ka1) { 487 clean_keys_array(s1, ka1); 488 sort_free(ka1); 489 } 490 491 if (s1) 492 bwsfree(s1); 493 494 if (ka2) { 495 clean_keys_array(s2, ka2); 496 sort_free(ka2); 497 } 498 499 if (s2) 500 bwsfree(s2); 501 502 if ((fn == NULL) || (*fn == 0) || (strcmp(fn, "-") == 0)) { 503 for (;;) { 504 s2 = file_reader_readline(fr); 505 if (s2 == NULL) 506 break; 507 bwsfree(s2); 508 } 509 } 510 511 file_reader_free(fr); 512 513 if (s2disorder) { 514 bws_disorder_warnx(s2disorder, fn, posdisorder); 515 if (s1disorder) { 516 bws_disorder_warnx(s1disorder, fn, posdisorder); 517 if (s1disorder != s2disorder) 518 bwsfree(s1disorder); 519 } 520 bwsfree(s2disorder); 521 s1disorder = NULL; 522 s2disorder = NULL; 523 } 524 525 if (res) 526 exit(res); 527 528 return (0); 529 } 530 531 /* 532 * Opens a file. If the given filename is "-", stdout will be 533 * opened. 534 */ 535 FILE * 536 openfile(const char *fn, const char *mode) 537 { 538 FILE *file; 539 540 if (strcmp(fn, "-") == 0) { 541 return ((mode && mode[0] == 'r') ? stdin : stdout); 542 } else { 543 mode_t orig_file_mask = 0; 544 int is_tmp = file_is_tmp(fn); 545 546 if (is_tmp && (mode[0] == 'w')) 547 orig_file_mask = umask(S_IWGRP | S_IWOTH | 548 S_IRGRP | S_IROTH); 549 550 if (is_tmp && (compress_program != NULL)) { 551 char *cmd; 552 size_t cmdsz; 553 554 cmdsz = strlen(fn) + 128; 555 cmd = sort_malloc(cmdsz); 556 557 fflush(stdout); 558 559 if (mode[0] == 'r') 560 snprintf(cmd, cmdsz - 1, "cat %s | %s -d", 561 fn, compress_program); 562 else if (mode[0] == 'w') 563 snprintf(cmd, cmdsz - 1, "%s > %s", 564 compress_program, fn); 565 else 566 err(2, "%s", getstr(7)); 567 568 if ((file = popen(cmd, mode)) == NULL) 569 err(2, NULL); 570 571 sort_free(cmd); 572 573 } else 574 if ((file = fopen(fn, mode)) == NULL) 575 err(2, NULL); 576 577 if (is_tmp && (mode[0] == 'w')) 578 umask(orig_file_mask); 579 } 580 581 return (file); 582 } 583 584 /* 585 * Close file 586 */ 587 void 588 closefile(FILE *f, const char *fn) 589 { 590 if (f == NULL) { 591 ; 592 } else if (f == stdin) { 593 ; 594 } else if (f == stdout) { 595 fflush(f); 596 } else { 597 if (file_is_tmp(fn) && compress_program != NULL) { 598 if(pclose(f)<0) 599 err(2,NULL); 600 } else 601 fclose(f); 602 } 603 } 604 605 /* 606 * Reads a file into the internal buffer. 607 */ 608 struct file_reader * 609 file_reader_init(const char *fsrc) 610 { 611 struct file_reader *ret; 612 613 if (fsrc == NULL) 614 fsrc = "-"; 615 616 ret = sort_malloc(sizeof(struct file_reader)); 617 memset(ret, 0, sizeof(struct file_reader)); 618 619 ret->elsymb = '\n'; 620 if (sort_opts_vals.zflag) 621 ret->elsymb = 0; 622 623 ret->fname = sort_strdup(fsrc); 624 625 if (strcmp(fsrc, "-") && (compress_program == NULL) && use_mmap) { 626 627 do { 628 struct stat stat_buf; 629 void *addr; 630 size_t sz = 0; 631 int fd, flags; 632 633 flags = MAP_NOCORE | MAP_NOSYNC; 634 addr = MAP_FAILED; 635 636 fd = open(fsrc, O_RDONLY); 637 if (fd < 0) 638 err(2, NULL); 639 640 if (fstat(fd, &stat_buf) < 0) { 641 close(fd); 642 break; 643 } 644 645 sz = stat_buf.st_size; 646 647 #if defined(MAP_PREFAULT_READ) 648 flags |= MAP_PREFAULT_READ; 649 #endif 650 651 addr = mmap(NULL, sz, PROT_READ, flags, fd, 0); 652 if (addr == MAP_FAILED) { 653 close(fd); 654 break; 655 } 656 657 ret->fd = fd; 658 ret->mmapaddr = addr; 659 ret->mmapsize = sz; 660 ret->mmapptr = ret->mmapaddr; 661 662 } while (0); 663 } 664 665 if (ret->mmapaddr == NULL) { 666 ret->file = openfile(fsrc, "r"); 667 if (ret->file == NULL) 668 err(2, NULL); 669 670 if (strcmp(fsrc, "-")) { 671 ret->cbsz = READ_CHUNK; 672 ret->buffer = sort_malloc(ret->cbsz); 673 ret->bsz = 0; 674 ret->strbeg = 0; 675 676 ret->bsz = fread(ret->buffer, 1, ret->cbsz, ret->file); 677 if (ret->bsz == 0) { 678 if (ferror(ret->file)) 679 err(2, NULL); 680 } 681 } 682 } 683 684 return (ret); 685 } 686 687 struct bwstring * 688 file_reader_readline(struct file_reader *fr) 689 { 690 struct bwstring *ret = NULL; 691 692 if (fr->mmapaddr) { 693 unsigned char *mmapend; 694 695 mmapend = fr->mmapaddr + fr->mmapsize; 696 if (fr->mmapptr >= mmapend) 697 return (NULL); 698 else { 699 unsigned char *strend; 700 size_t sz; 701 702 sz = mmapend - fr->mmapptr; 703 strend = memchr(fr->mmapptr, fr->elsymb, sz); 704 705 if (strend == NULL) { 706 ret = bwscsbdup(fr->mmapptr, sz); 707 fr->mmapptr = mmapend; 708 } else { 709 ret = bwscsbdup(fr->mmapptr, strend - 710 fr->mmapptr); 711 fr->mmapptr = strend + 1; 712 } 713 } 714 715 } else if (fr->file != stdin) { 716 unsigned char *strend; 717 size_t bsz1, remsz, search_start; 718 719 search_start = 0; 720 remsz = 0; 721 strend = NULL; 722 723 if (fr->bsz > fr->strbeg) 724 remsz = fr->bsz - fr->strbeg; 725 726 /* line read cycle */ 727 for (;;) { 728 if (remsz > search_start) 729 strend = memchr(fr->buffer + fr->strbeg + 730 search_start, fr->elsymb, remsz - 731 search_start); 732 else 733 strend = NULL; 734 735 if (strend) 736 break; 737 if (feof(fr->file)) 738 break; 739 740 if (fr->bsz != fr->cbsz) 741 /* NOTREACHED */ 742 err(2, "File read software error 1"); 743 744 if (remsz > (READ_CHUNK >> 1)) { 745 search_start = fr->cbsz - fr->strbeg; 746 fr->cbsz += READ_CHUNK; 747 fr->buffer = sort_realloc(fr->buffer, 748 fr->cbsz); 749 bsz1 = fread(fr->buffer + fr->bsz, 1, 750 READ_CHUNK, fr->file); 751 if (bsz1 == 0) { 752 if (ferror(fr->file)) 753 err(2, NULL); 754 break; 755 } 756 fr->bsz += bsz1; 757 remsz += bsz1; 758 } else { 759 if (remsz > 0 && fr->strbeg>0) 760 bcopy(fr->buffer + fr->strbeg, 761 fr->buffer, remsz); 762 763 fr->strbeg = 0; 764 search_start = remsz; 765 bsz1 = fread(fr->buffer + remsz, 1, 766 fr->cbsz - remsz, fr->file); 767 if (bsz1 == 0) { 768 if (ferror(fr->file)) 769 err(2, NULL); 770 break; 771 } 772 fr->bsz = remsz + bsz1; 773 remsz = fr->bsz; 774 } 775 } 776 777 if (strend == NULL) 778 strend = fr->buffer + fr->bsz; 779 780 if ((fr->buffer + fr->strbeg <= strend) && 781 (fr->strbeg < fr->bsz) && (remsz>0)) 782 ret = bwscsbdup(fr->buffer + fr->strbeg, strend - 783 fr->buffer - fr->strbeg); 784 785 fr->strbeg = (strend - fr->buffer) + 1; 786 787 } else { 788 size_t len = 0; 789 790 ret = bwsfgetln(fr->file, &len, sort_opts_vals.zflag, 791 &(fr->rb)); 792 } 793 794 return (ret); 795 } 796 797 static void 798 file_reader_clean(struct file_reader *fr) 799 { 800 801 if (fr) { 802 if (fr->mmapaddr) 803 munmap(fr->mmapaddr, fr->mmapsize); 804 805 if (fr->fd) 806 close(fr->fd); 807 808 if (fr->buffer) 809 sort_free(fr->buffer); 810 811 if (fr->file) 812 if (fr->file != stdin) 813 closefile(fr->file, fr->fname); 814 815 if(fr->fname) 816 sort_free(fr->fname); 817 818 memset(fr, 0, sizeof(struct file_reader)); 819 } 820 } 821 822 void 823 file_reader_free(struct file_reader *fr) 824 { 825 826 if (fr) { 827 file_reader_clean(fr); 828 sort_free(fr); 829 } 830 } 831 832 int 833 procfile(const char *fsrc, struct sort_list *list, struct file_list *fl) 834 { 835 struct file_reader *fr; 836 837 fr = file_reader_init(fsrc); 838 if (fr == NULL) 839 err(2, NULL); 840 841 /* file browse cycle */ 842 for (;;) { 843 struct bwstring *bws; 844 845 bws = file_reader_readline(fr); 846 847 if (bws == NULL) 848 break; 849 850 sort_list_add(list, bws); 851 852 if (list->memsize >= available_free_memory) { 853 char *fn; 854 855 fn = new_tmp_file_name(); 856 sort_list_to_file(list, fn); 857 file_list_add(fl, fn, false); 858 sort_list_clean(list); 859 } 860 } 861 862 file_reader_free(fr); 863 864 return (0); 865 } 866 867 /* 868 * Compare file headers. Files with EOF always go to the end of the list. 869 */ 870 static int 871 file_header_cmp(struct file_header *f1, struct file_header *f2) 872 { 873 874 if (f1 == f2) 875 return (0); 876 else { 877 if (f1->fr == NULL) { 878 return ((f2->fr == NULL) ? 0 : +1); 879 } else if (f2->fr == NULL) 880 return (-1); 881 else { 882 int ret; 883 884 ret = list_coll(&(f1->si), &(f2->si)); 885 if (!ret) 886 return ((f1->file_pos < f2->file_pos) ? -1 : +1); 887 return (ret); 888 } 889 } 890 } 891 892 /* 893 * Allocate and init file header structure 894 */ 895 static void 896 file_header_init(struct file_header **fh, const char *fn, size_t file_pos) 897 { 898 899 if (fh && fn) { 900 struct bwstring *line; 901 902 *fh = sort_malloc(sizeof(struct file_header)); 903 (*fh)->file_pos = file_pos; 904 (*fh)->fr = file_reader_init(fn); 905 if ((*fh)->fr == NULL) { 906 perror(fn); 907 err(2, "%s", getstr(8)); 908 } 909 line = file_reader_readline((*fh)->fr); 910 if (line == NULL) { 911 file_reader_free((*fh)->fr); 912 (*fh)->fr = NULL; 913 (*fh)->si = NULL; 914 } else { 915 (*fh)->si = sort_list_item_alloc(); 916 sort_list_item_set((*fh)->si, line); 917 } 918 } 919 } 920 921 /* 922 * Close file 923 */ 924 static void 925 file_header_close(struct file_header **fh) 926 { 927 928 if (fh && *fh) { 929 if ((*fh)->fr) { 930 file_reader_free((*fh)->fr); 931 (*fh)->fr = NULL; 932 } 933 if ((*fh)->si) { 934 sort_list_item_clean((*fh)->si); 935 sort_free((*fh)->si); 936 (*fh)->si = NULL; 937 } 938 sort_free(*fh); 939 *fh = NULL; 940 } 941 } 942 943 /* 944 * Swap two array elements 945 */ 946 static void 947 file_header_swap(struct file_header **fh, size_t i1, size_t i2) 948 { 949 struct file_header *tmp; 950 951 tmp = fh[i1]; 952 fh[i1] = fh[i2]; 953 fh[i2] = tmp; 954 } 955 956 /* heap algorithm ==>> */ 957 958 /* 959 * See heap sort algorithm 960 * "Raises" last element to its right place 961 */ 962 static void 963 file_header_heap_swim(struct file_header **fh, size_t indx) 964 { 965 966 if (indx > 0) { 967 size_t parent_index; 968 969 parent_index = (indx - 1) >> 1; 970 971 if (file_header_cmp(fh[indx], fh[parent_index]) < 0) { 972 /* swap child and parent and continue */ 973 file_header_swap(fh, indx, parent_index); 974 file_header_heap_swim(fh, parent_index); 975 } 976 } 977 } 978 979 /* 980 * Sink the top element to its correct position 981 */ 982 static void 983 file_header_heap_sink(struct file_header **fh, size_t indx, size_t size) 984 { 985 size_t left_child_index; 986 size_t right_child_index; 987 988 left_child_index = indx + indx + 1; 989 right_child_index = left_child_index + 1; 990 991 if (left_child_index < size) { 992 size_t min_child_index; 993 994 min_child_index = left_child_index; 995 996 if ((right_child_index < size) && 997 (file_header_cmp(fh[left_child_index], 998 fh[right_child_index]) > 0)) 999 min_child_index = right_child_index; 1000 if (file_header_cmp(fh[indx], fh[min_child_index]) > 0) { 1001 file_header_swap(fh, indx, min_child_index); 1002 file_header_heap_sink(fh, min_child_index, size); 1003 } 1004 } 1005 } 1006 1007 /* <<== heap algorithm */ 1008 1009 /* 1010 * Adds element to the "left" end 1011 */ 1012 static void 1013 file_header_list_rearrange_from_header(struct file_header **fh, size_t size) 1014 { 1015 1016 file_header_heap_sink(fh, 0, size); 1017 } 1018 1019 /* 1020 * Adds element to the "right" end 1021 */ 1022 static void 1023 file_header_list_push(struct file_header *f, struct file_header **fh, size_t size) 1024 { 1025 1026 fh[size++] = f; 1027 file_header_heap_swim(fh, size - 1); 1028 } 1029 1030 struct last_printed 1031 { 1032 struct bwstring *str; 1033 }; 1034 1035 /* 1036 * Prints the current line of the file 1037 */ 1038 static void 1039 file_header_print(struct file_header *fh, FILE *f_out, struct last_printed *lp) 1040 { 1041 1042 if (fh && fh->fr && f_out && fh->si && fh->si->str) { 1043 if (sort_opts_vals.uflag) { 1044 if ((lp->str == NULL) || (str_list_coll(lp->str, &(fh->si)))) { 1045 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 1046 if (lp->str) 1047 bwsfree(lp->str); 1048 lp->str = bwsdup(fh->si->str); 1049 } 1050 } else 1051 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 1052 } 1053 } 1054 1055 /* 1056 * Read next line 1057 */ 1058 static void 1059 file_header_read_next(struct file_header *fh) 1060 { 1061 1062 if (fh && fh->fr) { 1063 struct bwstring *tmp; 1064 1065 tmp = file_reader_readline(fh->fr); 1066 if (tmp == NULL) { 1067 file_reader_free(fh->fr); 1068 fh->fr = NULL; 1069 if (fh->si) { 1070 sort_list_item_clean(fh->si); 1071 sort_free(fh->si); 1072 fh->si = NULL; 1073 } 1074 } else { 1075 if (fh->si == NULL) 1076 fh->si = sort_list_item_alloc(); 1077 sort_list_item_set(fh->si, tmp); 1078 } 1079 } 1080 } 1081 1082 /* 1083 * Merge array of "files headers" 1084 */ 1085 static void 1086 file_headers_merge(size_t fnum, struct file_header **fh, FILE *f_out) 1087 { 1088 struct last_printed lp; 1089 size_t i; 1090 1091 memset(&lp, 0, sizeof(lp)); 1092 1093 /* 1094 * construct the initial sort structure 1095 */ 1096 for (i = 0; i < fnum; i++) 1097 file_header_list_push(fh[i], fh, i); 1098 1099 while (fh[0]->fr) { /* unfinished files are always in front */ 1100 /* output the smallest line: */ 1101 file_header_print(fh[0], f_out, &lp); 1102 /* read a new line, if possible: */ 1103 file_header_read_next(fh[0]); 1104 /* re-arrange the list: */ 1105 file_header_list_rearrange_from_header(fh, fnum); 1106 } 1107 1108 if (lp.str) 1109 bwsfree(lp.str); 1110 } 1111 1112 /* 1113 * Merges the given files into the output file, which can be 1114 * stdout. 1115 */ 1116 static void 1117 merge_files_array(size_t argc, char **argv, const char *fn_out) 1118 { 1119 1120 if (argv && fn_out) { 1121 struct file_header **fh; 1122 FILE *f_out; 1123 size_t i; 1124 1125 f_out = openfile(fn_out, "w"); 1126 1127 if (f_out == NULL) 1128 err(2, NULL); 1129 1130 fh = sort_malloc((argc + 1) * sizeof(struct file_header *)); 1131 1132 for (i = 0; i < argc; i++) 1133 file_header_init(fh + i, argv[i], (size_t) i); 1134 1135 file_headers_merge(argc, fh, f_out); 1136 1137 for (i = 0; i < argc; i++) 1138 file_header_close(fh + i); 1139 1140 sort_free(fh); 1141 1142 closefile(f_out, fn_out); 1143 } 1144 } 1145 1146 /* 1147 * Shrinks the file list until its size smaller than max number of opened files 1148 */ 1149 static int 1150 shrink_file_list(struct file_list *fl) 1151 { 1152 1153 if ((fl == NULL) || (size_t) (fl->count) < max_open_files) 1154 return (0); 1155 else { 1156 struct file_list new_fl; 1157 size_t indx = 0; 1158 1159 file_list_init(&new_fl, true); 1160 while (indx < fl->count) { 1161 char *fnew; 1162 size_t num; 1163 1164 num = fl->count - indx; 1165 fnew = new_tmp_file_name(); 1166 1167 if ((size_t) num >= max_open_files) 1168 num = max_open_files - 1; 1169 merge_files_array(num, fl->fns + indx, fnew); 1170 if (fl->tmp) { 1171 size_t i; 1172 1173 for (i = 0; i < num; i++) 1174 unlink(fl->fns[indx + i]); 1175 } 1176 file_list_add(&new_fl, fnew, false); 1177 indx += num; 1178 } 1179 fl->tmp = false; /* already taken care of */ 1180 file_list_clean(fl); 1181 1182 fl->count = new_fl.count; 1183 fl->fns = new_fl.fns; 1184 fl->sz = new_fl.sz; 1185 fl->tmp = new_fl.tmp; 1186 1187 return (1); 1188 } 1189 } 1190 1191 /* 1192 * Merge list of files 1193 */ 1194 void 1195 merge_files(struct file_list *fl, const char *fn_out) 1196 { 1197 1198 if (fl && fn_out) { 1199 while (shrink_file_list(fl)); 1200 1201 merge_files_array(fl->count, fl->fns, fn_out); 1202 } 1203 } 1204 1205 static const char * 1206 get_sort_method_name(int sm) 1207 { 1208 1209 if (sm == SORT_MERGESORT) 1210 return "mergesort"; 1211 else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1212 return "radixsort"; 1213 else if (sort_opts_vals.sort_method == SORT_HEAPSORT) 1214 return "heapsort"; 1215 else 1216 return "quicksort"; 1217 } 1218 1219 /* 1220 * Wrapper for qsort 1221 */ 1222 static int sort_qsort(void *list, size_t count, size_t elem_size, 1223 int (*cmp_func)(const void *, const void *)) 1224 { 1225 1226 qsort(list, count, elem_size, cmp_func); 1227 return (0); 1228 } 1229 1230 /* 1231 * Sort list of lines and writes it to the file 1232 */ 1233 void 1234 sort_list_to_file(struct sort_list *list, const char *outfile) 1235 { 1236 struct sort_mods *sm = &(keys[0].sm); 1237 1238 if (!(sm->Mflag) && !(sm->Rflag) && !(sm->Vflag) && !(sm->Vflag) && 1239 !(sm->gflag) && !(sm->hflag) && !(sm->nflag)) { 1240 if ((sort_opts_vals.sort_method == SORT_DEFAULT) && byte_sort) 1241 sort_opts_vals.sort_method = SORT_RADIXSORT; 1242 1243 } else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1244 err(2, "%s", getstr(9)); 1245 1246 /* 1247 * to handle stable sort and the unique cases in the 1248 * right order, we need stable basic algorithm 1249 */ 1250 if (sort_opts_vals.sflag) { 1251 switch (sort_opts_vals.sort_method){ 1252 case SORT_MERGESORT: 1253 break; 1254 case SORT_RADIXSORT: 1255 break; 1256 case SORT_DEFAULT: 1257 sort_opts_vals.sort_method = SORT_MERGESORT; 1258 break; 1259 default: 1260 errx(2, "%s", getstr(10)); 1261 }; 1262 } 1263 1264 if (sort_opts_vals.sort_method == SORT_DEFAULT) 1265 sort_opts_vals.sort_method = DEFAULT_SORT_ALGORITHM; 1266 1267 if (debug_sort) 1268 printf("sort_method=%s\n", 1269 get_sort_method_name(sort_opts_vals.sort_method)); 1270 1271 switch (sort_opts_vals.sort_method){ 1272 case SORT_RADIXSORT: 1273 rxsort(list->list, list->count); 1274 sort_list_dump(list, outfile); 1275 break; 1276 case SORT_MERGESORT: 1277 mt_sort(list, mergesort, outfile); 1278 break; 1279 case SORT_HEAPSORT: 1280 mt_sort(list, heapsort, outfile); 1281 break; 1282 case SORT_QSORT: 1283 mt_sort(list, sort_qsort, outfile); 1284 break; 1285 default: 1286 mt_sort(list, DEFAULT_SORT_FUNC, outfile); 1287 break; 1288 } 1289 } 1290 1291 /******************* MT SORT ************************/ 1292 1293 #if defined(SORT_THREADS) 1294 /* semaphore to count threads */ 1295 static sem_t mtsem; 1296 1297 /* current system sort function */ 1298 static int (*g_sort_func)(void *, size_t, size_t, 1299 int(*)(const void *, const void *)); 1300 1301 /* 1302 * Sort cycle thread (in multi-threaded mode) 1303 */ 1304 static void* 1305 mt_sort_thread(void* arg) 1306 { 1307 struct sort_list *list = arg; 1308 1309 g_sort_func(list->list, list->count, sizeof(struct sort_list_item *), 1310 (int(*)(const void *, const void *)) list_coll); 1311 1312 sem_post(&mtsem); 1313 1314 return (arg); 1315 } 1316 1317 /* 1318 * Compare sub-lists. Empty sub-lists always go to the end of the list. 1319 */ 1320 static int 1321 sub_list_cmp(struct sort_list *l1, struct sort_list *l2) 1322 { 1323 1324 if (l1 == l2) 1325 return (0); 1326 else { 1327 if (l1->count == 0) { 1328 return ((l2->count == 0) ? 0 : +1); 1329 } else if (l2->count == 0) { 1330 return (-1); 1331 } else { 1332 int ret; 1333 1334 ret = list_coll(&(l1->list[0]), &(l2->list[0])); 1335 if (!ret) 1336 return ((l1->sub_list_pos < l2->sub_list_pos) ? 1337 -1 : +1); 1338 return (ret); 1339 } 1340 } 1341 } 1342 1343 /* 1344 * Swap two array elements 1345 */ 1346 static void 1347 sub_list_swap(struct sort_list **sl, size_t i1, size_t i2) 1348 { 1349 struct sort_list *tmp; 1350 1351 tmp = sl[i1]; 1352 sl[i1] = sl[i2]; 1353 sl[i2] = tmp; 1354 } 1355 1356 /* heap algorithm ==>> */ 1357 1358 /* 1359 * See heap sort algorithm 1360 * "Raises" last element to its right place 1361 */ 1362 static void 1363 sub_list_swim(struct sort_list **sl, size_t indx) 1364 { 1365 1366 if (indx > 0) { 1367 size_t parent_index; 1368 1369 parent_index = (indx - 1) >> 1; 1370 1371 if (sub_list_cmp(sl[indx], sl[parent_index]) < 0) { 1372 /* swap child and parent and continue */ 1373 sub_list_swap(sl, indx, parent_index); 1374 sub_list_swim(sl, parent_index); 1375 } 1376 } 1377 } 1378 1379 /* 1380 * Sink the top element to its correct position 1381 */ 1382 static void 1383 sub_list_sink(struct sort_list **sl, size_t indx, size_t size) 1384 { 1385 size_t left_child_index; 1386 size_t right_child_index; 1387 1388 left_child_index = indx + indx + 1; 1389 right_child_index = left_child_index + 1; 1390 1391 if (left_child_index < size) { 1392 size_t min_child_index; 1393 1394 min_child_index = left_child_index; 1395 1396 if ((right_child_index < size) && 1397 (sub_list_cmp(sl[left_child_index], 1398 sl[right_child_index]) > 0)) 1399 min_child_index = right_child_index; 1400 if (sub_list_cmp(sl[indx], sl[min_child_index]) > 0) { 1401 sub_list_swap(sl, indx, min_child_index); 1402 sub_list_sink(sl, min_child_index, size); 1403 } 1404 } 1405 } 1406 1407 /* <<== heap algorithm */ 1408 1409 /* 1410 * Adds element to the "right" end 1411 */ 1412 static void 1413 sub_list_push(struct sort_list *s, struct sort_list **sl, size_t size) 1414 { 1415 1416 sl[size++] = s; 1417 sub_list_swim(sl, size - 1); 1418 } 1419 1420 struct last_printed_item 1421 { 1422 struct sort_list_item *item; 1423 }; 1424 1425 /* 1426 * Prints the current line of the file 1427 */ 1428 static void 1429 sub_list_header_print(struct sort_list *sl, FILE *f_out, 1430 struct last_printed_item *lp) 1431 { 1432 1433 if (sl && sl->count && f_out && sl->list[0]->str) { 1434 if (sort_opts_vals.uflag) { 1435 if ((lp->item == NULL) || (list_coll(&(lp->item), 1436 &(sl->list[0])))) { 1437 bwsfwrite(sl->list[0]->str, f_out, 1438 sort_opts_vals.zflag); 1439 lp->item = sl->list[0]; 1440 } 1441 } else 1442 bwsfwrite(sl->list[0]->str, f_out, 1443 sort_opts_vals.zflag); 1444 } 1445 } 1446 1447 /* 1448 * Read next line 1449 */ 1450 static void 1451 sub_list_next(struct sort_list *sl) 1452 { 1453 1454 if (sl && sl->count) { 1455 sl->list += 1; 1456 sl->count -= 1; 1457 } 1458 } 1459 1460 /* 1461 * Merge sub-lists to a file 1462 */ 1463 static void 1464 merge_sub_lists(struct sort_list **sl, size_t n, FILE* f_out) 1465 { 1466 struct last_printed_item lp; 1467 size_t i; 1468 1469 memset(&lp,0,sizeof(lp)); 1470 1471 /* construct the initial list: */ 1472 for (i = 0; i < n; i++) 1473 sub_list_push(sl[i], sl, i); 1474 1475 while (sl[0]->count) { /* unfinished lists are always in front */ 1476 /* output the smallest line: */ 1477 sub_list_header_print(sl[0], f_out, &lp); 1478 /* move to a new line, if possible: */ 1479 sub_list_next(sl[0]); 1480 /* re-arrange the list: */ 1481 sub_list_sink(sl, 0, n); 1482 } 1483 } 1484 1485 /* 1486 * Merge sub-lists to a file 1487 */ 1488 static void 1489 merge_list_parts(struct sort_list **parts, size_t n, const char *fn) 1490 { 1491 FILE* f_out; 1492 1493 f_out = openfile(fn,"w"); 1494 1495 merge_sub_lists(parts, n, f_out); 1496 1497 closefile(f_out, fn); 1498 } 1499 1500 #endif /* defined(SORT_THREADS) */ 1501 /* 1502 * Multi-threaded sort algorithm "driver" 1503 */ 1504 static void 1505 mt_sort(struct sort_list *list, 1506 int(*sort_func)(void *, size_t, size_t, int(*)(const void *, const void *)), 1507 const char* fn) 1508 { 1509 #if defined(SORT_THREADS) 1510 if (nthreads < 2 || list->count < MT_SORT_THRESHOLD) { 1511 size_t nthreads_save = nthreads; 1512 nthreads = 1; 1513 #endif 1514 /* if single thread or small data, do simple sort */ 1515 sort_func(list->list, list->count, 1516 sizeof(struct sort_list_item *), 1517 (int(*)(const void *, const void *)) list_coll); 1518 sort_list_dump(list, fn); 1519 #if defined(SORT_THREADS) 1520 nthreads = nthreads_save; 1521 } else { 1522 /* multi-threaded sort */ 1523 struct sort_list **parts; 1524 size_t avgsize, cstart, i; 1525 1526 /* array of sub-lists */ 1527 parts = sort_malloc(sizeof(struct sort_list*) * nthreads); 1528 cstart = 0; 1529 avgsize = list->count / nthreads; 1530 1531 /* set global system sort function */ 1532 g_sort_func = sort_func; 1533 1534 /* set sublists */ 1535 for (i = 0; i < nthreads; ++i) { 1536 size_t sz = 0; 1537 1538 parts[i] = sort_malloc(sizeof(struct sort_list)); 1539 parts[i]->list = list->list + cstart; 1540 parts[i]->memsize = 0; 1541 parts[i]->sub_list_pos = i; 1542 1543 sz = (i == nthreads - 1) ? list->count - cstart : 1544 avgsize; 1545 1546 parts[i]->count = sz; 1547 1548 parts[i]->size = parts[i]->count; 1549 1550 cstart += sz; 1551 } 1552 1553 /* init threads counting semaphore */ 1554 sem_init(&mtsem, 0, 0); 1555 1556 /* start threads */ 1557 for (i = 0; i < nthreads; ++i) { 1558 pthread_t pth; 1559 pthread_attr_t attr; 1560 1561 pthread_attr_init(&attr); 1562 pthread_attr_setdetachstate(&attr, PTHREAD_DETACHED); 1563 1564 for (;;) { 1565 int res = pthread_create(&pth, &attr, 1566 mt_sort_thread, parts[i]); 1567 1568 if (res >= 0) 1569 break; 1570 if (errno == EAGAIN) { 1571 pthread_yield(); 1572 continue; 1573 } 1574 err(2, NULL); 1575 } 1576 1577 pthread_attr_destroy(&attr); 1578 } 1579 1580 /* wait for threads completion */ 1581 for (i = 0; i < nthreads; ++i) { 1582 sem_wait(&mtsem); 1583 } 1584 /* destroy the semaphore - we do not need it anymore */ 1585 sem_destroy(&mtsem); 1586 1587 /* merge sorted sub-lists to the file */ 1588 merge_list_parts(parts, nthreads, fn); 1589 1590 /* free sub-lists data */ 1591 for (i = 0; i < nthreads; ++i) { 1592 sort_free(parts[i]); 1593 } 1594 sort_free(parts); 1595 } 1596 #endif /* defined(SORT_THREADS) */ 1597 } 1598