1 /*- 2 * Copyright (C) 2009 Gabor Kovesdan <gabor@FreeBSD.org> 3 * Copyright (C) 2012 Oleg Moskalenko <mom040267@gmail.com> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25 * SUCH DAMAGE. 26 */ 27 28 #include <sys/cdefs.h> 29 __FBSDID("$FreeBSD$"); 30 31 #include <sys/mman.h> 32 #include <sys/stat.h> 33 #include <sys/types.h> 34 #include <sys/queue.h> 35 36 #include <err.h> 37 #include <fcntl.h> 38 #if defined(SORT_THREADS) 39 #include <pthread.h> 40 #endif 41 #include <semaphore.h> 42 #include <stdio.h> 43 #include <stdlib.h> 44 #include <string.h> 45 #include <unistd.h> 46 #include <wchar.h> 47 #include <wctype.h> 48 49 #include "coll.h" 50 #include "file.h" 51 #include "radixsort.h" 52 53 unsigned long long free_memory = 1000000; 54 unsigned long long available_free_memory = 1000000; 55 56 bool use_mmap; 57 58 const char *tmpdir = "/var/tmp"; 59 const char *compress_program; 60 61 size_t max_open_files = 16; 62 63 /* 64 * How much space we read from file at once 65 */ 66 #define READ_CHUNK (4096) 67 68 /* 69 * File reader structure 70 */ 71 struct file_reader 72 { 73 struct reader_buffer rb; 74 FILE *file; 75 char *fname; 76 unsigned char *buffer; 77 unsigned char *mmapaddr; 78 unsigned char *mmapptr; 79 size_t bsz; 80 size_t cbsz; 81 size_t mmapsize; 82 size_t strbeg; 83 int fd; 84 char elsymb; 85 }; 86 87 /* 88 * Structure to be used in file merge process. 89 */ 90 struct file_header 91 { 92 struct file_reader *fr; 93 struct sort_list_item *si; /* current top line */ 94 size_t file_pos; 95 }; 96 97 /* 98 * List elements of "cleanable" files list. 99 */ 100 struct CLEANABLE_FILE 101 { 102 char *fn; 103 LIST_ENTRY(CLEANABLE_FILE) files; 104 }; 105 106 /* 107 * List header of "cleanable" files list. 108 */ 109 static LIST_HEAD(CLEANABLE_FILES,CLEANABLE_FILE) tmp_files; 110 111 /* 112 * Semaphore to protect the tmp file list. 113 * We use semaphore here because it is signal-safe, according to POSIX. 114 * And semaphore does not require pthread library. 115 */ 116 static sem_t tmp_files_sem; 117 118 static void mt_sort(struct sort_list *list, 119 int (*sort_func)(void *, size_t, size_t, 120 int (*)(const void *, const void *)), const char* fn); 121 122 /* 123 * Init tmp files list 124 */ 125 void 126 init_tmp_files(void) 127 { 128 129 LIST_INIT(&tmp_files); 130 sem_init(&tmp_files_sem, 0, 1); 131 } 132 133 /* 134 * Save name of a tmp file for signal cleanup 135 */ 136 void 137 tmp_file_atexit(const char *tmp_file) 138 { 139 140 if (tmp_file) { 141 sem_wait(&tmp_files_sem); 142 struct CLEANABLE_FILE *item = 143 sort_malloc(sizeof(struct CLEANABLE_FILE)); 144 item->fn = sort_strdup(tmp_file); 145 LIST_INSERT_HEAD(&tmp_files, item, files); 146 sem_post(&tmp_files_sem); 147 } 148 } 149 150 /* 151 * Clear tmp files 152 */ 153 void 154 clear_tmp_files(void) 155 { 156 struct CLEANABLE_FILE *item; 157 158 sem_wait(&tmp_files_sem); 159 LIST_FOREACH(item,&tmp_files,files) { 160 if ((item) && (item->fn)) 161 unlink(item->fn); 162 } 163 sem_post(&tmp_files_sem); 164 } 165 166 /* 167 * Check whether a file is a temporary file 168 */ 169 static bool 170 file_is_tmp(const char* fn) 171 { 172 struct CLEANABLE_FILE *item; 173 bool ret = false; 174 175 if (fn) { 176 sem_wait(&tmp_files_sem); 177 LIST_FOREACH(item,&tmp_files,files) { 178 if ((item) && (item->fn)) 179 if (strcmp(item->fn, fn) == 0) { 180 ret = true; 181 break; 182 } 183 } 184 sem_post(&tmp_files_sem); 185 } 186 187 return (ret); 188 } 189 190 /* 191 * Read zero-terminated line from a file 192 */ 193 char * 194 read_file0_line(struct file0_reader *f0r) 195 { 196 size_t pos = 0; 197 int c; 198 199 if ((f0r->f == NULL) || feof(f0r->f)) 200 return (NULL); 201 202 if (f0r->current_line && f0r->current_sz > 0) 203 f0r->current_line[0] = 0; 204 205 while (!feof(f0r->f)) { 206 c = fgetc(f0r->f); 207 if (feof(f0r->f) || (c == -1)) 208 break; 209 if ((pos + 1) >= f0r->current_sz) { 210 size_t newsz = (f0r->current_sz + 2) * 2; 211 f0r->current_line = sort_realloc(f0r->current_line, 212 newsz); 213 f0r->current_sz = newsz; 214 } 215 f0r->current_line[pos] = (char)c; 216 if (c == 0) 217 break; 218 else 219 f0r->current_line[pos + 1] = 0; 220 ++pos; 221 } 222 223 return f0r->current_line; 224 } 225 226 /* 227 * Generate new temporary file name 228 */ 229 char * 230 new_tmp_file_name(void) 231 { 232 static size_t tfcounter = 0; 233 static const char *fn = ".bsdsort."; 234 char *ret; 235 size_t sz; 236 237 sz = strlen(tmpdir) + 1 + strlen(fn) + 32 + 1; 238 ret = sort_malloc(sz); 239 240 sprintf(ret, "%s/%s%d.%lu", tmpdir, fn, (int) getpid(), (unsigned long)(tfcounter++)); 241 tmp_file_atexit(ret); 242 return (ret); 243 } 244 245 /* 246 * Initialize file list 247 */ 248 void 249 file_list_init(struct file_list *fl, bool tmp) 250 { 251 252 if (fl) { 253 fl->count = 0; 254 fl->sz = 0; 255 fl->fns = NULL; 256 fl->tmp = tmp; 257 } 258 } 259 260 /* 261 * Add a file name to the list 262 */ 263 void 264 file_list_add(struct file_list *fl, char *fn, bool allocate) 265 { 266 267 if (fl && fn) { 268 if (fl->count >= fl->sz || (fl->fns == NULL)) { 269 fl->sz = (fl->sz) * 2 + 1; 270 fl->fns = sort_realloc(fl->fns, fl->sz * 271 sizeof(char *)); 272 } 273 fl->fns[fl->count] = allocate ? sort_strdup(fn) : fn; 274 fl->count += 1; 275 } 276 } 277 278 /* 279 * Populate file list from array of file names 280 */ 281 void 282 file_list_populate(struct file_list *fl, int argc, char **argv, bool allocate) 283 { 284 285 if (fl && argv) { 286 int i; 287 288 for (i = 0; i < argc; i++) 289 file_list_add(fl, argv[i], allocate); 290 } 291 } 292 293 /* 294 * Clean file list data and delete the files, 295 * if this is a list of temporary files 296 */ 297 void 298 file_list_clean(struct file_list *fl) 299 { 300 301 if (fl) { 302 if (fl->fns) { 303 size_t i; 304 305 for (i = 0; i < fl->count; i++) { 306 if (fl->fns[i]) { 307 if (fl->tmp) 308 unlink(fl->fns[i]); 309 sort_free(fl->fns[i]); 310 fl->fns[i] = 0; 311 } 312 } 313 sort_free(fl->fns); 314 fl->fns = NULL; 315 } 316 fl->sz = 0; 317 fl->count = 0; 318 fl->tmp = false; 319 } 320 } 321 322 /* 323 * Init sort list 324 */ 325 void 326 sort_list_init(struct sort_list *l) 327 { 328 329 if (l) { 330 l->count = 0; 331 l->size = 0; 332 l->memsize = sizeof(struct sort_list); 333 l->list = NULL; 334 } 335 } 336 337 /* 338 * Add string to sort list 339 */ 340 void 341 sort_list_add(struct sort_list *l, struct bwstring *str) 342 { 343 344 if (l && str) { 345 size_t indx = l->count; 346 347 if ((l->list == NULL) || (indx >= l->size)) { 348 size_t newsize = (l->size + 1) + 1024; 349 350 l->list = sort_realloc(l->list, 351 sizeof(struct sort_list_item*) * newsize); 352 l->memsize += (newsize - l->size) * 353 sizeof(struct sort_list_item*); 354 l->size = newsize; 355 } 356 l->list[indx] = sort_list_item_alloc(); 357 sort_list_item_set(l->list[indx], str); 358 l->memsize += sort_list_item_size(l->list[indx]); 359 l->count += 1; 360 } 361 } 362 363 /* 364 * Clean sort list data 365 */ 366 void 367 sort_list_clean(struct sort_list *l) 368 { 369 370 if (l) { 371 if (l->list) { 372 size_t i; 373 374 for (i = 0; i < l->count; i++) { 375 struct sort_list_item *item; 376 377 item = l->list[i]; 378 379 if (item) { 380 sort_list_item_clean(item); 381 sort_free(item); 382 l->list[i] = NULL; 383 } 384 } 385 sort_free(l->list); 386 l->list = NULL; 387 } 388 l->count = 0; 389 l->size = 0; 390 l->memsize = sizeof(struct sort_list); 391 } 392 } 393 394 /* 395 * Write sort list to file 396 */ 397 void 398 sort_list_dump(struct sort_list *l, const char *fn) 399 { 400 401 if (l && fn) { 402 FILE *f; 403 404 f = openfile(fn, "w"); 405 if (f == NULL) 406 err(2, NULL); 407 408 if (l->list) { 409 size_t i; 410 if (!(sort_opts_vals.uflag)) { 411 for (i = 0; i < l->count; ++i) 412 bwsfwrite(l->list[i]->str, f, 413 sort_opts_vals.zflag); 414 } else { 415 struct sort_list_item *last_printed_item = NULL; 416 struct sort_list_item *item; 417 for (i = 0; i < l->count; ++i) { 418 item = l->list[i]; 419 if ((last_printed_item == NULL) || 420 list_coll(&last_printed_item, &item)) { 421 bwsfwrite(item->str, f, sort_opts_vals.zflag); 422 last_printed_item = item; 423 } 424 } 425 } 426 } 427 428 closefile(f, fn); 429 } 430 } 431 432 /* 433 * Checks if the given file is sorted. Stops at the first disorder, 434 * prints the disordered line and returns 1. 435 */ 436 int 437 check(const char *fn) 438 { 439 struct bwstring *s1, *s2, *s1disorder, *s2disorder; 440 struct file_reader *fr; 441 struct keys_array *ka1, *ka2; 442 int res; 443 size_t pos, posdisorder; 444 445 s1 = s2 = s1disorder = s2disorder = NULL; 446 ka1 = ka2 = NULL; 447 448 fr = file_reader_init(fn); 449 450 res = 0; 451 pos = 1; 452 posdisorder = 1; 453 454 if (fr == NULL) { 455 err(2, NULL); 456 goto end; 457 } 458 459 s1 = file_reader_readline(fr); 460 if (s1 == NULL) 461 goto end; 462 463 ka1 = keys_array_alloc(); 464 preproc(s1, ka1); 465 466 s2 = file_reader_readline(fr); 467 if (s2 == NULL) 468 goto end; 469 470 ka2 = keys_array_alloc(); 471 preproc(s2, ka2); 472 473 for (;;) { 474 475 if (debug_sort) { 476 bwsprintf(stdout, s2, "s1=<", ">"); 477 bwsprintf(stdout, s1, "s2=<", ">"); 478 } 479 int cmp = key_coll(ka2, ka1, 0); 480 if (debug_sort) 481 printf("; cmp1=%d", cmp); 482 483 if (!cmp && sort_opts_vals.complex_sort && 484 !(sort_opts_vals.uflag) && !(sort_opts_vals.sflag)) { 485 cmp = top_level_str_coll(s2, s1); 486 if (debug_sort) 487 printf("; cmp2=%d", cmp); 488 } 489 if (debug_sort) 490 printf("\n"); 491 492 if ((sort_opts_vals.uflag && (cmp <= 0)) || (cmp < 0)) { 493 if (!(sort_opts_vals.csilentflag)) { 494 s2disorder = bwsdup(s2); 495 posdisorder = pos; 496 if (debug_sort) 497 s1disorder = bwsdup(s1); 498 } 499 res = 1; 500 goto end; 501 } 502 503 pos++; 504 505 clean_keys_array(s1, ka1); 506 sort_free(ka1); 507 ka1 = ka2; 508 ka2 = NULL; 509 510 bwsfree(s1); 511 s1 = s2; 512 513 s2 = file_reader_readline(fr); 514 if (s2 == NULL) 515 goto end; 516 517 ka2 = keys_array_alloc(); 518 preproc(s2, ka2); 519 } 520 521 end: 522 if (ka1) { 523 clean_keys_array(s1, ka1); 524 sort_free(ka1); 525 } 526 527 if (s1) 528 bwsfree(s1); 529 530 if (ka2) { 531 clean_keys_array(s2, ka2); 532 sort_free(ka2); 533 } 534 535 if (s2) 536 bwsfree(s2); 537 538 if ((fn == NULL) || (*fn == 0) || (strcmp(fn, "-") == 0)) { 539 for (;;) { 540 s2 = file_reader_readline(fr); 541 if (s2 == NULL) 542 break; 543 bwsfree(s2); 544 } 545 } 546 547 file_reader_free(fr); 548 549 if (s2disorder) { 550 bws_disorder_warnx(s2disorder, fn, posdisorder); 551 if (s1disorder) { 552 bws_disorder_warnx(s1disorder, fn, posdisorder); 553 if (s1disorder != s2disorder) 554 bwsfree(s1disorder); 555 } 556 bwsfree(s2disorder); 557 s1disorder = NULL; 558 s2disorder = NULL; 559 } 560 561 if (res) 562 exit(res); 563 564 return (0); 565 } 566 567 /* 568 * Opens a file. If the given filename is "-", stdout will be 569 * opened. 570 */ 571 FILE * 572 openfile(const char *fn, const char *mode) 573 { 574 FILE *file; 575 576 if (strcmp(fn, "-") == 0) { 577 return ((mode && mode[0] == 'r') ? stdin : stdout); 578 } else { 579 mode_t orig_file_mask = 0; 580 int is_tmp = file_is_tmp(fn); 581 582 if (is_tmp && (mode[0] == 'w')) 583 orig_file_mask = umask(S_IWGRP | S_IWOTH | 584 S_IRGRP | S_IROTH); 585 586 if (is_tmp && (compress_program != NULL)) { 587 char *cmd; 588 size_t cmdsz; 589 590 cmdsz = strlen(fn) + 128; 591 cmd = sort_malloc(cmdsz); 592 593 fflush(stdout); 594 595 if (mode[0] == 'r') 596 snprintf(cmd, cmdsz - 1, "cat %s | %s -d", 597 fn, compress_program); 598 else if (mode[0] == 'w') 599 snprintf(cmd, cmdsz - 1, "%s > %s", 600 compress_program, fn); 601 else 602 err(2, "%s", getstr(7)); 603 604 if ((file = popen(cmd, mode)) == NULL) 605 err(2, NULL); 606 607 sort_free(cmd); 608 609 } else 610 if ((file = fopen(fn, mode)) == NULL) 611 err(2, NULL); 612 613 if (is_tmp && (mode[0] == 'w')) 614 umask(orig_file_mask); 615 } 616 617 return (file); 618 } 619 620 /* 621 * Close file 622 */ 623 void 624 closefile(FILE *f, const char *fn) 625 { 626 if (f == NULL) { 627 ; 628 } else if (f == stdin) { 629 ; 630 } else if (f == stdout) { 631 fflush(f); 632 } else { 633 if (file_is_tmp(fn) && compress_program != NULL) { 634 if(pclose(f)<0) 635 err(2,NULL); 636 } else 637 fclose(f); 638 } 639 } 640 641 /* 642 * Reads a file into the internal buffer. 643 */ 644 struct file_reader * 645 file_reader_init(const char *fsrc) 646 { 647 struct file_reader *ret; 648 649 if (fsrc == NULL) 650 fsrc = "-"; 651 652 ret = sort_malloc(sizeof(struct file_reader)); 653 memset(ret, 0, sizeof(struct file_reader)); 654 655 ret->elsymb = '\n'; 656 if (sort_opts_vals.zflag) 657 ret->elsymb = 0; 658 659 ret->fname = sort_strdup(fsrc); 660 661 if (strcmp(fsrc, "-") && (compress_program == NULL) && use_mmap) { 662 663 do { 664 struct stat stat_buf; 665 void *addr; 666 size_t sz = 0; 667 int fd, flags; 668 669 flags = MAP_NOCORE | MAP_NOSYNC; 670 addr = MAP_FAILED; 671 672 fd = open(fsrc, O_RDONLY); 673 if (fd < 0) 674 err(2, NULL); 675 676 if (fstat(fd, &stat_buf) < 0) { 677 close(fd); 678 break; 679 } 680 681 sz = stat_buf.st_size; 682 683 #if defined(MAP_PREFAULT_READ) 684 flags |= MAP_PREFAULT_READ; 685 #endif 686 687 addr = mmap(NULL, sz, PROT_READ, flags, fd, 0); 688 if (addr == MAP_FAILED) { 689 close(fd); 690 break; 691 } 692 693 ret->fd = fd; 694 ret->mmapaddr = addr; 695 ret->mmapsize = sz; 696 ret->mmapptr = ret->mmapaddr; 697 698 } while (0); 699 } 700 701 if (ret->mmapaddr == NULL) { 702 ret->file = openfile(fsrc, "r"); 703 if (ret->file == NULL) 704 err(2, NULL); 705 706 if (strcmp(fsrc, "-")) { 707 ret->cbsz = READ_CHUNK; 708 ret->buffer = sort_malloc(ret->cbsz); 709 ret->bsz = 0; 710 ret->strbeg = 0; 711 712 ret->bsz = fread(ret->buffer, 1, ret->cbsz, ret->file); 713 if (ret->bsz == 0) { 714 if (ferror(ret->file)) 715 err(2, NULL); 716 } 717 } 718 } 719 720 return (ret); 721 } 722 723 struct bwstring * 724 file_reader_readline(struct file_reader *fr) 725 { 726 struct bwstring *ret = NULL; 727 728 if (fr->mmapaddr) { 729 unsigned char *mmapend; 730 731 mmapend = fr->mmapaddr + fr->mmapsize; 732 if (fr->mmapptr >= mmapend) 733 return (NULL); 734 else { 735 unsigned char *strend; 736 size_t sz; 737 738 sz = mmapend - fr->mmapptr; 739 strend = memchr(fr->mmapptr, fr->elsymb, sz); 740 741 if (strend == NULL) { 742 ret = bwscsbdup(fr->mmapptr, sz); 743 fr->mmapptr = mmapend; 744 } else { 745 ret = bwscsbdup(fr->mmapptr, strend - 746 fr->mmapptr); 747 fr->mmapptr = strend + 1; 748 } 749 } 750 751 } else if (fr->file != stdin) { 752 unsigned char *strend; 753 size_t bsz1, remsz, search_start; 754 755 search_start = 0; 756 remsz = 0; 757 strend = NULL; 758 759 if (fr->bsz > fr->strbeg) 760 remsz = fr->bsz - fr->strbeg; 761 762 /* line read cycle */ 763 for (;;) { 764 if (remsz > search_start) 765 strend = memchr(fr->buffer + fr->strbeg + 766 search_start, fr->elsymb, remsz - 767 search_start); 768 else 769 strend = NULL; 770 771 if (strend) 772 break; 773 if (feof(fr->file)) 774 break; 775 776 if (fr->bsz != fr->cbsz) 777 /* NOTREACHED */ 778 err(2, "File read software error 1"); 779 780 if (remsz > (READ_CHUNK >> 1)) { 781 search_start = fr->cbsz - fr->strbeg; 782 fr->cbsz += READ_CHUNK; 783 fr->buffer = sort_realloc(fr->buffer, 784 fr->cbsz); 785 bsz1 = fread(fr->buffer + fr->bsz, 1, 786 READ_CHUNK, fr->file); 787 if (bsz1 == 0) { 788 if (ferror(fr->file)) 789 err(2, NULL); 790 break; 791 } 792 fr->bsz += bsz1; 793 remsz += bsz1; 794 } else { 795 if (remsz > 0 && fr->strbeg>0) 796 bcopy(fr->buffer + fr->strbeg, 797 fr->buffer, remsz); 798 799 fr->strbeg = 0; 800 search_start = remsz; 801 bsz1 = fread(fr->buffer + remsz, 1, 802 fr->cbsz - remsz, fr->file); 803 if (bsz1 == 0) { 804 if (ferror(fr->file)) 805 err(2, NULL); 806 break; 807 } 808 fr->bsz = remsz + bsz1; 809 remsz = fr->bsz; 810 } 811 } 812 813 if (strend == NULL) 814 strend = fr->buffer + fr->bsz; 815 816 if ((fr->buffer + fr->strbeg <= strend) && 817 (fr->strbeg < fr->bsz) && (remsz>0)) 818 ret = bwscsbdup(fr->buffer + fr->strbeg, strend - 819 fr->buffer - fr->strbeg); 820 821 fr->strbeg = (strend - fr->buffer) + 1; 822 823 } else { 824 size_t len = 0; 825 826 ret = bwsfgetln(fr->file, &len, sort_opts_vals.zflag, 827 &(fr->rb)); 828 } 829 830 return (ret); 831 } 832 833 static void 834 file_reader_clean(struct file_reader *fr) 835 { 836 837 if (fr) { 838 if (fr->mmapaddr) 839 munmap(fr->mmapaddr, fr->mmapsize); 840 841 if (fr->fd) 842 close(fr->fd); 843 844 if (fr->buffer) 845 sort_free(fr->buffer); 846 847 if (fr->file) 848 if (fr->file != stdin) 849 closefile(fr->file, fr->fname); 850 851 if(fr->fname) 852 sort_free(fr->fname); 853 854 memset(fr, 0, sizeof(struct file_reader)); 855 } 856 } 857 858 void 859 file_reader_free(struct file_reader *fr) 860 { 861 862 if (fr) { 863 file_reader_clean(fr); 864 sort_free(fr); 865 } 866 } 867 868 int 869 procfile(const char *fsrc, struct sort_list *list, struct file_list *fl) 870 { 871 struct file_reader *fr; 872 873 fr = file_reader_init(fsrc); 874 if (fr == NULL) 875 err(2, NULL); 876 877 /* file browse cycle */ 878 for (;;) { 879 struct bwstring *bws; 880 881 bws = file_reader_readline(fr); 882 883 if (bws == NULL) 884 break; 885 886 sort_list_add(list, bws); 887 888 if (list->memsize >= available_free_memory) { 889 char *fn; 890 891 fn = new_tmp_file_name(); 892 sort_list_to_file(list, fn); 893 file_list_add(fl, fn, false); 894 sort_list_clean(list); 895 } 896 } 897 898 file_reader_free(fr); 899 900 return (0); 901 } 902 903 /* 904 * Compare file headers. Files with EOF always go to the end of the list. 905 */ 906 static int 907 file_header_cmp(struct file_header *f1, struct file_header *f2) 908 { 909 910 if (f1 == f2) 911 return (0); 912 else { 913 if (f1->fr == NULL) { 914 return ((f2->fr == NULL) ? 0 : +1); 915 } else if (f2->fr == NULL) 916 return (-1); 917 else { 918 int ret; 919 920 ret = list_coll(&(f1->si), &(f2->si)); 921 if (!ret) 922 return ((f1->file_pos < f2->file_pos) ? -1 : +1); 923 return (ret); 924 } 925 } 926 } 927 928 /* 929 * Allocate and init file header structure 930 */ 931 static void 932 file_header_init(struct file_header **fh, const char *fn, size_t file_pos) 933 { 934 935 if (fh && fn) { 936 struct bwstring *line; 937 938 *fh = sort_malloc(sizeof(struct file_header)); 939 (*fh)->file_pos = file_pos; 940 (*fh)->fr = file_reader_init(fn); 941 if ((*fh)->fr == NULL) { 942 perror(fn); 943 err(2, "%s", getstr(8)); 944 } 945 line = file_reader_readline((*fh)->fr); 946 if (line == NULL) { 947 file_reader_free((*fh)->fr); 948 (*fh)->fr = NULL; 949 (*fh)->si = NULL; 950 } else { 951 (*fh)->si = sort_list_item_alloc(); 952 sort_list_item_set((*fh)->si, line); 953 } 954 } 955 } 956 957 /* 958 * Close file 959 */ 960 static void 961 file_header_close(struct file_header **fh) 962 { 963 964 if (fh && *fh) { 965 if ((*fh)->fr) { 966 file_reader_free((*fh)->fr); 967 (*fh)->fr = NULL; 968 } 969 if ((*fh)->si) { 970 sort_list_item_clean((*fh)->si); 971 sort_free((*fh)->si); 972 (*fh)->si = NULL; 973 } 974 sort_free(*fh); 975 *fh = NULL; 976 } 977 } 978 979 /* 980 * Swap two array elements 981 */ 982 static void 983 file_header_swap(struct file_header **fh, size_t i1, size_t i2) 984 { 985 struct file_header *tmp; 986 987 tmp = fh[i1]; 988 fh[i1] = fh[i2]; 989 fh[i2] = tmp; 990 } 991 992 /* heap algorithm ==>> */ 993 994 /* 995 * See heap sort algorithm 996 * "Raises" last element to its right place 997 */ 998 static void 999 file_header_heap_swim(struct file_header **fh, size_t indx) 1000 { 1001 1002 if (indx > 0) { 1003 size_t parent_index; 1004 1005 parent_index = (indx - 1) >> 1; 1006 1007 if (file_header_cmp(fh[indx], fh[parent_index]) < 0) { 1008 /* swap child and parent and continue */ 1009 file_header_swap(fh, indx, parent_index); 1010 file_header_heap_swim(fh, parent_index); 1011 } 1012 } 1013 } 1014 1015 /* 1016 * Sink the top element to its correct position 1017 */ 1018 static void 1019 file_header_heap_sink(struct file_header **fh, size_t indx, size_t size) 1020 { 1021 size_t left_child_index; 1022 size_t right_child_index; 1023 1024 left_child_index = indx + indx + 1; 1025 right_child_index = left_child_index + 1; 1026 1027 if (left_child_index < size) { 1028 size_t min_child_index; 1029 1030 min_child_index = left_child_index; 1031 1032 if ((right_child_index < size) && 1033 (file_header_cmp(fh[left_child_index], 1034 fh[right_child_index]) > 0)) 1035 min_child_index = right_child_index; 1036 if (file_header_cmp(fh[indx], fh[min_child_index]) > 0) { 1037 file_header_swap(fh, indx, min_child_index); 1038 file_header_heap_sink(fh, min_child_index, size); 1039 } 1040 } 1041 } 1042 1043 /* <<== heap algorithm */ 1044 1045 /* 1046 * Adds element to the "left" end 1047 */ 1048 static void 1049 file_header_list_rearrange_from_header(struct file_header **fh, size_t size) 1050 { 1051 1052 file_header_heap_sink(fh, 0, size); 1053 } 1054 1055 /* 1056 * Adds element to the "right" end 1057 */ 1058 static void 1059 file_header_list_push(struct file_header *f, struct file_header **fh, size_t size) 1060 { 1061 1062 fh[size++] = f; 1063 file_header_heap_swim(fh, size - 1); 1064 } 1065 1066 struct last_printed 1067 { 1068 struct bwstring *str; 1069 }; 1070 1071 /* 1072 * Prints the current line of the file 1073 */ 1074 static void 1075 file_header_print(struct file_header *fh, FILE *f_out, struct last_printed *lp) 1076 { 1077 1078 if (fh && fh->fr && f_out && fh->si && fh->si->str) { 1079 if (sort_opts_vals.uflag) { 1080 if ((lp->str == NULL) || (str_list_coll(lp->str, &(fh->si)))) { 1081 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 1082 if (lp->str) 1083 bwsfree(lp->str); 1084 lp->str = bwsdup(fh->si->str); 1085 } 1086 } else 1087 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 1088 } 1089 } 1090 1091 /* 1092 * Read next line 1093 */ 1094 static void 1095 file_header_read_next(struct file_header *fh) 1096 { 1097 1098 if (fh && fh->fr) { 1099 struct bwstring *tmp; 1100 1101 tmp = file_reader_readline(fh->fr); 1102 if (tmp == NULL) { 1103 file_reader_free(fh->fr); 1104 fh->fr = NULL; 1105 if (fh->si) { 1106 sort_list_item_clean(fh->si); 1107 sort_free(fh->si); 1108 fh->si = NULL; 1109 } 1110 } else { 1111 if (fh->si == NULL) 1112 fh->si = sort_list_item_alloc(); 1113 sort_list_item_set(fh->si, tmp); 1114 } 1115 } 1116 } 1117 1118 /* 1119 * Merge array of "files headers" 1120 */ 1121 static void 1122 file_headers_merge(size_t fnum, struct file_header **fh, FILE *f_out) 1123 { 1124 struct last_printed lp; 1125 size_t i; 1126 1127 memset(&lp, 0, sizeof(lp)); 1128 1129 /* 1130 * construct the initial sort structure 1131 */ 1132 for (i = 0; i < fnum; i++) 1133 file_header_list_push(fh[i], fh, i); 1134 1135 while (fh[0]->fr) { /* unfinished files are always in front */ 1136 /* output the smallest line: */ 1137 file_header_print(fh[0], f_out, &lp); 1138 /* read a new line, if possible: */ 1139 file_header_read_next(fh[0]); 1140 /* re-arrange the list: */ 1141 file_header_list_rearrange_from_header(fh, fnum); 1142 } 1143 1144 if (lp.str) 1145 bwsfree(lp.str); 1146 } 1147 1148 /* 1149 * Merges the given files into the output file, which can be 1150 * stdout. 1151 */ 1152 static void 1153 merge_files_array(size_t argc, char **argv, const char *fn_out) 1154 { 1155 1156 if (argv && fn_out) { 1157 struct file_header **fh; 1158 FILE *f_out; 1159 size_t i; 1160 1161 f_out = openfile(fn_out, "w"); 1162 1163 if (f_out == NULL) 1164 err(2, NULL); 1165 1166 fh = sort_malloc((argc + 1) * sizeof(struct file_header *)); 1167 1168 for (i = 0; i < argc; i++) 1169 file_header_init(fh + i, argv[i], (size_t) i); 1170 1171 file_headers_merge(argc, fh, f_out); 1172 1173 for (i = 0; i < argc; i++) 1174 file_header_close(fh + i); 1175 1176 sort_free(fh); 1177 1178 closefile(f_out, fn_out); 1179 } 1180 } 1181 1182 /* 1183 * Shrinks the file list until its size smaller than max number of opened files 1184 */ 1185 static int 1186 shrink_file_list(struct file_list *fl) 1187 { 1188 1189 if ((fl == NULL) || (size_t) (fl->count) < max_open_files) 1190 return (0); 1191 else { 1192 struct file_list new_fl; 1193 size_t indx = 0; 1194 1195 file_list_init(&new_fl, true); 1196 while (indx < fl->count) { 1197 char *fnew; 1198 size_t num; 1199 1200 num = fl->count - indx; 1201 fnew = new_tmp_file_name(); 1202 1203 if ((size_t) num >= max_open_files) 1204 num = max_open_files - 1; 1205 merge_files_array(num, fl->fns + indx, fnew); 1206 if (fl->tmp) { 1207 size_t i; 1208 1209 for (i = 0; i < num; i++) 1210 unlink(fl->fns[indx + i]); 1211 } 1212 file_list_add(&new_fl, fnew, false); 1213 indx += num; 1214 } 1215 fl->tmp = false; /* already taken care of */ 1216 file_list_clean(fl); 1217 1218 fl->count = new_fl.count; 1219 fl->fns = new_fl.fns; 1220 fl->sz = new_fl.sz; 1221 fl->tmp = new_fl.tmp; 1222 1223 return (1); 1224 } 1225 } 1226 1227 /* 1228 * Merge list of files 1229 */ 1230 void 1231 merge_files(struct file_list *fl, const char *fn_out) 1232 { 1233 1234 if (fl && fn_out) { 1235 while (shrink_file_list(fl)); 1236 1237 merge_files_array(fl->count, fl->fns, fn_out); 1238 } 1239 } 1240 1241 static const char * 1242 get_sort_method_name(int sm) 1243 { 1244 1245 if (sm == SORT_MERGESORT) 1246 return "mergesort"; 1247 else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1248 return "radixsort"; 1249 else if (sort_opts_vals.sort_method == SORT_HEAPSORT) 1250 return "heapsort"; 1251 else 1252 return "quicksort"; 1253 } 1254 1255 /* 1256 * Wrapper for qsort 1257 */ 1258 static int sort_qsort(void *list, size_t count, size_t elem_size, 1259 int (*cmp_func)(const void *, const void *)) 1260 { 1261 1262 qsort(list, count, elem_size, cmp_func); 1263 return (0); 1264 } 1265 1266 /* 1267 * Sort list of lines and writes it to the file 1268 */ 1269 void 1270 sort_list_to_file(struct sort_list *list, const char *outfile) 1271 { 1272 struct sort_mods *sm = &(keys[0].sm); 1273 1274 if (!(sm->Mflag) && !(sm->Rflag) && !(sm->Vflag) && !(sm->Vflag) && 1275 !(sm->gflag) && !(sm->hflag) && !(sm->nflag)) { 1276 if ((sort_opts_vals.sort_method == SORT_DEFAULT) && byte_sort) 1277 sort_opts_vals.sort_method = SORT_RADIXSORT; 1278 1279 } else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1280 err(2, "%s", getstr(9)); 1281 1282 /* 1283 * to handle stable sort and the unique cases in the 1284 * right order, we need stable basic algorithm 1285 */ 1286 if (sort_opts_vals.sflag) { 1287 switch (sort_opts_vals.sort_method){ 1288 case SORT_MERGESORT: 1289 break; 1290 case SORT_RADIXSORT: 1291 break; 1292 case SORT_DEFAULT: 1293 sort_opts_vals.sort_method = SORT_MERGESORT; 1294 break; 1295 default: 1296 errx(2, "%s", getstr(10)); 1297 }; 1298 } 1299 1300 if (sort_opts_vals.sort_method == SORT_DEFAULT) 1301 sort_opts_vals.sort_method = DEFAULT_SORT_ALGORITHM; 1302 1303 if (debug_sort) 1304 printf("sort_method=%s\n", 1305 get_sort_method_name(sort_opts_vals.sort_method)); 1306 1307 switch (sort_opts_vals.sort_method){ 1308 case SORT_RADIXSORT: 1309 rxsort(list->list, list->count); 1310 sort_list_dump(list, outfile); 1311 break; 1312 case SORT_MERGESORT: 1313 mt_sort(list, mergesort, outfile); 1314 break; 1315 case SORT_HEAPSORT: 1316 mt_sort(list, heapsort, outfile); 1317 break; 1318 case SORT_QSORT: 1319 mt_sort(list, sort_qsort, outfile); 1320 break; 1321 default: 1322 mt_sort(list, DEFAULT_SORT_FUNC, outfile); 1323 break; 1324 } 1325 } 1326 1327 /******************* MT SORT ************************/ 1328 1329 #if defined(SORT_THREADS) 1330 /* semaphore to count threads */ 1331 static sem_t mtsem; 1332 1333 /* current system sort function */ 1334 static int (*g_sort_func)(void *, size_t, size_t, 1335 int(*)(const void *, const void *)); 1336 1337 /* 1338 * Sort cycle thread (in multi-threaded mode) 1339 */ 1340 static void* 1341 mt_sort_thread(void* arg) 1342 { 1343 struct sort_list *list = arg; 1344 1345 g_sort_func(list->list, list->count, sizeof(struct sort_list_item *), 1346 (int(*)(const void *, const void *)) list_coll); 1347 1348 sem_post(&mtsem); 1349 1350 return (arg); 1351 } 1352 1353 /* 1354 * Compare sub-lists. Empty sub-lists always go to the end of the list. 1355 */ 1356 static int 1357 sub_list_cmp(struct sort_list *l1, struct sort_list *l2) 1358 { 1359 1360 if (l1 == l2) 1361 return (0); 1362 else { 1363 if (l1->count == 0) { 1364 return ((l2->count == 0) ? 0 : +1); 1365 } else if (l2->count == 0) { 1366 return (-1); 1367 } else { 1368 int ret; 1369 1370 ret = list_coll(&(l1->list[0]), &(l2->list[0])); 1371 if (!ret) 1372 return ((l1->sub_list_pos < l2->sub_list_pos) ? 1373 -1 : +1); 1374 return (ret); 1375 } 1376 } 1377 } 1378 1379 /* 1380 * Swap two array elements 1381 */ 1382 static void 1383 sub_list_swap(struct sort_list **sl, size_t i1, size_t i2) 1384 { 1385 struct sort_list *tmp; 1386 1387 tmp = sl[i1]; 1388 sl[i1] = sl[i2]; 1389 sl[i2] = tmp; 1390 } 1391 1392 /* heap algorithm ==>> */ 1393 1394 /* 1395 * See heap sort algorithm 1396 * "Raises" last element to its right place 1397 */ 1398 static void 1399 sub_list_swim(struct sort_list **sl, size_t indx) 1400 { 1401 1402 if (indx > 0) { 1403 size_t parent_index; 1404 1405 parent_index = (indx - 1) >> 1; 1406 1407 if (sub_list_cmp(sl[indx], sl[parent_index]) < 0) { 1408 /* swap child and parent and continue */ 1409 sub_list_swap(sl, indx, parent_index); 1410 sub_list_swim(sl, parent_index); 1411 } 1412 } 1413 } 1414 1415 /* 1416 * Sink the top element to its correct position 1417 */ 1418 static void 1419 sub_list_sink(struct sort_list **sl, size_t indx, size_t size) 1420 { 1421 size_t left_child_index; 1422 size_t right_child_index; 1423 1424 left_child_index = indx + indx + 1; 1425 right_child_index = left_child_index + 1; 1426 1427 if (left_child_index < size) { 1428 size_t min_child_index; 1429 1430 min_child_index = left_child_index; 1431 1432 if ((right_child_index < size) && 1433 (sub_list_cmp(sl[left_child_index], 1434 sl[right_child_index]) > 0)) 1435 min_child_index = right_child_index; 1436 if (sub_list_cmp(sl[indx], sl[min_child_index]) > 0) { 1437 sub_list_swap(sl, indx, min_child_index); 1438 sub_list_sink(sl, min_child_index, size); 1439 } 1440 } 1441 } 1442 1443 /* <<== heap algorithm */ 1444 1445 /* 1446 * Adds element to the "right" end 1447 */ 1448 static void 1449 sub_list_push(struct sort_list *s, struct sort_list **sl, size_t size) 1450 { 1451 1452 sl[size++] = s; 1453 sub_list_swim(sl, size - 1); 1454 } 1455 1456 struct last_printed_item 1457 { 1458 struct sort_list_item *item; 1459 }; 1460 1461 /* 1462 * Prints the current line of the file 1463 */ 1464 static void 1465 sub_list_header_print(struct sort_list *sl, FILE *f_out, 1466 struct last_printed_item *lp) 1467 { 1468 1469 if (sl && sl->count && f_out && sl->list[0]->str) { 1470 if (sort_opts_vals.uflag) { 1471 if ((lp->item == NULL) || (list_coll(&(lp->item), 1472 &(sl->list[0])))) { 1473 bwsfwrite(sl->list[0]->str, f_out, 1474 sort_opts_vals.zflag); 1475 lp->item = sl->list[0]; 1476 } 1477 } else 1478 bwsfwrite(sl->list[0]->str, f_out, 1479 sort_opts_vals.zflag); 1480 } 1481 } 1482 1483 /* 1484 * Read next line 1485 */ 1486 static void 1487 sub_list_next(struct sort_list *sl) 1488 { 1489 1490 if (sl && sl->count) { 1491 sl->list += 1; 1492 sl->count -= 1; 1493 } 1494 } 1495 1496 /* 1497 * Merge sub-lists to a file 1498 */ 1499 static void 1500 merge_sub_lists(struct sort_list **sl, size_t n, FILE* f_out) 1501 { 1502 struct last_printed_item lp; 1503 size_t i; 1504 1505 memset(&lp,0,sizeof(lp)); 1506 1507 /* construct the initial list: */ 1508 for (i = 0; i < n; i++) 1509 sub_list_push(sl[i], sl, i); 1510 1511 while (sl[0]->count) { /* unfinished lists are always in front */ 1512 /* output the smallest line: */ 1513 sub_list_header_print(sl[0], f_out, &lp); 1514 /* move to a new line, if possible: */ 1515 sub_list_next(sl[0]); 1516 /* re-arrange the list: */ 1517 sub_list_sink(sl, 0, n); 1518 } 1519 } 1520 1521 /* 1522 * Merge sub-lists to a file 1523 */ 1524 static void 1525 merge_list_parts(struct sort_list **parts, size_t n, const char *fn) 1526 { 1527 FILE* f_out; 1528 1529 f_out = openfile(fn,"w"); 1530 1531 merge_sub_lists(parts, n, f_out); 1532 1533 closefile(f_out, fn); 1534 } 1535 1536 #endif /* defined(SORT_THREADS) */ 1537 /* 1538 * Multi-threaded sort algorithm "driver" 1539 */ 1540 static void 1541 mt_sort(struct sort_list *list, 1542 int(*sort_func)(void *, size_t, size_t, int(*)(const void *, const void *)), 1543 const char* fn) 1544 { 1545 #if defined(SORT_THREADS) 1546 if (nthreads < 2 || list->count < MT_SORT_THRESHOLD) { 1547 size_t nthreads_save = nthreads; 1548 nthreads = 1; 1549 #endif 1550 /* if single thread or small data, do simple sort */ 1551 sort_func(list->list, list->count, 1552 sizeof(struct sort_list_item *), 1553 (int(*)(const void *, const void *)) list_coll); 1554 sort_list_dump(list, fn); 1555 #if defined(SORT_THREADS) 1556 nthreads = nthreads_save; 1557 } else { 1558 /* multi-threaded sort */ 1559 struct sort_list **parts; 1560 size_t avgsize, cstart, i; 1561 1562 /* array of sub-lists */ 1563 parts = sort_malloc(sizeof(struct sort_list*) * nthreads); 1564 cstart = 0; 1565 avgsize = list->count / nthreads; 1566 1567 /* set global system sort function */ 1568 g_sort_func = sort_func; 1569 1570 /* set sublists */ 1571 for (i = 0; i < nthreads; ++i) { 1572 size_t sz = 0; 1573 1574 parts[i] = sort_malloc(sizeof(struct sort_list)); 1575 parts[i]->list = list->list + cstart; 1576 parts[i]->memsize = 0; 1577 parts[i]->sub_list_pos = i; 1578 1579 sz = (i == nthreads - 1) ? list->count - cstart : 1580 avgsize; 1581 1582 parts[i]->count = sz; 1583 1584 parts[i]->size = parts[i]->count; 1585 1586 cstart += sz; 1587 } 1588 1589 /* init threads counting semaphore */ 1590 sem_init(&mtsem, 0, 0); 1591 1592 /* start threads */ 1593 for (i = 0; i < nthreads; ++i) { 1594 pthread_t pth; 1595 pthread_attr_t attr; 1596 1597 pthread_attr_init(&attr); 1598 pthread_attr_setdetachstate(&attr, PTHREAD_DETACHED); 1599 1600 for (;;) { 1601 int res = pthread_create(&pth, &attr, 1602 mt_sort_thread, parts[i]); 1603 1604 if (res >= 0) 1605 break; 1606 if (errno == EAGAIN) { 1607 pthread_yield(); 1608 continue; 1609 } 1610 err(2, NULL); 1611 } 1612 1613 pthread_attr_destroy(&attr); 1614 } 1615 1616 /* wait for threads completion */ 1617 for (i = 0; i < nthreads; ++i) { 1618 sem_wait(&mtsem); 1619 } 1620 /* destroy the semaphore - we do not need it anymore */ 1621 sem_destroy(&mtsem); 1622 1623 /* merge sorted sub-lists to the file */ 1624 merge_list_parts(parts, nthreads, fn); 1625 1626 /* free sub-lists data */ 1627 for (i = 0; i < nthreads; ++i) { 1628 sort_free(parts[i]); 1629 } 1630 sort_free(parts); 1631 } 1632 #endif /* defined(SORT_THREADS) */ 1633 } 1634