[gemlca-commit] r23 - in trunk/JavaSource/uk/ac/wmin/cpc/gemlca: backend client/helpers
commit at globus.org
commit at globus.org
Tue Nov 18 07:48:04 CST 2008
Author: kecske
Date: 2008-11-18 07:48:03 -0600 (Tue, 18 Nov 2008)
New Revision: 23
Modified:
trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/BaseCogKitJob.java
trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/JavaEGEEJob.java
trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/GlobusConnectionHelper.java
trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/ProcessWebClient.java
Log:
Modified: trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/BaseCogKitJob.java
===================================================================
--- trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/BaseCogKitJob.java 2008-11-17 16:55:11 UTC (rev 22)
+++ trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/BaseCogKitJob.java 2008-11-18 13:48:03 UTC (rev 23)
@@ -156,7 +156,11 @@
spec.setArguments(argsBuilder.toString());
logger.info("Arguments="+argsBuilder);
- spec.setAttribute("jobtype",environment.getBackendSpecificDataParameterValue(backendID,"jobType"));
+ String jobType=environment.getBackendSpecificDataParameterValue(backendID,"jobType");
+ spec.setAttribute("jobtype",jobType);
+ if(jobType.equalsIgnoreCase("mpi")) {
+ spec.addEnvironmentVariable("GLOBUS_DUROC_SUBJOB_INDEX", "0");
+ }
String count = environment.getBackendSpecificDataParameterValue(backendID,"count");
if ( count != null && !"".equals(count) ) spec.setAttribute("count",count);
Modified: trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/JavaEGEEJob.java
===================================================================
--- trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/JavaEGEEJob.java 2008-11-17 16:55:11 UTC (rev 22)
+++ trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/JavaEGEEJob.java 2008-11-18 13:48:03 UTC (rev 23)
@@ -35,8 +35,6 @@
import org.glite.wmsui.apij.Job;
import org.glite.wmsui.apij.JobStatus;
import org.glite.wmsui.apij.Url;
-import org.globus.ftp.GridFTPClient;
-import org.globus.ftp.MlsxEntry;
import org.globus.gsi.GlobusCredentialException;
import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
import org.gridforum.jgss.ExtendedGSSCredential;
@@ -56,7 +54,7 @@
public class JavaEGEEJob extends BackendPlugin {
private static final String jobAccessHelper = "X";
public static final String brokeredSites = "BROKER";
- public static final String JOB_MANAGER="FORK";
+ public static final String JOB_MANAGER = "FORK";
private static final Logger logger = LCLog.getLogger();
@@ -74,6 +72,12 @@
public final String baseRequirement;
public final String baseRank;
+ public static final String resubmissioncount_id = "ResubmissionCount";
+ public static final String resubmissiondelay_id = "ResubmissionDelayInSecs";
+ public final int resubmissionCount;
+ public final int resubmissionDelay;
+
+
// Variables required in the LCID
public static final String voName_id = "VirtualOrganisation";
@@ -130,6 +134,28 @@
backendID, requirement_id);
baseRank = LCGeneralParameters.getBackendConfigParameter(backendID,
rank_id);
+ String rec=LCGeneralParameters.getBackendConfigParameter(backendID, resubmissioncount_id); // "ResubmissionCount" backend parameter, may be absent
+ if(rec!=null) {
+ int recNum;
+ try {
+ recNum=Integer.parseInt(rec);
+ }
+ catch(NumberFormatException e) {
+ recNum=5; // unparsable value: fall back to the default of 5
+ }
+ resubmissionCount=recNum;
+ } else resubmissionCount=5; // parameter absent: default of 5
+ String red=LCGeneralParameters.getBackendConfigParameter(backendID, resubmissiondelay_id); // "ResubmissionDelayInSecs" backend parameter, may be absent
+ if(red!=null) { // check the delay value itself; checking rec here dropped a configured delay whenever the count was unset
+ int redNum;
+ try {
+ redNum=Integer.parseInt(red)*1000; // configured in seconds, stored in milliseconds
+ }
+ catch(NumberFormatException e) {
+ redNum=10000; // unparsable value: fall back to 10000 ms
+ }
+ resubmissionDelay=redNum;
+ } else resubmissionDelay=10000; // parameter absent: default of 10000 ms
logger.info("Detecting previous related executions.");
LCEnvironment locEnvi = environment;
String dest = null;
@@ -147,6 +173,39 @@
logger.debug("stop");
}
+ protected JobAd getJDL(String environmentFolder) throws JobAdException, GEMLCAException { // builds and returns the JobAd (JDL) generated from the current environment
+ logger.debug("start");
+ JobAd jdl = new JobAd();
+ // FIXME on system LCs the sitelist should be collected from a
+ // different environment
+ EnvToJDL jdlCreator = new EnvToJDL(environment, environmentFolder,
+ previousDestination == null ? super.getExecutorSites() // executor site list is passed only when no previous destination is known
+ .toArray(new String[1]) : null); // otherwise null is passed instead of a site list
+ jdlCreator
+ .setInputFilenameTranslator(new EnvToJDL.InputFilenameTranslator() {
+ // @Override //Java 1.6 Only
+ public String translate(String me) { // maps a name to a path under the parent of the proposed output folder
+ return new File(
+ getProposedOutputFolder(environment)
+ .getParent(), me).toString();
+ }
+ });
+ jdl.fromRecord(jdlCreator.populateRecordExpr(jdlCreator // fills jdl from the record produced by the EnvToJDL helper
+ .createMinimalJdl(environment.getBackendAttributeValue(
+ backendID, voName_id), baseRequirement, baseRank)));
+ if (logger.isInfoEnabled()) // skip building the log string when info logging is off
+ logger.info("Generated JDL: " + jdl.toString());
+ logger.debug("stop");
+ return jdl;
+ }
+
+ protected void prepareGliteJob() throws IllegalArgumentException, JobAdException, GEMLCAException, GlobusCredentialException, FileNotFoundException { // (re)creates gliteJob from a freshly generated JDL
+ synchronized (jobAccessHelper) { // the other gliteJob accesses visible in this class take the same lock
+ gliteJob = new Job(getJDL(environmentFolder));
+ gliteJob.setCredPath(savedProxy); // credential path comes from savedProxy
+ }
+ }
+
/**
* Prepares the gLite Job Submission. Generates the necessary JDL, creates
* the corresponding glite Job instance, and puts the proxy certificate
@@ -169,31 +228,8 @@
logger.debug("start");
try {
environment.createJobEnvironmentFolder(environmentFolder, null);
- JobAd jdl = new JobAd();
- // FIXME on system LCs the sitelist should be collected from a
- // different environment
- EnvToJDL jdlCreator = new EnvToJDL(environment, environmentFolder,
- previousDestination == null ? super.getExecutorSites()
- .toArray(new String[1]) : null);
- jdlCreator
- .setInputFilenameTranslator(new EnvToJDL.InputFilenameTranslator() {
- // @Override //Java 1.6 Only
- public String translate(String me) {
- return new File(
- getProposedOutputFolder(environment)
- .getParent(), me).toString();
- }
- });
- jdl.fromRecord(jdlCreator.populateRecordExpr(jdlCreator
- .createMinimalJdl(environment.getBackendAttributeValue(
- backendID, voName_id), baseRequirement, baseRank)));
- if (logger.isInfoEnabled())
- logger.info("Generated JDL: " + jdl.toString());
prepareCert();
- synchronized (jobAccessHelper) {
- gliteJob = new Job(jdl);
- gliteJob.setCredPath(savedProxy);
- }
+ prepareGliteJob();
} catch (IllegalArgumentException e) {
e.printStackTrace();
LCLog.throwFatal(logger, GEMLCAInternalException.class,
@@ -335,10 +371,10 @@
returner = "Failed";
break;
case JobStatus.SCHEDULED:
- returner="Pending";
+ returner = "Pending";
break;
case JobStatus.RUNNING:
- returner="Active";
+ returner = "Active";
break;
case JobStatus.WAITING:
if (in.getValString(JobStatus.REASON).toLowerCase().indexOf(
@@ -364,7 +400,7 @@
public String getResultsUrl() throws GEMLCAException {
String returner = isSystem() ? LCGeneralParameters
.getGemlcaSystemTmpFolder() : outputFolder.toString();
- returner=LCGeneralParameters.getLCStorageURL()+returner;
+ returner = LCGeneralParameters.getLCStorageURL() + returner;
if (logger.isDebugEnabled())
logger.debug("start/stop" + returner);
@@ -392,6 +428,7 @@
* @return the suggested foldername
*/
private File getProposedOutputFolder(LCEnvironment nonSysEnv) {
+ logger.debug("start/stop");
return new File(nonSysEnv.getEnvironmentFolder().replace(
"$GEMLCAUSERHOME",
LCGeneralParameters.getGemlcaSystemTmpFolder()));
@@ -405,12 +442,14 @@
* A NON system LC for which the folder is created
*/
private void makeTmpFolderforEnv(LCEnvironment nonSysEnv) {
+ logger.debug("start");
outputFolder = getProposedOutputFolder(nonSysEnv);
if (!outputFolder.isDirectory()) {
outputFolder.mkdirs();
Api.shadow("chmod 777 " + outputFolder); // FIXME sec
// risk + Java6 fix needed + CE skip hack 4/5
}
+ logger.debug("stop");
}
/**
@@ -452,7 +491,15 @@
+ outputFolder);
if (environment.getOutputFileParameters().size() > 0)
synchronized (jobAccessHelper) {
- gliteJob.getOutput(outputFolder.toString());
+ try {
+ gliteJob.getOutput(outputFolder.toString());
+ } catch (UnsupportedOperationException e) {
+ e.printStackTrace();
+ LCLog.throwFatal(logger,
+ GEMLCAInternalException.class,
+ "The output of the job cannot be retrieved, reason:"
+ + e.getMessage());
+ }
}
logger.info("Output sandbox saved.");
} else {
@@ -465,15 +512,25 @@
case JobStatus.RUNNING:
logger.info("Cancellation request!");
synchronized (jobAccessHelper) {
- gliteJob.cancel();
+ try {
+ gliteJob.cancel();
+ } catch (UnsupportedOperationException e) {
+ e.printStackTrace();
+ LCLog.throwFatal(logger,
+ GEMLCAInternalException.class,
+ "This job cannot be killed, reason:"
+ + e.getMessage());
+ }
}
}
}
}
} catch (UnsupportedOperationException e) {
e.printStackTrace();
- LCLog.throwFatal(logger, GEMLCAInternalException.class,
- "This job cannot be killed, reason:" + e.getMessage());
+ // Do nothing.
+ logger.warn("An unsubmitted job was cancelled: "
+ + environment.getId() + "(" + environment.getEnvID()
+ + ")");
} catch (FileNotFoundException e) {
e.printStackTrace();
LCLog.throwFatal(logger, GEMLCAInternalException.class,
@@ -552,6 +609,7 @@
* @return
*/
private boolean isSystem() {
+ logger.debug("start/stop");
return environment.getStatus().equals(
LCEnvironmentDocument.LCEnvironment.Status.SYSTEM);
}
@@ -573,12 +631,33 @@
logger.debug("start");
try {
// FIXME CE skip hack! (5/5)
- if (!isSystem())
- synchronized (jobAccessHelper) {
- logger.debug(gliteJob.submit(networkServerURL,
- loggingServerURL, previousDestination).toString());
+ int submitCounter = 0;
+ while (true) {
+ try {
+ if (!isSystem()) {
+ synchronized (jobAccessHelper) {
+ logger.debug(gliteJob.submit(networkServerURL,
+ loggingServerURL, previousDestination)
+ .toString());
+ }
+ }
+ startMonitoring();
+ break;
+ } catch (RuntimeException e) {
+ submitCounter++;
+ if (submitCounter == resubmissionCount) {
+ throw new UnsupportedOperationException(
+ "Resubmission count reached!",e);
+ }
+ try {
+ Thread.sleep(resubmissionDelay);
+ prepareGliteJob(); // reinitialize the JDL.
+ } catch (InterruptedException intex) {
+ // ignore.
+ }
}
- startMonitoring();
+
+ }
} catch (InvalidAttributeValueException e) {
e.printStackTrace();
LCLog.throwFatal(logger, GEMLCAInternalException.class,
@@ -638,9 +717,10 @@
.put("0-1," + LCEnvironment.xpathOrder[11] + "@" + retry_id, "");
returner.put(
"0-1," + LCEnvironment.xpathOrder[11] + "@" + shalRetry_id, "");
-
- returner.put("1,"+LCEnvironment.xpathOrder[11]+"/siteInfo@jobManager",JOB_MANAGER);
-
+
+ returner.put("1," + LCEnvironment.xpathOrder[11]
+ + "/siteInfo@jobManager", JOB_MANAGER);
+
logger.debug("stop");
return returner;
}
@@ -719,7 +799,8 @@
@Override
public void filterUserUploadedInputs(TreeMap<Integer, String> defaultInputs)
throws GEMLCAException {
- if(logger.isDebugEnabled()) logger.debug("start("+defaultInputs+")");
+ if (logger.isDebugEnabled())
+ logger.debug("start(" + defaultInputs + ")");
String uploadLoc = environment.getResultsURL();
try {
uploadLoc = uploadLoc.replaceAll(" ", "%20");
@@ -742,6 +823,6 @@
LCLog.throwFatal(logger, GEMLCAUserFriendlyException.class,
"Invalid results URL:" + uploadLoc);
}
- logger.debug("stop");
+ logger.debug("stop");
}
}
Modified: trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/GlobusConnectionHelper.java
===================================================================
--- trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/GlobusConnectionHelper.java 2008-11-17 16:55:11 UTC (rev 22)
+++ trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/GlobusConnectionHelper.java 2008-11-18 13:48:03 UTC (rev 23)
@@ -57,6 +57,10 @@
*/
public class GlobusConnectionHelper {
+ public interface RetryAble<E> { // a call producing an E that callWithRetries can invoke repeatedly
+ public E callMe() throws Exception; // performs one attempt; an exception marks that attempt as failed
+ }
+
static {
Util.registerTransport();
}
@@ -72,6 +76,8 @@
private static String defaultIdentity = "/C=UK/O=eScience/OU=Westminster/L=ComputerScience/CN=n10.cluster.cpc.wmin.ac.uk/E=t.delaitre at wmin.ac.uk";
+ private static final int defaultCallRetryCount=5;
+
private static final String GEMLCA_FACTORY_SERVICE="GemlcaFactoryService";
/**
@@ -329,7 +335,7 @@
try {
- ClientSecurityDescriptor csdesc = new ClientSecurityDescriptor();
+ final ClientSecurityDescriptor csdesc = new ClientSecurityDescriptor();
csdesc.setAuthz(new IdentityAuthorization(Identity));
csdesc.setGSSCredential(cred);
@@ -341,7 +347,7 @@
int port = endpoint.getAddress().getPort();
final String BASE_SERVICE_PATH = "/wsrf/services/";
- String factoryUrl
+ final String factoryUrl
= protocol + "://" + host + ":" + port
+ BASE_SERVICE_PATH
+ DelegationConstants.FACTORY_PATH;
@@ -349,11 +355,15 @@
EndpointReferenceType endp = new EndpointReferenceType();
endp.setAddress(new Address(factoryUrl));
- X509Certificate[] cert = DelegationUtil.getCertificateChainRP(endp,csdesc);
+ final X509Certificate[] cert = DelegationUtil.getCertificateChainRP(endp,csdesc);
- GlobusCredential globusCred = ((GlobusGSSCredentialImpl)cred).getGlobusCredential();
+ final GlobusCredential globusCred = ((GlobusGSSCredentialImpl)cred).getGlobusCredential();
- return DelegationUtil.delegate(factoryUrl,globusCred, cert[0], true, csdesc );
+ return callWithRetries(new RetryAble<EndpointReferenceType>() {
+ public EndpointReferenceType callMe() throws Exception {
+ return DelegationUtil.delegate(factoryUrl,globusCred, cert[0], true, csdesc );
+ }
+ });
} catch(Exception e) {
@@ -378,4 +388,18 @@
return GCH.createResource(true);
}
+ public static <E> E callWithRetries(RetryAble<E> function) throws Exception { // invokes function up to defaultCallRetryCount times and returns its first successful result
+ Exception throwMe=null; // most recent failure, rethrown once every attempt has failed
+ for(int i=0;i<defaultCallRetryCount;i++) {
+ try {
+ return function.callMe(); // success ends the retries immediately
+ }
+ catch(Exception e) {
+ throwMe=e;
+ if(i<defaultCallRetryCount-1) { Thread.sleep(10000); } // pause 10 s between attempts, but do not delay the final rethrow
+ }
+ }
+ throw throwMe;
+ }
+
}
Modified: trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/ProcessWebClient.java
===================================================================
--- trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/ProcessWebClient.java 2008-11-17 16:55:11 UTC (rev 22)
+++ trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/ProcessWebClient.java 2008-11-18 13:48:03 UTC (rev 23)
@@ -177,8 +177,8 @@
logger.info("Job with jobID '" + jobId
+ "' dropped.");
jobPool.remove(jobId); // FIXME client still
- // running unnecessarily -
- // thread kill is necessary.
+ // running unnecessarily -
+ // thread kill is necessary.
}
}
}
@@ -274,7 +274,9 @@
logger.info("submission failed");
throw new UnknownHostException();
}
- jobPool.put(new Integer(jid), myje);
+ synchronized (jobPool) {
+ jobPool.put(new Integer(jid), myje);
+ }
if (!handlePool.containsKey(GHandle))
handlePool.put(GHandle, new Vector<Integer>());
handlePool.get(GHandle).addElement(jid);
@@ -296,21 +298,24 @@
}
logger.info("State request");
Integer nJID = Integer.valueOf(JID);
- ClientGLCProcess.JobExecutor myje = (ClientGLCProcess.JobExecutor) jobPool
- .get(nJID);
- if (myje == null) {
- logger.info("nonexistent job");
- throw new UnknownHostException();
- } else {
- PrintWriter out = response.getWriter();
- response.setContentType("text/plain");
- out.write(myje.getCurrentState());
- if (!myje.isAlive()) {
- jobPool.remove(nJID); // Cleanup of the Job
- // Pool
- Exception DC = myje.getDeathCause();
- if (DC != null)
- throw new Exception(DC);
+ synchronized (jobPool) {
+ ClientGLCProcess.JobExecutor myje = (ClientGLCProcess.JobExecutor) jobPool
+ .get(nJID);
+ if (myje == null) {
+ logger.info("nonexistent job");
+ throw new UnknownHostException();
+ } else {
+ PrintWriter out = response.getWriter();
+ response.setContentType("text/plain");
+ out.write(myje.getCurrentState());
+ if (!myje.isAlive()) {
+ jobPool.remove(nJID); // Cleanup of the
+ // Job
+ // Pool
+ Exception DC = myje.getDeathCause();
+ if (DC != null)
+ throw new Exception(DC);
+ }
}
}
} else {
@@ -336,7 +341,9 @@
client.loadSerializedEndpointreference(GHandle);
Integer nJID = Integer.valueOf(JID);
client.killProcess(nJID.intValue());
- jobPool.remove(nJID);
+ synchronized (jobPool) {
+ jobPool.remove(nJID);
+ }
} else {
String destroyProcess = request
.getParameter("destroyProcess");
@@ -352,12 +359,14 @@
}
logger.info("Destroy process");
for (Integer jid : handlePool.get(ghandle)) {
- if (jobPool.containsKey(jid)) {
- ClientGLCProcess.JobExecutor currentJE = jobPool
- .get(jid);
- jobPool.remove(jid);
- while (currentJE.isAlive()) {
- Thread.sleep(50);
+ synchronized (jobPool) {
+ if (jobPool.containsKey(jid)) {
+ ClientGLCProcess.JobExecutor currentJE = jobPool
+ .get(jid);
+ jobPool.remove(jid);
+ while (currentJE.isAlive()) {
+ Thread.sleep(50);
+ }
}
}
}
@@ -416,18 +425,21 @@
}
logger.info("Executor site query");
Integer nJID = Integer.valueOf(JID);
- ClientGLCProcess.JobExecutor myje = (ClientGLCProcess.JobExecutor) jobPool
- .get(nJID);
- if (myje == null) {
- logger.info("nonexistent job");
- throw new UnknownHostException();
- } else {
- PrintWriter out = response
- .getWriter();
- response
- .setContentType("text/plain");
- out
- .write(myje.getJobData().executorSite);
+ synchronized (jobPool) {
+ ClientGLCProcess.JobExecutor myje = (ClientGLCProcess.JobExecutor) jobPool
+ .get(nJID);
+ if (myje == null) {
+ logger.info("nonexistent job");
+ throw new UnknownHostException();
+ } else {
+ PrintWriter out = response
+ .getWriter();
+ response
+ .setContentType("text/plain");
+ out
+ .write(myje
+ .getJobData().executorSite);
+ }
}
} else {
logger.info("Unrecognised command");
More information about the gemlca-commit
mailing list