00001
00009 #include <atoms/epoll_poller.hh>
00010 #include <atoms/debug.hh>
00011 #include <sys/epoll.h>
00012 #include <unistd.h>
00013 #include <errno.h>
00014
00015 using namespace std;
00016 using namespace boost;
00017
00018 namespace atoms {
00019
00020 DebugLogger EpollPoller::LOGGER("org.slamb.atoms.EpollPoller");
00021 const int EpollPoller::CREATE_SIZE = 128;
00022 const int EpollPoller::NUM_EVENTS = 128;
00023
00024 EpollPoller::EpollPoller() {
00025 epfd = epoll_create(CREATE_SIZE);
00026 if (epfd < 0) {
00027 if (errno == ENOSYS) {
00028 throw UnsupportedError("No epoll support in kernel.");
00029 }
00030 assert(false);
00031 }
00032 events.reset(new epoll_event[NUM_EVENTS]);
00033 }
00034
00035 EpollPoller::~EpollPoller() {
00036 if (close(epfd) < 0) {
00037 assert(false);
00038 }
00039 }
00040
00041 void EpollPoller::add(int fd, Event eventMask, const EventHandler &onEvent) {
00042 map<int, boost::shared_ptr<DescriptorInfo> >::const_iterator it =
00043 descriptorMap.find(fd);
00044 boost::shared_ptr<DescriptorInfo> d;
00045 bool edgeTriggered = (eventMask & eEdgeTriggered) != 0;
00046 int op;
00047
00048 assert( (eventMask & eAddable) != 0
00049 && (eventMask & ~eAddable & ~eEdgeTriggered) == 0);
00050
00051 if (it != descriptorMap.end()) {
00052
00053 d = it->second;
00054 op = EPOLL_CTL_MOD;
00055 assert(d->edgeTriggered == edgeTriggered);
00056 } else {
00057
00058 op = EPOLL_CTL_ADD;
00059 d.reset(new DescriptorInfo);
00060 d->stale = false;
00061 d->edgeTriggered = edgeTriggered;
00062 descriptorMap.insert(make_pair(fd,d));
00063 }
00064
00065
00066 if ((eventMask & eInput) != 0) {
00067 if (!d->eventHandlerMap.insert(make_pair(eInput, onEvent)).second) {
00068 LOGGER.logFatal("Descriptor %d already being monitored for input in epfd %d",
00069 fd, epfd);
00070 } else {
00071 LOGGER.log(DebugLogger::lInfo, "Descriptor %d added for input on epfd %d",
00072 fd, epfd);
00073 }
00074 }
00075 if ((eventMask & eOutput) != 0) {
00076 if (!d->eventHandlerMap.insert(make_pair(eOutput, onEvent)).second) {
00077 LOGGER.logFatal("Descriptor %d already being monitored for output on epfd %d",
00078 fd, epfd);
00079 } else {
00080 LOGGER.log(DebugLogger::lInfo, "Descriptor %d added for output on epfd %d",
00081 fd, epfd);
00082 }
00083 }
00084 if ((eventMask & ePriorityInput) != 0) {
00085 if (!d->eventHandlerMap.insert(
00086 make_pair(ePriorityInput, onEvent)).second) {
00087 LOGGER.logFatal("Descriptor %d already being monitored for "
00088 "priority input on epfd %d", fd, epfd);
00089 } else {
00090 LOGGER.log(DebugLogger::lInfo,
00091 "Descriptor %d added for priority input on epfd %d", fd, epfd);
00092 }
00093 }
00094
00095
00096 struct epoll_event event;
00097 event.events = edgeTriggered ? EPOLLET : 0;
00098 map<Event,EventHandler>::const_iterator ehIt;
00099 for (ehIt = d->eventHandlerMap.begin();
00100 ehIt != d->eventHandlerMap.end(); ++ehIt) {
00101 if (ehIt->first == eInput)
00102 event.events |= EPOLLIN;
00103 else if (ehIt->first == eOutput)
00104 event.events |= EPOLLOUT;
00105 else if (ehIt->first == ePriorityInput)
00106 event.events |= EPOLLPRI;
00107 }
00108 event.data.ptr = d.get();
00109
00110 if (epoll_ctl(epfd, op, fd, &event) < 0) {
00111 assert(false);
00112 }
00113
00115 }
00116
00117 void EpollPoller::remove(int fd, Event eventMask) {
00118 map<int, boost::shared_ptr<DescriptorInfo> >::iterator it =
00119 descriptorMap.find(fd);
00120 if (it == descriptorMap.end()) {
00121 LOGGER.logFatal("Descriptor %d was not in set for epfd %d", fd, epfd);
00122 }
00123 boost::shared_ptr<DescriptorInfo> d = it->second;
00124
00125 assert( (eventMask & eAddable) != 0
00126 && (eventMask & ~eAddable) == 0);
00127
00128 map<Event,EventHandler> oldEventHandlerMap(d->eventHandlerMap);
00129 if ((eventMask & eInput) != 0) {
00130 if (d->eventHandlerMap.erase(eInput) == 0) {
00131 LOGGER.logFatal("Descriptor %d not registered for input on epfd %d", fd, epfd);
00132 } else {
00133 LOGGER.log(DebugLogger::lInfo,
00134 "Descriptor %d deregistered for input on epfd %d", fd, epfd);
00135 }
00136 }
00137 if ((eventMask & eOutput) != 0) {
00138 if (d->eventHandlerMap.erase(eOutput) == 0) {
00139 LOGGER.logFatal("Descriptor %d not registered for output on epfd %d", fd, epfd);
00140 } else {
00141 LOGGER.log(DebugLogger::lInfo,
00142 "Descriptor %d deregistered for output on epfd %d", fd, epfd);
00143 }
00144 }
00145 if ((eventMask & ePriorityInput) != 0) {
00146 if (d->eventHandlerMap.erase(ePriorityInput) == 0) {
00147 LOGGER.logFatal("Descriptor %d not registered for prio input on epfd %d", fd, epfd);
00148 } else {
00149 LOGGER.log(DebugLogger::lInfo,
00150 "Descriptor %d deregistered for prio input on epfd %d", fd, epfd);
00151 }
00152 }
00153
00154 if (d->eventHandlerMap.empty()) {
00155
00156
00165 } else {
00167 struct epoll_event event;
00168 event.events = d->edgeTriggered ? EPOLLET : 0;
00169 map<Event,EventHandler>::const_iterator ehIt;
00170 for (ehIt = d->eventHandlerMap.begin();
00171 ehIt != d->eventHandlerMap.end(); ++ehIt) {
00172 if (ehIt->first == eInput)
00173 event.events |= EPOLLIN;
00174 else if (ehIt->first == eOutput)
00175 event.events |= EPOLLOUT;
00176 else if (ehIt->first == ePriorityInput)
00177 event.events |= EPOLLPRI;
00178 }
00179 event.data.ptr = d.get();
00180
00181 if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &event) < 0) {
00182 assert(false);
00183 }
00184 }
00185 }
00186
00187 void EpollPoller::remove(int fd, bool isClosing) {
00188 map<int, boost::shared_ptr<DescriptorInfo> >::iterator it =
00189 descriptorMap.find(fd);
00190 if (it == descriptorMap.end()) {
00191 LOGGER.logFatal("Descriptor %d was not in set", fd);
00192 } else {
00193 LOGGER.log(DebugLogger::lInfo,
00194 "Descriptor %d deregistered entirely", fd);
00195 }
00196 removeInternal(it, isClosing);
00197 }
00198
00199 void EpollPoller::removeInternal(
00200 map<int, boost::shared_ptr<DescriptorInfo> >::iterator it,
00201 bool isClosing) {
00202 int fd = it->first;
00203 boost::shared_ptr<DescriptorInfo> d = it->second;
00204 descriptorMap.erase(it);
00205 d->stale = true;
00206 staleDescriptors.push_back(d);
00207 if (!isClosing) {
00208
00209
00210
00211 struct epoll_event event;
00212 event.events = 0;
00213 event.data.ptr = d.get();
00214 if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &event) < 0) {
00215 LOGGER.logFatal("epoll_ctl(EPOLL_CTL_DEL) failed: %d (%s)",
00216 errno, strerror(errno));
00217 }
00218 }
00219 }
00220
00221 void EpollPoller::poll(const Timespan &interval) {
00222 staleDescriptors.clear();
00223 int retval;
00224 int milliseconds = -1;
00225 const struct timespec *ts = interval.getTimespec();
00226 if (ts != NULL) {
00227 milliseconds = ts->tv_sec * 1000 + ts->tv_nsec / 1000000;
00228 }
00229 {
00230 DebugLogger::Mon dm(LOGGER, DebugLogger::lInfo, "Polling on epfd %d", epfd);
00231 retval = epoll_wait(epfd, events.get(), NUM_EVENTS, milliseconds);
00232 }
00233 if (retval < 0 && errno != EINTR) {
00234 LOGGER.logFatal("epoll_wait failed: %d (%s)", errno, strerror(errno));
00235 }
00236 for (int i = 0; i < retval; i++) {
00237 DescriptorInfo *d = reinterpret_cast<DescriptorInfo*>
00238 (events[i].data.ptr);
00239 if (d->stale)
00240 continue;
00241 int eventMask = 0;
00242 if ((events[i].events & EPOLLIN ) != 0) eventMask |= eInput;
00243 if ((events[i].events & EPOLLOUT) != 0) eventMask |= eOutput;
00244 if ((events[i].events & EPOLLPRI) != 0) eventMask |= ePriorityInput;
00245 if ((events[i].events & EPOLLERR) != 0) eventMask |= eError;
00246 if ((events[i].events & EPOLLHUP) != 0) eventMask |= eHangup;
00247
00248 map<Event, EventHandler>::const_iterator it;
00249 for (it = d->eventHandlerMap.begin(); it != d->eventHandlerMap.end();
00250 ++it) {
00251 if ((it->first & eventMask) != 0)
00252 it->second(static_cast<Event>(eventMask));
00253 }
00254 }
00255 }
00256
00257 }