xref: /freebsd/lib/libdevdctl/consumer.cc (revision eb9da1ada8b6b2c74378a5c17029ec5a7fb199e6)
1 /*-
2  * Copyright (c) 2011, 2012, 2013, 2014 Spectra Logic Corporation
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions, and the following disclaimer,
10  *    without modification.
11  * 2. Redistributions in binary form must reproduce at minimum a disclaimer
12  *    substantially similar to the "NO WARRANTY" disclaimer below
13  *    ("Disclaimer") and any redistribution must be conditioned upon
14  *    including a substantially similar Disclaimer requirement for further
15  *    binary redistribution.
16  *
17  * NO WARRANTY
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * HOLDERS OR CONTRIBUTORS BE LIABLE FOR SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
26  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
27  * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGES.
29  *
30  * Authors: Justin T. Gibbs     (Spectra Logic Corporation)
31  */
32 
33 /**
34  * \file consumer.cc
35  */
36 
37 #include <sys/cdefs.h>
38 #include <sys/poll.h>
39 #include <sys/socket.h>
40 #include <sys/un.h>
41 
42 #include <err.h>
43 #include <errno.h>
44 #include <fcntl.h>
45 #include <syslog.h>
46 #include <unistd.h>
47 
48 #include <cstdarg>
49 #include <cstring>
50 #include <list>
51 #include <map>
52 #include <string>
53 
54 #include "guid.h"
55 #include "event.h"
56 #include "event_factory.h"
57 #include "exception.h"
58 
59 #include "consumer.h"
60 
61 __FBSDID("$FreeBSD$");
62 
63 /*================================== Macros ==================================*/
/**
 * Number of elements in the statically sized array \a x.
 *
 * Only meaningful for true arrays; a pointer argument yields the ratio of
 * the pointer size to the pointee size.  The argument is parenthesized so
 * that any array-valued expression expands correctly.
 */
#define NUM_ELEMENTS(x) (sizeof(x) / sizeof(*(x)))
65 
66 /*============================ Namespace Control =============================*/
67 using std::string;
68 namespace DevdCtl
69 {
70 
71 /*============================= Class Definitions ============================*/
72 /*----------------------------- DevdCtl::Consumer ----------------------------*/
73 //- Consumer Static Private Data -----------------------------------------------
74 const char Consumer::s_devdSockPath[] = "/var/run/devd.seqpacket.pipe";
75 
76 //- Consumer Public Methods ----------------------------------------------------
77 Consumer::Consumer(Event::BuildMethod *defBuilder,
78 		   EventFactory::Record *regEntries,
79 		   size_t numEntries)
80  : m_devdSockFD(-1),
81    m_eventFactory(defBuilder),
82    m_replayingEvents(false)
83 {
84 	m_eventFactory.UpdateRegistry(regEntries, numEntries);
85 }
86 
87 Consumer::~Consumer()
88 {
89 	DisconnectFromDevd();
90 }
91 
92 bool
93 Consumer::ConnectToDevd()
94 {
95 	struct sockaddr_un devdAddr;
96 	int		   sLen;
97 	int		   result;
98 
99 	if (m_devdSockFD != -1) {
100 		/* Already connected. */
101 		syslog(LOG_DEBUG, "%s: Already connected.", __func__);
102 		return (true);
103 	}
104 	syslog(LOG_INFO, "%s: Connecting to devd.", __func__);
105 
106 	memset(&devdAddr, 0, sizeof(devdAddr));
107 	devdAddr.sun_family= AF_UNIX;
108 	strlcpy(devdAddr.sun_path, s_devdSockPath, sizeof(devdAddr.sun_path));
109 	sLen = SUN_LEN(&devdAddr);
110 
111 	m_devdSockFD = socket(AF_UNIX, SOCK_SEQPACKET, 0);
112 	if (m_devdSockFD == -1)
113 		err(1, "Unable to create socket");
114         if (fcntl(m_devdSockFD, F_SETFL, O_NONBLOCK) < 0)
115                 err(1, "fcntl");
116 	result = connect(m_devdSockFD,
117 			 reinterpret_cast<sockaddr *>(&devdAddr),
118 			 sLen);
119 	if (result == -1) {
120 		syslog(LOG_INFO, "Unable to connect to devd");
121 		DisconnectFromDevd();
122 		return (false);
123 	}
124 
125 	syslog(LOG_INFO, "Connection to devd successful");
126 	return (true);
127 }
128 
129 void
130 Consumer::DisconnectFromDevd()
131 {
132 	if (m_devdSockFD != -1) {
133 		syslog(LOG_INFO, "Disconnecting from devd.");
134 		close(m_devdSockFD);
135 	}
136 	m_devdSockFD = -1;
137 }
138 
139 std::string
140 Consumer::ReadEvent()
141 {
142 	char buf[MAX_EVENT_SIZE + 1];
143 	ssize_t len;
144 
145 	len = ::recv(m_devdSockFD, buf, MAX_EVENT_SIZE, MSG_WAITALL);
146 	if (len == -1)
147 		return (std::string(""));
148 	else {
149 		/* NULL-terminate the result */
150 		buf[len] = '\0';
151 		return (std::string(buf));
152 	}
153 }
154 
155 void
156 Consumer::ReplayUnconsumedEvents(bool discardUnconsumed)
157 {
158 	EventList::iterator event(m_unconsumedEvents.begin());
159 	bool replayed_any = (event != m_unconsumedEvents.end());
160 
161 	m_replayingEvents = true;
162 	if (replayed_any)
163 		syslog(LOG_INFO, "Started replaying unconsumed events");
164 	while (event != m_unconsumedEvents.end()) {
165 		bool consumed((*event)->Process());
166 		if (consumed || discardUnconsumed) {
167 			delete *event;
168 			event = m_unconsumedEvents.erase(event);
169 		} else {
170 			event++;
171 		}
172 	}
173 	if (replayed_any)
174 		syslog(LOG_INFO, "Finished replaying unconsumed events");
175 	m_replayingEvents = false;
176 }
177 
178 bool
179 Consumer::SaveEvent(const Event &event)
180 {
181         if (m_replayingEvents)
182                 return (false);
183         m_unconsumedEvents.push_back(event.DeepCopy());
184         return (true);
185 }
186 
187 Event *
188 Consumer::NextEvent()
189 {
190 	if (!Connected())
191 		return(NULL);
192 
193 	Event *event(NULL);
194 	try {
195 		string evString;
196 
197 		evString = ReadEvent();
198 		if (! evString.empty()) {
199 			Event::TimestampEventString(evString);
200 			event = Event::CreateEvent(m_eventFactory, evString);
201 		}
202 	} catch (const Exception &exp) {
203 		exp.Log();
204 		DisconnectFromDevd();
205 	}
206 	return (event);
207 }
208 
209 /* Capture and process buffered events. */
210 void
211 Consumer::ProcessEvents()
212 {
213 	Event *event;
214 	while ((event = NextEvent()) != NULL) {
215 		if (event->Process())
216 			SaveEvent(*event);
217 		delete event;
218 	}
219 }
220 
221 void
222 Consumer::FlushEvents()
223 {
224 	std::string s;
225 
226 	do
227 		s = ReadEvent();
228 	while (! s.empty()) ;
229 }
230 
231 bool
232 Consumer::EventsPending()
233 {
234 	struct pollfd fds[1];
235 	int	      result;
236 
237 	do {
238 		fds->fd      = m_devdSockFD;
239 		fds->events  = POLLIN;
240 		fds->revents = 0;
241 		result = poll(fds, NUM_ELEMENTS(fds), /*timeout*/0);
242 	} while (result == -1 && errno == EINTR);
243 
244 	if (result == -1)
245 		err(1, "Polling for devd events failed");
246 
247 	if ((fds->revents & POLLERR) != 0)
248 		throw Exception("Consumer::EventsPending(): "
249 				"POLLERR detected on devd socket.");
250 
251 	if ((fds->revents & POLLHUP) != 0)
252 		throw Exception("Consumer::EventsPending(): "
253 				"POLLHUP detected on devd socket.");
254 
255 	return ((fds->revents & POLLIN) != 0);
256 }
257 
258 } // namespace DevdCtl
259