Skip to content
This repository was archived by the owner on Dec 4, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.microsoft.azure.documentdb.RequestOptions;
import com.microsoft.bot.builder.Storage;
import com.microsoft.bot.builder.StoreItem;
import com.microsoft.bot.connector.ExecutorFactory;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -135,7 +134,7 @@ public CompletableFuture<Map<String, Object>> read(String[] keys) {
return CompletableFuture.completedFuture(new HashMap<>());
}

return getCollection().thenApplyAsync(collection -> {
return getCollection().thenApply(collection -> {
// Issue all of the reads at once
List<CompletableFuture<Document>> documentFutures = new ArrayList<>();
for (String key : keys) {
Expand Down Expand Up @@ -175,7 +174,7 @@ public CompletableFuture<Map<String, Object>> read(String[] keys) {
});

return storeItems;
}, ExecutorFactory.getExecutor());
});
}

/**
Expand All @@ -196,7 +195,7 @@ public CompletableFuture<Void> write(Map<String, Object> changes) {
return CompletableFuture.completedFuture(null);
}

return getCollection().thenApplyAsync(collection -> {
return getCollection().thenApply(collection -> {
for (Map.Entry<String, Object> change : changes.entrySet()) {
try {
ObjectNode node = objectMapper.valueToTree(change.getValue());
Expand Down Expand Up @@ -242,6 +241,9 @@ public CompletableFuture<Void> write(Map<String, Object> changes) {

} catch (JsonProcessingException | DocumentClientException e) {
logger.warn("Error upserting document: " + change.getKey(), e);
if (e instanceof DocumentClientException) {
throw new RuntimeException(e.getMessage());
}
}
}

Expand All @@ -265,7 +267,7 @@ public CompletableFuture<Void> delete(String[] keys) {
return getCollection().thenCompose(collection -> Arrays.stream(keys).map(key -> {
String escapedKey = CosmosDbKeyEscape
.escapeKey(key, cosmosDbStorageOptions.getKeySuffix(), cosmosDbStorageOptions.getCompatibilityMode());
return getDocumentById(escapedKey).thenApplyAsync(document -> {
return getDocumentById(escapedKey).thenApply(document -> {
if (document != null) {
try {
RequestOptions options = new RequestOptions();
Expand All @@ -279,7 +281,7 @@ public CompletableFuture<Void> delete(String[] keys) {
}

return null;
}, ExecutorFactory.getExecutor());
});
}).collect(CompletableFutures.toFutureList()).thenApply(deleteResponses -> null));
}

Expand Down Expand Up @@ -324,7 +326,6 @@ private CompletableFuture<DocumentCollection> getCollection() {
return CompletableFuture.completedFuture(collectionCache);
}

return CompletableFuture.supplyAsync(() -> {
// Get the collection if it exists.
List<DocumentCollection> collectionList = client.queryCollections(
getDatabase().getSelfLink(),
Expand Down Expand Up @@ -359,14 +360,12 @@ private CompletableFuture<DocumentCollection> getCollection() {
throw new RuntimeException("getCollection", e);
}
}

return collectionCache;
}, ExecutorFactory.getExecutor());
return CompletableFuture.completedFuture(collectionCache);
}
}

private CompletableFuture<Document> getDocumentById(String id) {
return getCollection().thenApplyAsync(collection -> {
return getCollection().thenApply(collection -> {
// Retrieve the document using the DocumentClient.
List<Document> documentList = client
.queryDocuments(collection.getSelfLink(), "SELECT * FROM root r WHERE r.id='" + id + "'", null)
Expand All @@ -378,7 +377,7 @@ private CompletableFuture<Document> getDocumentById(String id) {
} else {
return null;
}
}, ExecutorFactory.getExecutor());
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MT License.

package com.microsoft.bot.builder.teams;

import java.net.HttpURLConnection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import com.microsoft.bot.builder.InvokeResponse;
import com.microsoft.bot.builder.Middleware;
import com.microsoft.bot.builder.NextDelegate;
import com.microsoft.bot.builder.Storage;
import com.microsoft.bot.builder.StoreItem;
import com.microsoft.bot.builder.TurnContext;
import com.microsoft.bot.builder.UserTokenProvider;
import com.microsoft.bot.connector.rest.RestOAuthClient;
import com.microsoft.bot.schema.Activity;
import com.microsoft.bot.schema.ActivityTypes;
import com.microsoft.bot.schema.Serialization;
import com.microsoft.bot.schema.SignInConstants;
import com.microsoft.bot.schema.TokenExchangeInvokeRequest;
import com.microsoft.bot.schema.TokenExchangeInvokeResponse;
import com.microsoft.bot.schema.TokenExchangeRequest;
import com.microsoft.bot.schema.TokenResponse;

import org.apache.commons.lang3.StringUtils;

/**
* If the activity name is signin/tokenExchange, this middleware will attempt
* toexchange the token, and deduplicate the incoming call, ensuring only
* oneexchange request is processed.
*
* If a user is signed into multiple Teams clients, the Bot could receive a
* "signin/tokenExchange" from each client. Each token exchange request for a
* specific user login will have an identical Activity.getValue().getId(). Only
* one of these token exchange requests should be processed by the bot.The
* others return PreconditionFailed. For a distributed bot in production, this
* requires a distributed storage ensuring only one token exchange is processed.
* This middleware supports CosmosDb storage found in
* Microsoft.getBot().getBuilder().getAzure(), or MemoryStorage for local
* development. Storage's ETag implementation for token exchange activity
* deduplication.
*/
public class TeamsSSOTokenExchangeMiddleware implements Middleware {

private final Storage storage;
private final String oAuthConnectionName;

/**
* Initializes a new instance of the {@link TeamsSSOTokenExchangeMiddleware}
* class.
*
* @param storage The {@link Storage} to use for deduplication.
* @param connectionName The connection name to use for the single sign on token
* exchange.
*/
public TeamsSSOTokenExchangeMiddleware(Storage storage, String connectionName) {
if (storage == null) {
throw new IllegalArgumentException("storage cannot be null.");
}

if (StringUtils.isBlank(connectionName)) {
throw new IllegalArgumentException("connectionName cannot be null.");
}

this.oAuthConnectionName = connectionName;
this.storage = storage;
}

/**
* Processes an incoming activity.
*
* @param turnContext The context object for this turn.
* @param next The delegate to call to continue the bot middleware
* pipeline.
* @return A task that represents the work queued to execute. Middleware calls
* the {@code next} delegate to pass control to the next middleware in
* the pipeline. If middleware doesn’t call the next delegate, the
* adapter does not call any of the subsequent middleware’s request
* handlers or the bot’s receive handler, and the pipeline short
* circuits.
* <p>
* The {@code context} provides information about the incoming activity,
* and other data needed to process the activity.
* </p>
* <p>
* {@link TurnContext} {@link com.microsoft.bot.schema.Activity}
*/
public CompletableFuture<Void> onTurn(TurnContext turnContext, NextDelegate next) {
if (turnContext.getActivity() != null && turnContext.getActivity().getName() != null
&& turnContext.getActivity().getName().equals(SignInConstants.TOKEN_EXCHANGE_OPERATION_NAME)) {
// If the TokenExchange is NOT successful, the response will have
// already been sent by ExchangedTokenAsync
if (!this.exchangedToken(turnContext).join()) {
return CompletableFuture.completedFuture(null);
}

// Only one token exchange should proceed from here. Deduplication is performed
// second because in the case of failure due to consent required, every caller
// needs to receive the
if (!deDuplicatedTokenExchangeId(turnContext).join()) {
// If the token is not exchangeable, do not process this activity further.
return CompletableFuture.completedFuture(null);
}
}

return next.next();
}

private CompletableFuture<Boolean> deDuplicatedTokenExchangeId(TurnContext turnContext) {

// Create a StoreItem with Etag of the unique 'signin/tokenExchange' request
String idValue = null;
TokenStoreItem storeItem = new TokenStoreItem();
TokenExchangeInvokeRequest tokenExchangeRequest = Serialization.getAs(turnContext.getActivity().getValue(),
TokenExchangeInvokeRequest.class);
if (tokenExchangeRequest != null) {
idValue = tokenExchangeRequest.getId();
}

storeItem.setETag(idValue);

Map<String, Object> storeItems = new HashMap<String, Object>();
storeItems.put(storeItem.getStorageKey(turnContext), storeItem);
try {
// Writing the StoreItem with ETag of unique id will succeed only once
storage.write(storeItems).join();
} catch (Exception ex) {

// Memory storage throws a generic exception with a Message of 'etag conflict.
// [other error info]'
// CosmosDbPartitionedStorage throws: RuntimeException with a message that contains "precondition is
// not met")
if (ex.getMessage().contains("eTag conflict") || ex.getMessage().contains("precondition is not met")) {
// Do NOT proceed processing this message, some other thread or
// machine already has processed it.

// Send 200 invoke response.
return sendInvokeResponse(turnContext, null, HttpURLConnection.HTTP_OK).thenApply(result -> false);
}
}

return CompletableFuture.completedFuture(true);
}

private CompletableFuture<Void> sendInvokeResponse(TurnContext turnContext, Object body, int statusCode) {
Activity activity = new Activity(ActivityTypes.INVOKE_RESPONSE);
InvokeResponse response = new InvokeResponse(statusCode, body);
activity.setValue(response);
return turnContext.sendActivity(activity).thenApply(result -> null);
}

@SuppressWarnings("PMD.EmptyCatchBlock")
private CompletableFuture<Boolean> exchangedToken(TurnContext turnContext) {
TokenResponse tokenExchangeResponse = null;
TokenExchangeInvokeRequest tokenExchangeRequest = Serialization.getAs(turnContext.getActivity().getValue(),
TokenExchangeInvokeRequest.class);

try {
RestOAuthClient userTokenClient = turnContext.getTurnState().get(RestOAuthClient.class);
TokenExchangeRequest exchangeRequest = new TokenExchangeRequest();
exchangeRequest.setToken(tokenExchangeRequest.getToken());
if (userTokenClient != null) {
tokenExchangeResponse = userTokenClient.getUserToken()
.exchangeToken(turnContext.getActivity().getFrom().getId(), oAuthConnectionName,
turnContext.getActivity().getChannelId(), exchangeRequest)
.join();
} else if (turnContext.getAdapter() instanceof UserTokenProvider) {
UserTokenProvider adapter = (UserTokenProvider) turnContext.getAdapter();
tokenExchangeResponse = adapter.exchangeToken(turnContext, oAuthConnectionName,
turnContext.getActivity().getFrom().getId(), exchangeRequest).join();
} else {
throw new RuntimeException("Token Exchange is not supported by the current adapter.");
}
} catch (Exception ex) {
// Ignore Exceptions
// If token exchange failed for any reason, tokenExchangeResponse above stays
// null,
// and hence we send back a failure invoke response to the caller.
}

if (tokenExchangeResponse != null && StringUtils.isEmpty(tokenExchangeResponse.getToken())) {
// The token could not be exchanged (which could be due to a consent
// requirement)
// Notify the sender that PreconditionFailed so they can respond accordingly.

TokenExchangeInvokeResponse invokeResponse = new TokenExchangeInvokeResponse();
invokeResponse.setId(tokenExchangeRequest.getId());
invokeResponse.setConnectionName(oAuthConnectionName);
invokeResponse.setFailureDetail("The bot is unable to exchange token. Proceed with regular login.");

sendInvokeResponse(turnContext, invokeResponse, HttpURLConnection.HTTP_PRECON_FAILED);

return CompletableFuture.completedFuture(false);
}

return CompletableFuture.completedFuture(true);
}

/**
* Class to store the etag for token exchange.
*/
private class TokenStoreItem implements StoreItem {

private String etag;

@Override
public String getETag() {
return etag;
}

@Override
public void setETag(String withETag) {
etag = withETag;
}

public String getStorageKey(TurnContext turnContext) {
Activity activity = turnContext.getActivity();
if (activity.getChannelId() == null) {
throw new RuntimeException("invalid activity-missing channelId");
}
if (activity.getConversation() == null || activity.getConversation().getId() == null) {
throw new RuntimeException("invalid activity-missing Conversation.Id");
}

String channelId = activity.getChannelId();
String conversationId = activity.getConversation().getId();

TokenExchangeInvokeRequest tokenExchangeRequest = Serialization.getAs(turnContext.getActivity().getValue(),
TokenExchangeInvokeRequest.class);

if (tokenExchangeRequest != null) {
return String.format("%s/%s/%s", channelId, conversationId, tokenExchangeRequest.getId());
} else {
throw new RuntimeException("Invalid signin/tokenExchange. Missing activity.getValue().getId().");
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.microsoft.bot.schema.CardAction;
import com.microsoft.bot.schema.InputHints;
import com.microsoft.bot.schema.OAuthCard;
import com.microsoft.bot.schema.Serialization;
import com.microsoft.bot.schema.SignInConstants;
import com.microsoft.bot.schema.SignInResource;
import com.microsoft.bot.schema.SigninCard;
Expand Down Expand Up @@ -341,9 +342,8 @@ public static CompletableFuture<PromptRecognizerResult<TokenResponse>> recognize
sendInvokeResponse(turnContext, HttpURLConnection.HTTP_INTERNAL_ERROR, null);
}
} else if (isTokenExchangeRequestInvoke(turnContext)) {
TokenExchangeInvokeRequest tokenExchangeRequest =
turnContext.getActivity().getValue() instanceof TokenExchangeInvokeRequest
? (TokenExchangeInvokeRequest) turnContext.getActivity().getValue() : null;
TokenExchangeInvokeRequest tokenExchangeRequest = Serialization.getAs(turnContext.getActivity().getValue(),
TokenExchangeInvokeRequest.class);

if (tokenExchangeRequest == null) {
TokenExchangeInvokeResponse response = new TokenExchangeInvokeResponse();
Expand All @@ -353,7 +353,7 @@ public static CompletableFuture<PromptRecognizerResult<TokenResponse>> recognize
+ "TokenExchangeInvokeRequest value. This is required to be "
+ "sent with the InvokeActivity.");
sendInvokeResponse(turnContext, HttpURLConnection.HTTP_BAD_REQUEST, response).join();
} else if (tokenExchangeRequest.getConnectionName() != settings.getConnectionName()) {
} else if (!tokenExchangeRequest.getConnectionName().equals(settings.getConnectionName())) {
TokenExchangeInvokeResponse response = new TokenExchangeInvokeResponse();
response.setId(tokenExchangeRequest.getId());
response.setConnectionName(settings.getConnectionName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.microsoft.bot.schema.ConversationAccount;
import com.microsoft.bot.schema.InputHints;
import com.microsoft.bot.schema.OAuthCard;
import com.microsoft.bot.schema.Serialization;
import com.microsoft.bot.schema.SignInConstants;
import com.microsoft.bot.schema.TokenExchangeInvokeRequest;
import com.microsoft.bot.schema.TokenExchangeInvokeResponse;
Expand Down Expand Up @@ -292,7 +293,7 @@ public void OAuthPromptWithTokenExchangeInvoke() {
value.setToken(exchangeToken);
Activity activityToSend = new Activity(ActivityTypes.INVOKE);
activityToSend.setName(SignInConstants.TOKEN_EXCHANGE_OPERATION_NAME);
activityToSend.setValue(value);
activityToSend.setValue(Serialization.objectToTree(value));

new TestFlow(adapter, botCallbackHandler)
.send("hello")
Expand Down Expand Up @@ -362,7 +363,7 @@ public void OAuthPromptWithTokenExchangeFail() {
value.setToken(exchangeToken);
Activity activityToSend = new Activity(ActivityTypes.INVOKE);
activityToSend.setName(SignInConstants.TOKEN_EXCHANGE_OPERATION_NAME);
activityToSend.setValue(value);
activityToSend.setValue(Serialization.objectToTree(value));

new TestFlow(adapter, botCallbackHandler)
.send("hello")
Expand Down