Logo Search packages:      
Sourcecode: nautilus-cd-burner version File versions

mapping-protocol.c

/* -*- Mode: C; indent-tabs-mode: t; c-basic-offset: 8; tab-width: 8 -*-
 *
 * mapping-protocol.c - code to talk with the mapping daemon
 *
 * Copyright (C) 2002 Red Hat Inc,
 * Copyright (C) 2005-2006 William Jon McCann <mccann@jhu.edu>
 *
 * The Gnome Library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public License as
 * published by the Free Software Foundation; either version 2 of the
 * License, or (at your option) any later version.
 *
 * The Gnome Library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with the Gnome Library; see the file COPYING.LIB.  If not,
 * write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 * Authors: Alexander Larsson <alexl@redhat.com>
 *          William Jon McCann <mccann@jhu.edu>
 */

#include "config.h"

#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <errno.h>

#include "mapping-protocol.h"

G_LOCK_DEFINE_STATIC (channel_lock);

#define MINIMUM_MESSAGE_SIZE 16

struct MappingProtocolChannel {
      GIOChannel *iochannel;
      GString    *read_buffer;
      GString    *write_buffer;

      GHashTable *replies;
      GList      *message_queue;

      int         ref_count;
      guint32     client_serial;

      guint       read_watch_id;
      guint       write_watch_id;
      MappingProtocolHandleMessageFunction message_handler;
      gpointer    user_data;
};

static gboolean mapping_protocol_channel_do_iteration_unlocked (MappingProtocolChannel *channel,
                                                gboolean                blocking);
static gboolean mapping_protocol_channel_do_read_iteration_unlocked (MappingProtocolChannel *channel,
                                                     gboolean                blocking);
static void     add_write_watch                                (MappingProtocolChannel *channel);
static gboolean handle_read                                    (GIOChannel             *source,
                                                GIOCondition            condition,
                                                MappingProtocolChannel *channel);
static gboolean handle_write                                   (GIOChannel             *source,
                                                GIOCondition            condition,
                                                MappingProtocolChannel *channel);

#undef DEBUG_ENABLE


#ifdef DEBUG_ENABLE
#ifdef G_HAVE_ISO_VARARGS
#  define DEBUG_PRINT(...) profile_log (G_STRFUNC, NULL, __VA_ARGS__);
#elif defined(G_HAVE_GNUC_VARARGS)
#  define DEBUG_PRINT(args...) profile_log (G_STRFUNC, NULL, args);
#endif
#else
#ifdef G_HAVE_ISO_VARARGS
#  define DEBUG_PRINT(...)
#elif defined(G_HAVE_GNUC_VARARGS)
#  define DEBUG_PRINT(args...)
#endif
#endif

#ifdef DEBUG_ENABLE
static void
profile_log (const char *func,
           const char *note,
           const char *format,
           ...)
{
        va_list args;
        char   *str;
        char   *formatted;

        if (format == NULL) {
                formatted = g_strdup ("");
        } else {
            va_start (args, format);
            formatted = g_strdup_vprintf (format, args);
            va_end (args);
        }

        if (func != NULL) {
                str = g_strdup_printf ("MARK: %s %s: %s %s", g_get_prgname(), func, note ? note : "", formatted);
        } else {
                str = g_strdup_printf ("MARK: %s: %s %s", g_get_prgname(), note ? note : "", formatted);
        }

        g_free (formatted);

        g_access (str, F_OK);
        g_free (str);
}
#endif

static MappingProtocolMessage *
_mapping_protocol_message_new (MappingProtocolMessageType type)
{
      MappingProtocolMessage *message;

      message = g_new0 (MappingProtocolMessage, 1);

      switch (type) {
      case MAPPING_PROTOCOL_MESSAGE_REPLY:
            break;
      case MAPPING_PROTOCOL_MESSAGE_REQUEST:
            break;
      case MAPPING_PROTOCOL_MESSAGE_MONITOR_EVENT:
            break;
      default:
            g_assert_not_reached ();
            break;
      }

      message->type = type;
      message->ref_count = 1;

      return message;
}

guint32
mapping_protocol_message_get_serial (MappingProtocolMessage *message)
{
      guint32 serial;

      switch (message->type) {
      case MAPPING_PROTOCOL_MESSAGE_REPLY:
            serial = MAPPING_PROTOCOL_REPLY (message)->serial;
            break;
      case MAPPING_PROTOCOL_MESSAGE_REQUEST:
            serial = MAPPING_PROTOCOL_REQUEST (message)->serial;
            break;
      case MAPPING_PROTOCOL_MESSAGE_MONITOR_EVENT:
            serial = MAPPING_PROTOCOL_MONITOR_EVENT (message)->serial;
            break;
      default:
            g_assert_not_reached ();
            break;
      }

      return serial;
}

void
mapping_protocol_message_set_serial (MappingProtocolMessage *message,
                             guint32                 serial)
{
      switch (message->type) {
      case MAPPING_PROTOCOL_MESSAGE_REPLY:
            MAPPING_PROTOCOL_REPLY (message)->serial = serial;
            break;
      case MAPPING_PROTOCOL_MESSAGE_REQUEST:
            MAPPING_PROTOCOL_REQUEST (message)->serial = serial;
            break;
      case MAPPING_PROTOCOL_MESSAGE_MONITOR_EVENT:
            MAPPING_PROTOCOL_MONITOR_EVENT (message)->serial = serial;
            break;
      default:
            g_assert_not_reached ();
            break;
      }
}

MappingProtocolMessage *
mapping_protocol_message_new_reply (MappingProtocolMessage *message)
{
      MappingProtocolMessage *reply;
      guint32                 serial;

      reply = _mapping_protocol_message_new (MAPPING_PROTOCOL_MESSAGE_REPLY);

      serial = mapping_protocol_message_get_serial (message);
      mapping_protocol_message_set_serial (reply, serial);

      DEBUG_PRINT ("Creating new reply for serial:%u", serial);

      return reply;
}

MappingProtocolMessage *
mapping_protocol_message_new_monitor_event (void)
{
      MappingProtocolMessage *message;

      message = _mapping_protocol_message_new (MAPPING_PROTOCOL_MESSAGE_MONITOR_EVENT);

      return message;
}

MappingProtocolMessage *
mapping_protocol_message_new_request (void)
{
      MappingProtocolMessage *message;

      message = _mapping_protocol_message_new (MAPPING_PROTOCOL_MESSAGE_REQUEST);

      return message;
}

MappingProtocolMessage *
mapping_protocol_message_ref (MappingProtocolMessage *message)
{
      g_return_val_if_fail (message != NULL, NULL);
      g_return_val_if_fail (message->ref_count > 0, NULL);

      g_atomic_int_inc (&message->ref_count);
      return message;
}

static void
mapping_protocol_reply_destroy (MappingProtocolReply *reply)
{
      int i;

      g_return_if_fail (reply != NULL);

      g_free (reply->path);
      reply->path = NULL;

      for (i = 0; i < reply->n_strings; i++) {
            g_free (reply->strings [i]);
            reply->strings [i] = NULL;
      }
      g_free (reply->strings);
}

static void
mapping_protocol_request_destroy (MappingProtocolRequest *req)
{
      g_free (req->root);
      req->root = NULL;
      g_free (req->path1);
      req->path1 = NULL;
      g_free (req->path2);
      req->path2 = NULL;
}

static void
mapping_protocol_monitor_event_destroy (MappingProtocolMonitorEvent *event)
{
      g_free (event->path);
      event->path = NULL;
}

void
mapping_protocol_message_unref (MappingProtocolMessage *message)
{
      g_return_if_fail (message != NULL);
      g_return_if_fail (message->ref_count > 0);

      if (g_atomic_int_dec_and_test (&message->ref_count)) {
            switch (message->type) {
            case MAPPING_PROTOCOL_MESSAGE_REPLY:
                  mapping_protocol_reply_destroy (MAPPING_PROTOCOL_REPLY (message));
                  break;
            case MAPPING_PROTOCOL_MESSAGE_REQUEST:
                  mapping_protocol_request_destroy (MAPPING_PROTOCOL_REQUEST (message));
                  break;
            case MAPPING_PROTOCOL_MESSAGE_MONITOR_EVENT:
                  mapping_protocol_monitor_event_destroy (MAPPING_PROTOCOL_MONITOR_EVENT (message));
                  break;
            default:
                  g_assert_not_reached ();
                  break;
            }

            g_free (message);
      }
}

static gboolean
encode_uint32 (GString *str,
             guint32  val)
{
      char   *ptr;
      guint32 val2;

      val2 = g_htonl (val);

      ptr = (char *) &val2;

      g_string_append_len (str, ptr, 4);

      return TRUE;
}

static gboolean
encode_int32 (GString *str,
            gint32   val)
{
      char  *ptr;
      gint32 val2;

      val2 = g_htonl (val);

      ptr = (char *) &val2;
      g_string_append_len (str, ptr, 4);

      return TRUE;
}

static guint32
decode_uint32 (char *ptr)
{
      guint32 val;

      val = ((guint8)ptr [0] << 24)
            | ((guint8)ptr [1] << 16)
            | ((guint8)ptr [2] << 8)
            | ((guint8)ptr [3]);

      return val;
}

static gint32
decode_int32 (char *ptr)
{
      guint32 val;

      val = ((guint8)ptr [0] << 24)
            | ((guint8)ptr [1] << 16)
            | ((guint8)ptr [2] << 8)
            | ((guint8)ptr [3]);

      return (gint32)val;
}

static char *
decode_string (char *ptr,
             gsize len)
{
      char *str;

      str = g_malloc (len + 1);

      memcpy (str, ptr, len);
      str [len] = 0;

      return str;
}

static gpointer
decode_pointer (char *ptr,
            gsize len)
{
      gpointer  p;
      gpointer *tmp;

      /* We use only local socket so no need for network byte order conversion */

      tmp = g_memdup (ptr, sizeof (gpointer));

      p = (gpointer)*tmp;
      g_free (tmp);

      return p;
}

static gboolean
encode_string (GString *gstring,
             char    *str)
{
      int      len;
      gboolean res;

      if (str == NULL) {
            res = encode_int32 (gstring, -1);
      } else {
            len = strlen (str);
            res = encode_int32 (gstring, len);
            if (res) {
                  g_string_append_len (gstring, str, len);
            }
      }

      return res;
}

static gboolean
encode_pointer (GString *str,
            gpointer val)
{
      int      len;
      gboolean res;

      /* We use only local socket so no need for network byte order conversion */

      if (val == NULL) {
            res = encode_int32 (str, -1);
      } else {
            len = sizeof (gpointer);
            res = encode_int32 (str, len);
            if (res) {
                  g_string_append_len (str, (char *)&val, len);
            }
      }
      return res;
}

static gboolean
encode_handshake (GString *str,
              char     c)
{
      g_string_append_c (str, c);

      return TRUE;
}

static GString *
mapping_protocol_request_encode (MappingProtocolChannel *channel,
                         MappingProtocolRequest *request)
{
      gboolean res;
      GString *str;

      g_return_val_if_fail (channel != NULL, NULL);

      str = g_string_new (NULL);

      res = encode_handshake (str, 'Q');
      if (! res) goto fail;

      res = encode_uint32 (str, request->serial);
      if (! res) goto fail;

      res = encode_int32 (str, request->operation);
      if (! res) goto fail;

      res = encode_string (str, request->root);
      if (! res) goto fail;

      res = encode_string (str, request->path1);
      if (! res) goto fail;

      res = encode_string (str, request->path2);
      if (! res) goto fail;

      res = encode_int32 (str, request->option);
      if (! res) goto fail;

      res = encode_pointer (str, request->userdata);
      if (! res) goto fail;

      return str;
 fail:
      g_string_free (str, TRUE);
      return NULL;
}

static gboolean
channel_read_buffer_has_len (MappingProtocolChannel *channel,
                       char                   *position,
                       gsize                   len)
{
      gsize remain;
      gsize curr;

      curr = position - channel->read_buffer->str;
      remain = channel->read_buffer->len - curr;

      return (remain >= len);
}

static gboolean
mapping_protocol_request_decode (MappingProtocolChannel *channel,
                         MappingProtocolRequest *req)
{
      char  *p;
      gint32 len;

      g_return_val_if_fail (channel != NULL, FALSE);

      p = channel->read_buffer->str;

      /* increment for event type char */
      p++;

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      req->serial = decode_uint32 (p);
      p += 4;

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      req->operation = decode_int32 (p);
      p += 4;

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      len = decode_int32 (p);
      p += 4;

      if (len >= 0) {
            if (! channel_read_buffer_has_len (channel, p, len)) {
                  return FALSE;
            }
            req->root = decode_string (p, len);
            p += len;
      }

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      len = decode_int32 (p);
      p += 4;

      if (len >= 0) {
            if (! channel_read_buffer_has_len (channel, p, len)) {
                  return FALSE;
            }
            req->path1 = decode_string (p, len);
            p += len;
      }

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      len = decode_int32 (p);
      p += 4;

      if (len >= 0) {
            if (! channel_read_buffer_has_len (channel, p, len)) {
                  return FALSE;
            }
            req->path2 = decode_string (p, len);
            p += len;
      }

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      req->option = decode_int32 (p);
      p += 4;

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      len = decode_int32 (p);
      p += 4;

      if (len >= 0) {
            if (! channel_read_buffer_has_len (channel, p, len)) {
                  return FALSE;
            }
            req->userdata = decode_pointer (p, len);
            p += len;
      }

      /* remove message from buffer */
      channel->read_buffer = g_string_erase (channel->read_buffer, 0, p - channel->read_buffer->str);

      return TRUE;
}

static gboolean
mapping_protocol_decode_handshake (MappingProtocolChannel *channel,
                           char                   *c)
{
      if (channel->read_buffer->len <= 0) {
            return FALSE;
      }

      *c = channel->read_buffer->str[0];

      return TRUE;
}

static GString *
mapping_protocol_reply_encode (MappingProtocolChannel *channel,
                         MappingProtocolReply   *reply)
{
      int res;
      int i;
      GString *str;

      g_return_val_if_fail (channel != NULL, NULL);

      str = g_string_new (NULL);

      res = encode_handshake (str, 'R');
      if (! res) goto fail;

      res = encode_uint32 (str, reply->serial);
      if (! res) goto fail;

      res = encode_int32 (str, reply->result);
      if (! res) goto fail;

      res = encode_string (str, reply->path);
      if (! res) goto fail;

      res = encode_int32 (str, reply->option);
      if (! res) goto fail;

      res = encode_int32 (str, reply->n_strings);
      if (! res) goto fail;

      for (i = 0; i < reply->n_strings; i++) {
            res = encode_string (str, reply->strings [i]);
            if (! res) goto fail;
      }

      return str;
 fail:
      g_string_free (str, TRUE);
      return NULL;
}

static gboolean
mapping_protocol_reply_decode (MappingProtocolChannel *channel,
                         MappingProtocolReply   *reply)
{
      int   i;
      char *p;
      gint32 len;

      g_return_val_if_fail (channel != NULL, FALSE);

      p = channel->read_buffer->str;

      /* increment for event type char */
      p++;

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      reply->serial = decode_uint32 (p);
      p += 4;

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      reply->result = decode_int32 (p);
      p += 4;

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      len = decode_int32 (p);
      p += 4;

      if (len >= 0) {
            if (! channel_read_buffer_has_len (channel, p, len)) {
                  return FALSE;
            }
            reply->path = decode_string (p, len);
            p += len;
      }

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      reply->option = decode_int32 (p);
      p += 4;

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      reply->n_strings = decode_int32 (p);
      p += 4;

      g_assert (reply->n_strings < 100000);
      g_assert (reply->n_strings >= 0);

      reply->strings = g_new0 (char *, reply->n_strings);

      for (i = 0; i < reply->n_strings; i++) {
            if (! channel_read_buffer_has_len (channel, p, 4)) {
                  return FALSE;
            }
            len = decode_int32 (p);
            p += 4;

            if (len >= 0) {
                  if (! channel_read_buffer_has_len (channel, p, len)) {
                        return FALSE;
                  }
                  reply->strings [i] = decode_string (p, len);
                  p += len;
            }
      }

      /* remove message from buffer */
      channel->read_buffer = g_string_erase (channel->read_buffer, 0, p - channel->read_buffer->str);

      return TRUE;
}

static GString *
mapping_protocol_monitor_event_encode (MappingProtocolChannel      *channel,
                               MappingProtocolMonitorEvent *event)
{
      int      res;
      GString *str;

      g_return_val_if_fail (channel != NULL, NULL);

      str = g_string_new (NULL);

      res = encode_handshake (str, 'E');
      if (! res) goto fail;

      res = encode_uint32 (str, event->serial);
      if (! res) goto fail;

      res = encode_int32 (str, event->type);
      if (! res) goto fail;

      res = encode_pointer (str, event->userdata);
      if (! res) goto fail;

      res = encode_string (str, event->path);
      if (! res) goto fail;

      return str;
 fail:
      g_string_free (str, TRUE);
      return NULL;
}

static gboolean
mapping_protocol_monitor_event_decode (MappingProtocolChannel      *channel,
                               MappingProtocolMonitorEvent *event)
{
      gint32 len;
      char  *p;

      g_return_val_if_fail (channel != NULL, FALSE);

      p = channel->read_buffer->str;

      /* increment for event type char */
      p++;

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      event->serial = decode_uint32 (p);
      p += 4;

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      event->type = decode_int32 (p);
      p += 4;

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      len = decode_int32 (p);
      p += 4;

      if (len >= 0) {
            if (! channel_read_buffer_has_len (channel, p, len)) {
                  return FALSE;
            }
            event->userdata = decode_pointer (p, len);
            p += len;
      }

      if (! channel_read_buffer_has_len (channel, p, 4)) {
            return FALSE;
      }
      len = decode_int32 (p);
      p += 4;

      if (len >= 0) {
            if (! channel_read_buffer_has_len (channel, p, len)) {
                  return FALSE;
            }
            event->path = decode_string (p, len);
            p += len;
      }

      /* remove message from buffer */
      channel->read_buffer = g_string_erase (channel->read_buffer, 0, p - channel->read_buffer->str);

      return TRUE;
}

char *
mapping_protocol_get_unix_name (void)
{
      char *path;
      char *name;

      name = g_strdup_printf ("mapping-%s", g_get_user_name ());
      path = g_build_filename (g_get_tmp_dir (), name, NULL);

      g_free (name);

      return path;
}

static gboolean
wait_for_data (MappingProtocolChannel *channel)
{
      fd_set         read_set;
      struct timeval tv;
      int            avail;
      int            fd;

      g_return_val_if_fail (channel != NULL, -1);

      fd = g_io_channel_unix_get_fd (channel->iochannel);

 retry:
      FD_ZERO (&read_set);
      FD_SET (fd, &read_set);
      tv.tv_sec = 1;
      tv.tv_usec = 0;

      avail = select (fd + 1, &read_set, NULL, NULL, &tv);
      if (avail < 0) {
            if (errno == EINTR)
                  goto retry;
            g_warning ("Failed to check data availability on socket %d\n", fd);
            return FALSE;
      } else if (avail == 0) {
            return FALSE;
      }

      return TRUE;
}

static guint32
mapping_protocol_channel_get_next_serial (MappingProtocolChannel *channel)
{
      guint32 serial;

      serial = channel->client_serial++;

      if ((gint32)channel->client_serial < 0) {
            channel->client_serial = 1;
      }

      return serial;
}

static gboolean
mapping_protocol_channel_dispatch_unlocked (MappingProtocolChannel *channel)
{
      GIOStatus status;
      gsize     bytes;
      GError   *error;

      status = G_IO_STATUS_NORMAL;
      if (channel->write_buffer->len > MINIMUM_MESSAGE_SIZE) {
            DEBUG_PRINT ("Dispatching messages");

            error = NULL;
            status = g_io_channel_write_chars (channel->iochannel,
                                       channel->write_buffer->str,
                                       channel->write_buffer->len,
                                       &bytes,
                                       &error);

            channel->write_buffer = g_string_erase (channel->write_buffer, 0, bytes);

            if (status != G_IO_STATUS_NORMAL) {
                  DEBUG_PRINT ("Write error: %s", error->message);
                  g_error_free (error);
            } else {
                  g_io_channel_flush (channel->iochannel, NULL);
            }
      }

      return status == G_IO_STATUS_NORMAL;
}

static void
mapping_protocol_channel_queue_write_unlocked (MappingProtocolChannel *channel,
                                     GString                *str)
{
      DEBUG_PRINT ("Adding string to write queue len:%d", str->len);
      g_string_append_len (channel->write_buffer, str->str, str->len);

      /* watch for time to write */
      if (channel->write_watch_id == 0) {
            add_write_watch (channel);
      }
}

static void
mapping_protocol_channel_send_internal (MappingProtocolChannel *channel,
                              MappingProtocolMessage *message)
{
      GString *str;

      G_LOCK (channel_lock);

      if (mapping_protocol_message_get_serial (message) == 0) {
            guint32 serial;

            serial = mapping_protocol_channel_get_next_serial (channel);
            mapping_protocol_message_set_serial (message, serial);
      }

      switch (message->type) {
      case MAPPING_PROTOCOL_MESSAGE_REPLY:
            str = mapping_protocol_reply_encode (channel, MAPPING_PROTOCOL_REPLY (message));
            break;
      case MAPPING_PROTOCOL_MESSAGE_REQUEST:
            str = mapping_protocol_request_encode (channel, MAPPING_PROTOCOL_REQUEST (message));
            break;
      case MAPPING_PROTOCOL_MESSAGE_MONITOR_EVENT:
            str = mapping_protocol_monitor_event_encode (channel, MAPPING_PROTOCOL_MONITOR_EVENT (message));
            break;
      default:
            g_assert_not_reached ();
            break;
      }

      if (str != NULL) {
            mapping_protocol_channel_queue_write_unlocked (channel, str);
            g_string_free (str, TRUE);
      }

      G_UNLOCK (channel_lock);
}

gboolean
mapping_protocol_channel_send (MappingProtocolChannel *channel,
                         MappingProtocolMessage *message)
{
      g_return_val_if_fail (channel != NULL, FALSE);
      g_return_val_if_fail (message != NULL, FALSE);

      mapping_protocol_channel_send_internal (channel, message);

      return TRUE;
}

static MappingProtocolMessage *
lookup_reply_for_serial (MappingProtocolChannel *channel,
                   guint32                 serial)
{
      MappingProtocolMessage *reply;

      DEBUG_PRINT ("Looking for reply for %u", serial);

      reply = g_hash_table_lookup (channel->replies, GUINT_TO_POINTER (serial));

      return reply;
}

static void
insert_reply_for_serial (MappingProtocolChannel *channel,
                   guint32                 serial,
                   MappingProtocolMessage *message)
{
      DEBUG_PRINT ("Inserting reply for serial %u", serial);

      /* hash takes ownership of ref */
      g_hash_table_insert (channel->replies, GUINT_TO_POINTER (serial), message);
}

static void
remove_reply_for_serial (MappingProtocolChannel *channel,
                   guint32                 serial)
{
      DEBUG_PRINT ("Removing reply for serial %u", serial);
      g_hash_table_remove (channel->replies, GUINT_TO_POINTER (serial));
}

static void
add_read_watch (MappingProtocolChannel *channel)
{
      channel->read_watch_id = g_io_add_watch (channel->iochannel,
                                     G_IO_IN | G_IO_ERR | G_IO_HUP,
                                     (GIOFunc)handle_read,
                                     channel);
}

static void
remove_read_watch (MappingProtocolChannel *channel)
{
      if (channel->read_watch_id > 0) {
            g_source_remove (channel->read_watch_id);
            channel->read_watch_id = 0;
      }
}

static void
add_write_watch (MappingProtocolChannel *channel)
{
      channel->write_watch_id = g_io_add_watch (channel->iochannel,
                                      G_IO_OUT | G_IO_ERR | G_IO_HUP,
                                      (GIOFunc)handle_write,
                                      channel);
}

static void
remove_write_watch (MappingProtocolChannel *channel)
{
      if (channel->write_watch_id > 0) {
            g_source_remove (channel->write_watch_id);
            channel->write_watch_id = 0;
      }
}

static gboolean
block_for_reply (MappingProtocolChannel  *channel,
             MappingProtocolMessage  *message,
             MappingProtocolMessage **reply)
{
      guint32                 serial;
      guint32                 count;
      MappingProtocolMessage *r;
      gboolean                res;

      G_LOCK (channel_lock);

      remove_read_watch (channel);

      /* ensure the message was sent */
      res = mapping_protocol_channel_dispatch_unlocked (channel);
      if (! res) {
            goto out;
      }

      /* wait for reply */
      serial = mapping_protocol_message_get_serial (message);

      DEBUG_PRINT ("Send message %u, waiting for reply", serial);

      r = lookup_reply_for_serial (channel, serial);
      count = 1;
      while (r == NULL && count < 100000) {
            mapping_protocol_channel_do_read_iteration_unlocked (channel, TRUE);

            /* FIXME: check connection */

            r = lookup_reply_for_serial (channel, serial);

            count++;
      }

      if (r != NULL) {
            DEBUG_PRINT ("Got reply for message %u", serial);

            mapping_protocol_message_ref (r);
            res = TRUE;
      } else {
            DEBUG_PRINT ("WARNING: Did not receive reply for message %u", serial);
            res = FALSE;
      }

      if (reply != NULL) {
            *reply = r;
      }

      remove_reply_for_serial (channel, serial);

      /* FIXME: handle result? */
      mapping_protocol_channel_do_iteration_unlocked (channel, FALSE);

      add_read_watch (channel);
 out:
      G_UNLOCK (channel_lock);

      return res;
}

gboolean
mapping_protocol_channel_send_with_reply (MappingProtocolChannel  *channel,
                                MappingProtocolMessage  *message,
                                MappingProtocolMessage **reply)
{
      g_return_val_if_fail (channel != NULL, FALSE);
      g_return_val_if_fail (message != NULL, FALSE);

      mapping_protocol_channel_send_internal (channel, message);

      return block_for_reply (channel, message, reply);
}

static gboolean
decode_event (MappingProtocolChannel  *channel,
            MappingProtocolMessage **message)
{
      gboolean                res;
      MappingProtocolMessage *m;

      m = _mapping_protocol_message_new (MAPPING_PROTOCOL_MESSAGE_MONITOR_EVENT);
      res = mapping_protocol_monitor_event_decode (channel, MAPPING_PROTOCOL_MONITOR_EVENT (m));

      if (! res) {
            mapping_protocol_message_unref (m);
            m = NULL;
      }

      if (message != NULL) {
            *message = m;
      }

      return res;
}

static gboolean
decode_request (MappingProtocolChannel  *channel,
            MappingProtocolMessage **message)
{
      gboolean                res;
      MappingProtocolMessage *m;

      m = _mapping_protocol_message_new (MAPPING_PROTOCOL_MESSAGE_REQUEST);
      res = mapping_protocol_request_decode (channel, MAPPING_PROTOCOL_REQUEST (m));

      if (! res) {
            mapping_protocol_message_unref (m);
            m = NULL;
      }

      if (message != NULL) {
            *message = m;
      }

      return res;
}

static gboolean
decode_reply (MappingProtocolChannel  *channel,
            MappingProtocolMessage **message)
{
      gboolean                res;
      MappingProtocolMessage *m;

      m = _mapping_protocol_message_new (MAPPING_PROTOCOL_MESSAGE_REPLY);
      res = mapping_protocol_reply_decode (channel, MAPPING_PROTOCOL_REPLY (m));

      if (! res) {
            mapping_protocol_message_unref (m);
            m = NULL;
      }

      if (message != NULL) {
            *message = m;
      }

      return res;
}

static gboolean
mapping_protocol_channel_fill_read_buffer_unlocked (MappingProtocolChannel *channel,
                                        gboolean                blocking)
{
      GIOStatus status;
      char      data[4096];
      gsize     length;
      gsize     count;

      DEBUG_PRINT ("Reading... ");
      count = sizeof (data);

      if (blocking) {
            wait_for_data (channel);
      }

      status = g_io_channel_read_chars (channel->iochannel, data, count, &length, NULL);

      if (status == G_IO_STATUS_EOF) {
            return FALSE;
      }

      if (length > 0) {
            g_string_append_len (channel->read_buffer, data, length);
            DEBUG_PRINT ("Filling read buffer: length %d", length);
      } else {
            DEBUG_PRINT ("No data read into buffer");
      }

      return TRUE;
}

static void
mapping_protocol_channel_queue_messages_unlocked (MappingProtocolChannel *channel)
{
      MappingProtocolMessage *message;
      gboolean                ok;

      if (channel->read_buffer == NULL
          || channel->read_buffer->len < MINIMUM_MESSAGE_SIZE) {
            return;
      }

      DEBUG_PRINT ("Queuing messages...");

      ok = TRUE;
      while (ok) {
            char c;

            ok = mapping_protocol_decode_handshake (channel, &c);

            if (!ok) {
                  break;
            }

            DEBUG_PRINT ("Processing message of type: %c", c);

            message = NULL;
            switch (c) {
            case 'E':
                  ok = decode_event (channel, &message);
                  break;
            case 'Q':
                  ok = decode_request (channel, &message);
                  break;
            case 'R':
                  ok = decode_reply (channel, &message);
                  if (ok) {
                        insert_reply_for_serial (channel, MAPPING_PROTOCOL_REPLY (message)->serial, message);
                  }
                  message = NULL;
                  break;
            default:
                  g_assert_not_reached ();
                  break;
            }

            if (message != NULL) {
                  DEBUG_PRINT ("Queuing message of type: %c", c);
                  /* list takes ownership of ref */
                  channel->message_queue = g_list_append (channel->message_queue, message);
            }
      }
}

static void
mapping_protocol_channel_deliver_messages_unlocked (MappingProtocolChannel *channel)
{
      while (channel->message_queue != NULL) {
            MappingProtocolMessage *message;

            message = channel->message_queue->data;

            DEBUG_PRINT ("Delivering message of type: %d", message->type);

            if (channel->message_handler != NULL) {
                  channel->message_handler (channel, message, channel->user_data);
            }
            if (message != NULL) {
                  mapping_protocol_message_unref (message);
            }
            channel->message_queue = g_list_delete_link (channel->message_queue, channel->message_queue);
      }
}

static gboolean
mapping_protocol_channel_do_read_iteration_unlocked (MappingProtocolChannel *channel,
                                         gboolean                blocking)
{
      gboolean res;

      res = mapping_protocol_channel_fill_read_buffer_unlocked (channel, blocking);
      if (res) {
            mapping_protocol_channel_queue_messages_unlocked (channel);
      }

      return res;
}

static gboolean
mapping_protocol_channel_do_iteration_unlocked (MappingProtocolChannel *channel,
                                    gboolean                blocking)
{
      gboolean res;

      res = mapping_protocol_channel_do_read_iteration_unlocked (channel, blocking);

      if (res) {
            mapping_protocol_channel_deliver_messages_unlocked (channel);
      }

      return res;
}

static gboolean
mapping_protocol_channel_do_iteration (MappingProtocolChannel *channel)
{
      gboolean res;

      DEBUG_PRINT ("Do iteration");

      G_LOCK (channel_lock);
      res = mapping_protocol_channel_do_iteration_unlocked (channel, FALSE);
      G_UNLOCK (channel_lock);

      return res;
}

static gboolean
mapping_protocol_channel_dispatch (MappingProtocolChannel *channel)
{
      gboolean res;

      DEBUG_PRINT ("Do dispatch");

      G_LOCK (channel_lock);
      res = mapping_protocol_channel_dispatch_unlocked (channel);
      G_UNLOCK (channel_lock);

      return res;
}

static gboolean
handle_write (GIOChannel             *source,
            GIOCondition            condition,
            MappingProtocolChannel *channel)
{
      gboolean res;

        if (condition & G_IO_OUT) {
            /* FIXME: handle result? */
            mapping_protocol_channel_dispatch (channel);
      }

      channel->write_watch_id = 0;
      res = FALSE;

      return res;
}

static gboolean
handle_read (GIOChannel             *source,
           GIOCondition            condition,
           MappingProtocolChannel *channel)
{
      gboolean res;

      res = TRUE;

        if (condition & G_IO_IN) {
            res = mapping_protocol_channel_do_iteration (channel);
      }

      if (condition & G_IO_HUP) {
            res = FALSE;
      }

      if (! res) {
            channel->read_watch_id = 0;
            /* send a NULL message to signal error */
            if (channel->message_handler != NULL) {
                  channel->message_handler (channel, NULL, channel->user_data);
            }
      }

      return res;
}

MappingProtocolChannel *
mapping_protocol_channel_new (int fd)
{
      MappingProtocolChannel *channel;
      GIOChannel             *iochannel = NULL;
      GIOFlags                flags;

      channel = g_new0 (MappingProtocolChannel, 1);

      iochannel = g_io_channel_unix_new (fd);
      g_io_channel_set_close_on_unref (iochannel, TRUE);
      g_io_channel_set_encoding (iochannel, NULL, NULL);

      flags = g_io_channel_get_flags (iochannel);
      g_io_channel_set_flags (iochannel, flags | G_IO_FLAG_NONBLOCK, NULL);

      channel->iochannel = iochannel;
      channel->ref_count = 1;
      channel->client_serial = 1;
      channel->read_buffer = g_string_new (NULL);
      channel->write_buffer = g_string_new (NULL);

      channel->replies = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify)mapping_protocol_message_unref);

      add_read_watch (channel);

      return channel;
}

static void
mapping_protocol_channel_free (MappingProtocolChannel *channel)
{
      g_io_channel_unref (channel->iochannel);

      remove_read_watch (channel);
      remove_write_watch (channel);

      g_string_free (channel->read_buffer, TRUE);
      g_string_free (channel->write_buffer, TRUE);

      g_hash_table_destroy (channel->replies);

      g_free (channel);
      channel = NULL;
}

void
mapping_protocol_channel_ref (MappingProtocolChannel *channel)
{
      g_return_if_fail (channel != NULL);
      g_return_if_fail (channel->ref_count > 0);

      channel->ref_count += 1;
}

void
mapping_protocol_channel_unref (MappingProtocolChannel *channel)
{
      g_return_if_fail (channel != NULL);
      g_return_if_fail (channel->ref_count > 0);

      if (channel->ref_count > 1)
            channel->ref_count -= 1;
      else
            mapping_protocol_channel_free (channel);
}

void
mapping_protocol_channel_set_message_handler (MappingProtocolChannel              *channel,
                                    MappingProtocolHandleMessageFunction function,
                                    gpointer                             user_data)
{
      g_return_if_fail (channel != NULL);
      g_return_if_fail (function != NULL);

      channel->message_handler = function;
      channel->user_data = user_data;
}

Generated by  Doxygen 1.6.0   Back to index