1 /*- 2 * Copyright (C) 2009 Gabor Kovesdan <gabor@FreeBSD.org> 3 * Copyright (C) 2012 Oleg Moskalenko <oleg.moskalenko@citrix.com> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25 * SUCH DAMAGE. 
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/queue.h>

#include <err.h>
#include <fcntl.h>
#if defined(SORT_THREADS)
#include <pthread.h>
#endif
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <wchar.h>
#include <wctype.h>

#include "coll.h"
#include "file.h"
#include "radixsort.h"

unsigned long long free_memory = 1000000;
/*
 * Memory budget for the in-memory sort list: procfile() spills the list
 * to a temporary file once its accounted size reaches this value.
 */
unsigned long long available_free_memory = 1000000;

/* When set, file_reader_init() tries to mmap(2) named input files. */
bool use_mmap;

/* Directory in which new_tmp_file_name() places temporary files. */
const char *tmpdir = "/var/tmp";
/*
 * When non-NULL, temporary files are written and read through this
 * program via popen(3) (see openfile()).
 */
const char *compress_program;

/* Upper bound used by shrink_file_list() for one merge pass. */
size_t max_open_files = 16;

/*
 * How much space we read from file at once
 */
#define READ_CHUNK (4096)

/*
 * File reader structure
 */
struct file_reader
{
	struct reader_buffer	 rb;		/* scratch state handed to bwsfgetln() (stdin path) */
	FILE			*file;		/* stdio stream, used when the file is not mmapped */
	char			*fname;		/* private copy of the source name */
	unsigned char		*buffer;	/* stdio read buffer (non-stdin, non-mmap path) */
	unsigned char		*mmapaddr;	/* base of the mapping, NULL if not mmapped */
	unsigned char		*mmapptr;	/* next unread byte inside the mapping */
	size_t			 bsz;		/* number of valid bytes in buffer */
	size_t			 cbsz;		/* allocated capacity of buffer */
	size_t			 mmapsize;	/* length of the mapping */
	size_t			 strbeg;	/* offset in buffer where the next line starts */
	int			 fd;		/* descriptor backing the mapping */
	char			 elsymb;	/* line terminator: '\n', or 0 with zflag */
};

/*
 * Structure to be used in file merge process.
 */
struct file_header
{
	struct file_reader		*fr;	/* NULL once the file is exhausted */
	struct sort_list_item		*si;	/* current top line */
	size_t				 file_pos;	/* position of the file in the merge; tie-breaker in file_header_cmp() */
};

/*
 * List elements of "cleanable" files list.
 */
struct CLEANABLE_FILE
{
	char				*fn;	/* name of the temporary file */
	LIST_ENTRY(CLEANABLE_FILE)	 files;
};

/*
 * List header of "cleanable" files list.
 */
static LIST_HEAD(CLEANABLE_FILES,CLEANABLE_FILE) tmp_files;

/*
 * Semaphore to protect the tmp file list.
 * We use semaphore here because it is signal-safe, according to POSIX.
 * And semaphore does not require pthread library.
115 */ 116 static sem_t tmp_files_sem; 117 118 static void mt_sort(struct sort_list *list, 119 int (*sort_func)(void *, size_t, size_t, 120 int (*)(const void *, const void *)), const char* fn); 121 122 /* 123 * Init tmp files list 124 */ 125 void 126 init_tmp_files(void) 127 { 128 129 LIST_INIT(&tmp_files); 130 sem_init(&tmp_files_sem, 0, 1); 131 } 132 133 /* 134 * Save name of a tmp file for signal cleanup 135 */ 136 void 137 tmp_file_atexit(const char *tmp_file) 138 { 139 140 if (tmp_file) { 141 sem_wait(&tmp_files_sem); 142 struct CLEANABLE_FILE *item = 143 sort_malloc(sizeof(struct CLEANABLE_FILE)); 144 item->fn = sort_strdup(tmp_file); 145 LIST_INSERT_HEAD(&tmp_files, item, files); 146 sem_post(&tmp_files_sem); 147 } 148 } 149 150 /* 151 * Clear tmp files 152 */ 153 void 154 clear_tmp_files(void) 155 { 156 struct CLEANABLE_FILE *item; 157 158 sem_wait(&tmp_files_sem); 159 LIST_FOREACH(item,&tmp_files,files) { 160 if ((item) && (item->fn)) 161 unlink(item->fn); 162 } 163 sem_post(&tmp_files_sem); 164 } 165 166 /* 167 * Check whether a file is a temporary file 168 */ 169 static bool 170 file_is_tmp(const char* fn) 171 { 172 struct CLEANABLE_FILE *item; 173 bool ret = false; 174 175 if (fn) { 176 sem_wait(&tmp_files_sem); 177 LIST_FOREACH(item,&tmp_files,files) { 178 if ((item) && (item->fn)) 179 if (strcmp(item->fn, fn) == 0) { 180 ret = true; 181 break; 182 } 183 } 184 sem_post(&tmp_files_sem); 185 } 186 187 return (ret); 188 } 189 190 /* 191 * Read zero-terminated line from a file 192 */ 193 char * 194 read_file0_line(struct file0_reader *f0r) 195 { 196 size_t pos = 0; 197 int c; 198 199 if ((f0r->f == NULL) || feof(f0r->f)) 200 return (NULL); 201 202 if (f0r->current_line && f0r->current_sz > 0) 203 f0r->current_line[0] = 0; 204 205 while (!feof(f0r->f)) { 206 c = fgetc(f0r->f); 207 if (feof(f0r->f) || (c == -1)) 208 break; 209 if ((pos + 1) >= f0r->current_sz) { 210 size_t newsz = (f0r->current_sz + 2) * 2; 211 f0r->current_line = 
sort_realloc(f0r->current_line, 212 newsz); 213 f0r->current_sz = newsz; 214 } 215 f0r->current_line[pos] = (char)c; 216 if (c == 0) 217 break; 218 else 219 f0r->current_line[pos + 1] = 0; 220 ++pos; 221 } 222 223 return f0r->current_line; 224 } 225 226 /* 227 * Generate new temporary file name 228 */ 229 char * 230 new_tmp_file_name(void) 231 { 232 static unsigned int tfcounter = 0; 233 static const char *fn = ".bsdsort."; 234 char *ret; 235 size_t sz; 236 237 sz = strlen(tmpdir) + 1 + strlen(fn) + 32 + 1; 238 ret = sort_malloc(sz); 239 240 sprintf(ret, "%s/%s%d.%u", tmpdir, fn, (int) getpid(), tfcounter++); 241 tmp_file_atexit(ret); 242 return (ret); 243 } 244 245 /* 246 * Initialize file list 247 */ 248 void 249 file_list_init(struct file_list *fl, bool tmp) 250 { 251 252 if (fl) { 253 fl->count = 0; 254 fl->sz = 0; 255 fl->fns = NULL; 256 fl->tmp = tmp; 257 } 258 } 259 260 /* 261 * Add a file name to the list 262 */ 263 void 264 file_list_add(struct file_list *fl, char *fn, bool allocate) 265 { 266 267 if (fl && fn) { 268 if (fl->count >= fl->sz || (fl->fns == NULL)) { 269 fl->sz = (fl->sz) * 2 + 1; 270 fl->fns = sort_realloc(fl->fns, fl->sz * 271 sizeof(char *)); 272 } 273 fl->fns[fl->count] = allocate ? 
sort_strdup(fn) : fn; 274 fl->count += 1; 275 } 276 } 277 278 /* 279 * Populate file list from array of file names 280 */ 281 void 282 file_list_populate(struct file_list *fl, int argc, char **argv, bool allocate) 283 { 284 285 if (fl && argv) { 286 int i; 287 288 for (i = 0; i < argc; i++) 289 file_list_add(fl, argv[i], allocate); 290 } 291 } 292 293 /* 294 * Clean file list data and delete the files, 295 * if this is a list of temporary files 296 */ 297 void 298 file_list_clean(struct file_list *fl) 299 { 300 301 if (fl) { 302 if (fl->fns) { 303 int i; 304 305 for (i = 0; i < fl->count; i++) { 306 if (fl->fns[i]) { 307 if (fl->tmp) 308 unlink(fl->fns[i]); 309 sort_free(fl->fns[i]); 310 fl->fns[i] = 0; 311 } 312 } 313 sort_free(fl->fns); 314 fl->fns = NULL; 315 } 316 fl->sz = 0; 317 fl->count = 0; 318 fl->tmp = false; 319 } 320 } 321 322 /* 323 * Init sort list 324 */ 325 void 326 sort_list_init(struct sort_list *l) 327 { 328 329 if (l) { 330 l->count = 0; 331 l->size = 0; 332 l->memsize = sizeof(struct sort_list); 333 l->list = NULL; 334 } 335 } 336 337 /* 338 * Add string to sort list 339 */ 340 void 341 sort_list_add(struct sort_list *l, struct bwstring *str) 342 { 343 344 if (l && str) { 345 size_t indx = l->count; 346 347 if ((l->list == NULL) || (indx >= l->size)) { 348 int newsize = (l->size + 1) + 1024; 349 350 l->list = sort_realloc(l->list, 351 sizeof(struct sort_list_item*) * newsize); 352 l->memsize += (newsize - l->size) * 353 sizeof(struct sort_list_item*); 354 l->size = newsize; 355 } 356 l->list[indx] = sort_list_item_alloc(); 357 sort_list_item_set(l->list[indx], str); 358 l->memsize += sort_list_item_size(l->list[indx]); 359 l->count += 1; 360 } 361 } 362 363 /* 364 * Clean sort list data 365 */ 366 void 367 sort_list_clean(struct sort_list *l) 368 { 369 370 if (l) { 371 if (l->list) { 372 size_t i; 373 374 for (i = 0; i < l->count; i++) { 375 struct sort_list_item *item; 376 377 item = l->list[i]; 378 379 if (item) { 380 
sort_list_item_clean(item); 381 sort_free(item); 382 l->list[i] = NULL; 383 } 384 } 385 sort_free(l->list); 386 l->list = NULL; 387 } 388 l->count = 0; 389 l->size = 0; 390 l->memsize = sizeof(struct sort_list); 391 } 392 } 393 394 /* 395 * Write sort list to file 396 */ 397 void 398 sort_list_dump(struct sort_list *l, const char *fn) 399 { 400 401 if (l && fn) { 402 FILE *f; 403 404 f = openfile(fn, "w"); 405 if (f == NULL) 406 err(2, NULL); 407 408 if (l->list) { 409 size_t i; 410 if (!(sort_opts_vals.uflag)) { 411 for (i = 0; i < l->count; ++i) 412 bwsfwrite(l->list[i]->str, f, 413 sort_opts_vals.zflag); 414 } else { 415 struct sort_list_item *last_printed_item = NULL; 416 struct sort_list_item *item; 417 for (i = 0; i < l->count; ++i) { 418 item = l->list[i]; 419 if ((last_printed_item == NULL) || 420 list_coll(&last_printed_item, &item)) { 421 bwsfwrite(item->str, f, sort_opts_vals.zflag); 422 last_printed_item = item; 423 } 424 } 425 } 426 } 427 428 closefile(f, fn); 429 } 430 } 431 432 /* 433 * Checks if the given file is sorted. Stops at the first disorder, 434 * prints the disordered line and returns 1. 
435 */ 436 int 437 check(const char *fn) 438 { 439 struct bwstring *s1, *s2, *s1disorder, *s2disorder; 440 struct file_reader *fr; 441 struct keys_array *ka1, *ka2; 442 int res, pos, posdisorder; 443 444 s1 = s2 = s1disorder = s2disorder = NULL; 445 ka1 = ka2 = NULL; 446 447 fr = file_reader_init(fn); 448 449 res = 0; 450 pos = 1; 451 posdisorder = 1; 452 453 if (fr == NULL) { 454 err(2, NULL); 455 goto end; 456 } 457 458 s1 = file_reader_readline(fr); 459 if (s1 == NULL) 460 goto end; 461 462 ka1 = keys_array_alloc(); 463 preproc(s1, ka1); 464 465 s2 = file_reader_readline(fr); 466 if (s2 == NULL) 467 goto end; 468 469 ka2 = keys_array_alloc(); 470 preproc(s2, ka2); 471 472 for (;;) { 473 474 if (debug_sort) { 475 bwsprintf(stdout, s2, "s1=<", ">"); 476 bwsprintf(stdout, s1, "s2=<", ">"); 477 } 478 int cmp = key_coll(ka2, ka1, 0); 479 if (debug_sort) 480 printf("; cmp1=%d", cmp); 481 482 if (!cmp && sort_opts_vals.complex_sort && 483 !(sort_opts_vals.uflag) && !(sort_opts_vals.sflag)) { 484 cmp = top_level_str_coll(s2, s1); 485 if (debug_sort) 486 printf("; cmp2=%d", cmp); 487 } 488 if (debug_sort) 489 printf("\n"); 490 491 if ((sort_opts_vals.uflag && (cmp <= 0)) || (cmp < 0)) { 492 if (!(sort_opts_vals.csilentflag)) { 493 s2disorder = bwsdup(s2); 494 posdisorder = pos; 495 if (debug_sort) 496 s1disorder = bwsdup(s1); 497 } 498 res = 1; 499 goto end; 500 } 501 502 pos++; 503 504 clean_keys_array(s1, ka1); 505 sort_free(ka1); 506 ka1 = ka2; 507 ka2 = NULL; 508 509 bwsfree(s1); 510 s1 = s2; 511 512 s2 = file_reader_readline(fr); 513 if (s2 == NULL) 514 goto end; 515 516 ka2 = keys_array_alloc(); 517 preproc(s2, ka2); 518 } 519 520 end: 521 if (ka1) { 522 clean_keys_array(s1, ka1); 523 sort_free(ka1); 524 } 525 526 if (s1) 527 bwsfree(s1); 528 529 if (ka2) { 530 clean_keys_array(s2, ka2); 531 sort_free(ka2); 532 } 533 534 if (s2) 535 bwsfree(s2); 536 537 if ((fn == NULL) || (*fn == 0) || (strcmp(fn, "-") == 0)) { 538 for (;;) { 539 s2 = file_reader_readline(fr); 540 
if (s2 == NULL) 541 break; 542 bwsfree(s2); 543 } 544 } 545 546 file_reader_free(fr); 547 548 if (s2disorder) { 549 bws_disorder_warnx(s2disorder, fn, posdisorder); 550 if (s1disorder) { 551 bws_disorder_warnx(s1disorder, fn, posdisorder); 552 if (s1disorder != s2disorder) 553 bwsfree(s1disorder); 554 } 555 bwsfree(s2disorder); 556 s1disorder = NULL; 557 s2disorder = NULL; 558 } 559 560 if (res) 561 exit(res); 562 563 return (0); 564 } 565 566 /* 567 * Opens a file. If the given filename is "-", stdout will be 568 * opened. 569 */ 570 FILE * 571 openfile(const char *fn, const char *mode) 572 { 573 FILE *file; 574 575 if (strcmp(fn, "-") == 0) { 576 return ((mode && mode[0] == 'r') ? stdin : stdout); 577 } else { 578 mode_t orig_file_mask = 0; 579 int is_tmp = file_is_tmp(fn); 580 581 if (is_tmp && (mode[0] == 'w')) 582 orig_file_mask = umask(S_IWGRP | S_IWOTH | 583 S_IRGRP | S_IROTH); 584 585 if (is_tmp && (compress_program != NULL)) { 586 char *cmd; 587 size_t cmdsz; 588 589 cmdsz = strlen(fn) + 128; 590 cmd = sort_malloc(cmdsz); 591 592 fflush(stdout); 593 594 if (mode[0] == 'r') 595 snprintf(cmd, cmdsz - 1, "cat %s | %s -d", 596 fn, compress_program); 597 else if (mode[0] == 'w') 598 snprintf(cmd, cmdsz - 1, "%s > %s", 599 compress_program, fn); 600 else 601 err(2, "%s", getstr(7)); 602 603 if ((file = popen(cmd, mode)) == NULL) 604 err(2, NULL); 605 606 sort_free(cmd); 607 608 } else 609 if ((file = fopen(fn, mode)) == NULL) 610 err(2, NULL); 611 612 if (is_tmp && (mode[0] == 'w')) 613 umask(orig_file_mask); 614 } 615 616 return (file); 617 } 618 619 /* 620 * Close file 621 */ 622 void 623 closefile(FILE *f, const char *fn) 624 { 625 if (f == NULL) { 626 ; 627 } else if (f == stdin) { 628 ; 629 } else if (f == stdout) { 630 fflush(f); 631 } else { 632 if (file_is_tmp(fn) && compress_program != NULL) { 633 if(pclose(f)<0) 634 err(2,NULL); 635 } else 636 fclose(f); 637 } 638 } 639 640 /* 641 * Reads a file into the internal buffer. 
642 */ 643 struct file_reader * 644 file_reader_init(const char *fsrc) 645 { 646 struct file_reader *ret; 647 648 if (fsrc == NULL) 649 fsrc = "-"; 650 651 ret = sort_malloc(sizeof(struct file_reader)); 652 memset(ret, 0, sizeof(struct file_reader)); 653 654 ret->elsymb = '\n'; 655 if (sort_opts_vals.zflag) 656 ret->elsymb = 0; 657 658 ret->fname = sort_strdup(fsrc); 659 660 if (strcmp(fsrc, "-") && (compress_program == NULL) && use_mmap) { 661 662 do { 663 struct stat stat_buf; 664 void *addr; 665 size_t sz = 0; 666 int fd, flags; 667 668 flags = MAP_NOCORE | MAP_NOSYNC; 669 addr = MAP_FAILED; 670 671 fd = open(fsrc, O_RDONLY); 672 if (fd < 0) 673 err(2, NULL); 674 675 if (fstat(fd, &stat_buf) < 0) { 676 close(fd); 677 break; 678 } 679 680 sz = stat_buf.st_size; 681 682 #if defined(MAP_PREFAULT_READ) 683 flags |= MAP_PREFAULT_READ; 684 #endif 685 686 addr = mmap(NULL, sz, PROT_READ, flags, fd, 0); 687 if (addr == MAP_FAILED) { 688 close(fd); 689 break; 690 } 691 692 ret->fd = fd; 693 ret->mmapaddr = addr; 694 ret->mmapsize = sz; 695 ret->mmapptr = ret->mmapaddr; 696 697 } while (0); 698 } 699 700 if (ret->mmapaddr == NULL) { 701 ret->file = openfile(fsrc, "r"); 702 if (ret->file == NULL) 703 err(2, NULL); 704 705 if (strcmp(fsrc, "-")) { 706 ret->cbsz = READ_CHUNK; 707 ret->buffer = sort_malloc(ret->cbsz); 708 ret->bsz = 0; 709 ret->strbeg = 0; 710 711 ret->bsz = fread(ret->buffer, 1, ret->cbsz, ret->file); 712 if (ret->bsz == 0) { 713 if (ferror(ret->file)) 714 err(2, NULL); 715 } 716 } 717 } 718 719 return (ret); 720 } 721 722 struct bwstring * 723 file_reader_readline(struct file_reader *fr) 724 { 725 struct bwstring *ret = NULL; 726 727 if (fr->mmapaddr) { 728 unsigned char *mmapend; 729 730 mmapend = fr->mmapaddr + fr->mmapsize; 731 if (fr->mmapptr >= mmapend) 732 return (NULL); 733 else { 734 unsigned char *strend; 735 size_t sz; 736 737 sz = mmapend - fr->mmapptr; 738 strend = memchr(fr->mmapptr, fr->elsymb, sz); 739 740 if (strend == NULL) { 741 ret = 
bwscsbdup(fr->mmapptr, sz); 742 fr->mmapptr = mmapend; 743 } else { 744 ret = bwscsbdup(fr->mmapptr, strend - 745 fr->mmapptr); 746 fr->mmapptr = strend + 1; 747 } 748 } 749 750 } else if (fr->file != stdin) { 751 unsigned char *strend; 752 size_t bsz1, remsz, search_start; 753 754 search_start = 0; 755 remsz = 0; 756 strend = NULL; 757 758 if (fr->bsz > fr->strbeg) 759 remsz = fr->bsz - fr->strbeg; 760 761 /* line read cycle */ 762 for (;;) { 763 if (remsz > search_start) 764 strend = memchr(fr->buffer + fr->strbeg + 765 search_start, fr->elsymb, remsz - 766 search_start); 767 else 768 strend = NULL; 769 770 if (strend) 771 break; 772 if (feof(fr->file)) 773 break; 774 775 if (fr->bsz != fr->cbsz) 776 /* NOTREACHED */ 777 err(2, "File read software error 1"); 778 779 if (remsz > (READ_CHUNK >> 1)) { 780 search_start = fr->cbsz - fr->strbeg; 781 fr->cbsz += READ_CHUNK; 782 fr->buffer = sort_realloc(fr->buffer, 783 fr->cbsz); 784 bsz1 = fread(fr->buffer + fr->bsz, 1, 785 READ_CHUNK, fr->file); 786 if (bsz1 == 0) { 787 if (ferror(fr->file)) 788 err(2, NULL); 789 break; 790 } 791 fr->bsz += bsz1; 792 remsz += bsz1; 793 } else { 794 if (remsz > 0 && fr->strbeg>0) 795 bcopy(fr->buffer + fr->strbeg, 796 fr->buffer, remsz); 797 798 fr->strbeg = 0; 799 search_start = remsz; 800 bsz1 = fread(fr->buffer + remsz, 1, 801 fr->cbsz - remsz, fr->file); 802 if (bsz1 == 0) { 803 if (ferror(fr->file)) 804 err(2, NULL); 805 break; 806 } 807 fr->bsz = remsz + bsz1; 808 remsz = fr->bsz; 809 } 810 } 811 812 if (strend == NULL) 813 strend = fr->buffer + fr->bsz; 814 815 if ((fr->buffer + fr->strbeg <= strend) && 816 (fr->strbeg < fr->bsz) && (remsz>0)) 817 ret = bwscsbdup(fr->buffer + fr->strbeg, strend - 818 fr->buffer - fr->strbeg); 819 820 fr->strbeg = (strend - fr->buffer) + 1; 821 822 } else { 823 size_t len = 0; 824 825 ret = bwsfgetln(fr->file, &len, sort_opts_vals.zflag, 826 &(fr->rb)); 827 } 828 829 return (ret); 830 } 831 832 static void 833 file_reader_clean(struct file_reader 
*fr) 834 { 835 836 if (fr) { 837 if (fr->mmapaddr) 838 munmap(fr->mmapaddr, fr->mmapsize); 839 840 if (fr->fd) 841 close(fr->fd); 842 843 if (fr->buffer) 844 sort_free(fr->buffer); 845 846 if (fr->file) 847 if (fr->file != stdin) 848 closefile(fr->file, fr->fname); 849 850 if(fr->fname) 851 sort_free(fr->fname); 852 853 memset(fr, 0, sizeof(struct file_reader)); 854 } 855 } 856 857 void 858 file_reader_free(struct file_reader *fr) 859 { 860 861 if (fr) { 862 file_reader_clean(fr); 863 sort_free(fr); 864 } 865 } 866 867 int 868 procfile(const char *fsrc, struct sort_list *list, struct file_list *fl) 869 { 870 struct file_reader *fr; 871 872 fr = file_reader_init(fsrc); 873 if (fr == NULL) 874 err(2, NULL); 875 876 /* file browse cycle */ 877 for (;;) { 878 struct bwstring *bws; 879 880 bws = file_reader_readline(fr); 881 882 if (bws == NULL) 883 break; 884 885 sort_list_add(list, bws); 886 887 if (list->memsize >= available_free_memory) { 888 char *fn; 889 890 fn = new_tmp_file_name(); 891 sort_list_to_file(list, fn); 892 file_list_add(fl, fn, false); 893 sort_list_clean(list); 894 } 895 } 896 897 file_reader_free(fr); 898 899 return (0); 900 } 901 902 /* 903 * Compare file headers. Files with EOF always go to the end of the list. 904 */ 905 static int 906 file_header_cmp(struct file_header *f1, struct file_header *f2) 907 { 908 909 if (f1 == f2) 910 return (0); 911 else { 912 if (f1->fr == NULL) { 913 return ((f2->fr == NULL) ? 0 : +1); 914 } else if (f2->fr == NULL) 915 return (-1); 916 else { 917 int ret; 918 919 ret = list_coll(&(f1->si), &(f2->si)); 920 if (!ret) 921 return ((f1->file_pos < f2->file_pos) ? 
-1 : +1); 922 return (ret); 923 } 924 } 925 } 926 927 /* 928 * Allocate and init file header structure 929 */ 930 static void 931 file_header_init(struct file_header **fh, const char *fn, size_t file_pos) 932 { 933 934 if (fh && fn) { 935 struct bwstring *line; 936 937 *fh = sort_malloc(sizeof(struct file_header)); 938 (*fh)->file_pos = file_pos; 939 (*fh)->fr = file_reader_init(fn); 940 if ((*fh)->fr == NULL) { 941 perror(fn); 942 err(2, "%s", getstr(8)); 943 } 944 line = file_reader_readline((*fh)->fr); 945 if (line == NULL) { 946 file_reader_free((*fh)->fr); 947 (*fh)->fr = NULL; 948 (*fh)->si = NULL; 949 } else { 950 (*fh)->si = sort_list_item_alloc(); 951 sort_list_item_set((*fh)->si, line); 952 } 953 } 954 } 955 956 /* 957 * Close file 958 */ 959 static void 960 file_header_close(struct file_header **fh) 961 { 962 963 if (fh && *fh) { 964 if ((*fh)->fr) { 965 file_reader_free((*fh)->fr); 966 (*fh)->fr = NULL; 967 } 968 if ((*fh)->si) { 969 sort_list_item_clean((*fh)->si); 970 sort_free((*fh)->si); 971 (*fh)->si = NULL; 972 } 973 sort_free(*fh); 974 *fh = NULL; 975 } 976 } 977 978 /* 979 * Swap two array elements 980 */ 981 static void 982 file_header_swap(struct file_header **fh, int i1, int i2) 983 { 984 struct file_header *tmp; 985 986 tmp = fh[i1]; 987 fh[i1] = fh[i2]; 988 fh[i2] = tmp; 989 } 990 991 /* heap algorithm ==>> */ 992 993 /* 994 * See heap sort algorithm 995 * "Raises" last element to its right place 996 */ 997 static void 998 file_header_heap_swim(struct file_header **fh, int indx) 999 { 1000 1001 if (indx > 0) { 1002 int parent_index; 1003 1004 parent_index = (indx - 1) >> 1; 1005 1006 if (file_header_cmp(fh[indx], fh[parent_index]) < 0) { 1007 /* swap child and parent and continue */ 1008 file_header_swap(fh, indx, parent_index); 1009 file_header_heap_swim(fh, parent_index); 1010 } 1011 } 1012 } 1013 1014 /* 1015 * Sink the top element to its correct position 1016 */ 1017 static void 1018 file_header_heap_sink(struct file_header **fh, int 
indx, int size) 1019 { 1020 int left_child_index; 1021 int right_child_index; 1022 1023 left_child_index = indx + indx + 1; 1024 right_child_index = left_child_index + 1; 1025 1026 if (left_child_index < size) { 1027 int min_child_index; 1028 1029 min_child_index = left_child_index; 1030 1031 if ((right_child_index < size) && 1032 (file_header_cmp(fh[left_child_index], 1033 fh[right_child_index]) > 0)) 1034 min_child_index = right_child_index; 1035 if (file_header_cmp(fh[indx], fh[min_child_index]) > 0) { 1036 file_header_swap(fh, indx, min_child_index); 1037 file_header_heap_sink(fh, min_child_index, size); 1038 } 1039 } 1040 } 1041 1042 /* <<== heap algorithm */ 1043 1044 /* 1045 * Adds element to the "left" end 1046 */ 1047 static void 1048 file_header_list_rearrange_from_header(struct file_header **fh, int size) 1049 { 1050 1051 file_header_heap_sink(fh, 0, size); 1052 } 1053 1054 /* 1055 * Adds element to the "right" end 1056 */ 1057 static void 1058 file_header_list_push(struct file_header *f, struct file_header **fh, int size) 1059 { 1060 1061 fh[size++] = f; 1062 file_header_heap_swim(fh, size - 1); 1063 } 1064 1065 struct last_printed 1066 { 1067 struct bwstring *str; 1068 }; 1069 1070 /* 1071 * Prints the current line of the file 1072 */ 1073 static void 1074 file_header_print(struct file_header *fh, FILE *f_out, struct last_printed *lp) 1075 { 1076 1077 if (fh && fh->fr && f_out && fh->si && fh->si->str) { 1078 if (sort_opts_vals.uflag) { 1079 if ((lp->str == NULL) || (str_list_coll(lp->str, &(fh->si)))) { 1080 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 1081 if (lp->str) 1082 bwsfree(lp->str); 1083 lp->str = bwsdup(fh->si->str); 1084 } 1085 } else 1086 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 1087 } 1088 } 1089 1090 /* 1091 * Read next line 1092 */ 1093 static void 1094 file_header_read_next(struct file_header *fh) 1095 { 1096 1097 if (fh && fh->fr) { 1098 struct bwstring *tmp; 1099 1100 tmp = file_reader_readline(fh->fr); 1101 if 
(tmp == NULL) { 1102 file_reader_free(fh->fr); 1103 fh->fr = NULL; 1104 if (fh->si) { 1105 sort_list_item_clean(fh->si); 1106 sort_free(fh->si); 1107 fh->si = NULL; 1108 } 1109 } else { 1110 if (fh->si == NULL) 1111 fh->si = sort_list_item_alloc(); 1112 sort_list_item_set(fh->si, tmp); 1113 } 1114 } 1115 } 1116 1117 /* 1118 * Merge array of "files headers" 1119 */ 1120 static void 1121 file_headers_merge(int fnum, struct file_header **fh, FILE *f_out) 1122 { 1123 struct last_printed lp; 1124 int i; 1125 1126 memset(&lp, 0, sizeof(lp)); 1127 1128 /* 1129 * construct the initial sort structure 1130 */ 1131 for (i = 0; i < fnum; i++) 1132 file_header_list_push(fh[i], fh, i); 1133 1134 while (fh[0]->fr) { /* unfinished files are always in front */ 1135 /* output the smallest line: */ 1136 file_header_print(fh[0], f_out, &lp); 1137 /* read a new line, if possible: */ 1138 file_header_read_next(fh[0]); 1139 /* re-arrange the list: */ 1140 file_header_list_rearrange_from_header(fh, fnum); 1141 } 1142 1143 if (lp.str) 1144 bwsfree(lp.str); 1145 } 1146 1147 /* 1148 * Merges the given files into the output file, which can be 1149 * stdout. 
1150 */ 1151 static void 1152 merge_files_array(int argc, char **argv, const char *fn_out) 1153 { 1154 1155 if (argv && fn_out) { 1156 struct file_header **fh; 1157 FILE *f_out; 1158 int i; 1159 1160 f_out = openfile(fn_out, "w"); 1161 1162 if (f_out == NULL) 1163 err(2, NULL); 1164 1165 fh = sort_malloc((argc + 1) * sizeof(struct file_header *)); 1166 1167 for (i = 0; i < argc; i++) 1168 file_header_init(fh + i, argv[i], (size_t) i); 1169 1170 file_headers_merge(argc, fh, f_out); 1171 1172 for (i = 0; i < argc; i++) 1173 file_header_close(fh + i); 1174 1175 sort_free(fh); 1176 1177 closefile(f_out, fn_out); 1178 } 1179 } 1180 1181 /* 1182 * Shrinks the file list until its size smaller than max number of opened files 1183 */ 1184 static int 1185 shrink_file_list(struct file_list *fl) 1186 { 1187 1188 if ((fl == NULL) || (size_t) (fl->count) < max_open_files) 1189 return (0); 1190 else { 1191 struct file_list new_fl; 1192 int indx = 0; 1193 1194 file_list_init(&new_fl, true); 1195 while (indx < fl->count) { 1196 char *fnew; 1197 int num; 1198 1199 num = fl->count - indx; 1200 fnew = new_tmp_file_name(); 1201 1202 if ((size_t) num >= max_open_files) 1203 num = max_open_files - 1; 1204 merge_files_array(num, fl->fns + indx, fnew); 1205 if (fl->tmp) { 1206 int i; 1207 1208 for (i = 0; i < num; i++) 1209 unlink(fl->fns[indx + i]); 1210 } 1211 file_list_add(&new_fl, fnew, false); 1212 indx += num; 1213 } 1214 fl->tmp = false; /* already taken care of */ 1215 file_list_clean(fl); 1216 1217 fl->count = new_fl.count; 1218 fl->fns = new_fl.fns; 1219 fl->sz = new_fl.sz; 1220 fl->tmp = new_fl.tmp; 1221 1222 return (1); 1223 } 1224 } 1225 1226 /* 1227 * Merge list of files 1228 */ 1229 void 1230 merge_files(struct file_list *fl, const char *fn_out) 1231 { 1232 1233 if (fl && fn_out) { 1234 while (shrink_file_list(fl)); 1235 1236 merge_files_array(fl->count, fl->fns, fn_out); 1237 } 1238 } 1239 1240 static const char * 1241 get_sort_method_name(int sm) 1242 { 1243 1244 if (sm == 
SORT_MERGESORT) 1245 return "mergesort"; 1246 else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1247 return "radixsort"; 1248 else if (sort_opts_vals.sort_method == SORT_HEAPSORT) 1249 return "heapsort"; 1250 else 1251 return "quicksort"; 1252 } 1253 1254 /* 1255 * Wrapper for qsort 1256 */ 1257 static int sort_qsort(void *list, size_t count, size_t elem_size, 1258 int (*cmp_func)(const void *, const void *)) 1259 { 1260 1261 qsort(list, count, elem_size, cmp_func); 1262 return (0); 1263 } 1264 1265 /* 1266 * Sort list of lines and writes it to the file 1267 */ 1268 void 1269 sort_list_to_file(struct sort_list *list, const char *outfile) 1270 { 1271 struct sort_mods *sm = &(keys[0].sm); 1272 1273 if (!(sm->Mflag) && !(sm->Rflag) && !(sm->Vflag) && !(sm->Vflag) && 1274 !(sm->gflag) && !(sm->hflag) && !(sm->nflag)) { 1275 if ((sort_opts_vals.sort_method == SORT_DEFAULT) && byte_sort) 1276 sort_opts_vals.sort_method = SORT_RADIXSORT; 1277 1278 } else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1279 err(2, "%s", getstr(9)); 1280 1281 /* 1282 * to handle stable sort and the unique cases in the 1283 * right order, we need stable basic algorithm 1284 */ 1285 if (sort_opts_vals.sflag) { 1286 switch (sort_opts_vals.sort_method){ 1287 case SORT_MERGESORT: 1288 break; 1289 case SORT_RADIXSORT: 1290 break; 1291 case SORT_DEFAULT: 1292 sort_opts_vals.sort_method = SORT_MERGESORT; 1293 break; 1294 default: 1295 errx(2, "%s", getstr(10)); 1296 }; 1297 } 1298 1299 if (sort_opts_vals.sort_method == SORT_DEFAULT) 1300 sort_opts_vals.sort_method = DEFAULT_SORT_ALGORITHM; 1301 1302 if (debug_sort) 1303 printf("sort_method=%s\n", 1304 get_sort_method_name(sort_opts_vals.sort_method)); 1305 1306 switch (sort_opts_vals.sort_method){ 1307 case SORT_RADIXSORT: 1308 rxsort(list->list, list->count); 1309 sort_list_dump(list, outfile); 1310 break; 1311 case SORT_MERGESORT: 1312 mt_sort(list, mergesort, outfile); 1313 break; 1314 case SORT_HEAPSORT: 1315 mt_sort(list, heapsort, 
outfile); 1316 break; 1317 case SORT_QSORT: 1318 mt_sort(list, sort_qsort, outfile); 1319 break; 1320 default: 1321 mt_sort(list, DEFAULT_SORT_FUNC, outfile); 1322 break; 1323 } 1324 } 1325 1326 /******************* MT SORT ************************/ 1327 1328 #if defined(SORT_THREADS) 1329 /* semaphore to count threads */ 1330 static sem_t mtsem; 1331 1332 /* current system sort function */ 1333 static int (*g_sort_func)(void *, size_t, size_t, 1334 int(*)(const void *, const void *)); 1335 1336 /* 1337 * Sort cycle thread (in multi-threaded mode) 1338 */ 1339 static void* 1340 mt_sort_thread(void* arg) 1341 { 1342 struct sort_list *list = arg; 1343 1344 g_sort_func(list->list, list->count, sizeof(struct sort_list_item *), 1345 (int(*)(const void *, const void *)) list_coll); 1346 1347 sem_post(&mtsem); 1348 1349 return (arg); 1350 } 1351 1352 /* 1353 * Compare sub-lists. Empty sub-lists always go to the end of the list. 1354 */ 1355 static int 1356 sub_list_cmp(struct sort_list *l1, struct sort_list *l2) 1357 { 1358 1359 if (l1 == l2) 1360 return (0); 1361 else { 1362 if (l1->count == 0) { 1363 return ((l2->count == 0) ? 0 : +1); 1364 } else if (l2->count == 0) { 1365 return (-1); 1366 } else { 1367 int ret; 1368 1369 ret = list_coll(&(l1->list[0]), &(l2->list[0])); 1370 if (!ret) 1371 return ((l1->sub_list_pos < l2->sub_list_pos) ? 
1372 -1 : +1); 1373 return (ret); 1374 } 1375 } 1376 } 1377 1378 /* 1379 * Swap two array elements 1380 */ 1381 static void 1382 sub_list_swap(struct sort_list **sl, int i1, int i2) 1383 { 1384 struct sort_list *tmp; 1385 1386 tmp = sl[i1]; 1387 sl[i1] = sl[i2]; 1388 sl[i2] = tmp; 1389 } 1390 1391 /* heap algorithm ==>> */ 1392 1393 /* 1394 * See heap sort algorithm 1395 * "Raises" last element to its right place 1396 */ 1397 static void 1398 sub_list_swim(struct sort_list **sl, int indx) 1399 { 1400 1401 if (indx > 0) { 1402 int parent_index; 1403 1404 parent_index = (indx - 1) >> 1; 1405 1406 if (sub_list_cmp(sl[indx], sl[parent_index]) < 0) { 1407 /* swap child and parent and continue */ 1408 sub_list_swap(sl, indx, parent_index); 1409 sub_list_swim(sl, parent_index); 1410 } 1411 } 1412 } 1413 1414 /* 1415 * Sink the top element to its correct position 1416 */ 1417 static void 1418 sub_list_sink(struct sort_list **sl, int indx, int size) 1419 { 1420 int left_child_index; 1421 int right_child_index; 1422 1423 left_child_index = indx + indx + 1; 1424 right_child_index = left_child_index + 1; 1425 1426 if (left_child_index < size) { 1427 int min_child_index; 1428 1429 min_child_index = left_child_index; 1430 1431 if ((right_child_index < size) && 1432 (sub_list_cmp(sl[left_child_index], 1433 sl[right_child_index]) > 0)) 1434 min_child_index = right_child_index; 1435 if (sub_list_cmp(sl[indx], sl[min_child_index]) > 0) { 1436 sub_list_swap(sl, indx, min_child_index); 1437 sub_list_sink(sl, min_child_index, size); 1438 } 1439 } 1440 } 1441 1442 /* <<== heap algorithm */ 1443 1444 /* 1445 * Adds element to the "right" end 1446 */ 1447 static void 1448 sub_list_push(struct sort_list *s, struct sort_list **sl, int size) 1449 { 1450 1451 sl[size++] = s; 1452 sub_list_swim(sl, size - 1); 1453 } 1454 1455 struct last_printed_item 1456 { 1457 struct sort_list_item *item; 1458 }; 1459 1460 /* 1461 * Prints the current line of the file 1462 */ 1463 static void 1464 
sub_list_header_print(struct sort_list *sl, FILE *f_out, 1465 struct last_printed_item *lp) 1466 { 1467 1468 if (sl && sl->count && f_out && sl->list[0]->str) { 1469 if (sort_opts_vals.uflag) { 1470 if ((lp->item == NULL) || (list_coll(&(lp->item), 1471 &(sl->list[0])))) { 1472 bwsfwrite(sl->list[0]->str, f_out, 1473 sort_opts_vals.zflag); 1474 lp->item = sl->list[0]; 1475 } 1476 } else 1477 bwsfwrite(sl->list[0]->str, f_out, 1478 sort_opts_vals.zflag); 1479 } 1480 } 1481 1482 /* 1483 * Read next line 1484 */ 1485 static void 1486 sub_list_next(struct sort_list *sl) 1487 { 1488 1489 if (sl && sl->count) { 1490 sl->list += 1; 1491 sl->count -= 1; 1492 } 1493 } 1494 1495 /* 1496 * Merge sub-lists to a file 1497 */ 1498 static void 1499 merge_sub_lists(struct sort_list **sl, size_t n, FILE* f_out) 1500 { 1501 struct last_printed_item lp; 1502 size_t i; 1503 1504 memset(&lp,0,sizeof(lp)); 1505 1506 /* construct the initial list: */ 1507 for (i = 0; i < n; i++) 1508 sub_list_push(sl[i], sl, i); 1509 1510 while (sl[0]->count) { /* unfinished lists are always in front */ 1511 /* output the smallest line: */ 1512 sub_list_header_print(sl[0], f_out, &lp); 1513 /* move to a new line, if possible: */ 1514 sub_list_next(sl[0]); 1515 /* re-arrange the list: */ 1516 sub_list_sink(sl, 0, n); 1517 } 1518 } 1519 1520 /* 1521 * Merge sub-lists to a file 1522 */ 1523 static void 1524 merge_list_parts(struct sort_list **parts, size_t n, const char *fn) 1525 { 1526 FILE* f_out; 1527 1528 f_out = openfile(fn,"w"); 1529 1530 merge_sub_lists(parts, n, f_out); 1531 1532 closefile(f_out, fn); 1533 } 1534 1535 #endif /* defined(SORT_THREADS) */ 1536 /* 1537 * Multi-threaded sort algorithm "driver" 1538 */ 1539 static void 1540 mt_sort(struct sort_list *list, 1541 int(*sort_func)(void *, size_t, size_t, int(*)(const void *, const void *)), 1542 const char* fn) 1543 { 1544 #if defined(SORT_THREADS) 1545 if (nthreads < 2 || list->count < MT_SORT_THRESHOLD) { 1546 size_t nthreads_save = 
nthreads; 1547 nthreads = 1; 1548 #endif 1549 /* if single thread or small data, do simple sort */ 1550 sort_func(list->list, list->count, 1551 sizeof(struct sort_list_item *), 1552 (int(*)(const void *, const void *)) list_coll); 1553 sort_list_dump(list, fn); 1554 #if defined(SORT_THREADS) 1555 nthreads = nthreads_save; 1556 } else { 1557 /* multi-threaded sort */ 1558 struct sort_list **parts; 1559 size_t avgsize, cstart, i; 1560 1561 /* array of sub-lists */ 1562 parts = sort_malloc(sizeof(struct sort_list*) * nthreads); 1563 cstart = 0; 1564 avgsize = list->count / nthreads; 1565 1566 /* set global system sort function */ 1567 g_sort_func = sort_func; 1568 1569 /* set sublists */ 1570 for (i = 0; i < nthreads; ++i) { 1571 size_t sz = 0; 1572 1573 parts[i] = sort_malloc(sizeof(struct sort_list)); 1574 parts[i]->list = list->list + cstart; 1575 parts[i]->memsize = 0; 1576 parts[i]->sub_list_pos = i; 1577 1578 sz = (i == nthreads - 1) ? list->count - cstart : 1579 avgsize; 1580 1581 parts[i]->count = sz; 1582 1583 parts[i]->size = parts[i]->count; 1584 1585 cstart += sz; 1586 } 1587 1588 /* init threads counting semaphore */ 1589 sem_init(&mtsem, 0, 0); 1590 1591 /* start threads */ 1592 for (i = 0; i < nthreads; ++i) { 1593 pthread_t pth; 1594 pthread_attr_t attr; 1595 1596 pthread_attr_init(&attr); 1597 pthread_attr_setdetachstate(&attr, PTHREAD_DETACHED); 1598 1599 for (;;) { 1600 int res = pthread_create(&pth, &attr, 1601 mt_sort_thread, parts[i]); 1602 1603 if (res >= 0) 1604 break; 1605 if (errno == EAGAIN) { 1606 pthread_yield(); 1607 continue; 1608 } 1609 err(2, NULL); 1610 } 1611 1612 pthread_attr_destroy(&attr); 1613 } 1614 1615 /* wait for threads completion */ 1616 for (i = 0; i < nthreads; ++i) { 1617 sem_wait(&mtsem); 1618 } 1619 /* destroy the semaphore - we do not need it anymore */ 1620 sem_destroy(&mtsem); 1621 1622 /* merge sorted sub-lists to the file */ 1623 merge_list_parts(parts, nthreads, fn); 1624 1625 /* free sub-lists data */ 1626 for 
(i = 0; i < nthreads; ++i) { 1627 sort_free(parts[i]); 1628 } 1629 sort_free(parts); 1630 } 1631 #endif /* defined(SORT_THREADS) */ 1632 } 1633