Skip to content

Commit

Permalink
UIElement in LinuxOSACI
Browse files Browse the repository at this point in the history
  • Loading branch information
alckasoc committed Jan 17, 2025
1 parent 11d421d commit 162b0b9
Showing 1 changed file with 6 additions and 210 deletions.
216 changes: 6 additions & 210 deletions gui_agents/aci/LinuxOSACI.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,209 +674,6 @@ def fail(self):
return """FAIL"""


_accessibility_ns_map_ubuntu = {
"st": "https://accessibility.ubuntu.example.org/ns/state",
"attr": "https://accessibility.ubuntu.example.org/ns/attributes",
"cp": "https://accessibility.ubuntu.example.org/ns/component",
"doc": "https://accessibility.ubuntu.example.org/ns/document",
"docattr": "https://accessibility.ubuntu.example.org/ns/document/attributes",
"txt": "https://accessibility.ubuntu.example.org/ns/text",
"val": "https://accessibility.ubuntu.example.org/ns/value",
"act": "https://accessibility.ubuntu.example.org/ns/action",
}

MAX_WIDTH = 1024
MAX_DEPTH = 50


# Ref: https://github.com/xlang-ai/OSWorld/blob/main/desktop_env/server/main.py#L793
def _create_atspi_node(
node: Accessible, depth: int = 0, flag: Optional[str] = None
) -> _Element:
node_name = node.name
attribute_dict: Dict[str, Any] = {"name": node_name}

# States
states: List[StateType] = node.getState().get_states()
for st in states:
state_name: str = StateType._enum_lookup[st]
state_name: str = state_name.split("_", maxsplit=1)[1].lower()
if len(state_name) == 0:
continue
attribute_dict[
"{{{:}}}{:}".format(_accessibility_ns_map_ubuntu["st"], state_name)
] = "true"

# Attributes
attributes: Dict[str, str] = node.get_attributes()
for attribute_name, attribute_value in attributes.items():
if len(attribute_name) == 0:
continue
attribute_dict[
"{{{:}}}{:}".format(_accessibility_ns_map_ubuntu["attr"], attribute_name)
] = attribute_value

# Component
if (
attribute_dict.get(
"{{{:}}}visible".format(_accessibility_ns_map_ubuntu["st"]), "false"
)
== "true"
and attribute_dict.get(
"{{{:}}}showing".format(_accessibility_ns_map_ubuntu["st"]), "false"
)
== "true"
):
try:
component: Component = node.queryComponent()
except NotImplementedError:
pass
else:
bbox: Sequence[int] = component.getExtents(pyatspi.XY_SCREEN)
attribute_dict[
"{{{:}}}screencoord".format(_accessibility_ns_map_ubuntu["cp"])
] = str(tuple(bbox[0:2]))
attribute_dict["{{{:}}}size".format(_accessibility_ns_map_ubuntu["cp"])] = (
str(tuple(bbox[2:]))
)

text = ""
# Text
try:
text_obj: ATText = node.queryText()
# only text shown on current screen is available
# attribute_dict["txt:text"] = text_obj.getText(0, text_obj.characterCount)
text: str = text_obj.getText(0, text_obj.characterCount)
# if flag=="thunderbird":
# appeared in thunderbird (uFFFC) (not only in thunderbird), "Object
# Replacement Character" in Unicode, "used as placeholder in text for
# an otherwise unspecified object; uFFFD is another "Replacement
# Character", just in case
text = text.replace("\ufffc", "").replace("\ufffd", "")
except NotImplementedError:
pass

# Image, Selection, Value, Action
try:
node.queryImage()
attribute_dict["image"] = "true"
except NotImplementedError:
pass

try:
node.querySelection()
attribute_dict["selection"] = "true"
except NotImplementedError:
pass

try:
value: ATValue = node.queryValue()
value_key = f"{{{_accessibility_ns_map_ubuntu['val']}}}"

for attr_name, attr_func in [
("value", lambda: value.currentValue),
("min", lambda: value.minimumValue),
("max", lambda: value.maximumValue),
("step", lambda: value.minimumIncrement),
]:
try:
attribute_dict[f"{value_key}{attr_name}"] = str(attr_func())
except:
pass
except NotImplementedError:
pass

try:
action: ATAction = node.queryAction()
for i in range(action.nActions):
action_name: str = action.getName(i).replace(" ", "-")
attribute_dict[
"{{{:}}}{:}_desc".format(
_accessibility_ns_map_ubuntu["act"], action_name
)
] = action.getDescription(i)
attribute_dict[
"{{{:}}}{:}_kb".format(_accessibility_ns_map_ubuntu["act"], action_name)
] = action.getKeyBinding(i)
except NotImplementedError:
pass

# Add from here if we need more attributes in the future...

raw_role_name: str = node.getRoleName().strip()
node_role_name = (raw_role_name or "unknown").replace(" ", "-")

if not flag:
if raw_role_name == "document spreadsheet":
flag = "calc"
if raw_role_name == "application" and node.name == "Thunderbird":
flag = "thunderbird"

xml_node = lxml.etree.Element(
node_role_name, attrib=attribute_dict, nsmap=_accessibility_ns_map_ubuntu
)

if len(text) > 0:
xml_node.text = text

if depth == MAX_DEPTH:
return xml_node

if flag == "calc" and node_role_name == "table":
# Maximum column: 1024 if ver<=7.3 else 16384
# Maximum row: 104 8576
# Maximun sheet: 1 0000

global libreoffice_version_tuple
MAXIMUN_COLUMN = 1024 if libreoffice_version_tuple < (7, 4) else 16384
MAX_ROW = 104_8576

index_base = 0
first_showing = False
column_base = None
for r in range(MAX_ROW):
for clm in range(column_base or 0, MAXIMUN_COLUMN):
child_node: Accessible = node[index_base + clm]
showing: bool = child_node.getState().contains(STATE_SHOWING)
if showing:
child_node: _Element = _create_atspi_node(
child_node, depth + 1, flag
)
if not first_showing:
column_base = clm
first_showing = True
xml_node.append(child_node)
elif first_showing and column_base is not None or clm >= 500:
break
if first_showing and clm == column_base or not first_showing and r >= 500:
break
index_base += MAXIMUN_COLUMN
return xml_node
else:
try:
for i, ch in enumerate(node):
if i == MAX_WIDTH:
break
xml_node.append(_create_atspi_node(ch, depth + 1, flag))
except:
pass
return xml_node


def get_acc_tree() -> str:
desktop: Accessible = pyatspi.Registry.getDesktop(0)
xml_node = lxml.etree.Element("desktop-frame", nsmap=_accessibility_ns_map_ubuntu)
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(_create_atspi_node, app_node, 1) for app_node in desktop
]
for future in concurrent.futures.as_completed(futures):
xml_tree = future.result()
xml_node.append(xml_tree)
acc_tree = lxml.etree.tostring(xml_node, encoding="unicode")
return acc_tree


class UIElement(object):
def __init__(self, node: Accessible):
self.node: Accessible = node
Expand All @@ -886,13 +683,12 @@ def getAttributeNames(self):

@staticmethod
def systemWideElement():
# desktop = pyatspi.Registry.getDesktop(0)
# for app in desktop:
# for window in app:
# if window.getState().contains(pyatspi.STATE_ACTIVE):
# active_node = app
# return UIElement(active_node)
return get_acc_tree()
desktop = pyatspi.Registry.getDesktop(0)
for app in desktop:
for window in app:
if window.getState().contains(pyatspi.STATE_ACTIVE):
active_node = app
return UIElement(active_node)

@property
def states(self):
Expand Down

0 comments on commit 162b0b9

Please sign in to comment.