#include #include #include #include #include #include #include #include #include #include // on demand scheduling of tasks to workers // receive request from worker i -> send task to compute to worker i // when no more tasks are available, send an EOS termination // // usage is: // a.out portno // number of the port used to get worker task requests // // this is the version checking worker failure (emitter side) // and just signaling a worker failure // // the structure hosting the tasks #include "task.h" // this hosts tasks initially scheduled for execution TASK * task_list = NULL; // this hosts tasks to be rescheduled (possibly) TASK * task_limbo = NULL; // to manage task list pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t mutex_limbo = PTHREAD_MUTEX_INITIALIZER; // moved outside main: global int task = 0; // tasks are positive integeres, in this case int tasklimit = 10; // we'll send tasklimit tasks before stopping TASK eos; // special task to denote End Of Stream void * worker_handler(void * vs) { int s = *((int *) vs); int retcode,i; TASK * task_to_send; int * taskNo = (int *) calloc(sizeof(int),1); // host tasks computed // start reading tasks printf("worker_handler %ld started\n",pthread_self()); while(1==1) { SERVICE request; printf("Waiting for a request \n"); // read request: task request from woker or task completion msg from coll retcode = read(s,&request,sizeof(SERVICE)); if(retcode != sizeof(SERVICE)) { // should be != 0 (read can be split) perror("while reading task from worker"); fflush(stderr); } if(retcode == 0) { // connection broken printf("NEED TO RUN A SUBSTITUTE WORKER AFTER THIS DIED !!! \n"); // you should know which is the last task sent, in such a way it can // be rescheduled onto a different worker printf("TASK PRESUMABLY NOT COMPLETED is TASK <%d>\n",task_to_send->tag); pthread_mutex_lock(&mutex_limbo); // add task to limbo for reschedule { TASK * t = (TASK *) calloc(1,sizeof(TASK)); t->v = task_to_send->v; t->tag = task_to_send->tag; t->next = task_limbo; task_limbo = t; printf("Task <%d,%d> -> LIMBO\n",t->v, t->tag); } pthread_mutex_unlock(&mutex_limbo); return (NULL); } if(request.tag == TAG_COMPLETED) { // this is from collector // cancel request.v tag from limbo TASK * p, *pp ; p = pp = task_limbo; pthread_mutex_lock(&mutex_limbo); while(p!=NULL) { if(p->tag == request.v) { // completed: to be removed if(p == pp) { // first element to be removed p = p->next; } else { pp->next = p->next; // TODO: free unused one ! } } } printf("Computed task tag=%d removed from LIMBO\n",request.tag); pthread_mutex_unlock(&mutex_limbo); return(NULL); // eventually terminate the thread } // else: emit a task to the worker and cycle (this is a worker manager) printf("Got request from %d ",request.v); fflush(stdout); pthread_mutex_lock(&mutex); if(task_list != NULL) { // send a task to the requesting worker TASK * t = task_list; task_list = t->next; task_to_send = t; taskNo++; pthread_mutex_unlock(&mutex); } else { // if no more tasks, then check limbo or send EOS pthread_mutex_unlock(&mutex); pthread_mutex_lock(&mutex_limbo); if(task_limbo!= NULL) { task_to_send = task_limbo ; task_limbo = task_limbo->next; printf("Task pool is empty but sending task from Limbo\n"); } else { task_to_send = &eos; } pthread_mutex_lock(&mutex_limbo); } write(s,&task_to_send,sizeof(TASK)); // send either task or eos if(task_to_send != &eos) { printf("sent task %d to worker %d\n",task,i); } else { printf("Send EOS to worker %d\n",i); close(s); return((void *)taskNo); } } } #define MAXHOSTNAME 80 #define MAXTHREADS 16 int main(int argc, char * argv[]) { int s,si, retcode, i; unsigned int salen; struct sockaddr_in sa,sai; char hostname[MAXHOSTNAME]; pthread_t tids[MAXTHREADS]; // thread handlers int tNo = 0; // thread to allocate // set up task structure for(i=0; inext = task_list; t->v = i; t->tag = i; task_list = t; } // set up eos eos.v = -1; eos.tag = -2; eos.next = NULL; // code needed to set up the communication infrastructure printf("Declaring socket\n"); si = socket(AF_INET,SOCK_STREAM,0); // socket for inputs if(si == -1) {perror("opening socket for input"); return -1;} sai.sin_family = AF_INET; sai.sin_port = htons(atoi(argv[1])); gethostname(hostname,MAXHOSTNAME); memcpy(&sai.sin_addr, (gethostbyname(hostname)->h_addr), sizeof(sa.sin_addr)); printf("Binding to %s\n",inet_ntoa(sai.sin_addr)); retcode = bind(si,(struct sockaddr *) & sai, sizeof(sai)); if(retcode == -1) { perror("while calling bind"); return -1; } printf("Listening socket\n"); retcode = listen(si,1); if(retcode == -1) { perror("while calling listen"); return -1; } while(1==1) { int * s = (int *) calloc(sizeof(int),1); salen = sizeof(sa); printf("Accepting connections .... \n"); *s = accept(si,(struct sockaddr *)&sa,&salen); // accept a connection if(*s == 1) { perror("while calling an accept"); return -1; } // now fork a thread to permanently handle the connection pthread_create(&tids[tNo],NULL,worker_handler,(void *) s); printf("Created worker_handle No %d\n",tNo); tNo++; } printf("Closing operations\n"); return 0; }