diff --git a/airflow/contrib/hooks/sagemaker_hook.py b/airflow/contrib/hooks/sagemaker_hook.py
index 40bebf89799b4..bc096ff55fccd 100644
--- a/airflow/contrib/hooks/sagemaker_hook.py
+++ b/airflow/contrib/hooks/sagemaker_hook.py
@@ -31,6 +31,8 @@ class SageMakerHook(AwsHook):
     sagemaker_conn_id is required for using
     the config stored in db for training/tuning
     """
+    non_terminal_states = {'InProgress', 'Stopping', 'Stopped'}
+    failed_states = {'Failed'}
 
     def __init__(self,
                  sagemaker_conn_id=None,
@@ -96,9 +98,9 @@ def check_status(self, non_terminal_states,
                      describe_function, *args):
         """
         :param non_terminal_states: the set of non_terminal states
-        :type non_terminal_states: dict
+        :type non_terminal_states: set
         :param failed_state: the set of failed states
-        :type failed_state: dict
+        :type failed_state: set
         :param key: the key of the response dict
         that points to the state
         :type key: str
@@ -177,7 +179,7 @@ def create_training_job(self, training_job_config, wait_for_completion=True):
         :param training_job_config: the config for training
         :type training_job_config: dict
         :param wait_for_completion: if the program should keep running until job finishes
-        :param wait_for_completion: bool
+        :type wait_for_completion: bool
         :return: A dict that contains ARN of the training job.
         """
         if self.use_db_config:
@@ -194,8 +196,8 @@ def create_training_job(self, training_job_config, wait_for_completion=True):
         response = self.conn.create_training_job(
             **training_job_config)
         if wait_for_completion:
-            self.check_status(['InProgress', 'Stopping', 'Stopped'],
-                              ['Failed'],
+            self.check_status(SageMakerHook.non_terminal_states,
+                              SageMakerHook.failed_states,
                               'TrainingJobStatus',
                               self.describe_training_job,
                               training_job_config['TrainingJobName'])
@@ -213,8 +215,8 @@ def create_tuning_job(self, tuning_job_config, wait_for_completion=True):
         if self.use_db_config:
             if not self.sagemaker_conn_id:
                 raise AirflowException(
-                    "sagemaker connection id must be present to \
-                    read sagemaker tunning job configuration.")
+                    "SageMaker connection id must be present to \
+                    read SageMaker tunning job configuration.")
 
             sagemaker_conn = self.get_connection(self.sagemaker_conn_id)
 
@@ -226,13 +228,59 @@ def create_tuning_job(self, tuning_job_config, wait_for_completion=True):
         response = self.conn.create_hyper_parameter_tuning_job(
             **tuning_job_config)
         if wait_for_completion:
-            self.check_status(['InProgress', 'Stopping', 'Stopped'],
-                              ['Failed'],
+            self.check_status(SageMakerHook.non_terminal_states,
+                              SageMakerHook.failed_states,
                               'HyperParameterTuningJobStatus',
                               self.describe_tuning_job,
                               tuning_job_config['HyperParameterTuningJobName'])
         return response
 
+    def create_transform_job(self, transform_job_config, wait_for_completion=True):
+        """
+        Create a transform job
+        :param transform_job_config: the config for transform job
+        :type transform_job_config: dict
+        :param wait_for_completion:
+        if the program should keep running until job finishes
+        :type wait_for_completion: bool
+        :return: A dict that contains ARN of the transform job.
+        """
+        if self.use_db_config:
+            if not self.sagemaker_conn_id:
+                raise AirflowException(
+                    "SageMaker connection id must be present to \
+                    read SageMaker transform job configuration.")
+
+            sagemaker_conn = self.get_connection(self.sagemaker_conn_id)
+
+            config = sagemaker_conn.extra_dejson.copy()
+            transform_job_config.update(config)
+
+        self.check_for_url(transform_job_config
+                           ['TransformInput']['DataSource']
+                           ['S3DataSource']['S3Uri'])
+
+        response = self.conn.create_transform_job(
+            **transform_job_config)
+        if wait_for_completion:
+            self.check_status(SageMakerHook.non_terminal_states,
+                              SageMakerHook.failed_states,
+                              'TransformJobStatus',
+                              self.describe_transform_job,
+                              transform_job_config['TransformJobName'])
+        return response
+
+    def create_model(self, model_config):
+        """
+        Create a model job
+        :param model_config: the config for model
+        :type model_config: dict
+        :return: A dict that contains ARN of the model.
+        """
+
+        return self.conn.create_model(
+            **model_config)
+
     def describe_training_job(self, training_job_name):
         """
         :param training_job_name: the name of the training job
@@ -245,11 +293,22 @@ def describe_training_job(self, training_job_name):
 
     def describe_tuning_job(self, tuning_job_name):
         """
-        :param tuning_job_name: the name of the training job
-        :type tuning_job_name: str
+        :param tuning_job_name: the name of the tuning job
+        :type tuning_job_name: string
         Return the tuning job info associated with the current job_name
         :return: A dict contains all the tuning job info
         """
         return self.conn\
             .describe_hyper_parameter_tuning_job(
                 HyperParameterTuningJobName=tuning_job_name)
+
+    def describe_transform_job(self, transform_job_name):
+        """
+        :param transform_job_name: the name of the transform job
+        :type transform_job_name: string
+        Return the transform job info associated with the current job_name
+        :return: A dict contains all the transform job info
+        """
+        return self.conn\
+            .describe_transform_job(
+                TransformJobName=transform_job_name)
diff --git a/airflow/contrib/operators/sagemaker_create_training_job_operator.py b/airflow/contrib/operators/sagemaker_create_training_job_operator.py
index 5b600c707de50..279220867956d 100644
--- a/airflow/contrib/operators/sagemaker_create_training_job_operator.py
+++ b/airflow/contrib/operators/sagemaker_create_training_job_operator.py
@@ -50,7 +50,7 @@ class SageMakerCreateTrainingJobOperator(BaseOperator):
        until training job finishes
        :type wait_for_completion: bool
        :param check_interval: if wait is set to be true, this is the time interval
-       which the operator will check the status of the training job
+       in seconds which the operator will check the status of the training job
        :type check_interval: int
        :param max_ingestion_time: if wait is set to be true, the operator will fail
        if the training job hasn't finish within the max_ingestion_time
diff --git a/airflow/contrib/operators/sagemaker_create_transform_job_operator.py b/airflow/contrib/operators/sagemaker_create_transform_job_operator.py
new file mode 100644
index 0000000000000..22c8c2b4ba297
--- /dev/null
+++ b/airflow/contrib/operators/sagemaker_create_transform_job_operator.py
@@ -0,0 +1,132 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.contrib.hooks.sagemaker_hook import SageMakerHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.exceptions import AirflowException
+
+
+class SageMakerCreateTransformJobOperator(BaseOperator):
+    """
+       Initiate a SageMaker transform
+
+       This operator returns The ARN of the model created in Amazon SageMaker
+
+       :param sagemaker_conn_id: The SageMaker connection ID to use.
+       :type sagemaker_conn_id: string
+       :param transform_job_config:
+       The configuration necessary to start a transform job (templated)
+       :type transform_job_config: dict
+       :param model_config:
+       The configuration necessary to create a model, the default is none
+       which means that user should provide a created model in transform_job_config
+       If given, will be used to create a model before creating transform job
+       :type model_config: dict
+       :param use_db_config: Whether or not to use db config
+       associated with sagemaker_conn_id.
+       If set to true, will automatically update the transform config
+       with what's in db, so the db config doesn't need to
+       included everything, but what's there does replace the ones
+       in the transform_job_config, so be careful
+       :type use_db_config: bool
+       :param region_name: The AWS region_name
+       :type region_name: string
+       :param wait_for_completion: if the program should keep running until job finishes
+       :type wait_for_completion: bool
+       :param check_interval: if wait is set to be true, this is the time interval
+       in seconds which the operator will check the status of the transform job
+       :type check_interval: int
+       :param max_ingestion_time: if wait is set to be true, the operator will fail
+       if the transform job hasn't finish within the max_ingestion_time
+       (Caution: be careful to set this parameters because transform can take very long)
+       :type max_ingestion_time: int
+       :param aws_conn_id: The AWS connection ID to use.
+       :type aws_conn_id: string
+
+       **Example**:
+           The following operator would start a transform job when executed
+
+            sagemaker_transform =
+               SageMakerCreateTransformJobOperator(
+                   task_id='sagemaker_transform',
+                   transform_job_config=config_transform,
+                   model_config=config_model,
+                   region_name='us-west-2'
+                   sagemaker_conn_id='sagemaker_customers_conn',
+                   use_db_config=True,
+                   aws_conn_id='aws_customers_conn'
+               )
+       """
+
+    template_fields = ['transform_job_config']
+    template_ext = ()
+    ui_color = '#ededed'
+
+    @apply_defaults
+    def __init__(self,
+                 sagemaker_conn_id=None,
+                 transform_job_config=None,
+                 model_config=None,
+                 use_db_config=False,
+                 region_name=None,
+                 wait_for_completion=True,
+                 check_interval=2,
+                 max_ingestion_time=None,
+                 *args, **kwargs):
+        super(SageMakerCreateTransformJobOperator, self).__init__(*args, **kwargs)
+
+        self.sagemaker_conn_id = sagemaker_conn_id
+        self.transform_job_config = transform_job_config
+        self.model_config = model_config
+        self.use_db_config = use_db_config
+        self.region_name = region_name
+        self.wait_for_completion = wait_for_completion
+        self.check_interval = check_interval
+        self.max_ingestion_time = max_ingestion_time
+
+    def execute(self, context):
+        sagemaker = SageMakerHook(
+            sagemaker_conn_id=self.sagemaker_conn_id,
+            use_db_config=self.use_db_config,
+            region_name=self.region_name,
+            check_interval=self.check_interval,
+            max_ingestion_time=self.max_ingestion_time
+        )
+
+        if self.model_config:
+            self.log.info(
+                "Creating SageMaker Model %s for transform job"
+                % self.model_config['ModelName']
+            )
+            sagemaker.create_model(self.model_config)
+
+        self.log.info(
+            "Creating SageMaker transform Job %s."
+            % self.transform_job_config['TransformJobName']
+        )
+        response = sagemaker.create_transform_job(
+            self.transform_job_config,
+            wait_for_completion=self.wait_for_completion)
+        if not response['ResponseMetadata']['HTTPStatusCode'] \
+           == 200:
+            raise AirflowException(
+                'Sagemaker transform Job creation failed: %s' % response)
+        else:
+            return response
diff --git a/airflow/contrib/operators/sagemaker_create_tuning_job_operator.py b/airflow/contrib/operators/sagemaker_create_tuning_job_operator.py
index 2dc3a88e972f3..d5f4396375993 100644
--- a/airflow/contrib/operators/sagemaker_create_tuning_job_operator.py
+++ b/airflow/contrib/operators/sagemaker_create_tuning_job_operator.py
@@ -48,7 +48,7 @@ class SageMakerCreateTuningJobOperator(BaseOperator):
        until tuning job finishes
        :type wait_for_completion: bool
        :param check_interval: if wait is set to be true, this is the time interval
-       which the operator will check the status of the tuning job
+       in seconds which the operator will check the status of the tuning job
        :type check_interval: int
        :param max_ingestion_time: if wait is set to be true, the operator will fail
        if the tuning job hasn't finish within the max_ingestion_time
diff --git a/airflow/contrib/sensors/sagemaker_training_sensor.py b/airflow/contrib/sensors/sagemaker_training_sensor.py
index 963ab7d29ff79..5e83e846efec5 100644
--- a/airflow/contrib/sensors/sagemaker_training_sensor.py
+++ b/airflow/contrib/sensors/sagemaker_training_sensor.py
@@ -45,10 +45,10 @@ def __init__(self,
         self.region_name = region_name
 
     def non_terminal_states(self):
-        return ['InProgress', 'Stopping', 'Stopped']
+        return SageMakerHook.non_terminal_states
 
     def failed_states(self):
-        return ['Failed']
+        return SageMakerHook.failed_states
 
     def get_sagemaker_response(self):
         sagemaker = SageMakerHook(
diff --git a/airflow/contrib/sensors/sagemaker_transform_sensor.py b/airflow/contrib/sensors/sagemaker_transform_sensor.py
new file mode 100644
index 0000000000000..68ef1d8dd7b05
--- /dev/null
+++ b/airflow/contrib/sensors/sagemaker_transform_sensor.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.contrib.hooks.sagemaker_hook import SageMakerHook
+from airflow.contrib.sensors.sagemaker_base_sensor import SageMakerBaseSensor
+from airflow.utils.decorators import apply_defaults
+
+
+class SageMakerTransformSensor(SageMakerBaseSensor):
+    """
+    Asks for the state of the transform state until it reaches a terminal state.
+    The sensor will error if the job errors, throwing a AirflowException
+    containing the failure reason.
+
+    :param job_name: job_name of the transform job instance to check the state of
+    :type job_name: string
+    :param region_name: The AWS region_name
+    :type region_name: string
+    """
+
+    template_fields = ['job_name']
+    template_ext = ()
+
+    @apply_defaults
+    def __init__(self,
+                 job_name,
+                 region_name=None,
+                 *args,
+                 **kwargs):
+        super(SageMakerTransformSensor, self).__init__(*args, **kwargs)
+        self.job_name = job_name
+        self.region_name = region_name
+
+    def non_terminal_states(self):
+        return SageMakerHook.non_terminal_states
+
+    def failed_states(self):
+        return SageMakerHook.failed_states
+
+    def get_sagemaker_response(self):
+        sagemaker = SageMakerHook(
+            aws_conn_id=self.aws_conn_id,
+            region_name=self.region_name
+        )
+
+        self.log.info('Poking Sagemaker Transform Job %s', self.job_name)
+        return sagemaker.describe_transform_job(self.job_name)
+
+    def get_failed_reason_from_response(self, response):
+        return response['FailureReason']
+
+    def state_from_response(self, response):
+        return response['TransformJobStatus']
diff --git a/airflow/contrib/sensors/sagemaker_tuning_sensor.py b/airflow/contrib/sensors/sagemaker_tuning_sensor.py
index b2dd4604c0772..ec90964266a11 100644
--- a/airflow/contrib/sensors/sagemaker_tuning_sensor.py
+++ b/airflow/contrib/sensors/sagemaker_tuning_sensor.py
@@ -48,10 +48,10 @@ def __init__(self,
         self.region_name = region_name
 
     def non_terminal_states(self):
-        return ['InProgress', 'Stopping', 'Stopped']
+        return SageMakerHook.non_terminal_states
 
     def failed_states(self):
-        return ['Failed']
+        return SageMakerHook.failed_states
 
     def get_sagemaker_response(self):
         sagemaker = SageMakerHook(
diff --git a/tests/contrib/hooks/test_sagemaker_hook.py b/tests/contrib/hooks/test_sagemaker_hook.py
index 8bb56cc8e7d12..3a863b3cb0dc7 100644
--- a/tests/contrib/hooks/test_sagemaker_hook.py
+++ b/tests/contrib/hooks/test_sagemaker_hook.py
@@ -47,6 +47,8 @@
 
 job_name = 'test-job-name'
 
+model_name = 'test-model-name'
+
 image = 'test-image'
 
 test_arn_return = {'TrainingJobArn': 'testarn'}
@@ -152,6 +154,38 @@
         }
     }
 
+create_transform_params = \
+    {
+        'TransformJobName': job_name,
+        'ModelName': model_name,
+        'BatchStrategy': 'MultiRecord',
+        'TransformInput': {
+            'DataSource': {
+                'S3DataSource': {
+                    'S3DataType': 'S3Prefix',
+                    'S3Uri': data_url
+                }
+            }
+        },
+        'TransformOutput': {
+            'S3OutputPath': output_url,
+        },
+        'TransformResources': {
+            'InstanceType': 'ml.m4.xlarge',
+            'InstanceCount': 123
+        }
+    }
+
+create_model_params = \
+    {
+        'ModelName': model_name,
+        'PrimaryContainer': {
+            'Image': image,
+            'ModelDataUrl': output_url,
+        },
+        'ExecutionRoleArn': role
+    }
+
 db_config = {
     'Tags': [
         {
@@ -393,6 +427,52 @@ def test_create_tuning_job_db_config(self, mock_client, mock_check_tuning):
             assert_called_once_with(**updated_config)
         self.assertEqual(response, test_arn_return)
 
+    @mock.patch.object(SageMakerHook, 'check_for_url')
+    @mock.patch.object(SageMakerHook, 'get_conn')
+    def test_create_transform_job(self, mock_client, mock_check_url):
+        mock_check_url.return_value = True
+        mock_session = mock.Mock()
+        attrs = {'create_transform_job.return_value':
+                 test_arn_return}
+        mock_session.configure_mock(**attrs)
+        mock_client.return_value = mock_session
+        hook = SageMakerHook(sagemaker_conn_id='sagemaker_test_conn_id')
+        response = hook.create_transform_job(create_transform_params,
+                                             wait_for_completion=False)
+        mock_session.create_transform_job.assert_called_once_with(
+            **create_transform_params)
+        self.assertEqual(response, test_arn_return)
+
+    @mock.patch.object(SageMakerHook, 'check_for_url')
+    @mock.patch.object(SageMakerHook, 'get_conn')
+    def test_create_transform_job_db_config(self, mock_client, mock_check_url):
+        mock_check_url.return_value = True
+        mock_session = mock.Mock()
+        attrs = {'create_transform_job.return_value':
+                 test_arn_return}
+        mock_session.configure_mock(**attrs)
+        mock_client.return_value = mock_session
+        hook_use_db_config = SageMakerHook(sagemaker_conn_id='sagemaker_test_conn_id',
+                                           use_db_config=True)
+        response = hook_use_db_config.create_transform_job(
+            create_transform_params, wait_for_completion=False)
+        updated_config = copy.deepcopy(create_transform_params)
+        updated_config.update(db_config)
+        mock_session.create_transform_job.assert_called_once_with(**updated_config)
+        self.assertEqual(response, test_arn_return)
+
+    @mock.patch.object(SageMakerHook, 'get_conn')
+    def test_create_model(self, mock_client):
+        mock_session = mock.Mock()
+        attrs = {'create_model.return_value':
+                 test_arn_return}
+        mock_session.configure_mock(**attrs)
+        mock_client.return_value = mock_session
+        hook = SageMakerHook(sagemaker_conn_id='sagemaker_test_conn_id')
+        response = hook.create_model(create_model_params)
+        mock_session.create_model.assert_called_once_with(**create_model_params)
+        self.assertEqual(response, test_arn_return)
+
     @mock.patch.object(SageMakerHook, 'get_conn')
     def test_describe_training_job(self, mock_client):
         mock_session = mock.Mock()
@@ -418,6 +498,19 @@ def test_describe_tuning_job(self, mock_client):
             assert_called_once_with(HyperParameterTuningJobName=job_name)
         self.assertEqual(response, 'InProgress')
 
+    @mock.patch.object(SageMakerHook, 'get_conn')
+    def test_describe_transform_job(self, mock_client):
+        mock_session = mock.Mock()
+        attrs = {'describe_transform_job.return_value':
+                 'InProgress'}
+        mock_session.configure_mock(**attrs)
+        mock_client.return_value = mock_session
+        hook = SageMakerHook(sagemaker_conn_id='sagemaker_test_conn_id')
+        response = hook.describe_transform_job(job_name)
+        mock_session.describe_transform_job.\
+            assert_called_once_with(TransformJobName=job_name)
+        self.assertEqual(response, 'InProgress')
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/tests/contrib/sensors/test_sagemaker_transform_sensor.py b/tests/contrib/sensors/test_sagemaker_transform_sensor.py
new file mode 100644
index 0000000000000..bb4a184bb2797
--- /dev/null
+++ b/tests/contrib/sensors/test_sagemaker_transform_sensor.py
@@ -0,0 +1,118 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+from airflow import configuration
+from airflow.contrib.sensors.sagemaker_transform_sensor \
+    import SageMakerTransformSensor
+from airflow.contrib.hooks.sagemaker_hook import SageMakerHook
+from airflow.exceptions import AirflowException
+
+DESCRIBE_TRANSFORM_INPROGRESS_RETURN = {
+    'TransformJobStatus': 'InProgress',
+    'ResponseMetadata': {
+        'HTTPStatusCode': 200,
+    }
+}
+DESCRIBE_TRANSFORM_COMPELETED_RETURN = {
+    'TransformJobStatus': 'Compeleted',
+    'ResponseMetadata': {
+        'HTTPStatusCode': 200,
+    }
+}
+DESCRIBE_TRANSFORM_FAILED_RETURN = {
+    'TransformJobStatus': 'Failed',
+    'ResponseMetadata': {
+        'HTTPStatusCode': 200,
+    },
+    'FailureReason': 'Unknown'
+}
+DESCRIBE_TRANSFORM_STOPPING_RETURN = {
+    'TransformJobStatus': 'Stopping',
+    'ResponseMetadata': {
+        'HTTPStatusCode': 200,
+    }
+}
+DESCRIBE_TRANSFORM_STOPPED_RETURN = {
+    'TransformJobStatus': 'Stopped',
+    'ResponseMetadata': {
+        'HTTPStatusCode': 200,
+    }
+}
+
+
+class TestSageMakerTransformSensor(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+
+    @mock.patch.object(SageMakerHook, 'get_conn')
+    @mock.patch.object(SageMakerHook, 'describe_transform_job')
+    def test_raises_errors_failed_state(self, mock_describe_job, mock_client):
+        mock_describe_job.side_effect = [DESCRIBE_TRANSFORM_FAILED_RETURN]
+        sensor = SageMakerTransformSensor(
+            task_id='test_task',
+            poke_interval=2,
+            aws_conn_id='aws_test',
+            job_name='test_job_name'
+        )
+        self.assertRaises(AirflowException, sensor.execute, None)
+        mock_describe_job.assert_called_once_with('test_job_name')
+
+    @mock.patch.object(SageMakerHook, 'get_conn')
+    @mock.patch.object(SageMakerHook, '__init__')
+    @mock.patch.object(SageMakerHook, 'describe_transform_job')
+    def test_calls_until_a_terminal_state(self,
+                                          mock_describe_job, hook_init, mock_client):
+        hook_init.return_value = None
+
+        mock_describe_job.side_effect = [
+            DESCRIBE_TRANSFORM_INPROGRESS_RETURN,
+            DESCRIBE_TRANSFORM_STOPPING_RETURN,
+            DESCRIBE_TRANSFORM_STOPPED_RETURN,
+            DESCRIBE_TRANSFORM_COMPELETED_RETURN
+        ]
+        sensor = SageMakerTransformSensor(
+            task_id='test_task',
+            poke_interval=2,
+            aws_conn_id='aws_test',
+            job_name='test_job_name',
+            region_name='us-east-1'
+        )
+
+        sensor.execute(None)
+
+        # make sure we called 4 times(terminated when its compeleted)
+        self.assertEqual(mock_describe_job.call_count, 4)
+
+        # make sure the hook was initialized with the specific params
+        hook_init.assert_called_with(aws_conn_id='aws_test',
+                                     region_name='us-east-1')
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/operators/test_sagemaker_create_transform_job_operator.py b/tests/operators/test_sagemaker_create_transform_job_operator.py
new file mode 100644
index 0000000000000..a8701530d9daa
--- /dev/null
+++ b/tests/operators/test_sagemaker_create_transform_job_operator.py
@@ -0,0 +1,140 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+from airflow import configuration
+from airflow.contrib.hooks.sagemaker_hook import SageMakerHook
+from airflow.contrib.operators.sagemaker_create_transform_job_operator \
+    import SageMakerCreateTransformJobOperator
+from airflow.exceptions import AirflowException
+
+role = 'test-role'
+
+bucket = 'test-bucket'
+
+key = 'test/data'
+data_url = 's3://{}/{}'.format(bucket, key)
+
+job_name = 'test-job-name'
+
+model_name = 'test-model-name'
+
+image = 'test-image'
+
+output_url = 's3://{}/test/output'.format(bucket)
+
+create_transform_params = \
+    {
+        'TransformJobName': job_name,
+        'ModelName': model_name,
+        'BatchStrategy': 'MultiRecord',
+        'TransformInput': {
+            'DataSource': {
+                'S3DataSource': {
+                    'S3DataType': 'S3Prefix',
+                    'S3Uri': data_url
+                }
+            }
+        },
+        'TransformOutput': {
+            'S3OutputPath': output_url,
+        },
+        'TransformResources': {
+            'InstanceType': 'ml.m4.xlarge',
+            'InstanceCount': 123
+        }
+    }
+
+create_model_params = \
+    {
+        'ModelName': model_name,
+        'PrimaryContainer': {
+            'Image': image,
+            'ModelDataUrl': output_url,
+        },
+        'ExecutionRoleArn': role
+    }
+
+
+class TestSageMakertransformOperator(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+        self.sagemaker = SageMakerCreateTransformJobOperator(
+            task_id='test_sagemaker_operator',
+            sagemaker_conn_id='sagemaker_test_id',
+            transform_job_config=create_transform_params,
+            model_config=create_model_params,
+            region_name='us-west-2',
+            use_db_config=True,
+            wait_for_completion=False,
+            check_interval=5
+        )
+
+    @mock.patch.object(SageMakerHook, 'get_conn')
+    @mock.patch.object(SageMakerHook, 'create_model')
+    @mock.patch.object(SageMakerHook, 'create_transform_job')
+    @mock.patch.object(SageMakerHook, '__init__')
+    def test_hook_init(self, hook_init, mock_transform, mock_model, mock_client):
+        mock_transform.return_value = {"TransformJobArn": "testarn",
+                                       "ResponseMetadata":
+                                       {"HTTPStatusCode": 200}}
+        hook_init.return_value = None
+        self.sagemaker.execute(None)
+        hook_init.assert_called_once_with(
+            sagemaker_conn_id='sagemaker_test_id',
+            region_name='us-west-2',
+            use_db_config=True,
+            check_interval=5,
+            max_ingestion_time=None
+        )
+
+    @mock.patch.object(SageMakerHook, 'get_conn')
+    @mock.patch.object(SageMakerHook, 'create_model')
+    @mock.patch.object(SageMakerHook, 'create_transform_job')
+    def test_execute_without_failure(self, mock_transform, mock_model, mock_client):
+        mock_transform.return_value = {"TransformJobArn": "testarn",
+                                       "ResponseMetadata":
+                                       {"HTTPStatusCode": 200}}
+        self.sagemaker.execute(None)
+        mock_model.assert_called_once_with(create_model_params)
+        mock_transform.assert_called_once_with(create_transform_params,
+                                               wait_for_completion=False
+                                               )
+
+    @mock.patch.object(SageMakerHook, 'get_conn')
+    @mock.patch.object(SageMakerHook, 'create_model')
+    @mock.patch.object(SageMakerHook, 'create_transform_job')
+    def test_execute_with_failure(self, mock_transform, mock_model, mock_client):
+        mock_transform.return_value = {"TransformJobArn": "testarn",
+                                       "ResponseMetadata":
+                                       {"HTTPStatusCode": 404}}
+        self.assertRaises(AirflowException, self.sagemaker.execute, None)
+
+
+if __name__ == '__main__':
+    unittest.main()