// dynmap.cpp
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* ***************************************************************************
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License version 2 as 
 *  published by the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 *  As a special exception, you may use this file as part of a free software
 *  library without restriction.  Specifically, if other files instantiate
 *  templates or use macros or inline functions from this file, or you compile
 *  this file and link it with other files to produce an executable, this
 *  file does not by itself cause the resulting executable to be covered by
 *  the GNU General Public License.  This exception does not however
 *  invalidate any other reasons why the executable file might be covered by
 *  the GNU General Public License.
 *
 ****************************************************************************
 */
/* 
 * Author: Massimo Torquati <torquati@di.unipi.it> 
 * Date:   December 2014
 */
 
/* Map skeleton implementation with dynamic scheduling. 
 * SPM Class Work 5.
 *
 */
 
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <vector>
#include <ff/farm.hpp>
 
// selective debugging, compile with -DDEBUG to enable debugging info
#if defined(DEBUG)
#define DBG(X) X
#else
#define DBG(X)
#endif
 
using namespace ff;
 
// this is the task
// A map task: the half-open iteration range [start, end) of the input array.
// Members are zero-initialized so a default-constructed task is never
// indeterminate.
struct task_t {
    void set(long s, long e)  { start=s,end=e; }
    long start{0};
    long end{0};
};
 
// compare function for pairs
// Strict "less than" on the chunk-count field of a (count, range) entry; the
// range itself is ignored. Used with std::max_element to find the entry with
// the most remaining chunks.
static inline bool data_cmp(const std::pair<long,task_t> &a,const std::pair<long,task_t> &b) {
    const long lhs = a.first;
    const long rhs = b.first;
    return rhs > lhs;
}
 
// this is the scheduler 
// Farm emitter implementing a dynamically scheduled map over [start, stop).
//
// The iteration space is cut into chunks of `_chunk` iterations; each chunk is
// one task. Chunks are pre-assigned to workers as contiguous ranges
// (init_data). When a completed task comes back from worker `wid`, that worker
// receives its next local chunk; once its own range is exhausted it takes a
// chunk from the worker with the most chunks left. When every task has come
// back, svc returns EOS.
class Scheduler: public ff_node_t<task_t> {
protected:
    // Builds the per-worker table: data[i].first is the number of chunks still
    // owned by worker i, data[i].second the [start,end) range covering them.
    // Returns the total number of tasks that will be generated.
    // NOTE: may double `_chunk` (see the leftover handling below).
    inline size_t init_data(long start, long stop) {
        const long numtasks  = stop-start;
        long totalnumtasks   = std::lrint(std::ceil(numtasks/(double)_chunk));
        long tt     = totalnumtasks;
        size_t ntxw = totalnumtasks / _nw;   // base number of chunks per worker
        size_t r    = totalnumtasks % _nw;   // chunks left over by the even split

        // fewer chunks than workers: give a single chunk to each of the first ones
        if (ntxw == 0 && r>=1) {  ntxw = 1, r = 0; }

        data.resize(_nw); taskv.resize(_nw);

        long t = 0;
        for(size_t i=0;i<_nw && totalnumtasks>0;++i, totalnumtasks-=t) {
            // with r>1 the first r workers take one extra chunk each; a single
            // leftover chunk (r==1) is dealt with after the loop
            t              = ntxw + ( (r>1 && (i<r)) ? 1 : 0 );
            const long e   = start + t*_chunk;
            const long end = (e<stop) ? e : stop;
            data[i].first=t;
            data[i].second.set(start,end);
            start = end;
        }

        if (totalnumtasks) {
            assert(totalnumtasks==1);
            // keep the number of tasks per worker as small as possible
            if (ntxw > 1) data[_nw-1].first += totalnumtasks; // last worker gets one more chunk
            else { --tt, _chunk*=2; } // one chunk each: last worker's single task spans two chunks
            data[_nw-1].second.end = stop;
        }
        DBG(printf("init_data\n");
            for(size_t i=0;i<_nw;++i) {
                printf("W=%zu %ld <%ld,%ld>\n", i, data[i].first, data[i].second.start, data[i].second.end);
            }
            printf("totaltasks=%ld\n", tt);
            );
        return tt;
    }

    // Pops the next chunk of worker `id`'s range into `task`.
    // Precondition: data[id].first > 0.
    inline void fillTask(task_t &task, const int id) {
        const long start = data[id].second.start;
        const long end   = std::min(start+_chunk, data[id].second.end);
        --data[id].first;
        data[id].second.start = end;
        task.set(start, end);
    }

public:
    Scheduler(ff_loadbalancer* lb, long start, long stop, long chunk, int nw):
        lb(lb),_start(start),_stop(stop),_chunk(chunk),totaltasks(0),_nw(nw) {

        totaltasks = init_data(start,stop);
        assert(totaltasks>=1);
    }

    // Fills `task` with the next chunk for worker `wid`: one of its own if any
    // remain, otherwise one taken from the worker with the most chunks left.
    // Returns false when no chunk is left anywhere.
    bool nextTask(task_t &task, const int wid) {
        if (data[wid].first) {
            fillTask(task,wid);
            return true;
        }
        // no local chunk left for this worker: steal from the most loaded one
        long maxid = (std::max_element(data.begin(),data.end(),data_cmp) - data.begin());
        if (data[maxid].first > 0)  { fillTask(task,(int)maxid); return true; }

        return false;
    }

    inline task_t* svc(task_t* t) {
        if (t==nullptr) {
            // start-up: send one chunk to every worker that owns at least one
            size_t remaining = totaltasks;

            for(size_t wid=0;wid<_nw;++wid) {
                if (data[wid].first >0) {
                    fillTask(taskv[wid], (int)wid);
                    lb->ff_send_out_to(&taskv[wid], (int)wid);
                    --remaining;
                }
            }
            // if every task fit in the first round there is nothing left to schedule
            return (remaining>0) ? GO_ON : EOS;
        }

        // a completed task came back from worker `wid`
        auto wid =  lb->get_channel_id();
        if (--totaltasks == 0) return EOS;   // that was the last outstanding task

        if (nextTask(*t, (int)wid)) lb->ff_send_out_to(t, int(wid));
        else  lb->ff_send_out_to(EOS, (int)wid);  // nothing left: stop this worker

        return GO_ON;
    }
protected:

    ff_loadbalancer  *lb;
    long              _start,_stop, _chunk;
    size_t            totaltasks, _nw;     // tasks not yet completed, n. of workers

    std::vector<std::pair<long, task_t> >  data;  // internal table of tasks (count, range) per worker
    std::vector<task_t>                    taskv; // per-worker task objects, avoiding dynamic memory allocation
};
 
 
// Farm worker: applies the stored range functor to the [start, end) range of
// each task it receives, then sends the same task object back.
template<typename FUNC_t>
class Worker: public ff_node_t<task_t> {
public:
    Worker(FUNC_t func): F(func) {}

    inline task_t* svc(task_t* task) {
        DBG(printf("W(%d) <%ld,%ld>\n", get_my_id(), task->start, task->end));
        task_t &range = *task;
        F(range.start, range.end);
        return task;
    }

    FUNC_t  F;   // per-range body, invoked as F(start, end)
};
 
// Prints the command-line synopsis and an example invocation to stderr,
// using argv[0] as the program name.
void usage(char *argv[]) {
    const char *prog = argv[0];
    std::cerr << "use: " << prog << " numworkers chunksize arraysize\n"
              << "  example: " << prog << " 2 10 1000\n\n";
}
 
int main(int argc, char *argv[]) {
    if (argc<4) {
	usage(argv);
	return -1;
    }
    int  nw    = atoi(argv[1]);
    long chunk = atol(argv[2]);
    long size  = atol(argv[3]);
 
    if (nw <= 0 || size <= 0) { usage(argv); return -1; }
    if (chunk<=0) chunk = size/nw;
 
    // create and initialize the array
    std::vector<double> V(size);
    for (size_t i = 0; i < (size_t)size; i++)  V[i] = ( pow(i,3) / (i+1) );
    DBG(
	for(size_t i=0;i<V.size();++i)
	    printf("%g ", V[i]);
	printf("\n");
       );
 
    // this is the function to compute on each single element of the vector V
    auto F = [&V](const long start, const long stop) {
        for(long i=start; i < stop; ++i) {
            size_t k = V[i];
            double r = 1.0;
 
            for(size_t j=0;j<k;++j) r*=r*sin(r/k); // just some dummy computation
 
            V[i] = r;
        }
    };
 
 
    ffTime(START_TIME);
    if (nw == 1) 
        F(0,size);    
    else {
        std::vector<ff_node *> W;
        for(int i=0;i<nw;++i)
            W.push_back(new Worker<decltype(F)>(F));
 
        ff_farm<> farm(W);
        farm.remove_collector(); // remove default collector
        Scheduler Sched(farm.getlb(),0,size,chunk,nw);
        farm.add_emitter(&Sched);
        farm.wrap_around();
 
        if (farm.run_and_wait_end() < 0) {
            error("running farm\n");
            return -1;
        }
    }
    ffTime(STOP_TIME);
    std::cout << "Time (ms) = " << ffTime(GET_TIME) << "\n";
 
#if defined(CHECK_RESULT)
    // summing all elements
    double s=0;
    for (size_t i = 0; i < (size_t)size; i++)  
	s+=V[i];
    std::cout << "Result s=" << s << "\n";
#endif
    return 0;
}