Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members | Related Pages

atoms/kqueue_poller.cc

Go to the documentation of this file.
00001 
00009 #include <atoms/kqueue_poller.hh>
00010 #include <sys/types.h>
00011 #include <sys/event.h>
00012 #include <sys/time.h>
00013 #include <boost/cast.hpp>
00014 #include <atoms/debug.hh>
00015 
00016 using namespace std;
00017 using namespace boost;
00018 
00019 namespace atoms {
00020 
00021 const int KqueuePoller::NUM_EVENTS = 16;
00022 DebugLogger KqueuePoller::LOGGER("org.slamb.atoms.KqueuePoller");
00023 
00024 KqueuePoller::KqueuePoller() : events(new struct kevent[NUM_EVENTS]) {
00025     if ((kq = kqueue()) < 0) {
00027         LOGGER.logFatal("Unexpected kqueue error: %d (%s)",
00028                         errno, strerror(errno));
00029     }
00030     LOGGER.log(DebugLogger::lInfo, "Created kqueue fd %d", kq);
00031 }
00032 
00033 KqueuePoller::~KqueuePoller() {
00034     if (::close(kq) < 0) {
00035         LOGGER.logFatal("Unexpected close error on kqueue fd %d: %d (%s)",
00036                         kq, errno, strerror(errno));
00037     }
00038 }
00039 
00040 void KqueuePoller::add(int fd, Event eventMask, const EventHandler &onEvent) {
00041     map<int, shared_ptr<DescriptorInfo> >::const_iterator it =
00042         descriptorMap.find(fd);
00043     struct kevent changes[2];
00044     unsigned int numChanges = 0;
00045     bool edgeTriggered = (eventMask & eEdgeTriggered) != 0;
00046     assert(   (eventMask &  eAddable) != 0
00047            && (eventMask & ~eAddable & ~eEdgeTriggered) == 0);
00048 
00049     boost::shared_ptr<DescriptorInfo> d;
00050     if (it != descriptorMap.end()) {
00051         // Adding an event to an existing descriptor
00052         d = it->second;
00053         assert(d->edgeTriggered == edgeTriggered);
00054     } else {
00055         // Adding an entirely new descriptor
00056         d.reset(new DescriptorInfo);
00057         d->stale = false;
00058         d->edgeTriggered = edgeTriggered;
00059         descriptorMap.insert(make_pair(fd,d));
00060     }
00061 
00062     u_short flags = EV_ADD;
00063     if ((eventMask & eEdgeTriggered) != 0) {
00064         flags |= EV_CLEAR;
00065     }
00066     if ((eventMask & eInput) != 0) {
00067         EV_SET(&changes[numChanges++], fd, EVFILT_READ, flags, 0, 0, d.get());
00068         d->eventHandlerMap.insert(make_pair(eInput, onEvent));
00069     }
00070     if ((eventMask & eOutput) != 0) {
00071         EV_SET(&changes[numChanges++], fd, EVFILT_WRITE, flags, 0, 0, d.get());
00072         d->eventHandlerMap.insert(make_pair(eOutput, onEvent));
00073     }
00074     struct timespec ts;
00075     ts.tv_sec = 0L;
00076     ts.tv_nsec = 0L;
00077     if (kevent(kq, changes, numChanges, NULL, 0, &ts) < 0) {
00079         LOGGER.logFatal("Unexpected kevent error: %d (%s)",
00080                         errno, strerror(errno));
00081     }
00082 }
00083 
00084 void KqueuePoller::remove(int fd, Event eventMask) {
00085     map<int, boost::shared_ptr<DescriptorInfo> >::iterator it =
00086         descriptorMap.find(fd);
00087     if (it == descriptorMap.end()) {
00088         LOGGER.logFatal("Descriptor %d was not in set", fd);
00089     }
00090     boost::shared_ptr<DescriptorInfo> d = it->second;
00091 
00092     assert(   (eventMask &  eAddable) != 0
00093            && (eventMask & ~eAddable) == 0);
00094 
00095     map<Event,EventHandler> oldEventHandlerMap(d->eventHandlerMap);
00096     if ((eventMask & eInput) != 0) {
00097         if (d->eventHandlerMap.erase(eInput) == 0) {
00098             LOGGER.logFatal("Descriptor %d not registered for input", fd);
00099         } else {
00100             LOGGER.log(DebugLogger::lInfo,
00101                        "Descriptor %d deregistered for input", fd);
00102         }
00103     }
00104     if ((eventMask & eOutput) != 0) {
00105         if (d->eventHandlerMap.erase(eOutput) == 0) {
00106             LOGGER.logFatal("Descriptor %d not registered for output", fd);
00107         } else {
00108             LOGGER.log(DebugLogger::lInfo,
00109                        "Descriptor %d deregistered for output", fd);
00110         }
00111     }
00112 
00113     struct kevent changes[2];
00114     int numChanges = 0;
00115     if ((eventMask & eInput) != 0) {
00116         EV_SET(&changes[numChanges++], fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
00117     }
00118     if ((eventMask & eOutput) != 0) {
00119         EV_SET(&changes[numChanges++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
00120     }
00121     assert(numChanges != 0);
00122     struct timespec ts;
00123     ts.tv_sec = 0L;
00124     ts.tv_nsec = 0L;
00125     if (kevent(kq, changes, numChanges, NULL, 0, &ts) < 0) {
00127         LOGGER.logFatal("Unexpected kevent error: %d (%s)",
00128                         errno, strerror(errno));
00129     }
00130 
00131     if (d->eventHandlerMap.empty()) {
00133         boost::shared_ptr<DescriptorInfo> d = it->second;
00134         descriptorMap.erase(it);
00135         d->stale = true;
00136         staleDescriptors.push_back(d);
00137     }
00138 }
00139 
00140 void KqueuePoller::remove(int fd, bool isClosing) {
00141     map<int, boost::shared_ptr<DescriptorInfo> >::iterator it =
00142         descriptorMap.find(fd);
00143     if (it == descriptorMap.end()) {
00144         LOGGER.logFatal("Descriptor %d was not in set", fd);
00145     } else {
00146         LOGGER.log(DebugLogger::lInfo,
00147                    "Descriptor %d deregistered entirely", fd);
00148     }
00149     boost::shared_ptr<DescriptorInfo> d = it->second;
00150     if (!isClosing) {
00151         map<Event,EventHandler>::const_iterator it,
00152                                                 end = d->eventHandlerMap.end();
00153         struct kevent changes[2];
00154         int numChanges = 0;
00155         if ((it = d->eventHandlerMap.find(eInput)) != end) {
00156             EV_SET(&changes[numChanges++], fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
00157         }
00158         if ((it = d->eventHandlerMap.find(eOutput)) != end) {
00159             EV_SET(&changes[numChanges++], fd, EVFILT_WRITE, EV_DELETE,
00160                    0, 0, 0);
00161         }
00162         assert(numChanges != 0);
00163         struct timespec ts;
00164         ts.tv_sec = 0L;
00165         ts.tv_nsec = 0L;
00166         if (kevent(kq, changes, numChanges, NULL, 0, &ts) < 0) {
00168             LOGGER.logFatal("Unexpected kevent error: %d (%s)",
00169                             errno, strerror(errno));
00170         }
00171     }
00172     descriptorMap.erase(it);
00173     d->stale = true;
00174     staleDescriptors.push_back(d);
00175 }
00176 
00177 void KqueuePoller::poll(const Timespan &interval) {
00178     staleDescriptors.clear();
00179     const struct timespec *ts_p = interval.getTimespec();
00180     int retval;
00181     if ((retval = kevent(kq, NULL, 0, events.get(), NUM_EVENTS, ts_p)) < 0) {
00183         LOGGER.logFatal("Unexpected kevent error: %d (%s)",
00184                         errno, strerror(errno));
00185     }
00186     for (int i = 0; i < retval; i++) {
00187         DescriptorInfo *d = reinterpret_cast<DescriptorInfo*>(events[i].udata);
00188         if (d->stale)
00189             continue;
00190         Event e;
00191         if (events[i].filter == EVFILT_READ) {
00192             LOGGER.log(DebugLogger::lDebug, "Input on fd %d",
00193                        numeric_cast<int>(events[i].ident));
00194             e = eInput;
00195         } else /* events[i].filter == EVFILT_WRITE */ {
00196             LOGGER.log(DebugLogger::lDebug, "Output on fd %d",
00197                        numeric_cast<int>(events[i].ident));
00198             e = eOutput;
00199         }
00200         map<Event, EventHandler>::const_iterator it;
00201         for (it = d->eventHandlerMap.begin(); it != d->eventHandlerMap.end();
00202              ++it) {
00203             if ((it->first & e) != 0) {
00204                 DebugLogger::Mon dm(LOGGER, DebugLogger::lDebug,
00205                                     "Callback on fd %d",
00206                                     numeric_cast<int>(events[i].ident));
00207                 it->second(e);
00208             }
00209         }
00210     }
00211 }
00212 
00213 } // namespace atoms

Generated on Wed Jun 15 01:20:35 2005 for atoms++ by doxygen 1.3.5