diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
index b257594ea..635bc92d5 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
@@ -82,6 +82,7 @@ class MessageDispatcher {
   private final FlowController flowController;
 
   private AtomicBoolean exactlyOnceDeliveryEnabled = new AtomicBoolean(false);
+  private AtomicBoolean messageOrderingEnabled = new AtomicBoolean(false);
 
   private final Waiter messagesWaiter;
 
@@ -343,6 +344,11 @@ void setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) {
     }
   }
 
+  @InternalApi
+  void setMessageOrderingEnabled(boolean messageOrderingEnabled) {
+    this.messageOrderingEnabled.set(messageOrderingEnabled);
+  }
+
   private static class OutstandingMessage {
     private final ReceivedMessage receivedMessage;
     private final AckHandler ackHandler;
@@ -506,7 +512,7 @@ public void run() {
             }
           }
         };
-    if (message.getOrderingKey().isEmpty()) {
+    if (!messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) {
       executor.execute(deliverMessageTask);
     } else {
       sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java
index 014771f2a..7849bdb74 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java
@@ -236,9 +236,12 @@ public void onResponse(StreamingPullResponse response) {
 
       boolean exactlyOnceDeliveryEnabledResponse =
           response.getSubscriptionProperties().getExactlyOnceDeliveryEnabled();
+      boolean messageOrderingEnabledResponse =
+          response.getSubscriptionProperties().getMessageOrderingEnabled();
 
       setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabledResponse);
       messageDispatcher.setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabledResponse);
+      messageDispatcher.setMessageOrderingEnabled(messageOrderingEnabledResponse);
       messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());
 
       // Only request more if we're not shutdown.
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java
index 9321272b4..c608ee8d5 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java
@@ -30,18 +30,41 @@
 import java.util.concurrent.*;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.threeten.bp.Duration;
 
 public class MessageDispatcherTest {
   private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data");
   private static final int DELIVERY_INFO_COUNT = 3;
   private static final String ACK_ID = "ACK-ID";
+  private static final String ORDERING_KEY = "KEY";
   private static final ReceivedMessage TEST_MESSAGE =
       ReceivedMessage.newBuilder()
           .setAckId(ACK_ID)
           .setMessage(PubsubMessage.newBuilder().setData(MESSAGE_DATA).build())
           .setDeliveryAttempt(DELIVERY_INFO_COUNT)
           .build();
+  private static final ByteString ORDERED_MESSAGE_DATA_1 = ByteString.copyFromUtf8("message-data1");
+  private static final ReceivedMessage ORDERED_TEST_MESSAGE_1 =
+      ReceivedMessage.newBuilder()
+          .setAckId("ACK-ID-1")
+          .setMessage(
+              PubsubMessage.newBuilder()
+                  .setData(ORDERED_MESSAGE_DATA_1)
+                  .setOrderingKey(ORDERING_KEY)
+                  .build())
+          .build();
+  private static final ByteString ORDERED_MESSAGE_DATA_2 = ByteString.copyFromUtf8("message-data2");
+  private static final ReceivedMessage ORDERED_TEST_MESSAGE_2 =
+      ReceivedMessage.newBuilder()
+          .setAckId("ACK-ID-2")
+          .setMessage(
+              PubsubMessage.newBuilder()
+                  .setData(ORDERED_MESSAGE_DATA_2)
+                  .setOrderingKey(ORDERING_KEY)
+                  .build())
+          .build();
   private static final int MAX_SECONDS_PER_ACK_EXTENSION = 60;
   private static final int MIN_ACK_DEADLINE_SECONDS = 10;
   private static final Duration MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60);
@@ -494,6 +517,84 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryEnabledThenDisabled() {
         Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds()));
   }
 
+  @Test
+  public void testOrderedDeliveryOrderingDisabled() throws Exception {
+    MessageReceiver mockMessageReceiver = mock(MessageReceiver.class);
+    MessageDispatcher messageDispatcher =
+        getMessageDispatcher(mockMessageReceiver, Executors.newFixedThreadPool(5));
+
+    // This would normally be set from the streaming pull response in the
+    // StreamingSubscriberConnection
+    messageDispatcher.setMessageOrderingEnabled(false);
+
+    CountDownLatch receiveCalls = new CountDownLatch(2);
+
+    doAnswer(
+            new Answer<Void>() {
+              public Void answer(InvocationOnMock invocation) throws Exception {
+                Thread.sleep(1000);
+                receiveCalls.countDown();
+                return null;
+              }
+            })
+        .when(mockMessageReceiver)
+        .receiveMessage(eq(ORDERED_TEST_MESSAGE_1.getMessage()), any(AckReplyConsumer.class));
+    doAnswer(
+            new Answer<Void>() {
+              public Void answer(InvocationOnMock invocation) {
+                // Ensure the previous method didn't finish and we could process in parallel.
+                assertEquals(2, receiveCalls.getCount());
+                receiveCalls.countDown();
+                return null;
+              }
+            })
+        .when(mockMessageReceiver)
+        .receiveMessage(eq(ORDERED_TEST_MESSAGE_2.getMessage()), any(AckReplyConsumer.class));
+
+    messageDispatcher.processReceivedMessages(
+        Arrays.asList(ORDERED_TEST_MESSAGE_1, ORDERED_TEST_MESSAGE_2));
+    receiveCalls.await();
+  }
+
+  @Test
+  public void testOrderedDeliveryOrderingEnabled() throws Exception {
+    MessageReceiver mockMessageReceiver = mock(MessageReceiver.class);
+    MessageDispatcher messageDispatcher =
+        getMessageDispatcher(mockMessageReceiver, Executors.newFixedThreadPool(5));
+
+    // This would normally be set from the streaming pull response in the
+    // StreamingSubscriberConnection
+    messageDispatcher.setMessageOrderingEnabled(true);
+
+    CountDownLatch receiveCalls = new CountDownLatch(2);
+
+    doAnswer(
+            new Answer<Void>() {
+              public Void answer(InvocationOnMock invocation) throws Exception {
+                Thread.sleep(1000);
+                receiveCalls.countDown();
+                return null;
+              }
+            })
+        .when(mockMessageReceiver)
+        .receiveMessage(eq(ORDERED_TEST_MESSAGE_1.getMessage()), any(AckReplyConsumer.class));
+    doAnswer(
+            new Answer<Void>() {
+              public Void answer(InvocationOnMock invocation) {
+                // Ensure the previous method has finished completely.
+                assertEquals(1, receiveCalls.getCount());
+                receiveCalls.countDown();
+                return null;
+              }
+            })
+        .when(mockMessageReceiver)
+        .receiveMessage(eq(ORDERED_TEST_MESSAGE_2.getMessage()), any(AckReplyConsumer.class));
+
+    messageDispatcher.processReceivedMessages(
+        Arrays.asList(ORDERED_TEST_MESSAGE_1, ORDERED_TEST_MESSAGE_2));
+    receiveCalls.await();
+  }
+
   @Test
   public void testAckExtensionCustomMinExactlyOnceDeliveryDisabledThenEnabled() {
     int customMinSeconds = 30;
@@ -569,20 +670,28 @@ private void assertMinAndMaxAckDeadlines(
   }
 
   private MessageDispatcher getMessageDispatcher() {
-    return getMessageDispatcher(mock(MessageReceiver.class));
+    return getMessageDispatcher(mock(MessageReceiver.class), MoreExecutors.directExecutor());
   }
 
   private MessageDispatcher getMessageDispatcher(MessageReceiver messageReceiver) {
-    return getMessageDispatcherFromBuilder(MessageDispatcher.newBuilder(messageReceiver));
+    return getMessageDispatcherFromBuilder(
+        MessageDispatcher.newBuilder(messageReceiver), MoreExecutors.directExecutor());
+  }
+
+  private MessageDispatcher getMessageDispatcher(
+      MessageReceiver messageReceiver, Executor executor) {
+    return getMessageDispatcherFromBuilder(MessageDispatcher.newBuilder(messageReceiver), executor);
   }
 
   private MessageDispatcher getMessageDispatcher(
       MessageReceiverWithAckResponse messageReceiverWithAckResponse) {
     return getMessageDispatcherFromBuilder(
-        MessageDispatcher.newBuilder(messageReceiverWithAckResponse));
+        MessageDispatcher.newBuilder(messageReceiverWithAckResponse),
+        MoreExecutors.directExecutor());
   }
 
-  private MessageDispatcher getMessageDispatcherFromBuilder(MessageDispatcher.Builder builder) {
+  private MessageDispatcher getMessageDispatcherFromBuilder(
+      MessageDispatcher.Builder builder, Executor executor) {
     MessageDispatcher messageDispatcher =
         builder
             .setAckProcessor(mockAckProcessor)
@@ -594,7 +703,7 @@ private MessageDispatcher getMessageDispatcherFromBuilder(MessageDispatcher.Buil
             .setMaxDurationPerAckExtensionDefaultUsed(true)
             .setAckLatencyDistribution(mock(Distribution.class))
             .setFlowController(mock(FlowController.class))
-            .setExecutor(MoreExecutors.directExecutor())
+            .setExecutor(executor)
             .setSystemExecutor(systemExecutor)
             .setApiClock(clock)
             .build();