1 /*- 2 * Copyright (c) 2011, 2012, 2013, 2014 Spectra Logic Corporation 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions, and the following disclaimer, 10 * without modification. 11 * 2. Redistributions in binary form must reproduce at minimum a disclaimer 12 * substantially similar to the "NO WARRANTY" disclaimer below 13 * ("Disclaimer") and any redistribution must be conditioned upon 14 * including a substantially similar Disclaimer requirement for further 15 * binary redistribution. 16 * 17 * NO WARRANTY 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR 21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22 * HOLDERS OR CONTRIBUTORS BE LIABLE FOR SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, 26 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING 27 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGES. 29 * 30 * Authors: Justin T. Gibbs (Spectra Logic Corporation) 31 */ 32 33 /** 34 * \file consumer.cc 35 */ 36 37 #include <sys/cdefs.h> 38 #include <sys/poll.h> 39 #include <sys/socket.h> 40 #include <sys/un.h> 41 42 #include <err.h> 43 #include <errno.h> 44 #include <fcntl.h> 45 #include <syslog.h> 46 #include <unistd.h> 47 48 #include <cstdarg> 49 #include <cstring> 50 #include <list> 51 #include <map> 52 #include <string> 53 54 #include "guid.h" 55 #include "event.h" 56 #include "event_factory.h" 57 #include "exception.h" 58 59 #include "consumer.h" 60 61 __FBSDID("$FreeBSD$"); 62 63 /*================================== Macros ==================================*/ 64 #define NUM_ELEMENTS(x) (sizeof(x) / sizeof(*x)) 65 66 /*============================ Namespace Control =============================*/ 67 using std::string; 68 namespace DevdCtl 69 { 70 71 /*============================= Class Definitions ============================*/ 72 /*----------------------------- DevdCtl::Consumer ----------------------------*/ 73 //- Consumer Static Private Data ----------------------------------------------- 74 const char Consumer::s_devdSockPath[] = "/var/run/devd.seqpacket.pipe"; 75 76 //- Consumer Public Methods ---------------------------------------------------- 77 Consumer::Consumer(Event::BuildMethod *defBuilder, 78 EventFactory::Record *regEntries, 79 size_t numEntries) 80 : m_devdSockFD(-1), 81 m_eventFactory(defBuilder), 82 m_replayingEvents(false) 83 { 84 m_eventFactory.UpdateRegistry(regEntries, numEntries); 85 } 86 87 Consumer::~Consumer() 88 { 89 DisconnectFromDevd(); 90 } 91 92 bool 93 Consumer::ConnectToDevd() 94 { 95 struct sockaddr_un devdAddr; 96 int sLen; 97 int result; 98 99 if (m_devdSockFD != -1) { 100 /* Already connected. */ 101 syslog(LOG_DEBUG, "%s: Already connected.", __func__); 102 return (true); 103 } 104 syslog(LOG_INFO, "%s: Connecting to devd.", __func__); 105 106 memset(&devdAddr, 0, sizeof(devdAddr)); 107 devdAddr.sun_family= AF_UNIX; 108 strlcpy(devdAddr.sun_path, s_devdSockPath, sizeof(devdAddr.sun_path)); 109 sLen = SUN_LEN(&devdAddr); 110 111 m_devdSockFD = socket(AF_UNIX, SOCK_SEQPACKET, 0); 112 if (m_devdSockFD == -1) 113 err(1, "Unable to create socket"); 114 if (fcntl(m_devdSockFD, F_SETFL, O_NONBLOCK) < 0) 115 err(1, "fcntl"); 116 result = connect(m_devdSockFD, 117 reinterpret_cast<sockaddr *>(&devdAddr), 118 sLen); 119 if (result == -1) { 120 syslog(LOG_INFO, "Unable to connect to devd"); 121 DisconnectFromDevd(); 122 return (false); 123 } 124 125 syslog(LOG_INFO, "Connection to devd successful"); 126 return (true); 127 } 128 129 void 130 Consumer::DisconnectFromDevd() 131 { 132 if (m_devdSockFD != -1) { 133 syslog(LOG_INFO, "Disconnecting from devd."); 134 close(m_devdSockFD); 135 } 136 m_devdSockFD = -1; 137 } 138 139 std::string 140 Consumer::ReadEvent() 141 { 142 char buf[MAX_EVENT_SIZE + 1]; 143 ssize_t len; 144 145 len = ::recv(m_devdSockFD, buf, MAX_EVENT_SIZE, MSG_WAITALL); 146 if (len == -1) 147 return (std::string("")); 148 else { 149 /* NULL-terminate the result */ 150 buf[len] = '\0'; 151 return (std::string(buf)); 152 } 153 } 154 155 void 156 Consumer::ReplayUnconsumedEvents(bool discardUnconsumed) 157 { 158 EventList::iterator event(m_unconsumedEvents.begin()); 159 bool replayed_any = (event != m_unconsumedEvents.end()); 160 161 m_replayingEvents = true; 162 if (replayed_any) 163 syslog(LOG_INFO, "Started replaying unconsumed events"); 164 while (event != m_unconsumedEvents.end()) { 165 bool consumed((*event)->Process()); 166 if (consumed || discardUnconsumed) { 167 delete *event; 168 event = m_unconsumedEvents.erase(event); 169 } else { 170 event++; 171 } 172 } 173 if (replayed_any) 174 syslog(LOG_INFO, "Finished replaying unconsumed events"); 175 m_replayingEvents = false; 176 } 177 178 bool 179 Consumer::SaveEvent(const Event &event) 180 { 181 if (m_replayingEvents) 182 return (false); 183 m_unconsumedEvents.push_back(event.DeepCopy()); 184 return (true); 185 } 186 187 Event * 188 Consumer::NextEvent() 189 { 190 if (!Connected()) 191 return(NULL); 192 193 Event *event(NULL); 194 try { 195 string evString; 196 197 evString = ReadEvent(); 198 if (! evString.empty()) { 199 Event::TimestampEventString(evString); 200 event = Event::CreateEvent(m_eventFactory, evString); 201 } 202 } catch (const Exception &exp) { 203 exp.Log(); 204 DisconnectFromDevd(); 205 } 206 return (event); 207 } 208 209 /* Capture and process buffered events. */ 210 void 211 Consumer::ProcessEvents() 212 { 213 Event *event; 214 while ((event = NextEvent()) != NULL) { 215 if (event->Process()) 216 SaveEvent(*event); 217 delete event; 218 } 219 } 220 221 void 222 Consumer::FlushEvents() 223 { 224 std::string s; 225 226 do 227 s = ReadEvent(); 228 while (! s.empty()) ; 229 } 230 231 bool 232 Consumer::EventsPending() 233 { 234 struct pollfd fds[1]; 235 int result; 236 237 do { 238 fds->fd = m_devdSockFD; 239 fds->events = POLLIN; 240 fds->revents = 0; 241 result = poll(fds, NUM_ELEMENTS(fds), /*timeout*/0); 242 } while (result == -1 && errno == EINTR); 243 244 if (result == -1) 245 err(1, "Polling for devd events failed"); 246 247 if ((fds->revents & POLLERR) != 0) 248 throw Exception("Consumer::EventsPending(): " 249 "POLLERR detected on devd socket."); 250 251 if ((fds->revents & POLLHUP) != 0) 252 throw Exception("Consumer::EventsPending(): " 253 "POLLHUP detected on devd socket."); 254 255 return ((fds->revents & POLLIN) != 0); 256 } 257 258 } // namespace DevdCtl 259