Skip to content
This repository was archived by the owner on Dec 4, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,20 +282,18 @@ private CompletableFuture<Void> emitTraceInfo(
QnAMakerOptions withOptions
) {
String knowledgeBaseId = this.endpoint.getKnowledgeBaseId();
QnAMakerTraceInfo traceInfo = new QnAMakerTraceInfo() {
{
setMessage(messageActivity);
setQueryResults(result);
setKnowledgeBaseId(knowledgeBaseId);
setScoreThreshold(withOptions.getScoreThreshold());
setTop(withOptions.getTop());
setStrictFilters(withOptions.getStrictFilters());
setContext(withOptions.getContext());
setQnAId(withOptions.getQnAId());
setIsTest(withOptions.getIsTest());
setRankerType(withOptions.getRankerType());
}
};
QnAMakerTraceInfo traceInfo = new QnAMakerTraceInfo();
traceInfo.setMessage(messageActivity);
traceInfo.setQueryResults(result);
traceInfo.setKnowledgeBaseId(knowledgeBaseId);
traceInfo.setScoreThreshold(withOptions.getScoreThreshold());
traceInfo.setTop(withOptions.getTop());
traceInfo.setStrictFilters(withOptions.getStrictFilters());
traceInfo.setContext(withOptions.getContext());
traceInfo.setQnAId(withOptions.getQnAId());
traceInfo.setIsTest(withOptions.getIsTest());
traceInfo.setRankerType(withOptions.getRankerType());

Activity traceActivity = Activity.createTraceActivity(
QnAMaker.QNA_MAKER_NAME,
QnAMaker.QNA_MAKER_TRACE_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
Expand Down Expand Up @@ -126,21 +125,17 @@ public void qnaMakerTraceActivity() {
Assert.assertTrue(results.length == 1);
Assert.assertEquals("BaseCamp: You can use a damp rag to clean around the Power Pack", results[0].getAnswer());
}

conversationId[0] = turnContext.getActivity().getConversation().getId();
Activity typingActivity = new Activity() {
{
setType(ActivityTypes.TYPING);
setRelatesTo(turnContext.getActivity().getRelatesTo());
}
};
turnContext.sendActivity(typingActivity).join();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
// Empty error
delay(500);
conversationId[0] = turnContext.getActivity().getConversation().getId();
Activity typingActivity = new Activity() {
{
setType(ActivityTypes.TYPING);
setRelatesTo(turnContext.getActivity().getRelatesTo());
}
turnContext.sendActivity(String.format("echo:%s", turnContext.getActivity().getText())).join();
};
turnContext.sendActivity(typingActivity).join();
delay(500);
turnContext.sendActivity(String.format("echo:%s", turnContext.getActivity().getText())).join();
return CompletableFuture.completedFuture(null);
})
.send("how do I clean the stove?")
Expand Down Expand Up @@ -2087,6 +2082,18 @@ private void enqueueResponse(MockWebServer mockWebServer, JsonNode response) thr
.setBody(mockResponse));
}

/**
* Time period delay.
* @param milliseconds Time to delay.
*/
private void delay(int milliseconds) {
try {
Thread.sleep(milliseconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public class OverrideTelemetry extends QnAMaker {

public OverrideTelemetry(QnAMakerEndpoint endpoint, QnAMakerOptions options,
Expand Down
6 changes: 6 additions & 0 deletions libraries/bot-azure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.10.0</version>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.bot.azure.blobs;

import com.azure.core.exception.HttpResponseException;
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import com.microsoft.bot.builder.Storage;
import com.microsoft.bot.builder.StoreItem;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpStatus;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* Implements {@link Storage} using Azure Storage Blobs.
* This class uses a single Azure Storage Blob Container.
* Each entity or {@link StoreItem} is serialized into a JSON string and stored in an individual text blob.
* Each blob is named after the store item key, which is encoded so that it conforms a valid blob name.
* an entity is an {@link StoreItem}, the storage object will set the entity's {@link StoreItem}
* property value to the blob's ETag upon read. Afterward, an {@link BlobRequestConditions} with the ETag value
* will be generated during Write. New entities start with a null ETag.
*/
public class BlobsStorage implements Storage {

private ObjectMapper objectMapper;
private final BlobContainerClient containerClient;

private final Integer millisecondsTimeout = 2000;
private final Integer retryTimes = 8;

/**
* Initializes a new instance of the {@link BlobsStorage} class.
* @param dataConnectionString Azure Storage connection string.
* @param containerName Name of the Blob container where entities will be stored.
*/
public BlobsStorage(String dataConnectionString, String containerName) {
if (StringUtils.isBlank(dataConnectionString)) {
throw new IllegalArgumentException("dataConnectionString is required.");
}

if (StringUtils.isBlank(containerName)) {
throw new IllegalArgumentException("containerName is required.");
}

objectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.findAndRegisterModules()
.enableDefaultTyping();

containerClient = new BlobContainerClientBuilder()
.connectionString(dataConnectionString)
.containerName(containerName)
.buildClient();
}

/**
* Deletes entity blobs from the configured container.
* @param keys An array of entity keys.
* @return A task that represents the work queued to execute.
*/
@Override
public CompletableFuture<Void> delete(String[] keys) {
if (keys == null) {
throw new IllegalArgumentException("The 'keys' parameter is required.");
}

for (String key: keys) {
String blobName = getBlobName(key);
BlobClient blobClient = containerClient.getBlobClient(blobName);
if (blobClient.exists()) {
try {
blobClient.delete();
} catch (BlobStorageException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}

return CompletableFuture.completedFuture(null);
}

/**
* Retrieve entities from the configured blob container.
* @param keys An array of entity keys.
* @return A task that represents the work queued to execute.
*/
@Override
public CompletableFuture<Map<String, Object>> read(String[] keys) {
if (keys == null) {
throw new IllegalArgumentException("The 'keys' parameter is required.");
}

if (!containerClient.exists()) {
try {
containerClient.create();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}

Map<String, Object> items = new HashMap<>();

for (String key : keys) {
String blobName = getBlobName(key);
BlobClient blobClient = containerClient.getBlobClient(blobName);
innerReadBlob(blobClient).thenAccept(value -> {
if (value != null) {
items.put(key, value);
}
});
}
return CompletableFuture.completedFuture(items);
}

/**
* Stores a new entity in the configured blob container.
* @param changes The changes to write to storage.
* @return A task that represents the work queued to execute.
*/
public CompletableFuture<Void> write(Map<String, Object> changes) {
if (changes == null) {
throw new IllegalArgumentException("The 'changes' parameter is required.");
}

if (!containerClient.exists()) {
try {
containerClient.create();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

for (Map.Entry<String, Object> keyValuePair : changes.entrySet()) {
Object newValue = keyValuePair.getValue();
StoreItem storeItem = newValue instanceof StoreItem ? (StoreItem) newValue : null;

// "*" eTag in StoreItem converts to null condition for AccessCondition
boolean isNullOrEmpty = storeItem == null || StringUtils.isBlank(storeItem.getETag())
|| storeItem.getETag().equals("*");
BlobRequestConditions accessCondition = !isNullOrEmpty
? new BlobRequestConditions().setIfMatch(storeItem.getETag())
: null;

String blobName = getBlobName(keyValuePair.getKey());
BlobClient blobReference = containerClient.getBlobClient(blobName);
try {
String json = objectMapper.writeValueAsString(newValue);
InputStream stream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
//verify the corresponding length
blobReference.uploadWithResponse(stream, stream.available(),
null, null,
null, null, accessCondition, null, Context.NONE);
} catch (HttpResponseException e) {
if (e.getResponse().getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
StringBuilder sb =
new StringBuilder("An error occurred while trying to write an object. The underlying ");
sb.append(BlobErrorCode.INVALID_BLOCK_LIST);
sb.append(" error is commonly caused due to "
+ "concurrently uploading an object larger than 128MB in size.");

throw new HttpResponseException(sb.toString(), e.getResponse());
}
} catch (IOException e) {
e.printStackTrace();
}
}

return CompletableFuture.completedFuture(null);
}

private static String getBlobName(String key) {
if (StringUtils.isBlank(key)) {
throw new IllegalArgumentException("The 'key' parameter is required.");
}

String blobName;
try {
blobName = URLEncoder.encode(key, StandardCharsets.UTF_8.toString());
} catch (UnsupportedEncodingException e) {
throw new IllegalArgumentException("The key could not be encoded");
}

return blobName;
}

private CompletableFuture<Object> innerReadBlob(BlobClient blobReference) {
Integer i = 0;
while (true) {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
blobReference.download(outputStream);
String contentString = outputStream.toString();

Object obj;
// We are doing this try/catch because we are receiving String or HashMap
try {
// We need to deserialize to an Object class since there are contentString which has an Object type
obj = objectMapper.readValue(contentString, Object.class);
} catch (MismatchedInputException ex) {
// In case of the contentString has the structure of a HashMap,
// we need to deserialize it to a HashMap object
obj = objectMapper.readValue(contentString, HashMap.class);
}

if (obj instanceof StoreItem) {
String eTag = blobReference.getProperties().getETag();
((StoreItem) obj).setETag(eTag);
}

return CompletableFuture.completedFuture(obj);
} catch (HttpResponseException e) {
if (e.getResponse().getStatusCode() == HttpStatus.SC_PRECONDITION_FAILED) {
// additional retry logic,
// even though this is a read operation blob storage can return 412 if there is contention
if (i++ < retryTimes) {
try {
TimeUnit.MILLISECONDS.sleep(millisecondsTimeout);
continue;
} catch (InterruptedException ex) {
break;
}
}
throw e;
} else {
break;
}
} catch (IOException e) {
e.printStackTrace();
break;
}
}
return CompletableFuture.completedFuture(null);
}
}
Loading