charm - [charm] About processor idle during execution in CHARM++

charm AT lists.cs.illinois.edu

Subject: Charm++ parallel programming system

  • From: SAGAR SHEDGE <sagar.shedge92 AT gmail.com>
  • To: charm AT cs.uiuc.edu
  • Subject: [charm] About processor idle during execution in CHARM++
  • Date: Tue, 14 Jun 2011 18:42:44 +0530
  • List-archive: <http://lists.cs.uiuc.edu/pipermail/charm>
  • List-id: CHARM parallel programming system <charm.cs.uiuc.edu>


Respected Sir/Madam,

      I am Sagar Dillip Shedge. I work at C-DAC R&D, Pune, India, on parallel computing for molecular dynamics.
I have chosen CHARM++ for the parallel programming, and I am getting some unexpected results.

I understand that CHARM++ is asynchronous, but one processor does not receive its message until the sending processor has completed its own execution. In the sending code the message is sent before the computation starts, yet at run time the computation seems to start first and the message is sent only afterwards.

I have written a program that takes a .pdb file as input. From that file I read the x, y, z coordinates. The charges of the atoms are set explicitly in the code.
Reading the file and creating the chares both work correctly.
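
For reference, here is a minimal sketch of reading one ATOM record by fixed columns instead of by strtok(). It assumes the standard PDB layout (atom name in columns 13-16, x/y/z in columns 31-38, 39-46 and 47-54). `ParsedAtom`, `trimField` and `parseAtomLine` are hypothetical names, not part of the attached code, and giving charge 0.0 to any other atom name is an assumption.

```cpp
#include <cstddef>
#include <string>

// Hypothetical plain-data record; not the `atom` class from the attached code.
struct ParsedAtom {
    std::string name;
    double x, y, z, charge;
};

// Strip leading and trailing spaces from a fixed-width field.
static std::string trimField(const std::string& s) {
    const std::size_t first = s.find_first_not_of(' ');
    if (first == std::string::npos) return "";
    const std::size_t last = s.find_last_not_of(' ');
    return s.substr(first, last - first + 1);
}

// Parse one ATOM record. Returns false for any other line or a line that is
// too short. std::stod throws if a coordinate field holds no number.
bool parseAtomLine(const std::string& line, ParsedAtom& out) {
    if (line.size() < 54 || line.compare(0, 4, "ATOM") != 0) return false;
    out.name = trimField(line.substr(12, 4));   // columns 13-16: atom name
    out.x = std::stod(line.substr(30, 8));      // columns 31-38
    out.y = std::stod(line.substr(38, 8));      // columns 39-46
    out.z = std::stod(line.substr(46, 8));      // columns 47-54
    // Charges as hard-coded in the attached code; 0.0 otherwise is an assumption.
    if (out.name == "OH2") out.charge = -0.834;
    else if (out.name == "H1" || out.name == "H2") out.charge = 0.417;
    else out.charge = 0.0;
    return true;
}
```

Fixed-column parsing does not depend on the fields being separated by spaces, which strtok() needs.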

I have used a one-dimensional chare array whose size equals CkNumPes()-1. The atoms are distributed equally among the chare array elements (each element holds an array of objects of the atom class).
I have used one processor for the mainchare, and on each of the other CkNumPes()-1 processors I have inserted one chare array element using insert().
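
The distribution described above can be sketched as plain arithmetic. In this minimal sketch the remainder goes to index 0, as the attached BunchArray constructor appears to do; `chunkLength` is a hypothetical helper name.

```cpp
// Number of atoms held by array element `index` when `numAtom` atoms are
// split across `numElements` elements (CkNumPes()-1 in the post).
// Any remainder is given to index 0, mirroring the attached constructor.
int chunkLength(int index, int numAtom, int numElements) {
    const int perElement = numAtom / numElements;   // integer division
    const int remainder  = numAtom % numElements;
    return perElement + (index == 0 ? remainder : 0);
}
```

For example, 10 atoms over 3 elements give lengths 4, 3 and 3.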

Execution :

  1. The mainchare creates the chare array and inserts its elements on the processors.
  2. In its default constructor, each chare array element initialises its data, reads the file, and assigns the x, y, z coordinates and the charge to each atom before calling the init() function.
  3. It then calculates the forces among the atoms on the local processor.
  4. It then starts calling the remote methods.
  5. At the next level each chare element sends its data to the next chare element, as in a ring: 0 sends to 1, 1 sends to 2, and so on; the last sends to 0.
  6. At each succeeding level it sends its data to the element after the one it sent to previously: 0 sends to 2, 1 sends to 3, and so on; the second-last sends to 0 and the last sends to 1.
  7. This iteration continues until level (CkNumPes()-1) / 2.
  8. Then each array element contributes its result.
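
Steps 5 to 7 above can be sketched as a small schedule function. This is only an illustration in plain C++ with hypothetical helper names (`receiverAt`, `lastLevel`, `receiversForLevel`). It does not model the special handling of the last level in the attached code, where half of the data is exchanged in each direction.

```cpp
#include <vector>

// Index of the element that `sender` sends to at a given level,
// wrapping around a ring of `numElements` elements.
int receiverAt(int sender, int level, int numElements) {
    return (sender + level) % numElements;
}

// Last level described in the post: (CkNumPes()-1) / 2, integer division.
int lastLevel(int numElements) {
    return numElements / 2;
}

// Receivers for one level, indexed by sender.
std::vector<int> receiversForLevel(int level, int numElements) {
    std::vector<int> receivers(numElements);
    for (int s = 0; s < numElements; ++s)
        receivers[s] = receiverAt(s, level, numElements);
    return receivers;
}
```

With 4 elements, level 1 gives the receivers 1, 2, 3, 0 and level 2 gives 2, 3, 0, 1, matching the ring pattern in steps 5 and 6.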

Query :

  1. When I call the remote entry method, one processor (mostly the one that gets data from the element at index 0) stays idle while the others are executing. Only when the others have completed their execution does that processor start executing.
  2. Even if I call the method from an array element instead of from the mainchare, I get the same output (line no. 448).
      The result is correct, but the time required is large because one processor stays idle and starts only when the other processors have completed their execution.
  3. This behaviour is not consistent. Sometimes all processors are running and the result comes in less time, but sometimes one processor stays idle.

Because the messages are not sent in order, one processor does not get its message until the other processors have completed their execution; only then does it receive the message and start executing.

I have also attached the code files and a small input file. The code runs correctly on a small data set of up to 100000 atoms (the file I have given), but it does not work properly on a large data set of about 783198 atoms.
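
To give a rough sense of scale for the two data sets, the sketch below counts unordered atom pairs. It assumes every pair is evaluated with no cutoff, which is how the loops in the attached code appear to work. `pairCount` is a hypothetical helper.

```cpp
// Number of unordered atom pairs among n atoms: n * (n - 1) / 2.
// 64-bit arithmetic is used because the product exceeds the range of a
// 32-bit int for the atom counts mentioned in the post.
long long pairCount(long long n) {
    return n * (n - 1) / 2;
}
```

For 100000 atoms this gives 4999950000 pairs and for 783198 atoms it gives 306699162003 pairs, roughly 61 times as many.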

Please reply.

Thank you.


--
Contact No.-9665728865
 
 Email-id:- sagar.shedge92 AT gmail.com

 
With Regards.
 

#include <fstream>
#include <string>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "hello.decl.h"
#include "hello.h"
#include <math.h>
#include <iostream>
using namespace std;

	int numProcessor=0,numAtom=0,atomPerProcessor=0;
	CProxy_Hello mainProxy;

Hello::Hello(CkArgMsg* msg)
{


	int i,j,currProcessor=0;
	_count=0;
	_level=0;

	if(msg->argc!=2)
	{
		CkPrintf("Please enter atom number...\n");
		CkExit();
		return;   // do not fall through to argv[1], which is missing here
	}

		numAtom=atoi(msg->argv[1]);

	numProcessor=CkNumPes()-1;
	atomPerProcessor=(int)numAtom/numProcessor;
	mainProxy=thisProxy;

	delete msg;

	CkPrintf("in main constructor.. with total processor %d\n",numProcessor);

	totalArray=(atom**)calloc(numProcessor,sizeof(atom*));  // zero-initialised so the NULL checks in result() are meaningful
	size=new int[numProcessor];

	arrayAtom=CProxy_BunchArray::ckNew();  //create object of array...
	

				
	CkCallback *cb=new CkCallback(CkIndex_Hello::finalResult(NULL),mainProxy);
	arrayAtom.ckSetReductionClient(cb);
	

	for(i=0,j=1;i<numProcessor;i++,j++)
	{
		arrayAtom[i].insert(j);

	}

	arrayAtom.doneInserting();
	
	CkPrintf("insertion done \n");

}

void Hello::finalResult(CkReductionMsg* msg)
{
	int reducedArrSize=msg->getSize()/sizeof(double);
	double* output=(double*)msg->getData();
	
	ofstream result("result4.txt");
	
	CkPrintf("\n");
	double finalResult=(*output)/2;
	
	if(result.is_open())
		result<<"result is "<<finalResult;

	cout<<"Result is "<<finalResult;
	result.close();
	delete msg;
	CkExit();
}


void Hello::check()             // decides whether to start the next level or collect the results from all array elements
{				// invoked through the quiescence-detection callback registered in result()

		CkPrintf("in check method at level %d\n",_level);

		if(_level+1<=(numProcessor/2))
		{
			arrayAtom.nextLevel();
		}
		else
		{
			arrayAtom.callBack();
		}
	
}

void Hello::result(sendArray* tempArray) // every array will send data to main class
{

//deleting memory on each next level

		if(_count==0 && _level!=0)
		{
				if(totalArray!=NULL)
				{
					CkPrintf("\ntotal array not null\n");
					for(int i=0;i<numProcessor;i++)
					{
						if(totalArray[i]!=NULL)
						{
							CkPrintf("\ntotal array[%d] not null\n",i);
							free(totalArray[i]);
						}
					}
						
				}
		}
		_count++;

// initialising mainchare local array
	
				totalArray[tempArray->level]=(atom*)malloc(sizeof(atom)*(tempArray->length));
				size[tempArray->level]=tempArray->length;

				for(int i=0;i<tempArray->length;i++)
				{
					totalArray[tempArray->level][i]=tempArray->temp[i];
				}

			CkPrintf("index %d get initialized at level %d \n",tempArray->level,_level);
		
		int receiver;

		CkPrintf("index %d send data to result \n",tempArray->level);	
		if(_count==numProcessor)	// go to next level if all arrays send data to main class
		{	
				_level++;
				_count=0;
			for(int j=numProcessor-1;j>=0;j--)  // to call each array
			{

					if(_level<(numProcessor/2))
					{
						if((j+_level)<numProcessor)
						{
							receiver=j+_level;
						}
						else
							receiver=(j+_level)-numProcessor;

						sendArray* tempA=new (size[j]) sendArray;

						for(int i=0;i<size[j];i++)
							tempA->temp[i]=totalArray[j][i];

						tempA->length=size[j];
						tempA->level=_level;

						arrayAtom[receiver].calInterProcessor(tempA);
	
						CkPrintf("index %d called %d 's inter processor method at level %d \n\n",j,receiver,_level);
					}
	
					else if(_level==(numProcessor/2))
					{

						if(j<(numProcessor/2))
						{
							receiver=j+_level;
							sendArray* tempA=new (size[j]) sendArray;

							for(int i=0;i<size[j]/2;i++)
								tempA->temp[i]=totalArray[j][i];

							tempA->length=size[j]/2;
							tempA->level=_level;

							arrayAtom[receiver].calInterProcessor(tempA);

						}
						else
						{			
							receiver=j-_level;	

							sendArray* tempA=new (size[j]) sendArray;

							for(int i=0;i<size[j];i++)
								tempA->temp[i]=totalArray[j][i];

							tempA->length=size[j];
							tempA->level=_level;

							arrayAtom[receiver].calInterProcessor2(tempA);

						}
						CkPrintf("index %d called %d 's inter processor method at level %d \n\n",j,receiver,_level);
					}
	
				
				
			}

			CkCallback cb1(CkIndex_Hello::check(),mainProxy);  // stack object: avoids leaking a heap-allocated callback at every level
			CkStartQD(cb1); // call check() once quiescence is detected

				
		}
		delete tempArray;		
}

Hello::Hello(CkMigrateMessage* msg) //migration constructor
{ }


atom::atom() //array class parameterless constructor
{


}

//--------------------------------------------- atom class constructor with parameters ------------------------------
atom::atom(double x,double y,double z,double charge) 
{	
	_x=x;
	_y=y;
	_z=z;
	_charge=charge;
	
}
//------------------------------------------- parameter pack and unpack method use in object migration -------------------------
void atom::pup(PUP::er &p) 
{
	p|_x;
	p|_y;
	p|_z;
	p|_charge;
}

atom::~atom() //destructor
{

}

//----------------------------------------- bunch array parameterless constructor ----------------------------
BunchArray::BunchArray() 
{
	int limit=0,start=0;
	bool flag=false;
	result=0.0;   // accumulator must start at zero before calDistance() adds to it
	
		limit=(thisIndex+1)*atomPerProcessor; //17488
		start= thisIndex*atomPerProcessor;    //0
	
	_length=limit-start;  //total number of atom on this processor
	
	if(thisIndex==0)  // if atom are not equally distributed then put on 0 index
	{
		if((numAtom%numProcessor)!=0)
		{
			flag=true;
			int num1=numAtom;
			int num2=numProcessor*atomPerProcessor;
			_length+=(num1-num2);  //to add extra atoms
		}
	}

	arrayObject=new atom[_length];

        string str;
        char* ptr=NULL;
        char* ptr1=NULL;
        long count=0;     // to count atom on this processor
	double x=0.0,y=0.0,z=0.0;
	long count1=0;    // counting atoms sequentially
	long tempCount=0; // used to skip atoms that belong to lower-indexed elements
	double charge=0.0;    // charge on atom; initialised so it is never read uninitialised

	char buff1[50];
	sprintf(buff1,"%d_%s",thisIndex,"example.txt");
        ofstream temp_file(buff1);
	
	sprintf(buff1,"%d_%s",thisIndex,"ex.txt");
        ofstream temp_file2(buff1);
	
        ifstream temp_file1("solvate.pdb");

// -------------------------------- READING FILE -------------------------------------
        if(temp_file1.is_open())
        {
		
                while(temp_file1.good())
                {
                        getline(temp_file1,str);
                        ptr=strtok(&str[0]," ");

			if(ptr==NULL)   // blank line or failed read: strtok found no token
				continue;

			if(strcmp(ptr,"END")==0)
			{
				// count is a long, so it is printed with %ld
				CkPrintf("end reached with count %ld and n is %d for index %d with processor number %d..\n",count,_length,thisIndex,CkMyPe());
				break;
			}
						
			if(count>=_length)
			{	
				CkPrintf("count is %ld and n is %d for index %d with processor number %d \n",count,_length,thisIndex,CkMyPe());	
				break;
			}
			
			if(flag)
			{
				int temp=(numProcessor-1)*(atomPerProcessor);
				temp-=2;
				if(count>=(limit-start))
				{
					CkPrintf("temp is :%d\n",temp);
					temp_file1.seekg(temp*79,ios::cur);
					flag=false;
					getline(temp_file1,str);
				}

			}

			if(strcmp(ptr,"ATOM")==0)
			{
				count1++;
				if(tempCount<start)
				{
					
					tempCount++;
					continue;
				}
					
				
				temp_file2<<ptr<<"\t"<<count1<<"\n";

				ptr=&str[13];
				ptr1=strtok(ptr," ");
				if(ptr1!=NULL)
				{
					if(strcmp(ptr1,"OH2")==0)
					{
						charge=-0.834000;
					}
					
					if(strcmp(ptr1,"H1")==0 || strcmp(ptr1,"H2")==0)
					{
						charge=0.417000;
					}
		
					
				}	

                                ptr=&str[31];
                                ptr1=strtok(ptr," ");
                                if(ptr1!=NULL)
 				{
					temp_file<<ptr1<<"\t";
					x=atof(ptr1);		
				}

                                ptr=&str[39];
                                ptr1=strtok(ptr," ");
                                if(ptr1!=NULL)
				{
                                        temp_file<<ptr1<<"\t";
					y=atof(ptr1);
				}
                                ptr=&str[47];
                                ptr1=strtok(ptr," ");
                                if(ptr1!=NULL)   // stray semicolon removed so the block is guarded by the check
				{
                                        temp_file<<ptr1<<"\t"<<thisIndex<<"\n";
					z=atof(ptr1);
				}

			
			temp_file1.clear();
			temp_file.clear();
			temp_file2.clear();
                     
			if(ptr1!=NULL)
			{				
				arrayObject[count]._x=x;
				arrayObject[count]._y=y;
				arrayObject[count]._z=z;
				arrayObject[count]._charge=charge;
			
			}
			
			count++;

			ptr=NULL;
			ptr1=NULL;
		
                     }
			
                }

	        temp_file1.close();
                temp_file.close();
                temp_file2.close();
	
	}
	init();
	
}
// -------------------   bunch array migration constructor --------------------
BunchArray::BunchArray(CkMigrateMessage* msg) 
{

}

void BunchArray::pup(PUP::er &p)
{
	p|_length;
	p|result;
	
	if(p.isUnpacking())
		arrayObject=new atom[_length];
	
	PUParray(p,arrayObject,_length);
}

BunchArray::~BunchArray()
{
	delete[] arrayObject;
}

void BunchArray::init()
{
	int level=1;	
	int i=0;
	CkPrintf("length is :%d \n",_length);
	double startTime=CkWallTimer();
	for(i=0;i<_length-1;i++)
	{
		for(int j=i+1;j<_length;j++)
		{	
			calDistance(i,j);
		}
	}
			
	double endTime=CkWallTimer();

	CkPrintf("Time required for index %d on local processor execution is : %f and having result %e \n",thisIndex,endTime-startTime,result);

	int receiver;

//------------------------- here it will directly call remote processor method in while method without waiting for remote processor ----------


	/*	while(level<=(numProcessor/2))
		{
			if(level<(numProcessor/2))
			{
				if((thisIndex+level)<numProcessor)
				{
					receiver=thisIndex+level;
				}
				else 
					receiver=(thisIndex+level)-numProcessor;
					
				thisProxy[receiver].calInterProcessor(arrayObject,_length,level);
			}
			else
			{
					atom tempArray1[_length/2];

				for(int k=0;k<_length/2;k++)
					tempArray1[k]=arrayObject[k];

				if(thisIndex<(numProcessor/2))
				{
					receiver=thisIndex+level;
					thisProxy[receiver].calInterProcessor(tempArray1,_length/2,level);
				}
				else
				{			
					receiver=thisIndex-level;	
					thisProxy[receiver].calInterProcessor2(arrayObject,_length,level);
				}
				
			}
			
			level++;
		}
	*/

//------------------ here all chare element will send there atom array using message to mainchare object  ----------------
//------------------ and mainchare will call respective remote chare element method ------------------------------


			sendArray* tempA=new (_length) sendArray;

			for(int i=0;i<_length;i++)
				tempA->temp[i]=arrayObject[i];

			tempA->length=_length;
			tempA->level=thisIndex;

			mainProxy.result(tempA);
}

//--------------------------------- method to calculate local processor force -------------------------------

void BunchArray::calDistance(int i,int j) 
{
	double x,y,z;
	double result1=arrayObject[j]._charge*arrayObject[i]._charge;
	
	x=arrayObject[j]._x-arrayObject[i]._x;
	y=arrayObject[j]._y-arrayObject[i]._y;
	z=arrayObject[j]._z-arrayObject[i]._z;
	
	double result2=(x*x)+(y*y)+(z*z);

	result2=sqrt(result2);

	double finalResult=result1/result2;
	
	result=result+finalResult;
}

//---------------------------------- method to send data to mainchare ----------------------------------

void BunchArray::nextLevel()
{

		CkPrintf("in next Level method of %d\n",thisIndex);

		sendArray* tempA=new (_length) sendArray;

		for(int i=0;i<_length;i++)
			tempA->temp[i]=arrayObject[i];

		tempA->length=_length;
		tempA->level=thisIndex;

		mainProxy.result(tempA);
}


//------------------------------------ method will calculate force on receiving data and full local data ---------------------------
//------------------------------------ and will store data in result -----------------------------------

void BunchArray::calInterProcessor(sendArray* tempArray)
{
	double result1,result2,finalResult;
	double x,y,z;

	int i=0,j=0;
	CkPrintf("inside inter-processor communication on index %d and level %d having length %d on Processor %d \n",thisIndex,tempArray->level , tempArray->length,CkMyPe());
	double startTime=CkWallTimer();


	for(i=0;i<tempArray->length;i++)
	{
		for(j=0;j<_length;j++)
		{
			result1=arrayObject[j]._charge*tempArray->temp[i]._charge;

			x=arrayObject[j]._x-tempArray->temp[i]._x;
			y=arrayObject[j]._y-tempArray->temp[i]._y;
			z=arrayObject[j]._z-tempArray->temp[i]._z;
	
			result2=sqrt((x*x)+(y*y)+(z*z));

			finalResult=result1/result2;

			result=result+finalResult;
			
		}

	}

	double endTime=CkWallTimer();
	CkPrintf("At level %d index %d require time %f having result %e \n",tempArray->level,thisIndex,endTime-startTime,result);
	
	delete tempArray;

}

//------------------------------------ method will calculate force on receiving data and half local data ---------------------------
//------------------------------------ and will store data in result -----------------------------------

void BunchArray::calInterProcessor2(sendArray* tempArray)
{
		
	double result1,result2,finalResult;
	double x,y,z;
	int i=0,j=0;
	CkPrintf("inside inter-processor2 communication on index %d and level %d having length %d on processor %d \n",thisIndex,tempArray->level , tempArray->length,CkMyPe());
	double startTime=CkWallTimer();


	for(i=0;i<tempArray->length;i++)
	{
		for(j=_length/2;j<_length;j++)
		{
			result1=arrayObject[j]._charge*tempArray->temp[i]._charge;

			x=arrayObject[j]._x-tempArray->temp[i]._x;
			y=arrayObject[j]._y-tempArray->temp[i]._y;
			z=arrayObject[j]._z-tempArray->temp[i]._z;
	
			result2=sqrt((x*x)+(y*y)+(z*z));

			finalResult=result1/result2;

			result=result+finalResult;
		}
	}

	double endTime=CkWallTimer();
	CkPrintf("At level %d index %d require time %f having result %e\n",tempArray->level,thisIndex,endTime-startTime,result);

	delete tempArray;
}

//-------------------------------------- method used to reduce result ------------------------------
void BunchArray::callBack()
{
	contribute(sizeof(double),&result,CkReduction::sum_double);	
}

sendArray::sendArray()
{

}

#include "hello.def.h"

Attachment: hello.ci
Description: Binary data

#ifndef __HELLO_H__
#define __HELLO_H__

class atom
{

	public : 
		double _x,_y,_z,_charge;

	atom();
	atom(double x,double y,double z,double charge);	
	void pup(PUP::er &p);
	~atom();

};

class Hello:public CBase_Hello
{	
	int _level,_count;
	public :
		CProxy_BunchArray arrayAtom;
	
	atom** totalArray;
	int* size;

	Hello(CkArgMsg* msg);
	Hello(CkMigrateMessage* msg);
	void finalResult(CkReductionMsg* msg);

	void result(sendArray* tempArray);
	void check();
};


class sendArray:public CMessage_sendArray
{
	public :
		atom* temp;
		int length;
		int level;
		sendArray();
};

class BunchArray:public CBase_BunchArray
{
	
	public :

	atom *arrayObject;	
	int _length;
	double result;

	BunchArray();
	BunchArray(CkMigrateMessage* msg);
	void init();
	void calDistance(int i,int j);
	void callBack();
	void nextLevel();	
	
	void calInterProcessor(sendArray* tempArray);
	void calInterProcessor2(sendArray* tempArray);

	void pup(PUP::er &p);
	~BunchArray();
};



#endif



  • [charm] About processor idle during execution in CHARM++, SAGAR SHEDGE, 06/14/2011
