Merge remote-tracking branch 'origin/clustered' into aks_dev_clus
author Akshay Giridhar <akshay87@vt.edu>
Tue, 24 Jun 2014 21:39:59 +0000 (17:39 -0400)
committer Akshay Giridhar <akshay87@vt.edu>
Tue, 24 Jun 2014 21:39:59 +0000 (17:39 -0400)
Conflicts:
fs/exec.c
include/linux/popcorn.h
include/linux/process_server.h
kernel/futex.c
kernel/kinit.c
kernel/process_server.c
kernel/sched.c
pcnmsg/pcn_kmsg.c

14 files changed:
arch/x86/include/asm/unistd_64.h
arch/x86/kernel/apic/io_apic.c
arch/x86/mm/ioremap.c
block/scsi_ioctl.c
drivers/pci/probe.c
drivers/tty/vty.c
fs/exec.c
include/linux/pcn_kmsg.h
include/linux/process_server.h
ipc/Makefile
kernel/kmod.c
kernel/process_server.c
pcnmsg/pcn_kmsg.c
pcnmsg/pcn_kmsg_mcast.h [new file with mode: 0644]

index 7c601f3..d244c59 100644
--- a/arch/x86/include/asm/unistd_64.h
+++ b/arch/x86/include/asm/unistd_64.h
@@ -686,6 +686,7 @@ __SYSCALL(__NR_getcpu, sys_getcpu)
 __SYSCALL(__NR_process_vm_readv, sys_process_vm_readv)
 #define __NR_process_vm_writev                 311
 __SYSCALL(__NR_process_vm_writev, sys_process_vm_writev)
+
 #define __NR_multikernel_boot                   312
 __SYSCALL(__NR_multikernel_boot, sys_multikernel_boot)
 #define __NR_get_boot_params_addr               313
@@ -694,10 +695,13 @@ __SYSCALL(__NR_get_boot_params_addr, sys_get_boot_params_addr)
 __SYSCALL(__NR_popcorn_test_kmsg, sys_popcorn_test_kmsg)
 #define __NR_popcorn_test_ipi_latency          315
 __SYSCALL(__NR_popcorn_test_ipi_latency, sys_popcorn_test_ipi_latency)
+
+#ifdef CONFIG_POPCORN_PERF
 #define __NR_popcorn_perf_start     316
 __SYSCALL(__NR_popcorn_perf_start, sys_popcorn_perf_start)
 #define __NR_popcorn_perf_end       317
 __SYSCALL(__NR_popcorn_perf_end, sys_popcorn_perf_end)
+#endif /* CONFIG_POPCORN_PERF */
 
 #ifndef __NO_STUBS
 #define __ARCH_WANT_OLD_READDIR
index 26873ff..95f105b 100644
--- a/arch/x86/kernel/apic/io_apic.c
+++ b/arch/x86/kernel/apic/io_apic.c
@@ -1478,7 +1478,6 @@ static void __init __io_apic_setup_irqs(unsigned int ioapic_idx)
        int idx, node;
        struct io_apic_irq_attr attr;
        unsigned int pin, irq;
-       //char buffer[128];
        node = cpu_to_node(0); // note: node is always node zero! Have sense in single kernel
 
        for (pin = 0; pin < ioapics[ioapic_idx].nr_registers; pin++) {
index 52d241b..a7c4672 100644
--- a/arch/x86/mm/ioremap.c
+++ b/arch/x86/mm/ioremap.c
@@ -126,9 +126,8 @@ static void __iomem *__ioremap_caller(resource_size_t phys_addr,
                                prot_val, new_prot_val);
                        goto err_free_memtype;
                }
-               printk(KERN_ERR
-       "%s: ioremap WARN 0x%llx-0x%llx, requested 0x%lx, got 0x%lx\n",
-                       __func__,
+               printk(KERN_WARNING
+       "ioremap WARN 0x%llx-0x%llx, requested 0x%lx, got 0x%lx\n",
                        (unsigned long long)phys_addr,
                        (unsigned long long)(phys_addr + size),
                        prot_val, new_prot_val);
index 688be8a..ed46890 100644
--- a/block/scsi_ioctl.c
+++ b/block/scsi_ioctl.c
@@ -720,12 +720,15 @@ int scsi_verify_blk_ioctl(struct block_device *bd, unsigned int cmd)
        default:
                break;
        }
+       
+       if (capable(CAP_SYS_RAWIO))
+         return 0;
 
        /* In particular, rule out all resets and host-specific ioctls.  */
        printk_ratelimited(KERN_WARNING
                           "%s: sending ioctl %x to a partition!\n", current->comm, cmd);
 
-       return capable(CAP_SYS_RAWIO) ? 0 : -ENOTTY;
+       return -ENOTTY;
 }
 EXPORT_SYMBOL(scsi_verify_blk_ioctl);
 
index bfd2123..b1d59d5 100644
--- a/drivers/pci/probe.c
+++ b/drivers/pci/probe.c
@@ -1232,12 +1232,12 @@ struct pci_dev_blacklist_item {
        unsigned short device;
        unsigned int flags;
 };
-#define BL 16
+#define BL 32
 static struct pci_dev_blacklist_item
-pci_dev_blacklist[BL] = { {0,0,0},{0,0,0},{0,0,0},{0,0,0},
-                                               {0,0,0},{0,0,0},{0,0,0},{0,0,0},
-                                               {0,0,0},{0,0,0},{0,0,0},{0,0,0},
-                                               {0,0,0},{0,0,0},{0,0,0},{0,0,0} };
+pci_dev_blacklist[BL] = { {0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0},
+                          {0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0},
+                          {0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0},
+                          {0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0},{0,0,0} };
 static int pci_dev_blacklist_elements = 0;
 
 int pci_dev_list_add(int compatible, char *vendor, char *model,
index cc62320..6c2b10d 100644
--- a/drivers/tty/vty.c
+++ b/drivers/tty/vty.c
@@ -1,3 +1,10 @@
+/*
+ * Copyright 2012-2014, SSRG VT
+ * original version: Arijit Chattopardiyay
+ * rewritten by: Antonio Barbalace
+ */
+// TODO rewrite the buffering algorithm
+
 #include<linux/module.h>
 #include<linux/init.h>
 #include<linux/kernel.h>
@@ -6,7 +13,6 @@
 #include<linux/spinlock.h>
 #include<linux/highmem.h>
 #include<linux/mm.h>
-#include<asm/io.h>
 #include<linux/sched.h>
 #include<linux/tty.h>
 #include<linux/tty_driver.h>
 #include<linux/spinlock.h>
 #include<linux/types.h>
 
+#include<asm/io.h>
+#include<asm/bug.h>
+
 MODULE_LICENSE("GPL");
 
 #define SUCCESS 0
 #define FAILURE -1
-#define BUF_SIZE 19200
 
+/*
+ * configurables
+ */
 #define NO_OF_DEV 64
-
-/**************************************************/
-/*System Variables*/
-
+#define SHM_SIZE 0x200000
+#define BUF_SIZE ((SHM_SIZE/NO_OF_DEV) - PAGE_SIZE)
+#define READING_INTERVAL 125
+
+#if ( (SHM_SIZE & ~PAGE_MASK) )
+# error "size must be a multiple of the architecture page size"
+#endif
+#if ( (NO_OF_DEV * (BUF_SIZE + PAGE_SIZE)) > SHM_SIZE )
+# error "(NO_OF_DEV * (BUF_SIZE + PAGE_SIZE)) exceeds SHM_SIZE"
+#endif
+
+/*
+ * constants
+ */
 #define MINOR_START_NUMBER 121
-const char *tty_dev_name = "ttty";
-int order = 0;
-const int reading_interval = 200;
-const char tokenizer = '%';
-
-/***************************************************/
+#define VTY_DEV_NAM "vty"
 
-unsigned long long global_poff = 0l;
+/*
+ * System Variables
+ */
+static int order = 0;
+static const char *tty_dev_name = VTY_DEV_NAM;
+static const int reading_interval = READING_INTERVAL;
+static const char tokenizer = '%';
+static unsigned long long global_poff = 0l;
 
 typedef struct vty_desc {
        struct tty_struct * tty;
@@ -44,40 +67,56 @@ typedef struct vty_desc {
        struct list_head list;
 } vty_desc_t;
 
-int index = 0;
 struct list_head current_tty;
 
+/*
+ * _setup_vty_offset
+ * 
+ * fetching the command line argument for configuration.
+ * if the configuration argument is missing the driver will not be loaded.
+ */
 static int __init _setup_vty_offset(char *str)
 {
-       global_poff = simple_strtoull(str,0,16);
-       printk(KERN_ALERT "VTY offset %llx\n",global_poff);
+       global_poff = simple_strtoull(str, 0, 16);
        return 0;
 }
 early_param("vty_offset", _setup_vty_offset);
 
+/*
+ * allocate_shared_memory
+ * 
+ * it maps a physical shared memory area as a matrix communication buffer.
+ * The matrix is NO_OF_DEV*NO_OF_DEV. There is a buffer for each communication
+ * direction.
+ */
 struct ring_buffer {
        char buffer[BUF_SIZE];
        int current_pos;
        rwlock_t lock;
 };
+static struct ring_buffer *ring_buffer_address[NO_OF_DEV][NO_OF_DEV];
 
-struct ring_buffer *ring_buffer_address[NO_OF_DEV][NO_OF_DEV];
-
-static int allocate_shared_memory() {
-
-       unsigned long *poff = 0l;
-       if (global_poff == 0) {
-               poff = 0xc0000000;
-       } else {
-               poff = (unsigned long *) global_poff;
-       }
-
-       size_t size = 0x200000;
-       void *virtual_address[64];
-
-       printk(KERN_ALERT "Poff %llx and %ld\n",poff,size);
-       unsigned long pfn = (long) poff >> PAGE_SHIFT;
-       unsigned long node = -1, nid = -1;
+static int allocate_shared_memory (void)
+{
+       int i, j;
+       void *poff = 0l;
+       void *virtual_address[NO_OF_DEV];
+       unsigned long pfn, pfn_end, node, nid;
+       size_t size = SHM_SIZE;
+       
+       // fetching the pysical address (in shared memory)
+       if (global_poff == 0)
+         return -ENOMEM;
+       
+       poff = (void*) global_poff;
+       pfn = (unsigned long) poff >> PAGE_SHIFT;
+       pfn_end = ((unsigned long) poff + ((unsigned long) size * NO_OF_DEV)) >> PAGE_SHIFT;
+       
+       //buffers do not have to overlap
+       BUG_ON( (sizeof(struct ring_buffer) > (BUF_SIZE + PAGE_SIZE)) );
+       
+       // finding the current node id
+       node = nid = -1;
        for_each_online_node(nid)
        {
                unsigned long start_pfn, end_pfn;
@@ -88,29 +127,34 @@ static int allocate_shared_memory() {
                        break; // node found continue
                }
        }
-
-       int i;
-       for (i = 0; i < 64; ++i) {
-               if (node == -1) { // page never mapped (why?)
+       printk(KERN_INFO"virtualTTY: buffer %ld, rows %ld, columns(cpus) %d, node %ld[0x%lx-0x%lx]\n",
+              sizeof(struct ring_buffer), (long) size, (int)NO_OF_DEV, node,
+              (node!=-1)?node_start_pfn(node):0l, (node!=-1)?node_end_pfn(node):0l);
+       
+       // mapping in all the matrix of possible buffers (per rows)
+       for (i = 0; i < NO_OF_DEV; i++) {
+               if (node == -1) { // page never mapped
                        virtual_address[i] = ioremap_cache(
                                        (resource_size_t)((void *) poff + (i * size)), size);
+                       if (! virtual_address[i])
+                               printk(KERN_ERR"%s: ioremap failed, virtual_address %d is 0x%p\n",
+                                       __func__, i, virtual_address[i]);
                } else {
                        struct page *shared_page;
-                       shared_page = pfn_to_page(pfn);
+                       shared_page = pfn_to_page(pfn + (size >> PAGE_SHIFT)*i);
                        virtual_address[i] = page_address(shared_page);
-                       void * kmap_addr = kmap(shared_page);
-
                }
        }
 
-       int j;
-       for (i = 0; i < 64; ++i) {
+       // mapping each buffer and initialize it
+       for (i = 0; i < NO_OF_DEV; i++) {
                void *p = virtual_address[i];
-               for (j = 0; j < 64; ++j) {
+               for (j = 0; j < NO_OF_DEV; j++) {
                        ring_buffer_address[i][j] = (struct ring_buffer *) ((void *) p
-                                       + (sizeof(struct ring_buffer) * j));
-                               ring_buffer_address[i][j]->current_pos = 0;
-                               rwlock_init(&(ring_buffer_address[i][j]->lock));
+                                       + ((BUF_SIZE + PAGE_SIZE) * j));
+                       
+                       ring_buffer_address[i][j]->current_pos = 0;
+                       rwlock_init(&(ring_buffer_address[i][j]->lock));
                }
        }
 
@@ -122,20 +166,22 @@ struct timer_list read_function_timer;
 
 static void tty_dev_read(long unsigned int time);
 
-int tty_dev_open(struct tty_struct *tty, struct file *flip) {
+int tty_dev_open(struct tty_struct *tty, struct file *flip)
+{
 
        vty_desc_t *tmp = (vty_desc_t *) kmalloc(sizeof(vty_desc_t), GFP_KERNEL);
        tmp->tty = tty;
-       tmp->id = index++;
+       tmp->id = tty->index;
        INIT_LIST_HEAD(&(tmp->list));
-       tty->driver_data = tmp->id;
+       tty->driver_data = (void*)(long)tmp->id;
        list_add(&(tmp->list), &(current_tty));
 
        mod_timer(&read_function_timer, jiffies + msecs_to_jiffies(reading_interval));
        return SUCCESS;
 }
 
-void tty_dev_close(struct tty_struct *tty, struct file *flip) {
+void tty_dev_close(struct tty_struct *tty, struct file *flip)
+{
 
        struct list_head * cur, *n;
        vty_desc_t * curt;
@@ -144,55 +190,68 @@ void tty_dev_close(struct tty_struct *tty, struct file *flip) {
        {
                curt = list_entry(cur, vty_desc_t, list);
 
-               if (curt->tty == tty && curt->id == (int) tty->driver_data) {
+               if (curt->tty == tty &&
+                   curt->id == (int)(long)tty->driver_data) {
                        list_del(cur);
                        kfree(curt);
-                       return 0;
+                       return;
                }
        }
 
        del_timer(&read_function_timer);
 }
 
-int tty_dev_write(struct tty_struct * tty, const unsigned char *buf, int count) {
+int tty_dev_write(struct tty_struct * tty, const unsigned char *buf, int count)
+{
        /**
         * When 0 wants to write to 2 it will write in 2,0
         * 2 wants to read what is written by 0, it will read 2,0
         * */
        int xGrid = tty->index;
        int yGrid = order;
-       if (count > 0 && (ring_buffer_address[xGrid][yGrid] != NULL)) {
+       struct ring_buffer *current_buffer;
+       
+       if ( (count > 0) &&
+           (ring_buffer_address[xGrid][yGrid] != NULL) ) {
+         
                write_lock(&(ring_buffer_address[xGrid][yGrid]->lock));
-               if (ring_buffer_address[xGrid][yGrid]->current_pos
-                               < 0||
-                               ring_buffer_address[xGrid][yGrid]->current_pos >= BUF_SIZE || count > BUF_SIZE) {ring_buffer_address[xGrid][yGrid]->current_pos = 0;
-               printk(KERN_ALERT "Memory Overflow...........\n Resetting the value.....\n");
-               if(count > BUF_SIZE) {
-                       count = BUF_SIZE;
+               if (ring_buffer_address[xGrid][yGrid]->current_pos < 0 ||
+                               ring_buffer_address[xGrid][yGrid]->current_pos >= BUF_SIZE ||
+                               count > BUF_SIZE) {
+                 
+                       ring_buffer_address[xGrid][yGrid]->current_pos = 0;
+                       printk(KERN_ALERT "Memory Overflow...........\n Resetting the value.....\n");
+                       if(count > BUF_SIZE) {
+                               count = BUF_SIZE;
+                       }
                }
-       }
-               struct ring_buffer *current_buffer = ring_buffer_address[xGrid][yGrid];
+               current_buffer = ring_buffer_address[xGrid][yGrid];
                memcpy(&(current_buffer->buffer[current_buffer->current_pos]), buf,
                                count);
                current_buffer->current_pos += count;
                current_buffer->buffer[current_buffer->current_pos] = '\0';
+               
                write_unlock(&(ring_buffer_address[xGrid][yGrid]->lock));
        }
        return count;
 }
 
-void tty_dev_read(long unsigned int time) {
+void tty_dev_read(long unsigned int time)
+{
 
        struct list_head * cur, *n;
        vty_desc_t * curt;
 
        list_for_each_safe(cur, n, &current_tty)        
        {
+               int xGrid, yGrid;
+               struct ring_buffer *my_ring_buf;
+               
                curt = list_entry(cur, vty_desc_t, list);
-
-               int xGrid = order;
-               int yGrid = curt->tty->index;
-               struct ring_buffer *my_ring_buf = ring_buffer_address[xGrid][yGrid];
+               xGrid = order;
+               yGrid = curt->tty->index;
+               
+               my_ring_buf = ring_buffer_address[xGrid][yGrid];
                if (my_ring_buf != NULL) {
                        read_lock(&(ring_buffer_address[xGrid][yGrid]->lock));
                        if (my_ring_buf->current_pos == 0) {
@@ -222,7 +281,8 @@ void tty_dev_read(long unsigned int time) {
 
 }
 
-static int tty_dev_write_room(struct tty_struct *tty) {
+static int tty_dev_write_room(struct tty_struct *tty)
+{
        int xGrid = tty->index;
        int yGrid = order;
        int retVal = BUF_SIZE;
@@ -234,28 +294,35 @@ static int tty_dev_write_room(struct tty_struct *tty) {
        return retVal;
 }
 
-static struct tty_operations tty_dev_operations = { .open = tty_dev_open,
-               .close = tty_dev_close, .write = tty_dev_write, .write_room =
-                               tty_dev_write_room };
+static struct tty_operations tty_dev_operations = {
+       .open = tty_dev_open,
+       .close = tty_dev_close,
+       .write = tty_dev_write,
+       .write_room = tty_dev_write_room
+};
 
 static int __init vty_init(void)
 {
-       printk(KERN_ALERT "Loading MyTTy Driver memory %s\n",__func__);
+       int ret;
        order = smp_processor_id();
-       printk(KERN_ALERT "The order is %d\n",order);
+       printk(KERN_INFO "virtualTTY: cpu %d, phys 0x%llx-0x%llx\n",
+              order, global_poff, global_poff + (SHM_SIZE * NO_OF_DEV));
 
-       allocate_shared_memory();
-       printk(KERN_ALERT "Memory Allocation is successful\n");
+       if ( (ret = allocate_shared_memory()) ) {
+         printk(KERN_ERR "%s: allocate_shared_memory error %d\n",
+                __func__, ret);
+         return FAILURE;
+       }
 
-       printk(KERN_ALERT "My TTY driver Module is loading\n");
        master_tty_driver = alloc_tty_driver(NO_OF_DEV);
-       if(!master_tty_driver) {
-               printk(KERN_ALERT "Allocation of master is failed\n");
-               return FAILURE;
+       if( !master_tty_driver ) {
+         printk(KERN_ERR "%s: allocation of master tty failed %p\n",
+               __func__, master_tty_driver);
+         return FAILURE;
        }
 
        master_tty_driver->owner = THIS_MODULE;
-       master_tty_driver->driver_name="Myttydriver";
+       master_tty_driver->driver_name="vtydriver";
        master_tty_driver->name = tty_dev_name;
        master_tty_driver->major = TTY_MAJOR;
        master_tty_driver->minor_start = MINOR_START_NUMBER;
@@ -274,13 +341,13 @@ static int __init vty_init(void)
        master_tty_driver->init_termios.c_ospeed = 38400;
        tty_set_operations(master_tty_driver, &tty_dev_operations);
 
-       int retval = tty_register_driver(master_tty_driver);
-       if(retval != 0)
-       {
-               printk(KERN_ALERT "Unable to register the device\n");
+       ret = tty_register_driver(master_tty_driver);
+       if(ret != 0) {
+               printk(KERN_ERR "%s: unable to register the tty device %d\n",
+                 __func__, ret);
                return FAILURE;
        }
-       setup_timer(&read_function_timer,tty_dev_read,0);
+       setup_timer(&read_function_timer, tty_dev_read, 0);
 
        INIT_LIST_HEAD(&current_tty);
 
@@ -289,7 +356,8 @@ static int __init vty_init(void)
 
 static void __exit vty_exit(void)
 {
-       printk(KERN_ALERT "Unloading shared memory\n");
+       printk(KERN_INFO "virtualTTY Driver: unloading\n");
+       // TODO not implemented
 }
 
 module_init( vty_init);
index 639cda4..3c76f9f 100644
--- a/fs/exec.c
+++ b/fs/exec.c
@@ -813,10 +813,7 @@ int kernel_read(struct file *file, loff_t offset,
 
 EXPORT_SYMBOL(kernel_read);
 
-#ifdef PROCESS_SERVER_USE_KMOD
-static 
-#endif
-int exec_mmap(struct mm_struct *mm)
+static int exec_mmap(struct mm_struct *mm)
 {
        struct task_struct *tsk;
        struct mm_struct * old_mm, *active_mm;
@@ -858,10 +855,6 @@ int exec_mmap(struct mm_struct *mm)
        return 0;
 }
 
-#ifndef PROCESS_SERVER_USE_KMOD
-EXPORT_SYMBOL(exec_mmap);
-#endif
-
 /*
  * This function makes sure the current process has its own signal table,
  * so that flush_signal_handlers can later reset the handlers without
@@ -1491,19 +1484,25 @@ static int do_execve_common(const char *filename,
                goto out_files;
 
        retval = prepare_bprm_creds(bprm);
-       if (retval)
+       if (retval) {
+printk("%s: prepare_bprm_creds\n", __func__);
                goto out_free;
+}
 
        retval = check_unsafe_exec(bprm);
-       if (retval < 0)
+       if (retval < 0) {
+printk("%s: check_unsafe_exec\n", __func__);
                goto out_free;
+}
        clear_in_exec = retval;
        current->in_execve = 1;
 
        file = open_exec(filename);
        retval = PTR_ERR(file);
-       if (IS_ERR(file))
+       if (IS_ERR(file)) {
+//printk("%s: open_exec\n", __func__);
                goto out_unmark;
+}
 
        sched_exec();
 
@@ -1512,42 +1511,56 @@ static int do_execve_common(const char *filename,
        bprm->interp = filename;
 
        retval = bprm_mm_init(bprm);
-       if (retval)
+       if (retval) {
+printk("%s: bprm_mm_init\n", __func__);
                goto out_file;
+}
 
        bprm->argc = count(argv, MAX_ARG_STRINGS);
-       if ((retval = bprm->argc) < 0)
+       if ((retval = bprm->argc) < 0) {
+printk("%s: count argv\n", __func__);
                goto out;
+}
 
        bprm->envc = count(envp, MAX_ARG_STRINGS);
-       if ((retval = bprm->envc) < 0)
+       if ((retval = bprm->envc) < 0) {
+printk("%s: count envc\n", __func__);
                goto out;
+}
 
        retval = prepare_binprm(bprm);
-       if (retval < 0)
+       if (retval < 0) {
+printk("%s: prepare_binprm\n", __func__);
                goto out;
+}
 
        retval = copy_strings_kernel(1, &bprm->filename, bprm);
-       if (retval < 0)
+       if (retval < 0) {
+printk("%s: copy_string_kernel\n", __func__);
                goto out;
-#ifdef PROCESS_SERVER_USE_KMOD
+}
+
     if(!current->executing_for_remote) {
-#endif
         bprm->exec = bprm->p;
         retval = copy_strings(bprm->envc, envp, bprm);
-        if (retval < 0)
+        if (retval < 0) {
+printk("%s: copy_strings bprm->envc\n", __func__);
             goto out;
+}
 
         retval = copy_strings(bprm->argc, argv, bprm);
-        if (retval < 0)
+        if (retval < 0) {
+printk("%s: copy_strings bprm->argc\n", __func__);
             goto out;
-#ifdef PROCESS_SERVER_USE_KMOD
-    }
-#endif
+}
+
+     }
 
        retval = search_binary_handler(bprm,regs);
-       if (retval < 0)
+       if (retval < 0) {
+printk("%s: search_binary_handler\n", __func__);
                goto out;
+}
 
        /* execve succeeded */
        current->fs->in_exec = 0;
@@ -2312,3 +2325,4 @@ int dump_seek(struct file *file, loff_t off)
        return ret;
 }
 EXPORT_SYMBOL(dump_seek);
+
index a973c90..4f9f195 100644
--- a/include/linux/pcn_kmsg.h
+++ b/include/linux/pcn_kmsg.h
@@ -18,7 +18,6 @@
 /* BOOKKEEPING */
 
 #define POPCORN_MAX_MCAST_CHANNELS 32
-#define LG_SEQNUM_SIZE 7
 
 struct pcn_kmsg_mcast_wininfo {
        volatile unsigned char lock;
@@ -132,24 +131,37 @@ enum pcn_kmsg_prio {
        PCN_KMSG_PRIO_NORMAL
 };
 
+#define __READY_SIZE 1
+#define LG_SEQNUM_SIZE  (8 - __READY_SIZE)
+
 /* Message header */
 struct pcn_kmsg_hdr {
        unsigned int from_cpu   :8; // b0
-
+       
        enum pcn_kmsg_type type :8; // b1
-
+       
        enum pcn_kmsg_prio prio :5; // b2
        unsigned int is_lg_msg  :1;
        unsigned int lg_start   :1;
        unsigned int lg_end     :1;
-       unsigned long long_number;
 
-       unsigned int lg_seqnum  :LG_SEQNUM_SIZE;// b3
-       //volatile unsigned int ready   :1;
+       unsigned long long_number; // b3 .. b10
+       
+       unsigned int lg_seqnum  :LG_SEQNUM_SIZE; // b11
+       unsigned int __ready    :__READY_SIZE;
 }__attribute__((packed));
 
+//#if ( &((struct pcn_kmsg_hdr*)0)->ready != 12 )
+//# error "ready is not the last byte of the struct"
+//#endif
+
+// TODO cache size can be retrieved by the compiler, put it here
+#define CACHE_LINE_SIZE 64
 //#define PCN_KMSG_PAYLOAD_SIZE 60
-#define PCN_KMSG_PAYLOAD_SIZE (64-sizeof(struct pcn_kmsg_hdr))
+#define PCN_KMSG_PAYLOAD_SIZE (CACHE_LINE_SIZE - sizeof(struct pcn_kmsg_hdr))
+
+#define MAX_CHUNKS ((1 << LG_SEQNUM_SIZE) -1)
+#define PCN_KMSG_LONG_PAYLOAD_SIZE (MAX_CHUNKS*PCN_KMSG_PAYLOAD_SIZE)
 
 /* The actual messages.  The expectation is that developers will create their
    own message structs with the payload replaced with their own fields, and then
@@ -162,12 +174,19 @@ struct pcn_kmsg_hdr {
 struct pcn_kmsg_message {
        struct pcn_kmsg_hdr hdr;
        unsigned char payload[PCN_KMSG_PAYLOAD_SIZE];
-}__attribute__((packed)) __attribute__((aligned(64)));
+}__attribute__((packed)) __attribute__((aligned(CACHE_LINE_SIZE)));
+
+struct pcn_kmsg_reverse_message {
+       unsigned char payload[PCN_KMSG_PAYLOAD_SIZE];
+       struct pcn_kmsg_hdr hdr;
+       volatile unsigned long last_ticket;
+       volatile unsigned char ready;
+}__attribute__((packed)) __attribute__((aligned(CACHE_LINE_SIZE)));
 
 /* Struct for sending long messages (>60 bytes payload) */
 struct pcn_kmsg_long_message {
        struct pcn_kmsg_hdr hdr;
-       unsigned char payload[512];
+       unsigned char payload[PCN_KMSG_LONG_PAYLOAD_SIZE];
 }__attribute__((packed));
 
 /* List entry to copy message into and pass around in receiving kernel */
@@ -177,13 +196,6 @@ struct pcn_kmsg_container {
 }__attribute__((packed));
 
 
-struct pcn_kmsg_reverse_message {
-       unsigned char payload[PCN_KMSG_PAYLOAD_SIZE];
-       struct pcn_kmsg_hdr hdr;
-       volatile unsigned char ready;
-       volatile unsigned long last_ticket;
-}__attribute__((packed)) __attribute__((aligned(64)));
-
 
 /* TYPES OF MESSAGES */
 
@@ -193,7 +205,7 @@ struct pcn_kmsg_checkin_message {
        unsigned long window_phys_addr;
        unsigned char cpu_to_add;
        char pad[51];
-}__attribute__((packed)) __attribute__((aligned(64)));
+}__attribute__((packed)) __attribute__((aligned(CACHE_LINE_SIZE)));
 
 
 
@@ -258,7 +270,7 @@ struct pcn_kmsg_mcast_message {
        unsigned int num_members;
        unsigned long window_phys_addr;
        char pad[28];
-}__attribute__((packed)) __attribute__((aligned(64)));
+}__attribute__((packed)) __attribute__((aligned(CACHE_LINE_SIZE)));
 
 struct pcn_kmsg_mcast_window {
        volatile unsigned long head;
index b06b6e8..4e4b736 100644
--- a/include/linux/process_server.h
+++ b/include/linux/process_server.h
@@ -8,6 +8,10 @@
 
 #ifndef _PROCESS_SERVER_H
 #define _PROCESS_SERVER_H
+
+/**
+ * Constants
+ */
 #define RETURN_DISPOSITION_NONE 0
 #define RETURN_DISPOSITION_EXIT 1
 #define RETURN_DISPOSITION_MIGRATE 2
index 46d79b7..7ac8bf4 100644
--- a/ipc/Makefile
+++ b/ipc/Makefile
@@ -6,7 +6,8 @@ obj-$(CONFIG_SYSVIPC_COMPAT) += compat.o
 obj-$(CONFIG_SYSVIPC) += util.o msgutil.o msg.o sem.o shm.o ipcns_notifier.o syscall.o sem_remote.o shm_remote.o
 obj-$(CONFIG_SYSVIPC_SYSCTL) += ipc_sysctl.o
 obj_mq-$(CONFIG_COMPAT) += compat_mq.o
-obj-$(CONFIG_POSIX_MQUEUE) += mqueue.o msgutil.o $(obj_mq-y) bbuffer.o mbuffer.o mcomm.o
+#obj-$(CONFIG_POSIX_MQUEUE) += mqueue.o msgutil.o $(obj_mq-y) bbuffer.o mbuffer.o mcomm.o
+obj-$(CONFIG_POSIX_MQUEUE) += mqueue.o msgutil.o $(obj_mq-y)
 obj-$(CONFIG_IPC_NS) += namespace.o
 obj-$(CONFIG_POSIX_MQUEUE_SYSCTL) += mq_sysctl.o
 
index c1ad7aa..021e6c3 100644
--- a/kernel/kmod.c
+++ b/kernel/kmod.c
@@ -203,19 +203,17 @@ static int ____call_usermodehelper(void *data)
         current->origin_pid =sub_info->origin_pid;
 
         // Notify of PID/PID pairing.
-        process_server_notify_delegated_subprocess_starting(current->pid,sub_info->remote_pid,sub_info->remote_cpu);
+        process_server_notify_delegated_subprocess_starting(
+               current->pid,sub_info->remote_pid,sub_info->remote_cpu);
     } 
 
        retval = kernel_execve(sub_info->path,
                               (const char *const *)sub_info->argv,
                               (const char *const *)sub_info->envp);
     
-
        /* Exec failed? */
 fail:
-
-    printk("kmod exec failed retval{%d}\n",retval);
-
+       printk("%s: failed retval{%d}\n", __func__, retval);
        sub_info->retval = retval;
        do_exit(0);
 }
index 067bb30..f7fb5d0 100644
--- a/kernel/process_server.c
+++ b/kernel/process_server.c
@@ -1,4 +1,4 @@
-/**
+ /**
  * Implements task migration and maintains coherent 
  * address spaces across CPU cores.
  *
@@ -130,8 +130,7 @@ extern sys_topen(const char __user * filename, int flags, int mode, int fd);
 /**
  * Perf
  */
-#define MEASURE_PERF 1
-#if MEASURE_PERF
+#ifdef CONFIG_POPCORN_PERF
 #define PERF_INIT() perf_init()
 #define PERF_MEASURE_START(x) perf_measure_start(x)
 #define PERF_MEASURE_STOP(x,y,z)  perf_measure_stop(x,y,z)
@@ -245,8 +244,7 @@ static void perf_init(void) {
            "handle_mprotect_resonse");
 
 }
-
-#else
+#else /* CONFIG_POPCORN_PERF */
 #define PERF_INIT() 
 #define PERF_MEASURE_START(x) -1
 #define PERF_MEASURE_STOP(x, y, z)
@@ -255,16 +253,12 @@ static void perf_init(void) {
 
 static DECLARE_WAIT_QUEUE_HEAD( countq);
 
-/**
- * Constants
- */
-#define RETURN_DISPOSITION_EXIT 0
-#define RETURN_DISPOSITION_MIGRATE 1
-
 /**
  * Library
  */
 
+#define POPCORN_MAX_PATH 512
+
 /**
  * Some piping for linking data entries
  * and identifying data entry types.
@@ -997,7 +991,6 @@ static int cpu_has_known_tgroup_mm(int cpu)
     _remote_cpu_info_list_t *objPtr;
     struct cpumask *pcpum =0;
     int cpuid =-1;
-extern struct list_head rlist_head;
     if (cpumask_test_cpu(cpu, cpu_present_mask))
        return 1;
     list_for_each(iter, &rlist_head) {
@@ -1389,12 +1382,13 @@ int find_consecutive_physically_mapped_region(struct mm_struct* mm,
                                               unsigned long vaddr,
                                               unsigned long* vaddr_mapping_start,
                                               unsigned long* paddr_mapping_start,
-                                              size_t* paddr_mapping_sz) {
-    unsigned long paddr_curr = NULL;
+                                              size_t* paddr_mapping_sz)
+{
+    unsigned long paddr_curr = 0l;
     unsigned long vaddr_curr = vaddr;
     unsigned long vaddr_next = vaddr;
-    unsigned long paddr_next = NULL;
-    unsigned long paddr_start = NULL;
+    unsigned long paddr_next = 0l;
+    unsigned long paddr_start = 0l;
     size_t sz = 0;
 
     
@@ -2327,6 +2321,7 @@ static void destroy_clone_data(clone_data_t* data) {
     kfree(data);
 }
 
+#if 0
 /**
  * @brief Finds a vma_data_t entry.
  */
@@ -2347,6 +2342,7 @@ static vma_data_t* find_vma_data(clone_data_t* clone_data, unsigned long addr_st
 
     return ret;
 }
+#endif
 
 /**
  * @brief Callback for page walk that displays the contents of the walk.
@@ -2393,9 +2389,9 @@ static int dump_page_walk_pte_entry_callback(pte_t *pte, unsigned long start,
 /**
  * @brief Displays relevant data within a mm.
  */
-static void dump_mm(struct mm_struct* mm) {
+static void dump_mm(struct mm_struct* mm)
+{
     struct vm_area_struct * curr;
-    char buf[256];
     struct mm_walk walk = {
         .pte_entry = dump_page_walk_pte_entry_callback,
         .mm = mm,
@@ -2689,7 +2685,6 @@ static int count_remote_thread_members(int exclude_t_home_cpu,
     // the list does not include the current processor group descirptor (TODO)
     struct list_head *iter;
     _remote_cpu_info_list_t *objPtr;
-extern struct list_head rlist_head;
     list_for_each(iter, &rlist_head) {
         objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
         i = objPtr->_data._processor;
@@ -2762,7 +2757,8 @@ static int count_local_thread_members(int tgroup_home_cpu,
  * thread group in which the "current" task resides.
  * @return The number of threads.
  */
-static int count_thread_members() {
+static int count_thread_members (void)
+{
      
     int count = 0;
     PSPRINTK("%s: entered\n",__func__);
@@ -2785,7 +2781,7 @@ static int count_thread_members() {
 void process_tgroup_closed_item(struct work_struct* work) {
 
     tgroup_closed_work_t* w = (tgroup_closed_work_t*) work;
-    data_header_t *curr, *next;
+    data_header_t *curr;
     mm_data_t* mm_data;
     struct task_struct *g, *task;
     unsigned char tgroup_closed = 0;
@@ -2955,45 +2951,41 @@ handled:
  *
  * <MEASURED perf_process_mapping_request>
  */
-void process_mapping_request(struct work_struct* work) {
+void process_mapping_request(struct work_struct* work)
+{
     mapping_request_work_t* w = (mapping_request_work_t*) work;
-    mapping_response_t response;
+    mapping_response_t* response;
     data_header_t* data_curr = NULL;
     mm_data_t* mm_data = NULL;
+    
     struct task_struct* task = NULL;
     struct task_struct* g;
     struct vm_area_struct* vma = NULL;
     struct mm_struct* mm = NULL;
+    
     unsigned long address = w->address;
     unsigned long resolved = 0;
     struct mm_walk walk = {
         .pte_entry = vm_search_page_walk_pte_entry_callback,
         .private = &(resolved)
     };
-    char* plpath = NULL;
-    char lpath[512];
+    char *plpath = NULL, *lpath = NULL;
+    int used_saved_mm = 0, found_vma = 1, found_pte = 1; 
     int i;
-    
-    // for perf
-    int used_saved_mm = 0;
-    int found_vma = 1;
-    int found_pte = 1;
-    
-    // Perf start
+
+#ifdef CONFIG_POPCORN_PERF    
+    // for perf 
     int perf = PERF_MEASURE_START(&perf_process_mapping_request);
+#endif /* CONFIG_POPCORN_PERF */    
 
-    //PSPRINTK("%s: entered\n",__func__);
-    PSPRINTK("received mapping request from {%d} address{%lx}, cpu{%d}, id{%d}\n",
-            w->from_cpu,
-            w->address,
-            w->tgroup_home_cpu,
-            w->tgroup_home_id);
+    PSPRINTK("received mapping request from{%d} address{%lx}, cpu{%d}, id{%d}\n",
+            w->from_cpu, w->address, w->tgroup_home_cpu, w->tgroup_home_id);
 
     // First, search through existing processes
     do_each_thread(g,task) {
         if((task->tgroup_home_cpu == w->tgroup_home_cpu) &&
            (task->tgroup_home_id  == w->tgroup_home_id )) {
-            //PSPRINTK("mapping request found common thread group here\n");
+            PSPRINTK("mapping request found common thread group here\n");
             mm = task->mm;
 
             // Take note of the fact that an mm exists on the remote kernel
@@ -3021,14 +3013,25 @@ task_mm_search_exit:
             }
 
             data_curr = data_curr->next;
-
         } // while
-
         PS_SPIN_UNLOCK(&_saved_mm_head_lock);
     }
     
+    response = kmalloc(sizeof(mapping_response_t), GFP_ATOMIC); //TODO convert to alloc_cache
+    if (!response) {
+      printk(KERN_ALERT"can not kmalloc mapping_response_t area from{%d} address{%lx} cpu{%d} id{%d}\n",
+             w->from_cpu, w->address, w->tgroup_home_cpu, w->tgroup_home_id);
+      goto err_work;
+    }
+    lpath = kmalloc(POPCORN_MAX_PATH, GFP_ATOMIC); //TODO convert to alloc_cache
+    if (!lpath) {
+      printk(KERN_ALERT"can not kmalloc lpath area from{%d} address{%lx} cpu{%d} id{%d}\n",
+             w->from_cpu, w->address, w->tgroup_home_cpu, w->tgroup_home_id);
+      goto err_response;
+    }
+    
     // OK, if mm was found, look up the mapping.
-    if(mm) {
+    if (mm) {
 
         // The purpose of this code block is to determine
         // if we need to use a read or write lock, and safely.  
@@ -3058,11 +3061,9 @@ changed_can_be_cow:
         walk_page_range(address & PAGE_MASK, 
                 (address & PAGE_MASK) + PAGE_SIZE, &walk);
 
-        if(vma && resolved != 0) {
-
+        if (vma && resolved != 0) {
             PSPRINTK("mapping found! %lx for vaddr %lx\n",resolved,
                     address & PAGE_MASK);
-
             /*
              * Find regions of consecutive physical memory
              * in this vma, including the faulting address
@@ -3070,7 +3071,7 @@ changed_can_be_cow:
              */
             {
             // Break all cows in this vma
-            if(can_be_cow) {
+            if (can_be_cow) {
                 unsigned long cow_addr;
                 for(cow_addr = vma->vm_start; cow_addr < vma->vm_end; cow_addr += PAGE_SIZE) {
                     break_cow(mm, vma, cow_addr);
@@ -3078,54 +3079,49 @@ changed_can_be_cow:
                 // We no longer need a write lock after the break_cow process
                 // is complete, so downgrade the lock to a read lock.
                 downgrade_write(&mm->mmap_sem);
-            }
-
+            } // if (can_be_cow
 
             // Now grab all the mappings that we can stuff into the response.
-            if(0 != fill_physical_mapping_array(mm, 
-                                                vma,
-                                                address,
-                                                &response.mappings, 
-                                                MAX_MAPPINGS)) {
+            if (0 != fill_physical_mapping_array(mm, vma, address,
+                                                &(response->mappings[0]),
+                                               MAX_MAPPINGS)) {
                 // If the fill process fails, clear out all
                 // results.  Otherwise, we might trick the
                 // receiving cpu into thinking the target
                 // mapping was found when it was not.
                 for(i = 0; i < MAX_MAPPINGS; i++) {
-                    response.mappings[i].present = 0;
-                    response.mappings[i].vaddr = 0;
-                    response.mappings[i].paddr = 0;
-                    response.mappings[i].sz = 0;
-                }
-                    
-            }
-
+                    response->mappings[i].present = 0;
+                    response->mappings[i].vaddr = 0;
+                    response->mappings[i].paddr = 0;
+                    response->mappings[i].sz = 0;
+                }   
+            } // if (0 != fill_physical_mapping_array
             }
 
-            response.header.type = PCN_KMSG_TYPE_PROC_SRV_MAPPING_RESPONSE;
-            response.header.prio = PCN_KMSG_PRIO_NORMAL;
-            response.tgroup_home_cpu = w->tgroup_home_cpu;
-            response.tgroup_home_id = w->tgroup_home_id;
-            response.requester_pid = w->requester_pid;
-            response.address = address;
-            response.present = 1;
-            response.vaddr_start = vma->vm_start;
-            response.vaddr_size = vma->vm_end - vma->vm_start;
-            response.prot = vma->vm_page_prot;
-            response.vm_flags = vma->vm_flags;
+            response->header.type = PCN_KMSG_TYPE_PROC_SRV_MAPPING_RESPONSE;
+            response->header.prio = PCN_KMSG_PRIO_NORMAL;
+            response->tgroup_home_cpu = w->tgroup_home_cpu;
+            response->tgroup_home_id = w->tgroup_home_id;
+            response->requester_pid = w->requester_pid;
+            response->address = address;
+            response->present = 1;
+            response->vaddr_start = vma->vm_start;
+            response->vaddr_size = vma->vm_end - vma->vm_start;
+            response->prot = vma->vm_page_prot;
+            response->vm_flags = vma->vm_flags;
             if(vma->vm_file == NULL) {
-                response.path[0] = '\0';
+                response->path[0] = '\0';
             } else {    
                 plpath = d_path(&vma->vm_file->f_path,lpath,512);
-                strcpy(response.path,plpath);
-                response.pgoff = vma->vm_pgoff;
+                strcpy(response->path,plpath);
+                response->pgoff = vma->vm_pgoff;
             }
 
             // We modified this lock to be read-mode above so now
             // we can do a read-unlock instead of a write-unlock
             PS_UP_READ(&mm->mmap_sem);
        
-        } else {
+        } else { // (vma && resolved != 0) 
 
             if(can_be_cow)
                 PS_UP_WRITE(&mm->mmap_sem);
@@ -3133,60 +3129,57 @@ changed_can_be_cow:
                 PS_UP_READ(&mm->mmap_sem);
             // Zero out mappings
             for(i = 0; i < MAX_MAPPINGS; i++) {
-                response.mappings[i].present = 0;
-                response.mappings[i].vaddr = 0;
-                response.mappings[i].paddr = 0;
-                response.mappings[i].sz = 0;
+                response->mappings[i].present = 0;
+                response->mappings[i].vaddr = 0;
+                response->mappings[i].paddr = 0;
+                response->mappings[i].sz = 0;
             }
-
-        }
-        
-
+        } // !(vma && resolved != 0) 
     }
 
     // Not found, respond accordingly
-    if(resolved == 0) {
+    if (resolved == 0) {
         found_vma = 0;
         found_pte = 0;
         //PSPRINTK("Mapping not found\n");
-        response.header.type = PCN_KMSG_TYPE_PROC_SRV_MAPPING_RESPONSE;
-        response.header.prio = PCN_KMSG_PRIO_NORMAL;
-        response.tgroup_home_cpu = w->tgroup_home_cpu;
-        response.tgroup_home_id = w->tgroup_home_id;
-        response.requester_pid = w->requester_pid;
-        response.address = address;
-        response.present = 0;
-        response.vaddr_start = 0;
-        response.vaddr_size = 0;
-        response.path[0] = '\0';
+        response->header.type = PCN_KMSG_TYPE_PROC_SRV_MAPPING_RESPONSE;
+        response->header.prio = PCN_KMSG_PRIO_NORMAL;
+        response->tgroup_home_cpu = w->tgroup_home_cpu;
+        response->tgroup_home_id = w->tgroup_home_id;
+        response->requester_pid = w->requester_pid;
+        response->address = address;
+        response->present = 0;
+        response->vaddr_start = 0;
+        response->vaddr_size = 0;
+        response->path[0] = '\0';
 
         // Handle case where vma was present but no pte.
-        if(vma) {
+        if (vma) {
             //PSPRINTK("But vma present\n");
             found_vma = 1;
-            response.present = 1;
-            response.vaddr_start = vma->vm_start;
-            response.vaddr_size = vma->vm_end - vma->vm_start;
-            response.prot = vma->vm_page_prot;
-            response.vm_flags = vma->vm_flags;
+            response->present = 1;
+            response->vaddr_start = vma->vm_start;
+            response->vaddr_size = vma->vm_end - vma->vm_start;
+            response->prot = vma->vm_page_prot;
+            response->vm_flags = vma->vm_flags;
              if(vma->vm_file == NULL) {
-                 response.path[0] = '\0';
+                 response->path[0] = '\0';
              } else {    
                  plpath = d_path(&vma->vm_file->f_path,lpath,512);
-                 strcpy(response.path,plpath);
-                 response.pgoff = vma->vm_pgoff;
+                 strcpy(response->path,plpath);
+                 response->pgoff = vma->vm_pgoff;
              }
         }
     }
 
     // Send response
-    if(response.present) {
+    if(response->present) {
         DO_UNTIL_SUCCESS(pcn_kmsg_send_long(w->from_cpu,
-                            (struct pcn_kmsg_long_message*)(&response),
+                            (struct pcn_kmsg_long_message*)(response),
                             sizeof(mapping_response_t) - 
                             sizeof(struct pcn_kmsg_hdr) -   //
-                            sizeof(response.path) +         // Chop off the end of the path
-                            strlen(response.path) + 1));    // variable to save bandwidth.
+                            sizeof(response->path) +         // Chop off the end of the path
+                            strlen(response->path) + 1));    // variable to save bandwidth.
     } else {
         // This is an optimization to get rid of the _long send 
         // which is a time sink.
@@ -3198,12 +3191,15 @@ changed_can_be_cow:
         nonpresent_response.requester_pid = w->requester_pid;
         nonpresent_response.address = w->address;
         DO_UNTIL_SUCCESS(pcn_kmsg_send(w->from_cpu,(struct pcn_kmsg_message*)(&nonpresent_response)));
-
     }
 
+    kfree(lpath);
+err_response:
+    kfree(response);
+err_work:
     kfree(work);
 
-    // Perf stop
+#ifdef CONFIG_POPCORN_PERF    
     if(used_saved_mm && found_vma && found_pte) {
         PERF_MEASURE_STOP(&perf_process_mapping_request,
                 "Saved MM + VMA + PTE",
@@ -3231,6 +3227,7 @@ changed_can_be_cow:
     } else {
         PERF_MEASURE_STOP(&perf_process_mapping_request,"ERR",perf);
     }
+#endif /* CONFIG_POPCORN_PERF */    
 
     return;
 }
@@ -3357,7 +3354,7 @@ void process_munmap_request(struct work_struct* work) {
     data_header_t *curr = NULL;
     mm_data_t* mm_data = NULL;
     mm_data_t* to_munmap = NULL;
-    struct mm_struct * mm_to_munmap = NULL;
+    struct mm_struct* mm_to_munmap = NULL;
 
     int perf = PERF_MEASURE_START(&perf_process_munmap_request);
 
@@ -3372,32 +3369,32 @@ void process_munmap_request(struct work_struct* work) {
            task->tgroup_home_id  == w->tgroup_home_id &&
            !(task->flags & PF_EXITING)) {
 
-            // Thread group has been found, perform munmap operation on this
-            // task.
-        if (task && task->mm ) {
-           mm_to_munmap =task->mm;
-       }
-       else
-               printk("%s: pirla\n", __func__);
-
-       // TODO try and check if make sense
             // Take note of the fact that an mm exists on the remote kernel
             set_cpu_has_known_tgroup_mm(task,w->from_cpu);
+            
+            if (task->mm) {
+                mm_to_munmap = task->mm;
+            }
+            else
+                printk("%s: pirla\n", __func__);
 
-            goto done; // thread grouping - threads all share a common mm.
-
+            goto done; 
         }
     } while_each_thread(g,task);
 done:
     read_unlock(&tasklist_lock);
 
-      if(mm_to_munmap) {
-        PS_DOWN_WRITE(&task->mm->mmap_sem);
-        current->enable_distributed_munmap = 0;
-        do_munmap(mm_to_munmap, w->vaddr_start, w->vaddr_size);
-        current->enable_distributed_munmap = 1;
-        PS_UP_WRITE(&task->mm->mmap_sem);
-        }
+    if(mm_to_munmap) {
+        PS_DOWN_WRITE(&mm_to_munmap->mmap_sem);
+        current->enable_distributed_munmap = 0;
+        do_munmap(mm_to_munmap, w->vaddr_start, w->vaddr_size);
+        current->enable_distributed_munmap = 1;
+        PS_UP_WRITE(&mm_to_munmap->mmap_sem);
+    }
+    else
+       printk("%s: unexpected error task %p task->mm %p\n", 
+                __func__, task, (task ? task->mm : 0) );
+
     // munmap the specified region in any saved mm's as well.
     // This keeps old mappings saved in the mm of dead thread
     // group members from being resolved accidentally after
@@ -3424,13 +3421,15 @@ found:
         current->enable_distributed_munmap = 0;
         do_munmap(to_munmap->mm, w->vaddr_start, w->vaddr_size);
         current->enable_distributed_munmap = 1;
-        if (to_munmap && to_munmap->mm)
-            PS_UP_WRITE(&to_munmap->mm->mmap_sem);
-        else
-            printk(KERN_ALERT"%s: ERROR2: to_munmap %p mm %p\n", __func__, to_munmap, to_munmap?to_munmap->mm:0);
+       if (to_munmap && to_munmap->mm)
+               PS_UP_WRITE(&to_munmap->mm->mmap_sem);
+       else
+               printk(KERN_ALERT"%s: ERROR2: to_munmap %p mm %p\n",
+                                __func__, to_munmap, to_munmap?to_munmap->mm:0);
     }
     else if (to_munmap) // It is OK for to_munmap to be null, but not to_munmap->mm
-        printk(KERN_ALERT"%s: ERROR1: to_munmap %p mm %p\n", __func__, to_munmap, to_munmap?to_munmap->mm:0);
+       printk(KERN_ALERT"%s: ERROR1: to_munmap %p mm %p\n",
+                        __func__, to_munmap, to_munmap?to_munmap->mm:0);
 
     // Construct response
     response.header.type = PCN_KMSG_TYPE_PROC_SRV_MUNMAP_RESPONSE;
@@ -3464,57 +3463,45 @@ void process_mprotect_item(struct work_struct* work) {
     int tgroup_home_id  = w->tgroup_home_id;
     unsigned long start = w->start;
     size_t len = w->len;
-    unsigned long prot = w->prot;
     struct task_struct* task, *g;
     data_header_t* curr = NULL;
     mm_data_t* mm_data = NULL;
     mm_data_t* to_munmap = NULL;
-    struct mm_struct* mm_to_munmap = NULL;
+    struct mm_struct *mm_to_munmap = NULL;
 
     int perf = PERF_MEASURE_START(&perf_process_mprotect_item);
     
     // Find the task
     read_lock(&tasklist_lock);
     do_each_thread(g,task) {
-//     task_lock(task); // TODO consider to use this
+
+        // Look for the thread group
         if (task->tgroup_home_cpu == tgroup_home_cpu &&
             task->tgroup_home_id  == tgroup_home_id &&
             !(task->flags & PF_EXITING)) {
-           /* 
-            if (task->mm)
-                // do_mprotect
-                do_mprotect(task, start, len, prot,0);
-//             task_unlock(task); //TODO consider to use this
-           else
-               printk("%s: task->mm task:%p mm:%p\n",
-                       __func__, task, task->mm);
-            */
-            // doing mprotect here causes errors, I do not know why
-            // for now I will unmap the region instead.
-            //do_mprotect(task,start,len,prot,0);
-            
-            if (task && task->mm ) {
-                    mm_to_munmap = task->mm;
-            }
-           // Take note of the fact that an mm exists on the remote kernel
+
+            // Take note of the fact that an mm exists on the remote kernel
             set_cpu_has_known_tgroup_mm(task,w->from_cpu);
 
-            // then quit
+            if(task->mm) {
+                mm_to_munmap = task->mm;
+            }
+            else
+                printk("%s: pirla\n",__func__);
+            
             goto done;
         }
-//     task_unlock(task); // TODO consider to use this
     } while_each_thread(g,task);
 done:
     read_unlock(&tasklist_lock);
 
-      if(mm_to_munmap) {
-        PS_DOWN_WRITE(&task->mm->mmap_sem);
+    if(mm_to_munmap) {
+        PS_DOWN_WRITE(&mm_to_munmap->mmap_sem);
         current->enable_distributed_munmap = 0;
         do_munmap(mm_to_munmap, start, len);
         current->enable_distributed_munmap = 1;
-        PS_UP_WRITE(&task->mm->mmap_sem);
-        }
-
+        PS_UP_WRITE(&mm_to_munmap->mmap_sem);
+    }
 
     // munmap the specified region in any saved mm's as well.
     // This keeps old mappings saved in the mm of dead thread
@@ -4736,7 +4723,6 @@ int process_server_import_address_space(unsigned long* ip,
         vma_data_t* vma_curr = NULL;
         int mmap_flags = 0;
         int vmas_installed = 0;
-        int ptes_installed = 0;
         unsigned long err = 0;
 
         vma_curr = clone_data->vma_list;
@@ -4989,12 +4975,11 @@ int process_server_import_address_space(unsigned long* ip,
     // We assume that an exec is going on and the current process is the one is executing
     // (a switch will occur if it is not the one that must execute)
     { // FS/GS update --- start
-    unsigned long fs, gs;
     unsigned int fsindex, gsindex;
                     
     savesegment(fs, fsindex);
     if ( !(clone_data->thread_fs) || !(__user_addr(clone_data->thread_fs)) ) {
-      printk(KERN_ERR "%s: ERROR corrupted fs base address %p\n", __func__, clone_data->thread_fs);
+      printk(KERN_ERR "%s: ERROR corrupted fs base address 0x%lx\n", __func__, clone_data->thread_fs);
     }    
     current->thread.fsindex = clone_data->thread_fsindex;
     current->thread.fs = clone_data->thread_fs;
@@ -5007,7 +4992,7 @@ int process_server_import_address_space(unsigned long* ip,
                              
     savesegment(gs, gsindex); //read the gs register in gsindex variable
     if ( !(clone_data->thread_gs) && !(__user_addr(clone_data->thread_gs)) ) {
-      printk(KERN_ERR "%s: ERROR corrupted gs base address %p\n", __func__, clone_data->thread_gs);      
+      printk(KERN_ERR "%s: ERROR corrupted gs base address 0x%lx\n", __func__, clone_data->thread_gs);      
     }
     current->thread.gs = clone_data->thread_gs;    
     current->thread.gsindex = clone_data->thread_gsindex;
@@ -5078,7 +5063,6 @@ int process_server_do_group_exit(void) {
     // the list does not include the current processor group descirptor (TODO)
     struct list_head *iter;
     _remote_cpu_info_list_t *objPtr;
-extern struct list_head rlist_head;
     list_for_each(iter, &rlist_head) {
         objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
         i = objPtr->_data._processor;
@@ -5196,10 +5180,6 @@ finished_membership_search:
         // take over, so do not mark this as executing for remote
         current->executing_for_remote = 0;
 
-        // Migrate back - you just had an out of body experience, you will wake in
-        //                a familiar place (a place you've been before), but unfortunately, 
-        //                your life is over.
-        //                Note: comments like this must == I am tired.
 #ifndef SUPPORT_FOR_CLUSTERING
         for(i = 0; i < NR_CPUS; i++) {
           // Skip the current cpu
@@ -5211,7 +5191,6 @@ finished_membership_search:
         struct list_head *iter;
         _remote_cpu_info_list_t *objPtr;
        struct cpumask *pcpum =0;
-extern struct list_head rlist_head;
         list_for_each(iter, &rlist_head) {
           objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
           i = objPtr->_data._processor;
@@ -5255,7 +5234,6 @@ extern struct list_head rlist_head;
            // the list does not include the current processor group descirptor (TODO)
            struct list_head *iter;
            _remote_cpu_info_list_t *objPtr;
-extern struct list_head rlist_head;
             list_for_each(iter, &rlist_head) {
               objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
               i = objPtr->_data._processor;
@@ -5404,7 +5382,6 @@ int process_server_do_munmap(struct mm_struct* mm,
     // the list does not include the current processor group descirptor (TODO)
     struct list_head *iter;
     _remote_cpu_info_list_t *objPtr;
-extern struct list_head rlist_head;
     list_for_each(iter, &rlist_head) {
         objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
         i = objPtr->_data._processor;
@@ -5447,13 +5424,14 @@ exit:
 void process_server_do_mprotect(struct task_struct* task,
                                 unsigned long start,
                                 size_t len,
-                                unsigned long prot) {
+                                unsigned long prot)
+{
     mprotect_data_t* data;
     mprotect_request_t request;
     int i;
     int s;
     int perf = -1;
-    unsigned lockflags;
+    unsigned long lockflags;
 
      // Nothing to do for a thread group that's not distributed.
     if(!current->tgroup_distributed) {
@@ -5498,7 +5476,6 @@ void process_server_do_mprotect(struct task_struct* task,
     // the list does not include the current processor group descirptor (TODO)
     struct list_head *iter;
     _remote_cpu_info_list_t *objPtr;
-extern struct list_head rlist_head;
     list_for_each(iter, &rlist_head) {
         objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
         i = objPtr->_data._processor;
@@ -5524,8 +5501,7 @@ extern struct list_head rlist_head;
     // OK, all responses are in, we can proceed.
 
     spin_lock_irqsave(&_mprotect_data_head_lock,lockflags);
-    remove_data_entry_from(data,
-                           &_mprotect_data_head);
+    remove_data_entry_from(data, &_mprotect_data_head);
     spin_unlock_irqrestore(&_mprotect_data_head_lock,lockflags);
 
     kfree(data);
@@ -5587,16 +5563,13 @@ int process_server_try_handle_mm_fault(struct mm_struct *mm,
 
     mapping_request_data_t *data;
     unsigned long err = 0;
-    int ret = 0;
     mapping_request_t request;
-    int i;
-    int s;
-    int j;
+    int i, s, j, ret=0;
     struct file* f;
     unsigned long prot = 0;
     unsigned char started_outside_vma = 0;
     unsigned char did_early_removal = 0;
-    char path[512];
+    char path[512]; //TODO must be kmalloc-ed
     char* ppath;
     // for perf
     unsigned char pte_provided = 0;
@@ -5727,7 +5700,6 @@ int process_server_try_handle_mm_fault(struct mm_struct *mm,
     // the list does not include the current processor group descirptor (TODO)
     struct list_head *iter;
     _remote_cpu_info_list_t *objPtr;
-extern struct list_head rlist_head;
     list_for_each(iter, &rlist_head) { 
         objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
         i = objPtr->_data._processor;
@@ -5908,7 +5880,7 @@ extern struct list_head rlist_head;
 
             // Check remap_pfn_range success
             if(remap_pfn_range_err) {
-                printk(KERN_ALERT"ERROR: Failed to remap_pfn_range %d\n",err);
+                printk(KERN_ALERT"ERROR: Failed to remap_pfn_range %ld\n",err);
             } else {
                 PSPRINTK("remap_pfn_range succeeded\n");
                 ret = 1;
@@ -6015,7 +5987,7 @@ int process_server_dup_task(struct task_struct* orig, struct task_struct* task)
     task->t_distributed = 0;
     task->previous_cpus = 0;
     task->known_cpu_with_tgroup_mm = 0;
-    task->return_disposition = RETURN_DISPOSITION_EXIT;
+    task->return_disposition = RETURN_DISPOSITION_NONE;
 
     // If this is pid 1 or 2, the parent cannot have been migrated
     // so it is safe to take on all local thread info.
@@ -6040,7 +6012,7 @@ int process_server_dup_task(struct task_struct* orig, struct task_struct* task)
 
     // This is important.  We want to make sure to keep an accurate record
     // of which cpu and thread group the new thread is a part of.
-    if(orig->executing_for_remote == 1 || orig->tgroup_home_cpu != home_kernel ) {
+    if(orig->executing_for_remote == 1 || orig->tgroup_home_cpu != home_kernel) {
         task->tgroup_home_cpu = orig->tgroup_home_cpu;
         task->tgroup_home_id = orig->tgroup_home_id;
         task->tgroup_distributed = 1;
@@ -6345,7 +6317,6 @@ static int do_migration_back_to_previous_cpu(struct task_struct* task, int cpu)
     task->executing_for_remote = 0;
     task->represents_remote = 1;
     task->t_distributed = 1; // This should already be the case
-    task->return_disposition = RETURN_DISPOSITION_EXIT;
     
     // Build message
     mig.tgroup_home_cpu = task->tgroup_home_cpu;
@@ -6420,7 +6391,6 @@ int process_server_do_migration(struct task_struct* task, int cpu) {
     _remote_cpu_info_list_t *objPtr;
     struct cpumask *pcpum =0;
     int cpuid=-1;
-extern struct list_head rlist_head;
     list_for_each(iter, &rlist_head) {
         objPtr = list_entry(iter, _remote_cpu_info_list_t, cpu_list_member);
         cpuid = objPtr->_data._processor;
@@ -6445,11 +6415,17 @@ extern struct list_head rlist_head;
  * implements the actions that must be made immediately after
  * the newly awoken task resumes execution.
  */
-void process_server_do_return_disposition(void) {
-
+void process_server_do_return_disposition(void)
+{
     PSPRINTK("%s\n",__func__);
-
-    switch(current->return_disposition) {
+    int return_disposition = current->return_disposition;
+    // Reset the return disposition
+    current->return_disposition = RETURN_DISPOSITION_NONE;
+    
+    switch(return_disposition) {
+    case RETURN_DISPOSITION_NONE:
+        printk("%s: ERROR, return disposition is none!\n",__func__);
+        break;      
     case RETURN_DISPOSITION_MIGRATE:
         // Nothing to do, already back-imported the
         // state in process_back_migration.  This will
index 3bc5f41..937125e 100644 (file)
@@ -1,8 +1,9 @@
 /*
  * Inter-kernel messaging support for Popcorn
  *
- * Current ver: Antonio Barbalace, Phil Wilshire 2013
- * First ver: Ben Shelton <beshelto@vt.edu> 2013
+ * Antonio Barbalace, David Katz, Marina Sadini 2014
+ * Antonio Barbalace, Marina Sadini, Phil Wilshire 2013
+ * Ben Shelton 2012 - 2013
  */
 
 #include <linux/irq.h>
 #define KMSG_PRINTK(...) ;
 #endif
 
+/*
+ * MAX_LOOPS must be calibrated on each architecture.  It is difficult to
+ * give a good estimate for this parameter.  The worst case must be less
+ * than or equal to the minimum time we will be blocked by a call to
+ * schedule_timeout.
+ */
+#define MAX_LOOPS 12345
+#define MAX_LOOPS_JIFFIES (MAX_SCHEDULE_TIMEOUT)
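
The comment above says MAX_LOOPS has to be calibrated per architecture but does not show how. A rough calibration sketch, assuming it runs once at init time in kernel context (the helper name calibrate_max_loops is hypothetical and not part of this patch):

static unsigned long calibrate_max_loops(void)
{
	unsigned long next = jiffies + 1;
	unsigned long loops = 0;

	/* Wait for a tick edge so we measure one full jiffy. */
	while (time_before(jiffies, next))
		cpu_relax();

	/* Count empty spin iterations in one jiffy; using that count for
	 * MAX_LOOPS keeps the worst-case spin at roughly one tick. */
	next = jiffies + 1;
	while (time_before(jiffies, next))
		loops++;

	return loops;
}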
 
 #define MCAST_VERBOSE 0
 
@@ -111,8 +119,13 @@ int log_f_index=0;
 int log_f_sendindex=0;
 void * log_function_send[LOGCALL];
 
+#define SIZE_RANGES 7
+unsigned int large_message_sizes[(SIZE_RANGES +1)];
+unsigned int large_message_count[(SIZE_RANGES +1)];
+unsigned int type_message_count[PCN_KMSG_TYPE_MAX];
+
 /* From Wikipedia page "Fetch and add", modified to work for u64 */
-static inline unsigned long fetch_and_add(volatile unsigned long * variable, 
+/*static inline unsigned long fetch_and_add(volatile unsigned long * variable, 
                                          unsigned long value)
 {
        asm volatile( 
@@ -121,7 +134,8 @@ static inline unsigned long fetch_and_add(volatile unsigned long * variable,
                     : "a" (value), "m" (*variable)  //Input
                     :"memory" );
        return value;
-}
+}*/
+#define fetch_and_add xadd_sync
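
The hand-coded fetch-and-add above is kept only as a commented-out reference and is replaced by Popcorn's xadd_sync primitive. A compiler-portable sketch with the same semantics, shown purely as an equivalent and not what this patch uses, could rely on the GCC atomic builtin:

static inline unsigned long fetch_and_add_builtin(volatile unsigned long *variable,
						  unsigned long value)
{
	/* Atomically add value and return the previous contents, like x86 XADD. */
	return __sync_fetch_and_add(variable, value);
}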
 
 static inline unsigned long win_inuse(struct pcn_kmsg_window *win) 
 {
@@ -133,7 +147,7 @@ static inline int win_put(struct pcn_kmsg_window *win,
                          int no_block) 
 {
        unsigned long ticket;
-    unsigned long long sleep_start;
+       unsigned long loop;
 
        /* if we can't block and the queue is already really long, 
           return EAGAIN */
@@ -154,18 +168,24 @@ static inline int win_put(struct pcn_kmsg_window *win,
        /* spin until there's a spot free for me */
        //while (win_inuse(win) >= RB_SIZE) {}
        //if(ticket>=PCN_KMSG_RBUF_SIZE){
-    sleep_start = native_read_tsc();
-               while((win->buffer[ticket%PCN_KMSG_RBUF_SIZE].last_ticket != ticket-PCN_KMSG_RBUF_SIZE)) {
-                       //pcn_cpu_relax();
-                       //msleep(1);
-               }
-               while(  win->buffer[ticket%PCN_KMSG_RBUF_SIZE].ready!=0){
-                       //pcn_cpu_relax();
-                       //msleep(1);
-               }
-    total_sleep_win_put += native_read_tsc() - sleep_start;
-    sleep_win_put_count++;
        //}
+       loop=0;  
+       while( (win->buffer[ticket%PCN_KMSG_RBUF_SIZE].last_ticket
+         != (ticket - PCN_KMSG_RBUF_SIZE)) ) {
+               //pcn_cpu_relax();
+               //msleep(1);
+               if ( !(++loop % MAX_LOOPS) )
+                       schedule_timeout(MAX_LOOPS_JIFFIES);
+       }
+	/* the following is always false because the add happens after ready=0 */
+       //loop=0;
+       while( win->buffer[ticket%PCN_KMSG_RBUF_SIZE].ready!=0 ) {
+               pcn_cpu_relax();
+               //msleep(1);
+               //if ( !(++loop % MAX_LOOPS) )
+               //      schedule_timeout(MAX_LOOPS_JIFFIES);
+       }
+       
        /* insert item */
        memcpy(&win->buffer[ticket%PCN_KMSG_RBUF_SIZE].payload,
               &msg->payload, PCN_KMSG_PAYLOAD_SIZE);
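
To make the ticket wait above concrete: a producer's ticket maps to slot ticket % PCN_KMSG_RBUF_SIZE, and the producer spins until that slot records the ticket from exactly one lap earlier (ticket - PCN_KMSG_RBUF_SIZE). As a worked example, if PCN_KMSG_RBUF_SIZE were 64, ticket 70 would spin on slot 6 until last_ticket there reads 6 (70 - 64), yielding via schedule_timeout() every MAX_LOOPS iterations instead of monopolizing the CPU.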
@@ -198,52 +218,39 @@ static inline int win_get(struct pcn_kmsg_window *win,
                          struct pcn_kmsg_reverse_message **msg) 
 {
        struct pcn_kmsg_reverse_message *rcvd;
-    unsigned long long sleep_start;
+       unsigned long loop;
 
        if (!win_inuse(win)) {
-
                KMSG_PRINTK("nothing in buffer, returning...\n");
                return -1;
        }
 
-       KMSG_PRINTK("reached win_get, head %lu, tail %lu\n", 
-                   win->head, win->tail);      
-
-       /* spin until entry.ready at end of cache line is set */
+       KMSG_PRINTK("reached win_get, head %lu, tail %lu\n", win->head, win->tail);
        rcvd =(struct pcn_kmsg_reverse_message*) &(win->buffer[win->tail % PCN_KMSG_RBUF_SIZE]);
-       //KMSG_PRINTK("%s: Ready bit: %u\n", __func__, rcvd->hdr.ready);
 
-    sleep_start = native_read_tsc();
+       /* spin until entry.ready at end of cache line is set */
+       loop=0;
        while (!rcvd->ready) {
-
                //pcn_cpu_relax();
                //msleep(1);
-
+               if ( !(++loop % MAX_LOOPS) )
+                       schedule_timeout(MAX_LOOPS_JIFFIES);
        }
-    total_sleep_win_get += native_read_tsc() - sleep_start;
-    sleep_win_get_count++;
-
-       // barrier here?
-       pcn_barrier();
 
-       //log_receive[log_r_index%LOGLEN]=rcvd->hdr;
-       memcpy(&(log_receive[log_r_index%LOGLEN]),&(rcvd->hdr),sizeof(struct pcn_kmsg_hdr));
+       /* statistics */
+       memcpy(&(log_receive[log_r_index%LOGLEN]),
+              &(rcvd->hdr),
+              sizeof(struct pcn_kmsg_hdr));
        log_r_index++;
+       msg_get++;
 
-       //rcvd->hdr.ready = 0;
-
-       *msg = rcvd;    
-msg_get++;
-
-
-
+       *msg = rcvd;
        return 0;
 }
 
-static inline void win_advance_tail(struct pcn_kmsg_window *win) 
-{
-       win->tail++;
-}
+/*static inline void win_advance_tail(struct pcn_kmsg_window *win) 
+{ win->tail++; }
+*/
 
 /* win_enable_int
  * win_disable_int
@@ -471,18 +478,14 @@ static int pcn_read_proc(char *page, char **start, off_t off, int count, int *eo
        char *p= page;
     int len, i, idx;
 
-    p += sprintf(p, "Sleep in win_put[total,count,avg] = [%llx,%lx,%llx]\n",
-                    total_sleep_win_put,
-                    sleep_win_put_count,
-                    sleep_win_put_count? total_sleep_win_put/sleep_win_put_count:0);
-    p += sprintf(p, "Sleep in win_get[total,count,avg] = [%llx,%lx,%llx]\n",
-                    total_sleep_win_get,
-                    sleep_win_get_count,
-                    sleep_win_get_count? total_sleep_win_get/sleep_win_get_count:0);
-
-       p += sprintf(p, "messages get: %ld\n", msg_get);
-        p += sprintf(p, "messages put: %ld\n", msg_put);
-
+    p += sprintf(p, "get: %ld\n", msg_get);
+    p += sprintf(p, "put: %ld\n", msg_put);
+    for (i = 0; i < (SIZE_RANGES + 1); i++)
+        p += sprintf(p, "%u: %u\n", large_message_sizes[i], large_message_count[i]);
+
+    for (i = 0; i < PCN_KMSG_TYPE_MAX; i++)
+        p += sprintf(p, "t%u: %u\n", i, type_message_count[i]);
+
     idx = log_r_index;
     for (i =0; i>-LOGLEN; i--)
        p +=sprintf (p,"r%d: from%d type%d %1d:%1d:%1d seq%d\n",
@@ -496,7 +499,7 @@ static int pcn_read_proc(char *page, char **start, off_t off, int count, int *eo
                        (idx+i),(int) log_send[(idx+i)%LOGLEN].from_cpu, (int)log_send[(idx+i)%LOGLEN].type,
                        (int) log_send[(idx+i)%LOGLEN].is_lg_msg, (int)log_send[(idx+i)%LOGLEN].lg_start,
                        (int) log_send[(idx+i)%LOGLEN].lg_end, (int) log_send[(idx+i)%LOGLEN].lg_seqnum );
-
+/*
     idx = log_f_index;
         for (i =0; i>-LOGCALL; i--)
                p +=sprintf (p,"f%d: %pB\n",
@@ -509,7 +512,7 @@ static int pcn_read_proc(char *page, char **start, off_t off, int count, int *eo
 
         for(i=0; i<PCN_KMSG_RBUF_SIZE; i++)
                p +=sprintf (p,"second_buffer[%i]=%i\n",i,rkvirt[my_cpu]->second_buffer[i]);
-
+*/
 
        len = (p -page) - off;
        if (len < 0)
@@ -519,6 +522,20 @@ static int pcn_read_proc(char *page, char **start, off_t off, int count, int *eo
        return len;
 }
 
+static int pcn_write_proc(struct file *file, const char __user *buffer, unsigned long count, void *data)
+{
+	int i;
+
+	msg_get = 0;
+	msg_put = 0;
+	memset(large_message_count, 0, sizeof(int)*(SIZE_RANGES +1));
+	memset(large_message_sizes, 0, sizeof(int)*(SIZE_RANGES +1));
+	for (i = 0; i < SIZE_RANGES; i++)
+		large_message_sizes[i] = ((i+1)*PCN_KMSG_PAYLOAD_SIZE);
+	large_message_sizes[SIZE_RANGES] = ~0;
+	memset(type_message_count, 0, sizeof(int)*PCN_KMSG_TYPE_MAX);
+	return count;
+}
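
pcn_kmsg_init() below wires this handler in via res->write_proc, so writing any bytes to the proc entry it creates zeroes msg_get, msg_put and both histograms and re-derives the size-bucket boundaries; the statistics reported by pcn_read_proc() can therefore be restarted between experiments without rebooting.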
+
 static int __init pcn_kmsg_init(void)
 {
        int rc,i;
@@ -612,6 +629,10 @@ static int __init pcn_kmsg_init(void)
                boot_params_va->pcn_kmsg_master_window = rkinfo_phys_addr;
                KMSG_INIT("boot_params virt %p phys %p\n",
                        boot_params_va, orig_boot_params);
+               
+		KMSG_INIT("LOOPS %d, LOOPS_JIFFIES %ld, 1nsec %lu, 1usec %lu, 1msec %lu, 1jiffies %u %u\n",
+			  MAX_LOOPS, MAX_LOOPS_JIFFIES, nsecs_to_jiffies(1), usecs_to_jiffies(1), msecs_to_jiffies(1),
+			  jiffies_to_msecs(1), jiffies_to_usecs(1));
        }
        else {
                KMSG_INIT("Primary kernel rkinfo phys addr: 0x%lx\n", 
@@ -657,6 +678,14 @@ static int __init pcn_kmsg_init(void)
                }
        } 
 
+       /* proc interface for debugging and stats */
+       memset(large_message_count, 0, sizeof(int)*(SIZE_RANGES +1));
+       memset(large_message_sizes, 0, sizeof(int)*(SIZE_RANGES +1));
+       for (i=0; i<SIZE_RANGES; i++)
+         large_message_sizes[i] = ((i+1)*PCN_KMSG_PAYLOAD_SIZE);
+       large_message_sizes[SIZE_RANGES] = ~0;
+       memset(type_message_count, 0, sizeof(int)*PCN_KMSG_TYPE_MAX);
+
        memset(log_receive,0,sizeof(struct pcn_kmsg_hdr)*LOGLEN);
        memset(log_send,0,sizeof(struct pcn_kmsg_hdr)*LOGLEN);
        memset(log_function_called,0,sizeof(void*)*LOGCALL);
@@ -669,6 +698,7 @@ static int __init pcn_kmsg_init(void)
                return -ENOMEM;
        }
        res->read_proc = pcn_read_proc;
+       res->write_proc = pcn_write_proc;
 
        return 0;
 }
@@ -736,6 +766,7 @@ static int __pcn_kmsg_send(unsigned int dest_cpu, struct pcn_kmsg_message *msg,
        msg->hdr.from_cpu = my_cpu;
 
        rc = win_put(dest_window, msg, no_block);
+	type_message_count[msg->hdr.type]++;
 
        if (rc) {
                if (no_block && (rc == EAGAIN)) {
@@ -828,6 +859,10 @@ int pcn_kmsg_send_long(unsigned int dest_cpu,
                if(ret!=0)
                        return ret;
        }
+       
+       /* statistics */
+       num_chunks = payload_size / PCN_KMSG_PAYLOAD_SIZE;
+       large_message_count[((num_chunks < SIZE_RANGES) ? num_chunks : SIZE_RANGES)]++;
 
        return 0;
 }
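
As a worked example of the bucketing above: num_chunks is the integer quotient payload_size / PCN_KMSG_PAYLOAD_SIZE, so a payload of exactly two chunks is counted in large_message_count[2], whose label large_message_sizes[2] = 3*PCN_KMSG_PAYLOAD_SIZE is the exclusive upper bound of that range, while anything needing SIZE_RANGES (7) chunks or more is folded into the final catch-all bucket labeled ~0.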
@@ -1076,18 +1111,19 @@ static int process_small_message(struct pcn_kmsg_reverse_message *msg)
        return work_done;
 }
 
+static int poll_handler_check=0;
 static int pcn_kmsg_poll_handler(void)
 {
        struct pcn_kmsg_reverse_message *msg;
        struct pcn_kmsg_window *win = rkvirt[my_cpu]; // TODO this will not work for clustering
        int work_done = 0;
 
-       KMSG_PRINTK("called\n");
+       poll_handler_check++;
+	if (poll_handler_check > 1)
+		printk("poll_handler_check %d: concurrent calls are not supported\n", poll_handler_check);
 
 pull_msg:
        /* Get messages out of the buffer first */
-//#define PCN_KMSG_BUDGET 128
-       //while ((work_done < PCN_KMSG_BUDGET) && (!win_get(rkvirt[my_cpu], &msg))) {
        while (! win_get(win, &msg) ) {
                KMSG_PRINTK("got a message!\n");
 
@@ -1099,10 +1135,16 @@ pull_msg:
                        KMSG_PRINTK("message is a small message!\n");
                        work_done += process_small_message(msg);
                }
-               pcn_barrier();
+
                msg->ready = 0;
-               //win_advance_tail(win);
-               fetch_and_add(&win->tail, 1);
+               pcn_barrier();
+               
+               fetch_and_add(&win->tail, 1); //win_advance_tail(win);
+               
+               // NOTE
+		// Why do we need the ready bit if we are also incrementing the tail?
+		// Could the tail be incremented before the ready bit is signaled?
+		// That way multiple concurrent calls to this function could be supported.
        }
 
        win_enable_int(win);
@@ -1111,6 +1153,7 @@ pull_msg:
                goto pull_msg;
        }
 
+       poll_handler_check--;
        return work_done;
 }
 
diff --git a/pcnmsg/pcn_kmsg_mcast.h b/pcnmsg/pcn_kmsg_mcast.h
new file mode 100644 (file)
index 0000000..b49e498
--- /dev/null
@@ -0,0 +1,614 @@
+
+/*
+ * MULTICAST support routine
+ */
+
+/* Same thing, but for mcast windows */
+struct pcn_kmsg_mcast_local mcastlocal[POPCORN_MAX_MCAST_CHANNELS];
+
+
+#define MCASTWIN(_id_) (mcastlocal[(_id_)].mcastvirt)
+#define LOCAL_TAIL(_id_) (mcastlocal[(_id_)].local_tail)
+
+/* MULTICAST RING BUFFER */
+static inline unsigned long mcastwin_inuse(pcn_kmsg_mcast_id id)
+{
+       return MCASTWIN(id)->head - MCASTWIN(id)->tail;
+}
+
+static inline int mcastwin_put(pcn_kmsg_mcast_id id,
+                              struct pcn_kmsg_message *msg)
+{
+       unsigned long ticket;
+       unsigned long time_limit = jiffies + 2;
+
+
+       MCAST_PRINTK("called for id %lu, msg 0x%p\n", id, msg);
+
+       /* if the queue is already really long, return EAGAIN */
+       if (mcastwin_inuse(id) >= RB_SIZE) {
+               MCAST_PRINTK("window full, caller should try again...\n");
+               return -EAGAIN;
+       }
+
+       /* grab ticket */
+       ticket = fetch_and_add(&MCASTWIN(id)->head, 1);
+       MCAST_PRINTK("ticket = %lu, head = %lu, tail = %lu\n",
+                    ticket, MCASTWIN(id)->head, MCASTWIN(id)->tail);
+
+       /* spin until there's a spot free for me */
+       while (mcastwin_inuse(id) >= RB_SIZE) {
+               if (unlikely(time_after(jiffies, time_limit))) {
+                       MCAST_PRINTK("spinning too long to wait for window to be free; this is bad!\n");
+                       return -1;
+               }
+       }
+
+       /* insert item */
+       memcpy(&MCASTWIN(id)->buffer[ticket & RB_MASK].payload, 
+              &msg->payload, PCN_KMSG_PAYLOAD_SIZE);
+
+       memcpy((void*)&(MCASTWIN(id)->buffer[ticket & RB_MASK].hdr), 
+              (void*)&(msg->hdr), sizeof(struct pcn_kmsg_hdr));
+
+       /* set counter to (# in group - self) */
+
+       /*
+       int x;
+
+       if ((x = atomic_read(&MCASTWIN(id)->read_counter[ticket & RB_MASK]))) {
+               KMSG_ERR("read counter is not zero (it's %d)\n", x);
+               return -1;
+       }
+       */
+
+       atomic_set(&MCASTWIN(id)->read_counter[ticket & RB_MASK],
+               rkinfo->mcast_wininfo[id].num_members - 1);
+
+       MCAST_PRINTK("set counter to %d\n", 
+                    rkinfo->mcast_wininfo[id].num_members - 1);
+
+       pcn_barrier();
+
+       /* set completed flag */
+       MCASTWIN(id)->buffer[ticket & RB_MASK].ready = 1;
+
+       return 0;
+}
+
+static inline int mcastwin_get(pcn_kmsg_mcast_id id,
+                              struct pcn_kmsg_reverse_message **msg)
+{
+       struct pcn_kmsg_reverse_message *rcvd;
+
+       MCAST_PRINTK("called for id %lu, head %lu, tail %lu, local_tail %lu\n", 
+                    id, MCASTWIN(id)->head, MCASTWIN(id)->tail, 
+                    LOCAL_TAIL(id));
+
+retry:
+
+       /* if we sent a bunch of messages, it's possible our local_tail
+          has gotten behind the global tail and we need to update it */
+       /* TODO -- atomicity concerns here? */
+       if (LOCAL_TAIL(id) < MCASTWIN(id)->tail) {
+               LOCAL_TAIL(id) = MCASTWIN(id)->tail;
+       }
+
+       if (MCASTWIN(id)->head == LOCAL_TAIL(id)) {
+               MCAST_PRINTK("nothing in buffer, returning...\n");
+               return -1;
+       }
+
+       /* spin until entry.ready at end of cache line is set */
+       rcvd = &(MCASTWIN(id)->buffer[LOCAL_TAIL(id) & RB_MASK]);
+       while (!rcvd->ready) {
+               pcn_cpu_relax();
+       }
+
+       // barrier here?
+       pcn_barrier();
+
+       /* we can't step on our own messages! */
+       if (rcvd->hdr.from_cpu == my_cpu) {
+               LOCAL_TAIL(id)++;
+               goto retry;
+       }
+
+       *msg = rcvd;
+
+       return 0;
+}
+
+static inline void mcastwin_advance_tail(pcn_kmsg_mcast_id id)
+{
+       unsigned long slot = LOCAL_TAIL(id) & RB_MASK;
+       //int val_ret, i;
+       //char printstr[256];
+       //char intstr[16];
+
+       MCAST_PRINTK("local tail currently on slot %lu, read counter %d\n", 
+                    LOCAL_TAIL(id), atomic_read(&MCASTWIN(id)->read_counter[slot]));
+
+       /*
+       memset(printstr, 0, 256);
+       memset(intstr, 0, 16);
+
+       for (i = 0; i < 64; i++) {
+               sprintf(intstr, "%d ", atomic_read(&MCASTWIN(id)->read_counter[i]));
+               strcat(printstr, intstr);
+       }
+
+       MCAST_PRINTK("read_counter: %s\n", printstr);
+
+       val_ret = atomic_add_return_sync(-1, &MCASTWIN(id)->read_counter[slot]);
+
+       MCAST_PRINTK("read counter after: %d\n", val_ret);
+       */
+
+       if (atomic_dec_and_test_sync(&MCASTWIN(id)->read_counter[slot])) {
+               MCAST_PRINTK("we're the last reader to go; ++ global tail\n");
+               MCASTWIN(id)->buffer[slot].ready = 0;
+               atomic64_inc((atomic64_t *) &MCASTWIN(id)->tail);
+       }
+
+       LOCAL_TAIL(id)++;
+}
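
A concrete walk-through of the read-counter handshake: for a group with, say, four members, mcastwin_put() initializes the slot's read_counter to 3 (num_members - 1, since the sender never reads its own message). Every receiver that finishes with the slot calls mcastwin_advance_tail(), which always advances its private LOCAL_TAIL, but only the receiver whose atomic_dec_and_test_sync() takes the counter from 1 to 0 clears the ready flag and bumps the shared global tail, so each slot is recycled exactly once.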
+
+static void map_mcast_win(pcn_kmsg_work_t *w)
+{
+       pcn_kmsg_mcast_id id = w->id_to_join;
+
+       /* map window */
+	if (id < 0 || id >= POPCORN_MAX_MCAST_CHANNELS) {
+               KMSG_ERR("%s: invalid mcast channel id %lu specified!\n",
+                        __func__, id);
+               return;
+       }
+
+       MCASTWIN(id) = ioremap_cache(rkinfo->mcast_wininfo[id].phys_addr,
+                                    sizeof(struct pcn_kmsg_mcast_window));
+       if (MCASTWIN(id)) {
+               MCAST_PRINTK("ioremapped mcast window, virt addr 0x%p\n",
+                            MCASTWIN(id));
+       } else {
+               KMSG_ERR("Failed to map mcast window %lu at phys addr 0x%lx\n",
+                        id, rkinfo->mcast_wininfo[id].phys_addr);
+       }
+}
+
+static inline int pcn_kmsg_mcast_window_init(struct pcn_kmsg_mcast_window *win)
+{
+       int i;
+
+       win->head = 0;
+       win->tail = 0;
+
+       for (i = 0; i < PCN_KMSG_RBUF_SIZE; i++) {
+               atomic_set(&win->read_counter[i], 0);
+       }
+
+       //memset(&win->read_counter, 0, 
+       //       PCN_KMSG_RBUF_SIZE * sizeof(int));
+       memset(&win->buffer, 0,
+              PCN_KMSG_RBUF_SIZE * sizeof(struct pcn_kmsg_message));
+       return 0;
+}
+
+static void process_mcast_queue(pcn_kmsg_mcast_id id)
+{
+       struct pcn_kmsg_reverse_message *msg;
+       while (!mcastwin_get(id, &msg)) {
+               MCAST_PRINTK("Got an mcast message, type %d!\n",
+                            msg->hdr.type);
+
+               /* Special processing for large messages */
+                if (msg->hdr.is_lg_msg) {
+                        MCAST_PRINTK("message is a large message!\n");
+                        process_large_message(msg);
+                } else {
+                        MCAST_PRINTK("message is a small message!\n");
+                        process_small_message(msg);
+                }
+
+               mcastwin_advance_tail(id);
+       }
+
+}
+
+inline void lock_chan(pcn_kmsg_mcast_id id)
+{
+}
+
+inline void unlock_chan(pcn_kmsg_mcast_id id)
+{
+}
+
+inline int count_members(unsigned long mask)
+{
+       int i, count = 0;
+
+       for (i = 0; i < POPCORN_MAX_CPUS; i++) {
+               if (mask & (1ULL << i)) {
+                       count++;
+               }
+       }
+
+       return count;
+}
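
count_members() is a plain population count over the CPU mask. A minimal equivalent using the kernel's own helper, shown only as an alternative sketch rather than what this file does:

#include <linux/bitops.h>

static inline int count_members_hweight(unsigned long mask)
{
	/* hweight_long() returns the number of set bits in mask. */
	return hweight_long(mask);
}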
+
+void print_mcast_map(void)
+{
+#if MCAST_VERBOSE
+       int i;
+
+       printk("ACTIVE MCAST GROUPS:\n");
+
+       for (i = 0; i < POPCORN_MAX_MCAST_CHANNELS; i++) {
+               if (rkinfo->mcast_wininfo[i].mask) {
+                       printk("group %d, mask 0x%lx, num_members %d\n", 
+                              i, rkinfo->mcast_wininfo[i].mask, 
+                              rkinfo->mcast_wininfo[i].num_members);
+               }
+       }
+       return;
+#endif
+}
+
+/* Open a multicast group containing the CPUs specified in the mask. */
+int pcn_kmsg_mcast_open(pcn_kmsg_mcast_id *id, unsigned long mask)
+{
+       int rc, i, found_id;
+       struct pcn_kmsg_mcast_message msg;
+       struct pcn_kmsg_mcast_wininfo *slot;
+       struct pcn_kmsg_mcast_window * new_win;
+
+       MCAST_PRINTK("Reached pcn_kmsg_mcast_open, mask 0x%lx\n", mask);
+
+       if (!(mask & (1 << my_cpu))) {
+               KMSG_ERR("This CPU is not a member of the mcast group to be created, cpu %d, mask 0x%lx\n",
+                        my_cpu, mask);
+               return -1;
+       }
+
+       /* find first unused channel */
+retry:
+       found_id = -1;
+
+       for (i = 0; i < POPCORN_MAX_MCAST_CHANNELS; i++) {
+               if (!rkinfo->mcast_wininfo[i].num_members) {
+                       found_id = i;
+                       break;
+               }
+       }
+
+       MCAST_PRINTK("Found channel ID %d\n", found_id);
+
+       if (found_id == -1) {
+               KMSG_ERR("No free multicast channels!\n");
+               return -1;
+       }
+
+       /* lock and check if channel is still unused; 
+          otherwise, try again */
+       lock_chan(found_id);
+
+	if (rkinfo->mcast_wininfo[found_id].num_members) {
+               unlock_chan(found_id);
+               MCAST_PRINTK("Got scooped; trying again...\n");
+               goto retry;
+       }
+
+       /* set slot info */
+       slot = &rkinfo->mcast_wininfo[found_id];
+       slot->mask = mask;
+       slot->num_members = count_members(mask);
+       slot->owner_cpu = my_cpu;
+
+       MCAST_PRINTK("Found %d members\n", slot->num_members);
+
+       /* kmalloc window for slot */
+       new_win = kmalloc(sizeof(struct pcn_kmsg_mcast_window), GFP_ATOMIC);
+
+	if (!new_win) {
+		KMSG_ERR("Failed to kmalloc mcast buffer!\n");
+		slot->num_members = 0;	/* release the channel we just claimed */
+		unlock_chan(found_id);
+		return -1;
+	}
+
+       /* zero out window */
+       memset(new_win, 0x0, sizeof(struct pcn_kmsg_mcast_window));
+
+       MCASTWIN(found_id) = new_win;
+       slot->phys_addr = virt_to_phys(new_win);
+       MCAST_PRINTK("Malloced mcast receive window %d at phys addr 0x%lx\n",
+                    found_id, slot->phys_addr);
+
+       /* send message to each member except self.  Can't use mcast yet because
+          group is not yet established, so unicast to each CPU in mask. */
+       msg.hdr.type = PCN_KMSG_TYPE_MCAST;
+       msg.hdr.prio = PCN_KMSG_PRIO_HIGH;
+       msg.type = PCN_KMSG_MCAST_OPEN;
+       msg.id = found_id;
+       msg.mask = mask;
+       msg.num_members = slot->num_members;
+
+       for (i = 0; i < POPCORN_MAX_CPUS; i++) {
+               if ((slot->mask & (1ULL << i)) && 
+                   (my_cpu != i)) {
+                       MCAST_PRINTK("Sending message to CPU %d\n", i);
+
+                       rc = pcn_kmsg_send(i, (struct pcn_kmsg_message *) &msg);
+
+                       if (rc) {
+                               KMSG_ERR("Message send failed!\n");
+                       }
+               }
+       }
+
+       *id = found_id;
+
+out:
+       unlock_chan(found_id);
+
+       return 0;
+}
+
+/* Add new members to a multicast group. */
+int pcn_kmsg_mcast_add_members(pcn_kmsg_mcast_id id, unsigned long mask)
+{
+       lock_chan(id);
+
+       KMSG_ERR("Operation not yet supported!\n");
+
+       //rkinfo->mcast_wininfo[id].mask |= mask; 
+
+       /* TODO -- notify new members */
+
+       unlock_chan(id);
+       return 0;
+}
+
+/* Remove existing members from a multicast group. */
+int pcn_kmsg_mcast_delete_members(pcn_kmsg_mcast_id id, unsigned long mask)
+{
+       lock_chan(id);
+
+       KMSG_ERR("Operation not yet supported!\n");
+
+       //rkinfo->mcast_wininfo[id].mask &= !mask;
+
+       /* TODO -- notify new members */
+
+       unlock_chan(id);
+
+       return 0;
+}
+
+inline int pcn_kmsg_mcast_close_notowner(pcn_kmsg_mcast_id id)
+{
+       MCAST_PRINTK("Closing multicast channel %lu on CPU %d\n", id, my_cpu);
+
+       /* process remaining messages in queue (should there be any?) */
+
+       /* remove queue from list of queues being polled */
+       iounmap(MCASTWIN(id));
+
+       MCASTWIN(id) = NULL;
+
+       return 0;
+}
+
+/* Close a multicast group. */
+int pcn_kmsg_mcast_close(pcn_kmsg_mcast_id id)
+{
+       int rc;
+       struct pcn_kmsg_mcast_message msg;
+       struct pcn_kmsg_mcast_wininfo *wi = &rkinfo->mcast_wininfo[id];
+
+       if (wi->owner_cpu != my_cpu) {
+               KMSG_ERR("Only creator (cpu %d) can close mcast group %lu!\n",
+                        wi->owner_cpu, id);
+               return -1;
+       }
+
+       lock_chan(id);
+
+       /* set window to close */
+       wi->is_closing = 1;
+
+       /* broadcast message to close window globally */
+       msg.hdr.type = PCN_KMSG_TYPE_MCAST;
+       msg.hdr.prio = PCN_KMSG_PRIO_HIGH;
+       msg.type = PCN_KMSG_MCAST_CLOSE;
+       msg.id = id;
+
+       rc = pcn_kmsg_mcast_send(id, (struct pcn_kmsg_message *) &msg);
+       if (rc) {
+               KMSG_ERR("failed to send mcast close message!\n");
+               return -1;
+       }
+
+       /* wait until global_tail == global_head */
+       while (MCASTWIN(id)->tail != MCASTWIN(id)->head) {}
+
+       /* free window and set channel as unused */
+       kfree(MCASTWIN(id));
+       MCASTWIN(id) = NULL;
+
+       wi->mask = 0;
+       wi->num_members = 0;
+       wi->is_closing = 0;
+
+       unlock_chan(id);
+
+       return 0;
+}
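
Taken together, open, send and close form the channel lifecycle, all driven by the creating kernel. A caller-side sketch under those assumptions; the mask value and message contents here are made up for illustration, and a real caller must include itself in the mask and fill in a registered message type:

static void mcast_usage_sketch(void)
{
	pcn_kmsg_mcast_id id;
	struct pcn_kmsg_message msg;

	memset(&msg, 0, sizeof(msg));		/* real callers set hdr.type, hdr.prio and the payload */

	if (!pcn_kmsg_mcast_open(&id, 0x3UL)) {	/* e.g. CPUs 0 and 1; must include my_cpu */
		pcn_kmsg_mcast_send(id, &msg);
		pcn_kmsg_mcast_close(id);	/* only the creating kernel may close the group */
	}
}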
+
+unsigned long mcast_ipi_ts;
+
+static int __pcn_kmsg_mcast_send(pcn_kmsg_mcast_id id, 
+                                struct pcn_kmsg_message *msg)
+{
+       int i, rc;
+
+       if (!msg) {
+               KMSG_ERR("Passed in a null pointer to msg!\n");
+               return -1;
+       }
+
+       /* set source CPU */
+       msg->hdr.from_cpu = my_cpu;
+
+       /* place message in rbuf */
+       rc = mcastwin_put(id, msg);
+
+       if (rc) {
+               KMSG_ERR("failed to place message in mcast window -- maybe it's full?\n");
+               return -1;
+       }
+
+       rdtscll(mcast_ipi_ts);
+
+       /* send IPI to all in mask but me */
+       for (i = 0; i < POPCORN_MAX_CPUS; i++) {
+               if (rkinfo->mcast_wininfo[id].mask & (1ULL << i)) {
+                       if (i != my_cpu) {
+                               MCAST_PRINTK("sending IPI to CPU %d\n", i);
+                               apic->send_IPI_single(i, POPCORN_KMSG_VECTOR);
+                       }
+               }
+       }
+
+       return 0;
+}
+
+#define MCAST_HACK 0
+
+/* Send a message to the specified multicast group. */
+int pcn_kmsg_mcast_send(pcn_kmsg_mcast_id id, struct pcn_kmsg_message *msg)
+{
+#if MCAST_HACK
+
+       int i, rc;
+
+       MCAST_PRINTK("Sending mcast message, id %lu\n", id);
+
+       /* quick hack for testing for now; 
+          loop through mask and send individual messages */
+       for (i = 0; i < POPCORN_MAX_CPUS; i++) {
+               if (rkinfo->mcast_wininfo[id].mask & (0x1 << i)) {
+                       rc = pcn_kmsg_send(i, msg);
+
+                       if (rc) {
+                               KMSG_ERR("Batch send failed to CPU %d\n", i);
+                               return -1;
+                       }
+               }
+       }
+
+       return 0;
+#else
+       int rc;
+
+       MCAST_PRINTK("sending mcast message to group id %lu\n", id);
+
+       msg->hdr.is_lg_msg = 0;
+       msg->hdr.lg_start = 0;
+       msg->hdr.lg_end = 0;
+       msg->hdr.lg_seqnum = 0;
+
+       rc = __pcn_kmsg_mcast_send(id, msg);
+
+       return rc;
+#endif
+}
+
+/* Send a message to the specified multicast group. */
+int pcn_kmsg_mcast_send_long(pcn_kmsg_mcast_id id, 
+                            struct pcn_kmsg_long_message *msg, 
+                            unsigned int payload_size)
+{
+#if MCAST_HACK
+       int i, rc;
+
+       MCAST_PRINTK("Sending long mcast message, id %lu, size %u\n", 
+                    id, payload_size);
+
+       /* quick hack for testing for now; 
+          loop through mask and send individual messages */
+       for (i = 0; i < POPCORN_MAX_CPUS; i++) {
+               if (rkinfo->mcast_wininfo[id].mask & (0x1 << i)) {
+                       rc = pcn_kmsg_send_long(i, msg, payload_size);
+
+                       if (rc) {
+                               KMSG_ERR("Batch send failed to CPU %d\n", i);
+                               return -1;
+                       }
+               }
+       }
+
+       return 0;
+#else
+
+       KMSG_ERR("long messages not yet supported in mcast!\n");
+
+       return 0;
+#endif
+}
+
+
+static int pcn_kmsg_mcast_callback(struct pcn_kmsg_message *message) 
+{
+       int rc = 0;
+       struct pcn_kmsg_mcast_message *msg = 
+               (struct pcn_kmsg_mcast_message *) message;
+       pcn_kmsg_work_t *kmsg_work;
+
+       MCAST_PRINTK("Received mcast message, type %d\n", msg->type);
+
+       switch (msg->type) {
+               case PCN_KMSG_MCAST_OPEN:
+                       MCAST_PRINTK("Processing mcast open message...\n");
+
+                       /* Need to queue work to remap the window in a kernel
+                          thread; it can't happen here */
+                       kmsg_work = kmalloc(sizeof(pcn_kmsg_work_t), GFP_ATOMIC);
+                       if (kmsg_work) {
+                               INIT_WORK((struct work_struct *) kmsg_work,
+                                         process_kmsg_wq_item);
+                               kmsg_work->op = PCN_KMSG_WQ_OP_MAP_MCAST_WIN;
+                               kmsg_work->from_cpu = msg->hdr.from_cpu;
+                               kmsg_work->id_to_join = msg->id;
+                               queue_work(kmsg_wq, 
+                                          (struct work_struct *) kmsg_work);
+                       } else {
+                               KMSG_ERR("Failed to kmalloc work structure!\n");
+                       }
+
+                       break;
+
+               case PCN_KMSG_MCAST_ADD_MEMBERS:
+                       KMSG_ERR("Mcast add not yet implemented...\n");
+                       break;
+
+               case PCN_KMSG_MCAST_DEL_MEMBERS:
+                       KMSG_ERR("Mcast delete not yet implemented...\n");
+                       break;
+
+               case PCN_KMSG_MCAST_CLOSE:
+                       MCAST_PRINTK("Processing mcast close message...\n");
+                       pcn_kmsg_mcast_close_notowner(msg->id);
+                       break;
+
+               default:
+                       KMSG_ERR("Invalid multicast message type %d\n", 
+                                msg->type);
+                       rc = -1;
+                       goto out;
+       }
+
+       print_mcast_map();
+
+out:
+       pcn_kmsg_free_msg(message);
+       return rc;
+}