xref: /linux/fs/bcachefs/thread_with_file.c (revision f5c31bcf604db54470868f3118a60dc4a9ba8813)
1 // SPDX-License-Identifier: GPL-2.0
2 #ifndef NO_BCACHEFS_FS
3 
4 #include "bcachefs.h"
5 #include "printbuf.h"
6 #include "thread_with_file.h"
7 
8 #include <linux/anon_inodes.h>
9 #include <linux/file.h>
10 #include <linux/kthread.h>
11 #include <linux/pagemap.h>
12 #include <linux/poll.h>
13 
14 void bch2_thread_with_file_exit(struct thread_with_file *thr)
15 {
16 	if (thr->task) {
17 		kthread_stop(thr->task);
18 		put_task_struct(thr->task);
19 	}
20 }
21 
22 int bch2_run_thread_with_file(struct thread_with_file *thr,
23 			      const struct file_operations *fops,
24 			      int (*fn)(void *))
25 {
26 	struct file *file = NULL;
27 	int ret, fd = -1;
28 	unsigned fd_flags = O_CLOEXEC;
29 
30 	if (fops->read && fops->write)
31 		fd_flags |= O_RDWR;
32 	else if (fops->read)
33 		fd_flags |= O_RDONLY;
34 	else if (fops->write)
35 		fd_flags |= O_WRONLY;
36 
37 	char name[TASK_COMM_LEN];
38 	get_task_comm(name, current);
39 
40 	thr->ret = 0;
41 	thr->task = kthread_create(fn, thr, "%s", name);
42 	ret = PTR_ERR_OR_ZERO(thr->task);
43 	if (ret)
44 		return ret;
45 
46 	ret = get_unused_fd_flags(fd_flags);
47 	if (ret < 0)
48 		goto err;
49 	fd = ret;
50 
51 	file = anon_inode_getfile(name, fops, thr, fd_flags);
52 	ret = PTR_ERR_OR_ZERO(file);
53 	if (ret)
54 		goto err;
55 
56 	get_task_struct(thr->task);
57 	wake_up_process(thr->task);
58 	fd_install(fd, file);
59 	return fd;
60 err:
61 	if (fd >= 0)
62 		put_unused_fd(fd);
63 	if (thr->task)
64 		kthread_stop(thr->task);
65 	return ret;
66 }
67 
68 static inline bool thread_with_stdio_has_output(struct thread_with_stdio *thr)
69 {
70 	return thr->stdio.output_buf.pos ||
71 		thr->output2.nr ||
72 		thr->thr.done;
73 }
74 
75 static ssize_t thread_with_stdio_read(struct file *file, char __user *buf,
76 				      size_t len, loff_t *ppos)
77 {
78 	struct thread_with_stdio *thr =
79 		container_of(file->private_data, struct thread_with_stdio, thr);
80 	size_t copied = 0, b;
81 	int ret = 0;
82 
83 	if ((file->f_flags & O_NONBLOCK) &&
84 	    !thread_with_stdio_has_output(thr))
85 		return -EAGAIN;
86 
87 	ret = wait_event_interruptible(thr->stdio.output_wait,
88 		thread_with_stdio_has_output(thr));
89 	if (ret)
90 		return ret;
91 
92 	if (thr->thr.done)
93 		return 0;
94 
95 	while (len) {
96 		ret = darray_make_room(&thr->output2, thr->stdio.output_buf.pos);
97 		if (ret)
98 			break;
99 
100 		spin_lock_irq(&thr->stdio.output_lock);
101 		b = min_t(size_t, darray_room(thr->output2), thr->stdio.output_buf.pos);
102 
103 		memcpy(&darray_top(thr->output2), thr->stdio.output_buf.buf, b);
104 		memmove(thr->stdio.output_buf.buf,
105 			thr->stdio.output_buf.buf + b,
106 			thr->stdio.output_buf.pos - b);
107 
108 		thr->output2.nr += b;
109 		thr->stdio.output_buf.pos -= b;
110 		spin_unlock_irq(&thr->stdio.output_lock);
111 
112 		b = min(len, thr->output2.nr);
113 		if (!b)
114 			break;
115 
116 		b -= copy_to_user(buf, thr->output2.data, b);
117 		if (!b) {
118 			ret = -EFAULT;
119 			break;
120 		}
121 
122 		copied	+= b;
123 		buf	+= b;
124 		len	-= b;
125 
126 		memmove(thr->output2.data,
127 			thr->output2.data + b,
128 			thr->output2.nr - b);
129 		thr->output2.nr -= b;
130 	}
131 
132 	return copied ?: ret;
133 }
134 
135 static int thread_with_stdio_release(struct inode *inode, struct file *file)
136 {
137 	struct thread_with_stdio *thr =
138 		container_of(file->private_data, struct thread_with_stdio, thr);
139 
140 	bch2_thread_with_file_exit(&thr->thr);
141 	printbuf_exit(&thr->stdio.input_buf);
142 	printbuf_exit(&thr->stdio.output_buf);
143 	darray_exit(&thr->output2);
144 	thr->exit(thr);
145 	return 0;
146 }
147 
148 #define WRITE_BUFFER		4096
149 
150 static inline bool thread_with_stdio_has_input_space(struct thread_with_stdio *thr)
151 {
152 	return thr->stdio.input_buf.pos < WRITE_BUFFER || thr->thr.done;
153 }
154 
155 static ssize_t thread_with_stdio_write(struct file *file, const char __user *ubuf,
156 				       size_t len, loff_t *ppos)
157 {
158 	struct thread_with_stdio *thr =
159 		container_of(file->private_data, struct thread_with_stdio, thr);
160 	struct printbuf *buf = &thr->stdio.input_buf;
161 	size_t copied = 0;
162 	ssize_t ret = 0;
163 
164 	while (len) {
165 		if (thr->thr.done) {
166 			ret = -EPIPE;
167 			break;
168 		}
169 
170 		size_t b = len - fault_in_readable(ubuf, len);
171 		if (!b) {
172 			ret = -EFAULT;
173 			break;
174 		}
175 
176 		spin_lock(&thr->stdio.input_lock);
177 		if (buf->pos < WRITE_BUFFER)
178 			bch2_printbuf_make_room(buf, min(b, WRITE_BUFFER - buf->pos));
179 		b = min(len, printbuf_remaining_size(buf));
180 
181 		if (b && !copy_from_user_nofault(&buf->buf[buf->pos], ubuf, b)) {
182 			ubuf += b;
183 			len -= b;
184 			copied += b;
185 			buf->pos += b;
186 		}
187 		spin_unlock(&thr->stdio.input_lock);
188 
189 		if (b) {
190 			wake_up(&thr->stdio.input_wait);
191 		} else {
192 			if ((file->f_flags & O_NONBLOCK)) {
193 				ret = -EAGAIN;
194 				break;
195 			}
196 
197 			ret = wait_event_interruptible(thr->stdio.input_wait,
198 					thread_with_stdio_has_input_space(thr));
199 			if (ret)
200 				break;
201 		}
202 	}
203 
204 	return copied ?: ret;
205 }
206 
207 static __poll_t thread_with_stdio_poll(struct file *file, struct poll_table_struct *wait)
208 {
209 	struct thread_with_stdio *thr =
210 		container_of(file->private_data, struct thread_with_stdio, thr);
211 
212 	poll_wait(file, &thr->stdio.output_wait, wait);
213 	poll_wait(file, &thr->stdio.input_wait, wait);
214 
215 	__poll_t mask = 0;
216 
217 	if (thread_with_stdio_has_output(thr))
218 		mask |= EPOLLIN;
219 	if (thread_with_stdio_has_input_space(thr))
220 		mask |= EPOLLOUT;
221 	if (thr->thr.done)
222 		mask |= EPOLLHUP|EPOLLERR;
223 	return mask;
224 }
225 
226 static const struct file_operations thread_with_stdio_fops = {
227 	.release	= thread_with_stdio_release,
228 	.read		= thread_with_stdio_read,
229 	.write		= thread_with_stdio_write,
230 	.poll		= thread_with_stdio_poll,
231 	.llseek		= no_llseek,
232 };
233 
234 int bch2_run_thread_with_stdio(struct thread_with_stdio *thr,
235 			       void (*exit)(struct thread_with_stdio *),
236 			       int (*fn)(void *))
237 {
238 	thr->stdio.input_buf = PRINTBUF;
239 	thr->stdio.input_buf.atomic++;
240 	spin_lock_init(&thr->stdio.input_lock);
241 	init_waitqueue_head(&thr->stdio.input_wait);
242 
243 	thr->stdio.output_buf = PRINTBUF;
244 	thr->stdio.output_buf.atomic++;
245 	spin_lock_init(&thr->stdio.output_lock);
246 	init_waitqueue_head(&thr->stdio.output_wait);
247 
248 	darray_init(&thr->output2);
249 	thr->exit = exit;
250 
251 	return bch2_run_thread_with_file(&thr->thr, &thread_with_stdio_fops, fn);
252 }
253 
254 int bch2_stdio_redirect_read(struct stdio_redirect *stdio, char *buf, size_t len)
255 {
256 	wait_event(stdio->input_wait,
257 		   stdio->input_buf.pos || stdio->done);
258 
259 	if (stdio->done)
260 		return -1;
261 
262 	spin_lock(&stdio->input_lock);
263 	int ret = min(len, stdio->input_buf.pos);
264 	stdio->input_buf.pos -= ret;
265 	memcpy(buf, stdio->input_buf.buf, ret);
266 	memmove(stdio->input_buf.buf,
267 		stdio->input_buf.buf + ret,
268 		stdio->input_buf.pos);
269 	spin_unlock(&stdio->input_lock);
270 
271 	wake_up(&stdio->input_wait);
272 	return ret;
273 }
274 
275 int bch2_stdio_redirect_readline(struct stdio_redirect *stdio, char *buf, size_t len)
276 {
277 	wait_event(stdio->input_wait,
278 		   stdio->input_buf.pos || stdio->done);
279 
280 	if (stdio->done)
281 		return -1;
282 
283 	spin_lock(&stdio->input_lock);
284 	int ret = min(len, stdio->input_buf.pos);
285 	char *n = memchr(stdio->input_buf.buf, '\n', ret);
286 	if (n)
287 		ret = min(ret, n + 1 - stdio->input_buf.buf);
288 	stdio->input_buf.pos -= ret;
289 	memcpy(buf, stdio->input_buf.buf, ret);
290 	memmove(stdio->input_buf.buf,
291 		stdio->input_buf.buf + ret,
292 		stdio->input_buf.pos);
293 	spin_unlock(&stdio->input_lock);
294 
295 	wake_up(&stdio->input_wait);
296 	return ret;
297 }
298 
299 #endif /* NO_BCACHEFS_FS */
300