xref: /titanic_52/usr/src/cmd/sort/common/internal.c (revision bc37da3aa8455efcf567c456746e3fb9d7f0a189)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include "internal.h"
29 
/*
 * Partitions holding at most INSERTION_THRESHOLD line records are handed to
 * offset_is_algorithm() by tqs_algorithm() and rqs_algorithm() instead of
 * being partitioned any further.
 */
#define	INSERTION_THRESHOLD	12
31 
32 static void
33 swap_range(int i, int j, int n, line_rec_t **I)
34 {
35 	while (n-- > 0) {
36 		line_rec_t *t;
37 
38 		t = I[i];
39 		I[i++] = I[j];
40 		I[j++] = t;
41 	}
42 }
43 
44 /*
45  * offset_is_algorithm() implements a simple insertion sort on line records that
46  * allows comparison from an offset into the l_collate field (since the subfiles
47  * should have already been sorted to that depth).
48  */
49 static void
50 offset_is_algorithm(line_rec_t **X, ssize_t n,
51     int (*collate_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t),
52     ssize_t depth, flag_t coll_flags)
53 {
54 	ssize_t i;
55 
56 	__S(stats_incr_subfiles());
57 
58 	/*
59 	 * Place lowest element in X[0].
60 	 */
61 	for (i = 0; i < n; i++) {
62 		if (collate_fcn(X[0], X[i], depth, coll_flags) > 0) {
63 			swap((void **)&X[0], (void **)&X[i]);
64 			ASSERT(collate_fcn(X[0], X[i], depth, coll_flags) <= 0);
65 		}
66 	}
67 
68 	/*
69 	 * Insertion sort.
70 	 */
71 	for (i = 2; i < n; i++) {
72 		ssize_t j = i;
73 		line_rec_t *t = X[i];
74 		while (collate_fcn(t, X[j - 1], depth, coll_flags) < 0) {
75 			X[j] = X[j - 1];
76 			j--;
77 			ASSERT(j > 0);
78 		}
79 		X[j] = t;
80 	}
81 }
82 
83 /*
84  * tqs_algorithm() is called from rqs_algorithm() when a subpartition is
85  * encountered whose line records share indistinguishable l_collate fields.  It
86  * implements a semi-iterative ternary quicksort.
87  */
88 static void
89 tqs_algorithm(line_rec_t **X, ssize_t n,
90     int (*collate_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t),
91     flag_t coll_flags)
92 {
93 	ssize_t	l;			/* boundary of left partition */
94 	ssize_t	le;			/* boundary of left equal partition */
95 	ssize_t	r;			/* boundary of right partition */
96 	ssize_t	re;			/* boundary of right equal partition */
97 
98 	ssize_t	p, q;			/* scratch for indices, comparisons */
99 
100 	coll_flags |= COLL_DATA_ONLY;
101 
102 tqs_start:
103 
104 	/*
105 	 * completion criteria
106 	 */
107 	if (n <= 1)
108 		return;
109 
110 	if (n <= INSERTION_THRESHOLD) {
111 		offset_is_algorithm(X, n, collate_fcn, 0, coll_flags);
112 		return;
113 	}
114 
115 	/*
116 	 * selection of partition element
117 	 */
118 	le = rand() % n;
119 	swap((void **)&X[0], (void **)&X[le]);
120 
121 	le = l = 1;
122 	r = re = n - 1;
123 
124 	for (;;) {
125 		while (l <= r &&
126 		    (p = collate_fcn(X[l], X[0], 0, coll_flags)) <= 0) {
127 			if (p == 0)
128 				swap((void **)&X[le++], (void **)&X[l]);
129 			l++;
130 		}
131 
132 		while (l <= r &&
133 		    (p = collate_fcn(X[r], X[0], 0, coll_flags)) >= 0) {
134 			if (p == 0)
135 				swap((void **)&X[r], (void **)&X[re--]);
136 			r--;
137 		}
138 
139 		if (l > r)
140 			break;
141 
142 		swap((void **)&X[l++], (void **)&X[r--]);
143 	}
144 
145 	/*
146 	 * swap equal partitions into middle
147 	 */
148 	p = MIN(le, l - le);
149 	swap_range(0, l - p, p, X);
150 	p = MIN(re - r, n - re - 1);
151 	swap_range(l, n - p, p, X);
152 
153 	/*
154 	 * Iterate with larger subpartition, recurse into smaller.
155 	 */
156 	p = l - le;
157 	q = re - r;
158 
159 	if (p > q) {
160 		tqs_algorithm(&X[n - q], q, collate_fcn, coll_flags);
161 
162 		n = p;
163 	} else {
164 		tqs_algorithm(X, p, collate_fcn, coll_flags);
165 
166 		X = &X[n - q];
167 		n = q;
168 	}
169 
170 	goto tqs_start;
171 	/*NOTREACHED*/
172 }
173 
174 /*
175  * The following semi-iterative radix quicksort is derived from that presented
176  * in
177  *	J. Bentley and R. Sedgewick, Fast Algorithms for Sorting and Searching
178  *	Strings, in Eighth Annual ACM-SIAM Symposium on Discrete Algorithms,
179  *	1997 (SODA 1997),
180  * and
181  *	R. Sedgewick, Algorithms in C, 3rd ed., vol. 1, Addison-Wesley, 1998.
182  */
183 
184 static void
185 rqs_algorithm(line_rec_t **X, ssize_t n, ssize_t depth,
186     int (*collate_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t),
187     flag_t coll_flags)
188 {
189 	uchar_t v;			/* partition radix value */
190 
191 	ssize_t	l;			/* boundary of left partition */
192 	ssize_t	le;			/* boundary of left equal partition */
193 	ssize_t	r;			/* boundary of right partition */
194 	ssize_t	re;			/* boundary of right equal partition */
195 
196 	ssize_t	p;			/* scratch for indices, comparisons */
197 	line_rec_t *t;			/* scratch for swaps */
198 
199 rqs_start:
200 
201 	/*
202 	 * completion criteria
203 	 */
204 	if (n <= 1)
205 		return;
206 
207 	if (n <= INSERTION_THRESHOLD) {
208 		offset_is_algorithm(X, n, collate_fcn, depth, coll_flags);
209 		return;
210 	}
211 
212 	/*
213 	 * selection of partition element
214 	 */
215 	le = rand() % n;
216 	swap((void **)&X[0], (void **)&X[le]);
217 	v = X[0]->l_collate.usp[depth];
218 
219 	le = l = 1;
220 	r = re = n - 1;
221 
222 	for (;;) {
223 		while (l <= r &&
224 		    (p = *(X[l]->l_collate.usp + depth) - v) <= 0) {
225 			if (p == 0) {
226 				t = X[le];
227 				X[le] = X[l];
228 				X[l] = t;
229 				le++;
230 			}
231 			(l)++;
232 		}
233 
234 		while (l <= r &&
235 		    (p = *(X[r]->l_collate.usp + depth) - v) >= 0) {
236 			if (p == 0) {
237 				t = X[r];
238 				X[r] = X[re];
239 				X[re] = t;
240 				(re)--;
241 			}
242 			(r)--;
243 		}
244 
245 		if (l > r)
246 			break;
247 
248 		t = X[l];
249 		X[l] = X[r];
250 		X[r] = t;
251 		(l)++;
252 		(r)--;
253 	}
254 
255 	/*
256 	 * swap equal partitions into middle
257 	 */
258 	p = MIN(le, l - le);
259 	swap_range(0, l - p, p, X);
260 	p = MIN(re - r, n - re - 1);
261 	swap_range(l, n - p, p, X);
262 
263 	/*
264 	 * recurse into subpartitions as necessary
265 	 */
266 	p = re - r;
267 	if (p > 0)
268 		rqs_algorithm(&X[n - p], p, depth, collate_fcn, coll_flags);
269 
270 	p = l - le;
271 	if (p > 0)
272 		rqs_algorithm(X, p, depth, collate_fcn, coll_flags);
273 
274 	if (le + n - re - 1 <= 1)
275 		return;
276 
277 	/*
278 	 * - 1 so that we don't count the final null.
279 	 */
280 	if (X[p]->l_collate_length - 1 > depth) {
281 		/*
282 		 * Equivalent recursion: rqs_algorithm(&X[p], le + n - re - 1,
283 		 * depth + 1, collate_fcn, coll_only);
284 		 */
285 		X = &X[p];
286 		n = le + n - re - 1;
287 		depth++;
288 		goto rqs_start;
289 	}
290 
291 	if (!(coll_flags & COLL_UNIQUE)) {
292 		__S(stats_incr_tqs_calls());
293 		tqs_algorithm(&X[p], le + n - re - 1, collate_fcn, coll_flags);
294 	}
295 }
296 
297 static void
298 radix_quicksort(stream_t *C, flag_t coll_flags)
299 {
300 	ASSERT((C->s_status & STREAM_SOURCE_MASK) == STREAM_ARRAY);
301 
302 	if (C->s_element_size == sizeof (char))
303 		rqs_algorithm(C->s_type.LA.s_array, C->s_type.LA.s_array_size,
304 		    0, collated, coll_flags);
305 	else
306 		rqs_algorithm(C->s_type.LA.s_array, C->s_type.LA.s_array_size,
307 		    0, collated_wide, coll_flags);
308 }
309 
310 void
311 internal_sort(sort_t *S)
312 {
313 	size_t input_mem, sort_mem;
314 	size_t prev_sort_mem = 0;
315 	void *prev_sort_buf = NULL;
316 
317 	int numerator, denominator;
318 	int memory_left;
319 	int currently_primed;
320 	flag_t coll_flags;
321 
322 	stream_t *sort_stream = NULL;
323 	stream_t *cur_stream;
324 
325 	set_memory_ratio(S, &numerator, &denominator);
326 	set_cleanup_chain(&S->m_temporary_streams);
327 
328 	if (S->m_field_options & FIELD_REVERSE_COMPARISONS)
329 		coll_flags = COLL_REVERSE;
330 	else
331 		coll_flags = 0;
332 
333 	/*
334 	 * For the entire line special case, we can speed comparisons by
335 	 * recognizing that the collation vector contains all the information
336 	 * required to order the line against other lines of the file.
337 	 * COLL_UNIQUE provides such an exit; if we use the standard put-line
338 	 * operation for the output stream, we achieve the desired fast path.
339 	 */
340 	if (S->m_entire_line)
341 		coll_flags |= COLL_UNIQUE;
342 
343 	hold_file_descriptor();
344 
345 	cur_stream = S->m_input_streams;
346 	while (cur_stream != NULL) {
347 		if (!(cur_stream->s_status & STREAM_OPEN)) {
348 			if (stream_open_for_read(S, cur_stream) == -1)
349 				die(EMSG_DESCRIPTORS);
350 		}
351 
352 		if (cur_stream->s_status & STREAM_MMAP) {
353 			input_mem = 0;
354 		} else {
355 			input_mem = (size_t)(((u_longlong_t)numerator *
356 			    S->m_memory_available) / denominator);
357 			stream_set_size(cur_stream, input_mem);
358 		}
359 
360 		sort_mem = S->m_memory_available - input_mem;
361 
362 		currently_primed = SOP_PRIME(cur_stream);
363 
364 		sort_stream = safe_realloc(sort_stream, sizeof (stream_t));
365 		stream_clear(sort_stream);
366 		sort_stream->s_buffer = prev_sort_buf;
367 		sort_stream->s_buffer_size = prev_sort_mem;
368 		stream_set(sort_stream, STREAM_OPEN | STREAM_ARRAY);
369 		sort_stream->s_element_size = S->m_single_byte_locale ?
370 		    sizeof (char) : sizeof (wchar_t);
371 		stream_set_size(sort_stream, sort_mem);
372 		prev_sort_buf = sort_stream->s_buffer;
373 		prev_sort_mem = sort_stream->s_buffer_size;
374 
375 		for (;;) {
376 			if (currently_primed == PRIME_SUCCEEDED) {
377 				memory_left =
378 				    stream_insert(S, cur_stream, sort_stream);
379 
380 				if (memory_left != ST_MEM_AVAIL)
381 					break;
382 			}
383 
384 			if (SOP_EOS(cur_stream)) {
385 				if (cur_stream->s_consumer == NULL) {
386 					(void) SOP_FREE(cur_stream);
387 					(void) SOP_CLOSE(cur_stream);
388 				}
389 
390 				cur_stream = cur_stream->s_next;
391 
392 				if (cur_stream == NULL)
393 					break;
394 
395 				if (!(cur_stream->s_status & STREAM_OPEN) &&
396 				    (stream_open_for_read(S, cur_stream) == -1))
397 					break;
398 
399 				if (!(cur_stream->s_status & STREAM_MMAP)) {
400 					input_mem = numerator *
401 					    S->m_memory_available / denominator;
402 					stream_set_size(cur_stream,
403 					    input_mem);
404 				}
405 				currently_primed = SOP_PRIME(cur_stream);
406 			}
407 		}
408 
409 		radix_quicksort(sort_stream, coll_flags);
410 
411 #ifndef DEBUG_NO_CACHE_TEMP
412 		/*
413 		 * cur_stream is set to NULL only when memory isn't filled and
414 		 * no more input streams remain.  In this case, we can safely
415 		 * cache the sort results.
416 		 *
417 		 * Otherwise, we have either exhausted available memory or
418 		 * available file descriptors.  If we've use all the available
419 		 * file descriptors, we aren't able to open the next input file.
420 		 * In this case, we can't cache the sort results, because more
421 		 * input files remain.
422 		 *
423 		 * If memory was filled, then there must be at least one
424 		 * leftover line unprocessed in the input stream, even though
425 		 * stream will indicated EOS if asked. We can't cache in this
426 		 * case, as we need one more round for the pending line or
427 		 * lines.
428 		 */
429 		if (cur_stream == NULL) {
430 			(void) stream_push_to_temporary(&S->m_temporary_streams,
431 			    sort_stream, ST_CACHE |
432 			    (S->m_single_byte_locale ? 0 : ST_WIDE));
433 			break;
434 		} else {
435 #endif
436 			release_file_descriptor();
437 			(void) stream_push_to_temporary(&S->m_temporary_streams,
438 			    sort_stream, ST_NOCACHE |
439 			    (S->m_single_byte_locale ? 0 : ST_WIDE));
440 
441 			hold_file_descriptor();
442 #ifdef DEBUG_NO_CACHE_TEMP
443 			if (cur_stream == NULL)
444 				break;
445 #endif
446 			stream_unset(cur_stream, STREAM_NOT_FREEABLE);
447 			stream_close_all_previous(cur_stream);
448 			cur_stream->s_consumer = NULL;
449 #ifndef DEBUG_NO_CACHE_TEMP
450 		}
451 #endif
452 	}
453 
454 	release_file_descriptor();
455 }
456