Add process lock for remote filesystem scraper (datalab-org#562)
* Add descriptions for remote fs config

* Add database lock to remote filesystem scraper

Add functionality for releasing the lock

Fix lock releaser

* Refactor remote filesystems tests around new locking/sessions
ml-evs authored Jun 6, 2024

1 parent 7067d01 commit 325b554
Showing 4 changed files with 152 additions and 29 deletions.
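The locking scheme introduced here stores a `_lock` subdocument (the owning `pid` plus a creation time `ctime`) on each cached entry in the `remoteFilesystems` collection; a scraper only rebuilds a cache entry while it holds that lock, and a lock older than `REMOTE_CACHE_MIN_AGE` is treated as stale and taken over. The snippet below is a minimal sketch of that pattern with bare pymongo; the helper names and the `fs_name`/`max_age_minutes` parameters are illustrative only, and the real helpers added by this commit are `_acquire_lock_dir_structure` and `_release_lock_dir_structure` in the diff below.

import datetime
import os
from typing import Any, Dict, Optional

from pymongo import MongoClient


def try_acquire_fs_lock(client: MongoClient, fs_name: str, max_age_minutes: int = 5) -> bool:
    """Claim the cache entry for `fs_name` by writing a `_lock` subdocument,
    unless another live process already holds a fresh lock on it."""
    # Assumes the connection URI names a default database, as in the diff below.
    collection = client.get_database().remoteFilesystems
    doc: Optional[Dict[str, Any]] = collection.find_one({"name": fs_name}, projection=["_lock"])
    lock = doc.get("_lock") if doc else None
    if lock is not None:
        # A lock older than the configured age is considered stale and stolen.
        if datetime.datetime.now() - lock["ctime"] <= datetime.timedelta(minutes=max_age_minutes):
            return False  # another process is still updating this filesystem
    collection.update_one(
        {"name": fs_name},
        {"$set": {"_lock": {"pid": os.getpid(), "ctime": datetime.datetime.now()}}},
        upsert=True,
    )
    return True


def release_fs_lock(client: MongoClient, fs_name: str) -> None:
    """Clear the `_lock` field once the cache entry has been rebuilt."""
    client.get_database().remoteFilesystems.update_one({"name": fs_name}, {"$set": {"_lock": None}})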
9 changes: 6 additions & 3 deletions pydatalab/pydatalab/config.py
@@ -101,9 +101,12 @@ class RemoteFilesystem(BaseModel):
     accessible from the server.
     """
 
-    name: str
-    hostname: Optional[str]
-    path: Path
+    name: str = Field(description="The name of the filesystem to use in the UI.")
+    hostname: Optional[str] = Field(
+        None,
+        description="The hostname for the filesystem. `None` indicates the filesystem is already mounted locally.",
+    )
+    path: Path = Field(description="The path to the base of the filesystem to include.")
 
 
 class SMTPSettings(BaseModel):
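For reference, these fields are what a deployment supplies for each entry in the remote filesystem configuration (e.g., `CONFIG.REMOTE_FILESYSTEMS`); the values below are invented for illustration.

from pathlib import Path

from pydatalab.config import RemoteFilesystem

# A directory already mounted on the server: hostname stays None, so no ssh hop is made.
local_share = RemoteFilesystem(name="Group shared drive", path=Path("/mnt/shared/data"))

# A directory on another machine, reached over ssh when the cache is (re)built.
instrument_pc = RemoteFilesystem(
    name="Instrument PC",
    hostname="ssh://instrument-pc.example.edu",
    path=Path("/home/instrument/data"),
)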
141 changes: 130 additions & 11 deletions pydatalab/pydatalab/remote_filesystems.py
@@ -4,6 +4,7 @@
 import multiprocessing
 import os
 import subprocess
+import time
 from typing import Any, Dict, List, Optional, Union
 
 import pydatalab.mongo
@@ -50,6 +51,7 @@ def get_directory_structures(
 def get_directory_structure(
     directory: RemoteFilesystem,
     invalidate_cache: Optional[bool] = False,
+    max_retries: int = 5,
 ) -> Dict[str, Any]:
     """For the given remote directory, either reconstruct the directory
     structure in full, or access the cached version if it is recent
@@ -65,6 +67,8 @@ def get_directory_structure(
             be reset, provided the cache was not updated very recently. If `False`,
             the cache will not be reset, even if it is older than the maximum configured
             age.
+        max_retries: Used when called recursively to limit the number of attempts each PID
+            will make to acquire the lock on the directory structure before returning an error.
 
     Returns:
         A dictionary with keys "name", "type" and "contents" for the
@@ -93,6 +97,7 @@ def get_directory_structure(
         # `invalidate_cache` has not been explicitly set to false,
         # 3) the `invalidate_cache` parameter is true, and the cache
         # is older than the min age,
+        # AND, if no other process is updating the cache,
         # then rebuild the cache.
         if (
             (not cached_dir_structure)
@@ -105,17 +110,32 @@ def get_directory_structure(
                 and cache_age > datetime.timedelta(minutes=CONFIG.REMOTE_CACHE_MIN_AGE)
             )
         ):
-            dir_structure = _get_latest_directory_structure(directory.path, directory.hostname)
-            last_updated = _save_directory_structure(
-                directory,
-                dir_structure,
-            )
-            LOGGER.debug(
-                "Remote filesystems cache miss for '%s': last updated %s",
-                directory.name,
-                cache_last_updated,
-            )
-            status = "updated"
+            owns_lock = _acquire_lock_dir_structure(directory)
+            if owns_lock:
+                dir_structure = _get_latest_directory_structure(directory.path, directory.hostname)
+                # Save the directory structure to the database, which also releases the lock
+                last_updated = _save_directory_structure(
+                    directory,
+                    dir_structure,
+                )
+                LOGGER.debug(
+                    "Remote filesystems cache miss for '%s': last updated %s",
+                    directory.name,
+                    cache_last_updated,
+                )
+                status = "updated"
+            else:
+                if max_retries <= 0:
+                    raise RuntimeError(
+                        f"Failed to acquire lock for {directory.name} after the max number of attempts. This may indicate something wrong with the filesystem; please try again later."
+                    )
+                LOGGER.debug(
+                    "PID %s waiting 5 seconds until FS %s is updated", os.getpid(), directory.name
+                )
+                time.sleep(5)
+                return get_directory_structure(
+                    directory, invalidate_cache=invalidate_cache, max_retries=max_retries - 1
+                )
 
         else:
             last_updated = cached_dir_structure["last_updated"]
@@ -132,6 +152,9 @@ def get_directory_structure(
         last_updated = datetime.datetime.now()
         status = "error"
 
+    finally:
+        _release_lock_dir_structure(directory)
+
     return {
         "name": directory.name,
         "type": "toplevel",
@@ -333,6 +356,7 @@ def _save_directory_structure(
                 "contents": dir_structure,
                 "last_updated": last_updated,
                 "type": "toplevel",
+                "_lock": None,
             }
         },
         upsert=True,
@@ -347,6 +371,101 @@ def _save_directory_structure(
     return last_updated
 
 
+def _acquire_lock_dir_structure(
+    directory: RemoteFilesystem,
+) -> bool:
+    """Attempt to acquire the lock on the directory structure to hint to other processes
+    to not update it.
+
+    Parameters:
+        directory: The remote filesystem entry to lock.
+
+    Returns:
+        `True` if the lock was acquired, `False` otherwise.
+
+    """
+    client = pydatalab.mongo._get_active_mongo_client()
+    with client.start_session() as session:
+        collection = client.get_database().remoteFilesystems
+        doc = collection.find_one({"name": directory.name}, projection=["_lock"], session=session)
+        if doc and doc.get("_lock") is not None:
+            pid = doc["_lock"].get("pid")
+            ctime = doc["_lock"].get("ctime")
+            lock_age = datetime.datetime.now() - ctime
+            if lock_age > datetime.timedelta(minutes=CONFIG.REMOTE_CACHE_MIN_AGE):
+                LOGGER.debug(
+                    "Lock for %s already held by process %s for %s, forcing this process to acquire lock",
+                    directory.name,
+                    pid,
+                    lock_age,
+                )
+            else:
+                LOGGER.debug(
+                    "Lock for %s already held by process %s since %s", directory.name, pid, ctime
+                )
+                return False
+
+        collection.update_one(
+            {"name": directory.name},
+            {"$set": {"_lock": {"pid": os.getpid(), "ctime": datetime.datetime.now()}}},
+            upsert=True,
+            session=session,
+        )
+        LOGGER.debug("Acquired lock for %s as PID %s", directory.name, os.getpid())
+
+    return True
+
+
+def _release_lock_dir_structure(directory) -> bool:
+    """Attempt to release the lock on the directory structure.
+
+    Parameters:
+        directory: The remote filesystem entry to unlock.
+
+    Returns:
+        `True` if the lock was released successfully, `False` otherwise.
+
+    """
+    client = pydatalab.mongo._get_active_mongo_client()
+    with client.start_session() as session:
+        collection = client.get_database().remoteFilesystems
+        doc = collection.find_one({"name": directory.name}, session=session)
+        if doc:
+            if doc.get("_lock") is not None:
+                pid = doc["_lock"].get("pid")
+
+                if pid != os.getpid():
+                    LOGGER.debug(
+                        "PID %s tried to release lock for %s, but lock was held by PID %s",
+                        os.getpid(),
+                        directory.name,
+                        pid,
+                    )
+                    return False
+
+            if doc.get("contents") is None:
+                # If the lock is held by this process, but the directory structure has not been updated, then delete it
+                collection.delete_one({"name": directory.name}, session=session)
+                LOGGER.debug(
+                    "PID %s removed dir_structure stub %s",
+                    os.getpid(),
+                    doc,
+                )
+                return True
+
+            # Otherwise just release the lock
+            collection.update_one(
+                {"name": directory.name}, {"$set": {"_lock": None}}, session=session
+            )
+            LOGGER.debug(
+                "PID %s released lock on %s",
+                os.getpid(),
+                directory.name,
+            )
+
+    return True
+
+
 def _get_cached_directory_structure(
     directory: RemoteFilesystem,
 ) -> Optional[Dict[str, Any]]:
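With these helpers in place, a refresh request that races another worker retries instead of duplicating the (potentially slow) `tree`/ssh scrape: the caller sleeps 5 seconds per attempt and gets a `RuntimeError` once `max_retries` is exhausted. A rough usage sketch follows; the filesystem entry is invented and a configured database connection is assumed.

from pathlib import Path

from pydatalab.config import RemoteFilesystem
from pydatalab.remote_filesystems import get_directory_structure

fs = RemoteFilesystem(name="example-fs", path=Path("/tmp"))

try:
    # Force a cache rebuild; if another PID holds the lock, this call sleeps
    # 5 s between attempts and recurses with max_retries - 1 until it either
    # sees the fresh cache or runs out of retries.
    tree = get_directory_structure(fs, invalidate_cache=True, max_retries=3)
    print(tree["name"], tree["type"], len(tree["contents"]))
except RuntimeError:
    # Another process held the lock for the whole retry window.
    print("Filesystem is still being scraped elsewhere; try again later.")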
5 changes: 5 additions & 0 deletions pydatalab/tests/server/conftest.py
@@ -183,6 +183,11 @@ def generate_api_key():
     return "".join(random.choices("abcdef0123456789", k=24))
 
 
+@pytest.fixture(scope="function")
+def random_string():
+    return generate_api_key()
+
+
 @pytest.fixture(scope="session")
 def admin_api_key() -> str:
     return generate_api_key()
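The function-scoped `random_string` fixture simply reuses `generate_api_key()` to hand each test a unique hex string; the refactored tests below use it as the `RemoteFilesystem` name so that cache documents (and their locks) created by one test cannot collide with another's. A minimal illustration of the intent, with an invented test body and the suite's database fixtures assumed:

from pathlib import Path

from pydatalab.config import RemoteFilesystem
from pydatalab.remote_filesystems import get_directory_structure


def test_cache_entries_are_isolated(random_string):
    # Each test run keys its cache document on a fresh, unique name.
    test_dir = RemoteFilesystem(name=random_string, path=Path(__file__).parent)
    dir_structure = get_directory_structure(test_dir)
    assert dir_structure["name"] == random_string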
@@ -3,10 +3,8 @@
 import time
 from pathlib import Path
 
-import mongomock
 import pytest
 
-import pydatalab.mongo
 from pydatalab.config import CONFIG, RemoteFilesystem
 from pydatalab.remote_filesystems import (
     get_directory_structure,
@@ -22,13 +20,13 @@
 
 
 @pytest.mark.skipif(not TREE_AVAILABLE, reason="`tree` utility not installed locally")
-@mongomock.patch(on_new="create")
-def test_get_directory_structure_local():
+def test_get_directory_structure_local(random_string):
     """Check that the file directory cache is used on the second
     attempt to query a directory.
     """
-    test_dir = RemoteFilesystem(**{"path": Path(__file__).parent, "name": "test"})
+    dir_name = random_string
+    test_dir = RemoteFilesystem(**{"path": Path(__file__).parent.parent, "name": dir_name})
     dir_structure = get_directory_structure(test_dir)
     dir_structure.pop("last_updated")
     assert dir_structure
@@ -77,31 +75,31 @@ def test_get_directory_structure_local():
 
 
 @pytest.mark.skipif(not TREE_AVAILABLE, reason="`tree` utility not installed locally")
-@mongomock.patch(on_new="create")
-def test_get_missing_directory_structure_local():
+def test_get_missing_directory_structure_local(random_string):
     """Check that missing directories do not crash everything, and that
     they still get cached.
     """
-    test_dir = RemoteFilesystem(**{"path": "this_directory_does_not_exist", "name": "test"})
+    dir_name = random_string
+    test_dir = RemoteFilesystem(**{"path": "this_directory_does_not_exist", "name": dir_name})
     dir_structure = get_directory_structure(test_dir)
     dir_structure.pop("last_updated")
     assert dir_structure
     assert all(k in dir_structure for k in ("type", "name", "contents"))
     dir_structure_cached = get_directory_structure(test_dir)
     last_updated_cached = dir_structure_cached.pop("last_updated")
     dir_structure.pop("last_updated")
     assert dir_structure_cached == dir_structure
     assert last_updated_cached
 
 
 @pytest.mark.skipif(not TREE_AVAILABLE, reason="`tree` utility not installed locally")
-@mongomock.patch(on_new="create")
-def test_get_directory_structure_remote():
+def test_get_directory_structure_remote(real_mongo_client, random_string):
     """Check that a fake ssh server initially fails, then successfully returns
     once the cache has been mocked.
     """
+    dir_name = random_string
     test_dir = RemoteFilesystem(
-        **{"name": "test", "hostname": "ssh://fake.host", "path": Path(__file__).parent}
+        **{"name": dir_name, "hostname": "ssh://fake.host", "path": Path(__file__).parent.parent}
     )
     dir_structure = get_directory_structure(test_dir)
     assert dir_structure["contents"][0]["type"] == "error"
@@ -111,9 +109,7 @@ def test_get_directory_structure_remote():
         "last_updated": datetime.datetime.now(),
         "type": "toplevel",
     }
-    pydatalab.mongo._get_active_mongo_client().get_database().remoteFilesystems.insert_one(
-        dummy_dir_structure
-    )
+    real_mongo_client.get_database().remoteFilesystems.insert_one(dummy_dir_structure)
     dir_structure = get_directory_structure(test_dir)
     assert dir_structure["last_updated"]
