@@ -896,6 +896,124 @@ def on_failure_callable(context):
896
896
updated_dag_state = dag_run .update_state ()
897
897
self .assertEqual (State .FAILED , updated_dag_state )
898
898
899
+ def test_dagrun_set_state_end_date (self ):
900
+ session = settings .Session ()
901
+
902
+ dag = DAG (
903
+ 'test_dagrun_set_state_end_date' ,
904
+ start_date = DEFAULT_DATE ,
905
+ default_args = {'owner' : 'owner1' })
906
+
907
+ dag .clear ()
908
+
909
+ now = timezone .utcnow ()
910
+ dr = dag .create_dagrun (run_id = 'test_dagrun_set_state_end_date' ,
911
+ state = State .RUNNING ,
912
+ execution_date = now ,
913
+ start_date = now )
914
+
915
+ # Initial end_date should be NULL
916
+ # State.SUCCESS and State.FAILED are all ending state and should set end_date
917
+ # State.RUNNING set end_date back to NULL
918
+ session .add (dr )
919
+ session .commit ()
920
+ self .assertIsNone (dr .end_date )
921
+
922
+ dr .set_state (State .SUCCESS )
923
+ session .merge (dr )
924
+ session .commit ()
925
+
926
+ dr_database = session .query (DagRun ).filter (
927
+ DagRun .run_id == 'test_dagrun_set_state_end_date'
928
+ ).one ()
929
+ self .assertIsNotNone (dr_database .end_date )
930
+ self .assertEqual (dr .end_date , dr_database .end_date )
931
+
932
+ dr .set_state (State .RUNNING )
933
+ session .merge (dr )
934
+ session .commit ()
935
+
936
+ dr_database = session .query (DagRun ).filter (
937
+ DagRun .run_id == 'test_dagrun_set_state_end_date'
938
+ ).one ()
939
+
940
+ self .assertIsNone (dr_database .end_date )
941
+
942
+ dr .set_state (State .FAILED )
943
+ session .merge (dr )
944
+ session .commit ()
945
+ dr_database = session .query (DagRun ).filter (
946
+ DagRun .run_id == 'test_dagrun_set_state_end_date'
947
+ ).one ()
948
+
949
+ self .assertIsNotNone (dr_database .end_date )
950
+ self .assertEqual (dr .end_date , dr_database .end_date )
951
+
952
+ def test_dagrun_update_state_end_date (self ):
953
+ session = settings .Session ()
954
+
955
+ dag = DAG (
956
+ 'test_dagrun_update_state_end_date' ,
957
+ start_date = DEFAULT_DATE ,
958
+ default_args = {'owner' : 'owner1' })
959
+
960
+ # A -> B
961
+ with dag :
962
+ op1 = DummyOperator (task_id = 'A' )
963
+ op2 = DummyOperator (task_id = 'B' )
964
+ op1 .set_upstream (op2 )
965
+
966
+ dag .clear ()
967
+
968
+ now = timezone .utcnow ()
969
+ dr = dag .create_dagrun (run_id = 'test_dagrun_update_state_end_date' ,
970
+ state = State .RUNNING ,
971
+ execution_date = now ,
972
+ start_date = now )
973
+
974
+ # Initial end_date should be NULL
975
+ # State.SUCCESS and State.FAILED are all ending state and should set end_date
976
+ # State.RUNNING set end_date back to NULL
977
+ session .merge (dr )
978
+ session .commit ()
979
+ self .assertIsNone (dr .end_date )
980
+
981
+ ti_op1 = dr .get_task_instance (task_id = op1 .task_id )
982
+ ti_op1 .set_state (state = State .SUCCESS , session = session )
983
+ ti_op2 = dr .get_task_instance (task_id = op2 .task_id )
984
+ ti_op2 .set_state (state = State .SUCCESS , session = session )
985
+
986
+ dr .update_state ()
987
+
988
+ dr_database = session .query (DagRun ).filter (
989
+ DagRun .run_id == 'test_dagrun_update_state_end_date'
990
+ ).one ()
991
+ self .assertIsNotNone (dr_database .end_date )
992
+ self .assertEqual (dr .end_date , dr_database .end_date )
993
+
994
+ ti_op1 .set_state (state = State .RUNNING , session = session )
995
+ ti_op2 .set_state (state = State .RUNNING , session = session )
996
+ dr .update_state ()
997
+
998
+ dr_database = session .query (DagRun ).filter (
999
+ DagRun .run_id == 'test_dagrun_update_state_end_date'
1000
+ ).one ()
1001
+
1002
+ self .assertEqual (dr ._state , State .RUNNING )
1003
+ self .assertIsNone (dr .end_date )
1004
+ self .assertIsNone (dr_database .end_date )
1005
+
1006
+ ti_op1 .set_state (state = State .FAILED , session = session )
1007
+ ti_op2 .set_state (state = State .FAILED , session = session )
1008
+ dr .update_state ()
1009
+
1010
+ dr_database = session .query (DagRun ).filter (
1011
+ DagRun .run_id == 'test_dagrun_update_state_end_date'
1012
+ ).one ()
1013
+
1014
+ self .assertIsNotNone (dr_database .end_date )
1015
+ self .assertEqual (dr .end_date , dr_database .end_date )
1016
+
899
1017
def test_get_task_instance_on_empty_dagrun (self ):
900
1018
"""
901
1019
Make sure that a proper value is returned when a dagrun has no task instances
0 commit comments