00001
00009 #include <atoms/kqueue_poller.hh>
00010 #include <sys/types.h>
00011 #include <sys/event.h>
00012 #include <sys/time.h>
00013 #include <boost/cast.hpp>
00014 #include <atoms/debug.hh>
00015
00016 using namespace std;
00017 using namespace boost;
00018
00019 namespace atoms {
00020
00021 const int KqueuePoller::NUM_EVENTS = 16;
00022 DebugLogger KqueuePoller::LOGGER("org.slamb.atoms.KqueuePoller");
00023
00024 KqueuePoller::KqueuePoller() : events(new struct kevent[NUM_EVENTS]) {
00025 if ((kq = kqueue()) < 0) {
00027 LOGGER.logFatal("Unexpected kqueue error: %d (%s)",
00028 errno, strerror(errno));
00029 }
00030 LOGGER.log(DebugLogger::lInfo, "Created kqueue fd %d", kq);
00031 }
00032
00033 KqueuePoller::~KqueuePoller() {
00034 if (::close(kq) < 0) {
00035 LOGGER.logFatal("Unexpected close error on kqueue fd %d: %d (%s)",
00036 kq, errno, strerror(errno));
00037 }
00038 }
00039
00040 void KqueuePoller::add(int fd, Event eventMask, const EventHandler &onEvent) {
00041 map<int, shared_ptr<DescriptorInfo> >::const_iterator it =
00042 descriptorMap.find(fd);
00043 struct kevent changes[2];
00044 unsigned int numChanges = 0;
00045 bool edgeTriggered = (eventMask & eEdgeTriggered) != 0;
00046 assert( (eventMask & eAddable) != 0
00047 && (eventMask & ~eAddable & ~eEdgeTriggered) == 0);
00048
00049 boost::shared_ptr<DescriptorInfo> d;
00050 if (it != descriptorMap.end()) {
00051
00052 d = it->second;
00053 assert(d->edgeTriggered == edgeTriggered);
00054 } else {
00055
00056 d.reset(new DescriptorInfo);
00057 d->stale = false;
00058 d->edgeTriggered = edgeTriggered;
00059 descriptorMap.insert(make_pair(fd,d));
00060 }
00061
00062 u_short flags = EV_ADD;
00063 if ((eventMask & eEdgeTriggered) != 0) {
00064 flags |= EV_CLEAR;
00065 }
00066 if ((eventMask & eInput) != 0) {
00067 EV_SET(&changes[numChanges++], fd, EVFILT_READ, flags, 0, 0, d.get());
00068 d->eventHandlerMap.insert(make_pair(eInput, onEvent));
00069 }
00070 if ((eventMask & eOutput) != 0) {
00071 EV_SET(&changes[numChanges++], fd, EVFILT_WRITE, flags, 0, 0, d.get());
00072 d->eventHandlerMap.insert(make_pair(eOutput, onEvent));
00073 }
00074 struct timespec ts;
00075 ts.tv_sec = 0L;
00076 ts.tv_nsec = 0L;
00077 if (kevent(kq, changes, numChanges, NULL, 0, &ts) < 0) {
00079 LOGGER.logFatal("Unexpected kevent error: %d (%s)",
00080 errno, strerror(errno));
00081 }
00082 }
00083
00084 void KqueuePoller::remove(int fd, Event eventMask) {
00085 map<int, boost::shared_ptr<DescriptorInfo> >::iterator it =
00086 descriptorMap.find(fd);
00087 if (it == descriptorMap.end()) {
00088 LOGGER.logFatal("Descriptor %d was not in set", fd);
00089 }
00090 boost::shared_ptr<DescriptorInfo> d = it->second;
00091
00092 assert( (eventMask & eAddable) != 0
00093 && (eventMask & ~eAddable) == 0);
00094
00095 map<Event,EventHandler> oldEventHandlerMap(d->eventHandlerMap);
00096 if ((eventMask & eInput) != 0) {
00097 if (d->eventHandlerMap.erase(eInput) == 0) {
00098 LOGGER.logFatal("Descriptor %d not registered for input", fd);
00099 } else {
00100 LOGGER.log(DebugLogger::lInfo,
00101 "Descriptor %d deregistered for input", fd);
00102 }
00103 }
00104 if ((eventMask & eOutput) != 0) {
00105 if (d->eventHandlerMap.erase(eOutput) == 0) {
00106 LOGGER.logFatal("Descriptor %d not registered for output", fd);
00107 } else {
00108 LOGGER.log(DebugLogger::lInfo,
00109 "Descriptor %d deregistered for output", fd);
00110 }
00111 }
00112
00113 struct kevent changes[2];
00114 int numChanges = 0;
00115 if ((eventMask & eInput) != 0) {
00116 EV_SET(&changes[numChanges++], fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
00117 }
00118 if ((eventMask & eOutput) != 0) {
00119 EV_SET(&changes[numChanges++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
00120 }
00121 assert(numChanges != 0);
00122 struct timespec ts;
00123 ts.tv_sec = 0L;
00124 ts.tv_nsec = 0L;
00125 if (kevent(kq, changes, numChanges, NULL, 0, &ts) < 0) {
00127 LOGGER.logFatal("Unexpected kevent error: %d (%s)",
00128 errno, strerror(errno));
00129 }
00130
00131 if (d->eventHandlerMap.empty()) {
00133 boost::shared_ptr<DescriptorInfo> d = it->second;
00134 descriptorMap.erase(it);
00135 d->stale = true;
00136 staleDescriptors.push_back(d);
00137 }
00138 }
00139
00140 void KqueuePoller::remove(int fd, bool isClosing) {
00141 map<int, boost::shared_ptr<DescriptorInfo> >::iterator it =
00142 descriptorMap.find(fd);
00143 if (it == descriptorMap.end()) {
00144 LOGGER.logFatal("Descriptor %d was not in set", fd);
00145 } else {
00146 LOGGER.log(DebugLogger::lInfo,
00147 "Descriptor %d deregistered entirely", fd);
00148 }
00149 boost::shared_ptr<DescriptorInfo> d = it->second;
00150 if (!isClosing) {
00151 map<Event,EventHandler>::const_iterator it,
00152 end = d->eventHandlerMap.end();
00153 struct kevent changes[2];
00154 int numChanges = 0;
00155 if ((it = d->eventHandlerMap.find(eInput)) != end) {
00156 EV_SET(&changes[numChanges++], fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
00157 }
00158 if ((it = d->eventHandlerMap.find(eOutput)) != end) {
00159 EV_SET(&changes[numChanges++], fd, EVFILT_WRITE, EV_DELETE,
00160 0, 0, 0);
00161 }
00162 assert(numChanges != 0);
00163 struct timespec ts;
00164 ts.tv_sec = 0L;
00165 ts.tv_nsec = 0L;
00166 if (kevent(kq, changes, numChanges, NULL, 0, &ts) < 0) {
00168 LOGGER.logFatal("Unexpected kevent error: %d (%s)",
00169 errno, strerror(errno));
00170 }
00171 }
00172 descriptorMap.erase(it);
00173 d->stale = true;
00174 staleDescriptors.push_back(d);
00175 }
00176
00177 void KqueuePoller::poll(const Timespan &interval) {
00178 staleDescriptors.clear();
00179 const struct timespec *ts_p = interval.getTimespec();
00180 int retval;
00181 if ((retval = kevent(kq, NULL, 0, events.get(), NUM_EVENTS, ts_p)) < 0) {
00183 LOGGER.logFatal("Unexpected kevent error: %d (%s)",
00184 errno, strerror(errno));
00185 }
00186 for (int i = 0; i < retval; i++) {
00187 DescriptorInfo *d = reinterpret_cast<DescriptorInfo*>(events[i].udata);
00188 if (d->stale)
00189 continue;
00190 Event e;
00191 if (events[i].filter == EVFILT_READ) {
00192 LOGGER.log(DebugLogger::lDebug, "Input on fd %d",
00193 numeric_cast<int>(events[i].ident));
00194 e = eInput;
00195 } else {
00196 LOGGER.log(DebugLogger::lDebug, "Output on fd %d",
00197 numeric_cast<int>(events[i].ident));
00198 e = eOutput;
00199 }
00200 map<Event, EventHandler>::const_iterator it;
00201 for (it = d->eventHandlerMap.begin(); it != d->eventHandlerMap.end();
00202 ++it) {
00203 if ((it->first & e) != 0) {
00204 DebugLogger::Mon dm(LOGGER, DebugLogger::lDebug,
00205 "Callback on fd %d",
00206 numeric_cast<int>(events[i].ident));
00207 it->second(e);
00208 }
00209 }
00210 }
00211 }
00212
00213 }