futex: clean up and restructure the distributed futex path
author	akshay <akshay87@vt.edu>
Tue, 27 May 2014 23:14:09 +0000 (19:14 -0400)
committer	akshay <akshay87@vt.edu>
Tue, 27 May 2014 23:14:09 +0000 (19:14 -0400)

Queue one work item per incoming remote wait/wake request instead of keeping a
persistent worker that drains a global plist; make the global_spinlock
request-queue helpers irq-safe; add set_err_request() and fix_user_page() and
handle -EFAULT on the futex_wait client path; forward signals for distributed
tasks to their origin node; re-enable FLAGS_CLOCKRT; disable the vDSO by
default.

arch/x86/vdso/vma.c
include/linux/process_server.h
include/popcorn/global_spinlock.h
kernel/exit.c
kernel/futex.c
kernel/futex_remote.c
kernel/futex_remote.h
kernel/global_spinlock.c
kernel/sched.c
kernel/signal.c

diff --git a/arch/x86/vdso/vma.c b/arch/x86/vdso/vma.c
index 153407c..29b9c08 100644
@@ -16,7 +16,7 @@
 #include <asm/vdso.h>
 #include <asm/page.h>
 
-unsigned int __read_mostly vdso_enabled = 1;
+unsigned int __read_mostly vdso_enabled = 0;
 
 extern char vdso_start[], vdso_end[];
 extern unsigned short vdso_sync_cpuid;
diff --git a/include/linux/process_server.h b/include/linux/process_server.h
index b6b14e0..b06b6e8 100644
@@ -8,7 +8,9 @@
 
 #ifndef _PROCESS_SERVER_H
 #define _PROCESS_SERVER_H
-
+#define RETURN_DISPOSITION_NONE 0
+#define RETURN_DISPOSITION_EXIT 1
+#define RETURN_DISPOSITION_MIGRATE 2
 #define PROCESS_SERVER_CLONE_SUCCESS 0
 #define PROCESS_SERVER_CLONE_FAIL 1
 
diff --git a/include/popcorn/global_spinlock.h b/include/popcorn/global_spinlock.h
index bae3e02..698c2eb 100644
@@ -37,18 +37,6 @@ struct futex_common_data{
 typedef struct futex_common_data  futex_common_data_t;
 
 
-struct global_request_work  {
-       struct work_struct work;
-       spinlock_t * lock;
-       struct plist_head * _grq_head;
-       volatile unsigned int _is_alive;
-       pid_t _worker_pid;
-        wait_queue_head_t *flush;
-        unsigned int * free_work;
-       int ops ; //0-wait 1-wake
-};
-
-typedef struct global_request_work global_request_work_t;
 
 
 typedef struct spin_key {
@@ -71,7 +59,7 @@ struct local_request_queue {
 typedef struct local_request_queue _local_rq_t;
 
 struct global_request_queue {
-       volatile struct plist_node list;
+//     volatile struct plist_node list;
        _remote_wakeup_request_t wakeup;
        _remote_key_request_t wait;
        int cnt;
@@ -79,6 +67,24 @@ struct global_request_queue {
 }__attribute__((packed));
 typedef struct global_request_queue _global_rq;
 
+
+
+
+
+struct global_request_work  {
+       struct work_struct work;
+       spinlock_t * lock;
+       _global_rq * gq;
+//     struct plist_head * _grq_head;
+//     volatile unsigned int _is_alive;
+//     pid_t _worker_pid;
+//     wait_queue_head_t *flush;
+//     unsigned int * free_work;
+//     int ops ; //0-wait 1-wake
+};
+
+typedef struct global_request_work global_request_work_t;
+
 struct spin_value {
        spinlock_t _sp;
        volatile unsigned int _st; //token status//TBR
@@ -90,7 +96,7 @@ typedef struct spin_value  _spin_value;
 
 struct global_value {
        spinlock_t lock;
-       volatile struct plist_head _grq_head;
+//     volatile struct plist_head _grq_head; // TODO for storing mutiple wq
        struct workqueue_struct *global_wq;
        struct task_struct *thread_group_leader;
        global_request_work_t *worker_task;
@@ -116,6 +122,8 @@ _local_rq_t * add_request_node(int request_id,pid_t pid, struct list_head *head)
 int find_and_delete_request(int request_id, struct list_head *head);
 _local_rq_t * find_request(int request_id, struct list_head *head) ;
 _local_rq_t * find_request_by_pid(pid_t pid, struct list_head *head) ;
+_local_rq_t * set_err_request(int request_id,int err, struct list_head *head) ;
+
 
 extern _spin_value spin_bucket[1 << _SPIN_HASHBITS];
 extern _global_value global_bucket[1 << _SPIN_HASHBITS];
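
With the reshuffle above, global_request_work no longer carries the shared plist head and
worker bookkeeping; each work item now carries exactly one request through its gq pointer.
A minimal sketch of that one-work-item-per-request pattern follows; the names my_request,
my_work and handle_request are illustrative only and not part of the patch.

	/* Sketch only: one work item per request, payload reached via container_of().
	 * The patch casts the work pointer directly, which also works because the
	 * work_struct is the first member; container_of() is the safer idiom. */
	#include <linux/workqueue.h>
	#include <linux/slab.h>
	#include <linux/errno.h>

	struct my_request {
		int op;			/* e.g. a wait or wake operation */
		unsigned long uaddr;
	};

	struct my_work {
		struct work_struct work;	/* kept first so a plain cast also works */
		struct my_request *req;
	};

	static void handle_request(struct work_struct *work)
	{
		struct my_work *w = container_of(work, struct my_work, work);

		/* ... process w->req->op on w->req->uaddr ... */

		kfree(w->req);
		kfree(w);
	}

	static int queue_request(struct workqueue_struct *wq, struct my_request *req)
	{
		struct my_work *w = kmalloc(sizeof(*w), GFP_ATOMIC);

		if (!w)
			return -ENOMEM;
		INIT_WORK(&w->work, handle_request);
		w->req = req;
		return queue_work(wq, &w->work) ? 0 : -EBUSY;
	}
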
diff --git a/kernel/exit.c b/kernel/exit.c
index 7ac843c..56c66ff 100644
@@ -1012,18 +1012,7 @@ NORET_TYPE void do_exit(long code)
 
        if(gvp != NULL){
 
-               ((global_request_work_t*)gvp->worker_task)->_is_alive = 0;
-               *(((global_request_work_t*)gvp->worker_task)->free_work) = 1;
-               smp_mb();
-               printk(KERN_INFO"((global_request_work_t*)gvp->worker_task){%d} free{%d}\n",((global_request_work_t*)gvp->worker_task)->_worker_pid,
-                                               *(((global_request_work_t*)gvp->worker_task)->free_work));
-               set_task_state(pid_task(find_vpid(((global_request_work_t*)gvp->worker_task)->_worker_pid), PIDTYPE_PID),TASK_INTERRUPTIBLE);
-               wake_up_interruptible_all(((global_request_work_t*)gvp->worker_task)->flush);
-               printk(KERN_INFO "after wake up \n");
-               /*if(!( cancel_work_sync((struct work_struct*)gvp->worker_task))){
-                       flush_work((struct work_struct*)gvp->worker_task);
-                       flush_workqueue(gvp->global_wq);
-               }*/
+               printk(KERN_INFO"Inside GVP");
                gvp->thread_group_leader = NULL;
                gvp->free = 0;
                gvp->global_wq = NULL;
diff --git a/kernel/futex.c b/kernel/futex.c
index 5d8d3ed..bd52d42 100644
@@ -68,7 +68,7 @@
 #include <popcorn/global_spinlock.h>
 
 
-#define FUTEX_VERBOSE 0
+#define FUTEX_VERBOSE 1
 #if FUTEX_VERBOSE
 #define FPRINTK(...) printk(__VA_ARGS__)
 #else
@@ -355,7 +355,9 @@ void put_futex_key(union futex_key *key)
  * disabled section so we can as well avoid the #PF overhead by
  * calling get_user_pages() right away.
  */
-static int fault_in_user_writeable(u32 __user *uaddr)
+//static 
+
+int fault_in_user_writeable(u32 __user *uaddr)
 {
        struct mm_struct *mm = current->mm;
        int ret;
@@ -368,7 +370,8 @@ static int fault_in_user_writeable(u32 __user *uaddr)
        return ret < 0 ? ret : 0;
 }
 
-static int fault_in_user_writeable_task(u32 __user *uaddr,struct task_struct * tgid)
+//static
+ int fault_in_user_writeable_task(u32 __user *uaddr,struct task_struct * tgid)
 {
        struct mm_struct *mm = tgid->mm;
        int ret;
@@ -1754,8 +1757,6 @@ static inline void queue_me(struct futex_q *q, struct futex_hash_bucket *hb)
        q->task = current;
        spin_unlock(&hb->lock);
 
-       //if(current->tgroup_distributed)
-       //      global_spinunlock((unsigned long)q->key.private.address+q->key.private.offset,FLAGS_SYSCALL);
 
 }
 
@@ -2018,9 +2019,6 @@ static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
        set_current_state(TASK_INTERRUPTIBLE);
        queue_me(q, hb);
 
-       struct plist_head *head = &hb->chain;
-       struct futex_q *this,*next;
-
        /* Arm the timer */
        if (timeout) {
                hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
@@ -2042,7 +2040,6 @@ static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
                        schedule();
                }
        }
-       int sig;
        __set_current_state(TASK_RUNNING);
 }
 
@@ -2103,14 +2100,20 @@ retry_private:
                                flags & FLAGS_SHARED, VERIFY_READ, bitset);
                FPRINTK(KERN_ALERT " %s: spinlock  futex_wait_setup err {%d}\n",__func__,g_errno);
                if (g_errno) {  //error due to val change
-                       ret = g_errno;
-                       goto out;
+                           ret = g_errno;
+                           if( ret == -EFAULT)
+                           {
+                                FPRINTK(KERN_ALERT" client side efault fix up {%d} \n",fault_in_user_writeable(uaddr));
+                               
+                           }
+
                } else if (!g_errno) {  //no error => just queue it acquiring spinlock
                        //get the actual spinlock : Not necessary as we are alone
                        *hb = queue_lock(q);
                        ret = g_errno;
-                       goto out;
                }
+
+               goto out;
        }
        else{
                *hb = queue_lock(q);
@@ -2230,7 +2233,7 @@ int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
 
        struct task_struct *t=current;
        int rep_rem = t->tgroup_distributed;
-       int x=0,y=0,wake=0,woke=0,nw=0,bs=0;
+       int x=0;
        struct pt_regs * regs;
        unsigned long bp = stack_frame(current,NULL);
 
@@ -2254,9 +2257,8 @@ int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
 
 retry:
 
-       FPRINTK(KERN_ALERT "%s:wait before task {%d} rep_rem {%d}  uaddr{%lx}\ value{%d},wake{%d} ,woke{%d},nw{%d},bs{%d} disp{%d}\n",__func__,
-               t->pid, t->tgroup_distributed, uaddr, x, wake, woke, nw, bs,
-               current->return_disposition);
+       FPRINTK(KERN_ALERT "%s:wait before task {%d} rep_rem {%d}  uaddr{%lx}\ value{%d} disp{%d}\n",__func__,
+               t->pid, t->tgroup_distributed, uaddr, x,current->return_disposition);
        /*
         * Prepare to wait on uaddr. On success, holds hb lock and increments
         * q.key refs.
@@ -2287,6 +2289,8 @@ retry:
        if (!signal_pending(current))
                goto retry;
 
+       printk(KERN_ALERT" up for restart abs{%d} \n",(!abs_time)?0:1);
+
        ret = -ERESTARTSYS;
        if (!abs_time)
                goto out;
@@ -2314,7 +2318,8 @@ static long futex_wait_restart(struct restart_block *restart)
 {
        u32 __user *uaddr = restart->futex.uaddr;
        ktime_t t, *tp = NULL;
-
+       if(current->tgroup_distributed==1)
+               printk(KERN_ALERT"futex_restarted {%d} \n",uaddr);
        if (restart->futex.flags & FLAGS_HAS_TIMEOUT) {
                t.tv64 = restart->futex.time;
                tp = &t;
@@ -3006,7 +3011,7 @@ long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
                flags |= FLAGS_SHARED;
 
        if (op & FUTEX_CLOCK_REALTIME) {
-               //flags |= FLAGS_CLOCKRT;
+               flags |= FLAGS_CLOCKRT;
                if (cmd != FUTEX_WAIT_BITSET && cmd != FUTEX_WAIT_REQUEUE_PI)
                        return -ENOSYS;
        }
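
The do_futex() hunk above re-enables FLAGS_CLOCKRT, so FUTEX_CLOCK_REALTIME is honoured
again for the bitset wait variants. A small user-space sketch of a wait that depends on it;
the futex word and the one-second deadline are only for illustration.

	/* Sketch: FUTEX_CLOCK_REALTIME is only valid with FUTEX_WAIT_BITSET /
	 * FUTEX_WAIT_REQUEUE_PI, matching the check kept in do_futex() above. */
	#define _GNU_SOURCE
	#include <linux/futex.h>
	#include <sys/syscall.h>
	#include <unistd.h>
	#include <time.h>
	#include <stdio.h>
	#include <string.h>
	#include <errno.h>

	static int futex_word = 0;

	int main(void)
	{
		struct timespec abs_deadline;
		long ret;

		clock_gettime(CLOCK_REALTIME, &abs_deadline);
		abs_deadline.tv_sec += 1;	/* wait at most one second past now */

		/* Block while futex_word == 0; the timeout is an absolute
		 * CLOCK_REALTIME instant rather than a relative interval. */
		ret = syscall(SYS_futex, &futex_word,
			      FUTEX_WAIT_BITSET | FUTEX_CLOCK_REALTIME,
			      0, &abs_deadline, NULL, FUTEX_BITSET_MATCH_ANY);
		if (ret == -1)
			printf("futex wait returned: %s\n", strerror(errno));
		return 0;
	}
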
diff --git a/kernel/futex_remote.c b/kernel/futex_remote.c
index db447c8..c88e23c 100644
@@ -29,7 +29,7 @@
 #define MODULE "GRQ-"
 #include <popcorn/global_spinlock.h>
 
-#define FUTEX_REMOTE_VERBOSE 0
+#define FUTEX_REMOTE_VERBOSE 1
 #if FUTEX_REMOTE_VERBOSE
 #define FRPRINTK(...) printk(__VA_ARGS__)
 #else
@@ -121,14 +121,7 @@ struct _inc_remote_vm_pool {
        struct list_head list_member;
 };
 
-struct send_ticket_request {
-       struct pcn_kmsg_hdr header;
-       pid_t rem_pid;                  //4
-       unsigned long uaddr;    //8
-       char pad[48];
-}__attribute__((packed)) __attribute__((aligned(64)));
 
-typedef struct send_ticket_request send_ticket_request_t;
 
 int find_kernel_for_pfn(unsigned long addr, struct list_head *head) {
        struct list_head *iter;
@@ -347,7 +340,17 @@ out:
 }
 
 
+int fix_user_page(u32 __user * uaddr,struct task_struct *tsk){
+struct mm_struct *mm=tsk->mm;
+int ret;
 
+down_read(&mm->mmap_sem);
+ret =fixup_user_fault(tsk,mm, (unsigned long) uaddr, FAULT_FLAG_WRITE| FAULT_FLAG_NONLINEAR | FAULT_FLAG_MKWRITE |FAULT_FLAG_KILLABLE );
+up_read(&mm->mmap_sem);
+
+return ret < 0 ? ret :0;
+
+}
 int global_futex_wait(unsigned long uaddr, unsigned int flags, u32 val,
                ktime_t *abs_time, u32 bitset, pid_t rem, struct task_struct *origin,
                unsigned int fn_flags) {
@@ -368,13 +371,14 @@ int global_futex_wait(unsigned long uaddr, unsigned int flags, u32 val,
 
        //start wait setup
 retry:
+       
        ret = get_futex_key((u32 __user *)uaddr, flags & FLAGS_SHARED, &q->key, VERIFY_READ);
-       FRPRINTK(KERN_ALERT "%s: pid origin {%s} _cpu{%d} uaddr{%lx} uval{%d} \n ",__func__,tsk->comm,smp_processor_id(),uaddr,val);
+       FRPRINTK(KERN_ALERT "%s: pid origin {%s} _cpu{%d} uaddr{%lx} uval{%d} ret{%d} \n ",__func__,tsk->comm,smp_processor_id(),uaddr,val,ret);
        if (unlikely(ret != 0))
           return ret;
 
        //set private.mm to origin tasks mm
-       if (tsk)
+       if(tsk)
                q->key.private.mm = tsk->mm;
 
 retry_private:
@@ -382,12 +386,23 @@ retry_private:
        hb = hash_futex(&q->key);
        q->lock_ptr = &hb->lock;
        spin_lock(&hb->lock);
-
+fault:
        ret = get_futex_value_locked(&uval, (u32 __user *)uaddr);
 
        if (ret) {
                        spin_unlock(&hb->lock);
                        FRPRINTK(KERN_ALERT "%s:after spin unlock ret{%d} uval{%lx}\n ",__func__,ret,uval);
+
+                       if(ret == -EFAULT){
+                               if((ret = fix_user_page((u32 __user *)uaddr,tsk)) ==  0){
+                                       FRPRINTK(KERN_ALERT"%s:ret {%d} \n",__func__,ret);
+                                       //spin_lock(&hb->lock);
+                                       //goto fault;
+                               }
+
+                               //goto out;
+                       }
+
                        ret = get_user(uval, (u32 __user *)uaddr);
 
                        if (ret){
@@ -435,10 +450,6 @@ retry_private:
 
 out:
        if(ret){
-               if(ret == -EFAULT){
-               FRPRINTK(KERN_ALERT"check if the wake signal has reached origin\n");
-
-               }
                put_futex_key(&q->key);
        }
 
@@ -453,7 +464,6 @@ void global_worher_fn(struct work_struct* work) {
        global_request_work_t* w = (global_request_work_t*) work;
 
         _global_rq *this, *next;
-       struct plist_head * head;
        struct task_struct *tsk = current;
        struct task_struct *task, *g;
        struct mm_struct *cmm = NULL;
@@ -463,41 +473,28 @@ void global_worher_fn(struct work_struct* work) {
        unsigned long flags =0;
        //Set task struct for the worker
        worker_pid = current->pid;
-       w->_worker_pid = current->pid;
-       w->flush = &resume_;
-       w->free_work = &free_work;
 
        FRPRINTK(KERN_ALERT "%s:GRQ started {%s}\n",__func__,current->comm);
 
+        this = (_global_rq*) w->gq;
 
-       //TODO:inifinite loop till the thread dies or forcefully thread is stopped
+       has_work = 0;
+       FRPRINTK(KERN_ALERT "%s:retry has_work{%d} \n",__func__,has_work);
 
-retry:
-               has_work = 0;
-               FRPRINTK(KERN_ALERT "%s:retry has_work{%d} \n",__func__,has_work);
 
-               if(w->_is_alive == 0)
-                       goto exit;
+       struct spin_key sk;
+       _spin_key_init(&sk);
 
-               head = w->_grq_head;
-               plist_for_each_entry_safe(this, next, head, list)
+               if (this->ops == WAKE_OPS) //process wake request from GRQ
                {
-                       has_work = 1;
-                       struct spin_key sk;
-                       _spin_key_init(&sk);
-
-                       if (this->ops == WAKE_OPS) //process wake request from GRQ
-                       {
-                               _remote_wakeup_request_t* msg = (_remote_wakeup_request_t*) &this->wakeup;
-                               int ret =0;
+                       _remote_wakeup_request_t* msg = (_remote_wakeup_request_t*) &this->wakeup;
+                       int ret =0;
 
-                               getKey(msg->uaddr, &sk,msg->tghid);
-                               _spin_value *value = hashspinkey(&sk);
+                       getKey(msg->uaddr, &sk,msg->tghid);
+                       _spin_value *value = hashspinkey(&sk);
+                       FRPRINTK(KERN_ALERT"%s:wake--current msg pid{%d} msg->ticket{%d} \n", __func__,msg->pid,msg->ticket);
 
-                               FRPRINTK(KERN_ALERT"%s:wake--current msg pid{%d} msg->ticket{%d} \n", __func__,msg->pid,msg->ticket);
-
-
-                               if (msg->rflag == 0 || (msg->fn_flag & FLAGS_ORIGINCALL)) {
+                       if (msg->rflag == 0 || (msg->fn_flag & FLAGS_ORIGINCALL)) {
                                        if (current->mm != NULL) {
                                                null_flag = 1;
                                                cmm = (current->mm);
@@ -560,23 +557,23 @@ mm_exit:
                                        pcn_kmsg_send(ORIG_NODE(send_tkt.rem_pid), (struct pcn_kmsg_message*) (&send_tkt));
 
 
-                       } else if(this->ops == WAIT_OPS){ //wait request
+               } else if(this->ops == WAIT_OPS){ //wait request
 
-                               _remote_key_request_t* msg = (_remote_key_request_t*) &this->wait;
-                               int ret =0 ;
+                       _remote_key_request_t* msg = (_remote_key_request_t*) &this->wait;
+                       int ret =0 ;
 
-                               getKey(msg->uaddr, &sk,msg->tghid);
-                               _spin_value *value = hashspinkey(&sk);
+                       getKey(msg->uaddr, &sk,msg->tghid);
+                       _spin_value *value = hashspinkey(&sk);
 
-                               FRPRINTK(KERN_ALERT"%s:wait --current msg pid{%d} msg->ticket{%d} \n", __func__,msg->pid,msg->ticket);
+                       FRPRINTK(KERN_ALERT"%s:wait --current msg pid{%d} msg->ticket{%d} \n", __func__,msg->pid,msg->ticket);
 
-                               tsk = gettask(msg->tghid, msg->tghid);
-                               if (msg->fn_flags & FLAGS_ORIGINCALL) {
-                                       msg->fn_flags |= FLAGS_REMOTECALL;
-                                       ret = global_futex_wait(msg->uaddr, msg->flags, msg->val, 0, 0, msg->pid, tsk,
-                                                       msg->fn_flags);
-                               } else
-                                       ret = global_futex_wait(msg->uaddr, msg->flags, msg->val, 0, 0, msg->pid, tsk,
+                       tsk = gettask(msg->tghid, msg->tghid);
+                       if (msg->fn_flags & FLAGS_ORIGINCALL) {
+                               msg->fn_flags |= FLAGS_REMOTECALL;
+                               ret = global_futex_wait(msg->uaddr, msg->flags, msg->val, 0, 0, msg->pid, tsk,
+                                               msg->fn_flags);
+                       } else
+                               ret = global_futex_wait(msg->uaddr, msg->flags, msg->val, 0, 0, msg->pid, tsk,
                                                        msg->fn_flags);
 
 
@@ -595,22 +592,11 @@ mm_exit:
 
                        }
 cleanup:
-                       //Delete the entry
-                       counter++;
-                       FRPRINTK(KERN_ALERT "done iteration moving the head cnt{%d} counter{%d} \n",this->cnt,counter);
+               //Delete the entry
+               counter++;
+               FRPRINTK(KERN_ALERT "done iteration moving the head cnt{%d} counter{%d} \n",this->cnt,counter);
 
-                       plist_del(&this->list, w->_grq_head);
-                       kfree(this);
-               }
-               if (has_work == 0) {
-                       __set_current_state(TASK_INTERRUPTIBLE);
-                       FRPRINTK(KERN_ALERT "sleep until request \n");
-                       finish_work = 1;
-                       wake_up_interruptible(&wait_);
-                       wait_event_interruptible(resume_, free_work == 1);
-                       free_work = 0;
-               }
-               goto retry;
+               kfree(this);
 
 
 exit:
@@ -620,7 +606,7 @@ exit:
 
 static int handle_remote_futex_wake_response(struct pcn_kmsg_message* inc_msg) {
        _remote_wakeup_response_t* msg = (_remote_wakeup_response_t*) inc_msg;
-       preempt_disable();
+       //preempt_disable();
        FRPRINTK(KERN_ALERT"%s: response {%d} \n",
                        __func__, msg->errno);
        struct task_struct *p =  pid_task(find_vpid(msg->rem_pid), PIDTYPE_PID);
@@ -635,17 +621,15 @@ static int handle_remote_futex_wake_response(struct pcn_kmsg_message* inc_msg) {
        // update the local value status to has ticket
 
 
-       _local_rq_t *ptr = find_request(msg->request_id, &value->_lrq_head);
-       ptr->status = DONE;
-       ptr->errno = msg->errno;
-        smp_wmb();
-       FRPRINTK(KERN_ALERT"%s: errno{%d} p->tgp(%d} \n",__func__,ptr->errno,p->tgroup_home_id);
+       _local_rq_t *ptr = set_err_request(msg->request_id,msg->errno, &value->_lrq_head);
+       // smp_wmb();
+       FRPRINTK(KERN_ALERT"%s: errno{%d} p->tgp(%d} \n",__func__,msg->errno,p->tgroup_home_id);
        wake_up_interruptible(&ptr->_wq);
 
        put_task_struct(p);
 
        pcn_kmsg_free_msg(inc_msg);
-       preempt_enable();
+       //preempt_enable();
 
        return 0;
 }
@@ -678,58 +662,40 @@ static int handle_remote_futex_wake_request(struct pcn_kmsg_message* inc_msg) {
        gvp = hashgroup(tsk);
        if (!gvp->free) { //futex_wake is the first one
                //scnprintf(gvp->name, sizeof(gvp->name), MODULE);
-               FRPRINTK(KERN_ALERT"%s: wake gvp free \n", __func__);
-               gvp->global_wq = grq;// create_singlethread_workqueue(gvp->name);
-               gvp->free = 1;
-               gvp->thread_group_leader = tsk;
-               global_request_work_t* back_work = NULL;
-
-               // Spin up bottom half to process this event
-               back_work = (global_request_work_t*) kmalloc(sizeof(global_request_work_t),
+               gvp ->free =1;
+       }
+       FRPRINTK(KERN_ALERT"%s: wake gvp free \n", __func__);
+       gvp->global_wq = grq;// create_singlethread_workqueue(gvp->name);
+       gvp->thread_group_leader = tsk;
+       global_request_work_t* back_work = NULL;
+       // Spin up bottom half to process this event
+       back_work = (global_request_work_t*) kmalloc(sizeof(global_request_work_t),
                                GFP_ATOMIC);
-               if (back_work) {
+       if (back_work) {
                        INIT_WORK((struct work_struct* )back_work, global_worher_fn);
 
                        FRPRINTK(KERN_ALERT"%s: set up head\n", __func__);
-                       back_work->_grq_head = &gvp->_grq_head;//, sizeof(struct plist_head));
+                       
                        back_work->lock = &gvp->lock; // , sizeof(spinlock_t));
-                       back_work->ops = -1;
-                       back_work->_is_alive = 1;
-                       back_work->_worker_pid = 0;
+                       
+                       _global_rq *trq = (_global_rq *) kmalloc(sizeof(_global_rq), GFP_ATOMIC);                       
+                       memcpy(&trq->wakeup, msg, sizeof(_remote_wakeup_request_t));
+                       trq->ops = WAKE_OPS;
+                       trq->cnt = atomic_read(&progress);
+                       //smp_mb();barrier()i;
+
+                       back_work->gq = trq;
                        queue_work(gvp->global_wq, (struct work_struct*) back_work);
                }
-               gvp->worker_task = back_work;// pid_task(find_vpid(worker_pid), PIDTYPE_PID);
-       } else {
-               //TODO: check if it is the same thread group leader if not hash it to another bucket.
-       }
+       gvp->worker_task = back_work;// pid_task(find_vpid(worker_pid), PIDTYPE_PID);
        GENERAL_SPIN_UNLOCK(&access_global_value_table);
 
-       if(gvp->free == 0 || !gvp->global_wq ){
-                       FRPRINTK(KERN_ALERT"%s: wait finish\n", __func__);
-                               if(finish_work == 0){
-                               wait_event_interruptible(wait_, (finish_work != 0));
-                               finish_work = 1;
-                               }
-                               else if(atomic_read(&progress) > counter){
-                                       FRPRINTK(KERN_ALERT"%s: continue finish\n", __func__);
-                               }
-                       }
 
        FRPRINTK(KERN_ALERT"%s: ERROR msg ticket{%d}\n", __func__,msg->ticket);
 
        //Check whether the request is asking for ticket or holding the ticket?
                //if not holding the ticket add to the tail of Global request queue.
-               _global_rq *trq = (_global_rq *) kmalloc(sizeof(_global_rq), GFP_ATOMIC);
                //if not holding the ticket add to the tail of Global request queue.
-               memcpy(&trq->wakeup, msg, sizeof(_remote_wakeup_request_t));
-               trq->ops = WAKE_OPS;
-               trq->cnt = atomic_read(&progress);
-               plist_node_init(&trq->list, NORMAL_Q_PRIORITY);
-               plist_add(&trq->list, &gvp->_grq_head);
-               free_work = 1;
-               smp_mb();barrier();
-
-               wake_up_interruptible(&resume_);
        }
        else {
                 FRPRINTK(KERN_ALERT"need to wake_st\n");
@@ -794,7 +760,7 @@ out:
 }
 static int handle_remote_futex_key_response(struct pcn_kmsg_message* inc_msg) {
        _remote_key_response_t* msg = (_remote_key_response_t*) inc_msg;
-       preempt_disable();
+       //preempt_disable();
        FRPRINTK(KERN_ALERT"%s: response to revoke wait request as origin is dead {%d} \n",
                        __func__,msg->errno);
 
@@ -811,17 +777,16 @@ static int handle_remote_futex_key_response(struct pcn_kmsg_message* inc_msg) {
 
        FRPRINTK(KERN_ALERT"%s:  value {%d}  p->tgroup_home_id{%d}  \n",
                                        __func__, value->_st,p->tgroup_home_id);
-       smp_wmb();
+       //smp_wmb();
 
-       _local_rq_t *ptr = find_request(msg->request_id, &value->_lrq_head);
-       ptr->status = DONE;
-       ptr->errno = msg->errno;
+       _local_rq_t *ptr = set_err_request(msg->request_id,msg->errno, &value->_lrq_head);
+       
        wake_up_interruptible(&ptr->_wq);
 
        put_task_struct(p);
 
        pcn_kmsg_free_msg(inc_msg);
-       preempt_enable();
+       //preempt_enable();
        return 0;
 }
 
@@ -848,14 +813,15 @@ static int handle_remote_futex_key_request(struct pcn_kmsg_message* inc_msg) {
        GENERAL_SPIN_LOCK(&access_global_value_table);
        gvp = hashgroup(tsk);
        if(!gvp->free) {
+               gvp->free =1;
+       }
                //futex wait is the first global request
-               FRPRINTK(KERN_ALERT"%s: wait gvp free \n", __func__);
-               scnprintf(gvp->name, sizeof(gvp->name), MODULE);
+       FRPRINTK(KERN_ALERT"%s: wait gvp free \n", __func__);
+       scnprintf(gvp->name, sizeof(gvp->name), MODULE);
 
-               gvp->global_wq = grq;//create_singlethread_workqueue(gvp->name);
-               gvp->free = 1;
-               gvp->thread_group_leader = tsk;
-               global_request_work_t* back_work = NULL;
+       gvp->global_wq = grq;//create_singlethread_workqueue(gvp->name);
+       gvp->thread_group_leader = tsk;
+       global_request_work_t* back_work = NULL;
 
                // Spin up bottom half to process this event
                back_work = (global_request_work_t*) kmalloc(sizeof(global_request_work_t),
@@ -863,41 +829,21 @@ static int handle_remote_futex_key_request(struct pcn_kmsg_message* inc_msg) {
                if (back_work) {
                        INIT_WORK((struct work_struct* )back_work, global_worher_fn);
                        FRPRINTK(KERN_ALERT"%s: set up head\n", __func__);
-                       back_work->_grq_head = &gvp->_grq_head;//, sizeof(struct plist_head));
                        back_work->lock = &gvp->lock; // , sizeof(spinlock_t));
-                       back_work->ops = -1;
-                       back_work->_is_alive = 1;
-                       back_work->_worker_pid = 0;
+                       
+                       _global_rq *trq = (_global_rq *) kmalloc(sizeof(_global_rq), GFP_ATOMIC);
+                       memcpy(&trq->wait, msg, sizeof(_remote_key_request_t));
+                       trq->cnt =atomic_read(&progress);
+                       trq->ops = WAIT_OPS;
+
+                       back_work->gq = trq;
+                       FRPRINTK(KERN_ALERT"%s: wait token aqc trq->wait.ticket{%d} cnt{%d}\n", __func__,trq->wait.ticket,trq->cnt);
+                       
                        queue_work(grq, (struct work_struct*) back_work);
                }
-               gvp->worker_task = back_work;//pid_task(find_vpid(worker_pid), PIDTYPE_PID);
-       } else {
-               //TODO: check if it is the same thread group leader if not hash it to another bucket.
-       }
+       gvp->worker_task = back_work;//pid_task(find_vpid(worker_pid), PIDTYPE_PID);
        GENERAL_SPIN_UNLOCK(&access_global_value_table);
 
-       if(gvp->free == 0 || !gvp->global_wq ){
-                       FRPRINTK(KERN_ALERT"%s: wait finish\n", __func__);
-                               if(finish_work == 0){
-                               wait_event_interruptible(wait_, (finish_work != 0));
-                               finish_work = 1;
-                               }
-                               else if(atomic_read(&progress) > counter){
-                                       FRPRINTK(KERN_ALERT"%s: continue finish\n", __func__);
-                               }
-                       }
-
-               _global_rq *trq = (_global_rq *) kmalloc(sizeof(_global_rq), GFP_ATOMIC);
-               memcpy(&trq->wait, msg, sizeof(_remote_key_request_t));
-               trq->cnt =atomic_read(&progress);
-               trq->ops = WAIT_OPS;
-               FRPRINTK(KERN_ALERT"%s: wait token aqc trq->wait.ticket{%d} cnt{%d}\n", __func__,trq->wait.ticket,trq->cnt);
-               plist_node_init(&trq->list, NORMAL_Q_PRIORITY);
-               plist_add(&trq->list, &gvp->_grq_head);
-               free_work = 1;
-               smp_mb();barrier();
-               wake_up_interruptible(&resume_);
-
        pcn_kmsg_free_msg(inc_msg);
 
        return 0;
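
The fix_user_page() helper and the -EFAULT branch added to global_futex_wait() follow the
usual futex fault-handling shape: the value is read under the hash-bucket lock with page
faults disabled, and on -EFAULT the lock is dropped, the page is faulted in from sleepable
context, and the read is retried. A simplified sketch of that pattern, loosely after
mainline futex_wait_setup(); it is not the Popcorn code itself, and hb, uaddr and val are
assumed to be set up by the caller.

	/* Simplified sketch of the standard futex -EFAULT retry dance. */
	static int wait_setup_sketch(struct futex_hash_bucket *hb,
				     u32 __user *uaddr, u32 val)
	{
		u32 uval;
		int ret;

	retry:
		spin_lock(&hb->lock);
		ret = get_futex_value_locked(&uval, uaddr); /* faults disabled here */
		if (ret) {
			spin_unlock(&hb->lock);

			ret = get_user(uval, uaddr);	/* may sleep: fault the page in */
			if (ret)
				return ret;		/* still unreadable: give up */
			goto retry;			/* re-take the lock and re-read */
		}

		if (uval != val) {
			spin_unlock(&hb->lock);
			return -EWOULDBLOCK;		/* value changed: do not queue */
		}
		return 0;	/* caller queues while still holding hb->lock */
	}
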
diff --git a/kernel/futex_remote.h b/kernel/futex_remote.h
index f214e92..953fbcc 100644
@@ -114,7 +114,8 @@ extern int match_futex(union futex_key *key1, union futex_key *key2);
 extern void wake_futex(struct futex_q *q);
 extern void put_futex_key(union futex_key *key);
 extern void __unqueue_futex(struct futex_q *q);
-
+extern int fault_in_user_writeable(u32 __user * uaddr);
+extern int fault_in_user_writeable_task(u32 __user * uaddr, struct task_struct * tgid);
 pte_t *do_page_walk(unsigned long address);
 
 int find_kernel_for_pfn(unsigned long addr, struct list_head *head);
diff --git a/kernel/global_spinlock.c b/kernel/global_spinlock.c
index 0ac53eb..dc51291 100644
@@ -29,8 +29,8 @@ DEFINE_SPINLOCK(request_queue_lock);
  _spin_value spin_bucket[1<<_SPIN_HASHBITS];
  _global_value global_bucket[1<<_SPIN_HASHBITS];
 
-#define GENERAL_SPIN_LOCK(x) spin_lock(x)
-#define GENERAL_SPIN_UNLOCK(x) spin_unlock(x)
+#define GENERAL_SPIN_LOCK(x,f) spin_lock_irqsave(x,f)
+#define GENERAL_SPIN_UNLOCK(x,f) spin_unlock_irqrestore(x,f)
 
 //extern functions
  extern struct vm_area_struct * getVMAfromUaddr(unsigned long uaddr);
@@ -39,7 +39,8 @@ DEFINE_SPINLOCK(request_queue_lock);
 
  _local_rq_t * add_request_node(int request_id, pid_t pid, struct list_head *head) {
 
-        GENERAL_SPIN_LOCK(&request_queue_lock);
+        unsigned long f;
+        GENERAL_SPIN_LOCK(&request_queue_lock,f);
         _local_rq_t *Ptr = (_local_rq_t *) kmalloc(
                        sizeof(_local_rq_t), GFP_ATOMIC);
 
@@ -51,7 +52,7 @@ DEFINE_SPINLOCK(request_queue_lock);
        init_waitqueue_head(&Ptr->_wq);
        INIT_LIST_HEAD(&Ptr->lrq_member);
        list_add(&Ptr->lrq_member, head);
-        GENERAL_SPIN_UNLOCK(&request_queue_lock);
+        GENERAL_SPIN_UNLOCK(&request_queue_lock,f);
 
        return Ptr;
  }
@@ -60,18 +61,19 @@ DEFINE_SPINLOCK(request_queue_lock);
 
        struct list_head *iter;
        _local_rq_t *objPtr;
-        GENERAL_SPIN_LOCK(&request_queue_lock);
+       unsigned long f;
+        GENERAL_SPIN_LOCK(&request_queue_lock,f);
        list_for_each(iter, head)
        {
                objPtr = list_entry(iter, _local_rq_t, lrq_member);
                if (objPtr->_request_id == request_id) {
                        list_del(&objPtr->lrq_member);
                        kfree(objPtr);
-                        GENERAL_SPIN_UNLOCK(&request_queue_lock);
+                        GENERAL_SPIN_UNLOCK(&request_queue_lock,f);
                        return 1;
                }
        }
-        GENERAL_SPIN_UNLOCK(&request_queue_lock);
+        GENERAL_SPIN_UNLOCK(&request_queue_lock,f);
  }
 
 
@@ -79,34 +81,58 @@ DEFINE_SPINLOCK(request_queue_lock);
 
        struct list_head *iter;
        _local_rq_t *objPtr;
-        GENERAL_SPIN_LOCK(&request_queue_lock);
+       unsigned long f;
+        GENERAL_SPIN_LOCK(&request_queue_lock,f);
        list_for_each(iter, head)
        {
                objPtr = list_entry(iter, _local_rq_t, lrq_member);
                if (objPtr->_request_id == request_id) {
-                       GENERAL_SPIN_UNLOCK(&request_queue_lock);
+                       GENERAL_SPIN_UNLOCK(&request_queue_lock,f);
                        return objPtr;
                }
        }
-        GENERAL_SPIN_UNLOCK(&request_queue_lock);
+        GENERAL_SPIN_UNLOCK(&request_queue_lock,f);
        return NULL;
  }
 
+ _local_rq_t * set_err_request(int request_id, int err, struct list_head *head) {
+
+struct list_head *iter;
+_local_rq_t *objPtr;
+unsigned long f;
+        GENERAL_SPIN_LOCK(&request_queue_lock,f);
+       list_for_each(iter, head)
+       {
+               objPtr = list_entry(iter, _local_rq_t, lrq_member);
+               if (objPtr->_request_id == request_id) {
+                       pagefault_disable();
+                       objPtr->status =DONE;
+                       objPtr->errno = err;
+                       pagefault_enable();
+                       GENERAL_SPIN_UNLOCK(&request_queue_lock,f);
+                       return objPtr;
+               }
+       }
+        GENERAL_SPIN_UNLOCK(&request_queue_lock,f);
+       return NULL;
+}
+
  _local_rq_t *find_request_by_pid(pid_t pid, struct list_head *head) {
 
        struct list_head *iter;
        _local_rq_t *objPtr;
-        GENERAL_SPIN_LOCK(&request_queue_lock);
+       unsigned long f;
+        GENERAL_SPIN_LOCK(&request_queue_lock,f);
        list_for_each(iter, head)
        {
                objPtr = list_entry(iter, _local_rq_t, lrq_member);
                if (objPtr->_pid == pid) {
                        objPtr->wake_st =1; //Set wake state as ON
-                       GENERAL_SPIN_UNLOCK(&request_queue_lock);
+                       GENERAL_SPIN_UNLOCK(&request_queue_lock,f);
                        return objPtr;
                }
        }
-        GENERAL_SPIN_UNLOCK(&request_queue_lock);
+        GENERAL_SPIN_UNLOCK(&request_queue_lock,f);
        return NULL;
  }
 
@@ -149,7 +175,9 @@ _global_value *hashgroup(struct task_struct *group_pid)
        struct task_struct *tsk =NULL;
        tsk= group_pid;
        //u32 hash = jhash2((u32*)&tsk->pid,(sizeof(tsk->pid))/4,JHASH_INITVAL);
+       pagefault_disable();
        u32 hash = sp_hashfn(tsk->pid,0);
+       pagefault_enable();
        //printk(KERN_ALERT"%s: globalhash{%u} \n", __func__,hash & ((1 << _SPIN_HASHBITS)-1));
        return &global_bucket[hash];
 }
@@ -359,7 +387,6 @@ static int __init global_spinlock_init(void)
                        global_bucket[i].thread_group_leader = NULL;
                        global_bucket[i].worker_task=NULL;
                        global_bucket[i].global_wq = NULL;
-                       plist_head_init(&global_bucket[i]._grq_head);
                        global_bucket[i].free = 0;
                }
 
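
The helpers above now take request_queue_lock with spin_lock_irqsave()/spin_unlock_irqrestore(),
which is the safe choice when the same lock may also be taken from interrupt or message-handler
context: a plain spin_lock() can deadlock if an interrupt that takes the lock arrives while it is
already held. A minimal sketch of the pattern; pending_lock, pending_list and struct pending_req
are illustrative names, not the patch's structures.

	/* Sketch: a list protected by a spinlock reachable from both process
	 * and interrupt context, hence the irqsave/irqrestore variants. */
	#include <linux/spinlock.h>
	#include <linux/list.h>

	static DEFINE_SPINLOCK(pending_lock);
	static LIST_HEAD(pending_list);

	struct pending_req {
		int id;
		struct list_head node;
	};

	/* Safe from any context: local interrupts stay disabled while the lock
	 * is held, so an IRQ handler taking pending_lock cannot deadlock us. */
	static struct pending_req *find_pending(int id)
	{
		struct pending_req *req, *found = NULL;
		unsigned long flags;

		spin_lock_irqsave(&pending_lock, flags);
		list_for_each_entry(req, &pending_list, node) {
			if (req->id == id) {
				found = req;
				break;
			}
		}
		spin_unlock_irqrestore(&pending_lock, flags);
		return found;
	}
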
diff --git a/kernel/sched.c b/kernel/sched.c
index 851b162..5ce7ff3 100644
@@ -5571,6 +5571,7 @@ long sched_setaffinity(pid_t pid, const struct cpumask *in_mask)
        int retval;
     int current_cpu = smp_processor_id();
     int i,ret;
+    int spin = 0;
 
        get_online_cpus();
        rcu_read_lock();
@@ -5626,7 +5627,14 @@ extern struct list_head rlist_head;
             put_online_cpus();
             printk(KERN_ALERT"sched_setaffinity tsk{%d} state{%d} on run q{%d} RET{%d} current{%s} \n",p->pid,p->state,p->on_rq,ret,current->comm);
             schedule(); // this will save us from death
-
+       /*      do {
+                    spin = 0;
+                    schedule(); // this will save us from death
+                    if(current->return_disposition == RETURN_DISPOSITION_NONE) {
+                         __set_task_state(current,TASK_UNINTERRUPTIBLE);
+                          spin = 1;
+                      }
+                 } while (spin);*/
             // We are here because of either the task is exiting,
             // or because the task is migrating back.  Let's handle
             // that now.  If we're migrating back, this function
diff --git a/kernel/signal.c b/kernel/signal.c
index 5570375..e262088 100644
@@ -45,7 +45,7 @@
 #include <linux/kernel.h>
 #include <popcorn/pid.h>
 #include <linux/kthread.h>
-
+#include <linux/process_server.h>
 
 //static DECLARE_WAIT_QUEUE_HEAD( wq);
 
@@ -313,7 +313,8 @@ static int handle_remote_kill_request(struct pcn_kmsg_message* inc_msg) {
                ret = kill_something_info(msg->sig, &info, msg->pid);
        } else //for killing with tgid
        {
-               ret = do_send_specific(msg->tgid, msg->pid, msg->sig, &info);
+               //for remote tgid will be different so set it to 0
+               ret = do_send_specific(0, msg->pid, msg->sig, &info);
        }
 
        if(ptr->assign_for_kthread == 0)
@@ -334,18 +335,6 @@ static int handle_remote_kill_request(struct pcn_kmsg_message* inc_msg) {
        else
        {
                //do nothing...taken care by kthread.
-               /*spin_lock(&kthread_lock);
-               if(ret==0)
-                       signum = SIGUSR1;
-               else
-                       signum = SIGUSR2;
-               //
-               signalfd_notify(waiting_thread, signum);
-               sigaddset(&waiting_thread->pending.signal, signum);
-               set_tsk_thread_flag(waiting_thread, TIF_SIGPENDING);
-               signal_wake_up(waiting_thread, 0);
-
-               spin_unlock(&kthread_lock);*/
        }
 
        pcn_kmsg_free_msg(inc_msg);
@@ -359,7 +348,7 @@ static int remote_kill_pid_info(int kernel, int sig, pid_t pid,
        int res = 0;
 
        _remote_kill_request_t *request = kmalloc(sizeof(_remote_kill_request_t),
-       GFP_KERNEL);
+       GFP_ATOMIC);
 
        _outgoing_remote_signal_pool_t *ptr;
        // Build request
@@ -417,7 +406,7 @@ int remote_kill_pid_info_thread(void *data) {
 
                                _remote_kill_request_t *request = kmalloc(
                                                sizeof(_remote_kill_request_t),
-                                               GFP_KERNEL);
+                                               GFP_ATOMIC);
 
                                _outgoing_remote_signal_pool_t *ptr;
                                // Build request
@@ -485,35 +474,6 @@ int remote_kill_pid_info_thread(void *data) {
                                set_tsk_thread_flag(current,TIF_SIGPENDING);
                                 __set_task_state(current,TASK_INTERRUPTIBLE);
                        }
-                       /*else if (recv_signal == SIGUSR1 || recv_signal == SIGUSR2) {
-                               _remote_kill_response_t response;
-                               // Finish constructing response
-                                       response.header.type = PCN_KMSG_TYPE_REMOTE_SENDSIG_RESPONSE;
-                                       response.header.prio = PCN_KMSG_PRIO_NORMAL;
-                                       if(killinfo->respon !=0 || recv_signal == SIGUSR2)
-                                       response.errno = -ESRCH;
-
-                                       if(recv_signal == SIGUSR1 && killinfo->respon ==0)
-                                       response.errno = 0;
-
-                                       response.request_id = killinfo->req_id;
-
-                                       printk("%s: request --remote:errno: %d \n", "remote_kill_pid_info_thread",
-                                                       response.errno );
-
-                                               // Send response
-                                       pcn_kmsg_send(killinfo->ret_cpu, (struct pcn_kmsg_message*) (&response));
-
-                                       spin_lock(&in_list_lock);
-                                               find_and_delete_incomming(killinfo->pid,&inc_head);
-                                       spin_unlock(&in_list_lock);
-
-                                       recv_signal = SIGSTOP;
-                                       sigaddset(&current->pending.signal,SIGSTOP);
-                                       set_tsk_thread_flag(current,TIF_SIGPENDING);
-                                       __set_task_state(current,TASK_INTERRUPTIBLE);
-
-                       }*/
 
                }
 
@@ -530,7 +490,7 @@ static int remote_do_send_specific(int kernel, pid_t tgid, pid_t pid, int sig,
 
        int res = 0;
        _remote_kill_request_t *request = kmalloc(sizeof(_remote_kill_request_t),
-       GFP_KERNEL);
+       GFP_ATOMIC);
 
        _outgoing_remote_signal_pool_t *ptr;
        // Build request
@@ -638,7 +598,7 @@ static int remote_send_sigprocmask(int kernel, pid_t pid, int how, sigset_t *new
 
        int res = 0;
        _remote_sigproc_request_t *request = kmalloc(sizeof(_remote_sigproc_request_t),
-       GFP_KERNEL);
+       GFP_ATOMIC);
 
        sigset_t local_new;
        sigset_t local_old;
@@ -3629,7 +3589,16 @@ int error = -ESRCH;
 
 rcu_read_lock();
 p = find_task_by_vpid(pid);
-if (p && (tgid <= 0 || task_tgid_vnr(p) == tgid)) {
+printk(KERN_ALERT"%s: pid{%d} tgid{%d} p{%d} \n",__func__,pid,tgid,(!p)?0:1);
+
+if(p && p->tgroup_distributed && !p->executing_for_remote){
+       if(p->return_disposition == RETURN_DISPOSITION_NONE) {
+               printk(KERN_ALERT"%s: ret disp pid{%d} next{%d} \n",__func__,pid,p->next_pid);
+               rcu_read_unlock();
+               return remote_do_send_specific(ORIG_NODE(p->next_pid),tgid,p->next_pid,sig,info);
+       }
+}
+if (p && (tgid <= 0 || task_tgid_vnr(p) == tgid) || p->executing_for_remote) {
        error = check_kill_permission(sig, info, p);
 
        if (info != SEND_SIG_NOINFO && info != SEND_SIG_PRIV
@@ -3645,7 +3614,7 @@ if (p && (tgid <= 0 || task_tgid_vnr(p) == tgid)) {
        if (!error && sig) {
                error = do_send_sig_info(sig, info, p, false);
                /*
-                * If lock_task_sighand() failed we pretend the task
+                * If lock_task_sighand() failed we pretend the tasp
                 * dies after receiving the signal. The window is tiny,
                 * and the signal is private anyway.
                 */
@@ -3654,6 +3623,8 @@ if (p && (tgid <= 0 || task_tgid_vnr(p) == tgid)) {
        }
 }
 if (p == NULL) {
+       printk(KERN_ALERT"%s: tgid{%d} pid{%d} sig{%d} \n",__func__ ,tgid,pid,sig);
+       rcu_read_unlock();
        return remote_do_send_specific(ORIG_NODE(pid), tgid, pid, sig, info);
 }
 rcu_read_unlock();