[gemlca-commit] r23 - in trunk/JavaSource/uk/ac/wmin/cpc/gemlca: backend client/helpers

commit at globus.org commit at globus.org
Tue Nov 18 07:48:04 CST 2008


Author: kecske
Date: 2008-11-18 07:48:03 -0600 (Tue, 18 Nov 2008)
New Revision: 23

Modified:
   trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/BaseCogKitJob.java
   trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/JavaEGEEJob.java
   trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/GlobusConnectionHelper.java
   trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/ProcessWebClient.java
Log:


Modified: trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/BaseCogKitJob.java
===================================================================
--- trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/BaseCogKitJob.java	2008-11-17 16:55:11 UTC (rev 22)
+++ trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/BaseCogKitJob.java	2008-11-18 13:48:03 UTC (rev 23)
@@ -156,7 +156,11 @@
         spec.setArguments(argsBuilder.toString());
         logger.info("Arguments="+argsBuilder);
         
-        spec.setAttribute("jobtype",environment.getBackendSpecificDataParameterValue(backendID,"jobType"));
+        String jobType=environment.getBackendSpecificDataParameterValue(backendID,"jobType");
+        spec.setAttribute("jobtype",jobType);
+        if("mpi".equalsIgnoreCase(jobType)) { // constant first: a missing jobType (null) must not throw here
+        	spec.addEnvironmentVariable("GLOBUS_DUROC_SUBJOB_INDEX", "0"); // only MPI jobs get this variable
+        }
         
         String count = environment.getBackendSpecificDataParameterValue(backendID,"count");
         if ( count != null && !"".equals(count) ) spec.setAttribute("count",count);

Modified: trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/JavaEGEEJob.java
===================================================================
--- trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/JavaEGEEJob.java	2008-11-17 16:55:11 UTC (rev 22)
+++ trunk/JavaSource/uk/ac/wmin/cpc/gemlca/backend/JavaEGEEJob.java	2008-11-18 13:48:03 UTC (rev 23)
@@ -35,8 +35,6 @@
 import org.glite.wmsui.apij.Job;
 import org.glite.wmsui.apij.JobStatus;
 import org.glite.wmsui.apij.Url;
-import org.globus.ftp.GridFTPClient;
-import org.globus.ftp.MlsxEntry;
 import org.globus.gsi.GlobusCredentialException;
 import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
 import org.gridforum.jgss.ExtendedGSSCredential;
@@ -56,7 +54,7 @@
 public class JavaEGEEJob extends BackendPlugin {
 	private static final String jobAccessHelper = "X";
 	public static final String brokeredSites = "BROKER";
-	public static final String JOB_MANAGER="FORK";
+	public static final String JOB_MANAGER = "FORK";
 
 	private static final Logger logger = LCLog.getLogger();
 
@@ -74,6 +72,12 @@
 	public final String baseRequirement;
 	public final String baseRank;
 
+	public static final String resubmissioncount_id = "ResubmissionCount";
+	public static final String resubmissiondelay_id = "ResubmissionDelayInSecs";
+	public final int resubmissionCount;
+	public final int resubmissionDelay;
+
+	
 	// Variables required in the LCID
 	public static final String voName_id = "VirtualOrganisation";
 
@@ -130,6 +134,28 @@
 				backendID, requirement_id);
 		baseRank = LCGeneralParameters.getBackendConfigParameter(backendID,
 				rank_id);
+		String rec=LCGeneralParameters.getBackendConfigParameter(backendID, resubmissioncount_id); // optional setting
+		if(rec!=null) {
+			int recNum;
+			try {
+				recNum=Integer.parseInt(rec);
+			}
+			catch(NumberFormatException e) {
+				recNum=5; // unparsable value: same default as when the setting is absent
+			}
+			resubmissionCount=recNum;
+		} else resubmissionCount=5;
+		String red=LCGeneralParameters.getBackendConfigParameter(backendID, resubmissiondelay_id); // optional setting, in seconds
+		if(red!=null) { // check the delay value itself, not the count read above
+			int redNum;
+			try {
+				redNum=Integer.parseInt(red)*1000; // seconds -> milliseconds
+			}
+			catch(NumberFormatException e) {
+				redNum=10000; // unparsable value: same default as when the setting is absent
+			}
+			resubmissionDelay=redNum;
+		} else resubmissionDelay=10000;
 		logger.info("Detecting previous related executions.");
 		LCEnvironment locEnvi = environment;
 		String dest = null;
@@ -147,6 +173,39 @@
 		logger.debug("stop");
 	}
 
+	protected JobAd getJDL(String environmentFolder) throws JobAdException, GEMLCAException { // builds and returns a new JobAd for the current environment
+		logger.debug("start");
+		JobAd jdl = new JobAd();
+		// FIXME on system LCs the sitelist should be collected from a
+		// different environment
+		EnvToJDL jdlCreator = new EnvToJDL(environment, environmentFolder,
+				previousDestination == null ? super.getExecutorSites()
+						.toArray(new String[1]) : null); // executor sites are passed only when no previous destination is set
+		jdlCreator
+				.setInputFilenameTranslator(new EnvToJDL.InputFilenameTranslator() {
+					// @Override //Java 1.6 Only
+					public String translate(String me) { // places the name under the parent of the proposed output folder
+						return new File(
+								getProposedOutputFolder(environment)
+										.getParent(), me).toString();
+					}
+				});
+		jdl.fromRecord(jdlCreator.populateRecordExpr(jdlCreator // JobAd is filled from the record produced by the creator
+				.createMinimalJdl(environment.getBackendAttributeValue(
+						backendID, voName_id), baseRequirement, baseRank)));
+		if (logger.isInfoEnabled())
+			logger.info("Generated JDL: " + jdl.toString());
+		logger.debug("stop");
+		return jdl;
+	}
+	
+	protected void prepareGliteJob() throws IllegalArgumentException, JobAdException, GEMLCAException, GlobusCredentialException, FileNotFoundException { // (re)creates gliteJob from a freshly generated JDL
+		JobAd jdl = getJDL(environmentFolder); // generated before taking the lock, as it was prior to this refactoring
+		synchronized (jobAccessHelper) {
+			gliteJob = new Job(jdl);
+			gliteJob.setCredPath(savedProxy); // the job uses the proxy saved earlier
+		}
+	}
 	/**
 	 * Prepares the gLite Job Submission. Generates the necessary JDL, creates
 	 * the corresponding glite Job instance, and puts the proxy certificate
@@ -169,31 +228,8 @@
 		logger.debug("start");
 		try {
 			environment.createJobEnvironmentFolder(environmentFolder, null);
-			JobAd jdl = new JobAd();
-			// FIXME on system LCs the sitelist should be collected from a
-			// different environment
-			EnvToJDL jdlCreator = new EnvToJDL(environment, environmentFolder,
-					previousDestination == null ? super.getExecutorSites()
-							.toArray(new String[1]) : null);
-			jdlCreator
-					.setInputFilenameTranslator(new EnvToJDL.InputFilenameTranslator() {
-						// @Override //Java 1.6 Only
-						public String translate(String me) {
-							return new File(
-									getProposedOutputFolder(environment)
-											.getParent(), me).toString();
-						}
-					});
-			jdl.fromRecord(jdlCreator.populateRecordExpr(jdlCreator
-					.createMinimalJdl(environment.getBackendAttributeValue(
-							backendID, voName_id), baseRequirement, baseRank)));
-			if (logger.isInfoEnabled())
-				logger.info("Generated JDL: " + jdl.toString());
 			prepareCert();
-			synchronized (jobAccessHelper) {
-				gliteJob = new Job(jdl);
-				gliteJob.setCredPath(savedProxy);
-			}
+			prepareGliteJob();
 		} catch (IllegalArgumentException e) {
 			e.printStackTrace();
 			LCLog.throwFatal(logger, GEMLCAInternalException.class,
@@ -335,10 +371,10 @@
 					returner = "Failed";
 				break;
 			case JobStatus.SCHEDULED:
-				returner="Pending";
+				returner = "Pending";
 				break;
 			case JobStatus.RUNNING:
-				returner="Active";
+				returner = "Active";
 				break;
 			case JobStatus.WAITING:
 				if (in.getValString(JobStatus.REASON).toLowerCase().indexOf(
@@ -364,7 +400,7 @@
 	public String getResultsUrl() throws GEMLCAException {
 		String returner = isSystem() ? LCGeneralParameters
 				.getGemlcaSystemTmpFolder() : outputFolder.toString();
-		returner=LCGeneralParameters.getLCStorageURL()+returner;
+		returner = LCGeneralParameters.getLCStorageURL() + returner;
 
 		if (logger.isDebugEnabled())
 			logger.debug("start/stop" + returner);
@@ -392,6 +428,7 @@
 	 * @return the suggested foldername
 	 */
 	private File getProposedOutputFolder(LCEnvironment nonSysEnv) {
+		logger.debug("start/stop");
 		return new File(nonSysEnv.getEnvironmentFolder().replace(
 				"$GEMLCAUSERHOME",
 				LCGeneralParameters.getGemlcaSystemTmpFolder()));
@@ -405,12 +442,14 @@
 	 *            A NON system LC for which the folder is created
 	 */
 	private void makeTmpFolderforEnv(LCEnvironment nonSysEnv) {
+		logger.debug("start");
 		outputFolder = getProposedOutputFolder(nonSysEnv);
 		if (!outputFolder.isDirectory()) {
 			outputFolder.mkdirs();
 			Api.shadow("chmod 777 " + outputFolder); // FIXME sec
 			// risk + Java6 fix needed + CE skip hack 4/5
 		}
+		logger.debug("stop");
 	}
 
 	/**
@@ -452,7 +491,15 @@
 									+ outputFolder);
 						if (environment.getOutputFileParameters().size() > 0)
 							synchronized (jobAccessHelper) {
-								gliteJob.getOutput(outputFolder.toString());
+								try {
+									gliteJob.getOutput(outputFolder.toString());
+								} catch (UnsupportedOperationException e) {
+									e.printStackTrace();
+									LCLog.throwFatal(logger,
+											GEMLCAInternalException.class,
+											"The output of the job cannot be retrieved, reason:"
+													+ e.getMessage());
+								}
 							}
 						logger.info("Output sandbox saved.");
 					} else {
@@ -465,15 +512,25 @@
 						case JobStatus.RUNNING:
 							logger.info("Cancellation request!");
 							synchronized (jobAccessHelper) {
-								gliteJob.cancel();
+								try {
+									gliteJob.cancel();
+								} catch (UnsupportedOperationException e) {
+									e.printStackTrace();
+									LCLog.throwFatal(logger,
+											GEMLCAInternalException.class,
+											"This job cannot be killed, reason:"
+													+ e.getMessage());
+								}
 							}
 						}
 					}
 				}
 			} catch (UnsupportedOperationException e) {
 				e.printStackTrace();
-				LCLog.throwFatal(logger, GEMLCAInternalException.class,
-						"This job cannot be killed, reason:" + e.getMessage());
+				// Do nothing.
+				logger.warn("An unsubmitted job was cancelled: "
+						+ environment.getId() + "(" + environment.getEnvID()
+						+ ")");
 			} catch (FileNotFoundException e) {
 				e.printStackTrace();
 				LCLog.throwFatal(logger, GEMLCAInternalException.class,
@@ -552,6 +609,7 @@
 	 * @return
 	 */
 	private boolean isSystem() {
+		logger.debug("start/stop");
 		return environment.getStatus().equals(
 				LCEnvironmentDocument.LCEnvironment.Status.SYSTEM);
 	}
@@ -573,12 +631,33 @@
 		logger.debug("start");
 		try {
 			// FIXME CE skip hack! (5/5)
-			if (!isSystem())
-				synchronized (jobAccessHelper) {
-					logger.debug(gliteJob.submit(networkServerURL,
-							loggingServerURL, previousDestination).toString());
+			int submitCounter = 0; // number of failed submission attempts so far
+			while (true) {
+				try {
+					if (!isSystem()) {
+						synchronized (jobAccessHelper) {
+							logger.debug(gliteJob.submit(networkServerURL,
+									loggingServerURL, previousDestination)
+									.toString());
+						}
+					}
+					startMonitoring();
+					break; // submitted and monitored: leave the retry loop
+				} catch (RuntimeException e) {
+					submitCounter++;
+					if (submitCounter >= resubmissionCount) { // ">=" so a configured count of 0 or less cannot loop forever
+						throw new UnsupportedOperationException(
+								"Resubmission count reached!",e);
+					}
+					try {
+						Thread.sleep(resubmissionDelay); // wait before the next attempt
+						prepareGliteJob(); // reinitialize the JDL.
+					} catch (InterruptedException intex) {
+						// interrupted while waiting: the JDL rebuild is skipped and the next attempt starts at once
+					}
 				}
-			startMonitoring();
+
+			}
 		} catch (InvalidAttributeValueException e) {
 			e.printStackTrace();
 			LCLog.throwFatal(logger, GEMLCAInternalException.class,
@@ -638,9 +717,10 @@
 				.put("0-1," + LCEnvironment.xpathOrder[11] + "@" + retry_id, "");
 		returner.put(
 				"0-1," + LCEnvironment.xpathOrder[11] + "@" + shalRetry_id, "");
-	    
-		returner.put("1,"+LCEnvironment.xpathOrder[11]+"/siteInfo@jobManager",JOB_MANAGER);
-	       
+
+		returner.put("1," + LCEnvironment.xpathOrder[11]
+				+ "/siteInfo@jobManager", JOB_MANAGER);
+
 		logger.debug("stop");
 		return returner;
 	}
@@ -719,7 +799,8 @@
 	@Override
 	public void filterUserUploadedInputs(TreeMap<Integer, String> defaultInputs)
 			throws GEMLCAException {
-    	if(logger.isDebugEnabled()) logger.debug("start("+defaultInputs+")");
+		if (logger.isDebugEnabled())
+			logger.debug("start(" + defaultInputs + ")");
 		String uploadLoc = environment.getResultsURL();
 		try {
 			uploadLoc = uploadLoc.replaceAll(" ", "%20");
@@ -742,6 +823,6 @@
 			LCLog.throwFatal(logger, GEMLCAUserFriendlyException.class,
 					"Invalid results URL:" + uploadLoc);
 		}
-    	logger.debug("stop");
+		logger.debug("stop");
 	}
 }

Modified: trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/GlobusConnectionHelper.java
===================================================================
--- trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/GlobusConnectionHelper.java	2008-11-17 16:55:11 UTC (rev 22)
+++ trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/GlobusConnectionHelper.java	2008-11-18 13:48:03 UTC (rev 23)
@@ -57,6 +57,10 @@
  */
 public class GlobusConnectionHelper {
     
+	public interface RetryAble<E> { // a call that produces an E and may fail with any Exception
+		E callMe() throws Exception;
+	}
+	
     static {
         Util.registerTransport();
     }
@@ -72,6 +76,8 @@
     
     private static String         defaultIdentity = "/C=UK/O=eScience/OU=Westminster/L=ComputerScience/CN=n10.cluster.cpc.wmin.ac.uk/E=t.delaitre at wmin.ac.uk";
     
+    private static final int      defaultCallRetryCount=5;
+    
     private static final String GEMLCA_FACTORY_SERVICE="GemlcaFactoryService";
     
     /**
@@ -329,7 +335,7 @@
         
         try {
                        
-            ClientSecurityDescriptor csdesc = new ClientSecurityDescriptor();
+            final ClientSecurityDescriptor csdesc = new ClientSecurityDescriptor();
             
             csdesc.setAuthz(new IdentityAuthorization(Identity));
             csdesc.setGSSCredential(cred);
@@ -341,7 +347,7 @@
             int port = endpoint.getAddress().getPort();
             final String BASE_SERVICE_PATH = "/wsrf/services/";
 
-            String factoryUrl
+            final String factoryUrl
                 = protocol + "://" + host + ":" + port
                 + BASE_SERVICE_PATH
                 + DelegationConstants.FACTORY_PATH;
@@ -349,11 +355,15 @@
             EndpointReferenceType endp = new EndpointReferenceType();
             endp.setAddress(new Address(factoryUrl));
 
-            X509Certificate[] cert = DelegationUtil.getCertificateChainRP(endp,csdesc);
+            final X509Certificate[] cert = DelegationUtil.getCertificateChainRP(endp,csdesc);
             
-            GlobusCredential globusCred = ((GlobusGSSCredentialImpl)cred).getGlobusCredential();
+            final GlobusCredential globusCred = ((GlobusGSSCredentialImpl)cred).getGlobusCredential();
                     
-            return DelegationUtil.delegate(factoryUrl,globusCred, cert[0], true,  csdesc );
+            return callWithRetries(new RetryAble<EndpointReferenceType>() {
+            	public EndpointReferenceType callMe() throws Exception {
+            		return DelegationUtil.delegate(factoryUrl,globusCred, cert[0], true,  csdesc );
+            	}
+            });
             
             
         } catch(Exception e) {
@@ -378,4 +388,18 @@
         return GCH.createResource(true);    
     }
     
+    public static <E> E callWithRetries(RetryAble<E> function) throws Exception { // returns the first successful callMe() result
+    	Exception throwMe=null; // most recent failure, rethrown when every attempt has failed
+    	for(int i=0;i<defaultCallRetryCount;i++) {
+    		try {
+    			return function.callMe();
+    		}
+    		catch(Exception e) {
+    			throwMe=e;
+    			if(i<defaultCallRetryCount-1) Thread.sleep(10000); // pause only when another attempt follows
+    		}
+    	}
+    	throw throwMe;
+    }
+    
 }

Modified: trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/ProcessWebClient.java
===================================================================
--- trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/ProcessWebClient.java	2008-11-17 16:55:11 UTC (rev 22)
+++ trunk/JavaSource/uk/ac/wmin/cpc/gemlca/client/helpers/ProcessWebClient.java	2008-11-18 13:48:03 UTC (rev 23)
@@ -177,8 +177,8 @@
 							logger.info("Job with jobID '" + jobId
 									+ "' dropped.");
 							jobPool.remove(jobId); // FIXME client still
-													// running unnecessarily -
-													// thread kill is necessary.
+							// running unnecessarily -
+							// thread kill is necessary.
 						}
 					}
 				}
@@ -274,7 +274,9 @@
 						logger.info("submission failed");
 						throw new UnknownHostException();
 					}
-					jobPool.put(new Integer(jid), myje);
+					synchronized (jobPool) {
+						jobPool.put(new Integer(jid), myje);
+					}
 					if (!handlePool.containsKey(GHandle))
 						handlePool.put(GHandle, new Vector<Integer>());
 					handlePool.get(GHandle).addElement(jid);
@@ -296,21 +298,24 @@
 						}
 						logger.info("State request");
 						Integer nJID = Integer.valueOf(JID);
-						ClientGLCProcess.JobExecutor myje = (ClientGLCProcess.JobExecutor) jobPool
-								.get(nJID);
-						if (myje == null) {
-							logger.info("nonexistent job");
-							throw new UnknownHostException();
-						} else {
-							PrintWriter out = response.getWriter();
-							response.setContentType("text/plain");
-							out.write(myje.getCurrentState());
-							if (!myje.isAlive()) {
-								jobPool.remove(nJID); // Cleanup of the Job
-								// Pool
-								Exception DC = myje.getDeathCause();
-								if (DC != null)
-									throw new Exception(DC);
+						synchronized (jobPool) {
+							ClientGLCProcess.JobExecutor myje = (ClientGLCProcess.JobExecutor) jobPool
+									.get(nJID);
+							if (myje == null) {
+								logger.info("nonexistent job");
+								throw new UnknownHostException();
+							} else {
+								PrintWriter out = response.getWriter();
+								response.setContentType("text/plain");
+								out.write(myje.getCurrentState());
+								if (!myje.isAlive()) {
+									jobPool.remove(nJID); // Cleanup of the
+									// Job
+									// Pool
+									Exception DC = myje.getDeathCause();
+									if (DC != null)
+										throw new Exception(DC);
+								}
 							}
 						}
 					} else {
@@ -336,7 +341,9 @@
 							client.loadSerializedEndpointreference(GHandle);
 							Integer nJID = Integer.valueOf(JID);
 							client.killProcess(nJID.intValue());
-							jobPool.remove(nJID);
+							synchronized (jobPool) {
+								jobPool.remove(nJID);
+							}
 						} else {
 							String destroyProcess = request
 									.getParameter("destroyProcess");
@@ -352,12 +359,14 @@
 								}
 								logger.info("Destroy process");
 								for (Integer jid : handlePool.get(ghandle)) {
-									if (jobPool.containsKey(jid)) {
-										ClientGLCProcess.JobExecutor currentJE = jobPool
-												.get(jid);
-										jobPool.remove(jid);
-										while (currentJE.isAlive()) {
-											Thread.sleep(50);
+									// Detach the executor under the lock, then wait outside it so other jobPool users are not stalled.
+									ClientGLCProcess.JobExecutor currentJE;
+									synchronized (jobPool) {
+										currentJE = jobPool.remove(jid);
+									}
+									if (currentJE != null) {
+										while (currentJE.isAlive()) {
+											Thread.sleep(50);
 										}
 									}
 								}
@@ -416,18 +425,21 @@
 										}
 										logger.info("Executor site query");
 										Integer nJID = Integer.valueOf(JID);
-										ClientGLCProcess.JobExecutor myje = (ClientGLCProcess.JobExecutor) jobPool
-												.get(nJID);
-										if (myje == null) {
-											logger.info("nonexistent job");
-											throw new UnknownHostException();
-										} else {
-											PrintWriter out = response
-													.getWriter();
-											response
-													.setContentType("text/plain");
-											out
-													.write(myje.getJobData().executorSite);
+										synchronized (jobPool) {
+											ClientGLCProcess.JobExecutor myje = (ClientGLCProcess.JobExecutor) jobPool
+													.get(nJID);
+											if (myje == null) {
+												logger.info("nonexistent job");
+												throw new UnknownHostException();
+											} else {
+												PrintWriter out = response
+														.getWriter();
+												response
+														.setContentType("text/plain");
+												out
+														.write(myje
+																.getJobData().executorSite);
+											}
 										}
 									} else {
 										logger.info("Unrecognised command");




More information about the gemlca-commit mailing list