xref: /illumos-gate/usr/src/cmd/sort/internal.c (revision 02c5427885675f9176f84efa4057c0fa1fe70982)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include "internal.h"
27 
28 #define	INSERTION_THRESHOLD	12
29 
30 static void
31 swap_range(int i, int j, int n, line_rec_t **I)
32 {
33 	while (n-- > 0) {
34 		line_rec_t *t;
35 
36 		t = I[i];
37 		I[i++] = I[j];
38 		I[j++] = t;
39 	}
40 }
41 
42 /*
43  * offset_is_algorithm() implements a simple insertion sort on line records that
44  * allows comparison from an offset into the l_collate field (since the subfiles
45  * should have already been sorted to that depth).
46  */
47 static void
48 offset_is_algorithm(line_rec_t **X, ssize_t n,
49     int (*collate_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t),
50     ssize_t depth, flag_t coll_flags)
51 {
52 	ssize_t i;
53 
54 	__S(stats_incr_subfiles());
55 
56 	/*
57 	 * Place lowest element in X[0].
58 	 */
59 	for (i = 0; i < n; i++) {
60 		if (collate_fcn(X[0], X[i], depth, coll_flags) > 0) {
61 			swap((void **)&X[0], (void **)&X[i]);
62 			ASSERT(collate_fcn(X[0], X[i], depth, coll_flags) <= 0);
63 		}
64 	}
65 
66 	/*
67 	 * Insertion sort.
68 	 */
69 	for (i = 2; i < n; i++) {
70 		ssize_t j = i;
71 		line_rec_t *t = X[i];
72 		while (collate_fcn(t, X[j - 1], depth, coll_flags) < 0) {
73 			X[j] = X[j - 1];
74 			j--;
75 			ASSERT(j > 0);
76 		}
77 		X[j] = t;
78 	}
79 }
80 
81 /*
82  * tqs_algorithm() is called from rqs_algorithm() when a subpartition is
83  * encountered whose line records share indistinguishable l_collate fields.  It
84  * implements a semi-iterative ternary quicksort.
85  */
86 static void
87 tqs_algorithm(line_rec_t **X, ssize_t n,
88     int (*collate_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t),
89     flag_t coll_flags)
90 {
91 	ssize_t	l;			/* boundary of left partition */
92 	ssize_t	le;			/* boundary of left equal partition */
93 	ssize_t	r;			/* boundary of right partition */
94 	ssize_t	re;			/* boundary of right equal partition */
95 
96 	ssize_t	p, q;			/* scratch for indices, comparisons */
97 
98 	coll_flags |= COLL_DATA_ONLY;
99 
100 tqs_start:
101 
102 	/*
103 	 * completion criteria
104 	 */
105 	if (n <= 1)
106 		return;
107 
108 	if (n <= INSERTION_THRESHOLD) {
109 		offset_is_algorithm(X, n, collate_fcn, 0, coll_flags);
110 		return;
111 	}
112 
113 	/*
114 	 * selection of partition element
115 	 */
116 	le = rand() % n;
117 	swap((void **)&X[0], (void **)&X[le]);
118 
119 	le = l = 1;
120 	r = re = n - 1;
121 
122 	for (;;) {
123 		while (l <= r &&
124 		    (p = collate_fcn(X[l], X[0], 0, coll_flags)) <= 0) {
125 			if (p == 0)
126 				swap((void **)&X[le++], (void **)&X[l]);
127 			l++;
128 		}
129 
130 		while (l <= r &&
131 		    (p = collate_fcn(X[r], X[0], 0, coll_flags)) >= 0) {
132 			if (p == 0)
133 				swap((void **)&X[r], (void **)&X[re--]);
134 			r--;
135 		}
136 
137 		if (l > r)
138 			break;
139 
140 		swap((void **)&X[l++], (void **)&X[r--]);
141 	}
142 
143 	/*
144 	 * swap equal partitions into middle
145 	 */
146 	p = MIN(le, l - le);
147 	swap_range(0, l - p, p, X);
148 	p = MIN(re - r, n - re - 1);
149 	swap_range(l, n - p, p, X);
150 
151 	/*
152 	 * Iterate with larger subpartition, recurse into smaller.
153 	 */
154 	p = l - le;
155 	q = re - r;
156 
157 	if (p > q) {
158 		tqs_algorithm(&X[n - q], q, collate_fcn, coll_flags);
159 
160 		n = p;
161 	} else {
162 		tqs_algorithm(X, p, collate_fcn, coll_flags);
163 
164 		X = &X[n - q];
165 		n = q;
166 	}
167 
168 	goto tqs_start;
169 	/*NOTREACHED*/
170 }
171 
172 /*
173  * The following semi-iterative radix quicksort is derived from that presented
174  * in
175  *	J. Bentley and R. Sedgewick, Fast Algorithms for Sorting and Searching
176  *	Strings, in Eighth Annual ACM-SIAM Symposium on Discrete Algorithms,
177  *	1997 (SODA 1997),
178  * and
179  *	R. Sedgewick, Algorithms in C, 3rd ed., vol. 1, Addison-Wesley, 1998.
180  */
181 
182 static void
183 rqs_algorithm(line_rec_t **X, ssize_t n, ssize_t depth,
184     int (*collate_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t),
185     flag_t coll_flags)
186 {
187 	uchar_t v;			/* partition radix value */
188 
189 	ssize_t	l;			/* boundary of left partition */
190 	ssize_t	le;			/* boundary of left equal partition */
191 	ssize_t	r;			/* boundary of right partition */
192 	ssize_t	re;			/* boundary of right equal partition */
193 
194 	ssize_t	p;			/* scratch for indices, comparisons */
195 	line_rec_t *t;			/* scratch for swaps */
196 
197 rqs_start:
198 
199 	/*
200 	 * completion criteria
201 	 */
202 	if (n <= 1)
203 		return;
204 
205 	if (n <= INSERTION_THRESHOLD) {
206 		offset_is_algorithm(X, n, collate_fcn, depth, coll_flags);
207 		return;
208 	}
209 
210 	/*
211 	 * selection of partition element
212 	 */
213 	le = rand() % n;
214 	swap((void **)&X[0], (void **)&X[le]);
215 	v = X[0]->l_collate.usp[depth];
216 
217 	le = l = 1;
218 	r = re = n - 1;
219 
220 	for (;;) {
221 		while (l <= r &&
222 		    (p = *(X[l]->l_collate.usp + depth) - v) <= 0) {
223 			if (p == 0) {
224 				t = X[le];
225 				X[le] = X[l];
226 				X[l] = t;
227 				le++;
228 			}
229 			(l)++;
230 		}
231 
232 		while (l <= r &&
233 		    (p = *(X[r]->l_collate.usp + depth) - v) >= 0) {
234 			if (p == 0) {
235 				t = X[r];
236 				X[r] = X[re];
237 				X[re] = t;
238 				(re)--;
239 			}
240 			(r)--;
241 		}
242 
243 		if (l > r)
244 			break;
245 
246 		t = X[l];
247 		X[l] = X[r];
248 		X[r] = t;
249 		(l)++;
250 		(r)--;
251 	}
252 
253 	/*
254 	 * swap equal partitions into middle
255 	 */
256 	p = MIN(le, l - le);
257 	swap_range(0, l - p, p, X);
258 	p = MIN(re - r, n - re - 1);
259 	swap_range(l, n - p, p, X);
260 
261 	/*
262 	 * recurse into subpartitions as necessary
263 	 */
264 	p = re - r;
265 	if (p > 0)
266 		rqs_algorithm(&X[n - p], p, depth, collate_fcn, coll_flags);
267 
268 	p = l - le;
269 	if (p > 0)
270 		rqs_algorithm(X, p, depth, collate_fcn, coll_flags);
271 
272 	if (le + n - re - 1 <= 1)
273 		return;
274 
275 	/*
276 	 * - 1 so that we don't count the final null.
277 	 */
278 	if (X[p]->l_collate_length - 1 > depth) {
279 		/*
280 		 * Equivalent recursion: rqs_algorithm(&X[p], le + n - re - 1,
281 		 * depth + 1, collate_fcn, coll_only);
282 		 */
283 		X = &X[p];
284 		n = le + n - re - 1;
285 		depth++;
286 		goto rqs_start;
287 	}
288 
289 	if (!(coll_flags & COLL_UNIQUE)) {
290 		__S(stats_incr_tqs_calls());
291 		tqs_algorithm(&X[p], le + n - re - 1, collate_fcn, coll_flags);
292 	}
293 }
294 
295 static void
296 radix_quicksort(stream_t *C, flag_t coll_flags)
297 {
298 	ASSERT((C->s_status & STREAM_SOURCE_MASK) == STREAM_ARRAY);
299 
300 	if (C->s_element_size == sizeof (char))
301 		rqs_algorithm(C->s_type.LA.s_array, C->s_type.LA.s_array_size,
302 		    0, collated, coll_flags);
303 	else
304 		rqs_algorithm(C->s_type.LA.s_array, C->s_type.LA.s_array_size,
305 		    0, collated_wide, coll_flags);
306 }
307 
308 void
309 internal_sort(sort_t *S)
310 {
311 	size_t input_mem, sort_mem;
312 	size_t prev_sort_mem = 0;
313 	void *prev_sort_buf = NULL;
314 
315 	int numerator, denominator;
316 	int memory_left;
317 	int currently_primed;
318 	flag_t coll_flags;
319 
320 	stream_t *sort_stream = NULL;
321 	stream_t *cur_stream;
322 
323 	set_memory_ratio(S, &numerator, &denominator);
324 	set_cleanup_chain(&S->m_temporary_streams);
325 
326 	if (S->m_field_options & FIELD_REVERSE_COMPARISONS)
327 		coll_flags = COLL_REVERSE;
328 	else
329 		coll_flags = 0;
330 
331 	/*
332 	 * For the entire line special case, we can speed comparisons by
333 	 * recognizing that the collation vector contains all the information
334 	 * required to order the line against other lines of the file.
335 	 * COLL_UNIQUE provides such an exit; if we use the standard put-line
336 	 * operation for the output stream, we achieve the desired fast path.
337 	 */
338 	if (S->m_entire_line)
339 		coll_flags |= COLL_UNIQUE;
340 
341 	hold_file_descriptor();
342 
343 	cur_stream = S->m_input_streams;
344 	while (cur_stream != NULL) {
345 		if (!(cur_stream->s_status & STREAM_OPEN)) {
346 			if (stream_open_for_read(S, cur_stream) == -1)
347 				die(EMSG_DESCRIPTORS);
348 		}
349 
350 		if (cur_stream->s_status & STREAM_MMAP) {
351 			input_mem = 0;
352 		} else {
353 			input_mem = (size_t)(((u_longlong_t)numerator *
354 			    S->m_memory_available) / denominator);
355 			stream_set_size(cur_stream, input_mem);
356 		}
357 
358 		sort_mem = S->m_memory_available - input_mem;
359 
360 		currently_primed = SOP_PRIME(cur_stream);
361 
362 		sort_stream = safe_realloc(sort_stream, sizeof (stream_t));
363 		stream_clear(sort_stream);
364 		sort_stream->s_buffer = prev_sort_buf;
365 		sort_stream->s_buffer_size = prev_sort_mem;
366 		stream_set(sort_stream, STREAM_OPEN | STREAM_ARRAY);
367 		sort_stream->s_element_size = S->m_single_byte_locale ?
368 		    sizeof (char) : sizeof (wchar_t);
369 		stream_set_size(sort_stream, sort_mem);
370 		prev_sort_buf = sort_stream->s_buffer;
371 		prev_sort_mem = sort_stream->s_buffer_size;
372 
373 		for (;;) {
374 			if (currently_primed == PRIME_SUCCEEDED) {
375 				memory_left =
376 				    stream_insert(S, cur_stream, sort_stream);
377 
378 				if (memory_left != ST_MEM_AVAIL)
379 					break;
380 			}
381 
382 			if (SOP_EOS(cur_stream)) {
383 				if (cur_stream->s_consumer == NULL) {
384 					(void) SOP_FREE(cur_stream);
385 					(void) SOP_CLOSE(cur_stream);
386 				}
387 
388 				cur_stream = cur_stream->s_next;
389 
390 				if (cur_stream == NULL)
391 					break;
392 
393 				if (!(cur_stream->s_status & STREAM_OPEN) &&
394 				    (stream_open_for_read(S, cur_stream) == -1))
395 					break;
396 
397 				if (!(cur_stream->s_status & STREAM_MMAP)) {
398 					input_mem = numerator *
399 					    S->m_memory_available / denominator;
400 					stream_set_size(cur_stream,
401 					    input_mem);
402 				}
403 				currently_primed = SOP_PRIME(cur_stream);
404 			}
405 		}
406 
407 		radix_quicksort(sort_stream, coll_flags);
408 
409 #ifndef DEBUG_NO_CACHE_TEMP
410 		/*
411 		 * cur_stream is set to NULL only when memory isn't filled and
412 		 * no more input streams remain.  In this case, we can safely
413 		 * cache the sort results.
414 		 *
415 		 * Otherwise, we have either exhausted available memory or
416 		 * available file descriptors.  If we've use all the available
417 		 * file descriptors, we aren't able to open the next input file.
418 		 * In this case, we can't cache the sort results, because more
419 		 * input files remain.
420 		 *
421 		 * If memory was filled, then there must be at least one
422 		 * leftover line unprocessed in the input stream, even though
423 		 * stream will indicated EOS if asked. We can't cache in this
424 		 * case, as we need one more round for the pending line or
425 		 * lines.
426 		 */
427 		if (cur_stream == NULL) {
428 			(void) stream_push_to_temporary(&S->m_temporary_streams,
429 			    sort_stream, ST_CACHE |
430 			    (S->m_single_byte_locale ? 0 : ST_WIDE));
431 			break;
432 		} else {
433 #endif
434 			release_file_descriptor();
435 			(void) stream_push_to_temporary(&S->m_temporary_streams,
436 			    sort_stream, ST_NOCACHE |
437 			    (S->m_single_byte_locale ? 0 : ST_WIDE));
438 
439 			hold_file_descriptor();
440 #ifdef DEBUG_NO_CACHE_TEMP
441 			if (cur_stream == NULL)
442 				break;
443 #endif
444 			stream_unset(cur_stream, STREAM_NOT_FREEABLE);
445 			stream_close_all_previous(cur_stream);
446 			cur_stream->s_consumer = NULL;
447 #ifndef DEBUG_NO_CACHE_TEMP
448 		}
449 #endif
450 	}
451 
452 	release_file_descriptor();
453 }
454