Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions casbin_pymongo_adapter/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,35 @@ def remove_filtered_policy(self, sec, ptype, field_index, *field_values):
query["ptype"] = ptype
results = self._collection.delete_many(query)
return results.deleted_count > 0

def update_policy(self, sec, ptype, old_rule, new_rule):
"""Update the old_rule with the new_rule in the database (storage).

Args:
sec (str): section type
ptype (str): policy type
old_rule (list[str]): the old rule that needs to be modified
new_rule (list[str]): the new rule to replace the old rule
"""
filter_query = {}
for index, value in enumerate(old_rule):
filter_query[f"v{index}"] = value

self._collection.find_one_and_update(
filter_query,
{"$set": {f"v{index}": value for index, value in enumerate(new_rule)}},
)

return None

def update_policies(self, sec, ptype, old_rules, new_rules):
"""Update the old_rule with the new_rule in the database (storage).

Args:
sec (str): section type
ptype (str): policy type
old_rules (list[list[str]]): the old rules that needs to be modified
new_rules (list[list[str]]): the new rules to replace the old rule
"""
for old_rule, new_rule in zip(old_rules, new_rules):
self.update_policy(sec, ptype, old_rule, new_rule)
32 changes: 32 additions & 0 deletions casbin_pymongo_adapter/asynchronous/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,35 @@ async def remove_filtered_policy(self, sec, ptype, field_index, *field_values):
query["ptype"] = ptype
results = await self._collection.delete_many(query)
return results.deleted_count > 0

async def update_policy(self, sec, ptype, old_rule, new_rule):
"""Update the old_rule with the new_rule in the database (storage).

Args:
sec (str): section type
ptype (str): policy type
old_rule (list[str]): the old rule that needs to be modified
new_rule (list[str]): the new rule to replace the old rule
"""
filter_query = {}
for index, value in enumerate(old_rule):
filter_query[f"v{index}"] = value

await self._collection.find_one_and_update(
filter_query,
{"$set": {f"v{index}": value for index, value in enumerate(new_rule)}},
)

return None

async def update_policies(self, sec, ptype, old_rules, new_rules):
"""Update the old_rule with the new_rule in the database (storage).

Args:
sec (str): section type
ptype (str): policy type
old_rules (list[list[str]]): the old rules that needs to be modified
new_rules (list[list[str]]): the new rules to replace the old rule
"""
for old_rule, new_rule in zip(old_rules, new_rules):
await self.update_policy(sec, ptype, old_rule, new_rule)
59 changes: 59 additions & 0 deletions tests/asynchronous/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,62 @@ async def test_filtered_policy_with_raw_query(self):
self.assertFalse(e.enforce("bob", "data1", "write"))
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))

async def test_update_policy(self):
e = await get_enforcer()
example_p = ["mike", "cookie", "eat"]

self.assertTrue(e.enforce("alice", "data1", "read"))
await e.update_policy(["alice", "data1", "read"], ["alice", "data1", "no_read"])
self.assertFalse(e.enforce("alice", "data1", "read"))

self.assertFalse(e.enforce("bob", "data1", "read"))
await e.add_policy(example_p)
await e.update_policy(example_p, ["bob", "data1", "read"])
self.assertTrue(e.enforce("bob", "data1", "read"))

self.assertFalse(e.enforce("bob", "data1", "write"))
await e.update_policy(["bob", "data1", "read"], ["bob", "data1", "write"])
self.assertTrue(e.enforce("bob", "data1", "write"))

self.assertTrue(e.enforce("bob", "data2", "write"))
await e.update_policy(["bob", "data2", "write"], ["bob", "data2", "read"])
self.assertFalse(e.enforce("bob", "data2", "write"))

self.assertTrue(e.enforce("bob", "data2", "read"))
await e.update_policy(["bob", "data2", "read"], ["carl", "data2", "write"])
self.assertFalse(e.enforce("bob", "data2", "write"))

self.assertTrue(e.enforce("carl", "data2", "write"))
await e.update_policy(["carl", "data2", "write"], ["carl", "data2", "no_write"])
self.assertFalse(e.enforce("bob", "data2", "write"))

async def test_update_policies(self):
e = await get_enforcer()

old_rule_0 = ["alice", "data1", "read"]
old_rule_1 = ["bob", "data2", "write"]
old_rule_2 = ["data2_admin", "data2", "read"]
old_rule_3 = ["data2_admin", "data2", "write"]

new_rule_0 = ["alice", "data_test", "read"]
new_rule_1 = ["bob", "data_test", "write"]
new_rule_2 = ["data2_admin", "data_test", "read"]
new_rule_3 = ["data2_admin", "data_test", "write"]

old_rules = [old_rule_0, old_rule_1, old_rule_2, old_rule_3]
new_rules = [new_rule_0, new_rule_1, new_rule_2, new_rule_3]

await e.update_policies(old_rules, new_rules)

self.assertFalse(e.enforce("alice", "data1", "read"))
self.assertTrue(e.enforce("alice", "data_test", "read"))

self.assertFalse(e.enforce("bob", "data2", "write"))
self.assertTrue(e.enforce("bob", "data_test", "write"))

self.assertFalse(e.enforce("data2_admin", "data2", "read"))
self.assertTrue(e.enforce("data2_admin", "data_test", "read"))

self.assertFalse(e.enforce("data2_admin", "data2", "write"))
self.assertTrue(e.enforce("data2_admin", "data_test", "write"))
59 changes: 59 additions & 0 deletions tests/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,65 @@ async def test_filtered_policy_with_raw_query(self):
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))

async def test_update_policy(self):
e = get_enforcer()
example_p = ["mike", "cookie", "eat"]

self.assertTrue(e.enforce("alice", "data1", "read"))
e.update_policy(["alice", "data1", "read"], ["alice", "data1", "no_read"])
self.assertFalse(e.enforce("alice", "data1", "read"))

self.assertFalse(e.enforce("bob", "data1", "read"))
e.add_policy(example_p)
e.update_policy(example_p, ["bob", "data1", "read"])
self.assertTrue(e.enforce("bob", "data1", "read"))

self.assertFalse(e.enforce("bob", "data1", "write"))
e.update_policy(["bob", "data1", "read"], ["bob", "data1", "write"])
self.assertTrue(e.enforce("bob", "data1", "write"))

self.assertTrue(e.enforce("bob", "data2", "write"))
e.update_policy(["bob", "data2", "write"], ["bob", "data2", "read"])
self.assertFalse(e.enforce("bob", "data2", "write"))

self.assertTrue(e.enforce("bob", "data2", "read"))
e.update_policy(["bob", "data2", "read"], ["carl", "data2", "write"])
self.assertFalse(e.enforce("bob", "data2", "write"))

self.assertTrue(e.enforce("carl", "data2", "write"))
e.update_policy(["carl", "data2", "write"], ["carl", "data2", "no_write"])
self.assertFalse(e.enforce("bob", "data2", "write"))

async def test_update_policies(self):
e = get_enforcer()

old_rule_0 = ["alice", "data1", "read"]
old_rule_1 = ["bob", "data2", "write"]
old_rule_2 = ["data2_admin", "data2", "read"]
old_rule_3 = ["data2_admin", "data2", "write"]

new_rule_0 = ["alice", "data_test", "read"]
new_rule_1 = ["bob", "data_test", "write"]
new_rule_2 = ["data2_admin", "data_test", "read"]
new_rule_3 = ["data2_admin", "data_test", "write"]

old_rules = [old_rule_0, old_rule_1, old_rule_2, old_rule_3]
new_rules = [new_rule_0, new_rule_1, new_rule_2, new_rule_3]

e.update_policies(old_rules, new_rules)

self.assertFalse(e.enforce("alice", "data1", "read"))
self.assertTrue(e.enforce("alice", "data_test", "read"))

self.assertFalse(e.enforce("bob", "data2", "write"))
self.assertTrue(e.enforce("bob", "data_test", "write"))

self.assertFalse(e.enforce("data2_admin", "data2", "read"))
self.assertTrue(e.enforce("data2_admin", "data_test", "read"))

self.assertFalse(e.enforce("data2_admin", "data2", "write"))
self.assertTrue(e.enforce("data2_admin", "data_test", "write"))

def test_str(self):
"""
test __str__ function
Expand Down