I’m new to Dagster and am trying to create a success hook that sends alerts through a Telegram bot. I need help, please.
Resource:
# Dagster resource intended to expose a Telegram sender; the job registers it
# under the "telegram" resource key.
# NOTE(review): Dagster invokes a @resource function with an init context, not
# with a message, so the parameter name `message` looks misleading - confirm.
@resource
def send_message(message):
    class TelegramConnection:
        # NOTE: this method is named `telegram_resource`, but the hook calls
        # `context.resources.telegram.send_message(...)`; the object returned
        # below has no `send_message` attribute.
        # NOTE: there is no `self` parameter, so when the method is called on
        # an instance, the instance itself is bound to `message`.
        def telegram_resource(message):
            # Bot token and target chat ID come from environment variables;
            # a missing variable raises KeyError.
            botid = os.environ['telegram_bot']
            chat = os.environ['telegram_chat']
            bot = telegram.Bot(token=botid)
            # The text is sent with HTML parse mode; the send call's result is returned.
            return bot.sendMessage(chat_id=chat, text=message, parse_mode='HTML')

    # The resource value handed to ops and hooks is a TelegramConnection instance.
    return TelegramConnection()
Hook:
def _default_status_message(context: HookContext, status: str) -> str:
    """Build the default alert text for an op that finished with ``status``.

    Args:
        context: Hook context of the op that triggered the hook.
        status: Human-readable outcome, e.g. ``"succeeded"``.

    Returns:
        Two lines of text: an op/job/status summary, then the run ID.
    """
    return "Op {op_name} on job {pipeline_name} {status}!\nRun ID: {run_id}".format(
        op_name=context.op.name,
        pipeline_name=context.pipeline_name,
        run_id=context.run_id,
        status=status,
    )


def _default_success_message(context: HookContext) -> str:
    """Return the default alert text for a successfully finished op."""
    return _default_status_message(context, status="succeeded")


def telegram_on_success(
    message_fn: Callable[[HookContext], str] = _default_success_message,
    dagit_base_url: Optional[str] = "http://localhost:3000",
):
    """Create a success hook that sends an alert through the ``telegram`` resource.

    This is a factory: it must be *called* and its return value passed to
    ``hooks=``, e.g. ``hooks={telegram_on_success()}``.

    Args:
        message_fn: Callable that receives the hook context and returns the
            message text. Defaults to ``_default_success_message``.
        dagit_base_url: Base URL of Dagit used to append a link to the run.
            Pass ``None`` or ``""`` to omit the link.

    Returns:
        The hook definition produced by ``@success_hook``.
    """

    @success_hook(required_resource_keys={"telegram"})
    def _hook(context: HookContext):
        text = message_fn(context)
        if dagit_base_url:
            # The resource sends with parse_mode='HTML', so the link has to be
            # an HTML anchor; Slack's "<url|label>" form is not valid HTML.
            text += '\n<a href="{base_url}/instance/runs/{run_id}">View in Dagit</a>'.format(
                base_url=dagit_base_url, run_id=context.run_id
            )
        # Passed positionally: the resource method names its parameter
        # `message`, so a `text=` keyword would not match.
        context.resources.telegram.send_message(text)  # type: ignore

    return _hook
Job:
# Job that registers the Telegram resource under the "telegram" key, attaches
# a hook set and a retry policy, and chains the extract/transform/load ops.
# NOTE: `hooks={success_hook}` passes the `success_hook` decorator function
# itself rather than a hook definition. This is what the CheckError quoted
# after this snippet reports ("Got <function success_hook ...>").
@job (resource_defs={"telegram": send_message}, hooks={success_hook}, op_retry_policy = default_policy)
def job_text_for_pictures():
    # Independent extraction steps.
    bq = extract_ID_with_photo()
    numbers, numbers_withoun_null = extract_id_for_items()
    df_case, df_prod = extract_data_from_sf()
    # Combine the extracted frames and ID lists into a subset and a query.
    subset, query = transform_two_df(df_case, df_prod, numbers, numbers_withoun_null)
    final = final_sql_result(query, subset)
    # Merge the final result with the first extraction's output and load it.
    merged_df = result_merge(final, bq)
    load_df(merged_df)
Error:
dagster._check.CheckError: Member of set mismatches type. Expected <class ‘dagster._core.definitions.hook_definition.HookDefinition’>. Got <function success_hook at 0x00000284AC2BB250> of type <class ‘function’>.
UPDATE:
# Dagster resource that returns an object exposing `send_message`; the job
# registers it under the "telegram" resource key.
# NOTE(review): Dagster invokes a @resource function with an init context, not
# with a message, so the parameter name `message` looks misleading - confirm.
@resource
def telegram_resource(message):
    class TelegramConnection:
        # NOTE: `send_message` is declared without `self`. Calling it on an
        # instance as `send_message(text)` passes the instance plus `text`,
        # i.e. two positional arguments, which is the reported
        # "takes 1 positional argument but 2 were given" TypeError.
        # Declaring it as `def send_message(self, message)` would resolve that.
        def send_message(message):
            # Bot token and target chat ID come from environment variables;
            # a missing variable raises KeyError.
            botid = os.environ['telegram_bot']
            chat = os.environ['telegram_chat']
            bot = telegram.Bot(token=botid)
            # The text is sent with HTML parse mode; the send call's result is returned.
            return bot.sendMessage(chat_id=chat, text=message, parse_mode='HTML')

    # The resource value handed to ops and hooks is a TelegramConnection instance.
    return TelegramConnection()
Job:
@job(
    resource_defs={"telegram": telegram_resource},
    # telegram_on_success is a factory; it has to be called so that the set
    # contains the hook definition it returns rather than a plain function.
    hooks={telegram_on_success()},
    op_retry_policy=default_policy,
)
def job_text_for_pictures():
    """Extract, transform, merge and load the data; alert via Telegram on op success."""
    # Independent extraction steps.
    bq = extract_ID_with_photo()
    numbers, numbers_withoun_null = extract_id_for_items()
    df_case, df_prod = extract_data_from_sf()
    # Combine the extracted frames and ID lists into a subset and a query.
    subset, query = transform_two_df(df_case, df_prod, numbers, numbers_withoun_null)
    final = final_sql_result(query, subset)
    # Merge the final result with the first extraction's output and load it.
    merged_df = result_merge(final, bq)
    load_df(merged_df)
Hook:
def _default_status_message(context: HookContext, status: str) -> str:
    """Build the default alert text for an op that finished with ``status``.

    Args:
        context: Hook context of the op that triggered the hook.
        status: Human-readable outcome, e.g. ``"succeeded"``.

    Returns:
        Two lines of text: an op/job/status summary, then the run ID.
    """
    return "Op {op_name} on job {pipeline_name} {status}!\nRun ID: {run_id}".format(
        op_name=context.op.name,
        pipeline_name=context.pipeline_name,
        run_id=context.run_id,
        status=status,
    )


def _default_success_message(context: HookContext) -> str:
    """Return the default alert text for a successfully finished op."""
    return _default_status_message(context, status="succeeded")


def telegram_on_success(
    message_fn: Callable[[HookContext], str] = _default_success_message,
    dagit_base_url: Optional[str] = "http://localhost:3000",
):
    """Create a success hook that sends an alert through the ``telegram`` resource.

    This is a factory: it must be *called* and its return value passed to
    ``hooks=``, e.g. ``hooks={telegram_on_success()}``.

    Args:
        message_fn: Callable that receives the hook context and returns the
            message text. Defaults to ``_default_success_message``.
        dagit_base_url: Base URL of Dagit used to append a link to the run.
            Pass ``None`` or ``""`` to omit the link.

    Returns:
        The hook definition produced by ``@success_hook``.
    """

    @success_hook(required_resource_keys={"telegram"})
    def _hook(context: HookContext):
        text = message_fn(context)
        if dagit_base_url:
            # The resource sends with parse_mode='HTML', so the link has to be
            # an HTML anchor; Slack's "<url|label>" form is not valid HTML.
            text += '\n<a href="{base_url}/instance/runs/{run_id}">View in Dagit</a>'.format(
                base_url=dagit_base_url, run_id=context.run_id
            )
        # The message text is handed to the resource as a single positional argument.
        context.resources.telegram.send_message(text)  # type: ignore

    return _hook
New Error: TypeError: telegram_resource.<locals>.TelegramConnection.send_message() takes 1 positional argument but 2 were given
Stack Trace: File "C:\Users\AlBelyaev\AppData\Local\Programs\Python\Python310\lib\site-packages\dagster\_core\errors.py", line 188, in user_code_error_boundary yield , File "C:\Users\AlBelyaev\AppData\Local\Programs\Python\Python310\lib\site-packages\dagster\_core\execution\plan\execute_plan.py", line 162, in _trigger_hook hook_execution_result = hook_def.hook_fn(hook_context, step_event_list) , File "C:\Users\AlBelyaev\AppData\Local\Programs\Python\Python310\lib\site-packages\dagster\_core\definitions\decorators\hook_decorator.py", line 198, in _success_hook fn(context) , File "C:\DE\dagster\my-dagster-project\my_dagster_project\hooks\text_for_pictures.py", line 50, in _hook context.resources.telegram.send_message(text) # type: ignore
Advertisement
Answer
I think you need to pass the result of calling your telegram_on_success
function to your @job
definition, something like this:
def telegram_on_success(
    message_fn: Callable[[HookContext], str] = _default_success_message,
    dagit_base_url: Optional[str] = "http://localhost:3000",
):
    """Create a success hook that sends an alert through the ``telegram`` resource.

    This is a factory: it must be *called* and its return value passed to
    ``hooks=`` on the job.

    Args:
        message_fn: Callable that receives the hook context and returns the
            message text. Defaults to ``_default_success_message``.
        dagit_base_url: Base URL of Dagit used to append a link to the run.
            Pass ``None`` or ``""`` to omit the link.

    Returns:
        The hook definition produced by ``@success_hook``.
    """

    @success_hook(required_resource_keys={"telegram"})
    def _hook(context: HookContext):
        text = message_fn(context)
        if dagit_base_url:
            # The resource sends with parse_mode='HTML', so the link has to be
            # an HTML anchor; Slack's "<url|label>" form is not valid HTML.
            text += '\n<a href="{base_url}/instance/runs/{run_id}">View in Dagit</a>'.format(
                base_url=dagit_base_url, run_id=context.run_id
            )
        # Passed positionally: the resource method names its parameter
        # `message`, so a `text=` keyword would not match.
        context.resources.telegram.send_message(text)  # type: ignore

    return _hook


@job(
    # The updated resource from the question, whose connection object exposes
    # `send_message`.
    resource_defs={"telegram": telegram_resource},
    # Call the factory so the set holds a hook definition. `message_fn` must be
    # a callable taking the hook context, not a string; to customise the text:
    #   telegram_on_success(message_fn=lambda context: "custom text")
    hooks={telegram_on_success()},
    op_retry_policy=default_policy,
)
def job_text_for_pictures():
    """Extract, transform, merge and load the data; alert via Telegram on op success."""
    bq = extract_ID_with_photo()
    numbers, numbers_withoun_null = extract_id_for_items()
    df_case, df_prod = extract_data_from_sf()
    subset, query = transform_two_df(df_case, df_prod, numbers, numbers_withoun_null)
    final = final_sql_result(query, subset)
    merged_df = result_merge(final, bq)
    load_df(merged_df)
Right now by passing success_hook
to the hooks=
argument, you’re just passing the @success_hook
decorator itself, not the hook definition that calling telegram_on_success() returns.