// Goto sanos source index

//
// iomux.c
//
// I/O multiplexing
//
// Copyright (C) 2002 Michael Ringgaard. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// 
// 1. Redistributions of source code must retain the above copyright 
//    notice, this list of conditions and the following disclaimer.  
// 2. Redistributions in binary form must reproduce the above copyright
//    notice, this list of conditions and the following disclaimer in the
//    documentation and/or other materials provided with the distribution.  
// 3. Neither the name of the project nor the names of its contributors
//    may be used to endorse or promote products derived from this software
//    without specific prior written permission. 
// 
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
// OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
// OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 
// SUCH DAMAGE.
// 

#include <os/krnl.h>

// Debug aid: prints one line describing an iomux. The line starts with '+' or
// '-' for the signaled state, followed by the ready queue (" r:") and the
// waiting queue (" w:") when they are non-empty. Every queued object is shown
// as id(monitored,signaled), where id is taken from bits 2..9 of the object's
// address and the two event masks are printed in hex.
static void dump_iomux(struct iomux *iomux) {
  struct ioobject *cur;
  int mark;

  mark = iomux->object.signaled ? '+' : '-';
  kprintf("iomux %c", mark);

  // Ready queue
  cur = iomux->ready_head;
  if (cur) kprintf(" r:");
  for (; cur; cur = cur->next) {
    kprintf("%d(%x,%x) ", ((((unsigned long) cur) >> 2) & 0xFF), cur->events_monitored, cur->events_signaled);
  }

  // Waiting queue
  cur = iomux->waiting_head;
  if (cur) kprintf(" w:");
  for (; cur; cur = cur->next) {
    kprintf("%d(%x,%x) ", ((((unsigned long) cur) >> 2) & 0xFF), cur->events_monitored, cur->events_signaled);
  }

  kprintf("\n");
}

static void release_waiting_threads(struct iomux *iomux) {
  struct waitblock *wb;
  struct waitblock *wb_next;
  struct ioobject *iob;

  // Dispatch all ready I/O objects to all ready waiting threads
  wb = iomux->object.waitlist_head;
  iob = iomux->ready_head;
  while (iob && wb) {
    wb_next = wb->next_wait;

    if (thread_ready_to_run(wb->thread)) {
      // Overwrite waitkey for thread with context for object
      wb->thread->waitkey = iob->context;

      // Remove object from iomux
      detach_ioobject(iob);

      // Mark thread ready
      release_thread(wb->thread);

      iob = iomux->ready_head;
    }

    wb = wb_next;
  }
}

// Initializes an iomux: sets up the embedded kernel object as OBJECT_IOMUX,
// stores the caller supplied flags and starts with empty ready and waiting
// queues.
void init_iomux(struct iomux *iomux, int flags) {
  init_object(&iomux->object, OBJECT_IOMUX);
  iomux->flags = flags;

  // Empty ready queue
  iomux->ready_head = NULL;
  iomux->ready_tail = NULL;

  // Empty waiting queue
  iomux->waiting_head = NULL;
  iomux->waiting_tail = NULL;
}

// Disconnects every I/O object from the iomux. Each object on the ready queue
// and on the waiting queue gets its iomux back pointer and its queue links
// cleared. The head/tail pointers stored in the iomux itself are left as they
// are. Always returns 0.
int close_iomux(struct iomux *iomux) {
  struct ioobject *heads[2];
  struct ioobject *cur;
  struct ioobject *following;
  int q;

  // Ready queue first, then waiting queue
  heads[0] = iomux->ready_head;
  heads[1] = iomux->waiting_head;

  for (q = 0; q < 2; q++) {
    for (cur = heads[q]; cur; cur = following) {
      // Fetch the successor before the links are wiped
      following = cur->next;

      cur->iomux = NULL;
      cur->next = NULL;
      cur->prev = NULL;
    }
  }

  return 0;
}

// Attaches an I/O object to an iomux so that the given events are monitored.
//
//   iomux    iomux the object is attached to
//   hobj     object to attach; must be an I/O object
//   events   event mask to monitor; must be non-zero
//   context  value associated with the object; it is handed to a waiting
//            thread or returned by dequeue_event_from_iomux() when the object
//            becomes ready
//
// If the object is already attached to this iomux the new events are added to
// the events already monitored and the context is replaced. The object ends up
// on the ready queue if any monitored event is already signaled, otherwise on
// the waiting queue.
//
// Returns 0 on success, -EBADF if hobj is not an I/O object, -EINVAL if events
// is zero and -EPERM if the object is attached to a different iomux.
int queue_ioobject(struct iomux *iomux, object_t hobj, int events, int context) {
  struct ioobject *iob = (struct ioobject *) hobj;

  if (!ISIOOBJECT(iob)) return -EBADF;
  if (!events) return -EINVAL;

  if (iob->iomux) {
    // Do not allow already attached object to attach to another iomux
    if (iob->iomux != iomux) return -EPERM;

    // Update the event monitoring mask
    events |= iob->events_monitored;

    // Detach object, it will be inserted in the appropriate queue further down 
    detach_ioobject(iob);
  }

  iob->iomux = iomux;
  iob->events_monitored = events;
  iob->context = context;
  
  // If some signaled event is monitored insert in ready queue else in waiting queue
  if (iob->events_monitored & iob->events_signaled) {
    iob->next = NULL;
    iob->prev = iomux->ready_tail;

    if (iomux->ready_tail) iomux->ready_tail->next = iob;
    iomux->ready_tail = iob;
    if (!iomux->ready_head) iomux->ready_head = iob;

    iomux->object.signaled = 1;
  } else {
    iob->next = NULL;
    iob->prev = iomux->waiting_tail;

    if (iomux->waiting_tail) iomux->waiting_tail->next = iob;
    iomux->waiting_tail = iob;
    if (!iomux->waiting_head) iomux->waiting_head = iob;
  }

  // If iomux is signaled try to dispatch ready objects to waiting threads.
  // The test must look at the iomux, not at the I/O object: the object's own
  // signaled flag only follows IOEVT_READ, so an object that is ready for any
  // other monitored event would otherwise never wake threads already waiting
  // on the iomux.
  if (iomux->object.signaled) release_waiting_threads(iomux);

  return 0;
}

// Initializes an I/O object of the given object type. The object starts out
// detached (no iomux, no queue links), with context 0 and with no events
// monitored or signaled.
void init_ioobject(struct ioobject *iob, int type) {
  init_object(&iob->object, type);

  // Not attached to any iomux
  iob->iomux = NULL;
  iob->next = NULL;
  iob->prev = NULL;
  iob->context = 0;

  // No events yet
  iob->events_monitored = 0;
  iob->events_signaled = 0;
}

void detach_ioobject(struct ioobject *iob) {
  if (iob->iomux) {
    if (iob->next) iob->next->prev = iob->prev;
    if (iob->prev) iob->prev->next = iob->next;

    if (iob->events_monitored & iob->events_signaled) {
      // Remove from ready queue
      if (iob->iomux->ready_head == iob) iob->iomux->ready_head = iob->next; 
      if (iob->iomux->ready_tail == iob) iob->iomux->ready_tail = iob->prev; 

      // If ready queue is empty the iomux is no longer signaled
      if (!iob->iomux->ready_head) iob->iomux->object.signaled = 0;
    } else {
      // Remove object from waiting queue
      if (iob->iomux->waiting_head == iob) iob->iomux->waiting_head = iob->next; 
      if (iob->iomux->waiting_tail == iob) iob->iomux->waiting_tail = iob->prev; 
    }

    iob->iomux = NULL;
    iob->next = iob->prev = NULL;
  }
}

// Records that the given events have occurred on an I/O object.
//
// Detached object: the events are added to the signaled mask; if IOEVT_READ is
// then set the object itself is signaled and its waiters are released.
//
// Attached object: the events are added to the signaled mask. If none of the
// monitored events was signaled before and at least one of the new events is
// monitored, the object is moved from the waiting queue to the end of the
// ready queue, the iomux is signaled and waiting threads are served.
void set_io_event(struct ioobject *iob, int events) {
  struct iomux *iomux = iob->iomux;
  int was_waiting;
  int is_monitored;

  if (!iomux) {
    // Not attached to an iomux: signal the object itself if data is available
    iob->events_signaled |= events;
    if (iob->events_signaled & IOEVT_READ) {
      iob->object.signaled = 1;
      release_waiters(&iob->object, 0);
    }
    return;
  }

  // Both tests must see the signaled mask as it was before this call
  was_waiting = (iob->events_monitored & iob->events_signaled) == 0;
  is_monitored = (iob->events_monitored & events) != 0;

  // The signaled mask is updated in every case
  iob->events_signaled |= events;

  // Nothing more to do unless the object just became ready
  if (!was_waiting || !is_monitored) return;

  // Take the object off the waiting queue
  if (iob->next) iob->next->prev = iob->prev;
  if (iob->prev) iob->prev->next = iob->next;
  if (iomux->waiting_head == iob) iomux->waiting_head = iob->next;
  if (iomux->waiting_tail == iob) iomux->waiting_tail = iob->prev;

  // Append it to the ready queue
  iob->next = NULL;
  iob->prev = iomux->ready_tail;
  if (iomux->ready_tail) iomux->ready_tail->next = iob;
  iomux->ready_tail = iob;
  if (!iomux->ready_head) iomux->ready_head = iob;

  // The iomux now has something to deliver
  iomux->object.signaled = 1;
  release_waiting_threads(iomux);
}

// Removes the given events from the signaled mask of an I/O object. If
// IOEVT_READ is not set afterwards the object itself is marked not signaled.
//
// The object is not moved between iomux queues here.
// NOTE(review): detach_ioobject() picks the queue from the event masks, so an
// object that sits on a ready queue when its last monitored event is cleared
// looks like a waiting queue member afterwards - confirm callers never hit
// this combination.
void clear_io_event(struct ioobject *iob, int events) {
  iob->events_signaled &= ~events;

  // Still readable: the object stays signaled
  if (iob->events_signaled & IOEVT_READ) return;

  iob->object.signaled = 0;
}

int dequeue_event_from_iomux(struct iomux *iomux) {
  struct ioobject *iob;

  // Get first ready object, return error if no objects are ready
  iob = iomux->ready_head;
  if (!iob) return -ENOENT;

  // Detach object from iomux
  detach_ioobject(iob);

  // Return context for object
  return iob->context;
}

// Scans an fd set for handles that have at least one of the events in
// eventmask signaled. Matching handles are compacted to the front of
// fds->fd[]; fds->count is not changed here.
//
// Returns the number of matching handles, 0 if fds is NULL, or -EBADF as soon
// as a handle cannot be locked or does not refer to an I/O object.
static int check_fds(fd_set *fds, int eventmask) {
  unsigned int idx;
  int hits;
  struct ioobject *obj;

  if (fds == NULL) return 0;

  hits = 0;
  for (idx = 0; idx < fds->count; idx++) {
    obj = (struct ioobject *) olock(fds->fd[idx], OBJECT_ANY);
    if (obj == NULL) return -EBADF;

    if (!ISIOOBJECT(obj)) {
      orel(obj);
      return -EBADF;
    }

    if (obj->events_signaled & eventmask) {
      // Keep this handle; hits never runs ahead of idx
      fds->fd[hits] = fds->fd[idx];
      hits++;
    }

    orel(obj);
  }

  return hits;
}

// Queues every handle of an fd set on an iomux, monitoring the events in
// eventmask. The context stored with each object is 0.
//
// Returns 0 on success or if fds is NULL, -EBADF if a handle cannot be locked
// or is not an I/O object, or the negative error code from queue_ioobject().
// Objects queued before a failure stay attached to the iomux.
static int add_fds_to_iomux(struct iomux *iomux, fd_set *fds, int eventmask) {
  unsigned int n;
  struct ioobject *iob;
  int rc;

  if (!fds) return 0;

  for (n = 0; n < fds->count; n++) {
    iob = (struct ioobject *) olock(fds->fd[n], OBJECT_ANY);
    if (!iob) return -EBADF;
    if (!ISIOOBJECT(iob)) {
      orel(iob);
      return -EBADF;
    }

    rc = queue_ioobject(iomux, iob, eventmask, 0);

    // Drop the reference taken by olock() on every path; returning on error
    // without releasing it would leak the reference.
    orel(iob);

    if (rc < 0) return rc;
  }

  return 0;
}

// Polls the three select() fd sets once without blocking. The read set is
// checked for read/accept/close events, the write set for write/connect
// events and the except set for error events.
//
// If at least one handle is ready, the count of every non-NULL set is reduced
// to its number of ready handles and the total is returned. If nothing is
// ready the counts are left alone and 0 is returned. A negative error code
// from check_fds() is passed on unchanged.
static int check_select(fd_set *readfds, fd_set *writefds, fd_set *exceptfds) {
  int numread;
  int numwrite;
  int numexcept;

  numread = check_fds(readfds, IOEVT_READ | IOEVT_ACCEPT | IOEVT_CLOSE);
  if (numread < 0) return numread;

  numwrite = check_fds(writefds, IOEVT_WRITE | IOEVT_CONNECT);
  if (numwrite < 0) return numwrite;

  numexcept = check_fds(exceptfds, IOEVT_ERROR);
  if (numexcept < 0) return numexcept;

  // Nothing ready: leave the sets untouched
  if (numread == 0 && numwrite == 0 && numexcept == 0) return 0;

  // Shrink each set to the handles check_fds() moved to the front
  if (readfds) readfds->count = numread;
  if (writefds) writefds->count = numwrite;
  if (exceptfds) exceptfds->count = numexcept;

  return numread + numwrite + numexcept;
}

// Waits until a handle in one of the fd sets is ready or the timeout expires.
//
//   nfds       not used by this implementation
//   readfds    handles checked for read/accept/close events (may be NULL)
//   writefds   handles checked for write/connect events (may be NULL)
//   exceptfds  handles checked for error events (may be NULL)
//   timeout    maximum wait, converted to milliseconds; NULL waits with
//              INFINITE
//
// Returns the number of ready handles with the sets reduced to those handles,
// 0 if the timeout is zero or expires, or a negative error code. When a
// timeout is given and all three sets are NULL the call just sleeps and
// returns -EINTR if msleep() reports a positive value.
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) {
  unsigned int tmo;
  int rc;
  struct iomux iomux;

  // Fast path: something is ready already, or the sets are invalid
  rc = check_select(readfds, writefds, exceptfds);
  if (rc != 0) return rc;

  tmo = INFINITE;
  if (timeout) tmo = timeout->tv_sec * 1000 + timeout->tv_usec / 1000;

  // Pure poll
  if (tmo == 0) return 0;

  // Pure sleep
  if (timeout && !readfds && !writefds && !exceptfds) {
    return msleep(tmo) > 0 ? -EINTR : 0;
  }

  // Collect all handles on a temporary iomux; stop at the first failure
  init_iomux(&iomux, 0);

  rc = add_fds_to_iomux(&iomux, readfds, IOEVT_READ | IOEVT_ACCEPT | IOEVT_CLOSE);
  if (rc >= 0) rc = add_fds_to_iomux(&iomux, writefds, IOEVT_WRITE | IOEVT_CONNECT);
  if (rc >= 0) rc = add_fds_to_iomux(&iomux, exceptfds, IOEVT_ERROR);
  if (rc < 0) goto done;

  rc = wait_for_object(&iomux, tmo);
  if (rc < 0) {
    // A timeout is reported as "nothing ready", not as an error
    if (rc == -ETIMEOUT) rc = 0;
    goto done;
  }

  // Something was signaled: recompute the result sets
  rc = check_select(readfds, writefds, exceptfds);
  if (rc == 0) {
    if (readfds) readfds->count = 0;
    if (writefds) writefds->count = 0;
    if (exceptfds) exceptfds->count = 0;
  }

done:
  close_iomux(&iomux);
  return rc;
}

// Polls an array of pollfd entries once without blocking and fills in the
// revents field of every entry.
//
// Entries with a negative fd get revents 0. A handle that cannot be locked or
// is not an I/O object gets POLLNVAL. Otherwise error and close events are
// always reported (POLLERR, POLLHUP), read/accept events are reported as
// POLLIN if POLLIN was requested and write/connect events as POLLOUT if
// POLLOUT was requested.
//
// Returns the number of entries with a non-zero revents.
static int check_poll(struct pollfd fds[], unsigned int nfds) {
  struct ioobject *iob;
  unsigned int n;
  int ready;
  int revents;
  int mask;

  ready = 0;
  for (n = 0; n < nfds; n++) {
    revents = 0;
    if (fds[n].fd >= 0) {
      iob = (struct ioobject *) olock(fds[n].fd, OBJECT_ANY);
      if (!iob) {
        // Nothing was locked, so there is nothing to release
        revents = POLLNVAL;
      } else if (!ISIOOBJECT(iob)) {
        revents = POLLNVAL;
        orel(iob);
      } else {
        mask = IOEVT_ERROR | IOEVT_CLOSE;
        if (fds[n].events & POLLIN) mask |= IOEVT_READ | IOEVT_ACCEPT;
        if (fds[n].events & POLLOUT) mask |= IOEVT_WRITE | IOEVT_CONNECT;

        mask &= iob->events_signaled;
        if (mask != 0) {
          if (mask & (IOEVT_READ | IOEVT_ACCEPT)) revents |= POLLIN;
          if (mask & (IOEVT_WRITE | IOEVT_CONNECT)) revents |= POLLOUT;
          if (mask & IOEVT_ERROR) revents |= POLLERR;
          if (mask & IOEVT_CLOSE) revents |= POLLHUP;
        }

        // Release exactly once per successful olock(); each branch releases
        // its own reference so an invalid object is never released twice and
        // a NULL result is never passed to orel().
        orel(iob);
      }
    }
    fds[n].revents = revents;
    if (revents != 0) ready++;
  }

  return ready;
}

// Queues one poll() handle on an iomux. Error and close events are always
// monitored; POLLIN adds read/accept events and POLLOUT adds write/connect
// events. The context stored with the object is 0.
//
// Returns 0 on success or if fd is negative (such entries are ignored),
// -EBADF if the handle cannot be locked or is not an I/O object, or the
// negative error code from queue_ioobject().
static int add_fd_to_iomux(struct iomux *iomux, int fd, int events) {
  struct ioobject *iob;
  int rc;
  int mask;

  if (fd < 0) return 0;
  iob = (struct ioobject *) olock(fd, OBJECT_ANY);
  if (!iob) return -EBADF;
  if (!ISIOOBJECT(iob)) {
    orel(iob);
    return -EBADF;
  }

  mask = IOEVT_ERROR | IOEVT_CLOSE;
  if (events & POLLIN) mask |= IOEVT_READ | IOEVT_ACCEPT;
  if (events & POLLOUT) mask |= IOEVT_WRITE | IOEVT_CONNECT;

  rc = queue_ioobject(iomux, iob, mask, 0);

  // Drop the reference taken by olock() on every path; returning on error
  // without releasing it would leak the reference.
  orel(iob);

  if (rc < 0) return rc;
  return 0;
}

int poll(struct pollfd fds[], unsigned int nfds, int timeout) {
  struct iomux iomux;
  int rc;
  unsigned int n;

  if (nfds == 0) return msleep(timeout) > 0 ? -EINTR : 0;
  if (!fds) return -EINVAL;

  rc = check_poll(fds, nfds);
  if (rc > 0) return rc;
  if (timeout == 0) return 0;

  init_iomux(&iomux, 0);

  for (n = 0; n < nfds; n++) {
    rc = add_fd_to_iomux(&iomux, fds[n].fd, fds[n].events);
    if (rc < 0) {
      close_iomux(&iomux);
      return rc;
    }
  }

  rc = wait_for_object(&iomux, timeout);
  if (rc < 0) {
    close_iomux(&iomux);
    if (rc == -ETIMEOUT) rc = 0;
    return rc;
  }

  rc = check_poll(fds, nfds);
  close_iomux(&iomux);
  return rc;
}