Merge remote-tracking branch 'origin/clustered' into aks_dev_clus
[projects/modsched/linux.git] / pcnmsg / pcn_kmsg.c
index 3bc5f41..937125e 100644 (file)
@@ -1,8 +1,9 @@
 /*
  * Inter-kernel messaging support for Popcorn
  *
- * Current ver: Antonio Barbalace, Phil Wilshire 2013
- * First ver: Ben Shelton <beshelto@vt.edu> 2013
+ * Antonio Barbalace, David Katz, Marina Sadini 2014
+ * Antonio Barbalace, Marina Sadini, Phil Wilshire 2013
+ * Ben Shelton 2012 - 2013
  */
 
 #include <linux/irq.h>
 #define KMSG_PRINTK(...) ;
 #endif
 
+/*
+ * MAX_LOOPS must be calibrated on each architecture. Is difficult to give a
+ * good estimation for this parameter. Worst case must be equal or inferior to
+ * the minimum time we will be blocked for a call to schedule_timeout
+ */
+#define MAX_LOOPS 12345
+#define MAX_LOOPS_JIFFIES (MAX_SCHEDULE_TIMEOUT)
 
 #define MCAST_VERBOSE 0
 
@@ -111,8 +119,13 @@ int log_f_index=0;
 int log_f_sendindex=0;
 void * log_function_send[LOGCALL];
 
+#define SIZE_RANGES 7
+unsigned int large_message_sizes[(SIZE_RANGES +1)];
+unsigned int large_message_count[(SIZE_RANGES +1)];
+unsigned int type_message_count[PCN_KMSG_TYPE_MAX];
+
 /* From Wikipedia page "Fetch and add", modified to work for u64 */
-static inline unsigned long fetch_and_add(volatile unsigned long * variable, 
+/*static inline unsigned long fetch_and_add(volatile unsigned long * variable, 
                                          unsigned long value)
 {
        asm volatile( 
@@ -121,7 +134,8 @@ static inline unsigned long fetch_and_add(volatile unsigned long * variable,
                     : "a" (value), "m" (*variable)  //Input
                     :"memory" );
        return value;
-}
+}*/
+#define fetch_and_add xadd_sync
 
 static inline unsigned long win_inuse(struct pcn_kmsg_window *win) 
 {
@@ -133,7 +147,7 @@ static inline int win_put(struct pcn_kmsg_window *win,
                          int no_block) 
 {
        unsigned long ticket;
-    unsigned long long sleep_start;
+       unsigned long loop;
 
        /* if we can't block and the queue is already really long, 
           return EAGAIN */
@@ -154,18 +168,24 @@ static inline int win_put(struct pcn_kmsg_window *win,
        /* spin until there's a spot free for me */
        //while (win_inuse(win) >= RB_SIZE) {}
        //if(ticket>=PCN_KMSG_RBUF_SIZE){
-    sleep_start = native_read_tsc();
-               while((win->buffer[ticket%PCN_KMSG_RBUF_SIZE].last_ticket != ticket-PCN_KMSG_RBUF_SIZE)) {
-                       //pcn_cpu_relax();
-                       //msleep(1);
-               }
-               while(  win->buffer[ticket%PCN_KMSG_RBUF_SIZE].ready!=0){
-                       //pcn_cpu_relax();
-                       //msleep(1);
-               }
-    total_sleep_win_put += native_read_tsc() - sleep_start;
-    sleep_win_put_count++;
        //}
+       loop=0;  
+       while( (win->buffer[ticket%PCN_KMSG_RBUF_SIZE].last_ticket
+         != (ticket - PCN_KMSG_RBUF_SIZE)) ) {
+               //pcn_cpu_relax();
+               //msleep(1);
+               if ( !(++loop % MAX_LOOPS) )
+                       schedule_timeout(MAX_LOOPS_JIFFIES);
+       }
+       /* the following it is always false because add is after ready=0*/
+       //loop=0;
+       while( win->buffer[ticket%PCN_KMSG_RBUF_SIZE].ready!=0 ) {
+               pcn_cpu_relax();
+               //msleep(1);
+               //if ( !(++loop % MAX_LOOPS) )
+               //      schedule_timeout(MAX_LOOPS_JIFFIES);
+       }
+       
        /* insert item */
        memcpy(&win->buffer[ticket%PCN_KMSG_RBUF_SIZE].payload,
               &msg->payload, PCN_KMSG_PAYLOAD_SIZE);
@@ -198,52 +218,39 @@ static inline int win_get(struct pcn_kmsg_window *win,
                          struct pcn_kmsg_reverse_message **msg) 
 {
        struct pcn_kmsg_reverse_message *rcvd;
-    unsigned long long sleep_start;
+       unsigned long loop;
 
        if (!win_inuse(win)) {
-
                KMSG_PRINTK("nothing in buffer, returning...\n");
                return -1;
        }
 
-       KMSG_PRINTK("reached win_get, head %lu, tail %lu\n", 
-                   win->head, win->tail);      
-
-       /* spin until entry.ready at end of cache line is set */
+       KMSG_PRINTK("reached win_get, head %lu, tail %lu\n", win->head, win->tail);
        rcvd =(struct pcn_kmsg_reverse_message*) &(win->buffer[win->tail % PCN_KMSG_RBUF_SIZE]);
-       //KMSG_PRINTK("%s: Ready bit: %u\n", __func__, rcvd->hdr.ready);
 
-    sleep_start = native_read_tsc();
+       /* spin until entry.ready at end of cache line is set */
+       loop=0;
        while (!rcvd->ready) {
-
                //pcn_cpu_relax();
                //msleep(1);
-
+               if ( !(++loop % MAX_LOOPS) )
+                       schedule_timeout(MAX_LOOPS_JIFFIES);
        }
-    total_sleep_win_get += native_read_tsc() - sleep_start;
-    sleep_win_get_count++;
-
-       // barrier here?
-       pcn_barrier();
 
-       //log_receive[log_r_index%LOGLEN]=rcvd->hdr;
-       memcpy(&(log_receive[log_r_index%LOGLEN]),&(rcvd->hdr),sizeof(struct pcn_kmsg_hdr));
+       /* statistics */
+       memcpy(&(log_receive[log_r_index%LOGLEN]),
+              &(rcvd->hdr),
+              sizeof(struct pcn_kmsg_hdr));
        log_r_index++;
+       msg_get++;
 
-       //rcvd->hdr.ready = 0;
-
-       *msg = rcvd;    
-msg_get++;
-
-
-
+       *msg = rcvd;
        return 0;
 }
 
-static inline void win_advance_tail(struct pcn_kmsg_window *win) 
-{
-       win->tail++;
-}
+/*static inline void win_advance_tail(struct pcn_kmsg_window *win) 
+{ win->tail++; }
+*/
 
 /* win_enable_int
  * win_disable_int
@@ -471,18 +478,14 @@ static int pcn_read_proc(char *page, char **start, off_t off, int count, int *eo
        char *p= page;
     int len, i, idx;
 
-    p += sprintf(p, "Sleep in win_put[total,count,avg] = [%llx,%lx,%llx]\n",
-                    total_sleep_win_put,
-                    sleep_win_put_count,
-                    sleep_win_put_count? total_sleep_win_put/sleep_win_put_count:0);
-    p += sprintf(p, "Sleep in win_get[total,count,avg] = [%llx,%lx,%llx]\n",
-                    total_sleep_win_get,
-                    sleep_win_get_count,
-                    sleep_win_get_count? total_sleep_win_get/sleep_win_get_count:0);
-
-       p += sprintf(p, "messages get: %ld\n", msg_get);
-        p += sprintf(p, "messages put: %ld\n", msg_put);
-
+       p += sprintf(p, "get: %ld\n", msg_get);
+        p += sprintf(p, "put: %ld\n", msg_put);
+    for (i =0; i<(SIZE_RANGES +1); i++)
+      p +=sprintf (p,"%u: %u\n", large_message_sizes[i], large_message_count[i]);
+    
+    for (i =0; i<PCN_KMSG_TYPE_MAX; i++)
+      p +=sprintf (p, "t%u: %u\n", i, type_message_count[i]);
+    
     idx = log_r_index;
     for (i =0; i>-LOGLEN; i--)
        p +=sprintf (p,"r%d: from%d type%d %1d:%1d:%1d seq%d\n",
@@ -496,7 +499,7 @@ static int pcn_read_proc(char *page, char **start, off_t off, int count, int *eo
                        (idx+i),(int) log_send[(idx+i)%LOGLEN].from_cpu, (int)log_send[(idx+i)%LOGLEN].type,
                        (int) log_send[(idx+i)%LOGLEN].is_lg_msg, (int)log_send[(idx+i)%LOGLEN].lg_start,
                        (int) log_send[(idx+i)%LOGLEN].lg_end, (int) log_send[(idx+i)%LOGLEN].lg_seqnum );
-
+/*
     idx = log_f_index;
         for (i =0; i>-LOGCALL; i--)
                p +=sprintf (p,"f%d: %pB\n",
@@ -509,7 +512,7 @@ static int pcn_read_proc(char *page, char **start, off_t off, int count, int *eo
 
         for(i=0; i<PCN_KMSG_RBUF_SIZE; i++)
                p +=sprintf (p,"second_buffer[%i]=%i\n",i,rkvirt[my_cpu]->second_buffer[i]);
-
+*/
 
        len = (p -page) - off;
        if (len < 0)
@@ -519,6 +522,20 @@ static int pcn_read_proc(char *page, char **start, off_t off, int count, int *eo
        return len;
 }
 
+static int pcn_write_proc (struct file *file, const char __user *buffer, unsigned long count, void *data)
+{
+  int i;
+       msg_get=0;
+       msg_put=0;
+       memset(large_message_count, 0, sizeof(int)*(SIZE_RANGES +1));
+       memset(large_message_sizes, 0, sizeof(int)*(SIZE_RANGES +1));
+       for (i=0; i<SIZE_RANGES; i++)
+         large_message_sizes[i] = ((i+1)*PCN_KMSG_PAYLOAD_SIZE);
+       large_message_sizes[SIZE_RANGES] = ~0;
+       memset(type_message_count, 0, sizeof(int)*PCN_KMSG_TYPE_MAX);
+       return count;
+}
+
 static int __init pcn_kmsg_init(void)
 {
        int rc,i;
@@ -612,6 +629,10 @@ static int __init pcn_kmsg_init(void)
                boot_params_va->pcn_kmsg_master_window = rkinfo_phys_addr;
                KMSG_INIT("boot_params virt %p phys %p\n",
                        boot_params_va, orig_boot_params);
+               
+               KMSG_INIT("LOOPS %ld, LOOPS_JIFFIES %ld, 1nsec %ld, 1usec %ld, 1msec %ld, 1jiffies %d %d\n",
+                         MAX_LOOPS, MAX_LOOPS_JIFFIES, nsecs_to_jiffies(1), usecs_to_jiffies(1), msecs_to_jiffies(1),
+                         jiffies_to_msecs(1), jiffies_to_usecs(1));
        }
        else {
                KMSG_INIT("Primary kernel rkinfo phys addr: 0x%lx\n", 
@@ -657,6 +678,14 @@ static int __init pcn_kmsg_init(void)
                }
        } 
 
+       /* proc interface for debugging and stats */
+       memset(large_message_count, 0, sizeof(int)*(SIZE_RANGES +1));
+       memset(large_message_sizes, 0, sizeof(int)*(SIZE_RANGES +1));
+       for (i=0; i<SIZE_RANGES; i++)
+         large_message_sizes[i] = ((i+1)*PCN_KMSG_PAYLOAD_SIZE);
+       large_message_sizes[SIZE_RANGES] = ~0;
+       memset(type_message_count, 0, sizeof(int)*PCN_KMSG_TYPE_MAX);
+
        memset(log_receive,0,sizeof(struct pcn_kmsg_hdr)*LOGLEN);
        memset(log_send,0,sizeof(struct pcn_kmsg_hdr)*LOGLEN);
        memset(log_function_called,0,sizeof(void*)*LOGCALL);
@@ -669,6 +698,7 @@ static int __init pcn_kmsg_init(void)
                return -ENOMEM;
        }
        res->read_proc = pcn_read_proc;
+       res->write_proc = pcn_write_proc;
 
        return 0;
 }
@@ -736,6 +766,7 @@ static int __pcn_kmsg_send(unsigned int dest_cpu, struct pcn_kmsg_message *msg,
        msg->hdr.from_cpu = my_cpu;
 
        rc = win_put(dest_window, msg, no_block);
+type_message_count[msg->hdr.type]++;
 
        if (rc) {
                if (no_block && (rc == EAGAIN)) {
@@ -828,6 +859,10 @@ int pcn_kmsg_send_long(unsigned int dest_cpu,
                if(ret!=0)
                        return ret;
        }
+       
+       /* statistics */
+       num_chunks = payload_size / PCN_KMSG_PAYLOAD_SIZE;
+       large_message_count[((num_chunks < SIZE_RANGES) ? num_chunks : SIZE_RANGES)]++;
 
        return 0;
 }
@@ -1076,18 +1111,19 @@ static int process_small_message(struct pcn_kmsg_reverse_message *msg)
        return work_done;
 }
 
+static int poll_handler_check=0;
 static int pcn_kmsg_poll_handler(void)
 {
        struct pcn_kmsg_reverse_message *msg;
        struct pcn_kmsg_window *win = rkvirt[my_cpu]; // TODO this will not work for clustering
        int work_done = 0;
 
-       KMSG_PRINTK("called\n");
+       poll_handler_check++;
+       if (poll_handler_check >1)
+               printk("poll_hanlder_check %d concurrent calls not supported\n", poll_handler_check);
 
 pull_msg:
        /* Get messages out of the buffer first */
-//#define PCN_KMSG_BUDGET 128
-       //while ((work_done < PCN_KMSG_BUDGET) && (!win_get(rkvirt[my_cpu], &msg))) {
        while (! win_get(win, &msg) ) {
                KMSG_PRINTK("got a message!\n");
 
@@ -1099,10 +1135,16 @@ pull_msg:
                        KMSG_PRINTK("message is a small message!\n");
                        work_done += process_small_message(msg);
                }
-               pcn_barrier();
+
                msg->ready = 0;
-               //win_advance_tail(win);
-               fetch_and_add(&win->tail, 1);
+               pcn_barrier();
+               
+               fetch_and_add(&win->tail, 1); //win_advance_tail(win);
+               
+               // NOTE
+               // why you need the ready bit if you are incrementing the tail?
+               // can we increment the tail before signaling the ready bit?
+               // in that way we can support multiple calls to this function
        }
 
        win_enable_int(win);
@@ -1111,6 +1153,7 @@ pull_msg:
                goto pull_msg;
        }
 
+       poll_handler_check--;
        return work_done;
 }