bigframes/functions/remote_function.py (29 changes: 25 additions & 4 deletions)
@@ -167,7 +167,23 @@ def get_remote_function_locations(bq_location):

 def _get_hash(def_, package_requirements=None):
     "Get hash (32 digits alphanumeric) of a function."
-    def_repr = cloudpickle.dumps(def_, protocol=_pickle_protocol_version)
+    # There is a known cell-id sensitivity of the cloudpickle serialization in
+    # notebooks https://github.com/cloudpipe/cloudpickle/issues/538. Because of
+    # this, if a cell contains a udf decorated with @remote_function, unique
+    # cloudpickle bytes are generated every time the cell is run, creating new
+    # cloud artifacts each time. This is slow and wasteful.
+    # This can be worked around by replacing the filename in the code object
+    # with a static value
+    # https://github.com/cloudpipe/cloudpickle/issues/120#issuecomment-338510661.
+    #
+    # To respect the user's code/environment, make this modification on a
+    # copy of the udf, not on the original udf itself.
+    def_copy = cloudpickle.loads(cloudpickle.dumps(def_))
+    def_copy.__code__ = def_copy.__code__.replace(
+        co_filename="bigframes_place_holder_filename"
+    )
Comment on lines +181 to +184

Contributor:

Is there a more efficient way to do this? Also, do we want to replace the filename for other code objects in the dependency tree? It seems the most efficient solution would require modifying cloudpickle a bit, though.

@shobsi (Contributor, Author) commented on Jul 30, 2024:

Re: a more efficient way: the most efficient approach would be to modify one of the pickling routines to take replacements on a live object; I'll consider this in the effort explained below.

Re: replacement in the dependency tree: we are not handling the dependency tree yet. I'm looking into cloudpickle (and the native pickle it builds on) to evaluate what modification we can make. Consider this change an immediate workaround to reduce re-deployments in two scenarios:

  1. rerun of a cell in the same session
  2. rerun of a notebook with an explicitly named remote function
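For concreteness, a rough sketch of the dependency-tree idea raised in this thread: recursively pinning `co_filename` on nested code objects (inner functions, lambdas, comprehensions) via `co_consts`. This is an editorial illustration, not code from the PR:

```python
import types

def pin_filenames(code, filename="bigframes_place_holder_filename"):
    # Recurse into nested code objects stored in co_consts and pin every
    # co_filename, so inner functions do not reintroduce cell-id noise.
    new_consts = tuple(
        pin_filenames(const, filename) if isinstance(const, types.CodeType) else const
        for const in code.co_consts
    )
    return code.replace(co_filename=filename, co_consts=new_consts)
```

A udf copy could then be patched with `def_copy.__code__ = pin_filenames(def_copy.__code__)` before pickling.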


+    def_repr = cloudpickle.dumps(def_copy, protocol=_pickle_protocol_version)
     if package_requirements:
         for p in sorted(package_requirements):
             def_repr += p.encode()
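The digest step of `_get_hash` falls outside this hunk. As a minimal end-to-end sketch of the idea, assuming an MD5 hex digest (32 hex characters, which matches the docstring) and a placeholder value for `_pickle_protocol_version`; the function name here is illustrative, not the library's exact code:

```python
import hashlib

import cloudpickle

_pickle_protocol_version = 4  # assumed value; the actual constant is defined elsewhere

def get_stable_hash(def_, package_requirements=None):
    # Pickle a copy of the udf with a pinned co_filename so notebook cell ids
    # do not perturb the serialized bytes (mirrors the diff above).
    def_copy = cloudpickle.loads(cloudpickle.dumps(def_))
    def_copy.__code__ = def_copy.__code__.replace(
        co_filename="bigframes_place_holder_filename"
    )
    def_repr = cloudpickle.dumps(def_copy, protocol=_pickle_protocol_version)
    if package_requirements:
        for p in sorted(package_requirements):
            def_repr += p.encode()
    # Assumption: a 32-character hex digest, per "(32 digits alphanumeric)"
    # in the docstring.
    return hashlib.md5(def_repr).hexdigest()
```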
@@ -877,11 +893,16 @@ def remote_function(
         dynamically using the `bigquery_connection_client` assuming the user has necessary
         privileges. The PROJECT_ID should be the same as the BigQuery connection project.
     reuse (bool, Optional):
-        Reuse the remote function if is already exists.
-        `True` by default, which results in reusing an existing remote
+        Reuse the remote function if it already exists.
+        `True` by default, which will result in reusing an existing remote
         function and corresponding cloud function (if any) that was
         previously created for the same udf.
-        Setting it to `False` forces the creation of a unique remote function.
+        Please note that for an unnamed (i.e. created without an explicit
+        `name` argument) remote function, the BigQuery DataFrames
+        session id is attached to the cloud artifact names. So for
+        effective reuse across sessions it is recommended to create
+        the remote function with an explicit `name`.
+        Setting it to `False` would force creating a unique remote function.
         If the required remote function does not exist then it would be
         created irrespective of this param.
     name (str, Optional):
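As a usage illustration of the `reuse`/`name` recommendation above, a hedged sketch assuming the decorator form of `bigframes.pandas.remote_function` with positional `input_types`/`output_type`; the udf name and body are invented for the example:

```python
import bigframes.pandas as bpd

# An explicit `name` gives the deployed BigQuery remote function (and its
# cloud function) a stable identity, so reuse=True can find it again in a
# later session instead of re-deploying session-id-suffixed artifacts.
@bpd.remote_function([float], float, name="fahrenheit_to_celsius", reuse=True)
def fahrenheit_to_celsius(x: float) -> float:
    return (x - 32.0) * 5.0 / 9.0
```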
bigframes/functions/remote_function_template.py (18 changes: 9 additions & 9 deletions)
@@ -215,23 +215,23 @@ def udf_http_row_processor(request):


 def generate_udf_code(def_, directory):
-    """Generate serialized bytecode using cloudpickle given a udf."""
+    """Generate serialized code using cloudpickle given a udf."""
     udf_code_file_name = "udf.py"
-    udf_bytecode_file_name = "udf.cloudpickle"
+    udf_pickle_file_name = "udf.cloudpickle"

     # original code, only for debugging purpose
     udf_code = textwrap.dedent(inspect.getsource(def_))
     udf_code_file_path = os.path.join(directory, udf_code_file_name)
     with open(udf_code_file_path, "w") as f:
         f.write(udf_code)

-    # serialized bytecode
-    udf_bytecode_file_path = os.path.join(directory, udf_bytecode_file_name)
+    # serialized udf
+    udf_pickle_file_path = os.path.join(directory, udf_pickle_file_name)
     # TODO(b/345433300): try io.BytesIO to avoid writing to the file system
-    with open(udf_bytecode_file_path, "wb") as f:
+    with open(udf_pickle_file_path, "wb") as f:
         cloudpickle.dump(def_, f, protocol=_pickle_protocol_version)

-    return udf_code_file_name, udf_bytecode_file_name
+    return udf_code_file_name, udf_pickle_file_name


 def generate_cloud_function_main_code(

@@ -252,15 +252,15 @@ def generate_cloud_function_main_code(
"""

# Pickle the udf with all its dependencies
udf_code_file, udf_bytecode_file = generate_udf_code(def_, directory)
udf_code_file, udf_pickle_file = generate_udf_code(def_, directory)

code_blocks = [
f"""\
import cloudpickle

# original udf code is in {udf_code_file}
# serialized udf code is in {udf_bytecode_file}
with open("{udf_bytecode_file}", "rb") as f:
# serialized udf code is in {udf_pickle_file}
with open("{udf_pickle_file}", "rb") as f:
udf = cloudpickle.load(f)

input_types = {repr(input_types)}
Expand Down
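Taken together, the two helpers implement a simple file-based round trip: serialize the udf with cloudpickle at deployment time, then deserialize it inside the cloud function. A minimal self-contained sketch of that round trip; `my_udf` and the temporary directory are illustrative stand-ins:

```python
import os
import tempfile

import cloudpickle

def my_udf(x):  # hypothetical udf used only for this illustration
    return x * 2

with tempfile.TemporaryDirectory() as directory:
    pickle_path = os.path.join(directory, "udf.cloudpickle")

    # Deployment side: serialize the udf (what generate_udf_code does).
    with open(pickle_path, "wb") as f:
        cloudpickle.dump(my_udf, f)

    # Cloud function side: deserialize and call it (what the generated
    # main code does with cloudpickle.load).
    with open(pickle_path, "rb") as f:
        udf = cloudpickle.load(f)

    assert udf(21) == 42
```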
bigframes/session/__init__.py (16 changes: 11 additions & 5 deletions)
@@ -1629,15 +1629,21 @@ def remote_function(
         `True` by default, which will result in reusing an existing remote
         function and corresponding cloud function (if any) that was
         previously created for the same udf.
+        Please note that for an unnamed (i.e. created without an explicit
+        `name` argument) remote function, the BigQuery DataFrames
+        session id is attached to the cloud artifact names. So for
+        effective reuse across sessions it is recommended to create
+        the remote function with an explicit `name`.
         Setting it to `False` would force creating a unique remote function.
         If the required remote function does not exist then it would be
         created irrespective of this param.
     name (str, Optional):
-        Explicit name of the persisted BigQuery remote function. Use it with
-        caution, because two users working in the same project and dataset
-        could overwrite each other's remote functions if they use the same
-        persistent name. When an explicit name is provided, any session
-        specific clean up (``bigframes.session.Session.close``/
+        Explicit name of the persisted BigQuery remote function. Use it
+        with caution, because more than one user working in the same
+        project and dataset could overwrite each other's remote
+        functions if they use the same persistent name. When an explicit
+        name is provided, any session specific clean up (
+        ``bigframes.session.Session.close``/
         ``bigframes.pandas.close_session``/
         ``bigframes.pandas.reset_session``/
         ``bigframes.pandas.clean_up_by_session_id``) does not clean up
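That caveat has a practical consequence for cleanup. A hedged sketch, using the entry points named in the docstring above plus `get_global_session` and `session_id`, which are assumed here; the udf is illustrative:

```python
import bigframes.pandas as bpd

session_id = bpd.get_global_session().session_id

# Unnamed: the session id is baked into the artifact names, so
# session-scoped cleanup can locate and remove them.
@bpd.remote_function([int], int)
def plus_one(x: int) -> int:
    return x + 1

bpd.clean_up_by_session_id(session_id)  # removes this session's artifacts

# A function created with an explicit `name` is not touched by the cleanup
# calls listed above; it persists until the user deletes it directly (for
# example, via DROP FUNCTION in BigQuery).
```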