Conversation
Current coverage is 97.55% (diff: 96.32%)

@@            master    #142   diff @@
======================================
  Files            9      10     +1
  Lines          640     737    +97
  Methods          0       0
  Messages         0       0
  Branches         0       0
======================================
+ Hits           627     719    +92
- Misses          13      18     +5
  Partials         0       0
This is really good. I have a few things that I would like to poke at. :-)
        return self._last_operation

    def _poll(self, timeout=None):
One useful change I might recommend here is for the polling timer to "back off". It would be pretty easy for someone to fall into the trap of calling result(timeout=None) on an operation that will take (for example) five minutes to complete, and this method would end up polling every second -- meaning this is 300 API requests.
While there are several ways to approach this, I would recommend a recursive algorithm where the wait time is a parameter:
def _poll(self, timeout=None, increment=1):
    """Poll the LRO server to see if the operation has completed.
    If not, sleep for `increment` seconds and then try again.
    """
    # Sanity check: If we have exhausted our timeout, error out.
    if timeout is not None and timeout < 0:
        raise futures.TimeoutError()

    # Poll the server. If the operation has completed, return the
    # Operation instance with the result.
    if self._get_operation().done:
        return self._last_operation

    # Okay, we are not done yet. Wait a little while and poll again.
    # (Note: only subtract from the timeout when one was given, so that
    # timeout=None keeps meaning "wait forever".)
    time.sleep(increment)
    return self._poll(
        increment=min(30, increment * 2),
        timeout=None if timeout is None else timeout - increment,
    )
This will -- assuming I implemented it correctly ;-) -- poll the server at the following increments: 1, 2, 4, 8, 16, 30, 30, 30, 30...
We already have an exponential backoff algorithm for retrying API calls with transient errors. We should reuse that.
In Ruby and Node I opted to use exponential backoff. All gax libraries have a BackoffSettings class used for retryable API calls; I used this class to specify the backoff settings used for operation polling. Here is an example of it being used for retryable API calls.
We already have an exponential backoff algorithm for retrying API calls with transient errors. We should reuse that.
Oh. Yeah, definitely use something already baked. :-)
Added backoff
Would suggest reusing (or refactoring to make reusable) the code in api_callable.py that @landrito linked to. I suspect you should be able to use the _retryable function as-is, specifying the numerical parameters as a RetryOptions object.
_retryable doesn't support infinite timeouts as-is. It gets really messy trying to reuse it.
Could you refactor the generic (incl. infinite timeout support) exponential backoff algorithm into its own module retry, and then import retry both here and in api_callable?
@geigerj Refactored into a retry module, PTAL
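For readers following along, here is a minimal sketch of what such a standalone retry helper could look like. The name retry_with_backoff and its parameters are illustrative, not the actual gax retry module API; the key property the thread asks for is that timeout=None means "retry forever":

```python
import time


def retry_with_backoff(func, is_retryable, timeout=None,
                       initial_delay=1.0, multiplier=2.0, max_delay=30.0):
    """Call ``func`` until it succeeds, sleeping with exponential backoff
    between attempts.

    ``timeout=None`` retries indefinitely; otherwise it is a deadline in
    seconds measured from the first attempt.
    """
    deadline = None if timeout is None else time.time() + timeout
    delay = initial_delay
    while True:
        try:
            return func()
        except Exception as exc:  # pylint: disable=broad-except
            # Non-retryable errors propagate immediately.
            if not is_retryable(exc):
                raise
            # If waiting again would blow the deadline, give up.
            if deadline is not None and time.time() + delay > deadline:
                raise
        time.sleep(delay)
        delay = min(delay * multiplier, max_delay)
```

Both the polling loop here and the transient-error retry in api_callable could then share this one function, differing only in the predicate and the backoff numbers they pass in.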
import unittest2

from fixture_pb2 import Simple
Can you explain to me what this is?
        """
        return _from_any(self._metadata_type, self._first_operation.metadata)

    def last_operation_data(self):
I think the docstring here is probably wrong? It says that this method will block if the first call is not done yet, but self._last_operation is set by the constructor (and the argument to the constructor is required). As best as I can tell, there is no actual situation where this method will block. I am not sure whether the correct solution is to amend the comment or the code.

Also, do we want a public-facing method that is not part of the c.f.Future interface? I can think of several reasons why maybe we do; I am just bringing it up to ensure it is an explicit decision.
Seems like the docstring is wrong. That's my fault, since I added this line in the design doc.

IIUC, we are waiting until the first call to get the longrunning.Operation object is completed before we return an OperationFuture wrapper, so the condition regarding the first call must be satisfied already.

@garrettjonesgoogle I copied this comment from your Java design. Are we missing anything here regarding "Blocks if the first call isn't done yet"?
@lukesneeringer Yes, we do want these extra methods that are not part of c.f.Future. They provide access for power users to the actual meta-API mechanism that sits behind this wrapper.
IIUC, we are waiting until the first call to get the longrunning.Operation object is completed before we return an OperationFuture wrapper, so the condition regarding the first call must be satisfied already.

That seems right. Regardless of the nitty-gritty, it is definitely the case that by the time this class is instantiated, there is at least one Operation.
Fixed docstring
        threading.Thread(target=_execute_clbk, args=(self, done_clbk)).start()

    def operation_name(self):
My inquiry about the blocking component in last_operation_data (below, sorry) applies to this as well.
See my response there
        """Issues OperationsApi.get_operation and returns value of Operation.done."""
        return self._get_operation().done

    def add_done_callback(self, done_clbk):
A few thoughts on this:

First, while it is a bit of a nitpick, I would ask that the positional argument be spelled fn to match c.f.Future and asyncio. It is possible to pass positional arguments using their keyword names, and many people do.

Second, I am having a debate with myself over the thread here. Part of me thinks that this method should return the Thread object (so that the programmer would have the option of calling thread.join() later). On the other hand, I do not particularly want to be tied to this exact implementation; we might want to change this to use gevent or something. Since c.f.Future and asyncio do not expect this to return anything, we should probably leave it.

Finally, it is worth noting that the threading module is relatively poorly implemented in Python and is generally disfavored (with multiprocess being preferred). If this is something you can change quickly, it would probably be preferable. That said, I also think it is not the end of the world if that is time-consuming, especially since @bjwatson has already fingered this spot for a potential improvement as we discover how it is actually used.
👍 to fn.

I also agree with switching to multiprocess, since Luke says it's preferred among the Python community. However, let's not expose our implementation to the caller. We won't necessarily know what the thread or subprocess is at the time of calling add_done_callback(); that could be dynamically determined at the time of executing the callback. The primary thread would need to use a monitor object or something like that if it needs to know when the callback has been triggered.
Another reason not to expose the implementation to the caller is that there are several reasons why we might change it in the future (for example, by implementing our own event loop as discussed elsewhere, although that is not the only reason that might happen).
- now fn
- still hiding implementation details
- using mp
This is close!
            'Could not convert {} to {}'.format(
                any_pb.__class__.__name__, pb_type.__name__))

    return pb_type.FromString(any_pb.value)
It would be best to use the Unpack instance method from here, i.e.:

def _from_any(pb_type, any_pb):
    msg = pb_type()
    if not any_pb.Unpack(msg):
        raise TypeError(
            'Could not convert {} to {}'.format(
                any_pb.__class__.__name__, pb_type.__name__))
    return msg

(Note the capital U: the protobuf Python method is Any.Unpack, which returns False on a type mismatch.)
TIL! Thanks @landrito
Switched to unpack
        """Returns the value of Operation.metadata from the initial Operation object
        returned from the first call. Blocks if the first call isn't done yet.
        """
        return _from_any(self._metadata_type, self._first_operation.metadata)
From what I have seen, changes to metadata are meaningful. For instance, the AsyncRecognizeMetadata message contains a percent-complete field. This method would be more usable if it returned the value of the _last_operation metadata.
I agree with you. I copied this semantic from @garrettjonesgoogle's design for Java, and asked him about it in one of my review comments here.
            call_options (google.gax.CallOptions, optional): the call options
                that are used when reloading the operation.
        """
        self._first_operation = operation
I don't think _first_operation is a needed property. The _last_operation property can be used wherever this is used, per my comment on the metadata method, and since the operation name will not change.
Do we anticipate a situation where we might, in the future, have data that mutates between the first operation and the most recent one, where we might want to preserve the first for reference?

On the other hand, even if we do, we can always re-add _first_operation then. It is one line of code. :-)
If a case arises, the user can just get the needed information before polling.
Removed _first_operation
        except Exception as ex:  # pylint: disable=broad-except
            _LOG.exception(ex)

    threading.Thread(target=_execute_clbk, args=(self, done_clbk)).start()
I think this will cause many more API calls than are necessary, since each thread will do its own polling. A solution would be to have the logic for this method be as follows:
- If the operation is done, execute the callback and return.
- Otherwise, add the callback to a list of callbacks.
- If not already started, start a polling thread that, once polling is completed, calls all the callbacks in the list.
Good point!
Regarding #1, we might still want to execute the callback in a separate thread. The expected performance characteristic of this method is that it just registers the callback and returns right away. The callback might have a non-trivial running time.
The design for having just one polling loop makes a lot of sense.
Looking good! I have several comments, but this looks close to done.
Please note that one of my comments refers to rebuilding the LRO GAPIC code and including it in this PR.
    return pb_type.FromString(any_pb.value)


class ResultError(GaxError):
This doesn't seem to add much to justify another error type. We should just wrap the operation_error in a GaxError (as the cause).
I disagree. :-) I think there could be a case where a developer wants to catch result errors but does not want to catch other ways that a GaxError might be thrown, now or in the future. The subclass makes this really easy, and is much cleaner than catching GaxError, checking cause, and then re-raising if you do not recognize it.

(And nothing stops someone from catching GaxError and getting these along with it.)
I agree with @bjwatson here
Going with @bjwatson for now. Currently a subclass is YAGNI.
Right. The only GaxError subclass we've defined so far is RetryError, and that's only because it pertains to a specific client-side feature that we implement (automatic retry for transient errors on idempotent methods). Errors associated with LRO will be more along the lines of server-side errors we get from synchronous methods, which we just wrap in a plain GaxError.
I still disagree, but am entirely happy to be outvoted. :-)
        """If last Operation's value of `done` is true, returns false;
        otherwise, issues OperationsApi.cancel_operation and returns true.
        """
        if not self._last_operation.done:
This should be self._get_operation() to know for sure whether it's done.
    def cancelled(self):
        """Return True if the call was successfully cancelled."""
        return self._cancelled
Not necessarily. We need to check the _client to know for sure. See the cancel_operation() documentation.

Therefore, we should only set self._cancelled when the operations API tells us that it's been cancelled; its only purpose is to cache the fact of cancellation once we know for sure that it happened.
I do agree here.
        self._client.cancel_operation(self._last_operation.name)
        self._cancelled = True

        return self._cancelled
See the comment below for cancelled(). The return value from this method just indicates whether we attempted the cancellation. However, the cancellation itself is asynchronous, so the cancelled() call needs to check if it was actually cancelled.
Hmm, are you sure? The asyncio.Future.cancel() documentation (somewhat ambiguously) suggests the implementation @eoogbe has now. And an upside to her implementation is that the cancel request is guaranteed to be non-blocking.

There is also a potential race condition: what if you send cancel, then ask if it was cancelled, but the cancellation is still pending? Under what you suggest, we would return False there, but really True is the proper response.

OTOH: An alternative reading is that "change the Future's state to cancelled" assumes actual cancellation success, and that we should return False if unable.

I think the only way to do this if we are trying to report on actual cancellation is if self._client.cancel_operation actually returns True or False based on its ability to cancel. If it does, we should set self._cancelled to that value and then return it. If it does not, then I think we should probably just return True.
I was following c.f.Future.cancel(), which says to return True if cancelling, not just when cancelled. (In fact, when it's already cancelled, I return False.) The cancelled method has the semantics you're referring to, so I should change that one.
The c.f.Future.cancel() docs actually seem like a stronger case for @bjwatson's position to me.

I think where I am at this point is: if you can get "cancelable" status with no race condition, return that. If you have to poll to see if it was actually cancelled, I think that would be a mistake, and returning True would be safer.
@lukesneeringer I think we're all in agreement. I'm completely okay with the implementation that @eoogbe has for this method (cancel()); it matches the c.f.Future doc and it's non-blocking.

I'm just not sure it's worth caching self._cancelled here, because the cancelled() method needs to check the Operations API to see if the cancellation actually happened. If the cancelled() method finds out that it did, then it can cache a self._cancelled value.
Ooooh, I see. You are not saying that the method should not return True naively; you are saying it should not save self._cancelled. I agree.
        start_time = time.time()

        while timeout is None or time.time() < start_time + timeout:
            if self._get_operation().done:
This could be simplified to if done().
if self.done():
:-)
@@ -54,6 +54,7 @@
     'ply==3.8',
     'protobuf>=3.0.0, <4.0dev',
     'oauth2client>=2.0.0, <4.0dev',
+    'googleapis-common-protos>=1.5.0',
Add an upper-bound: googleapis-common-protos>=1.5.0, <2.0dev
@dhermes @daspecster @tseaver I'd like to invite your feedback as well. This is the interface we're developing to improve LRO. For example, Speech
import time
import threading
from concurrent import futures
from .errors import GaxError
Why relative imports ever?
We should change this to from google.gax.errors import GaxError.
Good catch. @eoogbe our Python style is to always use absolute imports. See https://google.github.io/styleguide/pyguide.html#Imports. I'm sure there's a PEP that says something similar.
Yes, absolute imports are almost always preferred. (There are some rare corner cases.)
    def __init__(self, operation, client, result_type, metadata_type,
                 call_options=None):
        """Constructor.
I am with @lukesneeringer here. An alternative would be to have a non-public factory constructor that manually mucks with instance attributes that aren't public. For example, you'd "never" expect a user to pass metadata_type to the constructor (though us upstream library dev "users" would be fine doing that).
                target=_execute_clbks, args=(self,))
            self._process.start()
        elif not self._process.is_alive() and self._last_operation.done:
            _execute_clbks(self)
Be careful here. This can cause some callbacks to be called twice, since _execute_clbks always calls every callback in the list. For example:
1. Register cb A.
2. Register cb B.
3. Polling completes.
4. Run cb A and cb B.
5. Some time passes.
6. Register cb C.
7. Run cb A, cb B, and cb C.
    def cancelled(self):
        """Return True if the call was successfully cancelled."""
        self._get_operation()
        return self._last_operation.HasField('error') and \
nit: using a line continuation is a little weird; prefer parentheses instead, like the following:

return (self._last_operation.HasField('error') and
        self._last_operation.error.code == code_pb2.CANCELLED)
PTAL. (Sorry about spamming with the individual comments. I didn't realize this convention.)
    def __init__(self, operation, client, result_type, metadata_type,
                 call_options=None):
        """Constructor.
@lukesneeringer It's not strange that the GAPIC docstrings will reference an underscore-prefixed class in GAX (i.e., the generated client will have a method that returns: A :class:`google.gax._OperationsFuture` instance)?
            call_options (google.gax.CallOptions, optional): the call options
                that are used when reloading the operation.
        """
        self._last_operation = operation
Why _last_operation instead of just _operation? The "last" implies to me that it might hold, over time, a sequence of operations, rather than a single one whose status is being updated.
            client (google.gapic.longrunning.operations_client.OperationsClient):
                a client for the long-running operation service.
            result_type (type): the class type of the result.
            metadata_type (type): the class type of the metadata.
It's optional for an Operation to have metadata (see: https://github.com/googleapis/googleapis/blob/master/google/longrunning/operations.proto#L90). Should this be correspondingly optional, or else have documented behavior when calling the metadata() method if metadata_type is None?
It's not optional in ruby, but it makes sense to do so. I'll add a check if the metadata is None
and keep the TypeError
if the metadata_type
is not given when there's metadata.
For ruby and node, the user will specify the response or metadata type as google.protobuf.Empty as shown in the cloud functions gapic yaml.
Understood. Not necessary to change the logic here if metadata will just be an Empty
instance in that case.
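To pin down the agreed behavior, here is a minimal sketch of the metadata handling; the helper name and the assumption that the metadata type is a plain callable deserializer are illustrative, not the actual GAX code:

```python
def unpack_metadata(raw_metadata, metadata_type):
    """Deserialize operation metadata, tolerating its absence.

    Returns None when the operation carries no metadata; raises
    TypeError when metadata is present but no metadata_type was given.
    """
    if raw_metadata is None:
        return None
    if metadata_type is None:
        raise TypeError(
            'Operation has metadata, but no metadata_type was provided.')
    return metadata_type(raw_metadata)
```

With google.protobuf.Empty configured as the metadata type (as in the cloud functions GAPIC yaml mentioned above), the last branch simply produces an empty message.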
self._process = multiprocessing.Process( | ||
target=_execute_clbks, args=(self,)) | ||
self._process.start() | ||
mp.Process(target=self._execute_clbks).start() |
Make sure that self._process is getting assigned. Note that Process.start() returns None, so assign first and then start:
self._process = mp.Process(target=self._execute_clbks)
self._process.start()
self._process = multiprocessing.Process( | ||
target=_execute_clbks, args=(self,)) | ||
self._process.start() | ||
mp.Process(target=self._execute_clbks).start() | ||
elif not self._process.is_alive() and self._last_operation.done: |
I suspect that there is a race condition here. If the _execute_clbks
process gets blocked after the loop in line 654 finishes, the fn
could be added to the queue before the _execute_clbks
process is killed but after the callback pool is made, causing the fn
to be lost.
I changed add_done_callback
to use a list, so that it more closely matches the c.f.Future
spec. I think that gets rid of the race condition as well.
Reducing concurrency simplifies the implementation and makes it closer to the expected behavior of c.f.Future.
PTAL Also, not sure why Travis fails on 3.5. I think it may be a bug. Can I safely ignore? |
No, but I will help you fix it. :-) |
Trying to fix Travis CI error: "AttributeError: '_NamespacePath' object has no attribute 'sort'"
Still trying to fix the Travis CI error.
Still fixing Travis CI error.
This should actually fix the Travis CI error.
The Travis CI error appears to be an issue with pip 9.0.1. I think it's happening because google is a namespace package and google-gax is not. Not sure if this is fixable without downgrading/waiting on an upgrade from pip. |
from google.gax import (BackoffSettings, BundleOptions, bundling, _CallSettings, | ||
config, errors, PageIterator, ResourceIterator, | ||
RetryOptions) | ||
from google.gax.retry import add_timeout_arg, retryable |
Style: from google.gax import retry
(Note: Google style is only to import modules and packages; in this project, we additionally allow classes.)
def _has_timeout_settings(backoff_settings): | ||
return backoff_settings.rpc_timeout_multiplier is not None and \ |
Style: use parens for line continuation rather than backslashes
raise exc | ||
return _retryable_without_timeout(inner, retry_options, **kwargs) |
This looks to me like it retries inner
, and inner
itself retries with no delay; you effectively have nested retry while
loops. So if I understand right, it'll do something like:
call1: timeout 10, delay 1
call2: timeout 20, delay 1
call3: timeout 40, delay 1
call4: timeout 10, delay 1.5
call5: timeout 20, delay 1.5
call6: timeout 40, delay 1.5
call7: timeout 10, delay 2.25
...
instead of
call1: timeout 10, delay 1
call2: timeout 20, delay 1.5
call3: timeout 40, delay 2.25
...
I think this might be more easily done with a single retryable function essentially identical to the one previously in api_callable
with some conditionals only to update the timeout if applicable. Sort of like (not real Python):
def retryable(...):
def inner(...):
...
timeout = {timeout setting} if {timeout setting} else None
now = time.time()
deadline = now + {total_timeout} if {total_timeout} else None
while deadline is None or now < deadline
to_call = a_func
try:
if timeout:
to_call = add_timeout_arg(to_call, ...)
to_call()
except:
[update delay]
if timeout:
[update timeout]
...
Will something like that work here?
Good catch. I was having trouble finding a clean way to achieve this, but it looks like I have to check conditionals everywhere to get it to work. Fixed now.
Prevents double while loop when retrying with timeout.
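For the record, a runnable sketch of the single-loop shape described in the comment above; the parameter names are illustrative, and real code would filter on retryable status codes rather than catching every exception:

```python
import time


class RetryError(Exception):
    """Raised when the total retry deadline is exhausted."""


def retryable(a_func, total_timeout, initial_delay, delay_mult, max_delay,
              initial_rpc_timeout=None, rpc_timeout_mult=1,
              max_rpc_timeout=None):
    """Wrap a_func so per-call timeout and inter-call delay grow together
    in one loop, instead of two nested retry loops."""
    def inner(*args, **kwargs):
        delay, timeout = initial_delay, initial_rpc_timeout
        deadline = time.time() + total_timeout
        last_exc = None
        while time.time() < deadline:
            try:
                if timeout is not None:
                    kwargs['timeout'] = timeout
                return a_func(*args, **kwargs)
            except Exception as exc:  # real code checks retryable codes here
                last_exc = exc
                time.sleep(delay)
                # Delay and timeout back off in lockstep, each capped.
                delay = min(delay * delay_mult, max_delay)
                if timeout is not None:
                    timeout = min(timeout * rpc_timeout_mult, max_rpc_timeout)
        raise RetryError('Retry total timeout exceeded.') from last_exc
    return inner
```

This produces the second call sequence from the comment (timeout and delay both grow once per attempt), not the nested one.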
@@ -0,0 +1,279 @@ | |||
# Copyright 2016 Google Inc. All rights reserved. |
We need to switch this to BSD to conform to the rest of GAX. This is supposed to be configured in the GAPIC config here, but I expect it's not working because Python isn't on MVVM and the license is hardcoded into the template files.
In a separate toolkit PR, could you update the Python templates to select the correct license based on that GAPIC config field?
Could you update this manually until we can regenerate the package in api-client-staging
? (Or, preferably, update the package in api-client-staging
and copy it back into here.)
@lukesneeringer @landrito @geigerj LGTY? |
Looking right now! |
from .errors import RetryError | ||
from google.gax import (BackoffSettings, BundleOptions, bundling, _CallSettings, | ||
config, errors, PageIterator, ResourceIterator, | ||
RetryOptions, retry as rt) |
nit: I think retry
is more readable than rt
.
retry
is already a variable name
Ah, OK. This is fine, then.
Use the same system that you use to solve a name collision with the stdlib: retry_
.
+1. I'd add also that I'd prefer that, if there's a naming collision between a module and a local variable, the local variable rather than the module be renamed.
my_callable = retry.retryable(mock_call, retry_options) | ||
self.assertRaises(errors.RetryError, my_callable) |
This is a different condition than what the old test checked. Ideally, we should check that both
- a RetryError is raised
- the error has the right cause
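One way to check both at once is assertRaises as a context manager. The snippet below sketches the idea with stand-in classes, since the exact attribute (cause) on google.gax.errors.RetryError is assumed from the surrounding discussion:

```python
import unittest


class CustomException(Exception):
    pass


class RetryError(Exception):
    """Stand-in for google.gax.errors.RetryError with an assumed `cause`."""
    def __init__(self, message, cause=None):
        super(RetryError, self).__init__(message)
        self.cause = cause


def my_callable():
    # Stand-in for a retry-wrapped call that has exhausted its retries.
    raise RetryError('Retry timed out.', cause=CustomException('boom'))


class RetryableTest(unittest.TestCase):
    def test_raises_retry_error_with_cause(self):
        # Fails if no exception is raised, if the wrong type is raised,
        # or if the cause is not the underlying CustomException.
        with self.assertRaises(RetryError) as caught:
            my_callable()
        self.assertIsInstance(caught.exception.cause, CustomException)
```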
@mock.patch('google.gax.config.exc_to_code') | ||
@mock.patch('time.time') | ||
def test_retryable_with_timeout(self, mock_time, mock_exc_to_code): |
Looks like some of the old tests didn't make it into this file?
- function raises CustomException -> CustomException is set as the RetryError cause
- function has a timeout and throws to_attempt retryable codes before succeeding -> call count should be to_attempt + 1
- function does not retry when the status code list is nonempty in the Retry configuration, but the status code returned by the function isn't in the list
old retry tests added back in, PTAL
self._process = None | ||
def cancel(self): | ||
"""If last Operation's value of `done` is true, returns false; |
Nit: for consistency with elsewhere in GAX, don't indent the docstrings relative to the """.
@@ -4,7 +4,7 @@ pytest-cov>=1.8.1 | |||
pytest-timeout>=1.0.0 | |||
unittest2>=1.1.0 | |||
grpcio-tools>=1.0.0 | |||
google-auth>=0.2.0 | |||
google-auth>=0.5.0 |
How is this change connected to this PR?
It was part of trying to fix the Travis CI error. Should I change it back?
Yes, let's revert the changes made when working on the Travis error unless they're related to the rest of this PR.
self._process = mp.Process(target=self._execute_clbks) | ||
self._process.start() | ||
elif not self._process.is_alive() and self._operation.done: | ||
_try_callback(self, fn) |
I think this would be better written as follows, to avoid storing callbacks when it is unnecessary:
if self._process.is_alive() and self._operation.done:
_try_callback(self, fn)
return
self._done_clbks.append(fn)
if self._process is None:
self._process = mp.Process(target=self._execute_clbks)
self._process.start()
I think it ends up looking messier because I have to check if the process is None twice:
if self._process is not None and not self._process.is_alive() and self._operation.done:
_try_callback(self, fn)
return
self._done_clbks.append(fn)
if self._process is None:
self._process = mp.Process(target=self._execute_clbks)
self._process.start()
Ah good point! Maybe this?
if self._process is None:
self._done_clbks.append(fn)
self._process = mp.Process(target=self._execute_clbks)
self._process.start()
elif not self._process.is_alive() and self._operation.done:
_try_callback(self, fn)
else:
self._done_clbks.append(fn)
_try_callback(self, fn) | ||
def operation_name(self): | ||
"""Returns the value of Operation.name from the last call to |
Specifying which call the operation name comes from makes it seem like the operation name can change between calls. You should remove this specification to avoid confusion.
""" | ||
return self._operation | ||
def _get_operation(self): |
This might be a useful method to expose publicly to allow users to implement their own polling.
Polling can already be implemented in terms of the public methods done
and last_operation_data
.
Oh I see. To me, done
-> last_operation_data
, seems a bit of a roundabout way for a user to simply ask the server for the current operation. If you see no problem with that, then just leave as is. :)
If the user is polling, they will check if the operation is done first anyway. Is there a use case where the user would implement polling without checking if done?
Though perhaps, it'd be better to implement last_operation_data
as _get_operation
. I don't see why the user would want the stale data for the supposed last operation. @bjwatson Does this make sense to change?
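In terms of the public surface discussed here, user-implemented polling is just a loop over done(). The future class below is a stand-in for illustration, not the real _OperationFuture:

```python
import time


class FakeOperationFuture(object):
    """Stand-in future that reports done after a few status checks."""

    def __init__(self, checks_until_done=3):
        self._remaining = checks_until_done

    def done(self):
        # Each call simulates one reload of the operation from the server.
        self._remaining -= 1
        return self._remaining <= 0

    def result(self):
        return 'operation-result'


def poll(future, interval=0.001, max_checks=100):
    """User-side polling written only against done() and result()."""
    for _ in range(max_checks):
        if future.done():
            return future.result()
        time.sleep(interval)
    raise RuntimeError('operation did not complete in time')
```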
return self._operation | ||
|
||
def _done_check(self, _timeout): |
A couple of things I'm unsure about:
I think this would be better as a nested function inside _poll
, but I'm curious whether it is more pythonic to use a nested function or to have it be a private instance method as you have done.
I would use _ instead of _timeout for the parameter name, since the parameter is unused, but I am also unsure if that convention is pythonic.
self._poll() | ||
for done_clbk in self._done_clbks: | ||
_try_callback(self, done_clbk) |
Is there a reason why the callbacks are still being stored after they are run?
@@ -673,3 +670,5 @@ def _execute_clbks(self): | |||
for done_clbk in self._done_clbks: | |||
_try_callback(self, done_clbk) | |||
self._done_clbks = [] |
I think it's bad practice (and also causes bad things to happen) to update a list that is being iterated on in a for loop. This would be better as a queue that is shifted once per callback, which both empties the container after all callbacks have run and avoids mutating a list inside a for loop.
In _OperationFuture constructor:
self._done_clbks = deque()
In this function:
while self._done_clbks:
    done_clbk = self._done_clbks.popleft()
    _try_callback(self, done_clbk)
edit: accidentally deleted this comment. this is it reposted.
Now using a deque, PTAL
I think it's bad practice (and also causes bad things to happen) to update a list that is being iterated on in a for loop.
Strictly speaking, removing items from a list you are iterating over doesn't raise in Python (it silently skips elements); it's mutating a dict or set during iteration that raises a RuntimeError. Either way, the deque drain avoids the problem.
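For reference, the drain pattern under discussion, runnable in isolation; _try_callback here is a simplified stand-in that does not swallow callback exceptions the way the real helper presumably does:

```python
from collections import deque


def _try_callback(future, callback):
    # Simplified stand-in; real code would guard against callback errors.
    callback(future)


class CallbackHolder(object):
    def __init__(self):
        self._done_clbks = deque()

    def add_done_callback(self, fn):
        self._done_clbks.append(fn)

    def _execute_clbks(self):
        # popleft() empties the deque as we go: no iteration over a
        # container being mutated, and no reset needed afterwards.
        while self._done_clbks:
            _try_callback(self, self._done_clbks.popleft())
```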
I don't see any needed changes. I will just dismiss my "requested changes" review as opposed to providing an LGTM since I am not one of the main Python reviewers for the team.
I am shifting to approved. This is good enough and I do not think that you will have an easy time fixing the 3.5 Travis issue. |
Looks good overall! Just one optional nit and one tiny test comment.
my_callable = retry.retryable(mock_call, retry_options) | ||
try: |
even better:
try:
my_callable()
self.fail('Should not have been reached')
except ...
...
This gets the best of both the old and your new tests: it succeeds only if there's an exception and it's the right one.
(here and above)
Implements _OperationFuture class to use when generating LRO code.
Used in googleapis/gapic-generator#891.