Minor fix in the messaging layer; resolves a problem introduced during integration
author: Akshay Giridhar <akshay87@vt.edu>
Wed, 16 Jul 2014 16:35:29 +0000 (12:35 -0400)
committer: Akshay Giridhar <akshay87@vt.edu>
Wed, 16 Jul 2014 16:35:29 +0000 (12:35 -0400)
include/linux/pcn_kmsg.h
include/linux/popcorn_cpuinfo.h [new file with mode: 0644]
include/linux/process_server.h
kernel/futex.c
kernel/futex_remote.c
kernel/global_spinlock.c
kernel/process_server.c
kernel/signal.c
pcnmsg/pcn_kmsg.c
pcnmsg/pcn_kmsg_test.c

index a973c90..12c79bd 100644 (file)
@@ -18,7 +18,6 @@
 /* BOOKKEEPING */
 
 #define POPCORN_MAX_MCAST_CHANNELS 32
-#define LG_SEQNUM_SIZE 7
 
 struct pcn_kmsg_mcast_wininfo {
        volatile unsigned char lock;
@@ -51,6 +50,7 @@ typedef struct {
        pcn_kmsg_mcast_id id_to_join;
 } pcn_kmsg_work_t;
 
+
 /* MESSAGING */
 
 /* Enum for message types.  Modules should add types after
@@ -132,6 +132,9 @@ enum pcn_kmsg_prio {
        PCN_KMSG_PRIO_NORMAL
 };
 
+#define __READY_SIZE 1
+#define LG_SEQNUM_SIZE  (8 - __READY_SIZE)
+
 /* Message header */
 struct pcn_kmsg_hdr {
        unsigned int from_cpu   :8; // b0
@@ -142,14 +145,24 @@ struct pcn_kmsg_hdr {
        unsigned int is_lg_msg  :1;
        unsigned int lg_start   :1;
        unsigned int lg_end     :1;
-       unsigned long long_number;
 
-       unsigned int lg_seqnum  :LG_SEQNUM_SIZE;// b3
-       //volatile unsigned int ready   :1;
+       unsigned long long_number; // b3 .. b10
+
+       unsigned int lg_seqnum  :LG_SEQNUM_SIZE; // b11
+       unsigned int __ready    :__READY_SIZE;
 }__attribute__((packed));
 
+//#if ( &((struct pcn_kmsg_hdr*)0)->ready != 12 )
+//# error "ready is not the last byte of the struct"
+//#endif
+
+// TODO cache size can be retrieved by the compiler, put it here
+#define CACHE_LINE_SIZE 64
 //#define PCN_KMSG_PAYLOAD_SIZE 60
-#define PCN_KMSG_PAYLOAD_SIZE (64-sizeof(struct pcn_kmsg_hdr))
+#define PCN_KMSG_PAYLOAD_SIZE (CACHE_LINE_SIZE - sizeof(struct pcn_kmsg_hdr))
+
+#define MAX_CHUNKS ((1 << LG_SEQNUM_SIZE) -1)
+#define PCN_KMSG_LONG_PAYLOAD_SIZE (MAX_CHUNKS*PCN_KMSG_PAYLOAD_SIZE)
 
 /* The actual messages.  The expectation is that developers will create their
    own message structs with the payload replaced with their own fields, and then
@@ -162,12 +175,19 @@ struct pcn_kmsg_hdr {
 struct pcn_kmsg_message {
        struct pcn_kmsg_hdr hdr;
        unsigned char payload[PCN_KMSG_PAYLOAD_SIZE];
-}__attribute__((packed)) __attribute__((aligned(64)));
+}__attribute__((packed)) __attribute__((aligned(CACHE_LINE_SIZE)));
+
+struct pcn_kmsg_reverse_message {
+       unsigned char payload[PCN_KMSG_PAYLOAD_SIZE];
+       struct pcn_kmsg_hdr hdr;
+       volatile unsigned long last_ticket;
+       volatile unsigned char ready;
+}__attribute__((packed)) __attribute__((aligned(CACHE_LINE_SIZE)));
 
 /* Struct for sending long messages (>60 bytes payload) */
 struct pcn_kmsg_long_message {
        struct pcn_kmsg_hdr hdr;
-       unsigned char payload[512];
+       unsigned char payload[PCN_KMSG_LONG_PAYLOAD_SIZE];
 }__attribute__((packed));
 
 /* List entry to copy message into and pass around in receiving kernel */
@@ -177,13 +197,6 @@ struct pcn_kmsg_container {
 }__attribute__((packed));
 
 
-struct pcn_kmsg_reverse_message {
-       unsigned char payload[PCN_KMSG_PAYLOAD_SIZE];
-       struct pcn_kmsg_hdr hdr;
-       volatile unsigned char ready;
-       volatile unsigned long last_ticket;
-}__attribute__((packed)) __attribute__((aligned(64)));
-
 
 /* TYPES OF MESSAGES */
 
@@ -193,7 +206,7 @@ struct pcn_kmsg_checkin_message {
        unsigned long window_phys_addr;
        unsigned char cpu_to_add;
        char pad[51];
-}__attribute__((packed)) __attribute__((aligned(64)));
+}__attribute__((packed)) __attribute__((aligned(CACHE_LINE_SIZE)));
 
 
 
@@ -258,7 +271,7 @@ struct pcn_kmsg_mcast_message {
        unsigned int num_members;
        unsigned long window_phys_addr;
        char pad[28];
-}__attribute__((packed)) __attribute__((aligned(64)));
+}__attribute__((packed)) __attribute__((aligned(CACHE_LINE_SIZE)));
 
 struct pcn_kmsg_mcast_window {
        volatile unsigned long head;
diff --git a/include/linux/popcorn_cpuinfo.h b/include/linux/popcorn_cpuinfo.h
new file mode 100644 (file)
index 0000000..e160e2c
--- /dev/null
@@ -0,0 +1,37 @@
+
+ struct _remote_cpu_info_data
+ {
+                 unsigned int _processor;
+                 char _vendor_id[16];
+                 int _cpu_family;
+                unsigned int _model;
+                 char _model_name[64];
+                 int _stepping;
+                 unsigned long _microcode;
+                 unsigned _cpu_freq;
+                 int _cache_size;
+                 char _fpu[3];
+                 char _fpu_exception[3];
+                 int _cpuid_level;
+                 char _wp[3];
+                 char _flags[512];
+                 unsigned long _nbogomips;
+                int _TLB_size;
+                unsigned int _clflush_size;
+                int _cache_alignment;
+                unsigned int _bits_physical;
+                 unsigned int _bits_virtual;
+                char _power_management[64];
+                 struct cpumask _cpumask;
+ };
+
+
+typedef struct _remote_cpu_info_data _remote_cpu_info_data_t;
+struct _remote_cpu_info_list
+ {
+         _remote_cpu_info_data_t _data;
+         struct list_head cpu_list_member;
+
+  };
+typedef struct _remote_cpu_info_list _remote_cpu_info_list_t;
+
index b06b6e8..b63f14f 100644 (file)
@@ -15,8 +15,8 @@
 #define PROCESS_SERVER_CLONE_FAIL 1
 
 //configuration
-//#define SUPPORT_FOR_CLUSTERING
-#undef SUPPORT_FOR_CLUSTERING
+#define SUPPORT_FOR_CLUSTERING
+//#undef SUPPORT_FOR_CLUSTERING
 
 /*
  * Migration hook.
index ab4f3fe..7a06fea 100644 (file)
@@ -72,7 +72,7 @@
 #include <linux/mmu_context.h>
 #include <linux/string.h>
 
-#define FUTEX_VERBOSE 1 
+#define FUTEX_VERBOSE 0 
 #if FUTEX_VERBOSE
 #define FPRINTK(...) printk(__VA_ARGS__)
 #else
index 5c002f4..cb2c6e6 100644 (file)
@@ -570,6 +570,9 @@ static int handle_remote_futex_wake_response(struct pcn_kmsg_message* inc_msg) {
        FRPRINTK(KERN_ALERT"%s: response {%d} \n",
                        __func__, msg->errno);
        struct task_struct *p =  pid_task(find_vpid(msg->rem_pid), PIDTYPE_PID);
+        
+       if(!p)
+          goto out;
 
        get_task_struct(p);
 
@@ -587,7 +590,7 @@ static int handle_remote_futex_wake_response(struct pcn_kmsg_message* inc_msg) {
 
        put_task_struct(p);
 
-       pcn_kmsg_free_msg(inc_msg);
+out:   pcn_kmsg_free_msg(inc_msg);
        
        preempt_enable();
 
@@ -727,6 +730,9 @@ static int handle_remote_futex_key_response(struct pcn_kmsg_message* inc_msg) {
 
        struct task_struct *p =  pid_task(find_vpid(msg->rem_pid), PIDTYPE_PID);
 
+       if(!p)
+               goto out;
+       
        get_task_struct(p);
 
        struct spin_key sk;
@@ -745,7 +751,7 @@ static int handle_remote_futex_key_response(struct pcn_kmsg_message* inc_msg) {
 
        put_task_struct(p);
 
-       pcn_kmsg_free_msg(inc_msg);
+out:   pcn_kmsg_free_msg(inc_msg);
        
        preempt_enable();
        
index 0163e29..d753d25 100644 (file)
@@ -32,6 +32,12 @@ DEFINE_SPINLOCK(request_queue_lock);
 #define GENERAL_SPIN_LOCK(x,f) spin_lock_irqsave(x,f)
 #define GENERAL_SPIN_UNLOCK(x,f) spin_unlock_irqrestore(x,f)
 
+#define GSP_VERBOSE 0
+#if GSP_VERBOSE
+#define GSPRINTK(...) printk(__VA_ARGS__)
+#else
+#define GSPRINTK(...) ;
+#endif
 //extern functions
  extern struct vm_area_struct * getVMAfromUaddr(unsigned long uaddr);
  extern pte_t *do_page_walk(unsigned long address);
@@ -180,8 +186,6 @@ _global_value *hashgroup(struct task_struct *group_pid)
 int global_spinlock(unsigned long uaddr,futex_common_data_t *_data,_spin_value * value,_local_rq_t *rq_ptr,int localticket_value)
 __releases(&value->_sp)
 {
-       //preempt_disable();
-
        int res = 0;
        int cpu=0;
        unsigned int flgs;
@@ -195,7 +199,7 @@ __releases(&value->_sp)
        if(_data->ops==WAIT_OPS){
 
 //     printk(KERN_ALERT"%s: request -- entered whos calling{%s} \n", __func__,current->comm);
-//     printk(KERN_ALERT"%s:  uaddr {%lx}  pid{%d} current->tgroup_home_id{%d}\n",                             __func__,uaddr,current->pid,current->tgroup_home_id);
+       GSPRINTK(KERN_ALERT"%s:  uaddr {%lx}  pid{%d} current->tgroup_home_id{%d}\n",                           __func__,uaddr,current->pid,current->tgroup_home_id);
 
        // Finish constructing response
        wait_req->header.type = PCN_KMSG_TYPE_REMOTE_IPC_FUTEX_KEY_REQUEST;
@@ -233,7 +237,7 @@ __releases(&value->_sp)
                wake_req->flags = _data->flags;
 
                wake_req->ticket = localticket_value;//GET_TOKEN; //set the request has no ticket
-//             printk(KERN_ALERT"%s: wake uaddr2{%lx} data{%lx} \n",__func__,wake_req->uaddr2,_data->uaddr2);
+               GSPRINTK(KERN_ALERT"%s: wake uaddr2{%lx} data{%lx} \n",__func__,wake_req->uaddr2,_data->uaddr2);
        }
 
 
@@ -252,7 +256,7 @@ __releases(&value->_sp)
                                }
                                else
                                        wake_req->fn_flag |= FLAGS_REMOTECALL;
-  //                   printk(KERN_ALERT"%s: sending to origin remote callpfn cpu: 0x{%d} request->ticket{%d} \n",__func__,cpu,localticket_value);
+                       GSPRINTK(KERN_ALERT"%s: sending to origin remote callpfn cpu: 0x{%d} request->ticket{%d} \n",__func__,cpu,localticket_value);
                        if ((cpu = find_kernel_for_pfn(pfn, &pfn_list_head)) != -1){
                                spin_unlock(&value->_sp);
                                
@@ -266,7 +270,7 @@ __releases(&value->_sp)
                                        wake_req->fn_flag |= FLAGS_ORIGINCALL;
                                        wake_req->rflag = current->pid;
                                }
-//                     printk(KERN_ALERT"%s: sending to origin origin call cpu: 0x{%d} request->ticket{%d} \n",__func__,cpu,localticket_value);
+                       GSPRINTK(KERN_ALERT"%s: sending to origin origin call cpu: 0x{%d} request->ticket{%d} \n",__func__,cpu,localticket_value);
                        if ((cpu = find_kernel_for_pfn(pfn, &pfn_list_head)) != -1){
                                spin_unlock(&value->_sp);
                                
@@ -276,7 +280,7 @@ __releases(&value->_sp)
 
        //      printk(KERN_ALERT"%s:goto sleep after ticket request: 0x{%d} {%d}\n",__func__,cpu,current->pid);
                wait_event_interruptible(rq_ptr->_wq, (rq_ptr->status == DONE));
-       //      printk(KERN_ALERT"%s:after wake up process: task woken{%d}\n",__func__,current->pid);
+               GSPRINTK(KERN_ALERT"%s:after wake up process: task woken{%d}\n",__func__,current->pid);
 
 out:
    kfree(wake_req);
index c65ccf7..78c76ae 100644 (file)
@@ -256,19 +256,13 @@ static void perf_init(void) {
 #define PERF_MEASURE_STOP(x, y, z)
 #endif
 
-
 static DECLARE_WAIT_QUEUE_HEAD( countq);
 
-/**
- * Constants
- */
-#define RETURN_DISPOSITION_EXIT 0
-#define RETURN_DISPOSITION_MIGRATE 1
 
 /**
  * Library
  */
-
+#define POPCORN_MAX_PATH 512
 /**
  * Some piping for linking data entries
  * and identifying data entry types.
@@ -1426,11 +1420,11 @@ int find_consecutive_physically_mapped_region(struct mm_struct* mm,
                                               unsigned long* vaddr_mapping_start,
                                               unsigned long* paddr_mapping_start,
                                               size_t* paddr_mapping_sz) {
-    unsigned long paddr_curr = NULL;
+    unsigned long paddr_curr = 0l;
     unsigned long vaddr_curr = vaddr;
     unsigned long vaddr_next = vaddr;
-    unsigned long paddr_next = NULL;
-    unsigned long paddr_start = NULL;
+    unsigned long paddr_next = 0l;
+    unsigned long paddr_start = 0l;
     size_t sz = 0;
 
     
@@ -2434,9 +2428,9 @@ static int dump_page_walk_pte_entry_callback(pte_t *pte, unsigned long start,
 /**
  * @brief Displays relevant data within a mm.
  */
-static void dump_mm(struct mm_struct* mm) {
+static void dump_mm(struct mm_struct* mm) 
+{
     struct vm_area_struct * curr;
-    char buf[256];
     struct mm_walk walk = {
         .pte_entry = dump_page_walk_pte_entry_callback,
         .mm = mm,
@@ -2803,7 +2797,8 @@ static int count_local_thread_members(int tgroup_home_cpu,
  * thread group in which the "current" task resides.
  * @return The number of threads.
  */
-static int count_thread_members() {
+static int count_thread_members(void)
+ {
      
     int count = 0;
     PSPRINTK("%s: entered\n",__func__);
@@ -2998,7 +2993,7 @@ handled:
  */
 void process_mapping_request(struct work_struct* work) {
     mapping_request_work_t* w = (mapping_request_work_t*) work;
-    mapping_response_t response;
+    mapping_response_t* response;
     data_header_t* data_curr = NULL;
     mm_data_t* mm_data = NULL;
     struct task_struct* task = NULL;
@@ -3011,15 +3006,11 @@ void process_mapping_request(struct work_struct* work) {
         .pte_entry = vm_search_page_walk_pte_entry_callback,
         .private = &(resolved)
     };
-    char* plpath = NULL;
-    char lpath[512];
+    char *plpath = NULL, *lpath = NULL;
+    int used_saved_mm = 0, found_vma = 1, found_pte = 1; 
     int i;
     
-    // for perf
-    int used_saved_mm = 0;
-    int found_vma = 1;
-    int found_pte = 1;
-    
+    // for perf    
     // Perf start
     int perf = PERF_MEASURE_START(&perf_process_mapping_request);
 
@@ -3067,6 +3058,18 @@ task_mm_search_exit:
 
         PS_SPIN_UNLOCK(&_saved_mm_head_lock);
     }
+     response = kmalloc(sizeof(mapping_response_t), GFP_ATOMIC); //TODO convert to alloc_cache
+    if (!response) {
+      printk(KERN_ALERT"can not kmalloc mapping_response_t area from{%d} address{%lx} cpu{%d} id{%d}\n",
+             w->from_cpu, w->address, w->tgroup_home_cpu, w->tgroup_home_id);
+      goto err_work;
+    }
+    lpath = kmalloc(POPCORN_MAX_PATH, GFP_ATOMIC); //TODO convert to alloc_cache
+    if (!lpath) {
+      printk(KERN_ALERT"can not kmalloc lpath area from{%d} address{%lx} cpu{%d} id{%d}\n",
+             w->from_cpu, w->address, w->tgroup_home_cpu, w->tgroup_home_id);
+      goto err_response;
+    }
     
     // OK, if mm was found, look up the mapping.
     if(mm) {
@@ -3100,10 +3103,8 @@ changed_can_be_cow:
                 (address & PAGE_MASK) + PAGE_SIZE, &walk);
 
         if(vma && resolved != 0) {
-
             PSPRINTK("mapping found! %lx for vaddr %lx\n",resolved,
                     address & PAGE_MASK);
-
             /*
              * Find regions of consecutive physical memory
              * in this vma, including the faulting address
@@ -3121,45 +3122,41 @@ changed_can_be_cow:
                 downgrade_write(&mm->mmap_sem);
             }
 
-
             // Now grab all the mappings that we can stuff into the response.
-            if(0 != fill_physical_mapping_array(mm, 
-                                                vma,
-                                                address,
-                                                &response.mappings, 
-                                                MAX_MAPPINGS)) {
+         if (0 != fill_physical_mapping_array(mm, vma, address,
+                                                &(response->mappings[0]),
+                                               MAX_MAPPINGS)) {
                 // If the fill process fails, clear out all
                 // results.  Otherwise, we might trick the
                 // receiving cpu into thinking the target
                 // mapping was found when it was not.
                 for(i = 0; i < MAX_MAPPINGS; i++) {
-                    response.mappings[i].present = 0;
-                    response.mappings[i].vaddr = 0;
-                    response.mappings[i].paddr = 0;
-                    response.mappings[i].sz = 0;
-                }
-                    
+                    response->mappings[i].present = 0;
+                    response->mappings[i].vaddr = 0;
+                    response->mappings[i].paddr = 0;
+                    response->mappings[i].sz = 0;
+                }                    
             }
 
             }
 
-            response.header.type = PCN_KMSG_TYPE_PROC_SRV_MAPPING_RESPONSE;
-            response.header.prio = PCN_KMSG_PRIO_NORMAL;
-            response.tgroup_home_cpu = w->tgroup_home_cpu;
-            response.tgroup_home_id = w->tgroup_home_id;
-            response.requester_pid = w->requester_pid;
-            response.address = address;
-            response.present = 1;
-            response.vaddr_start = vma->vm_start;
-            response.vaddr_size = vma->vm_end - vma->vm_start;
-            response.prot = vma->vm_page_prot;
-            response.vm_flags = vma->vm_flags;
+            response->header.type = PCN_KMSG_TYPE_PROC_SRV_MAPPING_RESPONSE;
+            response->header.prio = PCN_KMSG_PRIO_NORMAL;
+            response->tgroup_home_cpu = w->tgroup_home_cpu;
+            response->tgroup_home_id = w->tgroup_home_id;
+            response->requester_pid = w->requester_pid;
+            response->address = address;
+            response->present = 1;
+            response->vaddr_start = vma->vm_start;
+            response->vaddr_size = vma->vm_end - vma->vm_start;
+            response->prot = vma->vm_page_prot;
+            response->vm_flags = vma->vm_flags;
             if(vma->vm_file == NULL) {
-                response.path[0] = '\0';
+                 response->path[0] = '\0';
             } else {    
                 plpath = d_path(&vma->vm_file->f_path,lpath,512);
-                strcpy(response.path,plpath);
-                response.pgoff = vma->vm_pgoff;
+                strcpy(response->path,plpath);
+                response->pgoff = vma->vm_pgoff;
             }
 
             // We modified this lock to be read-mode above so now
@@ -3174,15 +3171,12 @@ changed_can_be_cow:
                 PS_UP_READ(&mm->mmap_sem);
             // Zero out mappings
             for(i = 0; i < MAX_MAPPINGS; i++) {
-                response.mappings[i].present = 0;
-                response.mappings[i].vaddr = 0;
-                response.mappings[i].paddr = 0;
-                response.mappings[i].sz = 0;
+               response->mappings[i].present = 0;
+               response->mappings[i].vaddr = 0;
+               response->mappings[i].paddr = 0;
+               response->mappings[i].sz = 0;
             }
-
         }
-        
-
     }
 
     // Not found, respond accordingly
@@ -3190,44 +3184,44 @@ changed_can_be_cow:
         found_vma = 0;
         found_pte = 0;
         //PSPRINTK("Mapping not found\n");
-        response.header.type = PCN_KMSG_TYPE_PROC_SRV_MAPPING_RESPONSE;
-        response.header.prio = PCN_KMSG_PRIO_NORMAL;
-        response.tgroup_home_cpu = w->tgroup_home_cpu;
-        response.tgroup_home_id = w->tgroup_home_id;
-        response.requester_pid = w->requester_pid;
-        response.address = address;
-        response.present = 0;
-        response.vaddr_start = 0;
-        response.vaddr_size = 0;
-        response.path[0] = '\0';
+        response->header.type = PCN_KMSG_TYPE_PROC_SRV_MAPPING_RESPONSE;
+        response->header.prio = PCN_KMSG_PRIO_NORMAL;
+        response->tgroup_home_cpu = w->tgroup_home_cpu;
+        response->tgroup_home_id = w->tgroup_home_id;
+        response->requester_pid = w->requester_pid;
+        response->address = address;
+        response->present = 0;
+        response->vaddr_start = 0;
+        response->vaddr_size = 0;
+        response->path[0] = '\0';
 
         // Handle case where vma was present but no pte.
         if(vma) {
             //PSPRINTK("But vma present\n");
             found_vma = 1;
-            response.present = 1;
-            response.vaddr_start = vma->vm_start;
-            response.vaddr_size = vma->vm_end - vma->vm_start;
-            response.prot = vma->vm_page_prot;
-            response.vm_flags = vma->vm_flags;
+            response->present = 1;
+            response->vaddr_start = vma->vm_start;
+            response->vaddr_size = vma->vm_end - vma->vm_start;
+            response->prot = vma->vm_page_prot;
+            response->vm_flags = vma->vm_flags;
              if(vma->vm_file == NULL) {
-                 response.path[0] = '\0';
+               response->path[0] = '\0';
              } else {    
                  plpath = d_path(&vma->vm_file->f_path,lpath,512);
-                 strcpy(response.path,plpath);
-                 response.pgoff = vma->vm_pgoff;
+                 strcpy(response->path,plpath);
+                 response->pgoff = vma->vm_pgoff;
              }
         }
     }
 
     // Send response
-    if(response.present) {
+    if(response->present) {
         DO_UNTIL_SUCCESS(pcn_kmsg_send_long(w->from_cpu,
-                            (struct pcn_kmsg_long_message*)(&response),
+                            (struct pcn_kmsg_long_message*)(response),
                             sizeof(mapping_response_t) - 
                             sizeof(struct pcn_kmsg_hdr) -   //
-                            sizeof(response.path) +         // Chop off the end of the path
-                            strlen(response.path) + 1));    // variable to save bandwidth.
+                            sizeof(response->path) +         // Chop off the end of the path
+                            strlen(response->path) + 1));    // variable to save bandwidth.
     } else {
         // This is an optimization to get rid of the _long send 
         // which is a time sink.
@@ -3241,7 +3235,11 @@ changed_can_be_cow:
         DO_UNTIL_SUCCESS(pcn_kmsg_send(w->from_cpu,(struct pcn_kmsg_message*)(&nonpresent_response)));
 
     }
-
+    
+    kfree(lpath);
+err_response:
+    kfree(response);
+err_work:
     kfree(work);
 
     // Perf stop
@@ -3413,32 +3411,30 @@ void process_munmap_request(struct work_struct* work) {
            task->tgroup_home_id  == w->tgroup_home_id &&
            !(task->flags & PF_EXITING)) {
 
-            // Thread group has been found, perform munmap operation on this
-            // task.
+            // Take note of the fact that an mm exists on the remote kernel
+            set_cpu_has_known_tgroup_mm(task,w->from_cpu);
         if (task && task->mm ) {
            mm_to_munmap =task->mm;
        }
        else
                printk("%s: pirla\n", __func__);
 
-       // TODO try and check if make sense
-            // Take note of the fact that an mm exists on the remote kernel
-            set_cpu_has_known_tgroup_mm(task,w->from_cpu);
-
-            goto done; // thread grouping - threads all share a common mm.
-
+       goto done; // thread grouping - threads all share a common mm.
         }
     } while_each_thread(g,task);
 done:
     read_unlock(&tasklist_lock);
 
       if(mm_to_munmap) {
-        PS_DOWN_WRITE(&task->mm->mmap_sem);
+        PS_DOWN_WRITE(&mm_to_munmap->mmap_sem);
         current->enable_distributed_munmap = 0;
         do_munmap(mm_to_munmap, w->vaddr_start, w->vaddr_size);
         current->enable_distributed_munmap = 1;
-        PS_UP_WRITE(&task->mm->mmap_sem);
+        PS_UP_WRITE(&mm_to_munmap->mmap_sem);
         }
+       else
+       printk("%s: unexpected error task %p pid %d comm %s task->mm %p\n", 
+                __func__, task,task->pid,task->comm, (task ? task->mm : 0) );
     // munmap the specified region in any saved mm's as well.
     // This keeps old mappings saved in the mm of dead thread
     // group members from being resolved accidentally after
@@ -3521,39 +3517,28 @@ void process_mprotect_item(struct work_struct* work) {
         if (task->tgroup_home_cpu == tgroup_home_cpu &&
             task->tgroup_home_id  == tgroup_home_id &&
             !(task->flags & PF_EXITING)) {
-           /* 
-            if (task->mm)
-                // do_mprotect
-                do_mprotect(task, start, len, prot,0);
-//             task_unlock(task); //TODO consider to use this
-           else
-               printk("%s: task->mm task:%p mm:%p\n",
-                       __func__, task, task->mm);
-            */
-            // doing mprotect here causes errors, I do not know why
-            // for now I will unmap the region instead.
-            //do_mprotect(task,start,len,prot,0);
+          
+           // Take note of the fact that an mm exists on the remote kernel
+            set_cpu_has_known_tgroup_mm(task,w->from_cpu);
             
             if (task && task->mm ) {
                     mm_to_munmap = task->mm;
             }
-           // Take note of the fact that an mm exists on the remote kernel
-            set_cpu_has_known_tgroup_mm(task,w->from_cpu);
-
-            // then quit
-            goto done;
+            else
+                printk("%s: pirla\n",__func__);
+          // then quit
+         goto done;
         }
-//     task_unlock(task); // TODO consider to use this
     } while_each_thread(g,task);
 done:
     read_unlock(&tasklist_lock);
 
       if(mm_to_munmap) {
-        PS_DOWN_WRITE(&task->mm->mmap_sem);
+        PS_DOWN_WRITE(&mm_to_munmap->mmap_sem);
         current->enable_distributed_munmap = 0;
         do_munmap(mm_to_munmap, start, len);
         current->enable_distributed_munmap = 1;
-        PS_UP_WRITE(&task->mm->mmap_sem);
+        PS_UP_WRITE(&mm_to_munmap->mmap_sem);
         }
 
 
@@ -4679,7 +4664,7 @@ static int handle_back_migration(struct pcn_kmsg_message* inc_msg) {
                work->fpu_state       = msg->fpu_state;
                // end FPU code
 #endif        
-       memcpy(&work->regs, &msg->regs, sizeof(struct pt_regs));
+               memcpy(&work->regs, &msg->regs, sizeof(struct pt_regs));
         queue_work(clone_wq, (struct work_struct*)work);
     }
 
@@ -5557,7 +5542,7 @@ void process_server_do_mprotect(struct task_struct* task,
     int i;
     int s;
     int perf = -1;
-    unsigned lockflags;
+    unsigned long lockflags;
 
      // Nothing to do for a thread group that's not distributed.
     if(!current->tgroup_distributed) {
@@ -5628,8 +5613,7 @@ extern struct list_head rlist_head;
     // OK, all responses are in, we can proceed.
 
     spin_lock_irqsave(&_mprotect_data_head_lock,lockflags);
-    remove_data_entry_from(data,
-                           &_mprotect_data_head);
+    remove_data_entry_from(data, &_mprotect_data_head);
     spin_unlock_irqrestore(&_mprotect_data_head_lock,lockflags);
 
     kfree(data);
@@ -6119,7 +6103,7 @@ int process_server_dup_task(struct task_struct* orig, struct task_struct* task)
     task->t_distributed = 0;
     task->previous_cpus = 0;
     task->known_cpu_with_tgroup_mm = 0;
-    task->return_disposition = RETURN_DISPOSITION_EXIT;
+    task->return_disposition = RETURN_DISPOSITION_NONE;
 
     // If this is pid 1 or 2, the parent cannot have been migrated
     // so it is safe to take on all local thread info.
@@ -6144,11 +6128,10 @@ int process_server_dup_task(struct task_struct* orig, struct task_struct* task)
 
     // This is important.  We want to make sure to keep an accurate record
     // of which cpu and thread group the new thread is a part of.
-    if(orig->executing_for_remote == 1 || orig->tgroup_home_cpu != home_kernel ) {
+    if(orig->executing_for_remote == 1 || orig->tgroup_home_cpu != home_kernel) {
         task->tgroup_home_cpu = orig->tgroup_home_cpu;
         task->tgroup_home_id = orig->tgroup_home_id;
         task->tgroup_distributed = 1;
-
        // task->origin_pid = orig->origin_pid;
     } else {
         task->tgroup_home_cpu = home_kernel;
@@ -6460,6 +6443,10 @@ static int do_migration_back_to_previous_cpu(struct task_struct* task, int cpu)
     perf = PERF_MEASURE_START(&perf_process_server_do_migration);
 
     mig = kmalloc(sizeof(back_migration_t), GFP_ATOMIC);
+    if (!mig) {
+         printk("%s: ERROR kmalloc ret %p\n", __func__, mig);
+         return -1;
+     }
     // Set up response header
 
     mig->header.type = PCN_KMSG_TYPE_PROC_SRV_BACK_MIGRATION;
@@ -6475,7 +6462,6 @@ static int do_migration_back_to_previous_cpu(struct task_struct* task, int cpu)
     task->executing_for_remote = 0;
     task->represents_remote = 1;
     task->t_distributed = 1; // This should already be the case
-    task->return_disposition = RETURN_DISPOSITION_EXIT;
     
     // Build message
     mig->tgroup_home_cpu = task->tgroup_home_cpu;
@@ -6523,7 +6509,7 @@ if (task->thread.usersp != _usersp) {
 
     // Send migration request to destination.
     pcn_kmsg_send_long(cpu,
-                       (struct pcn_kmsg_long_message*)&mig,
+                       (struct pcn_kmsg_long_message*)mig,
                        sizeof(back_migration_t) - sizeof(struct pcn_kmsg_hdr));
 
     pcn_kmsg_free_msg(mig);
@@ -6600,8 +6586,13 @@ extern struct list_head rlist_head;
 void process_server_do_return_disposition(void) {
 
     PSPRINTK("%s\n",__func__);
-
+    int return_disposition = current->return_disposition;
+    // Reset the return disposition
+    current->return_disposition = RETURN_DISPOSITION_NONE;
     switch(current->return_disposition) {
+    case RETURN_DISPOSITION_NONE:
+        printk("%s: ERROR, return disposition is none!\n",__func__);
+        break;    
     case RETURN_DISPOSITION_MIGRATE:
         // Nothing to do, already back-imported the
         // state in process_back_migration.  This will
@@ -6627,11 +6618,11 @@ static int __init process_server_init(void) {
     /*
      * Cache some local information.
      */
-#ifndef SUPPORT_FOR_CLUSTERING
+//#ifndef SUPPORT_FOR_CLUSTERING
            _cpu= smp_processor_id();
-#else
-          _cpu = cpumask_first(cpu_present_mask);
-#endif
+//#else
+//        _cpu = cpumask_first(cpu_present_mask);
+//#endif
     /*
      * Init global semaphores
      */
index 3afc377..92cebc0 100644 (file)
@@ -3596,11 +3596,20 @@ int error = -ESRCH;
 
 rcu_read_lock();
 p = find_task_by_vpid(pid);
-printk(KERN_ALERT"%s: pid{%d} tgid{%d} p{%d} \n",__func__,pid,tgid,(!p)?0:1);
+
+if(!p){
+       if(_cpu == ORIG_NODE(pid))
+               goto out;
+   goto do_remote;
+}
+
+get_task_struct(p);
+printk(KERN_ALERT"%s: cpu {%d} pid{%d} tgid{%d} p{%d} \n",__func__,_cpu,pid,tgid,(!p)?0:1);
 
 if(p && p->tgroup_distributed && !p->executing_for_remote){
        if(p->return_disposition == RETURN_DISPOSITION_NONE) {
                printk(KERN_ALERT"%s: ret disp pid{%d} next{%d} \n",__func__,pid,p->next_pid);
+               put_task_struct(p);     
                rcu_read_unlock();
                return remote_do_send_specific(ORIG_NODE(p->next_pid),tgid,p->next_pid,sig,info);
        }
@@ -3629,13 +3638,16 @@ if (p && (tgid <= 0 || task_tgid_vnr(p) == tgid) || p->executing_for_remote) {
                        error = 0;
        }
 }
+do_remote:
 if (p == NULL) {
        printk(KERN_ALERT"%s: tgid{%d} pid{%d} sig{%d} \n",__func__ ,tgid,pid,sig);
-       rcu_read_unlock();
-       return remote_do_send_specific(ORIG_NODE(pid), tgid, pid, sig, info);
+        rcu_read_unlock();
+       if(!cpumask_test_cpu(ORIG_NODE(pid), cpu_present_mask))
+               return remote_do_send_specific(ORIG_NODE(pid), tgid, pid, sig, info);
 }
+put_task_struct(p);
+out:
 rcu_read_unlock();
-
 return error;
 }
 
index c790011..c9e671e 100644 (file)
@@ -1,8 +1,9 @@
 /*
  * Inter-kernel messaging support for Popcorn
  *
- * Current ver: Antonio Barbalace, Phil Wilshire 2013
- * First ver: Ben Shelton <beshelto@vt.edu> 2013
+ * Antonio Barbalace, David Katz, Marina Sadini 2014
+ * Antonio Barbalace, Marina Sadini, Phil Wilshire 2013
+ * Ben Shelton 2012 - 2013
  */
 
 #include <linux/irq.h>
@@ -28,7 +29,7 @@
 #define LOGLEN 4
 #define LOGCALL 32
 
-#define KMSG_VERBOSE 0
+#define KMSG_VERBOSE  0 
 
 #if KMSG_VERBOSE
 #define KMSG_PRINTK(fmt, args...) printk("%s: " fmt, __func__, ##args)
 #define KMSG_PRINTK(...) ;
 #endif
 
+/*
+ * MAX_LOOPS must be calibrated on each architecture. Is difficult to give a
+ * good estimation for this parameter. Worst case must be equal or inferior to
+ * the minimum time we will be blocked for a call to schedule_timeout
+ */
+#define MAX_LOOPS 12345
+#define MAX_LOOPS_JIFFIES (MAX_SCHEDULE_TIMEOUT)
 
 #define MCAST_VERBOSE 0
 
@@ -49,6 +57,9 @@
 
 #define KMSG_ERR(fmt, args...) printk("%s: ERROR: " fmt, __func__, ##args)
 
+#define ROUND_PAGES(size) ((size/PAGE_SIZE) + ((size%PAGE_SIZE)? 1:0))
+#define ROUND_PAGE_SIZE(size) (ROUND_PAGES(size)*PAGE_SIZE)
+
 /* COMMON STATE */
 
 /* table of callback functions for handling each message type */
@@ -65,8 +76,6 @@ struct pcn_kmsg_rkinfo *rkinfo;
    one per kernel */
 struct pcn_kmsg_window * rkvirt[POPCORN_MAX_CPUS];
 
-/* Same thing, but for mcast windows */
-struct pcn_kmsg_mcast_local mcastlocal[POPCORN_MAX_MCAST_CHANNELS];
 
 /* lists of messages to be processed for each prio */
 struct list_head msglist_hiprio, msglist_normprio;
@@ -86,9 +95,12 @@ struct workqueue_struct *messaging_wq;
 
 /* RING BUFFER */
 
-#define RB_SHIFT 6
-#define RB_SIZE (1 << RB_SHIFT)
-#define RB_MASK ((1 << RB_SHIFT) - 1)
+//#define RB_SHIFT 6
+//#define RB_SIZE (1 << RB_SHIFT)
+//#define RB_MASK ((1 << RB_SHIFT) - 1)
+
+#define RB_SIZE PCN_KMSG_RBUF_SIZE
+#define RB_MASK (RB_SIZE - 1)
 
 #define PCN_DEBUG(...) ;
 //#define PCN_WARN(...) printk(__VA_ARGS__)
@@ -105,8 +117,13 @@ int log_f_index=0;
 int log_f_sendindex=0;
 void * log_function_send[LOGCALL];
 
+#define SIZE_RANGES 7
+unsigned int large_message_sizes[(SIZE_RANGES +1)];
+unsigned int large_message_count[(SIZE_RANGES +1)];
+unsigned int type_message_count[PCN_KMSG_TYPE_MAX];
+
 /* From Wikipedia page "Fetch and add", modified to work for u64 */
-static inline unsigned long fetch_and_add(volatile unsigned long * variable, 
+/*static inline unsigned long fetch_and_add(volatile unsigned long * variable, 
                                          unsigned long value)
 {
        asm volatile( 
@@ -115,7 +132,8 @@ static inline unsigned long fetch_and_add(volatile unsigned long * variable,
                     : "a" (value), "m" (*variable)  //Input
                     :"memory" );
        return value;
-}
+}*/
+#define fetch_and_add xadd_sync
 
 static inline unsigned long win_inuse(struct pcn_kmsg_window *win) 
 {
@@ -127,6 +145,7 @@ static inline int win_put(struct pcn_kmsg_window *win,
                          int no_block) 
 {
        unsigned long ticket;
+       unsigned long loop;
 
        /* if we can't block and the queue is already really long, 
           return EAGAIN */
@@ -140,32 +159,44 @@ static inline int win_put(struct pcn_kmsg_window *win,
        PCN_DEBUG(KERN_ERR "%s: ticket = %lu, head = %lu, tail = %lu\n", 
                 __func__, ticket, win->head, win->tail);
 
-       KMSG_PRINTK("%s: ticket = %lu, head = %lu, tail = %lu\n",
-                        __func__, ticket, win->head, win->tail);
+       KMSG_PRINTK("%s: ticket = %lu, head = %lu, tail = %lu buffer.lt = %lu diff = %lu\n",
+                        __func__, ticket, win->head, win->tail,win->buffer[ticket%PCN_KMSG_RBUF_SIZE].last_ticket, (ticket - PCN_KMSG_RBUF_SIZE));
 
        who_is_writing= ticket;
        /* spin until there's a spot free for me */
        //while (win_inuse(win) >= RB_SIZE) {}
        //if(ticket>=PCN_KMSG_RBUF_SIZE){
-               while((win->buffer[ticket%PCN_KMSG_RBUF_SIZE].last_ticket!=ticket-PCN_KMSG_RBUF_SIZE)) {
-                       //pcn_cpu_relax();
-                                       msleep(1);
-               }
-               while(  win->buffer[ticket%PCN_KMSG_RBUF_SIZE].ready!=0){
-                       //pcn_cpu_relax();
-                       msleep(1);
-               }
        //}
+       loop=0;  
+       if(ticket>=PCN_KMSG_RBUF_SIZE){
+               while( (win->buffer[ticket%PCN_KMSG_RBUF_SIZE].last_ticket
+                               != (ticket - PCN_KMSG_RBUF_SIZE)) ) {
+                       pcn_cpu_relax();
+                       //msleep(1);
+                       if ( !(++loop % MAX_LOOPS) )
+                               schedule_timeout(MAX_LOOPS_JIFFIES);
+               }
+       }
+       /* the following it is always false because add is after ready=0*/
+       //loop=0;
+       while( win->buffer[ticket%PCN_KMSG_RBUF_SIZE].ready!=0 ) {
+               pcn_cpu_relax();
+               //msleep(1);
+               if ( !(++loop % MAX_LOOPS) )
+                       schedule_timeout(MAX_LOOPS_JIFFIES);
+       }
+       
        /* insert item */
        memcpy(&win->buffer[ticket%PCN_KMSG_RBUF_SIZE].payload,
               &msg->payload, PCN_KMSG_PAYLOAD_SIZE);
 
-       memcpy(&win->buffer[ticket%PCN_KMSG_RBUF_SIZE].hdr,
-              &msg->hdr, sizeof(struct pcn_kmsg_hdr));
-
+       memcpy((void*)&(win->buffer[ticket%PCN_KMSG_RBUF_SIZE].hdr),
+              (void*)&(msg->hdr), sizeof(struct pcn_kmsg_hdr));
 
        //log_send[log_s_index%LOGLEN]= win->buffer[ticket & RB_MASK].hdr;
-       memcpy(&(log_send[log_s_index%LOGLEN]),&(win->buffer[ticket%PCN_KMSG_RBUF_SIZE].hdr),sizeof(struct pcn_kmsg_hdr));
+       memcpy(&(log_send[log_s_index%LOGLEN]),
+               (void*)&(win->buffer[ticket%PCN_KMSG_RBUF_SIZE].hdr),
+               sizeof(struct pcn_kmsg_hdr));
        log_s_index++;
 
        win->second_buffer[ticket%PCN_KMSG_RBUF_SIZE]++;
@@ -179,8 +210,6 @@ static inline int win_put(struct pcn_kmsg_window *win,
 
 msg_put++;
 
-
-
        return 0;
 }
 
@@ -189,51 +218,39 @@ static inline int win_get(struct pcn_kmsg_window *win,
                          struct pcn_kmsg_reverse_message **msg) 
 {
        struct pcn_kmsg_reverse_message *rcvd;
+       unsigned long loop;
 
        if (!win_inuse(win)) {
-
                KMSG_PRINTK("nothing in buffer, returning...\n");
                return -1;
        }
 
-       KMSG_PRINTK("reached win_get, head %lu, tail %lu\n", 
-                   win->head, win->tail);      
+       KMSG_PRINTK("reached win_get, head %lu, tail %lu\n", win->head, win->tail);
+       rcvd =(struct pcn_kmsg_reverse_message*) &(win->buffer[win->tail % PCN_KMSG_RBUF_SIZE]);
 
        /* spin until entry.ready at end of cache line is set */
-       rcvd = &(win->buffer[win->tail % PCN_KMSG_RBUF_SIZE]);
-       //KMSG_PRINTK("%s: Ready bit: %u\n", __func__, rcvd->hdr.ready);
-
-
-
+       loop=0;
        while (!rcvd->ready) {
-
-               //pcn_cpu_relax();
-               msleep(1);
-
+               pcn_cpu_relax();
+               //msleep(1);
+               if ( !(++loop % MAX_LOOPS) )
+                       schedule_timeout(MAX_LOOPS_JIFFIES);
        }
 
-
-       // barrier here?
-       pcn_barrier();
-
-       //log_receive[log_r_index%LOGLEN]=rcvd->hdr;
-       memcpy(&(log_receive[log_r_index%LOGLEN]),&(rcvd->hdr),sizeof(struct pcn_kmsg_hdr));
+       /* statistics */
+       memcpy(&(log_receive[log_r_index%LOGLEN]),
+              &(rcvd->hdr),
+              sizeof(struct pcn_kmsg_hdr));
        log_r_index++;
+       msg_get++;
 
-       //rcvd->hdr.ready = 0;
-
-       *msg = rcvd;    
-msg_get++;
-
-
-
+       *msg = rcvd;
        return 0;
 }
 
-static inline void win_advance_tail(struct pcn_kmsg_window *win) 
-{
-       win->tail++;
-}
+/*static inline void win_advance_tail(struct pcn_kmsg_window *win) 
+{ win->tail++; }
+*/
 
 /* win_enable_int
  * win_disable_int
@@ -255,118 +272,6 @@ static inline unsigned char win_int_enabled(struct pcn_kmsg_window *win) {
                return win->int_enabled;
 }
 
-#define MCASTWIN(_id_) (mcastlocal[(_id_)].mcastvirt)
-#define LOCAL_TAIL(_id_) (mcastlocal[(_id_)].local_tail)
-
-/* MULTICAST RING BUFFER */
-static inline unsigned long mcastwin_inuse(pcn_kmsg_mcast_id id)
-{
-       return MCASTWIN(id)->head - MCASTWIN(id)->tail;
-}
-
-static inline int mcastwin_put(pcn_kmsg_mcast_id id,
-                              struct pcn_kmsg_message *msg)
-{
-       unsigned long ticket;
-       unsigned long time_limit = jiffies + 2;
-
-
-       MCAST_PRINTK("called for id %lu, msg 0x%p\n", id, msg);
-
-       /* if the queue is already really long, return EAGAIN */
-       if (mcastwin_inuse(id) >= RB_SIZE) {
-               MCAST_PRINTK("window full, caller should try again...\n");
-               return -EAGAIN;
-       }
-
-       /* grab ticket */
-       ticket = fetch_and_add(&MCASTWIN(id)->head, 1);
-       MCAST_PRINTK("ticket = %lu, head = %lu, tail = %lu\n",
-                    ticket, MCASTWIN(id)->head, MCASTWIN(id)->tail);
-
-       /* spin until there's a spot free for me */
-       while (mcastwin_inuse(id) >= RB_SIZE) {
-               if (unlikely(time_after(jiffies, time_limit))) {
-                       MCAST_PRINTK("spinning too long to wait for window to be free; this is bad!\n");
-                       return -1;
-               }
-       }
-
-       /* insert item */
-       memcpy(&MCASTWIN(id)->buffer[ticket & RB_MASK].payload, 
-              &msg->payload, PCN_KMSG_PAYLOAD_SIZE);
-
-       memcpy(&MCASTWIN(id)->buffer[ticket & RB_MASK].hdr, 
-              &msg->hdr, sizeof(struct pcn_kmsg_hdr));
-
-       /* set counter to (# in group - self) */
-
-       /*
-       int x;
-
-       if ((x = atomic_read(&MCASTWIN(id)->read_counter[ticket & RB_MASK]))) {
-               KMSG_ERR("read counter is not zero (it's %d)\n", x);
-               return -1;
-       }
-       */
-
-       atomic_set(&MCASTWIN(id)->read_counter[ticket & RB_MASK],
-               rkinfo->mcast_wininfo[id].num_members - 1);
-
-       MCAST_PRINTK("set counter to %d\n", 
-                    rkinfo->mcast_wininfo[id].num_members - 1);
-
-       pcn_barrier();
-
-       /* set completed flag */
-       MCASTWIN(id)->buffer[ticket & RB_MASK].ready = 1;
-
-       return 0;
-}
-
-static inline int mcastwin_get(pcn_kmsg_mcast_id id,
-                              struct pcn_kmsg_reverse_message **msg)
-{
-       volatile struct pcn_kmsg_reverse_message *rcvd;
-
-       MCAST_PRINTK("called for id %lu, head %lu, tail %lu, local_tail %lu\n", 
-                    id, MCASTWIN(id)->head, MCASTWIN(id)->tail, 
-                    LOCAL_TAIL(id));
-
-retry:
-
-       /* if we sent a bunch of messages, it's possible our local_tail
-          has gotten behind the global tail and we need to update it */
-       /* TODO -- atomicity concerns here? */
-       if (LOCAL_TAIL(id) < MCASTWIN(id)->tail) {
-               LOCAL_TAIL(id) = MCASTWIN(id)->tail;
-       }
-
-       if (MCASTWIN(id)->head == LOCAL_TAIL(id)) {
-               MCAST_PRINTK("nothing in buffer, returning...\n");
-               return -1;
-       }
-
-       /* spin until entry.ready at end of cache line is set */
-       rcvd = &(MCASTWIN(id)->buffer[LOCAL_TAIL(id) & RB_MASK]);
-       while (!rcvd->ready) {
-               pcn_cpu_relax();
-       }
-
-       // barrier here?
-       pcn_barrier();
-
-       /* we can't step on our own messages! */
-       if (rcvd->hdr.from_cpu == my_cpu) {
-               LOCAL_TAIL(id)++;
-               goto retry;
-       }
-
-       *msg = rcvd;
-
-       return 0;
-}
-
 static inline int atomic_add_return_sync(int i, atomic_t *v)
 {
        return i + xadd_sync(&v->counter, i);
@@ -382,44 +287,12 @@ static inline int atomic_dec_and_test_sync(atomic_t *v)
        return c != 0;
 }
 
-static inline void mcastwin_advance_tail(pcn_kmsg_mcast_id id)
-{
-       unsigned long slot = LOCAL_TAIL(id) & RB_MASK;
-       //int val_ret, i;
-       //char printstr[256];
-       //char intstr[16];
 
-       MCAST_PRINTK("local tail currently on slot %lu, read counter %d\n", 
-                    LOCAL_TAIL(id), atomic_read(&MCASTWIN(id)->read_counter[slot]));
-
-       /*
-       memset(printstr, 0, 256);
-       memset(intstr, 0, 16);
-
-       for (i = 0; i < 64; i++) {
-               sprintf(intstr, "%d ", atomic_read(&MCASTWIN(id)->read_counter[i]));
-               strcat(printstr, intstr);
-       }
-
-       MCAST_PRINTK("read_counter: %s\n", printstr);
-
-       val_ret = atomic_add_return_sync(-1, &MCASTWIN(id)->read_counter[slot]);
-
-       MCAST_PRINTK("read counter after: %d\n", val_ret);
-       */
-
-       if (atomic_dec_and_test_sync(&MCASTWIN(id)->read_counter[slot])) {
-               MCAST_PRINTK("we're the last reader to go; ++ global tail\n");
-               MCASTWIN(id)->buffer[slot].ready = 0;
-               atomic64_inc((atomic64_t *) &MCASTWIN(id)->tail);
-       }
-
-       LOCAL_TAIL(id)++;
-}
 
 /* INITIALIZATION */
-
+#ifdef PCN_SUPPORT_MULTICAST
 static int pcn_kmsg_mcast_callback(struct pcn_kmsg_message *message);
+#endif /* PCN_SUPPORT_MULTICAST */
 
 static void map_msg_win(pcn_kmsg_work_t *w)
 {
@@ -431,7 +304,8 @@ static void map_msg_win(pcn_kmsg_work_t *w)
        }
 
        rkvirt[cpu] = ioremap_cache(rkinfo->phys_addr[cpu],
-                                   sizeof(struct pcn_kmsg_window));
+                                 ROUND_PAGE_SIZE(sizeof(struct pcn_kmsg_window)));
+
        if (rkvirt[cpu]) {
                KMSG_INIT("ioremapped window, virt addr 0x%p\n", 
                          rkvirt[cpu]);
@@ -441,28 +315,6 @@ static void map_msg_win(pcn_kmsg_work_t *w)
        }
 }
 
-static void map_mcast_win(pcn_kmsg_work_t *w)
-{
-       pcn_kmsg_mcast_id id = w->id_to_join;
-
-       /* map window */
-       if (id < 0 || id > POPCORN_MAX_MCAST_CHANNELS) {
-               KMSG_ERR("%s: invalid mcast channel id %lu specified!\n",
-                        __func__, id);
-               return;
-       }
-
-       MCASTWIN(id) = ioremap_cache(rkinfo->mcast_wininfo[id].phys_addr,
-                                    sizeof(struct pcn_kmsg_mcast_window));
-       if (MCASTWIN(id)) {
-               MCAST_PRINTK("ioremapped mcast window, virt addr 0x%p\n",
-                            MCASTWIN(id));
-       } else {
-               KMSG_ERR("Failed to map mcast window %lu at phys addr 0x%lx\n",
-                        id, rkinfo->mcast_wininfo[id].phys_addr);
-       }
-}
-
 /* bottom half for workqueue */
 static void process_kmsg_wq_item(struct work_struct * work)
 {
@@ -480,6 +332,7 @@ static void process_kmsg_wq_item(struct work_struct * work)
                                 __func__);
                        break;
 
+#ifdef PCN_SUPPORT_MULTICAST
                case PCN_KMSG_WQ_OP_MAP_MCAST_WIN:
                        map_mcast_win(w);
                        break;
@@ -487,6 +340,7 @@ static void process_kmsg_wq_item(struct work_struct * work)
                case PCN_KMSG_WQ_OP_UNMAP_MCAST_WIN:
                        KMSG_ERR("UNMAP_MCAST_WIN not yet implemented!\n");
                        break;
+#endif /* PCN_SUPPORT_MULTICAST */
 
                default:
                        KMSG_ERR("Invalid work queue operation %d\n", w->op);
@@ -548,33 +402,17 @@ static inline int pcn_kmsg_window_init(struct pcn_kmsg_window *window)
 {
        window->head = 0;
        window->tail = 0;
-       window->int_enabled = 1;
        //memset(&window->buffer, 0,
             //  PCN_KMSG_RBUF_SIZE * sizeof(struct pcn_kmsg_reverse_message));
        int i;
        for(i=0;i<PCN_KMSG_RBUF_SIZE;i++){
                window->buffer[i].last_ticket=i-PCN_KMSG_RBUF_SIZE;
+               window->buffer[i].ready=0;
        }
        memset(&window->second_buffer, 0,
                       PCN_KMSG_RBUF_SIZE * sizeof(int));
-       return 0;
-}
-
-static inline int pcn_kmsg_mcast_window_init(struct pcn_kmsg_mcast_window *win)
-{
-       int i;
-
-       win->head = 0;
-       win->tail = 0;
-
-       for (i = 0; i < PCN_KMSG_RBUF_SIZE; i++) {
-               atomic_set(&win->read_counter[i], 0);
-       }
 
-       //memset(&win->read_counter, 0, 
-       //       PCN_KMSG_RBUF_SIZE * sizeof(int));
-       memset(&win->buffer, 0,
-              PCN_KMSG_RBUF_SIZE * sizeof(struct pcn_kmsg_message));
+       window->int_enabled = 1;
        return 0;
 }
 
@@ -612,7 +450,8 @@ static int do_checkin(void)
 
                if (rkinfo->phys_addr[i]) {
                        rkvirt[i] = ioremap_cache(rkinfo->phys_addr[i],
-                                                 sizeof(struct pcn_kmsg_window));
+                                                 ROUND_PAGE_SIZE(sizeof(struct pcn_kmsg_window)));
+
                        if (rkvirt[i]) {
                                KMSG_INIT("ioremapped CPU %d's window, virt addr 0x%p\n", 
                                          i, rkvirt[i]);
@@ -639,9 +478,14 @@ static int pcn_read_proc(char *page, char **start, off_t off, int count, int *eo
        char *p= page;
     int len, i, idx;
 
-       p += sprintf(p, "messages get: %ld\n", msg_get);
-        p += sprintf(p, "messages put: %ld\n", msg_put);
-
+       p += sprintf(p, "get: %ld\n", msg_get);
+        p += sprintf(p, "put: %ld\n", msg_put);
+    for (i =0; i<(SIZE_RANGES +1); i++)
+      p +=sprintf (p,"%u: %u\n", large_message_sizes[i], large_message_count[i]);
+    
+    for (i =0; i<PCN_KMSG_TYPE_MAX; i++)
+      p +=sprintf (p, "t%u: %u\n", i, type_message_count[i]);
+    
     idx = log_r_index;
     for (i =0; i>-LOGLEN; i--)
        p +=sprintf (p,"r%d: from%d type%d %1d:%1d:%1d seq%d\n",
@@ -655,7 +499,7 @@ static int pcn_read_proc(char *page, char **start, off_t off, int count, int *eo
                        (idx+i),(int) log_send[(idx+i)%LOGLEN].from_cpu, (int)log_send[(idx+i)%LOGLEN].type,
                        (int) log_send[(idx+i)%LOGLEN].is_lg_msg, (int)log_send[(idx+i)%LOGLEN].lg_start,
                        (int) log_send[(idx+i)%LOGLEN].lg_end, (int) log_send[(idx+i)%LOGLEN].lg_seqnum );
-
+/*
     idx = log_f_index;
         for (i =0; i>-LOGCALL; i--)
                p +=sprintf (p,"f%d: %pB\n",
@@ -668,7 +512,7 @@ static int pcn_read_proc(char *page, char **start, off_t off, int count, int *eo
 
         for(i=0; i<PCN_KMSG_RBUF_SIZE; i++)
                p +=sprintf (p,"second_buffer[%i]=%i\n",i,rkvirt[my_cpu]->second_buffer[i]);
-
+*/
 
        len = (p -page) - off;
        if (len < 0)
@@ -678,18 +522,38 @@ static int pcn_read_proc(char *page, char **start, off_t off, int count, int *eo
        return len;
 }
 
+static int pcn_write_proc (struct file *file, const char __user *buffer, unsigned long count, void *data)
+{
+  int i;
+       msg_get=0;
+       msg_put=0;
+       memset(large_message_count, 0, sizeof(int)*(SIZE_RANGES +1));
+       memset(large_message_sizes, 0, sizeof(int)*(SIZE_RANGES +1));
+       for (i=0; i<SIZE_RANGES; i++)
+         large_message_sizes[i] = ((i+1)*PCN_KMSG_PAYLOAD_SIZE);
+       large_message_sizes[SIZE_RANGES] = ~0;
+       memset(type_message_count, 0, sizeof(int)*PCN_KMSG_TYPE_MAX);
+       return count;
+}
+
 static int __init pcn_kmsg_init(void)
 {
        int rc,i;
        unsigned long win_phys_addr, rkinfo_phys_addr;
        struct pcn_kmsg_window *win_virt_addr;
-       struct boot_params * boot_params_va;
+       struct boot_params *boot_params_va;
 
        KMSG_INIT("entered\n");
-
-       my_cpu = raw_smp_processor_id();
+#ifndef SUPPORT_FOR_CLUSTERING
+       my_cpu= smp_processor_id();
+#else
+       my_cpu = cpumask_first(cpu_present_mask);
+#endif
        
-       printk("%s: Entered pcn_kmsg_init raw: %d id: %d\n", __func__, my_cpu, smp_processor_id());
+       printk("%s: THIS VERSION DOES NOT SUPPORT CACHE ALIGNED BUFFERS\n",
+              __func__);
+       printk("%s: Entered pcn_kmsg_init raw: %d id: %d\n",
+               __func__, my_cpu, smp_processor_id());
 
        /* Initialize list heads */
        INIT_LIST_HEAD(&msglist_hiprio);
@@ -697,7 +561,7 @@ static int __init pcn_kmsg_init(void)
 
        /* Clear out large-message receive buffers */
        //memset(&lg_buf, 0, POPCORN_MAX_CPUS * sizeof(unsigned char *));
-       for(i=0;i<PCN_KMSG_RBUF_SIZE;i++){
+       for(i=0; i<POPCORN_MAX_CPUS; i++) {
                INIT_LIST_HEAD(&(lg_buf[i]));
        }
        long_id=0;
@@ -709,80 +573,96 @@ static int __init pcn_kmsg_init(void)
        rc = pcn_kmsg_register_callback(PCN_KMSG_TYPE_CHECKIN, 
                                        &pcn_kmsg_checkin_callback);
        if (rc) {
-               KMSG_ERR("Failed to register initial kmsg checkin callback!\n");
+               printk(KERN_ALERT"Failed to register initial kmsg checkin callback!\n");
        }
 
+#ifdef PCN_SUPPORT_MULTICAST
        rc = pcn_kmsg_register_callback(PCN_KMSG_TYPE_MCAST, 
                                        &pcn_kmsg_mcast_callback);
        if (rc) {
-               KMSG_ERR("Failed to register initial kmsg mcast callback!\n");
+               printk(KERN_ALERT"Failed to register initial kmsg mcast callback!\n");
        }
+#endif /* PCN_SUPPORT_MULTICAST */     
 
-       /* Register softirq handler */
+       /* Register softirq handler now kworker */
        KMSG_INIT("Registering softirq handler...\n");
        //open_softirq(PCN_KMSG_SOFTIRQ, pcn_kmsg_action);
        messaging_wq= create_singlethread_workqueue("messaging_wq");
+       if (!messaging_wq) 
+               printk("%s: create_workqueue(messaging_wq) ret 0x%lx ERROR\n",
+                       __func__, (unsigned long)messaging_wq);
 
        /* Initialize work queue */
        KMSG_INIT("Initializing workqueue...\n");
        kmsg_wq = create_singlethread_workqueue("kmsg_wq");
+       if (!kmsg_wq)
+               printk("%s: create_workqueue(kmsg_wq) ret 0x%lx ERROR\n",
+                       __func__, (unsigned long)kmsg_wq);
 
+               
        /* If we're the master kernel, malloc and map the rkinfo structure and 
           put its physical address in boot_params; otherwise, get it from the 
           boot_params and map it */
        if (!mklinux_boot) {
-               KMSG_INIT("We're the master; mallocing rkinfo...\n");
-               rkinfo = kmalloc(sizeof(struct pcn_kmsg_rkinfo), GFP_KERNEL);
-
+               /* rkinfo must be multiple of a page, because the granularity of
+                * foreings mapping is per page. The following didn't worked,
+                * the returned address is on the form 0xffff88000000, ioremap
+                * on the remote fails. 
+               int order = get_order(sizeof(struct pcn_kmsg_rkinfo));
+               rkinfo = __get_free_pages(GFP_KERNEL, order);
+               */
+               KMSG_INIT("Primary kernel, mallocing rkinfo size:%d rounded:%d\n",
+                      sizeof(struct pcn_kmsg_rkinfo), ROUND_PAGE_SIZE(sizeof(struct pcn_kmsg_rkinfo)));
+               rkinfo = kmalloc(ROUND_PAGE_SIZE(sizeof(struct pcn_kmsg_rkinfo)), GFP_KERNEL);
                if (!rkinfo) {
                        KMSG_ERR("Failed to malloc rkinfo structure!\n");
                        return -1;
                }
-
-               rkinfo_phys_addr = virt_to_phys(rkinfo);
-
-               KMSG_INIT("rkinfo virt addr 0x%p, phys addr 0x%lx\n", 
-                         rkinfo, rkinfo_phys_addr);
-
                memset(rkinfo, 0x0, sizeof(struct pcn_kmsg_rkinfo));
+               rkinfo_phys_addr = virt_to_phys(rkinfo);
+               KMSG_INIT("rkinfo virt %p, phys 0x%lx MAX_CPUS %d\n", 
+                         rkinfo, rkinfo_phys_addr, POPCORN_MAX_CPUS);
 
-               KMSG_INIT("Setting boot_params...\n");
                /* Otherwise, we need to set the boot_params to show the rest
                   of the kernels where the master kernel's messaging window 
                   is. */
+               KMSG_INIT("Setting boot_params...\n");
                boot_params_va = (struct boot_params *) 
                        (0xffffffff80000000 + orig_boot_params);
-               KMSG_INIT("Boot params virt addr: 0x%p\n", boot_params_va);
                boot_params_va->pcn_kmsg_master_window = rkinfo_phys_addr;
-       } else {
-               KMSG_INIT("Master kernel rkinfo phys addr: 0x%lx\n", 
+               KMSG_INIT("boot_params virt %p phys %p\n",
+                       boot_params_va, orig_boot_params);
+               
+               KMSG_INIT("LOOPS %ld, LOOPS_JIFFIES %ld, 1nsec %ld, 1usec %ld, 1msec %ld, 1jiffies %d %d\n",
+                         MAX_LOOPS, MAX_LOOPS_JIFFIES, nsecs_to_jiffies(1), usecs_to_jiffies(1), msecs_to_jiffies(1),
+                         jiffies_to_msecs(1), jiffies_to_usecs(1));
+       }
+       else {
+               KMSG_INIT("Primary kernel rkinfo phys addr: 0x%lx\n", 
                          (unsigned long) boot_params.pcn_kmsg_master_window);
-
                rkinfo_phys_addr = boot_params.pcn_kmsg_master_window;
-               rkinfo = ioremap_cache(rkinfo_phys_addr, 
-                                      sizeof(struct pcn_kmsg_rkinfo));
-
+               
+               rkinfo = ioremap_cache(rkinfo_phys_addr, ROUND_PAGE_SIZE(sizeof(struct pcn_kmsg_rkinfo)));
                if (!rkinfo) {
                        KMSG_ERR("Failed to map rkinfo from master kernel!\n");
                }
-
                KMSG_INIT("rkinfo virt addr: 0x%p\n", rkinfo);
        }
 
        /* Malloc our own receive buffer and set it up */
-       win_virt_addr = kmalloc(sizeof(struct pcn_kmsg_window), GFP_KERNEL);
-
+       win_virt_addr = kmalloc(ROUND_PAGE_SIZE(sizeof(struct pcn_kmsg_window)), GFP_KERNEL);
        if (win_virt_addr) {
-               KMSG_INIT("Allocated %ld bytes for my win, virt addr 0x%p\n", 
+               KMSG_INIT("Allocated %ld(%ld) bytes for my win, virt addr 0x%p\n", 
+                         ROUND_PAGE_SIZE(sizeof(struct pcn_kmsg_window)),
                          sizeof(struct pcn_kmsg_window), win_virt_addr);
        } else {
-               KMSG_ERR("Failed to kmalloc kmsg recv window!\n");
+               KMSG_ERR("%s: Failed to kmalloc kmsg recv window!\n", __func__);
                return -1;
        }
 
        rkvirt[my_cpu] = win_virt_addr;
        win_phys_addr = virt_to_phys((void *) win_virt_addr);
-       KMSG_INIT("Physical address: 0x%lx\n", win_phys_addr);
+       KMSG_INIT("cpu %d physical address: 0x%lx\n", my_cpu, win_phys_addr);
        rkinfo->phys_addr[my_cpu] = win_phys_addr;
 
        rc = pcn_kmsg_window_init(rkvirt[my_cpu]);
@@ -791,6 +671,7 @@ static int __init pcn_kmsg_init(void)
                return -1;
        }
 
+
        /* If we're not the master kernel, we need to check in */
        if (mklinux_boot) {
                rc = do_checkin();
@@ -801,6 +682,16 @@ static int __init pcn_kmsg_init(void)
                }
        } 
 
+
+       printk("PCN_KMSG_RBUF_SIZE %ld",PCN_KMSG_RBUF_SIZE);
+       /* proc interface for debugging and stats */
+       memset(large_message_count, 0, sizeof(int)*(SIZE_RANGES +1));
+       memset(large_message_sizes, 0, sizeof(int)*(SIZE_RANGES +1));
+       for (i=0; i<SIZE_RANGES; i++)
+         large_message_sizes[i] = ((i+1)*PCN_KMSG_PAYLOAD_SIZE);
+       large_message_sizes[SIZE_RANGES] = ~0;
+       memset(type_message_count, 0, sizeof(int)*PCN_KMSG_TYPE_MAX);
+
        memset(log_receive,0,sizeof(struct pcn_kmsg_hdr)*LOGLEN);
        memset(log_send,0,sizeof(struct pcn_kmsg_hdr)*LOGLEN);
        memset(log_function_called,0,sizeof(void*)*LOGCALL);
@@ -809,12 +700,12 @@ static int __init pcn_kmsg_init(void)
        struct proc_dir_entry *res;
        res = create_proc_entry("pcnmsg", S_IRUGO, NULL);
        if (!res) {
-               printk("%s: create_proc_entry failed (%p)\n", __func__, res);
+               printk(KERN_ALERT"%s: create_proc_entry failed (%p)\n", __func__, res);
                return -ENOMEM;
        }
        res->read_proc = pcn_read_proc;
-
-
+       res->write_proc = pcn_write_proc;
+       
 
        return 0;
 }
@@ -827,7 +718,7 @@ int pcn_kmsg_register_callback(enum pcn_kmsg_type type, pcn_kmsg_cbftn callback)
        PCN_WARN("%s: registering callback for type %d, ptr 0x%p\n", __func__, type, callback);
 
        if (type >= PCN_KMSG_TYPE_MAX) {
-               KMSG_ERR("Attempted to register callback with bad type %d\n", 
+               printk(KERN_ALERT"Attempted to register callback with bad type %d\n", 
                         type);
                return -1;
        }
@@ -862,7 +753,7 @@ static int __pcn_kmsg_send(unsigned int dest_cpu, struct pcn_kmsg_message *msg,
        struct pcn_kmsg_window *dest_window;
 
        if (unlikely(dest_cpu >= POPCORN_MAX_CPUS)) {
-               KMSG_ERR("Invalid destination CPU %d\n", dest_cpu);
+               KMSG_PRINTK("Invalid destination CPU %d\n", dest_cpu);
                return -1;
        }
 
@@ -874,7 +765,7 @@ static int __pcn_kmsg_send(unsigned int dest_cpu, struct pcn_kmsg_message *msg,
        }
 
        if (unlikely(!msg)) {
-               KMSG_ERR("Passed in a null pointer to msg!\n");
+               KMSG_PRINTK("Passed in a null pointer to msg!\n");
                return -1;
        }
 
@@ -882,6 +773,7 @@ static int __pcn_kmsg_send(unsigned int dest_cpu, struct pcn_kmsg_message *msg,
        msg->hdr.from_cpu = my_cpu;
 
        rc = win_put(dest_window, msg, no_block);
+type_message_count[msg->hdr.type]++;
 
        if (rc) {
                if (no_block && (rc == EAGAIN)) {
@@ -936,30 +828,23 @@ int pcn_kmsg_send_long(unsigned int dest_cpu,
                       struct pcn_kmsg_long_message *lmsg, 
                       unsigned int payload_size)
 {
-       int i;
+       int i, ret =0;
        int num_chunks = payload_size / PCN_KMSG_PAYLOAD_SIZE;
        struct pcn_kmsg_message this_chunk;
-       //char test_buf[15];
-
-       /*mklinux_akshay*/
-       int ret=0;
-       /*mklinux_akshay*/
 
        if (payload_size % PCN_KMSG_PAYLOAD_SIZE) {
                num_chunks++;
        }
 
-        if ( num_chunks >= ((1<<LG_SEQNUM_SIZE)-1) ){
+        if ( num_chunks >= MAX_CHUNKS ){
                 KMSG_PRINTK("Message too long (size:%d, chunks:%d, max:%d) can not be transferred\n",
-                       payload_size, num_chunks, ((1 << LG_SEQNUM_SIZE)-1));
+                       payload_size, num_chunks, MAX_CHUNKS);
                return -1;
         }
 
        KMSG_PRINTK("Sending large message to CPU %d, type %d, payload size %d bytes, %d chunks\n", 
                    dest_cpu, lmsg->hdr.type, payload_size, num_chunks);
 
-
-
        this_chunk.hdr.type = lmsg->hdr.type;
        this_chunk.hdr.prio = lmsg->hdr.prio;
        this_chunk.hdr.is_lg_msg = 1;
@@ -981,6 +866,10 @@ int pcn_kmsg_send_long(unsigned int dest_cpu,
                if(ret!=0)
                        return ret;
        }
+       
+       /* statistics */
+       num_chunks = payload_size / PCN_KMSG_PAYLOAD_SIZE;
+       large_message_count[((num_chunks < SIZE_RANGES) ? num_chunks : SIZE_RANGES)]++;
 
        return 0;
 }
@@ -1101,7 +990,6 @@ static int process_large_message(struct pcn_kmsg_reverse_message *msg)
        int work_done = 0;
        struct pcn_kmsg_container* container_long=NULL, *n=NULL;
 
-
        KMSG_PRINTK("Got a large message fragment, type %u, from_cpu %u, start %u, end %u, seqnum %u!\n",
                    msg->hdr.type, msg->hdr.from_cpu,
                    msg->hdr.lg_start, msg->hdr.lg_end,
@@ -1110,99 +998,96 @@ static int process_large_message(struct pcn_kmsg_reverse_message *msg)
        if (msg->hdr.lg_start) {
                KMSG_PRINTK("Processing initial message fragment...\n");
 
+               if (!msg->hdr.lg_seqnum)
+                 printk(KERN_ALERT"%s: ERROR lg_seqnum is zero:%d long_number:%ld\n",
+                     __func__, (int)msg->hdr.lg_seqnum, (long)msg->hdr.long_number);
+                 
+               // calculate the size of the holding buffer
                recv_buf_size = sizeof(struct list_head) + 
                        sizeof(struct pcn_kmsg_hdr) + 
                        msg->hdr.lg_seqnum * PCN_KMSG_PAYLOAD_SIZE;
-
-               /*lg_buf[msg->hdr.from_cpu] = kmalloc(recv_buf_size, GFP_ATOMIC);
+#undef BEN_VERSION
+#ifdef BEN_VERSION             
+               lg_buf[msg->hdr.from_cpu] = kmalloc(recv_buf_size, GFP_ATOMIC);
                if (!lg_buf[msg->hdr.from_cpu]) {
                                        KMSG_ERR("Unable to kmalloc buffer for incoming message!\n");
                                        goto out;
                                }
                lmsg = (struct pcn_kmsg_long_message *) &lg_buf[msg->hdr.from_cpu]->msg;
-               */
-
+#else /* BEN_VERSION */
                container_long= kmalloc(recv_buf_size, GFP_ATOMIC);
                if (!container_long) {
-                                       KMSG_ERR("Unable to kmalloc buffer for incoming message!\n");
-                                       goto out;
+                       KMSG_ERR("Unable to kmalloc buffer for incoming message!\n");
+                       goto out;
                }
-
-               lmsg = (struct pcn_kmsg_long_message *) &container_long->msg;
+               lmsg = (struct pcn_kmsg_long_message *) &container_long->msg; //TODO wrong cast!
+#endif /* !BEN_VERSION */
 
                /* copy header first */
                memcpy((unsigned char *) &lmsg->hdr, 
                       &msg->hdr, sizeof(struct pcn_kmsg_hdr));
-
                /* copy first chunk of message */
                memcpy((unsigned char *) &lmsg->payload,
                       &msg->payload, PCN_KMSG_PAYLOAD_SIZE);
+                
 
                if (msg->hdr.lg_end) {
                        KMSG_PRINTK("NOTE: Long message of length 1 received; this isn't efficient!\n");
 
                        /* add to appropriate list */
-
-                       //rc = msg_add_list(lg_buf[msg->hdr.from_cpu]);
+#ifdef BEN_VERSION                     
+                       rc = msg_add_list(lg_buf[msg->hdr.from_cpu]);
+#else /* BEN_VERSION */
                        rc = msg_add_list(container_long);
-                       work_done = 1;
-
-                       if (rc) {
+#endif /* !BEN_VERSION */
+                       if (rc)
                                KMSG_ERR("Failed to add large message to list!\n");
-                       }
-               }
-               else{
-
-                       list_add_tail(&container_long->list,&lg_buf[msg->hdr.from_cpu]);
-
+                       work_done = 1;
                }
-       } else {
-
+#ifndef BEN_VERSION            
+               else
+                 // add the message to the lg_buf
+                 list_add_tail(&container_long->list, &lg_buf[msg->hdr.from_cpu]);
+#endif /* !BEN_VERSION */
+       }
+       else {
                KMSG_PRINTK("Processing subsequent message fragment...\n");
 
-
                //It should not be needed safe
                list_for_each_entry_safe(container_long, n, &lg_buf[msg->hdr.from_cpu], list) {
-
-                       if(container_long!=NULL && container_long->msg.hdr.long_number==msg->hdr.long_number)
+                       if ( (container_long != NULL) &&
+                         (container_long->msg.hdr.long_number == msg->hdr.long_number) )
+                               // found!
                                goto next;
-
                }
 
-               KMSG_ERR("Failed to find long message %lu in the list of cpu %i!\n",msg->hdr.long_number,msg->hdr.from_cpu);
+               KMSG_ERR("Failed to find long message %lu in the list of cpu %i!\n",
+                        msg->hdr.long_number, msg->hdr.from_cpu);
                goto out;
 
-next:          lmsg = (struct pcn_kmsg_long_message *) &container_long->msg;
-
-               memcpy((unsigned char *) &lmsg->payload + PCN_KMSG_PAYLOAD_SIZE * msg->hdr.lg_seqnum,
+next:          
+               lmsg = (struct pcn_kmsg_long_message *) &container_long->msg;
+               memcpy((unsigned char *) ((void*)&lmsg->payload) + (PCN_KMSG_PAYLOAD_SIZE * msg->hdr.lg_seqnum),
                       &msg->payload, PCN_KMSG_PAYLOAD_SIZE);
 
                if (msg->hdr.lg_end) {
                        KMSG_PRINTK("Last fragment in series...\n");
-
-                       KMSG_PRINTK("from_cpu %d, type %d, prio %d\n",
-                                   lmsg->hdr.from_cpu,
-                                   lmsg->hdr.type,
-                                   lmsg->hdr.prio);
-
-
-                       //rc = msg_add_list(lg_buf[msg->hdr.from_cpu]);
-
-                       list_del(&container_long->list);
-
+                       KMSG_PRINTK("from_cpu %d, type %d, prio %d-mprio%d\n",
+                                   lmsg->hdr.from_cpu, lmsg->hdr.type, lmsg->hdr.prio,msg->hdr.prio);
                        /* add to appropriate list */
+#ifdef BEN_VERSION
+                       rc = msg_add_list(lg_buf[msg->hdr.from_cpu]);
+#else /* BEN_VERSION */
+                       list_del(&container_long->list);
                        rc = msg_add_list(container_long);
-
-                       work_done = 1;
-
-                       if (rc) {
+#endif /* !BEN_VERSION */                      
+                       if (rc)
                                KMSG_ERR("Failed to add large message to list!\n");
-                       }
+                       work_done = 1;
                }
        }
 
 out:
-
        return work_done;
 }
 
@@ -1234,39 +1119,19 @@ static int process_small_message(struct pcn_kmsg_reverse_message *msg)
        return work_done;
 }
 
-static void process_mcast_queue(pcn_kmsg_mcast_id id)
-{
-       struct pcn_kmsg_reverse_message *msg;
-       while (!mcastwin_get(id, &msg)) {
-               MCAST_PRINTK("Got an mcast message, type %d!\n",
-                            msg->hdr.type);
-
-               /* Special processing for large messages */
-                if (msg->hdr.is_lg_msg) {
-                        MCAST_PRINTK("message is a large message!\n");
-                        process_large_message(msg);
-                } else {
-                        MCAST_PRINTK("message is a small message!\n");
-                        process_small_message(msg);
-                }
-
-               mcastwin_advance_tail(id);
-       }
-
-}
-
+static int poll_handler_check=0;
 static int pcn_kmsg_poll_handler(void)
 {
        struct pcn_kmsg_reverse_message *msg;
        struct pcn_kmsg_window *win = rkvirt[my_cpu]; // TODO this will not work for clustering
        int work_done = 0;
 
-       KMSG_PRINTK("called\n");
+       poll_handler_check++;
+       if (poll_handler_check >1)
+               printk("poll_handler_check %d concurrent calls not supported\n", poll_handler_check);
 
 pull_msg:
        /* Get messages out of the buffer first */
-//#define PCN_KMSG_BUDGET 128
-       //while ((work_done < PCN_KMSG_BUDGET) && (!win_get(rkvirt[my_cpu], &msg))) {
        while (! win_get(win, &msg) ) {
                KMSG_PRINTK("got a message!\n");
 
@@ -1278,10 +1143,16 @@ pull_msg:
                        KMSG_PRINTK("message is a small message!\n");
                        work_done += process_small_message(msg);
                }
-               pcn_barrier();
+
                msg->ready = 0;
-               //win_advance_tail(win);
-               fetch_and_add(&win->tail, 1);
+               pcn_barrier();
+               
+               fetch_and_add(&win->tail, 1); //win_advance_tail(win);
+               
+               // NOTE
+               // why you need the ready bit if you are incrementing the tail?
+               // can we increment the tail before signaling the ready bit?
+               // in that way we can support multiple calls to this function
        }
 
        win_enable_int(win);
@@ -1290,13 +1161,15 @@ pull_msg:
                goto pull_msg;
        }
 
+       poll_handler_check--;
        return work_done;
 }
 
 unsigned volatile long bh_ts = 0, bh_ts_2 = 0;
 
-/* bottom half */
-static void pcn_kmsg_action(/*struct softirq_action *h*/struct work_struct* work)
+// NOTE the following was declared as a bottom half
+//static void pcn_kmsg_action(struct softirq_action *h)
+static void pcn_kmsg_action(struct work_struct* work)
 {
        int rc;
        int i;
@@ -1305,20 +1178,20 @@ static void pcn_kmsg_action(/*struct softirq_action *h*/struct work_struct* work
        //if (!bh_ts) {
                rdtscll(bh_ts);
        //}
-
        KMSG_PRINTK("called\n");
 
        work_done = pcn_kmsg_poll_handler();
        KMSG_PRINTK("Handler did %d units of work!\n", work_done);
 
+#ifdef PCN_SUPPORT_MULTICAST   
        for (i = 0; i < POPCORN_MAX_MCAST_CHANNELS; i++) {
                if (MCASTWIN(i)) {
                        KMSG_PRINTK("mcast win %d mapped, processing it\n", i);
                        process_mcast_queue(i);
                }
        }
-
        KMSG_PRINTK("Done checking mcast queues; processing messages\n");
+#endif /* PCN_SUPPORT_MULTICAST */
 
        //if (!bh_ts_2) {
                rdtscll(bh_ts_2);
@@ -1339,406 +1212,6 @@ static void pcn_kmsg_action(/*struct softirq_action *h*/struct work_struct* work
        return;
 }
 
-/* MULTICAST */
-
-inline void lock_chan(pcn_kmsg_mcast_id id)
-{
-
-}
-
-inline void unlock_chan(pcn_kmsg_mcast_id id)
-{
-
-}
-
-inline int count_members(unsigned long mask)
-{
-       int i, count = 0;
-
-       for (i = 0; i < POPCORN_MAX_CPUS; i++) {
-               if (mask & (1ULL << i)) {
-                       count++;
-               }
-       }
-
-       return count;
-}
-
-void print_mcast_map(void)
-{
-#if MCAST_VERBOSE
-       int i;
-
-       printk("ACTIVE MCAST GROUPS:\n");
-
-       for (i = 0; i < POPCORN_MAX_MCAST_CHANNELS; i++) {
-               if (rkinfo->mcast_wininfo[i].mask) {
-                       printk("group %d, mask 0x%lx, num_members %d\n", 
-                              i, rkinfo->mcast_wininfo[i].mask, 
-                              rkinfo->mcast_wininfo[i].num_members);
-               }
-       }
-       return;
-#endif
-}
-
-/* Open a multicast group containing the CPUs specified in the mask. */
-int pcn_kmsg_mcast_open(pcn_kmsg_mcast_id *id, unsigned long mask)
-{
-       int rc, i, found_id;
-       struct pcn_kmsg_mcast_message msg;
-       struct pcn_kmsg_mcast_wininfo *slot;
-       struct pcn_kmsg_mcast_window * new_win;
-
-       MCAST_PRINTK("Reached pcn_kmsg_mcast_open, mask 0x%lx\n", mask);
-
-       if (!(mask & (1 << my_cpu))) {
-               KMSG_ERR("This CPU is not a member of the mcast group to be created, cpu %d, mask 0x%lx\n",
-                        my_cpu, mask);
-               return -1;
-       }
-
-       /* find first unused channel */
-retry:
-       found_id = -1;
-
-       for (i = 0; i < POPCORN_MAX_MCAST_CHANNELS; i++) {
-               if (!rkinfo->mcast_wininfo[i].num_members) {
-                       found_id = i;
-                       break;
-               }
-       }
-
-       MCAST_PRINTK("Found channel ID %d\n", found_id);
-
-       if (found_id == -1) {
-               KMSG_ERR("No free multicast channels!\n");
-               return -1;
-       }
-
-       /* lock and check if channel is still unused; 
-          otherwise, try again */
-       lock_chan(found_id);
-
-       if (rkinfo->mcast_wininfo[i].num_members) {
-               unlock_chan(found_id);
-               MCAST_PRINTK("Got scooped; trying again...\n");
-               goto retry;
-       }
-
-       /* set slot info */
-       slot = &rkinfo->mcast_wininfo[found_id];
-       slot->mask = mask;
-       slot->num_members = count_members(mask);
-       slot->owner_cpu = my_cpu;
-
-       MCAST_PRINTK("Found %d members\n", slot->num_members);
-
-       /* kmalloc window for slot */
-       new_win = kmalloc(sizeof(struct pcn_kmsg_mcast_window), GFP_ATOMIC);
-
-       if (!new_win) {
-               KMSG_ERR("Failed to kmalloc mcast buffer!\n");
-               goto out;
-       }
-
-       /* zero out window */
-       memset(new_win, 0x0, sizeof(struct pcn_kmsg_mcast_window));
-
-       MCASTWIN(found_id) = new_win;
-       slot->phys_addr = virt_to_phys(new_win);
-       MCAST_PRINTK("Malloced mcast receive window %d at phys addr 0x%lx\n",
-                    found_id, slot->phys_addr);
-
-       /* send message to each member except self.  Can't use mcast yet because
-          group is not yet established, so unicast to each CPU in mask. */
-       msg.hdr.type = PCN_KMSG_TYPE_MCAST;
-       msg.hdr.prio = PCN_KMSG_PRIO_HIGH;
-       msg.type = PCN_KMSG_MCAST_OPEN;
-       msg.id = found_id;
-       msg.mask = mask;
-       msg.num_members = slot->num_members;
-
-       for (i = 0; i < POPCORN_MAX_CPUS; i++) {
-               if ((slot->mask & (1ULL << i)) && 
-                   (my_cpu != i)) {
-                       MCAST_PRINTK("Sending message to CPU %d\n", i);
-
-                       rc = pcn_kmsg_send(i, (struct pcn_kmsg_message *) &msg);
-
-                       if (rc) {
-                               KMSG_ERR("Message send failed!\n");
-                       }
-               }
-       }
-
-       *id = found_id;
-
-out:
-       unlock_chan(found_id);
-
-       return 0;
-}
-
-/* Add new members to a multicast group. */
-int pcn_kmsg_mcast_add_members(pcn_kmsg_mcast_id id, unsigned long mask)
-{
-       lock_chan(id);
-
-       KMSG_ERR("Operation not yet supported!\n");
-
-       //rkinfo->mcast_wininfo[id].mask |= mask; 
-
-       /* TODO -- notify new members */
-
-       unlock_chan(id);
-       return 0;
-}
-
-/* Remove existing members from a multicast group. */
-int pcn_kmsg_mcast_delete_members(pcn_kmsg_mcast_id id, unsigned long mask)
-{
-       lock_chan(id);
-
-       KMSG_ERR("Operation not yet supported!\n");
-
-       //rkinfo->mcast_wininfo[id].mask &= !mask;
-
-       /* TODO -- notify new members */
-
-       unlock_chan(id);
-
-       return 0;
-}
-
-inline int pcn_kmsg_mcast_close_notowner(pcn_kmsg_mcast_id id)
-{
-       MCAST_PRINTK("Closing multicast channel %lu on CPU %d\n", id, my_cpu);
-
-       /* process remaining messages in queue (should there be any?) */
-
-       /* remove queue from list of queues being polled */
-       iounmap(MCASTWIN(id));
-
-       MCASTWIN(id) = NULL;
-
-       return 0;
-}
-
-/* Close a multicast group. */
-int pcn_kmsg_mcast_close(pcn_kmsg_mcast_id id)
-{
-       int rc;
-       struct pcn_kmsg_mcast_message msg;
-       struct pcn_kmsg_mcast_wininfo *wi = &rkinfo->mcast_wininfo[id];
-
-       if (wi->owner_cpu != my_cpu) {
-               KMSG_ERR("Only creator (cpu %d) can close mcast group %lu!\n",
-                        wi->owner_cpu, id);
-               return -1;
-       }
-
-       lock_chan(id);
-
-       /* set window to close */
-       wi->is_closing = 1;
-
-       /* broadcast message to close window globally */
-       msg.hdr.type = PCN_KMSG_TYPE_MCAST;
-       msg.hdr.prio = PCN_KMSG_PRIO_HIGH;
-       msg.type = PCN_KMSG_MCAST_CLOSE;
-       msg.id = id;
-
-       rc = pcn_kmsg_mcast_send(id, (struct pcn_kmsg_message *) &msg);
-       if (rc) {
-               KMSG_ERR("failed to send mcast close message!\n");
-               return -1;
-       }
-
-       /* wait until global_tail == global_head */
-       while (MCASTWIN(id)->tail != MCASTWIN(id)->head) {}
-
-       /* free window and set channel as unused */
-       kfree(MCASTWIN(id));
-       MCASTWIN(id) = NULL;
-
-       wi->mask = 0;
-       wi->num_members = 0;
-       wi->is_closing = 0;
-
-       unlock_chan(id);
-
-       return 0;
-}
-
-unsigned long mcast_ipi_ts;
-
-static int __pcn_kmsg_mcast_send(pcn_kmsg_mcast_id id, 
-                                struct pcn_kmsg_message *msg)
-{
-       int i, rc;
-
-       if (!msg) {
-               KMSG_ERR("Passed in a null pointer to msg!\n");
-               return -1;
-       }
-
-       /* set source CPU */
-       msg->hdr.from_cpu = my_cpu;
-
-       /* place message in rbuf */
-       rc = mcastwin_put(id, msg);
-
-       if (rc) {
-               KMSG_ERR("failed to place message in mcast window -- maybe it's full?\n");
-               return -1;
-       }
-
-       rdtscll(mcast_ipi_ts);
-
-       /* send IPI to all in mask but me */
-       for (i = 0; i < POPCORN_MAX_CPUS; i++) {
-               if (rkinfo->mcast_wininfo[id].mask & (1ULL << i)) {
-                       if (i != my_cpu) {
-                               MCAST_PRINTK("sending IPI to CPU %d\n", i);
-                               apic->send_IPI_single(i, POPCORN_KMSG_VECTOR);
-                       }
-               }
-       }
-
-       return 0;
-}
-
-#define MCAST_HACK 0
-
-/* Send a message to the specified multicast group. */
-int pcn_kmsg_mcast_send(pcn_kmsg_mcast_id id, struct pcn_kmsg_message *msg)
-{
-#if MCAST_HACK
-
-       int i, rc;
-
-       MCAST_PRINTK("Sending mcast message, id %lu\n", id);
-
-       /* quick hack for testing for now; 
-          loop through mask and send individual messages */
-       for (i = 0; i < POPCORN_MAX_CPUS; i++) {
-               if (rkinfo->mcast_wininfo[id].mask & (0x1 << i)) {
-                       rc = pcn_kmsg_send(i, msg);
-
-                       if (rc) {
-                               KMSG_ERR("Batch send failed to CPU %d\n", i);
-                               return -1;
-                       }
-               }
-       }
-
-       return 0;
-#else
-       int rc;
-
-       MCAST_PRINTK("sending mcast message to group id %lu\n", id);
-
-       msg->hdr.is_lg_msg = 0;
-       msg->hdr.lg_start = 0;
-       msg->hdr.lg_end = 0;
-       msg->hdr.lg_seqnum = 0;
-
-       rc = __pcn_kmsg_mcast_send(id, msg);
-
-       return rc;
-#endif
-}
-
-/* Send a message to the specified multicast group. */
-int pcn_kmsg_mcast_send_long(pcn_kmsg_mcast_id id, 
-                            struct pcn_kmsg_long_message *msg, 
-                            unsigned int payload_size)
-{
-#if MCAST_HACK
-       int i, rc;
-
-       MCAST_PRINTK("Sending long mcast message, id %lu, size %u\n", 
-                    id, payload_size);
-
-       /* quick hack for testing for now; 
-          loop through mask and send individual messages */
-       for (i = 0; i < POPCORN_MAX_CPUS; i++) {
-               if (rkinfo->mcast_wininfo[id].mask & (0x1 << i)) {
-                       rc = pcn_kmsg_send_long(i, msg, payload_size);
-
-                       if (rc) {
-                               KMSG_ERR("Batch send failed to CPU %d\n", i);
-                               return -1;
-                       }
-               }
-       }
-
-       return 0;
-#else
-
-       KMSG_ERR("long messages not yet supported in mcast!\n");
-
-       return 0;
-#endif
-}
-
-
-static int pcn_kmsg_mcast_callback(struct pcn_kmsg_message *message) 
-{
-       int rc = 0;
-       struct pcn_kmsg_mcast_message *msg = 
-               (struct pcn_kmsg_mcast_message *) message;
-       pcn_kmsg_work_t *kmsg_work;
-
-       MCAST_PRINTK("Received mcast message, type %d\n", msg->type);
-
-       switch (msg->type) {
-               case PCN_KMSG_MCAST_OPEN:
-                       MCAST_PRINTK("Processing mcast open message...\n");
-
-                       /* Need to queue work to remap the window in a kernel
-                          thread; it can't happen here */
-                       kmsg_work = kmalloc(sizeof(pcn_kmsg_work_t), GFP_ATOMIC);
-                       if (kmsg_work) {
-                               INIT_WORK((struct work_struct *) kmsg_work,
-                                         process_kmsg_wq_item);
-                               kmsg_work->op = PCN_KMSG_WQ_OP_MAP_MCAST_WIN;
-                               kmsg_work->from_cpu = msg->hdr.from_cpu;
-                               kmsg_work->id_to_join = msg->id;
-                               queue_work(kmsg_wq, 
-                                          (struct work_struct *) kmsg_work);
-                       } else {
-                               KMSG_ERR("Failed to kmalloc work structure!\n");
-                       }
-
-                       break;
-
-               case PCN_KMSG_MCAST_ADD_MEMBERS:
-                       KMSG_ERR("Mcast add not yet implemented...\n");
-                       break;
-
-               case PCN_KMSG_MCAST_DEL_MEMBERS:
-                       KMSG_ERR("Mcast delete not yet implemented...\n");
-                       break;
-
-               case PCN_KMSG_MCAST_CLOSE:
-                       MCAST_PRINTK("Processing mcast close message...\n");
-                       pcn_kmsg_mcast_close_notowner(msg->id);
-                       break;
-
-               default:
-                       KMSG_ERR("Invalid multicast message type %d\n", 
-                                msg->type);
-                       rc = -1;
-                       goto out;
-       }
-
-       print_mcast_map();
-
-out:
-       pcn_kmsg_free_msg(message);
-       return rc;
-}
-
-
+#ifdef PCN_SUPPORT_MULTICAST
+# include "pcn_kmsg_mcast.h"
+#endif /* PCN_SUPPORT_MULTICAST */
index 9efc00a..74af8e0 100644 (file)
@@ -163,6 +163,7 @@ static int pcn_kmsg_test_long_msg(struct pcn_kmsg_test_args __user *args)
        return rc;
 }
 
+#ifdef PCN_SUPPORT_MULTICAST
 static int pcn_kmsg_test_mcast_open(struct pcn_kmsg_test_args __user *args)
 {
        int rc;
@@ -226,6 +227,7 @@ static int pcn_kmsg_test_mcast_close(struct pcn_kmsg_test_args __user *args)
 
        return rc;
 }
+#endif /* PCN_SUPPORT_MULTICAST */
 
 /* Syscall for testing all this stuff */
 SYSCALL_DEFINE2(popcorn_test_kmsg, enum pcn_kmsg_test_op, op,
@@ -252,7 +254,7 @@ SYSCALL_DEFINE2(popcorn_test_kmsg, enum pcn_kmsg_test_op, op,
                case PCN_KMSG_TEST_SEND_LONG:
                        rc = pcn_kmsg_test_long_msg(args);
                        break;
-
+#ifdef PCN_SUPPORT_MULTICAST
                case PCN_KMSG_TEST_OP_MCAST_OPEN:
                        rc = pcn_kmsg_test_mcast_open(args);
                        break;
@@ -264,6 +266,7 @@ SYSCALL_DEFINE2(popcorn_test_kmsg, enum pcn_kmsg_test_op, op,
                case PCN_KMSG_TEST_OP_MCAST_CLOSE:
                        rc = pcn_kmsg_test_mcast_close(args);
                        break;
+#endif /* PCN_SUPPORT_MULTICAST */
 
                default:
                        TEST_ERR("invalid option %d\n", op);
@@ -273,7 +276,6 @@ SYSCALL_DEFINE2(popcorn_test_kmsg, enum pcn_kmsg_test_op, op,
        return rc;
 }
 
-
 /* CALLBACKS */
 
 static int handle_single_msg(struct pcn_kmsg_test_message *msg)
@@ -454,4 +456,3 @@ static int __init pcn_kmsg_test_init(void)
 }
 
 late_initcall(pcn_kmsg_test_init);
-