@dataclass
class BeakerCallback(Callback):
    """
    Adds metadata to the Beaker experiment description when running as a Beaker batch job.

    On rank 0, before training starts, the callback looks up the URL of the Beaker workload,
    records it with any enabled W&B / Comet callbacks, and writes ``config.json`` (if a config
    was given) and ``requirements.txt`` under ``<result_dir>/olmo-core``. During and after
    training it rewrites the workload description with the current training progress, the
    optional :data:`description`, and the W&B / Comet URL when one was found.
    """

    # One below the lower of the two tracker priorities. NOTE(review): presumably this makes the
    # tracker callbacks' hooks run first so their run objects exist when ``pre_train`` reads
    # them -- confirm against the trainer's callback ordering.
    priority: ClassVar[int] = min(CometCallback.priority - 1, WandBCallback.priority - 1)

    experiment_id: Optional[str] = None
    """
    The Beaker experiment/workload ID. When ``None`` it is resolved with
    ``get_beaker_experiment_id()`` at the start of training.
    """
    update_interval: Optional[int] = None
    """
    How often, in steps, to refresh the description. Falls back to the trainer's
    ``metrics_collect_interval`` when unset. Updates are additionally spaced at least
    10 seconds apart.
    """
    description: Optional[str] = None
    """
    Optional text placed after the training progress in the Beaker description.
    """
    enabled: Optional[bool] = None
    """
    Whether the callback is active. When ``None`` this is decided in :meth:`post_attach` by
    ``is_running_in_beaker_batch_job()``.
    """
    config: Optional[Dict[str, Any]] = None
    """
    A JSON-serializable config to save to the results dataset as ``config.json``.
    """
    result_dir: str = BEAKER_RESULT_DIR
    """
    The directory of the Beaker results dataset where the config and other data will be saved.
    """

    # URL of the W&B / Comet run, captured once in ``pre_train``.
    _url: str | None = dataclasses.field(repr=False, default=None)
    # ``time.monotonic()`` timestamp of the last scheduled description update.
    _last_update: float | None = dataclasses.field(repr=False, default=None)

    def post_attach(self) -> None:
        """
        Auto-detect whether to enable the callback when the user did not say either way.
        """
        if self.enabled is None:
            from olmo_core.launch.beaker import is_running_in_beaker_batch_job

            self.enabled = is_running_in_beaker_batch_job()

    def pre_train(self) -> None:
        """
        Resolve the Beaker workload, share its URL with experiment trackers, save run
        artifacts to the results directory, and push the first description update.

        Does nothing unless the callback is enabled and this is rank 0.
        """
        if not self.enabled or get_rank() != 0:
            return

        from olmo_core.launch.beaker import get_beaker_client

        if self.experiment_id is None:
            from olmo_core.launch.beaker import get_beaker_experiment_id

            self.experiment_id = get_beaker_experiment_id()
        assert self.experiment_id is not None

        with get_beaker_client() as beaker:
            workload = beaker.workload.get(self.experiment_id)
            beaker_url = beaker.workload.url(workload)
        log.info(f"Running in Beaker workload {beaker_url}")

        self._annotate_trackers(beaker_url)
        self._save_run_artifacts()

        # Try to get W&B/Comet URL of experiment.
        if (url := self._find_tracker_url()) is not None:
            self._url = url

        self._update()

    def post_step(self) -> None:
        """
        Refresh the description every ``update_interval`` steps, at most once per 10 seconds.
        """
        update_interval = self.update_interval or self.trainer.metrics_collect_interval
        if self.enabled and get_rank() == 0 and self.step % update_interval == 0:
            # Make sure we don't update too frequently.
            if self._last_update is None or (time.monotonic() - self._last_update) > 10:
                self._update()

    def post_train(self) -> None:
        """
        Push a final description update once training has finished.
        """
        if self.enabled and get_rank() == 0:
            self._update()

    def _annotate_trackers(self, beaker_url) -> None:
        """
        Add the Beaker URL and experiment ID to every enabled W&B / Comet callback that
        already has a run/experiment object.
        """
        for callback in self.trainer.callbacks.values():
            if isinstance(callback, WandBCallback):
                if callback.enabled and callback.run is not None:
                    callback.run.config.update(
                        {
                            "beaker_experiment_url": beaker_url,
                            "beaker_experiment_id": self.experiment_id,
                        }
                    )
                    log.info(f"Added beaker_experiment_url to W&B config: {beaker_url}")
                    log.info(f"Added beaker_experiment_id to W&B config: {self.experiment_id}")
            elif isinstance(callback, CometCallback):
                if callback.enabled and callback.exp is not None:
                    callback.exp.log_parameter("beaker_experiment_url", beaker_url)
                    callback.exp.log_parameter("beaker_experiment_id", self.experiment_id)
                    log.info(f"Added beaker_experiment_url to Comet: {beaker_url}")
                    log.info(f"Added beaker_experiment_id to Comet: {self.experiment_id}")

    def _save_run_artifacts(self) -> None:
        """
        Write ``config.json`` (when a config was provided) and a best-effort
        ``requirements.txt`` into ``<result_dir>/olmo-core``.
        """
        # Ensure result dataset directory exists.
        result_dir = Path(self.result_dir) / "olmo-core"
        result_dir.mkdir(parents=True, exist_ok=True)

        # Save config to result dir.
        if self.config is not None:
            config_path = result_dir / "config.json"
            with config_path.open("w") as config_file:
                log.info(f"Saving config to '{config_path}'")
                json.dump(self.config, config_file)

        # Try saving Python requirements. Any failure here (pip missing, timeout, I/O error)
        # is logged and otherwise ignored so it cannot block training.
        requirements_path = result_dir / "requirements.txt"
        try:
            # The header is written and the file closed first, so it is on disk before the
            # subprocess appends its own output through a separate handle.
            with requirements_path.open("w") as requirements_file:
                requirements_file.write(f"# python={platform.python_version()}\n")
            with requirements_path.open("a") as requirements_file:
                subprocess.call(
                    ["pip", "freeze"],
                    stdout=requirements_file,
                    stderr=subprocess.DEVNULL,
                    timeout=10,
                )
        except Exception as e:
            log.exception(f"Error saving Python packages: {e}")

    def _find_tracker_url(self) -> Optional[str]:
        """
        Return the run URL of the first enabled W&B / Comet callback, or ``None`` when there
        is no such callback, its run/experiment object is not available, or it has no URL.
        """
        for callback in self.trainer.callbacks.values():
            if isinstance(callback, WandBCallback) and callback.enabled:
                run = callback.run
                # Guard against a missing run, like ``_annotate_trackers`` does, instead of
                # failing with ``AttributeError`` on ``None``.
                return run.get_url() if run is not None else None
            if isinstance(callback, CometCallback) and callback.enabled:
                exp = callback.exp
                return exp.url if exp is not None else None
        return None

    def _update(self) -> None:
        """
        Schedule a description update as a trainer bookkeeping op and record when it was
        scheduled.
        """
        self.trainer.run_bookkeeping_op(
            self._set_description,
            op_name="beaker_set_description",
            allow_multiple=False,
            distributed=False,
        )
        self._last_update = time.monotonic()

    def _set_description(self) -> None:
        """
        Build the description text and send it to Beaker.

        The text is ``[<training progress>] <description>\\n<tracker url>``, with the optional
        parts omitted when unset. Beaker client errors are logged as warnings, not raised.
        """
        from beaker.exceptions import BeakerError, HTTPError, RequestException, RpcError
        from gantry.api import update_workload_description

        from olmo_core.launch.beaker import get_beaker_client

        description = f"[{self.trainer.training_progress}] "
        if self.description is not None:
            description = f"{description}{self.description}\n"
        if self._url is not None:
            description = f"{description}{self._url} "

        try:
            with get_beaker_client() as beaker:
                update_workload_description(description.strip(), client=beaker)
        except (RequestException, BeakerError, HTTPError, RpcError) as e:
            log.warning(f"Failed to update Beaker experiment description: {e}")