Merge remote-tracking branch 'origin/clustered' into aks_dev_clus
author Akshay Giridhar <akshay87@vt.edu>
Tue, 24 Jun 2014 21:39:59 +0000 (17:39 -0400)
committer Akshay Giridhar <akshay87@vt.edu>
Tue, 24 Jun 2014 21:39:59 +0000 (17:39 -0400)
Conflicts:
fs/exec.c
include/linux/popcorn.h
include/linux/process_server.h
kernel/futex.c
kernel/kinit.c
kernel/process_server.c
kernel/sched.c
pcnmsg/pcn_kmsg.c

fs/exec.c
include/linux/pcn_kmsg.h
include/linux/process_server.h
ipc/Makefile
kernel/kmod.c
kernel/process_server.c
pcnmsg/pcn_kmsg.c

diff --combined fs/exec.c
+++ b/fs/exec.c
@@@ -813,10 -813,7 +813,7 @@@ int kernel_read(struct file *file, loff
  
  EXPORT_SYMBOL(kernel_read);
  
- #ifdef PROCESS_SERVER_USE_KMOD
- static 
- #endif
- int exec_mmap(struct mm_struct *mm)
+ static int exec_mmap(struct mm_struct *mm)
  {
        struct task_struct *tsk;
        struct mm_struct * old_mm, *active_mm;
        return 0;
  }
  
- #ifndef PROCESS_SERVER_USE_KMOD
- EXPORT_SYMBOL(exec_mmap);
- #endif
  /*
   * This function makes sure the current process has its own signal table,
   * so that flush_signal_handlers can later reset the handlers without
@@@ -1018,7 -1011,7 +1011,7 @@@ no_thread_group
  * These functions flush out all traces of the currently running executable
   * so that a new one can be started
   */
 -static void flush_old_files(struct files_struct * files)
 +/*static*/ void flush_old_files(struct files_struct * files)
  {
        long j = -1;
        struct fdtable *fdt;
@@@ -1491,19 -1484,25 +1484,25 @@@ static int do_execve_common(const char 
                goto out_files;
  
        retval = prepare_bprm_creds(bprm);
-       if (retval)
+       if (retval) {
+ printk("%s: prepare_bprm_creds\n", __func__);
                goto out_free;
+ }
  
        retval = check_unsafe_exec(bprm);
-       if (retval < 0)
+       if (retval < 0) {
+ printk("%s: check_unsafe_exec\n", __func__);
                goto out_free;
+ }
        clear_in_exec = retval;
        current->in_execve = 1;
  
        file = open_exec(filename);
        retval = PTR_ERR(file);
-       if (IS_ERR(file))
+       if (IS_ERR(file)) {
+ //printk("%s: open_exec\n", __func__);
                goto out_unmark;
+ }
  
        sched_exec();
  
        bprm->interp = filename;
  
        retval = bprm_mm_init(bprm);
-       if (retval)
+       if (retval) {
+ printk("%s: bprm_mm_init\n", __func__);
                goto out_file;
+ }
  
        bprm->argc = count(argv, MAX_ARG_STRINGS);
-       if ((retval = bprm->argc) < 0)
+       if ((retval = bprm->argc) < 0) {
+ printk("%s: count argv\n", __func__);
                goto out;
+ }
  
        bprm->envc = count(envp, MAX_ARG_STRINGS);
-       if ((retval = bprm->envc) < 0)
+       if ((retval = bprm->envc) < 0) {
+ printk("%s: count envc\n", __func__);
                goto out;
+ }
  
        retval = prepare_binprm(bprm);
-       if (retval < 0)
+       if (retval < 0) {
+ printk("%s: prepare_binprm\n", __func__);
                goto out;
+ }
  
        retval = copy_strings_kernel(1, &bprm->filename, bprm);
-       if (retval < 0)
+       if (retval < 0) {
+ printk("%s: copy_strings_kernel\n", __func__);
                goto out;
- #ifdef PROCESS_SERVER_USE_KMOD
+ }
      if(!current->executing_for_remote) {
- #endif
          bprm->exec = bprm->p;
          retval = copy_strings(bprm->envc, envp, bprm);
-         if (retval < 0)
+         if (retval < 0) {
+ printk("%s: copy_strings bprm->envc\n", __func__);
              goto out;
+ }
  
          retval = copy_strings(bprm->argc, argv, bprm);
-         if (retval < 0)
+         if (retval < 0) {
+ printk("%s: copy_strings bprm->argc\n", __func__);
              goto out;
- #ifdef PROCESS_SERVER_USE_KMOD
-     }
- #endif
+ }
+      }
  
        retval = search_binary_handler(bprm,regs);
-       if (retval < 0)
+       if (retval < 0) {
+ printk("%s: search_binary_handler\n", __func__);
                goto out;
+ }
  
        /* execve succeeded */
        current->fs->in_exec = 0;
@@@ -2312,3 -2325,4 +2325,4 @@@ int dump_seek(struct file *file, loff_
        return ret;
  }
  EXPORT_SYMBOL(dump_seek);
diff --combined include/linux/pcn_kmsg.h
@@@ -18,7 -18,6 +18,6 @@@
  /* BOOKKEEPING */
  
  #define POPCORN_MAX_MCAST_CHANNELS 32
- #define LG_SEQNUM_SIZE 7
  
  struct pcn_kmsg_mcast_wininfo {
        volatile unsigned char lock;
@@@ -60,39 -59,6 +59,39 @@@ enum pcn_kmsg_type 
        PCN_KMSG_TYPE_TEST_LONG,
        PCN_KMSG_TYPE_CHECKIN,
        PCN_KMSG_TYPE_MCAST,
 +      PCN_KMSG_TYPE_REMOTE_PROC_CPUINFO_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_PROC_CPUINFO_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_PROC_MEMINFO_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_PROC_MEMINFO_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_PROC_STAT_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_PROC_STAT_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_PID_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_PID_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_PID_STAT_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_PID_STAT_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_PID_CPUSET_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_PID_CPUSET_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_SENDSIG_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_SENDSIG_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_SENDSIGPROCMASK_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_SENDSIGPROCMASK_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_SENDSIGACTION_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_SENDSIGACTION_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_IPC_SEMGET_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_IPC_SEMGET_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_IPC_SEMCTL_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_IPC_SEMCTL_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_IPC_SHMGET_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_IPC_SHMGET_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_IPC_SHMAT_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_IPC_SHMAT_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_IPC_FUTEX_WAKE_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_IPC_FUTEX_WAKE_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_PFN_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_PFN_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_IPC_FUTEX_KEY_REQUEST,
 +      PCN_KMSG_TYPE_REMOTE_IPC_FUTEX_KEY_RESPONSE,
 +      PCN_KMSG_TYPE_REMOTE_IPC_FUTEX_TOKEN_REQUEST,
      PCN_KMSG_TYPE_PROC_SRV_CLONE_REQUEST,
      PCN_KMSG_TYPE_PROC_SRV_CREATE_PROCESS_PAIRING,
      PCN_KMSG_TYPE_PROC_SRV_EXIT_PROCESS,
      PCN_KMSG_TYPE_ANSWER_TEST,
        PCN_KMSG_TYPE_MCAST_CLOSE,
        PCN_KMSG_TYPE_SHMTUN,
 -PCN_KMSG_TYPE_REMOTE_PROC_CPUINFO_RESPONSE,
 -PCN_KMSG_TYPE_REMOTE_PROC_CPUINFO_REQUEST,
        PCN_KMSG_TYPE_MAX
  };
  
@@@ -132,24 -100,37 +131,37 @@@ enum pcn_kmsg_prio 
        PCN_KMSG_PRIO_NORMAL
  };
  
+ #define __READY_SIZE 1
+ #define LG_SEQNUM_SIZE  (8 - __READY_SIZE)
  /* Message header */
  struct pcn_kmsg_hdr {
        unsigned int from_cpu   :8; // b0
+       
        enum pcn_kmsg_type type :8; // b1
+       
        enum pcn_kmsg_prio prio :5; // b2
        unsigned int is_lg_msg  :1;
        unsigned int lg_start   :1;
        unsigned int lg_end     :1;
-       unsigned long long_number;
  
-       unsigned int lg_seqnum  :LG_SEQNUM_SIZE;// b3
-       //volatile unsigned int ready   :1;
+       unsigned long long_number; // b3 .. b10
+       
+       unsigned int lg_seqnum  :LG_SEQNUM_SIZE; // b11
+       unsigned int __ready    :__READY_SIZE;
  }__attribute__((packed));
  
+ //#if ( &((struct pcn_kmsg_hdr*)0)->ready != 12 )
+ //# error "ready is not the last byte of the struct"
+ //#endif
+ // TODO cache size can be retrieved by the compiler, put it here
+ #define CACHE_LINE_SIZE 64
  //#define PCN_KMSG_PAYLOAD_SIZE 60
- #define PCN_KMSG_PAYLOAD_SIZE (64-sizeof(struct pcn_kmsg_hdr))
+ #define PCN_KMSG_PAYLOAD_SIZE (CACHE_LINE_SIZE - sizeof(struct pcn_kmsg_hdr))
+ #define MAX_CHUNKS ((1 << LG_SEQNUM_SIZE) -1)
+ #define PCN_KMSG_LONG_PAYLOAD_SIZE (MAX_CHUNKS*PCN_KMSG_PAYLOAD_SIZE)
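/* Illustrative sketch, not part of this patch: the arithmetic these
 * macros imply, assuming the packed header above occupies 12 bytes
 * (1 from_cpu + 1 type + 1 prio/lg bits + 8 long_number + 1 lg_seqnum/__ready):
 *   PCN_KMSG_PAYLOAD_SIZE      = 64 - 12      = 52 bytes
 *   MAX_CHUNKS                 = (1 << 7) - 1 = 127
 *   PCN_KMSG_LONG_PAYLOAD_SIZE = 127 * 52     = 6604 bytes
 * The commented-out offset check above could be made enforceable with
 * BUILD_BUG_ON, e.g.: */
static inline void pcn_kmsg_hdr_size_check(void)
{
        BUILD_BUG_ON(sizeof(struct pcn_kmsg_hdr) != 12);
}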
  
  /* The actual messages.  The expectation is that developers will create their
     own message structs with the payload replaced with their own fields, and then
  struct pcn_kmsg_message {
        struct pcn_kmsg_hdr hdr;
        unsigned char payload[PCN_KMSG_PAYLOAD_SIZE];
- }__attribute__((packed)) __attribute__((aligned(64)));
+ }__attribute__((packed)) __attribute__((aligned(CACHE_LINE_SIZE)));
+ struct pcn_kmsg_reverse_message {
+       unsigned char payload[PCN_KMSG_PAYLOAD_SIZE];
+       struct pcn_kmsg_hdr hdr;
+       volatile unsigned long last_ticket;
+       volatile unsigned char ready;
+ }__attribute__((packed)) __attribute__((aligned(CACHE_LINE_SIZE)));
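/* Illustrative example of the pattern described above (not in this
 * patch): a developer-defined message replaces the raw payload with
 * typed fields, padded back to the full payload size so the on-wire
 * layout is unchanged. The struct and field names are hypothetical. */
struct pcn_kmsg_hello_message {
        struct pcn_kmsg_hdr hdr;
        int hello_pid;          /* hypothetical typed field */
        char pad[PCN_KMSG_PAYLOAD_SIZE - sizeof(int)];
}__attribute__((packed)) __attribute__((aligned(CACHE_LINE_SIZE)));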
  
  /* Struct for sending long messages (>60 bytes payload) */
  struct pcn_kmsg_long_message {
        struct pcn_kmsg_hdr hdr;
-       unsigned char payload[512];
+       unsigned char payload[PCN_KMSG_LONG_PAYLOAD_SIZE];
  }__attribute__((packed));
  
  /* List entry to copy message into and pass around in receiving kernel */
@@@ -177,13 -165,6 +196,6 @@@ struct pcn_kmsg_container 
  }__attribute__((packed));
  
  
- struct pcn_kmsg_reverse_message {
-       unsigned char payload[PCN_KMSG_PAYLOAD_SIZE];
-       struct pcn_kmsg_hdr hdr;
-       volatile unsigned char ready;
-       volatile unsigned long last_ticket;
- }__attribute__((packed)) __attribute__((aligned(64)));
  
  /* TYPES OF MESSAGES */
  
@@@ -193,7 -174,7 +205,7 @@@ struct pcn_kmsg_checkin_message 
        unsigned long window_phys_addr;
        unsigned char cpu_to_add;
        char pad[51];
- }__attribute__((packed)) __attribute__((aligned(64)));
+ }__attribute__((packed)) __attribute__((aligned(CACHE_LINE_SIZE)));
  
  
  
@@@ -218,7 -199,7 +230,7 @@@ typedef int (*pcn_kmsg_cbftn)(struct pc
  
  /* Register a callback function to handle a new message type.  Intended to
     be called when a kernel module is loaded. */
 -int pcn_kmsg_register_callback(enum pcn_kmsg_type type, 
 +int pcn_kmsg_register_callback(enum pcn_kmsg_type type,
                               pcn_kmsg_cbftn callback);
  
  /* Unregister a callback function for a message type.  Intended to
@@@ -231,8 -212,8 +243,8 @@@ int pcn_kmsg_unregister_callback(enum p
  int pcn_kmsg_send(unsigned int dest_cpu, struct pcn_kmsg_message *msg);
  
  /* Send a long message to the specified destination CPU. */
 -int pcn_kmsg_send_long(unsigned int dest_cpu, 
 -                     struct pcn_kmsg_long_message *lmsg, 
 +int pcn_kmsg_send_long(unsigned int dest_cpu,
 +                     struct pcn_kmsg_long_message *lmsg,
                       unsigned int payload_size);
  
  /* Free a received message (called at the end of the callback function) */
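/* Usage sketch (illustrative, with hypothetical names): register a
 * handler for a message type at module load and free the message at
 * the end of the callback, as the comments above prescribe.
 * pcn_kmsg_free_msg is an assumed name for the free routine. */
static int hello_callback(struct pcn_kmsg_message *msg)
{
        /* ... consume msg->payload here ... */
        pcn_kmsg_free_msg(msg);  /* free once the callback is done */
        return 0;
}
/* at module init:
 * pcn_kmsg_register_callback(<your PCN_KMSG_TYPE_* value>, hello_callback); */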
@@@ -252,13 -233,13 +264,13 @@@ enum pcn_kmsg_mcast_type 
  /* Message struct for guest kernels to check in with each other. */
  struct pcn_kmsg_mcast_message {
        struct pcn_kmsg_hdr hdr;
 -      enum pcn_kmsg_mcast_type type :32; 
 -      pcn_kmsg_mcast_id id;   
 +      enum pcn_kmsg_mcast_type type :32;
 +      pcn_kmsg_mcast_id id;
        unsigned long mask;
        unsigned int num_members;
        unsigned long window_phys_addr;
        char pad[28];
- }__attribute__((packed)) __attribute__((aligned(64)));
+ }__attribute__((packed)) __attribute__((aligned(CACHE_LINE_SIZE)));
  
  struct pcn_kmsg_mcast_window {
        volatile unsigned long head;
@@@ -288,7 -269,7 +300,7 @@@ int pcn_kmsg_mcast_close(pcn_kmsg_mcast
  int pcn_kmsg_mcast_send(pcn_kmsg_mcast_id id, struct pcn_kmsg_message *msg);
  
  /* Send a long message to the specified multicast group. */
 -int pcn_kmsg_mcast_send_long(pcn_kmsg_mcast_id id, 
 +int pcn_kmsg_mcast_send_long(pcn_kmsg_mcast_id id,
                             struct pcn_kmsg_long_message *msg,
                             unsigned int payload_size);
  
@@@ -8,6 -8,10 +8,10 @@@
  
  #ifndef _PROCESS_SERVER_H
  #define _PROCESS_SERVER_H
+ /**
+  * Constants
+  */
  #define RETURN_DISPOSITION_NONE 0
  #define RETURN_DISPOSITION_EXIT 1
  #define RETURN_DISPOSITION_MIGRATE 2
@@@ -15,8 -19,8 +19,8 @@@
  #define PROCESS_SERVER_CLONE_FAIL 1
  
  //configuration
 -#define SUPPORT_FOR_CLUSTERING
 -//#undef SUPPORT_FOR_CLUSTERING
 +//#define SUPPORT_FOR_CLUSTERING
 +#undef SUPPORT_FOR_CLUSTERING
  
  /*
   * Migration hook.
diff --combined ipc/Makefile
@@@ -3,10 -3,11 +3,11 @@@
  #
  
  obj-$(CONFIG_SYSVIPC_COMPAT) += compat.o
 -obj-$(CONFIG_SYSVIPC) += util.o msgutil.o msg.o sem.o shm.o ipcns_notifier.o syscall.o
 +obj-$(CONFIG_SYSVIPC) += util.o msgutil.o msg.o sem.o shm.o ipcns_notifier.o syscall.o sem_remote.o shm_remote.o
  obj-$(CONFIG_SYSVIPC_SYSCTL) += ipc_sysctl.o
  obj_mq-$(CONFIG_COMPAT) += compat_mq.o
- obj-$(CONFIG_POSIX_MQUEUE) += mqueue.o msgutil.o $(obj_mq-y) bbuffer.o mbuffer.o mcomm.o
+ #obj-$(CONFIG_POSIX_MQUEUE) += mqueue.o msgutil.o $(obj_mq-y) bbuffer.o mbuffer.o mcomm.o
+ obj-$(CONFIG_POSIX_MQUEUE) += mqueue.o msgutil.o $(obj_mq-y)
  obj-$(CONFIG_IPC_NS) += namespace.o
  obj-$(CONFIG_POSIX_MQUEUE_SYSCTL) += mq_sysctl.o
  
diff --combined kernel/kmod.c
@@@ -199,23 -199,18 +199,21 @@@ static int ____call_usermodehelper(voi
          //current->represents_remote = 0;
          memcpy(&current->remote_regs, &sub_info->remote_regs, sizeof(struct pt_regs) );
  
 +        /*mklinux_akshay*/
 +        current->origin_pid =sub_info->origin_pid;
 +
          // Notify of PID/PID pairing.
-         process_server_notify_delegated_subprocess_starting(current->pid,sub_info->remote_pid,sub_info->remote_cpu);
+         process_server_notify_delegated_subprocess_starting(
+               current->pid,sub_info->remote_pid,sub_info->remote_cpu);
      } 
  
        retval = kernel_execve(sub_info->path,
                               (const char *const *)sub_info->argv,
                               (const char *const *)sub_info->envp);
      
        /* Exec failed? */
  fail:
-     printk("kmod exec failed retval{%d}\n",retval);
+       printk("%s: failed retval{%d}\n", __func__, retval);
        sub_info->retval = retval;
        do_exit(0);
  }
diff --combined kernel/process_server.c
@@@ -1,4 -1,4 +1,4 @@@
- /**
 /**
   * Implements task migration and maintains coherent 
   * address spaces across CPU cores.
   *
@@@ -28,7 -28,7 +28,7 @@@
  #include <linux/pcn_perf.h> // performance measurement
  #include <linux/string.h>
  
 -#include <linux/popcorn.h>
 +#include <linux/popcorn_cpuinfo.h>
  
  #include <asm/pgtable.h>
  #include <asm/atomic.h>
  
  unsigned long get_percpu_old_rsp(void);
  
 +#include <linux/futex.h>
 +#define  NSIG 32
 +
 +#include<linux/signal.h>
 +#include <linux/fcntl.h>
 +#include "futex_remote.h"
  /**
   * General purpose configuration
   */
@@@ -67,7 -61,6 +67,7 @@@
  // migrate in response to a mapping query.
  #define MAX_MAPPINGS 1
  
 +extern long sys_topen(const char __user * filename, int flags, int mode, int fd);
  /**
   * Use the preprocessor to turn off printk.
   */
  /**
   * Perf
   */
- #define MEASURE_PERF 1
- #if MEASURE_PERF
+ #ifdef CONFIG_POPCORN_PERF
  #define PERF_INIT() perf_init()
  #define PERF_MEASURE_START(x) perf_measure_start(x)
  #define PERF_MEASURE_STOP(x,y,z)  perf_measure_stop(x,y,z)
@@@ -245,26 -237,18 +244,21 @@@ static void perf_init(void) 
             "handle_mprotect_resonse");
  
  }
- #else
+ #else /* CONFIG_POPCORN_PERF */
  #define PERF_INIT() 
  #define PERF_MEASURE_START(x) -1
  #define PERF_MEASURE_STOP(x, y, z)
 -#endif /* !CONFIG_POPCORN_PERF */
 +#endif
 +
 +
 +static DECLARE_WAIT_QUEUE_HEAD( countq);
  
- /**
-  * Constants
-  */
- #define RETURN_DISPOSITION_EXIT 0
- #define RETURN_DISPOSITION_MIGRATE 1
  /**
   * Library
   */
  
+ #define POPCORN_MAX_PATH 512
  /**
   * Some piping for linking data entries
   * and identifying data entry types.
@@@ -358,13 -342,6 +352,13 @@@ typedef struct _clone_data 
      unsigned long previous_cpus;
      vma_data_t* vma_list;
      vma_data_t* pending_vma_list;
 +    /*mklinux_akshay*/int origin_pid;
 +    sigset_t remote_blocked, remote_real_blocked;
 +    sigset_t remote_saved_sigmask;
 +    struct sigpending remote_pending;
 +    unsigned long sas_ss_sp;
 +    size_t sas_ss_size;
 +    struct k_sigaction action[_NSIG];
  } clone_data_t;
  
  /**
@@@ -481,13 -458,6 +475,13 @@@ typedef struct _clone_request 
      int prio, static_prio, normal_prio; //from sched.c
        unsigned int rt_priority; //from sched.c
        int sched_class; //from sched.c but here we are using SCHED_NORMAL, SCHED_FIFO, etc.
 +    /*mklinux_akshay*/int origin_pid;
 +    sigset_t remote_blocked, remote_real_blocked;
 +    sigset_t remote_saved_sigmask;
 +    struct sigpending remote_pending;
 +    unsigned long sas_ss_sp;
 +    size_t sas_ss_size;
 +    struct k_sigaction action[_NSIG];
      unsigned long previous_cpus;
  } clone_request_t;
  
@@@ -997,7 -967,6 +991,6 @@@ static int cpu_has_known_tgroup_mm(int 
      _remote_cpu_info_list_t *objPtr;
      struct cpumask *pcpum =0;
      int cpuid =-1;
- extern struct list_head rlist_head;
      if (cpumask_test_cpu(cpu, cpu_present_mask))
        return 1;
      list_for_each(iter, &rlist_head) {
@@@ -1389,12 -1358,13 +1382,13 @@@ int find_consecutive_physically_mapped_
                                                unsigned long vaddr,
                                                unsigned long* vaddr_mapping_start,
                                                unsigned long* paddr_mapping_start,
-                                               size_t* paddr_mapping_sz) {
-     unsigned long paddr_curr = NULL;
+                                               size_t* paddr_mapping_sz)
+ {
+     unsigned long paddr_curr = 0l;
      unsigned long vaddr_curr = vaddr;
      unsigned long vaddr_next = vaddr;
-     unsigned long paddr_next = NULL;
-     unsigned long paddr_start = NULL;
+     unsigned long paddr_next = 0l;
+     unsigned long paddr_start = 0l;
      size_t sz = 0;
  
      
@@@ -2327,6 -2297,7 +2321,7 @@@ static void destroy_clone_data(clone_da
      kfree(data);
  }
  
+ #if 0
  /**
   * @brief Finds a vma_data_t entry.
   */
@@@ -2347,6 -2318,7 +2342,7 @@@ static vma_data_t* find_vma_data(clone_
  
      return ret;
  }
+ #endif
  
  /**
   * @brief Callback for page walk that displays the contents of the walk.
@@@ -2393,9 -2365,9 +2389,9 @@@ static int dump_page_walk_pte_entry_cal
  /**
   * @brief Displays relevant data within a mm.
   */
- static void dump_mm(struct mm_struct* mm) {
+ static void dump_mm(struct mm_struct* mm)
+ {
      struct vm_area_struct * curr;
-     char buf[256];
      struct mm_walk walk = {
          .pte_entry = dump_page_walk_pte_entry_callback,
          .mm = mm,
@@@ -2689,7 -2661,6 +2685,6 @@@ static int count_remote_thread_members(
      // the list does not include the current processor group descriptor (TODO)
      struct list_head *iter;
      _remote_cpu_info_list_t *objPtr;
- extern struct list_head rlist_head;
      list_for_each(iter, &rlist_head) {
          objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
          i = objPtr->_data._processor;
@@@ -2762,7 -2733,8 +2757,8 @@@ static int count_local_thread_members(i
   * thread group in which the "current" task resides.
   * @return The number of threads.
   */
- static int count_thread_members() {
+ static int count_thread_members (void)
+ {
       
      int count = 0;
      PSPRINTK("%s: entered\n",__func__);
  void process_tgroup_closed_item(struct work_struct* work) {
  
      tgroup_closed_work_t* w = (tgroup_closed_work_t*) work;
-     data_header_t *curr, *next;
+     data_header_t *curr;
      mm_data_t* mm_data;
      struct task_struct *g, *task;
      unsigned char tgroup_closed = 0;
@@@ -2955,45 -2927,41 +2951,41 @@@ handled
   *
   * <MEASURED perf_process_mapping_request>
   */
- void process_mapping_request(struct work_struct* work) {
+ void process_mapping_request(struct work_struct* work)
+ {
      mapping_request_work_t* w = (mapping_request_work_t*) work;
-     mapping_response_t response;
+     mapping_response_t* response;
      data_header_t* data_curr = NULL;
      mm_data_t* mm_data = NULL;
+     
      struct task_struct* task = NULL;
      struct task_struct* g;
      struct vm_area_struct* vma = NULL;
      struct mm_struct* mm = NULL;
+     
      unsigned long address = w->address;
      unsigned long resolved = 0;
      struct mm_walk walk = {
          .pte_entry = vm_search_page_walk_pte_entry_callback,
          .private = &(resolved)
      };
-     char* plpath = NULL;
-     char lpath[512];
+     char *plpath = NULL, *lpath = NULL;
+     int used_saved_mm = 0, found_vma = 1, found_pte = 1; 
      int i;
-     
-     // for perf
-     int used_saved_mm = 0;
-     int found_vma = 1;
-     int found_pte = 1;
-     
-     // Perf start
+ #ifdef CONFIG_POPCORN_PERF    
+     // for perf 
      int perf = PERF_MEASURE_START(&perf_process_mapping_request);
+ #endif /* CONFIG_POPCORN_PERF */    
  
-     //PSPRINTK("%s: entered\n",__func__);
-     PSPRINTK("received mapping request from {%d} address{%lx}, cpu{%d}, id{%d}\n",
-             w->from_cpu,
-             w->address,
-             w->tgroup_home_cpu,
-             w->tgroup_home_id);
+     PSPRINTK("received mapping request from{%d} address{%lx}, cpu{%d}, id{%d}\n",
+             w->from_cpu, w->address, w->tgroup_home_cpu, w->tgroup_home_id);
  
      // First, search through existing processes
      do_each_thread(g,task) {
          if((task->tgroup_home_cpu == w->tgroup_home_cpu) &&
             (task->tgroup_home_id  == w->tgroup_home_id )) {
-             //PSPRINTK("mapping request found common thread group here\n");
+             PSPRINTK("mapping request found common thread group here\n");
              mm = task->mm;
  
              // Take note of the fact that an mm exists on the remote kernel
@@@ -3021,14 -2989,25 +3013,25 @@@ task_mm_search_exit
              }
  
              data_curr = data_curr->next;
          } // while
          PS_SPIN_UNLOCK(&_saved_mm_head_lock);
      }
      
+     response = kmalloc(sizeof(mapping_response_t), GFP_ATOMIC); //TODO convert to alloc_cache
+     if (!response) {
+       printk(KERN_ALERT"can not kmalloc mapping_response_t area from{%d} address{%lx} cpu{%d} id{%d}\n",
+             w->from_cpu, w->address, w->tgroup_home_cpu, w->tgroup_home_id);
+       goto err_work;
+     }
+     lpath = kmalloc(POPCORN_MAX_PATH, GFP_ATOMIC); //TODO convert to alloc_cache
+     if (!lpath) {
+       printk(KERN_ALERT"can not kmalloc lpath area from{%d} address{%lx} cpu{%d} id{%d}\n",
+             w->from_cpu, w->address, w->tgroup_home_cpu, w->tgroup_home_id);
+       goto err_response;
+     }
+     
      // OK, if mm was found, look up the mapping.
-     if(mm) {
+     if (mm) {
  
          // The purpose of this code block is to determine
          // if we need to use a read or write lock, and safely.  
@@@ -3058,11 -3037,9 +3061,9 @@@ changed_can_be_cow
          walk_page_range(address & PAGE_MASK, 
                  (address & PAGE_MASK) + PAGE_SIZE, &walk);
  
-         if(vma && resolved != 0) {
+         if (vma && resolved != 0) {
              PSPRINTK("mapping found! %lx for vaddr %lx\n",resolved,
                      address & PAGE_MASK);
              /*
               * Find regions of consecutive physical memory
               * in this vma, including the faulting address
               */
              {
              // Break all cows in this vma
-             if(can_be_cow) {
+             if (can_be_cow) {
                  unsigned long cow_addr;
                  for(cow_addr = vma->vm_start; cow_addr < vma->vm_end; cow_addr += PAGE_SIZE) {
                      break_cow(mm, vma, cow_addr);
                  // We no longer need a write lock after the break_cow process
                  // is complete, so downgrade the lock to a read lock.
                  downgrade_write(&mm->mmap_sem);
-             }
+             } // if (can_be_cow
  
              // Now grab all the mappings that we can stuff into the response.
-             if(0 != fill_physical_mapping_array(mm, 
-                                                 vma,
-                                                 address,
-                                                 &response.mappings, 
-                                                 MAX_MAPPINGS)) {
+             if (0 != fill_physical_mapping_array(mm, vma, address,
+                                                 &(response->mappings[0]),
+                                               MAX_MAPPINGS)) {
                  // If the fill process fails, clear out all
                  // results.  Otherwise, we might trick the
                  // receiving cpu into thinking the target
                  // mapping was found when it was not.
                  for(i = 0; i < MAX_MAPPINGS; i++) {
-                     response.mappings[i].present = 0;
-                     response.mappings[i].vaddr = 0;
-                     response.mappings[i].paddr = 0;
-                     response.mappings[i].sz = 0;
-                 }
-                     
-             }
+                     response->mappings[i].present = 0;
+                     response->mappings[i].vaddr = 0;
+                     response->mappings[i].paddr = 0;
+                     response->mappings[i].sz = 0;
+                 }   
+             } // if (0 != fill_physical_mapping_array
              }
  
-             response.header.type = PCN_KMSG_TYPE_PROC_SRV_MAPPING_RESPONSE;
-             response.header.prio = PCN_KMSG_PRIO_NORMAL;
-             response.tgroup_home_cpu = w->tgroup_home_cpu;
-             response.tgroup_home_id = w->tgroup_home_id;
-             response.requester_pid = w->requester_pid;
-             response.address = address;
-             response.present = 1;
-             response.vaddr_start = vma->vm_start;
-             response.vaddr_size = vma->vm_end - vma->vm_start;
-             response.prot = vma->vm_page_prot;
-             response.vm_flags = vma->vm_flags;
+             response->header.type = PCN_KMSG_TYPE_PROC_SRV_MAPPING_RESPONSE;
+             response->header.prio = PCN_KMSG_PRIO_NORMAL;
+             response->tgroup_home_cpu = w->tgroup_home_cpu;
+             response->tgroup_home_id = w->tgroup_home_id;
+             response->requester_pid = w->requester_pid;
+             response->address = address;
+             response->present = 1;
+             response->vaddr_start = vma->vm_start;
+             response->vaddr_size = vma->vm_end - vma->vm_start;
+             response->prot = vma->vm_page_prot;
+             response->vm_flags = vma->vm_flags;
              if(vma->vm_file == NULL) {
-                 response.path[0] = '\0';
+                 response->path[0] = '\0';
              } else {    
                  plpath = d_path(&vma->vm_file->f_path,lpath,512);
-                 strcpy(response.path,plpath);
-                 response.pgoff = vma->vm_pgoff;
+                 strcpy(response->path,plpath);
+                 response->pgoff = vma->vm_pgoff;
              }
  
              // We modified this lock to be read-mode above so now
              // we can do a read-unlock instead of a write-unlock
              PS_UP_READ(&mm->mmap_sem);
         
-         } else {
+         } else { // (vma && resolved != 0) 
  
              if(can_be_cow)
                  PS_UP_WRITE(&mm->mmap_sem);
                  PS_UP_READ(&mm->mmap_sem);
              // Zero out mappings
              for(i = 0; i < MAX_MAPPINGS; i++) {
-                 response.mappings[i].present = 0;
-                 response.mappings[i].vaddr = 0;
-                 response.mappings[i].paddr = 0;
-                 response.mappings[i].sz = 0;
+                 response->mappings[i].present = 0;
+                 response->mappings[i].vaddr = 0;
+                 response->mappings[i].paddr = 0;
+                 response->mappings[i].sz = 0;
              }
-         }
-         
+         } // !(vma && resolved != 0) 
      }
  
      // Not found, respond accordingly
-     if(resolved == 0) {
+     if (resolved == 0) {
          found_vma = 0;
          found_pte = 0;
          //PSPRINTK("Mapping not found\n");
-         response.header.type = PCN_KMSG_TYPE_PROC_SRV_MAPPING_RESPONSE;
-         response.header.prio = PCN_KMSG_PRIO_NORMAL;
-         response.tgroup_home_cpu = w->tgroup_home_cpu;
-         response.tgroup_home_id = w->tgroup_home_id;
-         response.requester_pid = w->requester_pid;
-         response.address = address;
-         response.present = 0;
-         response.vaddr_start = 0;
-         response.vaddr_size = 0;
-         response.path[0] = '\0';
+         response->header.type = PCN_KMSG_TYPE_PROC_SRV_MAPPING_RESPONSE;
+         response->header.prio = PCN_KMSG_PRIO_NORMAL;
+         response->tgroup_home_cpu = w->tgroup_home_cpu;
+         response->tgroup_home_id = w->tgroup_home_id;
+         response->requester_pid = w->requester_pid;
+         response->address = address;
+         response->present = 0;
+         response->vaddr_start = 0;
+         response->vaddr_size = 0;
+         response->path[0] = '\0';
  
          // Handle case where vma was present but no pte.
-         if(vma) {
+         if (vma) {
              //PSPRINTK("But vma present\n");
              found_vma = 1;
-             response.present = 1;
-             response.vaddr_start = vma->vm_start;
-             response.vaddr_size = vma->vm_end - vma->vm_start;
-             response.prot = vma->vm_page_prot;
-             response.vm_flags = vma->vm_flags;
+             response->present = 1;
+             response->vaddr_start = vma->vm_start;
+             response->vaddr_size = vma->vm_end - vma->vm_start;
+             response->prot = vma->vm_page_prot;
+             response->vm_flags = vma->vm_flags;
               if(vma->vm_file == NULL) {
-                  response.path[0] = '\0';
+                  response->path[0] = '\0';
               } else {    
                   plpath = d_path(&vma->vm_file->f_path,lpath,512);
-                  strcpy(response.path,plpath);
-                  response.pgoff = vma->vm_pgoff;
+                  strcpy(response->path,plpath);
+                  response->pgoff = vma->vm_pgoff;
               }
          }
      }
  
      // Send response
-     if(response.present) {
+     if(response->present) {
          DO_UNTIL_SUCCESS(pcn_kmsg_send_long(w->from_cpu,
-                             (struct pcn_kmsg_long_message*)(&response),
+                             (struct pcn_kmsg_long_message*)(response),
                              sizeof(mapping_response_t) - 
                              sizeof(struct pcn_kmsg_hdr) -   //
-                             sizeof(response.path) +         // Chop off the end of the path
-                             strlen(response.path) + 1));    // variable to save bandwidth.
+                             sizeof(response->path) +         // Chop off the end of the path
+                             strlen(response->path) + 1));    // variable to save bandwidth.
      } else {
          // This is an optimization to get rid of the _long send 
          // which is a time sink.
          nonpresent_response.requester_pid = w->requester_pid;
          nonpresent_response.address = w->address;
          DO_UNTIL_SUCCESS(pcn_kmsg_send(w->from_cpu,(struct pcn_kmsg_message*)(&nonpresent_response)));
      }
  
+     kfree(lpath);
+ err_response:
+     kfree(response);
+ err_work:
      kfree(work);
  
-     // Perf stop
+ #ifdef CONFIG_POPCORN_PERF    
      if(used_saved_mm && found_vma && found_pte) {
          PERF_MEASURE_STOP(&perf_process_mapping_request,
                  "Saved MM + VMA + PTE",
      } else {
          PERF_MEASURE_STOP(&perf_process_mapping_request,"ERR",perf);
      }
+ #endif /* CONFIG_POPCORN_PERF */    
  
      return;
  }
@@@ -3302,7 -3275,6 +3299,7 @@@ void process_group_exit_item(struct wor
      group_exit_work_t* w = (group_exit_work_t*) work;
      struct task_struct *task = NULL;
      struct task_struct *g;
 +    unsigned long flags;
  
      //int perf = PERF_MEASURE_START(&perf_process_group_exit_item);
      PSPRINTK("%s: entered\n",__func__);
          if(task->tgroup_home_id == w->tgroup_home_id &&
             task->tgroup_home_cpu == w->tgroup_home_cpu) {
              
 -            if(!task->represents_remote) {
 -                // active, send sigkill
 -                PSPRINTK("Issuing SIGKILL to pid %d\n",task->pid);
 -                kill_pid(task_pid(task), SIGKILL, 1);
 -            }
 +            if (!task->represents_remote) { //similar to zap_other_threads
 +                              exit_robust_list(task);
 +                              task->robust_list = NULL;
 +                              // active, send sigkill
 +                              lock_task_sighand(task, &flags);
 +
 +                              task_clear_jobctl_pending(task, JOBCTL_PENDING_MASK);
 +                              sigaddset(&task->pending.signal, SIGKILL);
 +                              signal_wake_up(task, 1);
 +                              clear_ti_thread_flag(task, _TIF_USER_RETURN_NOTIFY);
 +
 +                              unlock_task_sighand(task, &flags);
 +
 +                      }
  
              // If it is a shadow task, it will eventually
              // get killed when its corresponding active task
@@@ -3357,7 -3320,7 +3354,7 @@@ void process_munmap_request(struct work
      data_header_t *curr = NULL;
      mm_data_t* mm_data = NULL;
      mm_data_t* to_munmap = NULL;
-     struct mm_struct * mm_to_munmap = NULL;
+     struct mm_struct* mm_to_munmap = NULL;
  
      int perf = PERF_MEASURE_START(&perf_process_munmap_request);
  
             task->tgroup_home_id  == w->tgroup_home_id &&
             !(task->flags & PF_EXITING)) {
  
-             // Thread group has been found, perform munmap operation on this
-             // task.
-        if (task && task->mm ) {
-           mm_to_munmap =task->mm;
-       }
-       else
-               printk("%s: pirla\n", __func__);
-       // TODO try and check if make sense
              // Take note of the fact that an mm exists on the remote kernel
              set_cpu_has_known_tgroup_mm(task,w->from_cpu);
+             
+             if (task->mm) {
+                 mm_to_munmap = task->mm;
+             }
+             else
+                 printk("%s: pirla\n", __func__);
  
-             goto done; // thread grouping - threads all share a common mm.
+             goto done; 
          }
      } while_each_thread(g,task);
  done:
      read_unlock(&tasklist_lock);
  
-       if(mm_to_munmap) {
-        PS_DOWN_WRITE(&task->mm->mmap_sem);
-        current->enable_distributed_munmap = 0;
-        do_munmap(mm_to_munmap, w->vaddr_start, w->vaddr_size);
-        current->enable_distributed_munmap = 1;
-        PS_UP_WRITE(&task->mm->mmap_sem);
-        }
+     if(mm_to_munmap) {
+         PS_DOWN_WRITE(&mm_to_munmap->mmap_sem);
+         current->enable_distributed_munmap = 0;
+         do_munmap(mm_to_munmap, w->vaddr_start, w->vaddr_size);
+         current->enable_distributed_munmap = 1;
+         PS_UP_WRITE(&mm_to_munmap->mmap_sem);
+     }
+     else
+       printk("%s: unexpected error task %p task->mm %p\n", 
+                __func__, task, (task ? task->mm : 0) );
      // munmap the specified region in any saved mm's as well.
      // This keeps old mappings saved in the mm of dead thread
      // group members from being resolved accidentally after
@@@ -3424,13 -3387,15 +3421,15 @@@ found
          current->enable_distributed_munmap = 0;
          do_munmap(to_munmap->mm, w->vaddr_start, w->vaddr_size);
          current->enable_distributed_munmap = 1;
-         if (to_munmap && to_munmap->mm)
-             PS_UP_WRITE(&to_munmap->mm->mmap_sem);
-         else
-             printk(KERN_ALERT"%s: ERROR2: to_munmap %p mm %p\n", __func__, to_munmap, to_munmap?to_munmap->mm:0);
+       if (to_munmap && to_munmap->mm)
+               PS_UP_WRITE(&to_munmap->mm->mmap_sem);
+       else
+               printk(KERN_ALERT"%s: ERROR2: to_munmap %p mm %p\n",
+                                __func__, to_munmap, to_munmap?to_munmap->mm:0);
      }
      else if (to_munmap) // It is OK for to_munmap to be null, but not to_munmap->mm
-         printk(KERN_ALERT"%s: ERROR1: to_munmap %p mm %p\n", __func__, to_munmap, to_munmap?to_munmap->mm:0);
+       printk(KERN_ALERT"%s: ERROR1: to_munmap %p mm %p\n",
+                        __func__, to_munmap, to_munmap?to_munmap->mm:0);
  
      // Construct response
      response.header.type = PCN_KMSG_TYPE_PROC_SRV_MUNMAP_RESPONSE;
@@@ -3464,57 -3429,45 +3463,45 @@@ void process_mprotect_item(struct work_
      int tgroup_home_id  = w->tgroup_home_id;
      unsigned long start = w->start;
      size_t len = w->len;
-     unsigned long prot = w->prot;
      struct task_struct* task, *g;
      data_header_t* curr = NULL;
      mm_data_t* mm_data = NULL;
      mm_data_t* to_munmap = NULL;
-     struct mm_struct* mm_to_munmap = NULL;
+     struct mm_struct *mm_to_munmap = NULL;
  
      int perf = PERF_MEASURE_START(&perf_process_mprotect_item);
      
      // Find the task
      read_lock(&tasklist_lock);
      do_each_thread(g,task) {
- //    task_lock(task); // TODO consider to use this
+         // Look for the thread group
          if (task->tgroup_home_cpu == tgroup_home_cpu &&
              task->tgroup_home_id  == tgroup_home_id &&
              !(task->flags & PF_EXITING)) {
-            /* 
-             if (task->mm)
-                 // do_mprotect
-                 do_mprotect(task, start, len, prot,0);
- //            task_unlock(task); //TODO consider to use this
-           else
-               printk("%s: task->mm task:%p mm:%p\n",
-                       __func__, task, task->mm);
-             */
-             // doing mprotect here causes errors, I do not know why
-             // for now I will unmap the region instead.
-             //do_mprotect(task,start,len,prot,0);
-             
-            if (task && task->mm ) {
-                    mm_to_munmap = task->mm;
-            }
-           // Take note of the fact that an mm exists on the remote kernel
+             // Take note of the fact that an mm exists on the remote kernel
              set_cpu_has_known_tgroup_mm(task,w->from_cpu);
  
-             // then quit
+             if(task->mm) {
+                 mm_to_munmap = task->mm;
+             }
+             else
+                 printk("%s: pirla\n",__func__);
+             
              goto done;
          }
- //    task_unlock(task); // TODO consider to use this
      } while_each_thread(g,task);
  done:
      read_unlock(&tasklist_lock);
  
-       if(mm_to_munmap) {
-         PS_DOWN_WRITE(&task->mm->mmap_sem);
+     if(mm_to_munmap) {
+         PS_DOWN_WRITE(&mm_to_munmap->mmap_sem);
          current->enable_distributed_munmap = 0;
          do_munmap(mm_to_munmap, start, len);
          current->enable_distributed_munmap = 1;
-         PS_UP_WRITE(&task->mm->mmap_sem);
-         }
+         PS_UP_WRITE(&mm_to_munmap->mmap_sem);
+     }
  
      // munmap the specified region in any saved mm's as well.
      // This keeps old mappings saved in the mm of dead thread
@@@ -4484,19 -4437,6 +4471,19 @@@ perf_cc = native_read_tsc()
      clone_data->sched_class = request->sched_class;
      clone_data->lock = __SPIN_LOCK_UNLOCKED(&clone_data->lock);
  
 +    /*mklinux_akshay*/
 +    clone_data->origin_pid =request->origin_pid;
 +    clone_data->remote_blocked = request->remote_blocked ;
 +    clone_data->remote_real_blocked = request->remote_real_blocked;
 +    clone_data->remote_saved_sigmask = request->remote_saved_sigmask ;
 +    clone_data->remote_pending = request->remote_pending;
 +
 +    clone_data->sas_ss_sp = request->sas_ss_sp;
 +    clone_data->sas_ss_size = request->sas_ss_size;
 +    int cnt=0;
 +    for(cnt=0;cnt<_NSIG;cnt++)
 +      clone_data->action[cnt] = request->action[cnt];
 +
      /*
       * Pull in vma data
       */
@@@ -4736,7 -4676,6 +4723,6 @@@ int process_server_import_address_space
          vma_data_t* vma_curr = NULL;
          int mmap_flags = 0;
          int vmas_installed = 0;
-         int ptes_installed = 0;
          unsigned long err = 0;
  
          vma_curr = clone_data->vma_list;
                          PSPRINTK("vma->pte_curr == null\n");
                      }
                      while(pte_curr) {
 +                        // MAP it
 +                        int domapping = 1;
 +                        pte_t* remap_pte = NULL;
 +                        pmd_t* remap_pmd = NULL;
 +                        pud_t* remap_pud = NULL;
 +                        pgd_t* remap_pgd = NULL;
 +
                          PS_DOWN_WRITE(&current->mm->mmap_sem);
                          err = remap_pfn_range_remaining(current->mm,
                                                          vma,
      current->thread.usersp = clone_data->thread_usersp;
     
  
 +    //mklinux_akshay
 +    current->origin_pid = clone_data->origin_pid;
 +    sigorsets(&current->blocked,&current->blocked,&clone_data->remote_blocked) ;
 +    sigorsets(&current->real_blocked,&current->real_blocked,&clone_data->remote_real_blocked);
 +    sigorsets(&current->saved_sigmask,&current->saved_sigmask,&clone_data->remote_saved_sigmask);
 +    current->pending = clone_data->remote_pending;
 +    current->sas_ss_sp = clone_data->sas_ss_sp;
 +    current->sas_ss_size = clone_data->sas_ss_size;
 +
 +    printk(KERN_ALERT "origin pid {%d}-{%d} \n",current->origin_pid,clone_data->origin_pid);
 +
 +    int cnt=0;
 +     for(cnt=0;cnt<_NSIG;cnt++)
 +       current->sighand->action[cnt] = clone_data->action[cnt];
 +
      // Set output variables.
      *sp = clone_data->thread_usersp;
      *ip = clone_data->regs.ip;
      // We assume that an exec is going on and the current process is the one is executing
      // (a switch will occur if it is not the one that must execute)
      { // FS/GS update --- start
-     unsigned long fs, gs;
      unsigned int fsindex, gsindex;
                      
      savesegment(fs, fsindex);
      if ( !(clone_data->thread_fs) || !(__user_addr(clone_data->thread_fs)) ) {
-       printk(KERN_ERR "%s: ERROR corrupted fs base address %p\n", __func__, clone_data->thread_fs);
+       printk(KERN_ERR "%s: ERROR corrupted fs base address 0x%lx\n", __func__, clone_data->thread_fs);
      }    
      current->thread.fsindex = clone_data->thread_fsindex;
      current->thread.fs = clone_data->thread_fs;
                               
      savesegment(gs, gsindex); //read the gs register in gsindex variable
      if ( !(clone_data->thread_gs) && !(__user_addr(clone_data->thread_gs)) ) {
-       printk(KERN_ERR "%s: ERROR corrupted gs base address %p\n", __func__, clone_data->thread_gs);      
+       printk(KERN_ERR "%s: ERROR corrupted gs base address 0x%lx\n", __func__, clone_data->thread_gs);      
      }
      current->thread.gs = clone_data->thread_gs;    
      current->thread.gsindex = clone_data->thread_gsindex;
  
  
      perf_e = native_read_tsc();
 -    printk("%s %llu %llu %llu %llu %llu %llu %llu %llu %llu %llu (%d)\n",
 +    printk("%s %llu %llu %llu %llu %llu %llu %llu %llu %llu %llu (%d) {%d} \n",
              __func__,
              perf_aa, perf_bb, perf_cc, perf_dd, perf_ee,
 -            perf_a, perf_b, perf_c, perf_d, perf_e, current->t_home_id);
 +            perf_a, perf_b, perf_c, perf_d, perf_e, current->t_home_id,current->pid);
  
      return 0;
  }
@@@ -5078,7 -4994,6 +5063,6 @@@ int process_server_do_group_exit(void) 
      // the list does not include the current processor group descriptor (TODO)
      struct list_head *iter;
      _remote_cpu_info_list_t *objPtr;
- extern struct list_head rlist_head;
      list_for_each(iter, &rlist_head) {
          objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
          i = objPtr->_data._processor;
@@@ -5196,10 -5111,6 +5180,6 @@@ finished_membership_search
          // take over, so do not mark this as executing for remote
          current->executing_for_remote = 0;
  
-         // Migrate back - you just had an out of body experience, you will wake in
-         //                a familiar place (a place you've been before), but unfortunately, 
-         //                your life is over.
-         //                Note: comments like this must == I am tired.
  #ifndef SUPPORT_FOR_CLUSTERING
          for(i = 0; i < NR_CPUS; i++) {
            // Skip the current cpu
          struct list_head *iter;
          _remote_cpu_info_list_t *objPtr;
        struct cpumask *pcpum =0;
- extern struct list_head rlist_head;
          list_for_each(iter, &rlist_head) {
            objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
            i = objPtr->_data._processor;
          // the list does not include the current processor group descriptor (TODO)
            struct list_head *iter;
            _remote_cpu_info_list_t *objPtr;
- extern struct list_head rlist_head;
              list_for_each(iter, &rlist_head) {
                objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
                i = objPtr->_data._processor;
@@@ -5404,7 -5313,6 +5382,6 @@@ int process_server_do_munmap(struct mm_
            // the list does not include the current processor group descriptor (TODO)
      struct list_head *iter;
      _remote_cpu_info_list_t *objPtr;
- extern struct list_head rlist_head;
      list_for_each(iter, &rlist_head) {
          objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
          i = objPtr->_data._processor;
@@@ -5447,13 -5355,14 +5424,14 @@@ exit
  void process_server_do_mprotect(struct task_struct* task,
                                  unsigned long start,
                                  size_t len,
-                                 unsigned long prot) {
+                                 unsigned long prot)
+ {
      mprotect_data_t* data;
      mprotect_request_t request;
      int i;
      int s;
      int perf = -1;
-     unsigned lockflags;
+     unsigned long lockflags;
  
       // Nothing to do for a thread group that's not distributed.
      if(!current->tgroup_distributed) {
      // the list does not include the current processor group descriptor (TODO)
      struct list_head *iter;
      _remote_cpu_info_list_t *objPtr;
- extern struct list_head rlist_head;
      list_for_each(iter, &rlist_head) {
          objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
          i = objPtr->_data._processor;
      // OK, all responses are in, we can proceed.
  
      spin_lock_irqsave(&_mprotect_data_head_lock,lockflags);
-     remove_data_entry_from(data,
-                            &_mprotect_data_head);
+     remove_data_entry_from(data, &_mprotect_data_head);
      spin_unlock_irqrestore(&_mprotect_data_head_lock,lockflags);
  
      kfree(data);
@@@ -5587,16 -5494,13 +5563,13 @@@ int process_server_try_handle_mm_fault(
  
      mapping_request_data_t *data;
      unsigned long err = 0;
-     int ret = 0;
      mapping_request_t request;
-     int i;
-     int s;
-     int j;
+     int i, s, j, ret=0;
      struct file* f;
      unsigned long prot = 0;
      unsigned char started_outside_vma = 0;
      unsigned char did_early_removal = 0;
-     char path[512];
+     char path[512]; //TODO must be kmalloc-ed
      char* ppath;
      // for perf
      unsigned char pte_provided = 0;
      // the list does not include the current processor group descriptor (TODO)
      struct list_head *iter;
      _remote_cpu_info_list_t *objPtr;
- extern struct list_head rlist_head;
      list_for_each(iter, &rlist_head) { 
          objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
          i = objPtr->_data._processor;
  
              // Check remap_pfn_range success
              if(remap_pfn_range_err) {
-                 printk(KERN_ALERT"ERROR: Failed to remap_pfn_range %d\n",err);
+                 printk(KERN_ALERT"ERROR: Failed to remap_pfn_range %ld\n",err);
              } else {
                  PSPRINTK("remap_pfn_range succeeded\n");
                  ret = 1;
@@@ -6015,7 -5918,7 +5987,7 @@@ int process_server_dup_task(struct task
      task->t_distributed = 0;
      task->previous_cpus = 0;
      task->known_cpu_with_tgroup_mm = 0;
-     task->return_disposition = RETURN_DISPOSITION_EXIT;
+     task->return_disposition = RETURN_DISPOSITION_NONE;
  
      // If this is pid 1 or 2, the parent cannot have been migrated
      // so it is safe to take on all local thread info.
  
      // This is important.  We want to make sure to keep an accurate record
      // of which cpu and thread group the new thread is a part of.
-     if(orig->executing_for_remote == 1 || orig->tgroup_home_cpu != home_kernel ) {
+     if(orig->executing_for_remote == 1 || orig->tgroup_home_cpu != home_kernel) {
          task->tgroup_home_cpu = orig->tgroup_home_cpu;
          task->tgroup_home_id = orig->tgroup_home_id;
          task->tgroup_distributed = 1;
 +
 +       // task->origin_pid = orig->origin_pid;
      } else {
          task->tgroup_home_cpu = home_kernel;
          task->tgroup_home_id = orig->tgid;
          task->tgroup_distributed = 0;
      }
 -
 +//printk(KERN_ALERT"TGID {%d} \n",task->tgid);
      return 1;
  }
 -
  /**
   * @brief Migrate the specified task <task> to a CPU on which
   * it has not yet executed.
@@@ -6086,40 -5988,15 +6058,40 @@@ static int do_migration_to_new_cpu(stru
      // This will be a placeholder process for the remote
      // process that is subsequently going to be started.
      // Block its execution.
 -    __set_task_state(task,TASK_UNINTERRUPTIBLE);
  
 +   // set_task_state(task,TASK_UNINTERRUPTIBLE); //mklinux_akshay modified to interruptible state
 +
 +
 +    int sig;
 +    struct task_struct *t=current;
      // Book keeping for previous cpu bitmask.
      set_bit(smp_processor_id(),&task->previous_cpus);
  
 +    printk("Process---Futex: blocked signals\n");
 +    for (sig = 1; sig < NSIG; sig++) {
 +      if (sigismember(&t->blocked, sig)) {
 +              printk("POP: %d \n", sig);
 +      }
 +    }
 +    printk("Process---futex: pending signals\n");
 +    for (sig = 1; sig < NSIG; sig++) {
 +      if (sigismember(&t->pending.signal, sig)) {
 +              printk("POP: %d \n", sig);
 +      }
 +    }
      // Book keeping for placeholder process.
      task->represents_remote = 1;
      task->t_distributed = 1;
  
 +    /*mklinux_akshay*/
 +    if(task->prev_pid==-1)
 +      task->origin_pid=task->pid;
 +    else
 +      task->origin_pid=task->origin_pid;
 +
 +   struct task_struct *par = task->parent;
 +
 +
      // Book keeping for distributed threads.
      task->tgroup_distributed = 1;
      do_each_thread(g,tgroup_iterator) {
      request->normal_prio = task->normal_prio;
      request->rt_priority = task->rt_priority;
      request->sched_class = task->policy;
 -    
 +
 +    /*mklinux_akshay*/
 +    if (task->prev_pid == -1)
 +      request->origin_pid = task->pid;
 +    else
 +      request->origin_pid = task->origin_pid;
 +    request->remote_blocked = task->blocked;
 +    request->remote_real_blocked = task->real_blocked;
 +    request->remote_saved_sigmask = task->saved_sigmask;
 +    request->remote_pending = task->pending;
 +    request->sas_ss_sp = task->sas_ss_sp;
 +    request->sas_ss_size = task->sas_ss_size;
 +    int cnt = 0;
 +    for (cnt = 0; cnt < _NSIG; cnt++)
 +      request->action[cnt] = task->sighand->action[cnt];
 +
      // struct thread_struct -------------------------------------------------------
      // have a look at: copy_thread() arch/x86/kernel/process_64.c 
      // have a look at: struct thread_struct arch/x86/include/asm/processor.h
@@@ -6309,9 -6171,7 +6281,9 @@@ els
  
      //dump_task(task,regs,request->stack_ptr);
      
 -    PERF_MEASURE_STOP(&perf_process_server_do_migration,"migration to new cpu",perf);
 +   set_task_state(task,TASK_INTERRUPTIBLE);
 +
 +    PERF_MEASURE_STOP(&perf_process_server_do_migration," ",perf);
  
      return PROCESS_SERVER_CLONE_SUCCESS;
  
@@@ -6345,7 -6205,6 +6317,6 @@@ static int do_migration_back_to_previou
      task->executing_for_remote = 0;
      task->represents_remote = 1;
      task->t_distributed = 1; // This should already be the case
-     task->return_disposition = RETURN_DISPOSITION_EXIT;
      
      // Build message
      mig.tgroup_home_cpu = task->tgroup_home_cpu;
@@@ -6401,7 -6260,6 +6372,7 @@@ int process_server_do_migration(struct 
      int ret = 0;
  
  #ifndef SUPPORT_FOR_CLUSTERING
 +    printk(KERN_ALERT"%s: normal migration\n",__func__);
      if(test_bit(cpu,&task->previous_cpus)) {
          ret = do_migration_back_to_previous_cpu(task,cpu);
      } else {
                       "(cpu: %d present_mask)\n", __func__, task, cpu);
          return -EBUSY;
      }
 +    printk(KERN_ALERT"%s: clustering activated\n",__func__);
      // TODO seems like that David is using previous_cpus as a bitmask.. 
      // TODO well this must be upgraded to a cpumask, declared as usigned long in task_struct
      struct list_head *iter;
      _remote_cpu_info_list_t *objPtr;
      struct cpumask *pcpum =0;
      int cpuid=-1;
- extern struct list_head rlist_head;
      list_for_each(iter, &rlist_head) {
          objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
          cpuid = objPtr->_data._processor;
          pcpum = &(objPtr->_data._cpumask);
        if (cpumask_test_cpu(cpu, pcpum)) {
 +      printk(KERN_ALERT"%s: cpuid {%d} \n", __func__, cpuid);
                if ( bitmap_intersects(cpumask_bits(pcpum),
                                       &(task->previous_cpus),
                                       (sizeof(unsigned long)*8)) )
   * implements the actions that must be made immediately after
   * the newly awoken task resumes execution.
   */
- void process_server_do_return_disposition(void) {
+ void process_server_do_return_disposition(void)
+ {
      PSPRINTK("%s\n",__func__);
-     switch(current->return_disposition) {
+     int return_disposition = current->return_disposition;
+     // Reset the return disposition
+     current->return_disposition = RETURN_DISPOSITION_NONE;
+     
+     switch(return_disposition) {
+     case RETURN_DISPOSITION_NONE:
+         printk("%s: ERROR, return disposition is none!\n",__func__);
+         break;      
      case RETURN_DISPOSITION_MIGRATE:
          // Nothing to do, already back-imported the
          // state in process_back_migration.  This will
@@@ -6475,11 -6336,8 +6451,11 @@@ static int __init process_server_init(v
      /*
       * Cache some local information.
       */
 -    _cpu = smp_processor_id();
 -
 +#ifndef SUPPORT_FOR_CLUSTERING
 +           _cpu= smp_processor_id();
 +#else
 +         _cpu = cpumask_first(cpu_present_mask);
 +#endif
      /*
       * Init global semaphores
       */
diff --combined pcnmsg/pcn_kmsg.c
@@@ -1,8 -1,9 +1,9 @@@
  /*
   * Inter-kernel messaging support for Popcorn
   *
-  * Current ver: Antonio Barbalace, Phil Wilshire 2013
-  * First ver: Ben Shelton <beshelto@vt.edu> 2013
+  * Antonio Barbalace, David Katz, Marina Sadini 2014
+  * Antonio Barbalace, Marina Sadini, Phil Wilshire 2013
+  * Ben Shelton 2012 - 2013
   */
  
  #include <linux/irq.h>
  #define KMSG_PRINTK(...) ;
  #endif
  
+ /*
+  * MAX_LOOPS must be calibrated for each architecture. It is difficult to
+  * give a good estimate for this parameter. The worst case must be equal
+  * to or shorter than the minimum time we will be blocked by a call to
+  * schedule_timeout
+  */
+ #define MAX_LOOPS 12345
+ #define MAX_LOOPS_JIFFIES (MAX_SCHEDULE_TIMEOUT)
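/* The pattern these constants drive, in miniature (illustrative;
 * "cond" is a stand-in): spin on the condition, and every MAX_LOOPS
 * iterations yield the CPU via schedule_timeout() rather than
 * busy-waiting indefinitely. */
static inline void spin_then_yield(volatile int *cond)
{
        unsigned long loop = 0;
        while (!*cond)
                if (!(++loop % MAX_LOOPS))
                        schedule_timeout(MAX_LOOPS_JIFFIES);
}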
  
  #define MCAST_VERBOSE 0
  
@@@ -96,11 -104,6 +104,11 @@@ struct workqueue_struct *messaging_wq
  #define PCN_WARN(...) ;
  #define PCN_ERROR(...) printk(__VA_ARGS__)
  
 +unsigned long long total_sleep_win_put = 0;
 +unsigned int sleep_win_put_count = 0;
 +unsigned long long total_sleep_win_get = 0;
 +unsigned int sleep_win_get_count = 0;
 +
  struct pcn_kmsg_hdr log_receive[LOGLEN];
  struct pcn_kmsg_hdr log_send[LOGLEN];
  int log_r_index=0;
@@@ -111,8 -114,13 +119,13 @@@ int log_f_index=0
  int log_f_sendindex=0;
  void * log_function_send[LOGCALL];
  
+ #define SIZE_RANGES 7
+ unsigned int large_message_sizes[(SIZE_RANGES +1)];
+ unsigned int large_message_count[(SIZE_RANGES +1)];
+ unsigned int type_message_count[PCN_KMSG_TYPE_MAX];
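/* Sketch of how a send path might fill these histograms (an
 * assumption, not in this patch): large_message_sizes[] is taken to
 * hold bucket upper bounds, with the extra slot catching anything
 * larger. */
static inline void account_large_msg(unsigned int payload_size)
{
        int i;
        for (i = 0; i < SIZE_RANGES; i++)
                if (payload_size <= large_message_sizes[i])
                        break;
        large_message_count[i]++;
}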
  /* From Wikipedia page "Fetch and add", modified to work for u64 */
- static inline unsigned long fetch_and_add(volatile unsigned long * variable, 
+ /*static inline unsigned long fetch_and_add(volatile unsigned long * variable, 
                                          unsigned long value)
  {
        asm volatile("lock; xaddq %%rax, %2;"
                     :"=a" (value)                   //Output
                     : "a" (value), "m" (*variable)  //Input
                     :"memory" );
        return value;
- }
+ }*/
+ #define fetch_and_add xadd_sync
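+ /*
+  * xadd_sync is assumed to provide the same contract as the commented-out
+  * routine above: atomically add `value` to *variable and return the old
+  * value. A more idiomatic x86-64 spelling of that contract (a sketch, not
+  * the actual xadd_sync implementation):
+  *
+  *     static inline unsigned long fetch_and_add_sketch(
+  *                     volatile unsigned long *variable, unsigned long value)
+  *     {
+  *             asm volatile("lock; xaddq %0, %1"
+  *                          : "+r" (value), "+m" (*variable)
+  *                          : : "memory");
+  *             return value;   // previous contents of *variable
+  *     }
+  */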
  
  static inline unsigned long win_inuse(struct pcn_kmsg_window *win) 
  {
@@@ -133,7 -142,7 +147,7 @@@ static inline int win_put(struct pcn_km
                          int no_block) 
  {
        unsigned long ticket;
-     unsigned long long sleep_start;
+       unsigned long loop;
  
        /* if we can't block and the queue is already really long, 
           return EAGAIN */
        /* spin until there's a spot free for me */
        //while (win_inuse(win) >= RB_SIZE) {}
        //if(ticket>=PCN_KMSG_RBUF_SIZE){
-     sleep_start = native_read_tsc();
-               while((win->buffer[ticket%PCN_KMSG_RBUF_SIZE].last_ticket != ticket-PCN_KMSG_RBUF_SIZE)) {
-                       //pcn_cpu_relax();
-                       //msleep(1);
-               }
-               while(  win->buffer[ticket%PCN_KMSG_RBUF_SIZE].ready!=0){
-                       //pcn_cpu_relax();
-                       //msleep(1);
-               }
-     total_sleep_win_put += native_read_tsc() - sleep_start;
-     sleep_win_put_count++;
        //}
+       loop = 0;
+       while( (win->buffer[ticket%PCN_KMSG_RBUF_SIZE].last_ticket
+         != (ticket - PCN_KMSG_RBUF_SIZE)) ) {
+               //pcn_cpu_relax();
+               //msleep(1);
+               if ( !(++loop % MAX_LOOPS) )
+                       schedule_timeout(MAX_LOOPS_JIFFIES);
+       }
+       /* the condition below is always false: the tail is advanced (the
+          "add") only after ready has been set back to 0 */
+       //loop=0;
+       while( win->buffer[ticket%PCN_KMSG_RBUF_SIZE].ready!=0 ) {
+               pcn_cpu_relax();
+               //msleep(1);
+               //if ( !(++loop % MAX_LOOPS) )
+               //      schedule_timeout(MAX_LOOPS_JIFFIES);
+       }
+       
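+       /*
+        * Slot-reuse invariant the loops above wait on, restated as a sketch
+        * (field names match this file; the helper itself is hypothetical):
+        *
+        *     static inline int slot_is_free(struct pcn_kmsg_window *win,
+        *                                    unsigned long ticket)
+        *     {
+        *             // slot ticket % N is reusable only once the producer
+        *             // that held ticket - N has stamped it as finished
+        *             return win->buffer[ticket % PCN_KMSG_RBUF_SIZE].last_ticket
+        *                     == ticket - PCN_KMSG_RBUF_SIZE;
+        *     }
+        */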
        /* insert item */
        memcpy(&win->buffer[ticket%PCN_KMSG_RBUF_SIZE].payload,
               &msg->payload, PCN_KMSG_PAYLOAD_SIZE);
@@@ -198,52 -213,39 +218,39 @@@ static inline int win_get(struct pcn_km
                          struct pcn_kmsg_reverse_message **msg) 
  {
        struct pcn_kmsg_reverse_message *rcvd;
-     unsigned long long sleep_start;
+       unsigned long loop;
  
        if (!win_inuse(win)) {
                KMSG_PRINTK("nothing in buffer, returning...\n");
                return -1;
        }
  
-       KMSG_PRINTK("reached win_get, head %lu, tail %lu\n", 
-                   win->head, win->tail);      
-       /* spin until entry.ready at end of cache line is set */
+       KMSG_PRINTK("reached win_get, head %lu, tail %lu\n", win->head, win->tail);
        rcvd =(struct pcn_kmsg_reverse_message*) &(win->buffer[win->tail % PCN_KMSG_RBUF_SIZE]);
-       //KMSG_PRINTK("%s: Ready bit: %u\n", __func__, rcvd->hdr.ready);
  
-     sleep_start = native_read_tsc();
+       /* spin until entry.ready at end of cache line is set */
+       loop = 0;
        while (!rcvd->ready) {
                //pcn_cpu_relax();
                //msleep(1);
+               if ( !(++loop % MAX_LOOPS) )
+                       schedule_timeout(MAX_LOOPS_JIFFIES);
        }
-     total_sleep_win_get += native_read_tsc() - sleep_start;
-     sleep_win_get_count++;
-       // barrier here?
-       pcn_barrier();
  
-       //log_receive[log_r_index%LOGLEN]=rcvd->hdr;
-       memcpy(&(log_receive[log_r_index%LOGLEN]),&(rcvd->hdr),sizeof(struct pcn_kmsg_hdr));
+       /* statistics */
+       memcpy(&(log_receive[log_r_index%LOGLEN]),
+              &(rcvd->hdr),
+              sizeof(struct pcn_kmsg_hdr));
        log_r_index++;
+       msg_get++;
  
-       //rcvd->hdr.ready = 0;
-       *msg = rcvd;    
- msg_get++;
+       *msg = rcvd;
        return 0;
  }
  
- static inline void win_advance_tail(struct pcn_kmsg_window *win) 
- {
-       win->tail++;
- }
+ /*static inline void win_advance_tail(struct pcn_kmsg_window *win) 
+ { win->tail++; }
+ */
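+ /*
+  * Tail advancement now happens via fetch_and_add(&win->tail, 1) at the
+  * consumer (see the poll handler below) instead of the plain increment
+  * above, presumably so the tail can be bumped safely should more than one
+  * consumer ever run (our reading of the change).
+  */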
  
  /* win_enable_int
   * win_disable_int
@@@ -471,18 -473,14 +478,14 @@@ static int pcn_read_proc(char *page, ch
        char *p = page;
        int len, i, idx;
  
-     p += sprintf(p, "Sleep in win_put[total,count,avg] = [%llx,%lx,%llx]\n",
-                     total_sleep_win_put,
-                     sleep_win_put_count,
-                     sleep_win_put_count? total_sleep_win_put/sleep_win_put_count:0);
-     p += sprintf(p, "Sleep in win_get[total,count,avg] = [%llx,%lx,%llx]\n",
-                     total_sleep_win_get,
-                     sleep_win_get_count,
-                     sleep_win_get_count? total_sleep_win_get/sleep_win_get_count:0);
-       p += sprintf(p, "messages get: %ld\n", msg_get);
-         p += sprintf(p, "messages put: %ld\n", msg_put);
+       p += sprintf(p, "get: %ld\n", msg_get);
+         p += sprintf(p, "put: %ld\n", msg_put);
+     for (i =0; i<(SIZE_RANGES +1); i++)
+       p +=sprintf (p,"%u: %u\n", large_message_sizes[i], large_message_count[i]);
+     
+     for (i =0; i<PCN_KMSG_TYPE_MAX; i++)
+       p +=sprintf (p, "t%u: %u\n", i, type_message_count[i]);
+     
      idx = log_r_index;
      for (i = 0; i > -LOGLEN; i--)
        p += sprintf(p, "r%d: from%d type%d %1d:%1d:%1d seq%d\n",
                      (idx+i), (int)log_receive[(idx+i)%LOGLEN].from_cpu, (int)log_receive[(idx+i)%LOGLEN].type,
                      (int)log_receive[(idx+i)%LOGLEN].is_lg_msg, (int)log_receive[(idx+i)%LOGLEN].lg_start,
                      (int)log_receive[(idx+i)%LOGLEN].lg_end, (int)log_receive[(idx+i)%LOGLEN].lg_seqnum);
+ /*
      idx = log_f_index;
          for (i =0; i>-LOGCALL; i--)
                p +=sprintf (p,"f%d: %pB\n",
  
          for(i=0; i<PCN_KMSG_RBUF_SIZE; i++)
                p +=sprintf (p,"second_buffer[%i]=%i\n",i,rkvirt[my_cpu]->second_buffer[i]);
+ */
  
        len = (p -page) - off;
        if (len < 0)
        return len;
  }
  
+ static int pcn_write_proc(struct file *file, const char __user *buffer,
+                           unsigned long count, void *data)
+ {
+       int i;
+
+       msg_get = 0;
+       msg_put = 0;
+       memset(large_message_count, 0, sizeof(int)*(SIZE_RANGES +1));
+       memset(large_message_sizes, 0, sizeof(int)*(SIZE_RANGES +1));
+       for (i = 0; i < SIZE_RANGES; i++)
+               large_message_sizes[i] = (i+1)*PCN_KMSG_PAYLOAD_SIZE;
+       large_message_sizes[SIZE_RANGES] = ~0;
+       memset(type_message_count, 0, sizeof(int)*PCN_KMSG_TYPE_MAX);
+       return count;
+ }
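+ /*
+  * Usage sketch for the proc interface (the entry name is registered
+  * elsewhere in this file and is not shown here, so the path below is an
+  * assumption):
+  *
+  *     cat /proc/<pcn_kmsg entry>        # dump get/put counters, the size
+  *                                       # histogram and per-type counts
+  *     echo 1 > /proc/<pcn_kmsg entry>   # any write resets the statistics
+  */
+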
  static int __init pcn_kmsg_init(void)
  {
        int rc,i;
                boot_params_va->pcn_kmsg_master_window = rkinfo_phys_addr;
                KMSG_INIT("boot_params virt %p phys %p\n",
                        boot_params_va, orig_boot_params);
+
+               KMSG_INIT("LOOPS %d, LOOPS_JIFFIES %ld, 1nsec %lu, 1usec %lu, 1msec %lu, 1jiffies %u %u\n",
+                         MAX_LOOPS, MAX_LOOPS_JIFFIES, nsecs_to_jiffies(1), usecs_to_jiffies(1), msecs_to_jiffies(1),
+                         jiffies_to_msecs(1), jiffies_to_usecs(1));
        }
        else {
                KMSG_INIT("Primary kernel rkinfo phys addr: 0x%lx\n", 
                }
        } 
  
+       /* proc interface for debugging and stats */
+       memset(large_message_count, 0, sizeof(int)*(SIZE_RANGES +1));
+       memset(large_message_sizes, 0, sizeof(int)*(SIZE_RANGES +1));
+       for (i = 0; i < SIZE_RANGES; i++)
+               large_message_sizes[i] = (i+1)*PCN_KMSG_PAYLOAD_SIZE;
+       large_message_sizes[SIZE_RANGES] = ~0;
+       memset(type_message_count, 0, sizeof(int)*PCN_KMSG_TYPE_MAX);
        memset(log_receive,0,sizeof(struct pcn_kmsg_hdr)*LOGLEN);
        memset(log_send,0,sizeof(struct pcn_kmsg_hdr)*LOGLEN);
        memset(log_function_called,0,sizeof(void*)*LOGCALL);
                return -ENOMEM;
        }
        res->read_proc = pcn_read_proc;
+       res->write_proc = pcn_write_proc;
  
        return 0;
  }
@@@ -736,6 -761,7 +766,7 @@@ static int __pcn_kmsg_send(unsigned in
        msg->hdr.from_cpu = my_cpu;
  
        rc = win_put(dest_window, msg, no_block);
+       type_message_count[msg->hdr.type]++;
  
        if (rc) {
                if (no_block && (rc == EAGAIN)) {
@@@ -828,6 -854,10 +859,10 @@@ int pcn_kmsg_send_long(unsigned int des
                if(ret!=0)
                        return ret;
        }
+       
+       /* statistics */
+       num_chunks = payload_size / PCN_KMSG_PAYLOAD_SIZE;
+       large_message_count[((num_chunks < SIZE_RANGES) ? num_chunks : SIZE_RANGES)]++;
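+       /*
+        * Example of the bucketing: a payload between 3 and 4 chunks' worth
+        * of bytes gives num_chunks == 3, so it lands in the bucket labeled
+        * large_message_sizes[3] == 4*PCN_KMSG_PAYLOAD_SIZE; anything with
+        * num_chunks >= SIZE_RANGES falls into the ~0 overflow bucket.
+        */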
  
        return 0;
  }
@@@ -1076,18 -1106,19 +1111,19 @@@ static int process_small_message(struc
        return work_done;
  }
  
+ static int poll_handler_check = 0;
  static int pcn_kmsg_poll_handler(void)
  {
        struct pcn_kmsg_reverse_message *msg;
        struct pcn_kmsg_window *win = rkvirt[my_cpu]; // TODO this will not work for clustering
        int work_done = 0;
  
-       KMSG_PRINTK("called\n");
+       poll_handler_check++;
+       if (poll_handler_check > 1)
+               printk("poll_handler_check %d: concurrent calls not supported\n",
+                      poll_handler_check);
  
  pull_msg:
        /* Get messages out of the buffer first */
- //#define PCN_KMSG_BUDGET 128
-       //while ((work_done < PCN_KMSG_BUDGET) && (!win_get(rkvirt[my_cpu], &msg))) {
        while (! win_get(win, &msg) ) {
                KMSG_PRINTK("got a message!\n");
  
                        KMSG_PRINTK("message is a small message!\n");
                        work_done += process_small_message(msg);
                }
-               pcn_barrier();
                msg->ready = 0;
-               //win_advance_tail(win);
-               fetch_and_add(&win->tail, 1);
+               pcn_barrier();
+               
+               fetch_and_add(&win->tail, 1); //win_advance_tail(win);
+               
+               // NOTE
+               // Why do we need the ready bit if we are also incrementing the tail?
+               // Could we increment the tail before clearing the ready bit?
+               // That way we could support multiple concurrent calls to this function.
        }
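+       /*
+        * One possible answer to the NOTE above (a sketch, untested): have
+        * each poller atomically claim a slot index first and clear ready
+        * last, so two concurrent pollers never process the same entry
+        * (each poller would still wait for ready == 1 before copying out):
+        *
+        *     idx = fetch_and_add(&win->tail, 1) % PCN_KMSG_RBUF_SIZE;
+        *     ... process win->buffer[idx] ...
+        *     win->buffer[idx].ready = 0;   // release the slot to producers
+        */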
  
        win_enable_int(win);
                goto pull_msg;
        }
  
+       poll_handler_check--;
        return work_done;
  }