xref: /illumos-gate/usr/src/cmd/sort/merge.c (revision 101e15b5f8a77d9433805e541996abaabc9ca8c1)
1*101e15b5SRichard Lowe /*
2*101e15b5SRichard Lowe  * CDDL HEADER START
3*101e15b5SRichard Lowe  *
4*101e15b5SRichard Lowe  * The contents of this file are subject to the terms of the
5*101e15b5SRichard Lowe  * Common Development and Distribution License, Version 1.0 only
6*101e15b5SRichard Lowe  * (the "License").  You may not use this file except in compliance
7*101e15b5SRichard Lowe  * with the License.
8*101e15b5SRichard Lowe  *
9*101e15b5SRichard Lowe  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10*101e15b5SRichard Lowe  * or http://www.opensolaris.org/os/licensing.
11*101e15b5SRichard Lowe  * See the License for the specific language governing permissions
12*101e15b5SRichard Lowe  * and limitations under the License.
13*101e15b5SRichard Lowe  *
14*101e15b5SRichard Lowe  * When distributing Covered Code, include this CDDL HEADER in each
15*101e15b5SRichard Lowe  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16*101e15b5SRichard Lowe  * If applicable, add the following below this CDDL HEADER, with the
17*101e15b5SRichard Lowe  * fields enclosed by brackets "[]" replaced with your own identifying
18*101e15b5SRichard Lowe  * information: Portions Copyright [yyyy] [name of copyright owner]
19*101e15b5SRichard Lowe  *
20*101e15b5SRichard Lowe  * CDDL HEADER END
21*101e15b5SRichard Lowe  */
22*101e15b5SRichard Lowe /*
23*101e15b5SRichard Lowe  * Copyright 1998-2003 Sun Microsystems, Inc.  All rights reserved.
24*101e15b5SRichard Lowe  * Use is subject to license terms.
25*101e15b5SRichard Lowe  */
26*101e15b5SRichard Lowe 
27*101e15b5SRichard Lowe #include "merge.h"
28*101e15b5SRichard Lowe 
/*
 * External merge sort
 *
 *   The following code implements the merge phase of sort(1) using a heap-based
 *   priority queue.  Fast paths for merging two files as well as outputting a
 *   single file are provided.
 *
 * Memory footprint management
 *
 *   The N-way fan-out of the merge phase can lead to excessive memory
 *   consumption if not constrained, so two mechanisms are used to regulate
 *   the memory footprint during the merge phase:
 *
 *   1.  Single use memory advice.  Since we proceed through each merge file in
 *       order, any line we have output is never required again--at least, not
 *       from that input file.  Accordingly, we use the SOP_RELEASE_LINE()
 *       operation to advise that the memory backing the raw data for the stream
 *       up to that line is no longer of interest.  (For certain classes of
 *       streams, this leads to an madvise(3C) call with the MADV_DONTNEED
 *       flag.)
 *
 *   2.  Number of merge files.  The number of merge files is constrained based
 *       on the amount of physical memory specified via the -S option (or deemed
 *       available based on an inquiry of sysconf(3C) for _SC_AVPHYS_PAGES).
 *       The number of merge files is calculated based on the average resident
 *       size of a stream that supports the SOP_RELEASE_LINE() operation; this
 *       number is conservative for streams that do not support this operation.
 *       A minimum of four subfiles will always be used, resource limits
 *       permitting.
 *
 * Temporary filespace footprint management
 *
 *   Once the merge sort has utilized a temporary file, it may be deleted at
 *   close, as it's not used again and preserving the files until exit may
 *   compromise sort completion when limited temporary space is available.
 */
65*101e15b5SRichard Lowe 
66*101e15b5SRichard Lowe static int pq_N;
67*101e15b5SRichard Lowe static stream_t	**pq_queue;
68*101e15b5SRichard Lowe static int (*pq_coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t);
69*101e15b5SRichard Lowe 
70*101e15b5SRichard Lowe static ssize_t (*mg_coll_convert)(field_t *, line_rec_t *, flag_t, vchar_t);
71*101e15b5SRichard Lowe 
72*101e15b5SRichard Lowe static int
prepare_output_stream(stream_t * ostrp,sort_t * S)73*101e15b5SRichard Lowe prepare_output_stream(stream_t *ostrp, sort_t *S)
74*101e15b5SRichard Lowe {
75*101e15b5SRichard Lowe 	stream_clear(ostrp);
76*101e15b5SRichard Lowe 	stream_unset(ostrp, STREAM_OPEN);
77*101e15b5SRichard Lowe 
78*101e15b5SRichard Lowe 	stream_set(ostrp,
79*101e15b5SRichard Lowe 	    (S->m_single_byte_locale ? STREAM_SINGLE : STREAM_WIDE) |
80*101e15b5SRichard Lowe 	    (S->m_unique_lines ? STREAM_UNIQUE : 0));
81*101e15b5SRichard Lowe 
82*101e15b5SRichard Lowe 	if (S->m_output_to_stdout) {
83*101e15b5SRichard Lowe 		stream_set(ostrp, STREAM_NOTFILE);
84*101e15b5SRichard Lowe 		ostrp->s_filename = (char *)filename_stdout;
85*101e15b5SRichard Lowe 	} else
86*101e15b5SRichard Lowe 		ostrp->s_filename = S->m_output_filename;
87*101e15b5SRichard Lowe 
88*101e15b5SRichard Lowe 	return (SOP_OPEN_FOR_WRITE(ostrp));
89*101e15b5SRichard Lowe }
90*101e15b5SRichard Lowe 
91*101e15b5SRichard Lowe static void
merge_one_stream(field_t * fields_chain,stream_t * strp,stream_t * outstrp,vchar_t field_separator)92*101e15b5SRichard Lowe merge_one_stream(field_t *fields_chain, stream_t *strp, stream_t *outstrp,
93*101e15b5SRichard Lowe     vchar_t field_separator)
94*101e15b5SRichard Lowe {
95*101e15b5SRichard Lowe 	size_t element_size = strp->s_element_size;
96*101e15b5SRichard Lowe 	size_t initial_size = INITIAL_COLLATION_SIZE * element_size;
97*101e15b5SRichard Lowe 
98*101e15b5SRichard Lowe 	if (strp->s_status & STREAM_SINGLE || strp->s_status & STREAM_WIDE)
99*101e15b5SRichard Lowe 		stream_set(strp, STREAM_INSTANT);
100*101e15b5SRichard Lowe 
101*101e15b5SRichard Lowe 	if (SOP_PRIME(strp) == PRIME_SUCCEEDED) {
102*101e15b5SRichard Lowe 		strp->s_current.l_collate_bufsize = initial_size;
103*101e15b5SRichard Lowe 		strp->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
104*101e15b5SRichard Lowe 
105*101e15b5SRichard Lowe 		(void) mg_coll_convert(fields_chain, &strp->s_current,
106*101e15b5SRichard Lowe 		    FCV_REALLOC, field_separator);
107*101e15b5SRichard Lowe 		SOP_PUT_LINE(outstrp, &strp->s_current);
108*101e15b5SRichard Lowe 		SOP_RELEASE_LINE(strp);
109*101e15b5SRichard Lowe 
110*101e15b5SRichard Lowe 		while (!SOP_EOS(strp)) {
111*101e15b5SRichard Lowe 			SOP_FETCH(strp);
112*101e15b5SRichard Lowe 			if (strp->s_current.l_collate_length == 0)
113*101e15b5SRichard Lowe 				(void) mg_coll_convert(fields_chain,
114*101e15b5SRichard Lowe 				    &strp->s_current, FCV_REALLOC,
115*101e15b5SRichard Lowe 				    field_separator);
116*101e15b5SRichard Lowe 			SOP_PUT_LINE(outstrp, &strp->s_current);
117*101e15b5SRichard Lowe 			SOP_RELEASE_LINE(strp);
118*101e15b5SRichard Lowe 		}
119*101e15b5SRichard Lowe 
120*101e15b5SRichard Lowe 		(void) SOP_CLOSE(strp);
121*101e15b5SRichard Lowe 		SOP_FLUSH(outstrp);
122*101e15b5SRichard Lowe 	}
123*101e15b5SRichard Lowe }
124*101e15b5SRichard Lowe 
125*101e15b5SRichard Lowe static void
merge_two_streams(field_t * fields_chain,stream_t * str_a,stream_t * str_b,stream_t * outstrp,vchar_t field_separator,flag_t coll_flags)126*101e15b5SRichard Lowe merge_two_streams(field_t *fields_chain, stream_t *str_a, stream_t *str_b,
127*101e15b5SRichard Lowe     stream_t *outstrp, vchar_t field_separator, flag_t coll_flags)
128*101e15b5SRichard Lowe {
129*101e15b5SRichard Lowe 	int (*collate_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t);
130*101e15b5SRichard Lowe 	size_t element_size = str_a->s_element_size;
131*101e15b5SRichard Lowe 	size_t initial_size = INITIAL_COLLATION_SIZE * element_size;
132*101e15b5SRichard Lowe 
133*101e15b5SRichard Lowe 	ASSERT(str_a->s_element_size == str_b->s_element_size);
134*101e15b5SRichard Lowe 
135*101e15b5SRichard Lowe 	if (str_a->s_element_size == sizeof (char))
136*101e15b5SRichard Lowe 		collate_fcn = collated;
137*101e15b5SRichard Lowe 	else
138*101e15b5SRichard Lowe 		collate_fcn = collated_wide;
139*101e15b5SRichard Lowe 
140*101e15b5SRichard Lowe 	if (str_a->s_status & STREAM_SINGLE || str_a->s_status & STREAM_WIDE)
141*101e15b5SRichard Lowe 		stream_set(str_a, STREAM_INSTANT);
142*101e15b5SRichard Lowe 	if (str_b->s_status & STREAM_SINGLE || str_b->s_status & STREAM_WIDE)
143*101e15b5SRichard Lowe 		stream_set(str_b, STREAM_INSTANT);
144*101e15b5SRichard Lowe 
145*101e15b5SRichard Lowe 	if (SOP_PRIME(str_a) != PRIME_SUCCEEDED) {
146*101e15b5SRichard Lowe 		if (SOP_PRIME(str_b) != PRIME_SUCCEEDED)
147*101e15b5SRichard Lowe 			return;
148*101e15b5SRichard Lowe 
149*101e15b5SRichard Lowe 		merge_one_stream(fields_chain, str_b, outstrp,
150*101e15b5SRichard Lowe 		    field_separator);
151*101e15b5SRichard Lowe 		return;
152*101e15b5SRichard Lowe 	}
153*101e15b5SRichard Lowe 
154*101e15b5SRichard Lowe 	if (SOP_PRIME(str_b) != PRIME_SUCCEEDED) {
155*101e15b5SRichard Lowe 		merge_one_stream(fields_chain, str_a, outstrp,
156*101e15b5SRichard Lowe 		    field_separator);
157*101e15b5SRichard Lowe 		return;
158*101e15b5SRichard Lowe 	}
159*101e15b5SRichard Lowe 
160*101e15b5SRichard Lowe 	str_a->s_current.l_collate_bufsize =
161*101e15b5SRichard Lowe 	    str_b->s_current.l_collate_bufsize = initial_size;
162*101e15b5SRichard Lowe 
163*101e15b5SRichard Lowe 	str_a->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
164*101e15b5SRichard Lowe 	str_b->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
165*101e15b5SRichard Lowe 
166*101e15b5SRichard Lowe 	(void) mg_coll_convert(fields_chain, &str_a->s_current, FCV_REALLOC,
167*101e15b5SRichard Lowe 	    field_separator);
168*101e15b5SRichard Lowe 	(void) mg_coll_convert(fields_chain, &str_b->s_current, FCV_REALLOC,
169*101e15b5SRichard Lowe 	    field_separator);
170*101e15b5SRichard Lowe 
171*101e15b5SRichard Lowe 	for (;;) {
172*101e15b5SRichard Lowe 		if (collate_fcn(&str_a->s_current, &str_b->s_current, 0,
173*101e15b5SRichard Lowe 		    coll_flags) < 0) {
174*101e15b5SRichard Lowe 			SOP_PUT_LINE(outstrp, &str_a->s_current);
175*101e15b5SRichard Lowe 			SOP_RELEASE_LINE(str_a);
176*101e15b5SRichard Lowe 			if (SOP_EOS(str_a)) {
177*101e15b5SRichard Lowe 				(void) SOP_CLOSE(str_a);
178*101e15b5SRichard Lowe 				str_a = str_b;
179*101e15b5SRichard Lowe 				break;
180*101e15b5SRichard Lowe 			}
181*101e15b5SRichard Lowe 			SOP_FETCH(str_a);
182*101e15b5SRichard Lowe 			if (str_a->s_current.l_collate_length != 0)
183*101e15b5SRichard Lowe 				continue;
184*101e15b5SRichard Lowe 			(void) mg_coll_convert(fields_chain, &str_a->s_current,
185*101e15b5SRichard Lowe 			    FCV_REALLOC, field_separator);
186*101e15b5SRichard Lowe 		} else {
187*101e15b5SRichard Lowe 			SOP_PUT_LINE(outstrp, &str_b->s_current);
188*101e15b5SRichard Lowe 			SOP_RELEASE_LINE(str_b);
189*101e15b5SRichard Lowe 			if (SOP_EOS(str_b)) {
190*101e15b5SRichard Lowe 				SOP_CLOSE(str_b);
191*101e15b5SRichard Lowe 				break;
192*101e15b5SRichard Lowe 			}
193*101e15b5SRichard Lowe 			SOP_FETCH(str_b);
194*101e15b5SRichard Lowe 			if (str_b->s_current.l_collate_length != 0)
195*101e15b5SRichard Lowe 				continue;
196*101e15b5SRichard Lowe 			(void) mg_coll_convert(fields_chain, &str_b->s_current,
197*101e15b5SRichard Lowe 			    FCV_REALLOC, field_separator);
198*101e15b5SRichard Lowe 		}
199*101e15b5SRichard Lowe 	}
200*101e15b5SRichard Lowe 
201*101e15b5SRichard Lowe 	SOP_PUT_LINE(outstrp, &str_a->s_current);
202*101e15b5SRichard Lowe 	SOP_RELEASE_LINE(str_a);
203*101e15b5SRichard Lowe 
204*101e15b5SRichard Lowe 	while (!SOP_EOS(str_a)) {
205*101e15b5SRichard Lowe 		SOP_FETCH(str_a);
206*101e15b5SRichard Lowe 		if (str_a->s_current.l_collate_length == 0)
207*101e15b5SRichard Lowe 			(void) mg_coll_convert(fields_chain, &str_a->s_current,
208*101e15b5SRichard Lowe 			    FCV_REALLOC, field_separator);
209*101e15b5SRichard Lowe 		SOP_PUT_LINE(outstrp, &str_a->s_current);
210*101e15b5SRichard Lowe 		SOP_RELEASE_LINE(str_a);
211*101e15b5SRichard Lowe 	}
212*101e15b5SRichard Lowe 
213*101e15b5SRichard Lowe 	(void) SOP_CLOSE(str_a);
214*101e15b5SRichard Lowe 	SOP_FLUSH(outstrp);
215*101e15b5SRichard Lowe }
216*101e15b5SRichard Lowe 
217*101e15b5SRichard Lowe /*
218*101e15b5SRichard Lowe  * priority queue routines
219*101e15b5SRichard Lowe  *   used for merges involving more than two sources
220*101e15b5SRichard Lowe  */
221*101e15b5SRichard Lowe static void
heap_up(stream_t ** A,int k,flag_t coll_flags)222*101e15b5SRichard Lowe heap_up(stream_t **A, int k, flag_t coll_flags)
223*101e15b5SRichard Lowe {
224*101e15b5SRichard Lowe 	while (k > 1 &&
225*101e15b5SRichard Lowe 	    pq_coll_fcn(&A[k / 2]->s_current, &A[k]->s_current, 0,
226*101e15b5SRichard Lowe 	    coll_flags) > 0) {
227*101e15b5SRichard Lowe 		swap((void **)&pq_queue[k], (void **)&pq_queue[k / 2]);
228*101e15b5SRichard Lowe 		k /= 2;
229*101e15b5SRichard Lowe 	}
230*101e15b5SRichard Lowe }
231*101e15b5SRichard Lowe 
232*101e15b5SRichard Lowe static void
heap_down(stream_t ** A,int k,int N,flag_t coll_flags)233*101e15b5SRichard Lowe heap_down(stream_t **A, int k, int N, flag_t coll_flags)
234*101e15b5SRichard Lowe {
235*101e15b5SRichard Lowe 	int	j;
236*101e15b5SRichard Lowe 
237*101e15b5SRichard Lowe 	while (2 * k <= N) {
238*101e15b5SRichard Lowe 		j = 2 * k;
239*101e15b5SRichard Lowe 		if (j < N && pq_coll_fcn(&A[j]->s_current,
240*101e15b5SRichard Lowe 		    &A[j + 1]->s_current, 0, coll_flags) > 0)
241*101e15b5SRichard Lowe 			j++;
242*101e15b5SRichard Lowe 		if (pq_coll_fcn(&A[k]->s_current, &A[j]->s_current, 0,
243*101e15b5SRichard Lowe 		    coll_flags) <= 0)
244*101e15b5SRichard Lowe 			break;
245*101e15b5SRichard Lowe 		swap((void **)&pq_queue[k], (void **)&pq_queue[j]);
246*101e15b5SRichard Lowe 		k = j;
247*101e15b5SRichard Lowe 	}
248*101e15b5SRichard Lowe }
249*101e15b5SRichard Lowe 
250*101e15b5SRichard Lowe static int
pqueue_empty()251*101e15b5SRichard Lowe pqueue_empty()
252*101e15b5SRichard Lowe {
253*101e15b5SRichard Lowe 	return (pq_N == 0);
254*101e15b5SRichard Lowe }
255*101e15b5SRichard Lowe 
256*101e15b5SRichard Lowe static void
pqueue_init(size_t max_size,int (* coll_fcn)(line_rec_t *,line_rec_t *,ssize_t,flag_t))257*101e15b5SRichard Lowe pqueue_init(size_t max_size,
258*101e15b5SRichard Lowe     int (*coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t))
259*101e15b5SRichard Lowe {
260*101e15b5SRichard Lowe 	pq_queue = safe_realloc(NULL, sizeof (stream_t *) * (max_size + 1));
261*101e15b5SRichard Lowe 	pq_N = 0;
262*101e15b5SRichard Lowe 	pq_coll_fcn = coll_fcn;
263*101e15b5SRichard Lowe }
264*101e15b5SRichard Lowe 
265*101e15b5SRichard Lowe static void
pqueue_insert(stream_t * source,flag_t coll_flags)266*101e15b5SRichard Lowe pqueue_insert(stream_t *source, flag_t coll_flags)
267*101e15b5SRichard Lowe {
268*101e15b5SRichard Lowe 	pq_queue[++pq_N] = source;
269*101e15b5SRichard Lowe 	heap_up(pq_queue, pq_N, coll_flags);
270*101e15b5SRichard Lowe }
271*101e15b5SRichard Lowe 
272*101e15b5SRichard Lowe static stream_t *
pqueue_head(flag_t coll_flags)273*101e15b5SRichard Lowe pqueue_head(flag_t coll_flags)
274*101e15b5SRichard Lowe {
275*101e15b5SRichard Lowe 	swap((void **)&pq_queue[1], (void **)&pq_queue[pq_N]);
276*101e15b5SRichard Lowe 	heap_down(pq_queue, 1, pq_N - 1, coll_flags);
277*101e15b5SRichard Lowe 	return (pq_queue[pq_N--]);
278*101e15b5SRichard Lowe }
279*101e15b5SRichard Lowe 
280*101e15b5SRichard Lowe static void
merge_n_streams(sort_t * S,stream_t * head_streamp,int n_streams,stream_t * out_streamp,flag_t coll_flags)281*101e15b5SRichard Lowe merge_n_streams(sort_t *S, stream_t *head_streamp, int n_streams,
282*101e15b5SRichard Lowe     stream_t *out_streamp, flag_t coll_flags)
283*101e15b5SRichard Lowe {
284*101e15b5SRichard Lowe 	stream_t *top_streamp;
285*101e15b5SRichard Lowe 	stream_t *cur_streamp;
286*101e15b5SRichard Lowe 	stream_t *bot_streamp;
287*101e15b5SRichard Lowe 	stream_t *loop_out_streamp;
288*101e15b5SRichard Lowe 	flag_t is_single_byte = S->m_single_byte_locale;
289*101e15b5SRichard Lowe 
290*101e15b5SRichard Lowe 	int n_opens = 0;
291*101e15b5SRichard Lowe 	int threshold_opens;
292*101e15b5SRichard Lowe 
293*101e15b5SRichard Lowe 	threshold_opens = MAX(4,
294*101e15b5SRichard Lowe 	    2 * S->m_memory_available / DEFAULT_RELEASE_SIZE);
295*101e15b5SRichard Lowe 
296*101e15b5SRichard Lowe 	pqueue_init(n_streams, is_single_byte ? collated : collated_wide);
297*101e15b5SRichard Lowe 
298*101e15b5SRichard Lowe 	top_streamp = bot_streamp = head_streamp;
299*101e15b5SRichard Lowe 
300*101e15b5SRichard Lowe 	for (;;) {
301*101e15b5SRichard Lowe 		hold_file_descriptor();
302*101e15b5SRichard Lowe 		while (bot_streamp != NULL) {
303*101e15b5SRichard Lowe 
304*101e15b5SRichard Lowe 			if (n_opens > threshold_opens ||
305*101e15b5SRichard Lowe 			    stream_open_for_read(S, bot_streamp) == -1) {
306*101e15b5SRichard Lowe 				/*
307*101e15b5SRichard Lowe 				 * Available file descriptors would exceed
308*101e15b5SRichard Lowe 				 * memory target or have been exhausted; back
309*101e15b5SRichard Lowe 				 * off to the last valid, primed stream.
310*101e15b5SRichard Lowe 				 */
311*101e15b5SRichard Lowe 				bot_streamp = bot_streamp->s_previous;
312*101e15b5SRichard Lowe 				break;
313*101e15b5SRichard Lowe 			}
314*101e15b5SRichard Lowe 
315*101e15b5SRichard Lowe 			if (bot_streamp->s_status & STREAM_SINGLE ||
316*101e15b5SRichard Lowe 			    bot_streamp->s_status & STREAM_WIDE)
317*101e15b5SRichard Lowe 				stream_set(bot_streamp, STREAM_INSTANT);
318*101e15b5SRichard Lowe 
319*101e15b5SRichard Lowe 			bot_streamp = bot_streamp->s_next;
320*101e15b5SRichard Lowe 			n_opens++;
321*101e15b5SRichard Lowe 		}
322*101e15b5SRichard Lowe 		release_file_descriptor();
323*101e15b5SRichard Lowe 
324*101e15b5SRichard Lowe 		if (bot_streamp == NULL) {
325*101e15b5SRichard Lowe 			if (prepare_output_stream(out_streamp, S) != -1)
326*101e15b5SRichard Lowe 				loop_out_streamp = out_streamp;
327*101e15b5SRichard Lowe 			else
328*101e15b5SRichard Lowe 				die(EMSG_DESCRIPTORS);
329*101e15b5SRichard Lowe 		} else {
330*101e15b5SRichard Lowe 			loop_out_streamp = stream_push_to_temporary(
331*101e15b5SRichard Lowe 			    &head_streamp, NULL, ST_OPEN | ST_NOCACHE |
332*101e15b5SRichard Lowe 			    (is_single_byte ? 0 : ST_WIDE));
333*101e15b5SRichard Lowe 
334*101e15b5SRichard Lowe 			if (loop_out_streamp == NULL ||
335*101e15b5SRichard Lowe 			    top_streamp == bot_streamp)
336*101e15b5SRichard Lowe 				/*
337*101e15b5SRichard Lowe 				 * We need three file descriptors to make
338*101e15b5SRichard Lowe 				 * progress; if top_streamp == bot_streamp, then
339*101e15b5SRichard Lowe 				 * we have only two.
340*101e15b5SRichard Lowe 				 */
341*101e15b5SRichard Lowe 				die(EMSG_DESCRIPTORS);
342*101e15b5SRichard Lowe 		}
343*101e15b5SRichard Lowe 
344*101e15b5SRichard Lowe 		for (cur_streamp = top_streamp; cur_streamp != bot_streamp;
345*101e15b5SRichard Lowe 		    cur_streamp = cur_streamp->s_next) {
346*101e15b5SRichard Lowe 			/*
347*101e15b5SRichard Lowe 			 * Empty stream?
348*101e15b5SRichard Lowe 			 */
349*101e15b5SRichard Lowe 			if (!(cur_streamp->s_status & STREAM_ARRAY) &&
350*101e15b5SRichard Lowe 			    SOP_EOS(cur_streamp)) {
351*101e15b5SRichard Lowe 				stream_unlink_temporary(cur_streamp);
352*101e15b5SRichard Lowe 				continue;
353*101e15b5SRichard Lowe 			}
354*101e15b5SRichard Lowe 
355*101e15b5SRichard Lowe 			/*
356*101e15b5SRichard Lowe 			 * Given that stream is not empty, any error in priming
357*101e15b5SRichard Lowe 			 * must be fatal.
358*101e15b5SRichard Lowe 			 */
359*101e15b5SRichard Lowe 			if (SOP_PRIME(cur_streamp) != PRIME_SUCCEEDED)
360*101e15b5SRichard Lowe 				die(EMSG_BADPRIME);
361*101e15b5SRichard Lowe 
362*101e15b5SRichard Lowe 			cur_streamp->s_current.l_collate_bufsize =
363*101e15b5SRichard Lowe 			    INITIAL_COLLATION_SIZE;
364*101e15b5SRichard Lowe 			cur_streamp->s_current.l_collate.sp =
365*101e15b5SRichard Lowe 			    safe_realloc(NULL, INITIAL_COLLATION_SIZE);
366*101e15b5SRichard Lowe 			(void) mg_coll_convert(S->m_fields_head,
367*101e15b5SRichard Lowe 			    &cur_streamp->s_current, FCV_REALLOC,
368*101e15b5SRichard Lowe 			    S->m_field_separator);
369*101e15b5SRichard Lowe 
370*101e15b5SRichard Lowe 			pqueue_insert(cur_streamp, coll_flags);
371*101e15b5SRichard Lowe 		}
372*101e15b5SRichard Lowe 
373*101e15b5SRichard Lowe 		while (!pqueue_empty()) {
374*101e15b5SRichard Lowe 			cur_streamp = pqueue_head(coll_flags);
375*101e15b5SRichard Lowe 
376*101e15b5SRichard Lowe 			SOP_PUT_LINE(loop_out_streamp, &cur_streamp->s_current);
377*101e15b5SRichard Lowe 			SOP_RELEASE_LINE(cur_streamp);
378*101e15b5SRichard Lowe 
379*101e15b5SRichard Lowe 			if (!SOP_EOS(cur_streamp)) {
380*101e15b5SRichard Lowe 				SOP_FETCH(cur_streamp);
381*101e15b5SRichard Lowe 				(void) mg_coll_convert(S->m_fields_head,
382*101e15b5SRichard Lowe 				    &cur_streamp->s_current, FCV_REALLOC,
383*101e15b5SRichard Lowe 				    S->m_field_separator);
384*101e15b5SRichard Lowe 				pqueue_insert(cur_streamp, coll_flags);
385*101e15b5SRichard Lowe 			}
386*101e15b5SRichard Lowe 		}
387*101e15b5SRichard Lowe 
388*101e15b5SRichard Lowe 		cur_streamp = top_streamp;
389*101e15b5SRichard Lowe 		while (cur_streamp != bot_streamp) {
390*101e15b5SRichard Lowe 			if (!(cur_streamp->s_status & STREAM_ARRAY))
391*101e15b5SRichard Lowe 				safe_free(cur_streamp->s_current.l_collate.sp);
392*101e15b5SRichard Lowe 			cur_streamp->s_current.l_collate.sp = NULL;
393*101e15b5SRichard Lowe 
394*101e15b5SRichard Lowe 			(void) SOP_FREE(cur_streamp);
395*101e15b5SRichard Lowe 			stream_unlink_temporary(cur_streamp);
396*101e15b5SRichard Lowe 			(void) SOP_CLOSE(cur_streamp);
397*101e15b5SRichard Lowe 
398*101e15b5SRichard Lowe 			cur_streamp = cur_streamp->s_next;
399*101e15b5SRichard Lowe 		}
400*101e15b5SRichard Lowe 
401*101e15b5SRichard Lowe 		(void) SOP_FLUSH(loop_out_streamp);
402*101e15b5SRichard Lowe 
403*101e15b5SRichard Lowe 		if (bot_streamp == NULL)
404*101e15b5SRichard Lowe 			break;
405*101e15b5SRichard Lowe 
406*101e15b5SRichard Lowe 		if (!(loop_out_streamp->s_status & STREAM_NOTFILE)) {
407*101e15b5SRichard Lowe 			(void) SOP_CLOSE(loop_out_streamp);
408*101e15b5SRichard Lowe 			/*
409*101e15b5SRichard Lowe 			 * Get file size so that we may treat intermediate files
410*101e15b5SRichard Lowe 			 * with our stream_mmap facilities.
411*101e15b5SRichard Lowe 			 */
412*101e15b5SRichard Lowe 			stream_stat_chain(loop_out_streamp);
413*101e15b5SRichard Lowe 			__S(stats_incr_merge_files());
414*101e15b5SRichard Lowe 		}
415*101e15b5SRichard Lowe 
416*101e15b5SRichard Lowe 		n_opens = 0;
417*101e15b5SRichard Lowe 
418*101e15b5SRichard Lowe 		top_streamp = bot_streamp;
419*101e15b5SRichard Lowe 		bot_streamp = bot_streamp->s_next;
420*101e15b5SRichard Lowe 	}
421*101e15b5SRichard Lowe }
422*101e15b5SRichard Lowe 
423*101e15b5SRichard Lowe void
merge(sort_t * S)424*101e15b5SRichard Lowe merge(sort_t *S)
425*101e15b5SRichard Lowe {
426*101e15b5SRichard Lowe 	stream_t *merge_chain;
427*101e15b5SRichard Lowe 	stream_t *cur_streamp;
428*101e15b5SRichard Lowe 	stream_t out_stream;
429*101e15b5SRichard Lowe 	uint_t n_merges;
430*101e15b5SRichard Lowe 	flag_t coll_flags;
431*101e15b5SRichard Lowe 
432*101e15b5SRichard Lowe 	if (S->m_merge_only) {
433*101e15b5SRichard Lowe 		merge_chain = S->m_input_streams;
434*101e15b5SRichard Lowe 		set_cleanup_chain(&S->m_input_streams);
435*101e15b5SRichard Lowe 	} else {
436*101e15b5SRichard Lowe 		/*
437*101e15b5SRichard Lowe 		 * Otherwise we're inheriting the temporary output files from
438*101e15b5SRichard Lowe 		 * our internal sort.
439*101e15b5SRichard Lowe 		 */
440*101e15b5SRichard Lowe 		merge_chain = S->m_temporary_streams;
441*101e15b5SRichard Lowe 		stream_stat_chain(merge_chain);
442*101e15b5SRichard Lowe 		__S(stats_set_merge_files(stream_count_chain(merge_chain)));
443*101e15b5SRichard Lowe 	}
444*101e15b5SRichard Lowe 
445*101e15b5SRichard Lowe 	if (S->m_field_options & FIELD_REVERSE_COMPARISONS)
446*101e15b5SRichard Lowe 		coll_flags = COLL_REVERSE;
447*101e15b5SRichard Lowe 	else
448*101e15b5SRichard Lowe 		coll_flags = 0;
449*101e15b5SRichard Lowe 	if (S->m_entire_line)
450*101e15b5SRichard Lowe 		coll_flags |= COLL_UNIQUE;
451*101e15b5SRichard Lowe 
452*101e15b5SRichard Lowe 	n_merges = stream_count_chain(merge_chain);
453*101e15b5SRichard Lowe 
454*101e15b5SRichard Lowe 	mg_coll_convert = S->m_coll_convert;
455*101e15b5SRichard Lowe 	cur_streamp = merge_chain;
456*101e15b5SRichard Lowe 
457*101e15b5SRichard Lowe 	switch (n_merges) {
458*101e15b5SRichard Lowe 		case 0:
459*101e15b5SRichard Lowe 			/*
460*101e15b5SRichard Lowe 			 * No files for merge.
461*101e15b5SRichard Lowe 			 */
462*101e15b5SRichard Lowe 			warn(gettext("no files available to merge\n"));
463*101e15b5SRichard Lowe 			break;
464*101e15b5SRichard Lowe 		case 1:
465*101e15b5SRichard Lowe 			/*
466*101e15b5SRichard Lowe 			 * Fast path: only one file for merge.
467*101e15b5SRichard Lowe 			 */
468*101e15b5SRichard Lowe 			(void) stream_open_for_read(S, cur_streamp);
469*101e15b5SRichard Lowe 			(void) prepare_output_stream(&out_stream, S);
470*101e15b5SRichard Lowe 			merge_one_stream(S->m_fields_head, cur_streamp,
471*101e15b5SRichard Lowe 			    &out_stream, S->m_field_separator);
472*101e15b5SRichard Lowe 			break;
473*101e15b5SRichard Lowe 		case 2:
474*101e15b5SRichard Lowe 			/*
475*101e15b5SRichard Lowe 			 * Fast path: only two files for merge.
476*101e15b5SRichard Lowe 			 */
477*101e15b5SRichard Lowe 			(void) stream_open_for_read(S, cur_streamp);
478*101e15b5SRichard Lowe 			(void) stream_open_for_read(S, cur_streamp->s_next);
479*101e15b5SRichard Lowe 			if (prepare_output_stream(&out_stream, S) == -1)
480*101e15b5SRichard Lowe 				die(EMSG_DESCRIPTORS);
481*101e15b5SRichard Lowe 			merge_two_streams(S->m_fields_head, cur_streamp,
482*101e15b5SRichard Lowe 			    cur_streamp->s_next, &out_stream,
483*101e15b5SRichard Lowe 			    S->m_field_separator, coll_flags);
484*101e15b5SRichard Lowe 			break;
485*101e15b5SRichard Lowe 		default:
486*101e15b5SRichard Lowe 			/*
487*101e15b5SRichard Lowe 			 * Full merge.
488*101e15b5SRichard Lowe 			 */
489*101e15b5SRichard Lowe 			merge_n_streams(S, cur_streamp, n_merges, &out_stream,
490*101e15b5SRichard Lowe 			    coll_flags);
491*101e15b5SRichard Lowe 			break;
492*101e15b5SRichard Lowe 	}
493*101e15b5SRichard Lowe 
494*101e15b5SRichard Lowe 	remove_output_guard();
495*101e15b5SRichard Lowe }
496