Merge remote-tracking branch 'origin/clustered' into aks_dev_clus
[projects/modsched/linux.git] / pcnmsg / pcn_kmsg.c
1 /*
2  * Inter-kernel messaging support for Popcorn
3  *
4  * Antonio Barbalace, David Katz, Marina Sadini 2014
5  * Antonio Barbalace, Marina Sadini, Phil Wilshire 2013
6  * Ben Shelton 2012 - 2013
7  */
8
9 #include <linux/irq.h>
10 #include <linux/interrupt.h>
11 #include <linux/smp.h>
12 #include <linux/syscalls.h>
13 #include <linux/kernel.h>
14 #include <linux/multikernel.h>
15 #include <linux/pcn_kmsg.h>
16 #include <linux/list.h>
17 #include <linux/slab.h>
18 #include <linux/proc_fs.h>
19
20 #include <asm/system.h>
21 #include <asm/apic.h>
22 #include <asm/hardirq.h>
23 #include <asm/setup.h>
24 #include <asm/bootparam.h>
25 #include <asm/errno.h>
26 #include <asm/atomic.h>
27 #include <linux/delay.h>
28
/* depth of the circular header logs (log_receive/log_send) */
#define LOGLEN 4
/* depth of the circular function-pointer logs */
#define LOGCALL 32

#define KMSG_VERBOSE 0

#if KMSG_VERBOSE
#define KMSG_PRINTK(fmt, args...) printk("%s: " fmt, __func__, ##args)
#else
/* FIX: the empty form was a bare `;`, which breaks unbraced if/else
 * bodies (dangling-else / empty-statement hazard); use do-while(0). */
#define KMSG_PRINTK(...) do {} while (0)
#endif

/*
 * MAX_LOOPS must be calibrated on each architecture. Is difficult to give a
 * good estimation for this parameter. Worst case must be equal or inferior to
 * the minimum time we will be blocked for a call to schedule_timeout
 */
#define MAX_LOOPS 12345
#define MAX_LOOPS_JIFFIES (MAX_SCHEDULE_TIMEOUT)

#define MCAST_VERBOSE 0

#if MCAST_VERBOSE
#define MCAST_PRINTK(fmt, args...) printk("%s: " fmt, __func__, ##args)
#else
/* FIX: same empty-statement hazard as KMSG_PRINTK above */
#define MCAST_PRINTK(...) do {} while (0)
#endif

#define KMSG_INIT(fmt, args...) printk("KMSG INIT: %s: " fmt, __func__, ##args)

#define KMSG_ERR(fmt, args...) printk("%s: ERROR: " fmt, __func__, ##args)

/* FIX: parenthesize the macro argument — ROUND_PAGES(a + b) previously
 * expanded to a + b/PAGE_SIZE and miscomputed. */
#define ROUND_PAGES(size) (((size)/PAGE_SIZE) + (((size)%PAGE_SIZE) ? 1 : 0))
#define ROUND_PAGE_SIZE(size) (ROUND_PAGES(size)*PAGE_SIZE)
62
/* COMMON STATE */

/* table of callback functions for handling each message type */
pcn_kmsg_cbftn callback_table[PCN_KMSG_TYPE_MAX];

/* number of current kernel */
int my_cpu = 0; // NOT CORRECT FOR CLUSTERING!!! STILL WE HAVE TO DECIDE HOW TO IMPLEMENT CLUSTERING

/* pointer to table with phys addresses for remote kernels' windows,
 * owned by kernel 0 */
struct pcn_kmsg_rkinfo *rkinfo;

/* table with virtual (mapped) addresses for remote kernels' windows,
   one per kernel */
struct pcn_kmsg_window * rkvirt[POPCORN_MAX_CPUS];


/* lists of messages to be processed for each prio */
struct list_head msglist_hiprio, msglist_normprio;

/* array to hold pointers to large messages received */
//struct pcn_kmsg_container * lg_buf[POPCORN_MAX_CPUS];
struct list_head lg_buf[POPCORN_MAX_CPUS];
/* monotonically increasing id stamped on every large (multi-chunk)
 * message so the receiver can reassemble interleaved streams */
volatile unsigned long long_id;
/* ticket of the sender currently inside win_put(), -1 when idle (debug aid) */
int who_is_writing=-1;

/* action for bottom half */
static void pcn_kmsg_action(/*struct softirq_action *h*/struct work_struct* work);

/* workqueue for operations that can sleep */
struct workqueue_struct *kmsg_wq;
struct workqueue_struct *messaging_wq;

/* RING BUFFER */

/* log2 of the fullness threshold used by the no-block send path;
 * NOTE(review): win_put() indexes slots with PCN_KMSG_RBUF_SIZE —
 * confirm RB_SIZE and PCN_KMSG_RBUF_SIZE are meant to agree */
#define RB_SHIFT 6
#define RB_SIZE (1 << RB_SHIFT)
#define RB_MASK ((1 << RB_SHIFT) - 1)

#define PCN_DEBUG(...) ;
//#define PCN_WARN(...) printk(__VA_ARGS__)
#define PCN_WARN(...) ;
#define PCN_ERROR(...) printk(__VA_ARGS__)

/* cumulative sleep statistics for the send/receive wait loops */
unsigned long long total_sleep_win_put = 0;
unsigned int sleep_win_put_count = 0;
unsigned long long total_sleep_win_get = 0;
unsigned int sleep_win_get_count = 0;

/* circular logs of the last LOGLEN headers received/sent (debug/stats) */
struct pcn_kmsg_hdr log_receive[LOGLEN];
struct pcn_kmsg_hdr log_send[LOGLEN];
int log_r_index=0;
int log_s_index=0;

/* circular logs of the last LOGCALL callback/send function pointers */
void * log_function_called[LOGCALL];
int log_f_index=0;
int log_f_sendindex=0;
void * log_function_send[LOGCALL];

/* histogram of large-message sizes (SIZE_RANGES buckets + overflow),
 * plus a per-message-type counter, all exported via /proc/pcnmsg */
#define SIZE_RANGES 7
unsigned int large_message_sizes[(SIZE_RANGES +1)];
unsigned int large_message_count[(SIZE_RANGES +1)];
unsigned int type_message_count[PCN_KMSG_TYPE_MAX];

/* From Wikipedia page "Fetch and add", modified to work for u64 */
/*static inline unsigned long fetch_and_add(volatile unsigned long * variable, 
                                          unsigned long value)
{
        asm volatile( 
                     "lock; xaddq %%rax, %2;"
                     :"=a" (value)                   //Output
                     : "a" (value), "m" (*variable)  //Input
                     :"memory" );
        return value;
}*/
/* atomic fetch-and-add; xadd_sync returns the value BEFORE the add */
#define fetch_and_add xadd_sync
/*
 * win_inuse - number of occupied slots in ring window @win.
 *
 * head is bumped by senders (win_put); tail is presumably advanced by
 * the receive path (not visible in this chunk — confirm).  Both grow
 * monotonically, so the unsigned subtraction stays correct across
 * wraparound.
 */
static inline unsigned long win_inuse(struct pcn_kmsg_window *win)
{
	return win->head - win->tail;
}
/* total messages successfully placed into any window (stats, /proc) */
static long unsigned int msg_put=0;

/*
 * win_put - sender side: copy @msg into the destination ring @win.
 *
 * A slot is claimed by an atomic fetch-and-add on win->head (the
 * "ticket").  The sender then waits until the slot's previous owner
 * (ticket - PCN_KMSG_RBUF_SIZE) has stamped last_ticket, copies the
 * payload and header in, and publishes the slot by setting ready
 * before re-stamping last_ticket (ordered by wmb()).
 *
 * @no_block: if non-zero, return -EAGAIN instead of waiting when the
 *            window already holds RB_SIZE or more messages.
 *            NOTE(review): the fullness check uses RB_SIZE while slot
 *            indexing uses PCN_KMSG_RBUF_SIZE — confirm they agree.
 *
 * Returns 0 on success, -EAGAIN as above.  May sleep via
 * schedule_timeout(), so callers must not be in atomic context unless
 * the wait never triggers — TODO confirm all call sites.
 */
static inline int win_put(struct pcn_kmsg_window *win,
			  struct pcn_kmsg_message *msg,
			  int no_block)
{
	unsigned long ticket;
	unsigned long loop;

	/* if we can't block and the queue is already really long,
	   return EAGAIN */
	if (no_block && (win_inuse(win) >= RB_SIZE)) {
		KMSG_PRINTK("window full, caller should try again...\n");
		return -EAGAIN;
	}

	/* grab ticket */
	ticket = fetch_and_add(&win->head, 1);
	PCN_DEBUG(KERN_ERR "%s: ticket = %lu, head = %lu, tail = %lu\n",
		 __func__, ticket, win->head, win->tail);

	KMSG_PRINTK("%s: ticket = %lu, head = %lu, tail = %lu\n",
			 __func__, ticket, win->head, win->tail);

	who_is_writing= ticket;
	/* spin until there's a spot free for me */
	//while (win_inuse(win) >= RB_SIZE) {}
	//if(ticket>=PCN_KMSG_RBUF_SIZE){
	//}
	/* wait for the slot to be recycled: its last_ticket must equal our
	 * ticket minus the ring size; sleep periodically instead of
	 * spinning forever */
	loop=0;
	while( (win->buffer[ticket%PCN_KMSG_RBUF_SIZE].last_ticket
	  != (ticket - PCN_KMSG_RBUF_SIZE)) ) {
		//pcn_cpu_relax();
		//msleep(1);
		if ( !(++loop % MAX_LOOPS) )
			schedule_timeout(MAX_LOOPS_JIFFIES);
	}
	/* the following it is always false because add is after ready=0*/
	//loop=0;
	while( win->buffer[ticket%PCN_KMSG_RBUF_SIZE].ready!=0 ) {
		pcn_cpu_relax();
		//msleep(1);
		//if ( !(++loop % MAX_LOOPS) )
		//	schedule_timeout(MAX_LOOPS_JIFFIES);
	}

	/* insert item */
	memcpy(&win->buffer[ticket%PCN_KMSG_RBUF_SIZE].payload,
	       &msg->payload, PCN_KMSG_PAYLOAD_SIZE);

	memcpy((void*)&(win->buffer[ticket%PCN_KMSG_RBUF_SIZE].hdr),
	       (void*)&(msg->hdr), sizeof(struct pcn_kmsg_hdr));

	/* record the header in the circular send log (debug/stats) */
	//log_send[log_s_index%LOGLEN]= win->buffer[ticket & RB_MASK].hdr;
	memcpy(&(log_send[log_s_index%LOGLEN]),
		(void*)&(win->buffer[ticket%PCN_KMSG_RBUF_SIZE].hdr),
		sizeof(struct pcn_kmsg_hdr));
	log_s_index++;

	/* per-slot usage counter, dumped via /proc for debugging */
	win->second_buffer[ticket%PCN_KMSG_RBUF_SIZE]++;

	/* set completed flag */
	win->buffer[ticket%PCN_KMSG_RBUF_SIZE].ready = 1;
	wmb();
	win->buffer[ticket%PCN_KMSG_RBUF_SIZE].last_ticket = ticket;

	who_is_writing=-1;

	msg_put++;

	return 0;
}
215
/* total messages consumed from our window (stats, /proc) */
static long unsigned msg_get=0;

/*
 * win_get - receiver side: fetch the message at win->tail.
 *
 * Returns -1 immediately if the window is empty.  Otherwise waits for
 * the slot's ready flag (the sender sets it last, see win_put), logs
 * the header, stores a pointer to the in-ring slot in *msg, and
 * returns 0.  The slot is NOT released here: tail advance and ready
 * clearing happen elsewhere — not visible in this chunk, confirm in
 * the receive path.
 */
static inline int win_get(struct pcn_kmsg_window *win,
			  struct pcn_kmsg_reverse_message **msg)
{
	struct pcn_kmsg_reverse_message *rcvd;
	unsigned long loop;

	if (!win_inuse(win)) {
		KMSG_PRINTK("nothing in buffer, returning...\n");
		return -1;
	}

	KMSG_PRINTK("reached win_get, head %lu, tail %lu\n", win->head, win->tail);
	rcvd =(struct pcn_kmsg_reverse_message*) &(win->buffer[win->tail % PCN_KMSG_RBUF_SIZE]);

	/* spin until entry.ready at end of cache line is set;
	 * sleep periodically instead of spinning forever */
	loop=0;
	while (!rcvd->ready) {
		//pcn_cpu_relax();
		//msleep(1);
		if ( !(++loop % MAX_LOOPS) )
			schedule_timeout(MAX_LOOPS_JIFFIES);
	}

	/* statistics */
	memcpy(&(log_receive[log_r_index%LOGLEN]),
	       &(rcvd->hdr),
	       sizeof(struct pcn_kmsg_hdr));
	log_r_index++;
	msg_get++;

	*msg = rcvd;
	return 0;
}
250
251 /*static inline void win_advance_tail(struct pcn_kmsg_window *win) 
252 { win->tail++; }
253 */
254
/* win_enable_int
 * win_disable_int
 * win_int_enabled
 *
 * These functions prevent senders from sending a message while
 * the receiver is processing an IPI from any sender.
 */
262 static inline void win_enable_int(struct pcn_kmsg_window *win) {
263                 win->int_enabled = 1;
264                 wmb(); // enforce ordering
265 }
266 static inline void win_disable_int(struct pcn_kmsg_window *win) {
267                 win->int_enabled = 0;
268                 wmb(); // enforce ordering
269 }
270 static inline unsigned char win_int_enabled(struct pcn_kmsg_window *win) {
271                 rmb(); //not sure this is required (Antonio)
272                 return win->int_enabled;
273 }
274
275 static inline int atomic_add_return_sync(int i, atomic_t *v)
276 {
277         return i + xadd_sync(&v->counter, i);
278 }
279
/*
 * atomic_dec_and_test_sync - atomically decrement @v and test for zero.
 *
 * LOCK-prefixed decl followed by sete captures whether the decrement
 * produced zero, mirroring the x86 atomic_dec_and_test pattern.
 * Returns non-zero iff the new counter value is 0.
 */
static inline int atomic_dec_and_test_sync(atomic_t *v)
{
	unsigned char c;

	asm volatile("lock; decl %0; sete %1"
		     : "+m" (v->counter), "=qm" (c)
		     : : "memory");
	return c != 0;
}
289
290
291
292 /* INITIALIZATION */
293 #ifdef PCN_SUPPORT_MULTICAST
294 static int pcn_kmsg_mcast_callback(struct pcn_kmsg_message *message);
295 #endif /* PCN_SUPPORT_MULTICAST */
296
297 static void map_msg_win(pcn_kmsg_work_t *w)
298 {
299         int cpu = w->cpu_to_add;
300
301         if (cpu < 0 || cpu >= POPCORN_MAX_CPUS) {
302                 KMSG_ERR("invalid CPU %d specified!\n", cpu);
303                 return;
304         }
305
306         rkvirt[cpu] = ioremap_cache(rkinfo->phys_addr[cpu],
307                                   ROUND_PAGE_SIZE(sizeof(struct pcn_kmsg_window)));
308
309         if (rkvirt[cpu]) {
310                 KMSG_INIT("ioremapped window, virt addr 0x%p\n", 
311                           rkvirt[cpu]);
312         } else {
313                 KMSG_ERR("failed to map CPU %d's window at phys addr 0x%lx\n",
314                          cpu, rkinfo->phys_addr[cpu]);
315         }
316 }
317
318 /* bottom half for workqueue */
/* bottom half for workqueue */
/*
 * process_kmsg_wq_item - kmsg_wq worker: dispatch one deferred operation.
 *
 * Work items are pcn_kmsg_work_t structures queued from contexts that
 * cannot sleep (e.g. the checkin callback) for operations that can,
 * such as ioremap.  The work item is kmalloc'ed by the enqueuer and
 * freed here after dispatch.
 */
static void process_kmsg_wq_item(struct work_struct * work)
{
	pcn_kmsg_work_t *w = (pcn_kmsg_work_t *) work;

	KMSG_PRINTK("called with op %d\n", w->op);

	switch (w->op) {
		case PCN_KMSG_WQ_OP_MAP_MSG_WIN:
			map_msg_win(w);
			break;

		case PCN_KMSG_WQ_OP_UNMAP_MSG_WIN:
			KMSG_ERR("%s: UNMAP_MSG_WIN not yet implemented!\n",
				 __func__);
			break;

#ifdef PCN_SUPPORT_MULTICAST
		case PCN_KMSG_WQ_OP_MAP_MCAST_WIN:
			map_mcast_win(w);
			break;

		case PCN_KMSG_WQ_OP_UNMAP_MCAST_WIN:
			KMSG_ERR("UNMAP_MCAST_WIN not yet implemented!\n");
			break;
#endif /* PCN_SUPPORT_MULTICAST */

		default:
			KMSG_ERR("Invalid work queue operation %d\n", w->op);

	}

	/* the enqueuer transferred ownership of the work item to us */
	kfree(work);
}
352
/*
 * pcn_kmsg_free_msg - free a message that was delivered to a callback.
 *
 * Assumes the message is the msg field of a kmalloc'ed container whose
 * struct list_head immediately precedes it (see struct
 * pcn_kmsg_container usage in process_message_list) — stepping back
 * over the list_head recovers the allocation's base address.
 * NOTE: arithmetic on void* is a GCC extension (fine in kernel code).
 */
inline void pcn_kmsg_free_msg(void * msg)
{
	kfree(msg - sizeof(struct list_head));
}
357
/*
 * pcn_kmsg_checkin_callback - handle a CHECKIN message from another kernel.
 *
 * The sender advertises the physical address of its receive window.
 * We are not allowed to ioremap from a bottom half, so the actual
 * mapping is queued on kmsg_wq and performed by process_kmsg_wq_item/
 * map_msg_win in process context.  The message is freed before return.
 *
 * Returns 0 on success (including kmalloc failure, which is only
 * logged — NOTE(review): confirm that losing the mapping silently is
 * acceptable), -1 if the message fails validation.
 */
static int pcn_kmsg_checkin_callback(struct pcn_kmsg_message *message)
{
	struct pcn_kmsg_checkin_message *msg =
		(struct pcn_kmsg_checkin_message *) message;
	int from_cpu = msg->hdr.from_cpu;
	pcn_kmsg_work_t *kmsg_work = NULL;


	KMSG_INIT("From CPU %d, type %d, window phys addr 0x%lx\n",
		  msg->hdr.from_cpu, msg->hdr.type,
		  msg->window_phys_addr);

	if (from_cpu >= POPCORN_MAX_CPUS) {
		KMSG_ERR("Invalid source CPU %d\n", msg->hdr.from_cpu);
		return -1;
	}

	if (!msg->window_phys_addr) {

		KMSG_ERR("Window physical address from CPU %d is NULL!\n",
			 from_cpu);
		return -1;
	}

	/* Note that we're not allowed to ioremap anything from a bottom half,
	   so we'll add it to a workqueue and do it in a kernel thread. */
	kmsg_work = kmalloc(sizeof(pcn_kmsg_work_t), GFP_ATOMIC);
	if (likely(kmsg_work)) {
		INIT_WORK((struct work_struct *) kmsg_work,
			  process_kmsg_wq_item);
		kmsg_work->op = PCN_KMSG_WQ_OP_MAP_MSG_WIN;
		kmsg_work->from_cpu = msg->hdr.from_cpu;
		kmsg_work->cpu_to_add = msg->cpu_to_add;
		queue_work(kmsg_wq, (struct work_struct *) kmsg_work);
	} else {
		KMSG_ERR("Failed to malloc work structure!\n");
	}

	pcn_kmsg_free_msg(message);

	return 0;
}
400
401 static inline int pcn_kmsg_window_init(struct pcn_kmsg_window *window)
402 {
403         window->head = 0;
404         window->tail = 0;
405         //memset(&window->buffer, 0,
406              //  PCN_KMSG_RBUF_SIZE * sizeof(struct pcn_kmsg_reverse_message));
407         int i;
408         for(i=0;i<PCN_KMSG_RBUF_SIZE;i++){
409                 window->buffer[i].last_ticket=i-PCN_KMSG_RBUF_SIZE;
410                 window->buffer[i].ready=0;
411         }
412         memset(&window->second_buffer, 0,
413                        PCN_KMSG_RBUF_SIZE * sizeof(int));
414
415         window->int_enabled = 1;
416         return 0;
417 }
418
419 extern unsigned long orig_boot_params;
420
421 static int send_checkin_msg(unsigned int cpu_to_add, unsigned int to_cpu)
422 {
423         int rc;
424         struct pcn_kmsg_checkin_message msg;
425
426         msg.hdr.type = PCN_KMSG_TYPE_CHECKIN;
427         msg.hdr.prio = PCN_KMSG_PRIO_HIGH;
428         msg.window_phys_addr = rkinfo->phys_addr[my_cpu];
429         msg.cpu_to_add = cpu_to_add;
430
431         rc = pcn_kmsg_send(to_cpu, (struct pcn_kmsg_message *) &msg);
432
433         if (rc) {
434                 KMSG_ERR("Failed to send checkin message, rc = %d\n", rc);
435                 return rc;
436         }
437
438         return 0;
439 }
440
/*
 * do_checkin - secondary-kernel boot step: map every remote window
 * already registered in rkinfo and send each owner a CHECKIN message
 * advertising our own window.
 *
 * Stops and returns on the first mapping or send failure; windows
 * mapped before the failure are left mapped — NOTE(review): no unwind
 * on error, confirm that is acceptable for this boot path.
 *
 * Returns 0 on success, -1 on ioremap failure, or the send's error code.
 */
static int do_checkin(void)
{
	int rc = 0;
	int i;

	for (i = 0; i < POPCORN_MAX_CPUS; i++) {
		/* never map or message ourselves */
		if (i == my_cpu) {
			continue;
		}

		/* a non-zero phys addr means kernel i has registered a window */
		if (rkinfo->phys_addr[i]) {
			rkvirt[i] = ioremap_cache(rkinfo->phys_addr[i],
						  ROUND_PAGE_SIZE(sizeof(struct pcn_kmsg_window)));

			if (rkvirt[i]) {
				KMSG_INIT("ioremapped CPU %d's window, virt addr 0x%p\n",
					  i, rkvirt[i]);
			} else {
				KMSG_ERR("Failed to ioremap CPU %d's window at phys addr 0x%lx\n",
					 i, rkinfo->phys_addr[i]);
				return -1;
			}

			KMSG_INIT("Sending checkin message to kernel %d\n", i);
			rc = send_checkin_msg(my_cpu, i);
			if (rc) {
				KMSG_ERR("POPCORN: Checkin failed for CPU %d!\n", i);
				return rc;
			}
		}
	}

	return rc;
}
475
/*
 * pcn_read_proc - legacy /proc read handler: dump messaging statistics.
 *
 * Prints the get/put counters, the large-message size histogram, the
 * per-type counters, and the last LOGLEN sent/received headers (newest
 * first, walking the circular logs backwards).  Follows the old
 * read_proc single-buffer protocol: format everything into @page, then
 * trim the result against @off/@count and set *eof/*start.
 *
 * NOTE(review): output size is not checked against PAGE_SIZE; if the
 * tables grow this could overrun @page — confirm total output stays
 * below one page.
 */
static int pcn_read_proc(char *page, char **start, off_t off, int count, int *eof, void *data)
{
	char *p= page;
	int len, i, idx;

	p += sprintf(p, "get: %ld\n", msg_get);
	p += sprintf(p, "put: %ld\n", msg_put);
	/* large-message histogram: "<bucket upper bound>: <count>" */
	for (i =0; i<(SIZE_RANGES +1); i++)
		p +=sprintf (p,"%u: %u\n", large_message_sizes[i], large_message_count[i]);

	/* per-message-type send counters */
	for (i =0; i<PCN_KMSG_TYPE_MAX; i++)
		p +=sprintf (p, "t%u: %u\n", i, type_message_count[i]);

	/* last LOGLEN received headers, newest first (i runs 0..-LOGLEN+1;
	 * C's % keeps the sign, picking entries backwards from idx) */
	idx = log_r_index;
	for (i =0; i>-LOGLEN; i--)
		p +=sprintf (p,"r%d: from%d type%d %1d:%1d:%1d seq%d\n",
				(idx+i),(int) log_receive[(idx+i)%LOGLEN].from_cpu, (int)log_receive[(idx+i)%LOGLEN].type,
				(int) log_receive[(idx+i)%LOGLEN].is_lg_msg, (int)log_receive[(idx+i)%LOGLEN].lg_start,
				(int) log_receive[(idx+i)%LOGLEN].lg_end, (int) log_receive[(idx+i)%LOGLEN].lg_seqnum );

	/* last LOGLEN sent headers, same scheme */
	idx = log_s_index;
	for (i =0; i>-LOGLEN; i--)
		p +=sprintf (p,"s%d: from%d type%d %1d:%1d:%1d seq%d\n",
				(idx+i),(int) log_send[(idx+i)%LOGLEN].from_cpu, (int)log_send[(idx+i)%LOGLEN].type,
				(int) log_send[(idx+i)%LOGLEN].is_lg_msg, (int)log_send[(idx+i)%LOGLEN].lg_start,
				(int) log_send[(idx+i)%LOGLEN].lg_end, (int) log_send[(idx+i)%LOGLEN].lg_seqnum );
/*
    idx = log_f_index;
	for (i =0; i>-LOGCALL; i--)
		p +=sprintf (p,"f%d: %pB\n",
				(idx+i),(void*) log_function_called[(idx+i)%LOGCALL] );

    idx = log_f_sendindex;
	for (i =0; i>-LOGCALL; i--)
		p +=sprintf (p,"[s%d]->: %pB\n",
				(idx+i),(void*) log_function_send[(idx+i)%LOGCALL] );

	for(i=0; i<PCN_KMSG_RBUF_SIZE; i++)
		p +=sprintf (p,"second_buffer[%i]=%i\n",i,rkvirt[my_cpu]->second_buffer[i]);
*/

	/* legacy read_proc bookkeeping: report the slice [off, off+count) */
	len = (p -page) - off;
	if (len < 0)
		len = 0;
	*eof = (len <= count) ? 1 : 0;
	*start = page + off;
	return len;
}
524
525 static int pcn_write_proc (struct file *file, const char __user *buffer, unsigned long count, void *data)
526 {
527   int i;
528         msg_get=0;
529         msg_put=0;
530         memset(large_message_count, 0, sizeof(int)*(SIZE_RANGES +1));
531         memset(large_message_sizes, 0, sizeof(int)*(SIZE_RANGES +1));
532         for (i=0; i<SIZE_RANGES; i++)
533           large_message_sizes[i] = ((i+1)*PCN_KMSG_PAYLOAD_SIZE);
534         large_message_sizes[SIZE_RANGES] = ~0;
535         memset(type_message_count, 0, sizeof(int)*PCN_KMSG_TYPE_MAX);
536         return count;
537 }
538
539 static int __init pcn_kmsg_init(void)
540 {
541         int rc,i;
542         unsigned long win_phys_addr, rkinfo_phys_addr;
543         struct pcn_kmsg_window *win_virt_addr;
544         struct boot_params *boot_params_va;
545
546         KMSG_INIT("entered\n");
547
548         my_cpu = raw_smp_processor_id();
549         
550         printk("%s: THIS VERSION DOES NOT SUPPORT CACHE ALIGNED BUFFERS\n",
551                __func__);
552         printk("%s: Entered pcn_kmsg_init raw: %d id: %d\n",
553                 __func__, my_cpu, smp_processor_id());
554
555         /* Initialize list heads */
556         INIT_LIST_HEAD(&msglist_hiprio);
557         INIT_LIST_HEAD(&msglist_normprio);
558
559         /* Clear out large-message receive buffers */
560         //memset(&lg_buf, 0, POPCORN_MAX_CPUS * sizeof(unsigned char *));
561         for(i=0; i<POPCORN_MAX_CPUS; i++) {
562                 INIT_LIST_HEAD(&(lg_buf[i]));
563         }
564         long_id=0;
565
566
567         /* Clear callback table and register default callback functions */
568         KMSG_INIT("Registering initial callbacks...\n");
569         memset(&callback_table, 0, PCN_KMSG_TYPE_MAX * sizeof(pcn_kmsg_cbftn));
570         rc = pcn_kmsg_register_callback(PCN_KMSG_TYPE_CHECKIN, 
571                                         &pcn_kmsg_checkin_callback);
572         if (rc) {
573                 printk(KERN_ALERT"Failed to register initial kmsg checkin callback!\n");
574         }
575
576 #ifdef PCN_SUPPORT_MULTICAST
577         rc = pcn_kmsg_register_callback(PCN_KMSG_TYPE_MCAST, 
578                                         &pcn_kmsg_mcast_callback);
579         if (rc) {
580                 printk(KERN_ALERT"Failed to register initial kmsg mcast callback!\n");
581         }
582 #endif /* PCN_SUPPORT_MULTICAST */      
583
584         /* Register softirq handler now kworker */
585         KMSG_INIT("Registering softirq handler...\n");
586         //open_softirq(PCN_KMSG_SOFTIRQ, pcn_kmsg_action);
587         messaging_wq= create_workqueue("messaging_wq");
588         if (!messaging_wq) 
589                 printk("%s: create_workqueue(messaging_wq) ret 0x%lx ERROR\n",
590                         __func__, (unsigned long)messaging_wq);
591
592         /* Initialize work queue */
593         KMSG_INIT("Initializing workqueue...\n");
594         kmsg_wq = create_workqueue("kmsg_wq");
595         if (!kmsg_wq)
596                 printk("%s: create_workqueue(kmsg_wq) ret 0x%lx ERROR\n",
597                         __func__, (unsigned long)kmsg_wq);
598
599                 
600         /* If we're the master kernel, malloc and map the rkinfo structure and 
601            put its physical address in boot_params; otherwise, get it from the 
602            boot_params and map it */
603         if (!mklinux_boot) {
604                 /* rkinfo must be multiple of a page, because the granularity of
605                  * foreings mapping is per page. The following didn't worked,
606                  * the returned address is on the form 0xffff88000000, ioremap
607                  * on the remote fails. 
608                 int order = get_order(sizeof(struct pcn_kmsg_rkinfo));
609                 rkinfo = __get_free_pages(GFP_KERNEL, order);
610                 */
611                 KMSG_INIT("Primary kernel, mallocing rkinfo size:%d rounded:%d\n",
612                        sizeof(struct pcn_kmsg_rkinfo), ROUND_PAGE_SIZE(sizeof(struct pcn_kmsg_rkinfo)));
613                 rkinfo = kmalloc(ROUND_PAGE_SIZE(sizeof(struct pcn_kmsg_rkinfo)), GFP_KERNEL);
614                 if (!rkinfo) {
615                         KMSG_ERR("Failed to malloc rkinfo structure!\n");
616                         return -1;
617                 }
618                 memset(rkinfo, 0x0, sizeof(struct pcn_kmsg_rkinfo));
619                 rkinfo_phys_addr = virt_to_phys(rkinfo);
620                 KMSG_INIT("rkinfo virt %p, phys 0x%lx MAX_CPUS %d\n", 
621                           rkinfo, rkinfo_phys_addr, POPCORN_MAX_CPUS);
622
623                 /* Otherwise, we need to set the boot_params to show the rest
624                    of the kernels where the master kernel's messaging window 
625                    is. */
626                 KMSG_INIT("Setting boot_params...\n");
627                 boot_params_va = (struct boot_params *) 
628                         (0xffffffff80000000 + orig_boot_params);
629                 boot_params_va->pcn_kmsg_master_window = rkinfo_phys_addr;
630                 KMSG_INIT("boot_params virt %p phys %p\n",
631                         boot_params_va, orig_boot_params);
632                 
633                 KMSG_INIT("LOOPS %ld, LOOPS_JIFFIES %ld, 1nsec %ld, 1usec %ld, 1msec %ld, 1jiffies %d %d\n",
634                           MAX_LOOPS, MAX_LOOPS_JIFFIES, nsecs_to_jiffies(1), usecs_to_jiffies(1), msecs_to_jiffies(1),
635                           jiffies_to_msecs(1), jiffies_to_usecs(1));
636         }
637         else {
638                 KMSG_INIT("Primary kernel rkinfo phys addr: 0x%lx\n", 
639                           (unsigned long) boot_params.pcn_kmsg_master_window);
640                 rkinfo_phys_addr = boot_params.pcn_kmsg_master_window;
641                 
642                 rkinfo = ioremap_cache(rkinfo_phys_addr, ROUND_PAGE_SIZE(sizeof(struct pcn_kmsg_rkinfo)));
643                 if (!rkinfo) {
644                         KMSG_ERR("Failed to map rkinfo from master kernel!\n");
645                 }
646                 KMSG_INIT("rkinfo virt addr: 0x%p\n", rkinfo);
647         }
648
649         /* Malloc our own receive buffer and set it up */
650         win_virt_addr = kmalloc(ROUND_PAGE_SIZE(sizeof(struct pcn_kmsg_window)), GFP_KERNEL);
651         if (win_virt_addr) {
652                 KMSG_INIT("Allocated %ld(%ld) bytes for my win, virt addr 0x%p\n", 
653                           ROUND_PAGE_SIZE(sizeof(struct pcn_kmsg_window)),
654                           sizeof(struct pcn_kmsg_window), win_virt_addr);
655         } else {
656                 KMSG_ERR("%s: Failed to kmalloc kmsg recv window!\n", __func__);
657                 return -1;
658         }
659
660         rkvirt[my_cpu] = win_virt_addr;
661         win_phys_addr = virt_to_phys((void *) win_virt_addr);
662         KMSG_INIT("cpu %d physical address: 0x%lx\n", my_cpu, win_phys_addr);
663         rkinfo->phys_addr[my_cpu] = win_phys_addr;
664
665         rc = pcn_kmsg_window_init(rkvirt[my_cpu]);
666         if (rc) {
667                 KMSG_ERR("Failed to initialize kmsg recv window!\n");
668                 return -1;
669         }
670
671         /* If we're not the master kernel, we need to check in */
672         if (mklinux_boot) {
673                 rc = do_checkin();
674
675                 if (rc) { 
676                         KMSG_ERR("Failed to check in!\n");
677                         return -1;
678                 }
679         } 
680
681         /* proc interface for debugging and stats */
682         memset(large_message_count, 0, sizeof(int)*(SIZE_RANGES +1));
683         memset(large_message_sizes, 0, sizeof(int)*(SIZE_RANGES +1));
684         for (i=0; i<SIZE_RANGES; i++)
685           large_message_sizes[i] = ((i+1)*PCN_KMSG_PAYLOAD_SIZE);
686         large_message_sizes[SIZE_RANGES] = ~0;
687         memset(type_message_count, 0, sizeof(int)*PCN_KMSG_TYPE_MAX);
688
689         memset(log_receive,0,sizeof(struct pcn_kmsg_hdr)*LOGLEN);
690         memset(log_send,0,sizeof(struct pcn_kmsg_hdr)*LOGLEN);
691         memset(log_function_called,0,sizeof(void*)*LOGCALL);
692         memset(log_function_send,0,sizeof(void*)*LOGCALL);
693         /* if everything is ok create a proc interface */
694         struct proc_dir_entry *res;
695         res = create_proc_entry("pcnmsg", S_IRUGO, NULL);
696         if (!res) {
697                 printk(KERN_ALERT"%s: create_proc_entry failed (%p)\n", __func__, res);
698                 return -ENOMEM;
699         }
700         res->read_proc = pcn_read_proc;
701         res->write_proc = pcn_write_proc;
702
703         return 0;
704 }
705
706 subsys_initcall(pcn_kmsg_init);
707
708 /* Register a callback function when a kernel module is loaded */
709 int pcn_kmsg_register_callback(enum pcn_kmsg_type type, pcn_kmsg_cbftn callback)
710 {
711         PCN_WARN("%s: registering callback for type %d, ptr 0x%p\n", __func__, type, callback);
712
713         if (type >= PCN_KMSG_TYPE_MAX) {
714                 printk(KERN_ALERT"Attempted to register callback with bad type %d\n", 
715                          type);
716                 return -1;
717         }
718
719         callback_table[type] = callback;
720
721         return 0;
722 }
723
724 /* Unregister a callback function when a kernel module is unloaded */
725 int pcn_kmsg_unregister_callback(enum pcn_kmsg_type type)
726 {
727         if (type >= PCN_KMSG_TYPE_MAX) {
728                 KMSG_ERR("Attempted to register callback with bad type %d\n", 
729                          type);
730                 return -1;
731         }
732
733         callback_table[type] = NULL;
734
735         return 0;
736 }
737
738 /* SENDING / MARSHALING */
739
740 unsigned long int_ts;
741
742 static int __pcn_kmsg_send(unsigned int dest_cpu, struct pcn_kmsg_message *msg,
743                            int no_block)
744 {
745         int rc;
746         struct pcn_kmsg_window *dest_window;
747
748         if (unlikely(dest_cpu >= POPCORN_MAX_CPUS)) {
749                 KMSG_ERR("Invalid destination CPU %d\n", dest_cpu);
750                 return -1;
751         }
752
753         dest_window = rkvirt[dest_cpu];
754
755         if (unlikely(!rkvirt[dest_cpu])) {
756                 //KMSG_ERR("Dest win for CPU %d not mapped!\n", dest_cpu);
757                 return -1;
758         }
759
760         if (unlikely(!msg)) {
761                 KMSG_ERR("Passed in a null pointer to msg!\n");
762                 return -1;
763         }
764
765         /* set source CPU */
766         msg->hdr.from_cpu = my_cpu;
767
768         rc = win_put(dest_window, msg, no_block);
769 type_message_count[msg->hdr.type]++;
770
771         if (rc) {
772                 if (no_block && (rc == EAGAIN)) {
773                         return rc;
774                 }
775                 KMSG_ERR("Failed to place message in dest win!\n");
776                 return -1;
777         }
778
779
780         /* send IPI */
781         if (win_int_enabled(dest_window)) {
782                 KMSG_PRINTK("Interrupts enabled; sending IPI...\n");
783                 rdtscll(int_ts);
784                 apic->send_IPI_single(dest_cpu, POPCORN_KMSG_VECTOR);
785         } else {
786                 KMSG_PRINTK("Interrupts not enabled; not sending IPI...\n");
787         }
788
789
790         return 0;
791 }
792
793 int pcn_kmsg_send(unsigned int dest_cpu, struct pcn_kmsg_message *msg)
794 {
795         unsigned long bp;
796         get_bp(bp);
797         log_function_send[log_f_sendindex%LOGCALL]= callback_table[msg->hdr.type];
798         log_f_sendindex++;
799         msg->hdr.is_lg_msg = 0;
800         msg->hdr.lg_start = 0;
801         msg->hdr.lg_end = 0;
802         msg->hdr.lg_seqnum = 0;
803         msg->hdr.long_number= 0;
804
805         return __pcn_kmsg_send(dest_cpu, msg, 0);
806 }
807
808 int pcn_kmsg_send_noblock(unsigned int dest_cpu, struct pcn_kmsg_message *msg)
809 {
810
811         msg->hdr.is_lg_msg = 0;
812         msg->hdr.lg_start = 0;
813         msg->hdr.lg_end = 0;
814         msg->hdr.lg_seqnum = 0;
815         msg->hdr.long_number= 0;
816
817         return __pcn_kmsg_send(dest_cpu, msg, 1);
818 }
819
/*
 * pcn_kmsg_send_long - split @lmsg's payload into PCN_KMSG_PAYLOAD_SIZE
 * chunks and send each one to @dest_cpu.
 *
 * All chunks share one long_number (fetch-and-add on long_id) so the
 * receiver can reassemble interleaved streams.  lg_start/lg_end flag
 * the first/last chunk; lg_seqnum carries the TOTAL chunk count in the
 * first chunk and the chunk index (1..n-1) in the rest.
 *
 * Returns 0 on success, -1 if num_chunks would reach MAX_CHUNKS, or
 * the first failing __pcn_kmsg_send() result.
 */
int pcn_kmsg_send_long(unsigned int dest_cpu,
		       struct pcn_kmsg_long_message *lmsg,
		       unsigned int payload_size)
{
	int i, ret =0;
	int num_chunks = payload_size / PCN_KMSG_PAYLOAD_SIZE;
	struct pcn_kmsg_message this_chunk;

	/* account for a final partial chunk */
	if (payload_size % PCN_KMSG_PAYLOAD_SIZE) {
		num_chunks++;
	}

	 if ( num_chunks >= MAX_CHUNKS ){
		 KMSG_PRINTK("Message too long (size:%d, chunks:%d, max:%d) can not be transferred\n",
			payload_size, num_chunks, MAX_CHUNKS);
		return -1;
	 }

	KMSG_PRINTK("Sending large message to CPU %d, type %d, payload size %d bytes, %d chunks\n",
		    dest_cpu, lmsg->hdr.type, payload_size, num_chunks);

	this_chunk.hdr.type = lmsg->hdr.type;
	this_chunk.hdr.prio = lmsg->hdr.prio;
	this_chunk.hdr.is_lg_msg = 1;
	this_chunk.hdr.long_number= fetch_and_add(&long_id,1);

	for (i = 0; i < num_chunks; i++) {
		KMSG_PRINTK("Sending chunk %d\n", i);

		this_chunk.hdr.lg_start = (i == 0) ? 1 : 0;
		this_chunk.hdr.lg_end = (i == num_chunks - 1) ? 1 : 0;
		/* first chunk announces the total; later chunks carry their index */
		this_chunk.hdr.lg_seqnum = (i == 0) ? num_chunks : i;

		/* NOTE(review): the last chunk copies a full
		 * PCN_KMSG_PAYLOAD_SIZE even for a partial tail, reading
		 * past payload_size of the source buffer — confirm lmsg's
		 * payload is always allocated to a chunk multiple */
		memcpy(&this_chunk.payload,
		       ((unsigned char *) &lmsg->payload) +
		       i * PCN_KMSG_PAYLOAD_SIZE,
		       PCN_KMSG_PAYLOAD_SIZE);

		ret=__pcn_kmsg_send(dest_cpu, &this_chunk, 0);
		if(ret!=0)
			return ret;
	}

	/* statistics */
	/* NOTE(review): recomputed without the partial-chunk rounding, so a
	 * trailing partial chunk lands in the lower bucket — confirm intended */
	num_chunks = payload_size / PCN_KMSG_PAYLOAD_SIZE;
	large_message_count[((num_chunks < SIZE_RANGES) ? num_chunks : SIZE_RANGES)]++;

	return 0;
}
869
870 /* RECEIVING / UNMARSHALING */
871
872 static int process_message_list(struct list_head *head) 
873 {
874         int rc, rc_overall = 0;
875         struct pcn_kmsg_container *pos = NULL, *n = NULL;
876         struct pcn_kmsg_message *msg;
877
878         list_for_each_entry_safe(pos, n, head, list) {
879                 msg = &pos->msg;
880
881                 KMSG_PRINTK("Item in list, type %d,  processing it...\n", 
882                             msg->hdr.type);
883
884                 list_del(&pos->list);
885
886                 if (unlikely(msg->hdr.type >= PCN_KMSG_TYPE_MAX || 
887                              !callback_table[msg->hdr.type])) {
888                         KMSG_ERR("Invalid type %d; continuing!\n", 
889                                  msg->hdr.type);
890                         continue;
891                 }
892
893                 rc = callback_table[msg->hdr.type](msg);
894                 if (!rc_overall) {
895                         rc_overall = rc;
896                 }
897                 //log_function_called[log_f_index%LOGLEN]= callback_table[msg->hdr.type];
898                 //memcpy(&(log_function_called[log_f_index%LOGCALL]),&(callback_table[msg->hdr.type]),sizeof(void*));
899                 log_function_called[log_f_index%LOGCALL]= callback_table[msg->hdr.type];
900                 log_f_index++;
901                 /* NOTE: callback function is responsible for freeing memory
902                    that was kmalloced! */
903         }
904
905         return rc_overall;
906 }
907
/* TSC timestamps taken at ISR entry (isr_ts) and just before the bottom
 * half is queued (isr_ts_2); used for latency measurement.  'volatile'
 * leads the declaration per convention (was 'unsigned volatile long'). */
volatile unsigned long isr_ts = 0, isr_ts_2 = 0;
912
913 /* top half */
914 void smp_popcorn_kmsg_interrupt(struct pt_regs *regs)
915 {
916         //if (!isr_ts) {
917                 rdtscll(isr_ts);
918         //}
919
920         ack_APIC_irq();
921
922         KMSG_PRINTK("Reached Popcorn KMSG interrupt handler!\n");
923
924         inc_irq_stat(irq_popcorn_kmsg_count);
925         irq_enter();
926
927         /* We do as little work as possible in here (decoupling notification 
928            from messaging) */
929
930         /* disable further interrupts for now */
931         win_disable_int(rkvirt[my_cpu]);
932
933         //if (!isr_ts_2) {
934         rdtscll(isr_ts_2);
935         //}
936
937         /* schedule bottom half */
938         //__raise_softirq_irqoff(PCN_KMSG_SOFTIRQ);
939         struct work_struct* kmsg_work = kmalloc(sizeof(struct work_struct), GFP_ATOMIC);
940         if (kmsg_work) {
941                 INIT_WORK(kmsg_work,pcn_kmsg_action);
942                 queue_work(messaging_wq, kmsg_work);
943         } else {
944                 KMSG_ERR("Failed to kmalloc work structure!\n");
945         }
946         //tasklet_schedule(&pcn_kmsg_tasklet);
947
948         irq_exit();
949         return;
950 }
951
952 static int msg_add_list(struct pcn_kmsg_container *ctr)
953 {
954         int rc = 0;
955
956         switch (ctr->msg.hdr.prio) {
957                 case PCN_KMSG_PRIO_HIGH:
958                         KMSG_PRINTK("%s: Adding to high-priority list...\n", __func__);
959                         list_add_tail(&(ctr->list),
960                                       &msglist_hiprio);
961                         break;
962
963                 case PCN_KMSG_PRIO_NORMAL:
964                         KMSG_PRINTK("%s: Adding to normal-priority list...\n", __func__);
965                         list_add_tail(&(ctr->list),
966                                       &msglist_normprio);
967                         break;
968
969                 default:
970                         KMSG_ERR("%s: Priority value %d unknown -- THIS IS BAD!\n", __func__,
971                                   ctr->msg.hdr.prio);
972                         rc = -1;
973         }
974
975         return rc;
976 }
977
/*
 * Reassemble one fragment of a multi-chunk ("large") message pulled
 * from the receive window.
 *
 * Protocol (mirrors pcn_kmsg_send_long): the first fragment has
 * lg_start=1 and lg_seqnum = TOTAL chunk count; later fragments carry
 * lg_seqnum = chunk index; the final fragment has lg_end=1.  Fragments
 * of one message share hdr.long_number.  Partially reassembled messages
 * are parked on the per-sender list lg_buf[from_cpu]; a completed
 * message is handed to msg_add_list() for dispatch.
 *
 * Returns 1 when a complete message was queued, 0 otherwise
 * (work_done).
 */
static int process_large_message(struct pcn_kmsg_reverse_message *msg)
{
	int rc = 0;
	int recv_buf_size;
	struct pcn_kmsg_long_message *lmsg;
	int work_done = 0;
	struct pcn_kmsg_container* container_long=NULL, *n=NULL;

	KMSG_PRINTK("Got a large message fragment, type %u, from_cpu %u, start %u, end %u, seqnum %u!\n",
		    msg->hdr.type, msg->hdr.from_cpu,
		    msg->hdr.lg_start, msg->hdr.lg_end,
		    msg->hdr.lg_seqnum);

	if (msg->hdr.lg_start) {
		KMSG_PRINTK("Processing initial message fragment...\n");

		/* lg_seqnum == 0 on a first fragment would size the buffer
		 * below with no payload space — flag it loudly. */
		if (!msg->hdr.lg_seqnum)
		  printk(KERN_ALERT"%s: ERROR lg_seqnum is zero:%d long_number:%ld\n",
		      __func__, (int)msg->hdr.lg_seqnum, (long)msg->hdr.long_number);

		// calculate the size of the holding buffer
		// (list_head + header + seqnum full payload chunks; on a
		// first fragment lg_seqnum is the total chunk count)
		recv_buf_size = sizeof(struct list_head) +
			sizeof(struct pcn_kmsg_hdr) +
			msg->hdr.lg_seqnum * PCN_KMSG_PAYLOAD_SIZE;
/* BEN_VERSION selects the older single-buffer-per-sender scheme; it is
 * force-disabled here, so the #else branches below are the live code. */
#undef BEN_VERSION
#ifdef BEN_VERSION
		lg_buf[msg->hdr.from_cpu] = kmalloc(recv_buf_size, GFP_ATOMIC);
		if (!lg_buf[msg->hdr.from_cpu]) {
					KMSG_ERR("Unable to kmalloc buffer for incoming message!\n");
					goto out;
				}
		lmsg = (struct pcn_kmsg_long_message *) &lg_buf[msg->hdr.from_cpu]->msg;
#else /* BEN_VERSION */
		container_long= kmalloc(recv_buf_size, GFP_ATOMIC);
		if (!container_long) {
			KMSG_ERR("Unable to kmalloc buffer for incoming message!\n");
			goto out;
		}
		lmsg = (struct pcn_kmsg_long_message *) &container_long->msg; //TODO wrong cast!
#endif /* !BEN_VERSION */

		/* copy header first */
		memcpy((unsigned char *) &lmsg->hdr,
		       &msg->hdr, sizeof(struct pcn_kmsg_hdr));
		/* copy first chunk of message */
		memcpy((unsigned char *) &lmsg->payload,
		       &msg->payload, PCN_KMSG_PAYLOAD_SIZE);

		if (msg->hdr.lg_end) {
			/* Single-fragment "large" message: complete already. */
			KMSG_PRINTK("NOTE: Long message of length 1 received; this isn't efficient!\n");

			/* add to appropriate list */
#ifdef BEN_VERSION
			rc = msg_add_list(lg_buf[msg->hdr.from_cpu]);
#else /* BEN_VERSION */
			rc = msg_add_list(container_long);
#endif /* !BEN_VERSION */
			if (rc)
				KMSG_ERR("Failed to add large message to list!\n");
			work_done = 1;
		}
#ifndef BEN_VERSION
		else
		  // add the message in the lg_buf
		  // (parked, awaiting the remaining fragments from this CPU)
		  list_add_tail(&container_long->list, &lg_buf[msg->hdr.from_cpu]);
#endif /* !BEN_VERSION */
	}
	else {
		KMSG_PRINTK("Processing subsequent message fragment...\n");

		/* Look up the partial message by long_number in the
		 * sender's parking list. */
		//It should not be needed safe
		list_for_each_entry_safe(container_long, n, &lg_buf[msg->hdr.from_cpu], list) {
			if ( (container_long != NULL) &&
			  (container_long->msg.hdr.long_number == msg->hdr.long_number) )
				// found!
				goto next;
		}

		/* No matching first fragment: drop this fragment. */
		KMSG_ERR("Failed to find long message %lu in the list of cpu %i!\n",
			 msg->hdr.long_number, msg->hdr.from_cpu);
		goto out;

next:
		/* Copy this chunk into its slot (lg_seqnum is the chunk
		 * index on non-first fragments). */
		lmsg = (struct pcn_kmsg_long_message *) &container_long->msg;
		memcpy((unsigned char *) ((void*)&lmsg->payload) + (PCN_KMSG_PAYLOAD_SIZE * msg->hdr.lg_seqnum),
		       &msg->payload, PCN_KMSG_PAYLOAD_SIZE);

		if (msg->hdr.lg_end) {
			KMSG_PRINTK("Last fragment in series...\n");
			KMSG_PRINTK("from_cpu %d, type %d, prio %d\n",
				    lmsg->hdr.from_cpu, lmsg->hdr.type, lmsg->hdr.prio);
			/* add to appropriate list */
#ifdef BEN_VERSION
			rc = msg_add_list(lg_buf[msg->hdr.from_cpu]);
#else /* BEN_VERSION */
			/* Unpark the now-complete message and queue it for
			 * dispatch. */
			list_del(&container_long->list);
			rc = msg_add_list(container_long);
#endif /* !BEN_VERSION */
			if (rc)
				KMSG_ERR("Failed to add large message to list!\n");
			work_done = 1;
		}
	}

out:
	return work_done;
}
1085
1086 static int process_small_message(struct pcn_kmsg_reverse_message *msg)
1087 {
1088         int rc = 0, work_done = 1;
1089         struct pcn_kmsg_container *incoming;
1090
1091         /* malloc some memory (don't sleep!) */
1092         incoming = kmalloc(sizeof(struct pcn_kmsg_container), GFP_ATOMIC);
1093         if (unlikely(!incoming)) {
1094                 KMSG_ERR("Unable to kmalloc buffer for incoming message!\n");
1095                 return 0;
1096         }
1097
1098         /* memcpy message from rbuf */
1099         memcpy(&incoming->msg.hdr, &msg->hdr,
1100                sizeof(struct pcn_kmsg_hdr));
1101
1102         memcpy(&incoming->msg.payload, &msg->payload,
1103                PCN_KMSG_PAYLOAD_SIZE);
1104
1105         KMSG_PRINTK("Received message, type %d, prio %d\n",
1106                     incoming->msg.hdr.type, incoming->msg.hdr.prio);
1107
1108         /* add container to appropriate list */
1109         rc = msg_add_list(incoming);
1110
1111         return work_done;
1112 }
1113
/* Re-entrancy watchdog: this handler assumes a single consumer (see the
 * NOTE about the ready/tail ordering below); a value > 1 means two
 * contexts drained the window at once. */
static int poll_handler_check=0;
/*
 * Drain this CPU's receive window, dispatching each slot to the large-
 * or small-message path, then re-enable window interrupts.  Re-checks
 * the window after re-enabling to close the race where a message lands
 * between the last win_get() and win_enable_int().
 *
 * Returns the number of completed messages queued (work units).
 */
static int pcn_kmsg_poll_handler(void)
{
	struct pcn_kmsg_reverse_message *msg;
	struct pcn_kmsg_window *win = rkvirt[my_cpu]; // TODO this will not work for clustering
	int work_done = 0;

	poll_handler_check++;
	if (poll_handler_check >1)
		printk("poll_hanlder_check %d concurrent calls not supported\n", poll_handler_check);

pull_msg:
	/* Get messages out of the buffer first */
	while (! win_get(win, &msg) ) {
		KMSG_PRINTK("got a message!\n");

		/* Special processing for large messages */
		if (msg->hdr.is_lg_msg) {
			KMSG_PRINTK("message is a large message!\n");
			work_done += process_large_message(msg);
		} else {
			KMSG_PRINTK("message is a small message!\n");
			work_done += process_small_message(msg);
		}

		/* Release the slot back to the sender: clear the ready bit,
		 * fence, then advance the tail. */
		msg->ready = 0;
		pcn_barrier();

		fetch_and_add(&win->tail, 1); //win_advance_tail(win);

		// NOTE
		// why you need the ready bit if you are incrementing the tail?
		// can we increment the tail before signaling the ready bit?
		// in that way we can support multiple calls to this function
	}

	/* Re-enable interrupts; if something arrived meanwhile, disable
	 * them again and drain once more. */
	win_enable_int(win);
	if ( win_inuse(win) ) {
		win_disable_int(win);
		goto pull_msg;
	}

	poll_handler_check--;
	return work_done;
}
1159
/* TSC timestamps taken at bottom-half entry (bh_ts) and just before the
 * priority queues are processed (bh_ts_2); used for latency measurement.
 * 'volatile' leads the declaration per convention. */
volatile unsigned long bh_ts = 0, bh_ts_2 = 0;
1161
1162 // NOTE the following was declared as a bottom half
1163 //static void pcn_kmsg_action(struct softirq_action *h)
1164 static void pcn_kmsg_action(struct work_struct* work)
1165 {
1166         int rc;
1167         int i;
1168         int work_done = 0;
1169
1170         //if (!bh_ts) {
1171                 rdtscll(bh_ts);
1172         //}
1173         KMSG_PRINTK("called\n");
1174
1175         work_done = pcn_kmsg_poll_handler();
1176         KMSG_PRINTK("Handler did %d units of work!\n", work_done);
1177
1178 #ifdef PCN_SUPPORT_MULTICAST    
1179         for (i = 0; i < POPCORN_MAX_MCAST_CHANNELS; i++) {
1180                 if (MCASTWIN(i)) {
1181                         KMSG_PRINTK("mcast win %d mapped, processing it\n", i);
1182                         process_mcast_queue(i);
1183                 }
1184         }
1185         KMSG_PRINTK("Done checking mcast queues; processing messages\n");
1186 #endif /* PCN_SUPPORT_MULTICAST */
1187
1188         //if (!bh_ts_2) {
1189                 rdtscll(bh_ts_2);
1190         //}
1191
1192         /* Process high-priority queue first */
1193         rc = process_message_list(&msglist_hiprio);
1194
1195         if (list_empty(&msglist_hiprio)) {
1196                 KMSG_PRINTK("High-priority queue is empty!\n");
1197         }
1198
1199         /* Then process normal-priority queue */
1200         rc = process_message_list(&msglist_normprio);
1201
1202         kfree(work);
1203
1204         return;
1205 }
1206
1207 #ifdef PCN_SUPPORT_MULTICAST
1208 # include "pcn_kmsg_mcast.h"
1209 #endif /* PCN_SUPPORT_MULTICAST */