Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members | Related Pages

atoms/epoll_poller.cc

Go to the documentation of this file.
00001 
00009 #include <atoms/epoll_poller.hh>
00010 #include <atoms/debug.hh>
00011 #include <sys/epoll.h>
00012 #include <unistd.h>
00013 #include <errno.h>
00014 
00015 using namespace std;
00016 using namespace boost;
00017 
00018 namespace atoms {
00019 
00020 DebugLogger EpollPoller::LOGGER("org.slamb.atoms.EpollPoller");
00021 const int EpollPoller::CREATE_SIZE = 128;
00022 const int EpollPoller::NUM_EVENTS  = 128;
00023 
00024 EpollPoller::EpollPoller() {
00025     epfd = epoll_create(CREATE_SIZE);
00026     if (epfd < 0) {
00027         if (errno == ENOSYS) {
00028             throw UnsupportedError("No epoll support in kernel.");
00029         }
00030         assert(false); 
00031     }
00032     events.reset(new epoll_event[NUM_EVENTS]);
00033 }
00034 
00035 EpollPoller::~EpollPoller() {
00036     if (close(epfd) < 0) {
00037         assert(false); 
00038     }
00039 }
00040 
00041 void EpollPoller::add(int fd, Event eventMask, const EventHandler &onEvent) {
00042     map<int, boost::shared_ptr<DescriptorInfo> >::const_iterator it =
00043         descriptorMap.find(fd);
00044     boost::shared_ptr<DescriptorInfo> d;
00045     bool edgeTriggered = (eventMask & eEdgeTriggered) != 0;
00046     int op;
00047 
00048     assert(   (eventMask &  eAddable) != 0
00049            && (eventMask & ~eAddable & ~eEdgeTriggered) == 0);
00050 
00051     if (it != descriptorMap.end()) {
00052         // Adding an event to an existing descriptor
00053         d = it->second;
00054         op = EPOLL_CTL_MOD;
00055         assert(d->edgeTriggered == edgeTriggered);
00056     } else {
00057         // Adding an entirely new descriptor
00058         op = EPOLL_CTL_ADD;
00059         d.reset(new DescriptorInfo);
00060         d->stale = false;
00061         d->edgeTriggered = edgeTriggered;
00062         descriptorMap.insert(make_pair(fd,d));
00063     }
00064 
00065     // New events
00066     if ((eventMask & eInput) != 0) {
00067         if (!d->eventHandlerMap.insert(make_pair(eInput, onEvent)).second) {
00068             LOGGER.logFatal("Descriptor %d already being monitored for input in epfd %d",
00069                             fd, epfd);
00070         } else {
00071             LOGGER.log(DebugLogger::lInfo, "Descriptor %d added for input on epfd %d",
00072                        fd, epfd);
00073         }
00074     }
00075     if ((eventMask & eOutput) != 0) {
00076         if (!d->eventHandlerMap.insert(make_pair(eOutput, onEvent)).second) {
00077             LOGGER.logFatal("Descriptor %d already being monitored for output on epfd %d",
00078                             fd, epfd);
00079         } else {
00080             LOGGER.log(DebugLogger::lInfo, "Descriptor %d added for output on epfd %d",
00081                        fd, epfd);
00082         }
00083     }
00084     if ((eventMask & ePriorityInput) != 0) {
00085         if (!d->eventHandlerMap.insert(
00086             make_pair(ePriorityInput, onEvent)).second) {
00087             LOGGER.logFatal("Descriptor %d already being monitored for "
00088                             "priority input on epfd %d", fd, epfd);
00089         } else {
00090             LOGGER.log(DebugLogger::lInfo,
00091                        "Descriptor %d added for priority input on epfd %d", fd, epfd);
00092         }
00093     }
00094 
00095     // Construct the structure epoll_ctl likes
00096     struct epoll_event event;
00097     event.events = edgeTriggered ? EPOLLET : 0;
00098     map<Event,EventHandler>::const_iterator ehIt;
00099     for (ehIt = d->eventHandlerMap.begin();
00100          ehIt != d->eventHandlerMap.end(); ++ehIt) {
00101         if (ehIt->first == eInput)
00102             event.events |= EPOLLIN;
00103         else if (ehIt->first == eOutput)
00104             event.events |= EPOLLOUT;
00105         else if (ehIt->first == ePriorityInput)
00106             event.events |= EPOLLPRI;
00107     }
00108     event.data.ptr = d.get();
00109 
00110     if (epoll_ctl(epfd, op, fd, &event) < 0) {
00111         assert(false); 
00112     }
00113 
00115 }
00116 
00117 void EpollPoller::remove(int fd, Event eventMask) {
00118     map<int, boost::shared_ptr<DescriptorInfo> >::iterator it =
00119         descriptorMap.find(fd);
00120     if (it == descriptorMap.end()) {
00121         LOGGER.logFatal("Descriptor %d was not in set for epfd %d", fd, epfd);
00122     }
00123     boost::shared_ptr<DescriptorInfo> d = it->second;
00124 
00125     assert(   (eventMask &  eAddable) != 0
00126            && (eventMask & ~eAddable) == 0);
00127 
00128     map<Event,EventHandler> oldEventHandlerMap(d->eventHandlerMap);
00129     if ((eventMask & eInput) != 0) {
00130         if (d->eventHandlerMap.erase(eInput) == 0) {
00131             LOGGER.logFatal("Descriptor %d not registered for input on epfd %d", fd, epfd);
00132         } else {
00133             LOGGER.log(DebugLogger::lInfo,
00134                        "Descriptor %d deregistered for input on epfd %d", fd, epfd);
00135         }
00136     }
00137     if ((eventMask & eOutput) != 0) {
00138         if (d->eventHandlerMap.erase(eOutput) == 0) {
00139             LOGGER.logFatal("Descriptor %d not registered for output on epfd %d", fd, epfd);
00140         } else {
00141             LOGGER.log(DebugLogger::lInfo,
00142                        "Descriptor %d deregistered for output on epfd %d", fd, epfd);
00143         }
00144     }
00145     if ((eventMask & ePriorityInput) != 0) {
00146         if (d->eventHandlerMap.erase(ePriorityInput) == 0) {
00147             LOGGER.logFatal("Descriptor %d not registered for prio input on epfd %d", fd, epfd);
00148         } else {
00149             LOGGER.log(DebugLogger::lInfo,
00150                        "Descriptor %d deregistered for prio input on epfd %d", fd, epfd);
00151         }
00152     }
00153 
00154     if (d->eventHandlerMap.empty()) {
00155         // If no event types are left, remove our mapping
00156         // removeInternal(it, false);
00165     } else {
00167         struct epoll_event event;
00168         event.events = d->edgeTriggered ? EPOLLET : 0;
00169         map<Event,EventHandler>::const_iterator ehIt;
00170         for (ehIt = d->eventHandlerMap.begin();
00171              ehIt != d->eventHandlerMap.end(); ++ehIt) {
00172             if (ehIt->first == eInput)
00173                 event.events |= EPOLLIN;
00174             else if (ehIt->first == eOutput)
00175                 event.events |= EPOLLOUT;
00176             else if (ehIt->first == ePriorityInput)
00177                 event.events |= EPOLLPRI;
00178         }
00179         event.data.ptr = d.get();
00180 
00181         if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &event) < 0) {
00182             assert(false); 
00183         }
00184     }
00185 }
00186 
00187 void EpollPoller::remove(int fd, bool isClosing) {
00188     map<int, boost::shared_ptr<DescriptorInfo> >::iterator it =
00189         descriptorMap.find(fd);
00190     if (it == descriptorMap.end()) {
00191         LOGGER.logFatal("Descriptor %d was not in set", fd);
00192     } else {
00193         LOGGER.log(DebugLogger::lInfo,
00194                    "Descriptor %d deregistered entirely", fd);
00195     }
00196     removeInternal(it, isClosing);
00197 }
00198 
00199 void EpollPoller::removeInternal(
00200         map<int, boost::shared_ptr<DescriptorInfo> >::iterator it,
00201         bool isClosing) {
00202     int fd = it->first;
00203     boost::shared_ptr<DescriptorInfo> d = it->second;
00204     descriptorMap.erase(it);
00205     d->stale = true;
00206     staleDescriptors.push_back(d);
00207     if (!isClosing) {
00208         // It doesn't seem like it should be necessary for us to fill out
00209         // an epoll_event here, but epoll_Ctl gives us EFAULT if we pass
00210         // it NULL
00211         struct epoll_event event;
00212         event.events = 0;
00213         event.data.ptr = d.get();
00214         if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &event) < 0) {
00215             LOGGER.logFatal("epoll_ctl(EPOLL_CTL_DEL) failed: %d (%s)",
00216                             errno, strerror(errno));
00217         }
00218     }
00219 }
00220 
00221 void EpollPoller::poll(const Timespan &interval) {
00222     staleDescriptors.clear();
00223     int retval;
00224     int milliseconds = -1;
00225     const struct timespec *ts = interval.getTimespec();
00226     if (ts != NULL) {
00227         milliseconds = ts->tv_sec * 1000 + ts->tv_nsec / 1000000;
00228     }
00229     {
00230         DebugLogger::Mon dm(LOGGER, DebugLogger::lInfo, "Polling on epfd %d", epfd);
00231         retval = epoll_wait(epfd, events.get(), NUM_EVENTS, milliseconds);
00232     }
00233     if (retval < 0 && errno != EINTR) {
00234         LOGGER.logFatal("epoll_wait failed: %d (%s)", errno, strerror(errno));
00235     }
00236     for (int i = 0; i < retval; i++) {
00237         DescriptorInfo *d = reinterpret_cast<DescriptorInfo*>
00238             (events[i].data.ptr);
00239         if (d->stale)
00240             continue;
00241         int eventMask = 0;
00242         if ((events[i].events & EPOLLIN ) != 0) eventMask |= eInput;
00243         if ((events[i].events & EPOLLOUT) != 0) eventMask |= eOutput;
00244         if ((events[i].events & EPOLLPRI) != 0) eventMask |= ePriorityInput;
00245         if ((events[i].events & EPOLLERR) != 0) eventMask |= eError;
00246         if ((events[i].events & EPOLLHUP) != 0) eventMask |= eHangup;
00247 
00248         map<Event, EventHandler>::const_iterator it;
00249         for (it = d->eventHandlerMap.begin(); it != d->eventHandlerMap.end();
00250              ++it) {
00251             if ((it->first & eventMask) != 0)
00252                 it->second(static_cast<Event>(eventMask));
00253         }
00254     }
00255 }
00256 
00257 } // namespace atoms

Generated on Wed Jun 15 01:20:35 2005 for atoms++ by doxygen 1.3.5