===== Pipeline with Pthreads, sample code ===== Sample code relative to * implementation of an unbounded channel with standard C++ and Pthread mechanisms * implementation of the main forking the three stages and computing the pipeline * implementation of a bounded channel * implementation of the main using the bounded channel * sample makefile #include #include template class Channel { private: std::queue chan; pthread_mutex_t mutex; pthread_cond_t cond; public: Channel() { mutex = PTHREAD_MUTEX_INITIALIZER; cond = PTHREAD_COND_INITIALIZER; } void send(Task t) { pthread_mutex_lock(&mutex); chan.push(t); pthread_cond_signal(&cond); // for waiting receives pthread_mutex_unlock(&mutex); return; } Task receive() { pthread_mutex_lock(&mutex); while(chan.empty()) { pthread_cond_wait(&cond, &mutex); } Task t = chan.front(); chan.pop(); pthread_mutex_unlock(&mutex); return(t); } }; #include #include #include #include using namespace std; #include "Channel.hpp" typedef enum {EOS, TASK} Task_t; class Task { private: float x; Task_t tag; public: Task(float x, Task_t tag):x(x),tag(tag) {} Task(Task_t tag):tag(tag) {} bool isEos() { return(tag == EOS); } float get() { return(x); } void set(float y) { x = y; return; } }; typedef struct __iochans { Channel *in; Channel *out; } IO_Channels; int ntasks = 0; void * source(IO_Channels * chans) { // produce as many tasks as required for(int i=0; iout)->send(t); #ifdef TRACEMSG cout << "Source: output task " << i << endl; #endif } cout << "Source: sending EOS" << endl; (chans->out)->send(*(new Task(EOS))); #ifdef TRACEMSG cout << "Source: EOS Sent\n terminating ... " << endl; #endif return(NULL); } void * square(IO_Channels * chans) { Task t = (chans->in)->receive(); while(!t.isEos()) { #ifdef TRACEMSG cout << "Stage1 received task " << t.get() << endl; #endif float x = t.get(); t.set(x*x); (chans->out)->send(t); t = (chans->in)->receive(); } #ifdef TRACEMSG cout << "Square: propagating EOS" << endl; #endif (chans->out)->send(t); //eos return(NULL); } void * drain(IO_Channels * chans) { float sum = 0.0; int taskno = 0; Task t = (chans->in)->receive(); while(!t.isEos()) { #ifdef TRACEMSG cout << "Drain received task " << t.get() << endl; #endif sum += t.get(); taskno++; t = (chans->in)->receive(); } #ifdef TRACEMSG cout << "Drain: got EOS, terminating" << endl; #endif cout << "Drain computed " << sum << " from " << taskno << " tasks " << endl; return(NULL); } int main(int argc, char * argv[]) { // usage: a.out ntasks Channel ch12, ch23; IO_Channels stage1, stage2, stage3; pthread_t tid1, tid2, tid3; stage1.in = NULL; stage1.out = &ch12; stage2.in = &ch12; stage2.out = &ch23; stage3.in = &ch23; stage3.out = NULL; ntasks = atoi(argv[1]); // start third stage if(pthread_create(&tid3, NULL, (void *(*)(void *))drain, (void *) &stage3) != 0) { cout << "Error while creating stage3 " << endl; return(-1); } cout << "Drain started" << endl; // start second stage if(pthread_create(&tid2, NULL, (void *(*)(void *))square, (void *) &stage2) != 0) { cout << "Error while creating stage2 " << endl; return(-1); } cout << "Square started!" << endl; // start first stage if(pthread_create(&tid1, NULL, (void *(*)(void *))source, (void *) &stage1) != 0) { cout << "Error while creating stage1 " << endl; return(-1); } cout << "Source started " << endl; void * retval; pthread_join(tid3, &retval); return(0); } #include #include template class Channel { private: std::queue chan; pthread_mutex_t mutex; pthread_cond_t cond_r, cond_w; int max; int msgno; public: Channel() { mutex = PTHREAD_MUTEX_INITIALIZER; cond_r = PTHREAD_COND_INITIALIZER; cond_w = PTHREAD_COND_INITIALIZER; msgno = 0; max = 4; } Channel(int bound):max(bound) { mutex = PTHREAD_MUTEX_INITIALIZER; cond_r = PTHREAD_COND_INITIALIZER; cond_w = PTHREAD_COND_INITIALIZER; msgno = 0; } void send(Task t) { pthread_mutex_lock(&mutex); while(msgno == max) pthread_cond_wait(&cond_w, &mutex); chan.push(t); msgno++; pthread_cond_signal(&cond_r); // for waiting receives pthread_mutex_unlock(&mutex); return; } Task receive() { pthread_mutex_lock(&mutex); while(chan.empty()) { pthread_cond_wait(&cond_r, &mutex); } Task t = chan.front(); chan.pop(); msgno--; pthread_cond_signal(&cond_w); pthread_mutex_unlock(&mutex); return(t); } }; #include #include #include #include using namespace std; #include "BChannel.hpp" typedef enum {EOS, TASK} Task_t; class Task { private: float x; Task_t tag; public: Task(float x, Task_t tag):x(x),tag(tag) {} Task(Task_t tag):tag(tag) {} bool isEos() { return(tag == EOS); } float get() { return(x); } void set(float y) { x = y; return; } }; typedef struct __iochans { Channel *in; Channel *out; } IO_Channels; int ntasks = 0; void * source(IO_Channels * chans) { // produce as many tasks as required for(int i=0; iout)->send(t); #ifdef TRACEMSG cout << "Source: output task " << i << endl; #endif } cout << "Source: sending EOS" << endl; (chans->out)->send(*(new Task(EOS))); #ifdef TRACEMSG cout << "Source: EOS Sent\n terminating ... " << endl; #endif return(NULL); } void * square(IO_Channels * chans) { Task t = (chans->in)->receive(); while(!t.isEos()) { #ifdef TRACEMSG cout << "Stage1 received task " << t.get() << endl; #endif float x = t.get(); t.set(x*x); (chans->out)->send(t); t = (chans->in)->receive(); } #ifdef TRACEMSG cout << "Square: propagating EOS" << endl; #endif (chans->out)->send(t); //eos return(NULL); } void * drain(IO_Channels * chans) { float sum = 0.0; int taskno = 0; Task t = (chans->in)->receive(); while(!t.isEos()) { #ifdef TRACEMSG cout << "Drain received task " << t.get() << endl; #endif sum += t.get(); taskno++; t = (chans->in)->receive(); } #ifdef TRACEMSG cout << "Drain: got EOS, terminating" << endl; #endif cout << "Drain computed " << sum << " from " << taskno << " tasks " << endl; return(NULL); } int main(int argc, char * argv[]) { // usage: a.out ntasks Channel ch12, ch23; IO_Channels stage1, stage2, stage3; pthread_t tid1, tid2, tid3; stage1.in = NULL; stage1.out = &ch12; stage2.in = &ch12; stage2.out = &ch23; stage3.in = &ch23; stage3.out = NULL; ntasks = atoi(argv[1]); // start third stage if(pthread_create(&tid3, NULL, (void *(*)(void *))drain, (void *) &stage3) != 0) { cout << "Error while creating stage3 " << endl; return(-1); } cout << "Drain started" << endl; // start second stage if(pthread_create(&tid2, NULL, (void *(*)(void *))square, (void *) &stage2) != 0) { cout << "Error while creating stage2 " << endl; return(-1); } cout << "Square started!" << endl; // start first stage if(pthread_create(&tid1, NULL, (void *(*)(void *))source, (void *) &stage1) != 0) { cout << "Error while creating stage1 " << endl; return(-1); } cout << "Source started " << endl; void * retval; pthread_join(tid3, &retval); return(0); } CFLAGS = -DTRACEMSG -pthread -std=c++11 CC = g++ all: pipe bpipe pipe: pipe.cpp Channel.hpp $(CC) $(CFLAGS) pipe.cpp -o pipe bpipe: bpipe.cpp BChannel.hpp $(CC) $(CFLAGS) bpipe.cpp -o bpipe clean: rm -f pipe bpipe a.out *~