Conversation
Rework the task work plugin
Update provision descriptor, data channel input_elements
except ImportError:
  BatchV1Api = None
  V1EnvVar = None
  V1Container = None
  V1PodTemplateSpec = None
  V1PodSpec = None
  V1ObjectMeta = None
  V1Job = None
  V1JobSpec = None
  V1DeleteOptions = None
  ApiException = None
  load_kube_config = None
I followed the pattern in some other modules that require "extras" that may not be installed. This will not work without the kubernetes API installed, but the module needs to be importable because it gets imported via entrypoints regardless of whether or not it is used.
I can clean this up a bit and add a logger warning message.
sounds like there is both a precedent and a reason. just leave a comment explaining "the module needs to be importable due to it getting imported via entrypoints regardless of whether or not it gets used"
though maybe this would be easier to look at and maintain if we did:
try:
from kubernetes import client
except ImportError:
    client = None
Yeah I ended up with something similar. I've also setup apache_beam.task.task_worker.kubejob.transforms so that it will raise an ImportError if the kubernetes API isn't available.
…beTask payload; misc cleanup and organization
def process_bundle(self, instruction_id):
  # type: (str) -> Tuple[List[beam_fn_api_pb2.DelayedBundleApplication], bool]
def process_bundle(self, instruction_id, use_task_worker=True):
  # type: (str, bool) -> Tuple[List[beam_fn_api_pb2.DelayedBundleApplication], bool]
@violalyu Is the BundleProcessor re-used for many bundles? If not, should use_task_worker be passed to __init__?
IIUC, a BundleProcessor may be reused; I think the number of bundle processors should match the number of SDK workers we have (each SDK worker requests a BundleProcessor from the BundleProcessorCache, and if there is no "available" bundle processor it creates a new one, otherwise it uses an available one).
Can use_task_worker vary over the lifetime of a BundleProcessor? If not, we should make it an __init__ arg.
    input_op_by_transform_id,  # type: Dict[str, DataInputOperation]
    use_task_worker=True  # type: bool
):
  # type: (...) -> Union[Tuple[None, None], Tuple[List[beam_fn_api_pb2.DelayedBundleApplication], bool]]
@violalyu like-wise, how many times in the lifespan of a BundleProcessor would maybe_process_remotely be called?
It would be called once per bundle that the current BundleProcessor is asked to process.
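To illustrate the design trade-off in this thread, here is a hypothetical minimal sketch (not the actual Beam classes) contrasting the two options: `use_task_worker` as a per-call flag, which can vary per bundle, versus an `__init__` argument, which is fixed for the lifetime of a (possibly cached and reused) processor instance.

```python
class BundleProcessorPerCall:
  """Flag passed on every call; each bundle may choose independently."""

  def process_bundle(self, instruction_id, use_task_worker=True):
    # Returns which setting was in effect for this bundle.
    return (instruction_id, use_task_worker)


class BundleProcessorPerInstance:
  """Flag fixed at construction; every bundle processed by this
  instance (it may be reused from a cache) uses the same setting."""

  def __init__(self, use_task_worker=True):
    self._use_task_worker = use_task_worker

  def process_bundle(self, instruction_id):
    return (instruction_id, self._use_task_worker)
```

If the flag never varies over a processor's lifetime, the `__init__` form is simpler; if different bundles can legitimately differ, the per-call form is needed.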
This is a PR to make it easy to see what we've changed before we move forward with a PR against apache/beam