Skip to content

Commit 47b7560

Browse files
axelsrzmsomanathan
andauthored
Axsuarez/streaming extensions (microsoft#986)
* Initial commit for POC. This is WIP * Updates for POC. This is WIP * Pylint: POC updates * Updates on POC, protocol adapter pending * Updates on POC, protocol adapter in progress * POC almost ready for testing, changes on BFAdapter pending * POC waiting on client injection in connector * black: POC waiting on client injection in connector * POC for http client injection in connector * got rid of importing errors when loading libraries. Currently in the process of testing. * Fix couple of errors, still in debugging phase. Initial receive doesnt work with current structure. * Several fixes including deadlock in threading, serialization and minor logic bugs. PayloadStream logic pending. * More errors fixed, trying to fit websocket into ms rest pipeline. Receiving is working with some bugs. * Disassembler fixes, sender struggling to send through socket * changes on disassembler and receiver * adding streaming to ci pipeline * Pylint fixes * updated streaming setup.py * Removing 3.6 * Changing all concurrent mechanisms in streaming to asyncio * Added validation for abrupt closing of websocket, added tracebacks and async validation for disconnecting callbacks * UnblockActivityProcessorThread * Header serialization fix and stream serialization fix. * Parity change in the internal buffer structure of the payload_stream object, fixes on stream writing behavior in web_socket_transport and send response instead of request fix in protocol adapter. * Fixes on the RecieveResponse path * PayloadStream length fix * Grouping related imports * payload receiver unit test (microsoft#1664) * Payload sender unit test (microsoft#1666) * payload receiver unit test * senderTest * disconnect * fixWarning Co-authored-by: Axel Suarez <[email protected]> * blackcheck * pylintfix * test_req_processor (microsoft#1668) * Axsuarez/streaming receive loop unittest (microsoft#1667) * payload receiver unit test * StreamingRequestHandler test listen * renaming straming to botframework scope * Updating pipeline * Removing sleep() safety measure * Remove unused import Co-authored-by: msomanathan <[email protected]> Co-authored-by: Muthuveer Somanathan <[email protected]>
1 parent 4003d71 commit 47b7560

File tree

87 files changed

+3868
-18
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

87 files changed

+3868
-18
lines changed

.pylintrc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,9 @@ disable=print-statement,
158158
too-many-return-statements,
159159
import-error,
160160
no-name-in-module,
161-
too-many-branches
161+
too-many-branches,
162+
too-many-ancestors,
163+
too-many-nested-blocks
162164

163165
# Enable the message, report, category or checker with the given id(s). You can
164166
# either give multiple identifier separated by comma (,) or put this option

libraries/botbuilder-core/botbuilder/core/bot_framework_adapter.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1282,6 +1282,16 @@ async def exchange_token_from_credentials(
12821282
)
12831283
raise TypeError(f"exchange token returned improper result: {type(result)}")
12841284

1285+
def can_process_outgoing_activity(
1286+
self, activity: Activity # pylint: disable=unused-argument
1287+
) -> bool:
1288+
return False
1289+
1290+
async def process_outgoing_activity(
1291+
self, turn_context: TurnContext, activity: Activity
1292+
) -> ResourceResponse:
1293+
raise Exception("NotImplemented")
1294+
12851295
@staticmethod
12861296
def key_for_connector_client(service_url: str, app_id: str, scope: str):
12871297
return f"{service_url if service_url else ''}:{app_id if app_id else ''}:{scope if scope else ''}"

libraries/botbuilder-core/botbuilder/core/integration/aiohttp_channel_service_exception_middleware.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Copyright (c) Microsoft Corporation. All rights reserved.
22
# Licensed under the MIT License.
33

4+
import traceback
5+
46
from aiohttp.web import (
57
middleware,
68
HTTPNotImplemented,
@@ -26,4 +28,5 @@ async def aiohttp_error_middleware(request, handler):
2628
except KeyError:
2729
raise HTTPNotFound()
2830
except Exception:
31+
traceback.print_exc()
2932
raise HTTPInternalServerError()
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
4+
from .bot_framework_http_adapter_base import BotFrameworkHttpAdapterBase
5+
from .streaming_activity_processor import StreamingActivityProcessor
6+
from .streaming_request_handler import StreamingRequestHandler
7+
from .version_info import VersionInfo
8+
9+
__all__ = [
10+
"BotFrameworkHttpAdapterBase",
11+
"StreamingActivityProcessor",
12+
"StreamingRequestHandler",
13+
"VersionInfo",
14+
]
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
4+
from http import HTTPStatus
5+
from typing import Awaitable, Callable, List
6+
7+
from botbuilder.core import (
8+
Bot,
9+
BotFrameworkAdapter,
10+
BotFrameworkAdapterSettings,
11+
InvokeResponse,
12+
TurnContext,
13+
)
14+
from botbuilder.schema import Activity, ActivityTypes, ResourceResponse
15+
from botframework.connector import AsyncBfPipeline, BotFrameworkConnectorConfiguration
16+
from botframework.connector.aio import ConnectorClient
17+
from botframework.connector.auth import (
18+
ClaimsIdentity,
19+
MicrosoftAppCredentials,
20+
MicrosoftGovernmentAppCredentials,
21+
)
22+
23+
from .streaming_activity_processor import StreamingActivityProcessor
24+
from .streaming_request_handler import StreamingRequestHandler
25+
from .streaming_http_client import StreamingHttpDriver
26+
27+
28+
class BotFrameworkHttpAdapterBase(BotFrameworkAdapter, StreamingActivityProcessor):
29+
# pylint: disable=pointless-string-statement
30+
def __init__(self, settings: BotFrameworkAdapterSettings):
31+
super().__init__(settings)
32+
33+
self.connected_bot: Bot = None
34+
self.claims_identity: ClaimsIdentity = None
35+
self.request_handlers: List[StreamingRequestHandler] = None
36+
37+
async def process_streaming_activity(
38+
self,
39+
activity: Activity,
40+
bot_callback_handler: Callable[[TurnContext], Awaitable],
41+
) -> InvokeResponse:
42+
if not activity:
43+
raise TypeError(
44+
f"'activity: {activity.__class__.__name__}' argument can't be None"
45+
)
46+
47+
"""
48+
If a conversation has moved from one connection to another for the same Channel or Skill and
49+
hasn't been forgotten by the previous StreamingRequestHandler. The last requestHandler
50+
the conversation has been associated with should always be the active connection.
51+
"""
52+
request_handler = [
53+
handler
54+
for handler in self.request_handlers
55+
if handler.service_url == activity.service_url
56+
and handler.has_conversation(activity.conversation.id)
57+
]
58+
request_handler = request_handler[-1] if request_handler else None
59+
context = TurnContext(self, activity)
60+
61+
if self.claims_identity:
62+
context.turn_state[self.BOT_IDENTITY_KEY] = self.claims_identity
63+
64+
connector_client = self._create_streaming_connector_client(
65+
activity, request_handler
66+
)
67+
context.turn_state[self.BOT_CONNECTOR_CLIENT_KEY] = connector_client
68+
69+
await self.run_pipeline(context, bot_callback_handler)
70+
71+
if activity.type == ActivityTypes.invoke:
72+
activity_invoke_response = context.turn_state.get(self._INVOKE_RESPONSE_KEY)
73+
74+
if not activity_invoke_response:
75+
return InvokeResponse(status=HTTPStatus.NOT_IMPLEMENTED)
76+
return activity_invoke_response.value
77+
78+
return None
79+
80+
async def send_streaming_activity(self, activity: Activity) -> ResourceResponse:
81+
raise NotImplementedError()
82+
83+
def can_process_outgoing_activity(self, activity: Activity) -> bool:
84+
if not activity:
85+
raise TypeError(
86+
f"'activity: {activity.__class__.__name__}' argument can't be None"
87+
)
88+
89+
return not activity.service_url.startswith("https")
90+
91+
async def process_outgoing_activity(
92+
self, turn_context: TurnContext, activity: Activity
93+
) -> ResourceResponse:
94+
if not activity:
95+
raise TypeError(
96+
f"'activity: {activity.__class__.__name__}' argument can't be None"
97+
)
98+
99+
# TODO: Check if we have token responses from OAuth cards.
100+
101+
# The ServiceUrl for streaming channels begins with the string "urn" and contains
102+
# information unique to streaming connections. Now that we know that this is a streaming
103+
# activity, process it in the streaming pipeline.
104+
# Process streaming activity.
105+
return await self.send_streaming_activity(activity)
106+
107+
def _create_streaming_connector_client(
108+
self, activity: Activity, request_handler: StreamingRequestHandler
109+
) -> ConnectorClient:
110+
empty_credentials = (
111+
MicrosoftAppCredentials.empty()
112+
if self._channel_provider and self._channel_provider.is_government()
113+
else MicrosoftGovernmentAppCredentials.empty()
114+
)
115+
streaming_driver = StreamingHttpDriver(request_handler)
116+
config = BotFrameworkConnectorConfiguration(
117+
empty_credentials,
118+
activity.service_url,
119+
pipeline_type=AsyncBfPipeline,
120+
driver=streaming_driver,
121+
)
122+
streaming_driver.config = config
123+
connector_client = ConnectorClient(None, custom_configuration=config)
124+
125+
return connector_client
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
4+
from abc import ABC
5+
from typing import Awaitable, Callable
6+
7+
from botbuilder.core import TurnContext, InvokeResponse
8+
from botbuilder.schema import Activity
9+
10+
11+
class StreamingActivityProcessor(ABC):
12+
"""
13+
Process streaming activities.
14+
"""
15+
16+
async def process_streaming_activity(
17+
self,
18+
activity: Activity,
19+
bot_callback_handler: Callable[[TurnContext], Awaitable],
20+
) -> InvokeResponse:
21+
raise NotImplementedError()
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
4+
from http import HTTPStatus
5+
from logging import Logger
6+
from typing import Any
7+
8+
from msrest.universal_http import ClientRequest
9+
from msrest.universal_http.async_abc import AsyncClientResponse
10+
from msrest.universal_http.async_requests import (
11+
AsyncRequestsHTTPSender as AsyncRequestsHTTPDriver,
12+
)
13+
from botframework.streaming import StreamingRequest, ReceiveResponse
14+
15+
from .streaming_request_handler import StreamingRequestHandler
16+
17+
18+
class StreamingProtocolClientResponse(AsyncClientResponse):
19+
def __init__(
20+
self, request: StreamingRequest, streaming_response: ReceiveResponse
21+
) -> None:
22+
super(StreamingProtocolClientResponse, self).__init__(
23+
request, streaming_response
24+
)
25+
# https://aiohttp.readthedocs.io/en/stable/client_reference.html#aiohttp.ClientResponse
26+
self.status_code = streaming_response.status_code
27+
# self.headers = streaming_response.headers
28+
# self.reason = streaming_response.reason
29+
self._body = None
30+
31+
def body(self) -> bytes:
32+
"""Return the whole body as bytes in memory.
33+
"""
34+
if not self._body:
35+
return bytes([])
36+
return self._body
37+
38+
async def load_body(self) -> None:
39+
"""Load in memory the body, so it could be accessible from sync methods."""
40+
self._body: ReceiveResponse
41+
self._body = self.internal_response.read_body()
42+
43+
def raise_for_status(self):
44+
if 400 <= self.internal_response.status_code <= 599:
45+
raise Exception(f"Http error: {self.internal_response.status_code}")
46+
47+
48+
class StreamingHttpDriver(AsyncRequestsHTTPDriver):
49+
def __init__(
50+
self,
51+
request_handler: StreamingRequestHandler,
52+
*,
53+
config=None,
54+
logger: Logger = None,
55+
):
56+
super().__init__(config)
57+
if not request_handler:
58+
raise TypeError(
59+
f"'request_handler: {request_handler.__class__.__name__}' argument can't be None"
60+
)
61+
self._request_handler = request_handler
62+
self._logger = logger
63+
64+
async def send(
65+
self, request: ClientRequest, **config: Any # pylint: disable=unused-argument
66+
) -> AsyncClientResponse:
67+
# TODO: validate form of request to perform operations
68+
streaming_request = StreamingRequest(
69+
path=request.url[request.url.index("v3/") :], verb=request.method
70+
)
71+
streaming_request.set_body(request.data)
72+
73+
return await self._send_request(streaming_request)
74+
75+
async def _send_request(
76+
self, request: StreamingRequest
77+
) -> StreamingProtocolClientResponse:
78+
try:
79+
server_response = await self._request_handler.send_streaming_request(
80+
request
81+
)
82+
83+
if not server_response:
84+
raise Exception("Server response from streaming request is None")
85+
86+
if server_response.status_code == HTTPStatus.OK:
87+
# TODO: this should be an object read from json
88+
89+
return StreamingProtocolClientResponse(request, server_response)
90+
except Exception as error:
91+
# TODO: log error
92+
raise error
93+
94+
return None

0 commit comments

Comments
 (0)