Implemented a functionality that allows user to designate some function to be execute...
authorChao Mei <chaomei2@illinois.edu>
Mon, 30 Jan 2012 21:27:59 +0000 (15:27 -0600)
committerChao Mei <chaomei2@illinois.edu>
Mon, 30 Jan 2012 21:27:59 +0000 (15:27 -0600)
src/arch/bluegenep/Makefile.machine
src/arch/gemini_gni/Makefile.machine
src/arch/mpi/Makefile.machine
src/arch/net/Makefile.machine
src/arch/net/machine.c
src/arch/util/machine-common-core.c
src/arch/util/machine-commthd-util.c [new file with mode: 0644]
src/conv-core/converse.h

index 7c35a4bddaa5ede16c64c64bee8e00e406cf6088..c777c633e5d35a8790637f8cd5a73e25102bb314 100644 (file)
@@ -1,2 +1,2 @@
 #  Bluegene/P specific Make rules
-
+$(L)/libconv-cplus-n.a: machine.c machine-common-core.c machine-broadcast.c machine-lrts.h machine-commthd-util.c
index adbf27ac88d5014ad275a68b4a810461d5bc89cf..d08ea07f59a0d2ad9f30648500e2133e594fc5df 100644 (file)
@@ -1,2 +1,2 @@
-$(L)/libconv-cplus-n.a: machine.c machine-common-core.c machine-broadcast.c machine-lrts.h machine-pxshm.c machine-persistent.c
+$(L)/libconv-cplus-n.a: machine.c machine-common-core.c machine-broadcast.c machine-lrts.h machine-pxshm.c machine-persistent.c machine-commthd-util.c
 
index 6eb18da73af7624ffc60372076e7adba30c6afdf..cb69de8a253d8117b5f5b005ff0a01682d31221c 100644 (file)
@@ -9,5 +9,5 @@ charm++: hybridAPI
 .PHONY: hybridAPI
 endif
 
-$(L)/libconv-cplus-n.a: machine.c machine-common-core.c machine-broadcast.c machine-lrts.h
+$(L)/libconv-cplus-n.a: machine.c machine-common-core.c machine-broadcast.c machine-lrts.h machine-commthd-util.c
 
index 4e13a57a67d68f7eb24a009433b61ee4d4ebe2cc..de63e72f8b6d013c16b57ada3f4aad40c2c4cc35 100644 (file)
@@ -9,4 +9,4 @@ charm++: hybridAPI
 .PHONY: hybridAPI
 endif
 
-$(L)/libconv-cplus-n.a: machine.c machine-dgram.c machine-tcp.c machine-eth.c machine-mx.c machine-gm.c machine.h machine-smp.c machine-ammasso.h machine-ammasso.c machine-ibverbs.c machine-pxshm.c machine-sysvshm.c conv-onesided.c immediate.c machine-recover.c $(CVHEADERS)
+$(L)/libconv-cplus-n.a: machine.c machine-dgram.c machine-tcp.c machine-eth.c machine-mx.c machine-gm.c machine.h machine-smp.c machine-ammasso.h machine-ammasso.c machine-ibverbs.c machine-pxshm.c machine-sysvshm.c conv-onesided.c immediate.c machine-recover.c machine-commthd-util.c $(CVHEADERS)
index 9663fc4d06a3a1fa397800e5c40f35548856b633..1a19aa01b4c7b9a71083887b72a4ea16efa4c095 100644 (file)
@@ -289,6 +289,10 @@ int getDestHandler;
 #endif
 #endif
 
+#if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
+#include "machine-commthd-util.c"
+#endif
+
 static void CommunicationServer(int withDelayMs, int where);
 
 void CmiHandleImmediate();
@@ -2542,6 +2546,14 @@ static void ConverseRunPE(int everReturn)
     }
 #endif
   }
+
+#if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
+  if(CmiMyRank() == CmiMyNodeSize()){
+    /* Comm thread */
+    CsvAccess(notifyCommThdQ) = PCQueueCreate();
+  }
+#endif
+
 #if MEMORYUSAGE_OUTPUT
   memoryusage_counter = 0;
 #endif
@@ -2641,7 +2653,12 @@ static void ConverseRunPE(int everReturn)
   if (CmiMyRank() == CmiMyNodeSize()) {
     if(!everReturn) Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
     if (Cmi_charmrun_fd!=-1)
-          while (1) CommunicationServer(5, COMM_SERVER_FROM_SMP);
+          while (1) {
+              CommunicationServer(5, COMM_SERVER_FROM_SMP);
+          #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
+              CmiPollCommThdNotificationQ();
+          #endif
+          }
   }
   else{
     if (!everReturn) {
@@ -2850,6 +2867,9 @@ void ConverseInit(int argc, char **argv, CmiStartFn fn, int usc, int everReturn)
   CsvInitialize(CmiNodeState, NodeState);
   CmiNodeStateInit(&CsvAccess(NodeState));
  
+#if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
+  CsvInitialize(PCQueue, notifyCommThdQ);
+#endif
 
   /* Network progress function is used to poll the network when for
      messages. This flushes receive buffers on some  implementations*/ 
index a45f51b76864ea0d386add01016e1987288e6fd8..bf6efdb5e7089807de2316fc02627a59f1dd9509 100644 (file)
@@ -344,6 +344,7 @@ CsvDeclare(CmiNodeState, NodeState);
 /* ===== End of Processor/Node State-related Stuff =====*/
 
 #include "machine-broadcast.c"
+#include "machine-commthd-util.c"
 #include "immediate.c"
 
 /* ===== Beginning of Common Function Definitions ===== */
@@ -675,6 +676,10 @@ if (  MSG_STATISTIC)
 #endif
 #endif
 
+#if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
+    CsvInitialize(PCQueue, notifyCommThdQ);
+#endif
+
     CmiStartThreads(argv);
 
     ConverseRunPE(initret);
@@ -705,6 +710,12 @@ static void ConverseRunPE(int everReturn) {
     }
 #endif
 
+#if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
+    if (CmiMyRank() == CmiMyNodeSize()) {
+        CsvAccess(notifyCommThdQ) = PCQueueCreate();
+    }
+#endif
+
     CmiNodeAllBarrier();
 
     cs = CmiGetState();
@@ -773,6 +784,10 @@ static INLINE_KEYWORD void AdvanceCommunication(int whenidle) {
 #endif
 #endif
 
+#if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
+    CmiPollCommThdNotificationQ();
+#endif
+
 }
 
 static void CommunicationServer(int sleepTime) {
diff --git a/src/arch/util/machine-commthd-util.c b/src/arch/util/machine-commthd-util.c
new file mode 100644 (file)
index 0000000..5b2939e
--- /dev/null
@@ -0,0 +1,58 @@
+/* This file is considered be used inside the machine layer, not to be used separately */
+#if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
+
+/** Usage:
+ * First: define a function that needs to be invoked on comm thread. E.g.
+ *   void CommThdSayHello(int numParams, void *params) {
+ *     int src = *(int *)params;
+ *     CkPrintf("Notification from source %d\n", src);
+ *   }
+ * 
+ * Second: create a notification msg and push it to comm thread. E.g.
+ *   int *argv = (int *)malloc(sizeof(int)*1); *argv = SRCIDX;
+ *   CmiNotifyCommThdMsg *one = CmiCreateNotifyCommThdMsg(CommThdSayHello, 1, (void *)(argv), 0);
+ *   CmiNotifyCommThd(one);
+ * Since we have set "toKeep" 0, the msg itself with the input arguments 
+ * will be freed by the comm thread after the msg is processed. Otherwise,
+ * the developer has to be responsible for freeing such message.
+ */
+
+/* This queue is only created on comm thread, and it's multi-producer-single-consumer */
+CsvDeclare(PCQueue, notifyCommThdQ);
+
+CmiNotifyCommThdMsg *CmiCreateNotifyCommThdMsg(CmiCommThdFnPtr fn, int numParams, void *params, int toKeep){
+    CmiNotifyCommThdMsg *one = (CmiNotifyCommThdMsg *)malloc(sizeof(CmiNotifyCommThdMsg));
+    one->fn = fn;
+    one->numParams = numParams;
+    one->params = params;
+    one->toKeep = toKeep;
+}
+
+void CmiFreeNotifyCommThdMsg(CmiNotifyCommThdMsg *msg){
+    free(msg->params);
+    free(msg);
+}
+
+void CmiSetNotifyCommThdMsg(CmiNotifyCommThdMsg *msg, CmiCommThdFnPtr fn, int numParams, void *params, int toKeep){
+    msg->fn = fn;
+    msg->numParams = numParams;
+    msg->params = params;
+    msg->toKeep = toKeep;
+}
+
+void CmiNotifyCommThd(CmiNotifyCommThdMsg *msg){
+    PCQueuePush(CsvAccess(notifyCommThdQ), (char *)msg);
+}
+
+void CmiPollCommThdNotificationQ(){
+    PCQueue q = CsvAccess(notifyCommThdQ);
+    CmiNotifyCommThdMsg *msg = (CmiNotifyCommThdMsg *)PCQueuePop(q);
+    
+    while(msg){
+        /* execute the function indicated by the msg */
+        (msg->fn)(msg->numParams, msg->params);
+        if(!msg->toKeep) CmiFreeNotifyCommThdMsg(msg);
+        msg = (CmiNotifyCommThdMsg *)PCQueuePop(q);
+    }
+}
+#endif
index 868c7dac310c6f3b610994ef068b39a61d017ef6..f4f69d67ebfd49cc23a33edba845e86725ae9f6a 100644 (file)
@@ -1902,4 +1902,26 @@ extern unsigned int CmiILog2(unsigned int);
 extern double CmiLog2(double);
 #endif
 
+#if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
+#if defined(__cplusplus)
+#define EXTERN extern "C"
+#else
+#define EXTERN extern
+#endif
+typedef void (*CmiCommThdFnPtr)(int numParams, void *params);
+typedef struct CmiNotifyCommThdMsg {
+    CmiCommThdFnPtr fn;
+    int numParams;
+    void *params;
+    int toKeep; /* whether to free this msg by comm thread when the msg is processed */ 
+}CmiNotifyCommThdMsg;
+
+EXTERN CmiNotifyCommThdMsg *CmiCreateNotifyCommThdMsg(CmiCommThdFnPtr fn, int numParams, void *params, int toKeep);
+EXTERN void CmiFreeNotifyCommThdMsg(CmiNotifyCommThdMsg *msg);
+/* Initialize a notification msg */
+EXTERN void CmiSetNotifyCommThdMsg(CmiNotifyCommThdMsg *msg, CmiCommThdFnPtr fn, int numParams, void *params, int toKeep);
+/* Enqueue the msg into the local comm thread, and wait for being processed */
+EXTERN void CmiNotifyCommThd(CmiNotifyCommThdMsg *msg);
+#endif
+
 #endif /* CONVERSE_H */