diff --git a/libraries/bot-ai-qna/src/main/java/com/microsoft/bot/ai/qna/utils/GenerateAnswerUtils.java b/libraries/bot-ai-qna/src/main/java/com/microsoft/bot/ai/qna/utils/GenerateAnswerUtils.java index 9436b7a7d..52c09b3c0 100644 --- a/libraries/bot-ai-qna/src/main/java/com/microsoft/bot/ai/qna/utils/GenerateAnswerUtils.java +++ b/libraries/bot-ai-qna/src/main/java/com/microsoft/bot/ai/qna/utils/GenerateAnswerUtils.java @@ -282,20 +282,18 @@ private CompletableFuture emitTraceInfo( QnAMakerOptions withOptions ) { String knowledgeBaseId = this.endpoint.getKnowledgeBaseId(); - QnAMakerTraceInfo traceInfo = new QnAMakerTraceInfo() { - { - setMessage(messageActivity); - setQueryResults(result); - setKnowledgeBaseId(knowledgeBaseId); - setScoreThreshold(withOptions.getScoreThreshold()); - setTop(withOptions.getTop()); - setStrictFilters(withOptions.getStrictFilters()); - setContext(withOptions.getContext()); - setQnAId(withOptions.getQnAId()); - setIsTest(withOptions.getIsTest()); - setRankerType(withOptions.getRankerType()); - } - }; + QnAMakerTraceInfo traceInfo = new QnAMakerTraceInfo(); + traceInfo.setMessage(messageActivity); + traceInfo.setQueryResults(result); + traceInfo.setKnowledgeBaseId(knowledgeBaseId); + traceInfo.setScoreThreshold(withOptions.getScoreThreshold()); + traceInfo.setTop(withOptions.getTop()); + traceInfo.setStrictFilters(withOptions.getStrictFilters()); + traceInfo.setContext(withOptions.getContext()); + traceInfo.setQnAId(withOptions.getQnAId()); + traceInfo.setIsTest(withOptions.getIsTest()); + traceInfo.setRankerType(withOptions.getRankerType()); + Activity traceActivity = Activity.createTraceActivity( QnAMaker.QNA_MAKER_NAME, QnAMaker.QNA_MAKER_TRACE_TYPE, diff --git a/libraries/bot-ai-qna/src/test/java/com/microsoft/bot/ai/qna/QnAMakerTests.java b/libraries/bot-ai-qna/src/test/java/com/microsoft/bot/ai/qna/QnAMakerTests.java index 133c4db45..0dc66d4cb 100644 --- a/libraries/bot-ai-qna/src/test/java/com/microsoft/bot/ai/qna/QnAMakerTests.java +++ b/libraries/bot-ai-qna/src/test/java/com/microsoft/bot/ai/qna/QnAMakerTests.java @@ -11,7 +11,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; @@ -126,21 +125,17 @@ public void qnaMakerTraceActivity() { Assert.assertTrue(results.length == 1); Assert.assertEquals("BaseCamp: You can use a damp rag to clean around the Power Pack", results[0].getAnswer()); } - - conversationId[0] = turnContext.getActivity().getConversation().getId(); - Activity typingActivity = new Activity() { - { - setType(ActivityTypes.TYPING); - setRelatesTo(turnContext.getActivity().getRelatesTo()); - } - }; - turnContext.sendActivity(typingActivity).join(); - try { - TimeUnit.SECONDS.sleep(5); - } catch (InterruptedException e) { - // Empty error + delay(500); + conversationId[0] = turnContext.getActivity().getConversation().getId(); + Activity typingActivity = new Activity() { + { + setType(ActivityTypes.TYPING); + setRelatesTo(turnContext.getActivity().getRelatesTo()); } - turnContext.sendActivity(String.format("echo:%s", turnContext.getActivity().getText())).join(); + }; + turnContext.sendActivity(typingActivity).join(); + delay(500); + turnContext.sendActivity(String.format("echo:%s", turnContext.getActivity().getText())).join(); return CompletableFuture.completedFuture(null); }) .send("how do I clean the stove?") @@ -2087,6 +2082,18 @@ private void enqueueResponse(MockWebServer mockWebServer, JsonNode response) thr .setBody(mockResponse)); } + /** + * Time period delay. + * @param milliseconds Time to delay. + */ + private void delay(int milliseconds) { + try { + Thread.sleep(milliseconds); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + public class OverrideTelemetry extends QnAMaker { public OverrideTelemetry(QnAMakerEndpoint endpoint, QnAMakerOptions options, diff --git a/libraries/bot-azure/pom.xml b/libraries/bot-azure/pom.xml index 54954bae1..9ef60875d 100644 --- a/libraries/bot-azure/pom.xml +++ b/libraries/bot-azure/pom.xml @@ -86,6 +86,12 @@ test-jar test + + + com.azure + azure-storage-blob + 12.10.0 + diff --git a/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/blobs/BlobsStorage.java b/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/blobs/BlobsStorage.java new file mode 100644 index 000000000..19bda96ba --- /dev/null +++ b/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/blobs/BlobsStorage.java @@ -0,0 +1,255 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.bot.azure.blobs; + +import com.azure.core.exception.HttpResponseException; +import com.azure.core.util.Context; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobContainerClientBuilder; +import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobRequestConditions; +import com.azure.storage.blob.models.BlobStorageException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.exc.MismatchedInputException; +import com.microsoft.bot.builder.Storage; +import com.microsoft.bot.builder.StoreItem; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpStatus; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * Implements {@link Storage} using Azure Storage Blobs. + * This class uses a single Azure Storage Blob Container. + * Each entity or {@link StoreItem} is serialized into a JSON string and stored in an individual text blob. + * Each blob is named after the store item key, which is encoded so that it conforms a valid blob name. + * an entity is an {@link StoreItem}, the storage object will set the entity's {@link StoreItem} + * property value to the blob's ETag upon read. Afterward, an {@link BlobRequestConditions} with the ETag value + * will be generated during Write. New entities start with a null ETag. + */ +public class BlobsStorage implements Storage { + + private ObjectMapper objectMapper; + private final BlobContainerClient containerClient; + + private final Integer millisecondsTimeout = 2000; + private final Integer retryTimes = 8; + + /** + * Initializes a new instance of the {@link BlobsStorage} class. + * @param dataConnectionString Azure Storage connection string. + * @param containerName Name of the Blob container where entities will be stored. + */ + public BlobsStorage(String dataConnectionString, String containerName) { + if (StringUtils.isBlank(dataConnectionString)) { + throw new IllegalArgumentException("dataConnectionString is required."); + } + + if (StringUtils.isBlank(containerName)) { + throw new IllegalArgumentException("containerName is required."); + } + + objectMapper = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .findAndRegisterModules() + .enableDefaultTyping(); + + containerClient = new BlobContainerClientBuilder() + .connectionString(dataConnectionString) + .containerName(containerName) + .buildClient(); + } + + /** + * Deletes entity blobs from the configured container. + * @param keys An array of entity keys. + * @return A task that represents the work queued to execute. + */ + @Override + public CompletableFuture delete(String[] keys) { + if (keys == null) { + throw new IllegalArgumentException("The 'keys' parameter is required."); + } + + for (String key: keys) { + String blobName = getBlobName(key); + BlobClient blobClient = containerClient.getBlobClient(blobName); + if (blobClient.exists()) { + try { + blobClient.delete(); + } catch (BlobStorageException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } + + return CompletableFuture.completedFuture(null); + } + + /** + * Retrieve entities from the configured blob container. + * @param keys An array of entity keys. + * @return A task that represents the work queued to execute. + */ + @Override + public CompletableFuture> read(String[] keys) { + if (keys == null) { + throw new IllegalArgumentException("The 'keys' parameter is required."); + } + + if (!containerClient.exists()) { + try { + containerClient.create(); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + Map items = new HashMap<>(); + + for (String key : keys) { + String blobName = getBlobName(key); + BlobClient blobClient = containerClient.getBlobClient(blobName); + innerReadBlob(blobClient).thenAccept(value -> { + if (value != null) { + items.put(key, value); + } + }); + } + return CompletableFuture.completedFuture(items); + } + + /** + * Stores a new entity in the configured blob container. + * @param changes The changes to write to storage. + * @return A task that represents the work queued to execute. + */ + public CompletableFuture write(Map changes) { + if (changes == null) { + throw new IllegalArgumentException("The 'changes' parameter is required."); + } + + if (!containerClient.exists()) { + try { + containerClient.create(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + for (Map.Entry keyValuePair : changes.entrySet()) { + Object newValue = keyValuePair.getValue(); + StoreItem storeItem = newValue instanceof StoreItem ? (StoreItem) newValue : null; + + // "*" eTag in StoreItem converts to null condition for AccessCondition + boolean isNullOrEmpty = storeItem == null || StringUtils.isBlank(storeItem.getETag()) + || storeItem.getETag().equals("*"); + BlobRequestConditions accessCondition = !isNullOrEmpty + ? new BlobRequestConditions().setIfMatch(storeItem.getETag()) + : null; + + String blobName = getBlobName(keyValuePair.getKey()); + BlobClient blobReference = containerClient.getBlobClient(blobName); + try { + String json = objectMapper.writeValueAsString(newValue); + InputStream stream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); + //verify the corresponding length + blobReference.uploadWithResponse(stream, stream.available(), + null, null, + null, null, accessCondition, null, Context.NONE); + } catch (HttpResponseException e) { + if (e.getResponse().getStatusCode() == HttpStatus.SC_BAD_REQUEST) { + StringBuilder sb = + new StringBuilder("An error occurred while trying to write an object. The underlying "); + sb.append(BlobErrorCode.INVALID_BLOCK_LIST); + sb.append(" error is commonly caused due to " + + "concurrently uploading an object larger than 128MB in size."); + + throw new HttpResponseException(sb.toString(), e.getResponse()); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + return CompletableFuture.completedFuture(null); + } + + private static String getBlobName(String key) { + if (StringUtils.isBlank(key)) { + throw new IllegalArgumentException("The 'key' parameter is required."); + } + + String blobName; + try { + blobName = URLEncoder.encode(key, StandardCharsets.UTF_8.toString()); + } catch (UnsupportedEncodingException e) { + throw new IllegalArgumentException("The key could not be encoded"); + } + + return blobName; + } + + private CompletableFuture innerReadBlob(BlobClient blobReference) { + Integer i = 0; + while (true) { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + blobReference.download(outputStream); + String contentString = outputStream.toString(); + + Object obj; + // We are doing this try/catch because we are receiving String or HashMap + try { + // We need to deserialize to an Object class since there are contentString which has an Object type + obj = objectMapper.readValue(contentString, Object.class); + } catch (MismatchedInputException ex) { + // In case of the contentString has the structure of a HashMap, + // we need to deserialize it to a HashMap object + obj = objectMapper.readValue(contentString, HashMap.class); + } + + if (obj instanceof StoreItem) { + String eTag = blobReference.getProperties().getETag(); + ((StoreItem) obj).setETag(eTag); + } + + return CompletableFuture.completedFuture(obj); + } catch (HttpResponseException e) { + if (e.getResponse().getStatusCode() == HttpStatus.SC_PRECONDITION_FAILED) { + // additional retry logic, + // even though this is a read operation blob storage can return 412 if there is contention + if (i++ < retryTimes) { + try { + TimeUnit.MILLISECONDS.sleep(millisecondsTimeout); + continue; + } catch (InterruptedException ex) { + break; + } + } + throw e; + } else { + break; + } + } catch (IOException e) { + e.printStackTrace(); + break; + } + } + return CompletableFuture.completedFuture(null); + } +} diff --git a/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/blobs/BlobsTranscriptStore.java b/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/blobs/BlobsTranscriptStore.java new file mode 100644 index 000000000..ada2b7c32 --- /dev/null +++ b/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/blobs/BlobsTranscriptStore.java @@ -0,0 +1,502 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.bot.azure.blobs; + +import com.azure.core.exception.HttpResponseException; +import com.azure.core.http.rest.PagedResponse; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobContainerClientBuilder; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.blob.models.BlobListDetails; +import com.azure.storage.blob.models.ListBlobsOptions; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.microsoft.bot.builder.BotAssert; +import com.microsoft.bot.builder.PagedResult; +import com.microsoft.bot.builder.TranscriptInfo; +import com.microsoft.bot.builder.TranscriptStore; +import com.microsoft.bot.schema.Activity; +import com.microsoft.bot.schema.ActivityTypes; +import com.microsoft.bot.schema.ChannelAccount; +import com.microsoft.bot.schema.Pair; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpStatus; + +import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * The blobs transcript store stores transcripts in an Azure Blob container. + * Each activity is stored as json blob in structure of + * container/{channelId]/{conversationId}/{Timestamp.ticks}-{activity.id}.json. + */ +public class BlobsTranscriptStore implements TranscriptStore { + + // Containers checked for creation. + private static final HashSet CHECKED_CONTAINERS = new HashSet(); + + private final Integer milisecondsTimeout = 2000; + private final Integer retryTimes = 3; + private final Integer longRadix = 16; + private final Integer multipleProductValue = 10_000_000; + + private final ObjectMapper jsonSerializer; + private BlobContainerClient containerClient; + + /** + * Initializes a new instance of the {@link BlobsTranscriptStore} class. + * @param dataConnectionString Azure Storage connection string. + * @param containerName Name of the Blob container where entities will be stored. + */ + public BlobsTranscriptStore(String dataConnectionString, String containerName) { + if (StringUtils.isBlank(dataConnectionString)) { + throw new IllegalArgumentException("dataConnectionString"); + } + + if (StringUtils.isBlank(containerName)) { + throw new IllegalArgumentException("containerName"); + } + + jsonSerializer = new ObjectMapper() + .setSerializationInclusion(JsonInclude.Include.NON_NULL) + .enable(SerializationFeature.INDENT_OUTPUT) + .findAndRegisterModules(); + + // Triggers a check for the existence of the container + containerClient = this.getContainerClient(dataConnectionString, containerName); + } + + /** + * Log an activity to the transcript. + * @param activity Activity being logged. + * @return A CompletableFuture that represents the work queued to execute. + */ + public CompletableFuture logActivity(Activity activity) { + BotAssert.activityNotNull(activity); + + switch (activity.getType()) { + case ActivityTypes.MESSAGE_UPDATE: + Activity updatedActivity = null; + try { + updatedActivity = jsonSerializer + .readValue(jsonSerializer.writeValueAsString(activity), Activity.class); + } catch (IOException ex) { + ex.printStackTrace(); + } + updatedActivity.setType(ActivityTypes.MESSAGE); // fixup original type (should be Message) + Activity finalUpdatedActivity = updatedActivity; + innerReadBlob(activity).thenAccept(activityAndBlob -> { + if (activityAndBlob != null && activityAndBlob.getLeft() != null) { + finalUpdatedActivity.setLocalTimestamp(activityAndBlob.getLeft().getLocalTimestamp()); + finalUpdatedActivity.setTimestamp(activityAndBlob.getLeft().getTimestamp()); + logActivityToBlobClient(finalUpdatedActivity, activityAndBlob.getRight(), true) + .thenApply(task -> CompletableFuture.completedFuture(null)); + } else { + // The activity was not found, so just add a record of this update. + this.innerLogActivity(finalUpdatedActivity) + .thenApply(task -> CompletableFuture.completedFuture(null)); + } + }); + + return CompletableFuture.completedFuture(null); + case ActivityTypes.MESSAGE_DELETE: + innerReadBlob(activity).thenAccept(activityAndBlob -> { + if (activityAndBlob != null && activityAndBlob.getLeft() != null) { + ChannelAccount from = new ChannelAccount(); + from.setId("deleted"); + from.setRole(activityAndBlob.getLeft().getFrom().getRole()); + ChannelAccount recipient = new ChannelAccount(); + recipient.setId("deleted"); + recipient.setRole(activityAndBlob.getLeft().getRecipient().getRole()); + + // tombstone the original message + Activity tombstonedActivity = new Activity(ActivityTypes.MESSAGE_DELETE); + tombstonedActivity.setId(activityAndBlob.getLeft().getId()); + tombstonedActivity.setFrom(from); + tombstonedActivity.setRecipient(recipient); + tombstonedActivity.setLocale(activityAndBlob.getLeft().getLocale()); + tombstonedActivity.setLocalTimestamp(activityAndBlob.getLeft().getTimestamp()); + tombstonedActivity.setTimestamp(activityAndBlob.getLeft().getTimestamp()); + tombstonedActivity.setChannelId(activityAndBlob.getLeft().getChannelId()); + tombstonedActivity.setConversation(activityAndBlob.getLeft().getConversation()); + tombstonedActivity.setServiceUrl(activityAndBlob.getLeft().getServiceUrl()); + tombstonedActivity.setReplyToId(activityAndBlob.getLeft().getReplyToId()); + + logActivityToBlobClient(tombstonedActivity, activityAndBlob.getRight(), true) + .thenApply(task -> CompletableFuture.completedFuture(null)); + } + }); + + return CompletableFuture.completedFuture(null); + default: + this.innerLogActivity(activity) + .thenApply(task -> CompletableFuture.completedFuture(null)); + return CompletableFuture.completedFuture(null); + } + } + + /** + * Get activities for a conversation (Aka the transcript). + * @param channelId The ID of the channel the conversation is in. + * @param conversationId The ID of the conversation. + * @param continuationToken The continuation token (if available). + * @param startDate A cutoff date. Activities older than this date are + * not included. + * @return PagedResult of activities. + */ + public CompletableFuture> getTranscriptActivities(String channelId, String conversationId, + @Nullable String continuationToken, + OffsetDateTime startDate) { + if (startDate == null) { + startDate = OffsetDateTime.MIN; + } + + final int pageSize = 20; + + if (StringUtils.isBlank(channelId)) { + throw new IllegalArgumentException("Missing channelId"); + } + + if (StringUtils.isBlank(conversationId)) { + throw new IllegalArgumentException("Missing conversationId"); + } + + PagedResult pagedResult = new PagedResult(); + + String token = null; + List blobs = new ArrayList(); + do { + String prefix = String.format("%s/%s/", sanitizeKey(channelId), sanitizeKey(conversationId)); + Iterable> resultSegment = containerClient + .listBlobsByHierarchy("/", this.getOptionsWithMetadata(prefix), null) + .iterableByPage(token); + token = null; + for (PagedResponse blobPage: resultSegment) { + for (BlobItem blobItem: blobPage.getValue()) { + OffsetDateTime parseDateTime = OffsetDateTime.parse(blobItem.getMetadata().get("Timestamp")); + if (parseDateTime.isAfter(startDate) + || parseDateTime.isEqual(startDate)) { + if (continuationToken != null) { + if (blobItem.getName().equals(continuationToken)) { + // we found continuation token + continuationToken = null; + } + } else { + blobs.add(blobItem); + if (blobs.size() == pageSize) { + break; + } + } + } + } + + // Get the continuation token and loop until it is empty. + token = blobPage.getContinuationToken(); + } + } while (!StringUtils.isBlank(token) && blobs.size() < pageSize); + + pagedResult.setItems(blobs + .stream() + .map(bl -> { + BlobClient blobClient = containerClient.getBlobClient(bl.getName()); + return this.getActivityFromBlobClient(blobClient); + }) + .map(t -> t.join()) + .collect(Collectors.toList())); + + if (pagedResult.getItems().size() == pageSize) { + pagedResult.setContinuationToken(blobs.get(blobs.size() - 1).getName()); + } + + return CompletableFuture.completedFuture(pagedResult); + } + + /** + * List conversations in the channelId. + * @param channelId The ID of the channel. + * @param continuationToken The continuation token (if available). + * @return A CompletableFuture that represents the work queued to execute. + */ + public CompletableFuture> listTranscripts(String channelId, + @Nullable String continuationToken) { + final int pageSize = 20; + + if (StringUtils.isBlank(channelId)) { + throw new IllegalArgumentException("Missing channelId"); + } + + String token = null; + + List conversations = new ArrayList(); + do { + String prefix = String.format("%s/", sanitizeKey(channelId)); + Iterable> resultSegment = containerClient. + listBlobsByHierarchy("/", this.getOptionsWithMetadata(prefix), null) + .iterableByPage(token); + token = null; + for (PagedResponse blobPage: resultSegment) { + for (BlobItem blobItem: blobPage.getValue()) { + // Unescape the Id we escaped when we saved it + String conversationId = new String(); + String lastName = Arrays.stream(blobItem.getName().split("/")) + .reduce((first, second) -> second.length() > 0 ? second : first).get(); + try { + conversationId = URLDecoder.decode(lastName, StandardCharsets.UTF_8.name()); + } catch (UnsupportedEncodingException ex) { + ex.printStackTrace(); + } + TranscriptInfo conversation = + new TranscriptInfo(conversationId, channelId, blobItem.getProperties().getCreationTime()); + if (continuationToken != null) { + if (StringUtils.equals(conversation.getId(), continuationToken)) { + // we found continuation token + continuationToken = null; + } + + // skip record + } else { + conversations.add(conversation); + if (conversations.size() == pageSize) { + break; + } + } + } + } + } while (!StringUtils.isBlank(token) && conversations.size() < pageSize); + + PagedResult pagedResult = new PagedResult(); + pagedResult.setItems(conversations); + + if (pagedResult.getItems().size() == pageSize) { + pagedResult.setContinuationToken(pagedResult.getItems().get(pagedResult.getItems().size() - 1).getId()); + } + + return CompletableFuture.completedFuture(pagedResult); + } + + /** + * Delete a specific conversation and all of it's activities. + * @param channelId The ID of the channel the conversation is in. + * @param conversationId The ID of the conversation to delete. + * @return A CompletableFuture that represents the work queued to execute. + */ + public CompletableFuture deleteTranscript(String channelId, String conversationId) { + if (StringUtils.isBlank(channelId)) { + throw new IllegalArgumentException("Missing channelId"); + } + + if (StringUtils.isBlank(conversationId)) { + throw new IllegalArgumentException("Missing conversationId"); + } + + String token = null; + do { + String prefix = String.format("%s/%s/", sanitizeKey(channelId), sanitizeKey(conversationId)); + Iterable> resultSegment = containerClient + .listBlobsByHierarchy("/", this.getOptionsWithMetadata(prefix), null).iterableByPage(token); + token = null; + + for (PagedResponse blobPage: resultSegment) { + for (BlobItem blobItem: blobPage.getValue()) { + BlobClient blobClient = containerClient.getBlobClient(blobItem.getName()); + if (blobClient.exists()) { + try { + blobClient.delete(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + // Get the continuation token and loop until it is empty. + token = blobPage.getContinuationToken(); + } + } + } while (!StringUtils.isBlank(token)); + + return CompletableFuture.completedFuture(null); + } + + private CompletableFuture> innerReadBlob(Activity activity) { + int i = 0; + while (true) { + try { + String token = null; + do { + String prefix = String.format("%s/%s/", + sanitizeKey(activity.getChannelId()), sanitizeKey(activity.getConversation().getId())); + Iterable> resultSegment = containerClient + .listBlobsByHierarchy("/", + this.getOptionsWithMetadata(prefix), null).iterableByPage(token); + token = null; + for (PagedResponse blobPage: resultSegment) { + for (BlobItem blobItem: blobPage.getValue()) { + if (blobItem.getMetadata().get("Id").equals(activity.getId())) { + BlobClient blobClient = containerClient.getBlobClient(blobItem.getName()); + return this.getActivityFromBlobClient(blobClient) + .thenApply(blobActivity -> + new Pair(blobActivity, blobClient)); + } + } + + // Get the continuation token and loop until it is empty. + token = blobPage.getContinuationToken(); + } + } while (!StringUtils.isBlank(token)); + + return CompletableFuture.completedFuture(null); + } catch (HttpResponseException ex) { + if (ex.getResponse().getStatusCode() == HttpStatus.SC_PRECONDITION_FAILED) { + // additional retry logic, + // even though this is a read operation blob storage can return 412 if there is contention + if (i++ < retryTimes) { + try { + TimeUnit.MILLISECONDS.sleep(milisecondsTimeout); + continue; + } catch (InterruptedException e) { + break; + } + } + throw ex; + } + // This break will finish the while when the catch if condition is false + break; + } + } + return CompletableFuture.completedFuture(null); + } + + private CompletableFuture getActivityFromBlobClient(BlobClient blobClient) { + ByteArrayOutputStream content = new ByteArrayOutputStream(); + blobClient.download(content); + String contentString = new String(content.toByteArray()); + try { + return CompletableFuture.completedFuture(jsonSerializer.readValue(contentString, Activity.class)); + } catch (IOException ex) { + return CompletableFuture.completedFuture(null); + } + } + + private CompletableFuture innerLogActivity(Activity activity) { + String blobName = this.getBlobName(activity); + BlobClient blobClient = containerClient.getBlobClient(blobName); + return logActivityToBlobClient(activity, blobClient, null); + } + + private CompletableFuture logActivityToBlobClient(Activity activity, BlobClient blobClient, + Boolean overwrite) { + if (overwrite == null) { + overwrite = false; + } + String activityJson = null; + try { + activityJson = jsonSerializer.writeValueAsString(activity); + } catch (IOException ex) { + ex.printStackTrace(); + } + InputStream data = new ByteArrayInputStream(activityJson.getBytes(StandardCharsets.UTF_8)); + + try { + blobClient.upload(data, data.available(), overwrite); + } catch (IOException ex) { + ex.printStackTrace(); + } + Map metaData = new HashMap(); + metaData.put("Id", activity.getId()); + if (activity.getFrom() != null) { + metaData.put("FromId", activity.getFrom().getId()); + } + + if (activity.getRecipient() != null) { + metaData.put("RecipientId", activity.getRecipient().getId()); + } + metaData.put("Timestamp", activity.getTimestamp().toString()); + + blobClient.setMetadata(metaData); + + return CompletableFuture.completedFuture(null); + } + + private String getBlobName(Activity activity) { + String blobName = String.format("%s/%s/%s-%s.json", + sanitizeKey(activity.getChannelId()), sanitizeKey(activity.getConversation().getId()), + this.formatTicks(activity.getTimestamp()), sanitizeKey(activity.getId())); + + return blobName; + } + + private String sanitizeKey(String key) { + // Blob Name rules: case-sensitive any url char + try { + return URLEncoder.encode(key, StandardCharsets.UTF_8.name()); + } catch (Exception ex) { + ex.printStackTrace(); + } + return ""; + } + + private BlobContainerClient getContainerClient(String dataConnectionString, String containerName) { + containerName = containerName.toLowerCase(); + containerClient = new BlobContainerClientBuilder() + .connectionString(dataConnectionString) + .containerName(containerName) + .buildClient(); + if (!CHECKED_CONTAINERS.contains(containerName)) { + CHECKED_CONTAINERS.add(containerName); + if (!containerClient.exists()) { + try { + containerClient.create(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + } + return containerClient; + } + + /** + * Formats a timestamp in a way that is consistent with the C# SDK. + * @param dateTime The dateTime used to get the ticks + * @return The String representing the ticks. + */ + private String formatTicks(OffsetDateTime dateTime) { + final Instant begin = ZonedDateTime.of(1, 1, 1, 0, 0, 0, 0, + ZoneOffset.UTC).toInstant(); + final Instant end = dateTime.toInstant(); + long secsDiff = Math.subtractExact(end.getEpochSecond(), begin.getEpochSecond()); + long totalHundredNanos = Math.multiplyExact(secsDiff, multipleProductValue); + final Long ticks = Math.addExact(totalHundredNanos, (end.getNano() - begin.getNano()) / 100); + return Long.toString(ticks, longRadix); + } + + private ListBlobsOptions getOptionsWithMetadata(String prefix) { + BlobListDetails details = new BlobListDetails(); + details.setRetrieveMetadata(true); + ListBlobsOptions options = new ListBlobsOptions(); + options.setDetails(details); + options.setPrefix(prefix); + return options; + } +} diff --git a/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/blobs/package-info.java b/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/blobs/package-info.java new file mode 100644 index 000000000..76f1907f6 --- /dev/null +++ b/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/blobs/package-info.java @@ -0,0 +1,8 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for +// license information. + +/** + * This package contains the classes for bot-azure. + */ +package com.microsoft.bot.azure.blobs; diff --git a/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/package-info.java b/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/package-info.java index 28eafcbba..48232c6e0 100644 --- a/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/package-info.java +++ b/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/package-info.java @@ -3,6 +3,6 @@ // license information. /** - * This package contains the classes for bot-integration-core. + * This package contains the classes for bot-azure. */ package com.microsoft.bot.azure; diff --git a/libraries/bot-azure/src/test/java/com/microsoft/bot/azure/TranscriptStoreTests.java b/libraries/bot-azure/src/test/java/com/microsoft/bot/azure/TranscriptStoreTests.java new file mode 100644 index 000000000..b9653bcc4 --- /dev/null +++ b/libraries/bot-azure/src/test/java/com/microsoft/bot/azure/TranscriptStoreTests.java @@ -0,0 +1,572 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.bot.azure; + +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobContainerClientBuilder; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.microsoft.bot.azure.blobs.BlobsTranscriptStore; +import com.microsoft.bot.builder.PagedResult; +import com.microsoft.bot.builder.TranscriptInfo; +import com.microsoft.bot.builder.TranscriptLoggerMiddleware; +import com.microsoft.bot.builder.TranscriptStore; +import com.microsoft.bot.builder.adapters.TestAdapter; +import com.microsoft.bot.builder.adapters.TestFlow; +import com.microsoft.bot.schema.Activity; +import com.microsoft.bot.schema.ActivityTypes; +import com.microsoft.bot.schema.ChannelAccount; +import com.microsoft.bot.schema.ConversationAccount; +import com.microsoft.bot.schema.ConversationReference; +import com.microsoft.bot.schema.ResourceResponse; +import org.apache.commons.lang3.StringUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * These tests require Azure Storage Emulator v5.7 + * The emulator must be installed at this path C:\Program Files (x86)\Microsoft SDKs\Azure\Storage Emulator\AzureStorageEmulator.exe + * More info: https://docs.microsoft.com/azure/storage/common/storage-use-emulator + */ +public class TranscriptStoreTests { + + @Rule + public TestName TEST_NAME = new TestName(); + + protected String blobStorageEmulatorConnectionString = + "AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;DefaultEndpointsProtocol=http;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;"; + + private String channelId = "test"; + + private static final String[] CONVERSATION_IDS = { + "qaz", "wsx", "edc", "rfv", "tgb", "yhn", "ujm", "123", "456", "789", + "ZAQ", "XSW", "CDE", "VFR", "BGT", "NHY", "NHY", "098", "765", "432", + "zxc", "vbn", "mlk", "jhy", "yui", "kly", "asd", "asw", "aaa", "zzz", + }; + + private static final String[] CONVERSATION_SPECIAL_IDS = { "asd !&/#.'+:?\"", "ASD@123<>|}{][", "$%^;\\*()_" }; + + private String getContainerName() { + return String.format("blobstranscript%s", TEST_NAME.getMethodName().toLowerCase()); + } + + private TranscriptStore getTranscriptStore() { + return new BlobsTranscriptStore(blobStorageEmulatorConnectionString, getContainerName()); + } + + private static final String NO_EMULATOR_MESSAGE = "This test requires Azure STORAGE Emulator! Go to https://docs.microsoft.com/azure/storage/common/storage-use-emulator to download and install."; + + @BeforeClass + public static void allTestsInit() throws IOException, InterruptedException { + assertEmulator(); + } + + @After + public void testCleanup() { + BlobContainerClient containerClient = new BlobContainerClientBuilder() + .connectionString(blobStorageEmulatorConnectionString) + .containerName(getContainerName()) + .buildClient(); + + if (containerClient.exists()) { + containerClient.delete(); + } + } + + // These tests require Azure Storage Emulator v5.7 + @Test + public void blobTranscriptParamTest() { + Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsTranscriptStore(null, getContainerName())); + Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsTranscriptStore(blobStorageEmulatorConnectionString, null)); + Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsTranscriptStore(new String(), getContainerName())); + Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsTranscriptStore(blobStorageEmulatorConnectionString, new String())); + } + + @Test + public void transcriptsEmptyTest() { + TranscriptStore transcriptStore = getTranscriptStore(); + String unusedChannelId = UUID.randomUUID().toString(); + PagedResult transcripts = transcriptStore.listTranscripts(unusedChannelId).join(); + Assert.assertEquals(0, transcripts.getItems().size()); + } + + @Test + public void activityEmptyTest() { + TranscriptStore transcriptStore = getTranscriptStore(); + for(String convoId: CONVERSATION_SPECIAL_IDS) { + PagedResult activities = transcriptStore.getTranscriptActivities(channelId, convoId).join(); + Assert.assertEquals(0, activities.getItems().size()); + } + } + + @Test + public void activityAddTest() { + TranscriptStore transcriptStore = getTranscriptStore(); + Activity[] loggedActivities = new Activity[5]; + List activities = new ArrayList(); + for (int i = 0; i < 5; i++) { + Activity a = TranscriptStoreTests.createActivity(i, i, CONVERSATION_IDS); + transcriptStore.logActivity(a).join(); + activities.add(a); + loggedActivities[i] = transcriptStore.getTranscriptActivities(channelId, CONVERSATION_IDS[i]) + .join().getItems().get(0); + } + + Assert.assertEquals(5, loggedActivities.length); + } + + @Test + public void transcriptRemoveTest() { + TranscriptStore transcriptStore = getTranscriptStore(); + for (int i = 0; i < 5; i++) { + Activity a = TranscriptStoreTests.createActivity(i, i, CONVERSATION_IDS); + transcriptStore.logActivity(a).join(); + transcriptStore.deleteTranscript(a.getChannelId(), a.getConversation().getId()).join(); + + PagedResult loggedActivities = transcriptStore + .getTranscriptActivities(channelId, CONVERSATION_IDS[i]).join(); + + Assert.assertEquals(0, loggedActivities.getItems().size()); + } + } + + @Test + public void activityAddSpecialCharsTest() { + TranscriptStore transcriptStore = getTranscriptStore(); + Activity[] loggedActivities = new Activity[CONVERSATION_SPECIAL_IDS.length]; + List activities = new ArrayList(); + for (int i = 0; i < CONVERSATION_SPECIAL_IDS.length; i++) { + Activity a = TranscriptStoreTests.createActivity(i, i, CONVERSATION_SPECIAL_IDS); + transcriptStore.logActivity(a).join(); + activities.add(a); + int pos = i; + transcriptStore.getTranscriptActivities(channelId, CONVERSATION_SPECIAL_IDS[i]).thenAccept(result -> { + loggedActivities[pos] = result.getItems().get(0); + }); + } + + Assert.assertEquals(activities.size(), loggedActivities.length); + } + + @Test + public void transcriptRemoveSpecialCharsTest() { + TranscriptStore transcriptStore = getTranscriptStore(); + for (int i = 0; i < CONVERSATION_SPECIAL_IDS.length; i++) { + Activity a = TranscriptStoreTests.createActivity(i, i, CONVERSATION_SPECIAL_IDS); + transcriptStore.deleteTranscript(a.getChannelId(), a.getConversation().getId()).join(); + + PagedResult loggedActivities = transcriptStore. + getTranscriptActivities(channelId, CONVERSATION_SPECIAL_IDS[i]).join(); + Assert.assertEquals(0, loggedActivities.getItems().size()); + } + } + + @Test + public void activityAddPagedResultTest() { + TranscriptStore transcriptStore = getTranscriptStore(); + String cleanChannel = UUID.randomUUID().toString(); + + List activities = new ArrayList(); + + for (int i = 0; i < CONVERSATION_IDS.length; i++) { + Activity a = TranscriptStoreTests.createActivity(0, i, CONVERSATION_IDS); + a.setChannelId(cleanChannel); + + transcriptStore.logActivity(a).join(); + activities.add(a); + } + + PagedResult loggedPagedResult = transcriptStore.getTranscriptActivities(cleanChannel, CONVERSATION_IDS[0]).join(); + String ct = loggedPagedResult.getContinuationToken(); + Assert.assertEquals(20, loggedPagedResult.getItems().size()); + Assert.assertNotNull(ct); + Assert.assertTrue(loggedPagedResult.getContinuationToken().length() > 0); + loggedPagedResult = transcriptStore.getTranscriptActivities(cleanChannel, CONVERSATION_IDS[0], ct).join(); + ct = loggedPagedResult.getContinuationToken(); + Assert.assertEquals(10, loggedPagedResult.getItems().size()); + Assert.assertNull(ct); + } + + @Test + public void transcriptRemovePagedTest() { + TranscriptStore transcriptStore = getTranscriptStore(); + int i; + for (i = 0; i < CONVERSATION_SPECIAL_IDS.length; i++) { + Activity a = TranscriptStoreTests.createActivity(i ,i , CONVERSATION_IDS); + transcriptStore.deleteTranscript(a.getChannelId(), a.getConversation().getId()).join(); + } + + PagedResult loggedActivities = transcriptStore.getTranscriptActivities(channelId, CONVERSATION_IDS[i]).join(); + Assert.assertEquals(0, loggedActivities.getItems().size()); + } + + @Test + public void nullParameterTests() { + TranscriptStore store = getTranscriptStore(); + + Assert.assertThrows(IllegalArgumentException.class, () -> store.logActivity(null)); + Assert.assertThrows(IllegalArgumentException.class, + () -> store.getTranscriptActivities(null, CONVERSATION_IDS[0])); + Assert.assertThrows(IllegalArgumentException.class, () -> store.getTranscriptActivities(channelId, null)); + } + + @Test + public void logActivities() { + TranscriptStore transcriptStore = getTranscriptStore(); + ConversationReference conversation = TestAdapter + .createConversationReference(UUID.randomUUID().toString(), "User1", "Bot"); + TestAdapter adapter = new TestAdapter(conversation) + .use(new TranscriptLoggerMiddleware(transcriptStore)); + new TestFlow(adapter, turnContext -> { + delay(500); + Activity typingActivity = new Activity(ActivityTypes.TYPING); + typingActivity.setRelatesTo(turnContext.getActivity().getRelatesTo()); + turnContext.sendActivity(typingActivity).join(); + delay(500); + turnContext.sendActivity(String.format("echo:%s", turnContext.getActivity().getText())).join(); + return CompletableFuture.completedFuture(null); + }) + .send("foo") + .assertReply(activity -> + Assert.assertTrue(activity.isType(ActivityTypes.TYPING)) + ) + .assertReply("echo:foo") + .send("bar") + .assertReply(activity -> + Assert.assertTrue(activity.isType(ActivityTypes.TYPING)) + ) + .assertReply("echo:bar") + .startTest().join(); + + PagedResult pagedResult = null; + try { + pagedResult = this.getPagedResult(conversation, 6, null).join(); + } catch (TimeoutException ex) { + Assert.fail(); + } + Assert.assertEquals(6, pagedResult.getItems().size()); + Assert.assertTrue(pagedResult.getItems().get(0).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals("foo", pagedResult.getItems().get(0).getText()); + Assert.assertNotNull(pagedResult.getItems().get(1)); + Assert.assertTrue(pagedResult.getItems().get(1).isType(ActivityTypes.TYPING)); + Assert.assertTrue(pagedResult.getItems().get(2).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals("echo:foo", pagedResult.getItems().get(2).getText()); + Assert.assertTrue(pagedResult.getItems().get(3).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals("bar", pagedResult.getItems().get(3).getText()); + Assert.assertNotNull(pagedResult.getItems().get(4)); + Assert.assertTrue(pagedResult.getItems().get(4).isType(ActivityTypes.TYPING)); + Assert.assertTrue(pagedResult.getItems().get(5).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals("echo:bar", pagedResult.getItems().get(5).getText()); + for (Activity activity: pagedResult.getItems()) { + Assert.assertTrue(!StringUtils.isBlank(activity.getId())); + Assert.assertTrue(activity.getTimestamp().isAfter(OffsetDateTime.MIN)); + } + } + + @Test + public void logUpdateActivities() { + TranscriptStore transcriptStore = getTranscriptStore(); + ConversationReference conversation = TestAdapter + .createConversationReference(UUID.randomUUID().toString(), "User1", "Bot"); + TestAdapter adapter = new TestAdapter(conversation) + .use(new TranscriptLoggerMiddleware(transcriptStore)); + final Activity[] activityToUpdate = {null}; + new TestFlow(adapter, turnContext -> { + delay(500); + if(turnContext.getActivity().getText().equals("update")) { + activityToUpdate[0].setText("new response"); + turnContext.updateActivity(activityToUpdate[0]).join(); + } else { + Activity activity = turnContext.getActivity().createReply("response"); + ResourceResponse response = turnContext.sendActivity(activity).join(); + activity.setId(response.getId()); + + ObjectMapper objectMapper = new ObjectMapper() + .findAndRegisterModules(); + try { + // clone the activity, so we can use it to do an update + activityToUpdate[0] = objectMapper.readValue(objectMapper.writeValueAsString(activity), Activity.class); + } catch (JsonProcessingException ex) { + ex.printStackTrace(); + } + } + return CompletableFuture.completedFuture(null); + }).send("foo") + .send("update") + .assertReply("new response") + .startTest().join(); + + PagedResult pagedResult = null; + try { + pagedResult = this.getPagedResult(conversation, 3, null).join(); + } catch (TimeoutException ex) { + Assert.fail(); + } + + Assert.assertEquals(3, pagedResult.getItems().size()); + Assert.assertTrue(pagedResult.getItems().get(0).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals("foo", pagedResult.getItems().get(0).getText()); + Assert.assertTrue(pagedResult.getItems().get(1).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals("new response", pagedResult.getItems().get(1).getText()); + Assert.assertTrue(pagedResult.getItems().get(2).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals("update", pagedResult.getItems().get(2).getText()); + } + + @Test + public void logMissingUpdateActivity() { + TranscriptStore transcriptStore = getTranscriptStore(); + ConversationReference conversation = TestAdapter + .createConversationReference(UUID.randomUUID().toString(), "User1", "Bot"); + TestAdapter adapter = new TestAdapter(conversation) + .use(new TranscriptLoggerMiddleware(transcriptStore)); + final String[] fooId = {new String()}; + ObjectMapper objectMapper = new ObjectMapper() + .findAndRegisterModules(); + new TestFlow(adapter, turnContext -> { + fooId[0] = turnContext.getActivity().getId(); + Activity updateActivity = null; + try { + // clone the activity, so we can use it to do an update + updateActivity = objectMapper.readValue(objectMapper.writeValueAsString(turnContext.getActivity()), Activity.class); + } catch (JsonProcessingException ex) { + ex.printStackTrace(); + } + updateActivity.setText("updated response"); + ResourceResponse response = turnContext.updateActivity(updateActivity).join(); + return CompletableFuture.completedFuture(null); + }).send("foo") + .startTest().join(); + + delay(3000); + + PagedResult pagedResult = null; + try { + pagedResult = this.getPagedResult(conversation, 2, null).join(); + } catch (TimeoutException ex) { + Assert.fail(); + } + + Assert.assertEquals(2, pagedResult.getItems().size()); + Assert.assertTrue(pagedResult.getItems().get(0).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals(fooId[0], pagedResult.getItems().get(0).getId()); + Assert.assertEquals("foo", pagedResult.getItems().get(0).getText()); + Assert.assertTrue(pagedResult.getItems().get(1).isType(ActivityTypes.MESSAGE)); + Assert.assertTrue(pagedResult.getItems().get(1).getId().startsWith("g_")); + Assert.assertEquals("updated response", pagedResult.getItems().get(1).getText()); + } + + @Test + public void testDateLogUpdateActivities() { + TranscriptStore transcriptStore = getTranscriptStore(); + OffsetDateTime dateTimeStartOffset1 = OffsetDateTime.now(); + ConversationReference conversation = TestAdapter + .createConversationReference(UUID.randomUUID().toString(), "User1", "Bot"); + TestAdapter adapter = new TestAdapter(conversation) + .use(new TranscriptLoggerMiddleware(transcriptStore)); + final Activity[] activityToUpdate = {null}; + new TestFlow(adapter, turnContext -> { + if (turnContext.getActivity().getText().equals("update")) { + activityToUpdate[0].setText("new response"); + turnContext.updateActivity(activityToUpdate[0]).join(); + } else { + Activity activity = turnContext.getActivity().createReply("response"); + + ResourceResponse response = turnContext.sendActivity(activity).join(); + activity.setId(response.getId()); + + ObjectMapper objectMapper = new ObjectMapper().findAndRegisterModules(); + try { + // clone the activity, so we can use it to do an update + activityToUpdate[0] = objectMapper.readValue(objectMapper.writeValueAsString(activity), Activity.class); + } catch (JsonProcessingException ex) { + ex.printStackTrace(); + } + } + return CompletableFuture.completedFuture(null); + }).send("foo") + .send("update") + .assertReply("new response") + .startTest().join(); + + try { + TimeUnit.MILLISECONDS.sleep(5000); + } catch (InterruptedException e) { + // Empty error + } + + // Perform some queries + PagedResult pagedResult = transcriptStore.getTranscriptActivities( + conversation.getChannelId(), + conversation.getConversation().getId(), + null, + dateTimeStartOffset1).join(); + Assert.assertEquals(3, pagedResult.getItems().size()); + Assert.assertTrue(pagedResult.getItems().get(0).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals("foo", pagedResult.getItems().get(0).getText()); + Assert.assertTrue(pagedResult.getItems().get(1).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals("new response", pagedResult.getItems().get(1).getText()); + Assert.assertTrue(pagedResult.getItems().get(2).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals("update", pagedResult.getItems().get(2).getText()); + + // Perform some queries + pagedResult = transcriptStore.getTranscriptActivities( + conversation.getChannelId(), + conversation.getConversation().getId(), + null, + OffsetDateTime.MIN).join(); + Assert.assertEquals(3, pagedResult.getItems().size()); + Assert.assertTrue(pagedResult.getItems().get(0).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals("foo", pagedResult.getItems().get(0).getText()); + Assert.assertTrue(pagedResult.getItems().get(1).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals("new response", pagedResult.getItems().get(1).getText()); + Assert.assertTrue(pagedResult.getItems().get(2).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals("update", pagedResult.getItems().get(2).getText()); + + // Perform some queries + pagedResult = transcriptStore.getTranscriptActivities( + conversation.getChannelId(), + conversation.getConversation().getId(), + null, + OffsetDateTime.MAX).join(); + Assert.assertEquals(0, pagedResult.getItems().size()); + } + + @Test + public void logDeleteActivities() { + TranscriptStore transcriptStore = getTranscriptStore(); + ConversationReference conversation = TestAdapter + .createConversationReference(UUID.randomUUID().toString(), "User1", "Bot"); + TestAdapter adapter = new TestAdapter(conversation) + .use(new TranscriptLoggerMiddleware(transcriptStore)); + final String[] activityId = {null}; + new TestFlow(adapter, turnContext -> { + delay(500); + if (turnContext.getActivity().getText().equals("deleteIt")) { + turnContext.deleteActivity(activityId[0]).join(); + } else { + Activity activity = turnContext.getActivity().createReply("response"); + ResourceResponse response = turnContext.sendActivity(activity).join(); + activityId[0] = response.getId(); + } + return CompletableFuture.completedFuture(null); + }).send("foo") + .assertReply("response") + .send("deleteIt") + .startTest().join(); + + PagedResult pagedResult = null; + try { + pagedResult = this.getPagedResult(conversation, 3, null).join(); + } catch (TimeoutException ex) { + Assert.fail(); + } + + Assert.assertEquals(3, pagedResult.getItems().size()); + Assert.assertTrue(pagedResult.getItems().get(0).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals("foo", pagedResult.getItems().get(0).getText()); + Assert.assertNotNull(pagedResult.getItems().get(1)); + Assert.assertTrue(pagedResult.getItems().get(1).isType(ActivityTypes.MESSAGE_DELETE)); + Assert.assertTrue(pagedResult.getItems().get(2).isType(ActivityTypes.MESSAGE)); + Assert.assertEquals("deleteIt", pagedResult.getItems().get(2).getText()); + } + + protected static Activity createActivity(Integer i, Integer j, String[] CONVERSATION_IDS) { + return TranscriptStoreTests.createActivity(j, CONVERSATION_IDS[i]); + } + + private static Activity createActivity(Integer j, String conversationId) { + ConversationAccount conversationAccount = new ConversationAccount(); + conversationAccount.setId(conversationId); + Activity activity = new Activity(ActivityTypes.MESSAGE); + activity.setId(StringUtils.leftPad(String.valueOf(j + 1), 2, "0")); + activity.setChannelId("test"); + activity.setText("test"); + activity.setConversation(conversationAccount); + activity.setTimestamp(OffsetDateTime.now()); + activity.setFrom(new ChannelAccount("testUser")); + activity.setRecipient(new ChannelAccount("testBot")); + return activity; + } + + /** + * There are some async oddities within TranscriptLoggerMiddleware that make it difficult to set a short delay when + * running this tests that ensures + * the TestFlow completes while also logging transcripts. Some tests will not pass without longer delays, + * but this method minimizes the delay required. + * @param conversation ConversationReference to pass to GetTranscriptActivitiesAsync() + * that contains ChannelId and Conversation.Id. + * @param expectedLength Expected length of pagedResult array. + * @param maxTimeout Maximum time to wait to retrieve pagedResult. + * @return PagedResult. + * @throws TimeoutException + */ + private CompletableFuture> getPagedResult(ConversationReference conversation, + Integer expectedLength, Integer maxTimeout) throws TimeoutException { + TranscriptStore transcriptStore = getTranscriptStore(); + if (maxTimeout == null) { + maxTimeout = 5000; + } + + PagedResult pagedResult = null; + for (int timeout = 0; timeout < maxTimeout; timeout += 500) { + delay(500); + try { + pagedResult = transcriptStore + .getTranscriptActivities(conversation.getChannelId(), conversation.getConversation().getId()).join(); + if (pagedResult.getItems().size() >= expectedLength) { + break; + } + } catch (NoSuchElementException ex) { } + catch (NullPointerException e) { } + } + + if(pagedResult == null) { + throw new TimeoutException("Unable to retrieve pagedResult in time"); + } + + return CompletableFuture.completedFuture(pagedResult); + } + + private static void assertEmulator() throws IOException, InterruptedException { + if (!checkEmulator()) { + Assert.fail(NO_EMULATOR_MESSAGE); + } + } + + private static Boolean checkEmulator() throws IOException, InterruptedException { + Process p = Runtime.getRuntime().exec + ("cmd /C \"" + System.getenv("ProgramFiles") + " (x86)\\Microsoft SDKs\\Azure\\Storage Emulator\\AzureStorageEmulator.exe\" start"); + int result = p.waitFor(); + // status = 0: the service was started. + // status = -5: the service is already started. Only one instance of the application + // can be run at the same time. + return result == 0 || result == -5; + } + + /** + * Time period delay. + * @param delay Time to delay. + */ + private void delay(Integer delay) { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + // Empty error + } + } +} diff --git a/libraries/bot-azure/src/test/java/com/microsoft/bot/azure/blobs/BlobsStorageTests.java b/libraries/bot-azure/src/test/java/com/microsoft/bot/azure/blobs/BlobsStorageTests.java new file mode 100644 index 000000000..9efc4de8c --- /dev/null +++ b/libraries/bot-azure/src/test/java/com/microsoft/bot/azure/blobs/BlobsStorageTests.java @@ -0,0 +1,291 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.bot.azure.blobs; + +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobContainerClientBuilder; +import com.microsoft.bot.builder.BotAdapter; +import com.microsoft.bot.builder.ConversationState; +import com.microsoft.bot.builder.StatePropertyAccessor; +import com.microsoft.bot.builder.Storage; +import com.microsoft.bot.builder.StorageBaseTests; +import com.microsoft.bot.builder.StoreItem; +import com.microsoft.bot.builder.TurnContext; +import com.microsoft.bot.builder.TurnContextImpl; +import com.microsoft.bot.schema.Activity; +import com.microsoft.bot.schema.ActivityTypes; +import com.microsoft.bot.schema.ConversationAccount; +import com.microsoft.bot.schema.ConversationReference; +import com.microsoft.bot.schema.ResourceResponse; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public class BlobsStorageTests extends StorageBaseTests { + + @Rule + public TestName testName = new TestName(); + + private final String connectionString = "AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;DefaultEndpointsProtocol=http;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;"; + + private static boolean emulatorIsRunning = false; + + private static final String NO_EMULATOR_MESSAGE = "This test requires Azure STORAGE Emulator! Go to https://docs.microsoft.com/azure/storage/common/storage-use-emulator to download and install."; + + public String getContainerName() { + return "blobs" + testName.getMethodName().toLowerCase().replace("_", ""); + } + + @BeforeClass + public static void allTestsInit() throws IOException, InterruptedException { + Process p = Runtime.getRuntime().exec + ("cmd /C \"" + System.getenv("ProgramFiles") + " (x86)\\Microsoft SDKs\\Azure\\Storage Emulator\\AzureStorageEmulator.exe\" start"); + int result = p.waitFor(); + // status = 0: the service was started. + // status = -5: the service is already started. Only one instance of the application + // can be run at the same time. + emulatorIsRunning = result == 0 || result == -5; + } + + @After + public void testCleanup() { + BlobContainerClient containerClient = new BlobContainerClientBuilder() + .connectionString(connectionString) + .containerName(getContainerName()) + .buildClient(); + + if (containerClient.exists()) { + containerClient.delete(); + } + } + + @Test + public void blobStorageParamTest() { + if (runIfEmulator()) { + Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsStorage(null, getContainerName())); + Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsStorage(connectionString, null)); + Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsStorage(new String(), getContainerName())); + Assert.assertThrows(IllegalArgumentException.class, () -> new BlobsStorage(connectionString, new String())); + } + } + + @Test + public void testBlobStorageWriteRead() + { + if (runIfEmulator()) { + // Arrange + Storage storage = new BlobsStorage(connectionString, getContainerName()); + + Map changes = new HashMap(); + changes.put("x", "hello"); + changes.put("y", "world"); + + // Act + storage.write(changes).join(); + Map result = storage.read(new String[] {"x", "y"}).join(); + + // Assert + Assert.assertEquals(2, result.size()); + Assert.assertEquals("hello", result.get("x")); + Assert.assertEquals("world", result.get("y")); + } + } + + @Test + public void testBlobStorageWriteDeleteRead() + { + if (runIfEmulator()) { + // Arrange + Storage storage = new BlobsStorage(connectionString, getContainerName()); + + Map changes = new HashMap(); + changes.put("x", "hello"); + changes.put("y", "world"); + + // Act + storage.write(changes).join(); + storage.delete(new String[] { "x" }).join(); + Map result = storage.read(new String[] {"x", "y"}).join(); + + // Assert + Assert.assertEquals(1, result.size()); + Assert.assertEquals("world", result.get("y")); + } + } + + @Test + public void testBlobStorageChanges() { + if (runIfEmulator()) { + // Arrange + Storage storage = new BlobsStorage(connectionString, getContainerName()); + + // Act + Map changes = new HashMap(); + changes.put("a", "1.0"); + changes.put("b", "2.0"); + storage.write(changes).join(); + + changes.clear(); + changes.put("c", "3.0"); + storage.write(changes).join(); + storage.delete(new String[] { "b" }).join(); + + changes.clear(); + changes.put("a", "1.1"); + storage.write(changes).join(); + + Map result = storage.read(new String[] { "a", "b", "c", "d", "e" }).join(); + + // Assert + Assert.assertEquals(2, result.size()); + Assert.assertEquals("1.1", result.get("a")); + Assert.assertEquals("3.0", result.get("c")); + } + } + + @Test + public void testConversationStateBlobStorage() { + if (runIfEmulator()) { + // Arrange + Storage storage = new BlobsStorage(connectionString, getContainerName()); + + ConversationState conversationState = new ConversationState(storage); + StatePropertyAccessor propAccessor = conversationState.createProperty("prop"); + + TestStorageAdapter adapter = new TestStorageAdapter(); + Activity activity = new Activity(ActivityTypes.MESSAGE); + activity.setChannelId("123"); + ConversationAccount conversationAccount = new ConversationAccount(); + conversationAccount.setId("abc"); + activity.setConversation(conversationAccount); + + // Act + TurnContext turnContext1 = new TurnContextImpl(adapter, activity); + Prop propValue1 = propAccessor.get(turnContext1, Prop::new).join(); + propValue1.setX("hello"); + propValue1.setY("world"); + conversationState.saveChanges(turnContext1).join(); + + TurnContext turnContext2 = new TurnContextImpl(adapter, activity); + Prop propValue2 = propAccessor.get(turnContext2).join(); + + // Assert + Assert.assertEquals("hello", propValue2.getX()); + Assert.assertEquals("world", propValue2.getY()); + + propAccessor.delete(turnContext1).join(); + conversationState.saveChanges(turnContext1).join(); + } + } + + @Test + public void testConversationStateBlobStorage_TypeNameHandlingDefault() { + if (runIfEmulator()) { + Storage storage = new BlobsStorage(connectionString, getContainerName()); + testConversationStateBlobStorage_Method(storage); + } + } + + @Test + public void statePersistsThroughMultiTurn_TypeNameHandlingNone() { + if (runIfEmulator()) { + Storage storage = new BlobsStorage(connectionString, getContainerName()); + statePersistsThroughMultiTurn(storage); + } + } + + private void testConversationStateBlobStorage_Method(Storage blobs) { + if (runIfEmulator()) { + // Arrange + ConversationState conversationState = new ConversationState(blobs); + StatePropertyAccessor propAccessor = conversationState.createProperty("prop"); + TestStorageAdapter adapter = new TestStorageAdapter(); + Activity activity = new Activity(ActivityTypes.MESSAGE); + activity.setChannelId("123"); + ConversationAccount conversationAccount = new ConversationAccount(); + conversationAccount.setId("abc"); + activity.setConversation(conversationAccount); + + // Act + TurnContext turnContext1 = new TurnContextImpl(adapter, activity); + Prop propValue1 = propAccessor.get(turnContext1, Prop::new).join(); + propValue1.setX("hello"); + propValue1.setY("world"); + conversationState.saveChanges(turnContext1).join(); + + TurnContext turnContext2 = new TurnContextImpl(adapter, activity); + Prop propValue2 = propAccessor.get(turnContext2).join(); + + // Assert + Assert.assertEquals("hello", propValue2.getX()); + Assert.assertEquals("world", propValue2.getY()); + } + } + + private boolean runIfEmulator() { + if (!emulatorIsRunning) { + System.out.println(NO_EMULATOR_MESSAGE); + return false; + } + + return true; + } + + private class TestStorageAdapter extends BotAdapter { + + @Override + public CompletableFuture sendActivities(TurnContext context, List activities) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture updateActivity(TurnContext context, Activity activity) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture deleteActivity(TurnContext context, ConversationReference reference) { + throw new UnsupportedOperationException(); + } + } + + private static class Prop { + private String X; + private String Y; + StoreItem storeItem; + + public String getX() { + return X; + } + + public void setX(String x) { + X = x; + } + + public String getY() { + return Y; + } + + public void setY(String y) { + Y = y; + } + + public StoreItem getStoreItem() { + return storeItem; + } + + public void setStoreItem(StoreItem storeItem) { + this.storeItem = storeItem; + } + } +} diff --git a/libraries/bot-builder/src/test/java/com/microsoft/bot/builder/StorageBaseTests.java b/libraries/bot-builder/src/test/java/com/microsoft/bot/builder/StorageBaseTests.java index 0d7e15674..142e921fc 100644 --- a/libraries/bot-builder/src/test/java/com/microsoft/bot/builder/StorageBaseTests.java +++ b/libraries/bot-builder/src/test/java/com/microsoft/bot/builder/StorageBaseTests.java @@ -3,10 +3,13 @@ package com.microsoft.bot.builder; +import com.microsoft.bot.builder.adapters.TestAdapter; +import com.microsoft.bot.builder.adapters.TestFlow; import org.junit.Assert; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; public class StorageBaseTests { protected void readUnknownTest(Storage storage) { @@ -241,6 +244,32 @@ protected void deleteUnknownObjectTest(Storage storage) { storage.delete(new String[] { "unknown_key" }).join(); } + protected void statePersistsThroughMultiTurn(Storage storage) { + UserState userState = new UserState(storage); + StatePropertyAccessor testProperty = userState.createProperty("test"); + TestAdapter adapter = new TestAdapter() + .use(new AutoSaveStateMiddleware(userState)); + + new TestFlow(adapter, context -> { + TestPocoState state = testProperty.get(context, TestPocoState::new).join(); + Assert.assertNotNull(state); + switch (context.getActivity().getText()) { + case "set value": + state.setValue("test"); + context.sendActivity("value saved").join(); + break; + case "get value": + context.sendActivity(state.getValue()).join(); + break; + } + + return CompletableFuture.completedFuture(null); + }) + .test("set value", "value saved") + .test("get value", "test") + .startTest().join(); + } + private static class PocoItem { public PocoItem() { diff --git a/libraries/bot-builder/src/test/java/com/microsoft/bot/builder/TestPocoState.java b/libraries/bot-builder/src/test/java/com/microsoft/bot/builder/TestPocoState.java new file mode 100644 index 000000000..e17d40e3d --- /dev/null +++ b/libraries/bot-builder/src/test/java/com/microsoft/bot/builder/TestPocoState.java @@ -0,0 +1,16 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.bot.builder; + +public class TestPocoState { + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + private String value; +} diff --git a/libraries/bot-builder/src/test/java/com/microsoft/bot/builder/TranscriptMiddlewareTest.java b/libraries/bot-builder/src/test/java/com/microsoft/bot/builder/TranscriptMiddlewareTest.java index 0945d80dd..7a1b83598 100644 --- a/libraries/bot-builder/src/test/java/com/microsoft/bot/builder/TranscriptMiddlewareTest.java +++ b/libraries/bot-builder/src/test/java/com/microsoft/bot/builder/TranscriptMiddlewareTest.java @@ -56,6 +56,7 @@ public final void Transcript_LogActivities() { final String[] conversationId = { null }; new TestFlow(adapter, (context) -> { + delay(500); conversationId[0] = context.getActivity().getConversation().getId(); Activity typingActivity = new Activity(ActivityTypes.TYPING) { { @@ -65,12 +66,7 @@ public final void Transcript_LogActivities() { context.sendActivity(typingActivity).join(); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - Assert.fail(); - } + delay(500); context.sendActivity("echo:" + context.getActivity().getText()).join(); return CompletableFuture.completedFuture(null); @@ -109,6 +105,7 @@ public void Transcript_LogUpdateActivities() { final String[] conversationId = { null }; final Activity[] activityToUpdate = { null }; new TestFlow(adapter, (context) -> { + delay(500); conversationId[0] = context.getActivity().getConversation().getId(); if (context.getActivity().getText().equals("update")) { activityToUpdate[0].setText("new response"); @@ -152,6 +149,7 @@ public final void Transcript_LogDeleteActivities() { final String[] conversationId = { null }; final String[] activityId = { null }; new TestFlow(adapter, (context) -> { + delay(500); conversationId[0] = context.getActivity().getConversation().getId(); if (context.getActivity().getText().equals("deleteIt")) { context.deleteActivity(activityId[0]).join(); @@ -210,6 +208,7 @@ public void Transcript_TestDateLogUpdateActivities() { final String[] conversationId = { null }; final Activity[] activityToUpdate = { null }; new TestFlow(adapter, (context) -> { + delay(500); conversationId[0] = context.getActivity().getConversation().getId(); if (context.getActivity().getText().equals("update")) { activityToUpdate[0].setText("new response"); @@ -278,6 +277,7 @@ public final void Transcript_RolesAreFilled() { final String[] conversationId = { null }; new TestFlow(adapter, (context) -> { + delay(500); // The next assert implicitly tests the immutability of the incoming // message. As demonstrated by the asserts after this TestFlow block // the role attribute is present on the activity as it is passed to @@ -300,4 +300,16 @@ public final void Transcript_RolesAreFilled() { System.out.printf("Complete"); } + + /** + * Time period delay. + * @param milliseconds Time to delay. + */ + private void delay(int milliseconds) { + try { + Thread.sleep(milliseconds); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } } diff --git a/libraries/bot-schema/src/main/java/com/microsoft/bot/schema/Activity.java b/libraries/bot-schema/src/main/java/com/microsoft/bot/schema/Activity.java index 5cdbe13a5..f442fb2df 100644 --- a/libraries/bot-schema/src/main/java/com/microsoft/bot/schema/Activity.java +++ b/libraries/bot-schema/src/main/java/com/microsoft/bot/schema/Activity.java @@ -19,14 +19,15 @@ import com.microsoft.bot.schema.teams.TeamsMeetingInfo; import org.apache.commons.lang3.StringUtils; +import java.time.Clock; import java.time.LocalDateTime; import java.time.OffsetDateTime; -import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.stream.Collectors; /** @@ -212,7 +213,8 @@ public class Activity { * normally required. */ protected Activity() { - setTimestamp(OffsetDateTime.now(ZoneId.of("UTC"))); + final Clock clock = new NanoClockHelper(); + setTimestamp(OffsetDateTime.now(clock)); } /** @@ -407,9 +409,26 @@ public static Activity clone(Activity activity) { clone.setProperties(entry.getKey(), entry.getValue()); } + clone = ensureActivityHasId(clone); + return clone; } + private static Activity ensureActivityHasId(Activity activity) { + Activity activityWithId = activity; + + if (activity == null) { + throw new IllegalArgumentException("Cannot check or add Id on a null Activity."); + } + + if (activity.getId() == null) { + String generatedId = String.format("g_%s", UUID.randomUUID().toString()); + activity.setId(generatedId); + } + + return activityWithId; + } + /** * Gets the {@link ActivityTypes} of the activity. * diff --git a/libraries/bot-schema/src/main/java/com/microsoft/bot/schema/NanoClockHelper.java b/libraries/bot-schema/src/main/java/com/microsoft/bot/schema/NanoClockHelper.java new file mode 100644 index 000000000..20496d036 --- /dev/null +++ b/libraries/bot-schema/src/main/java/com/microsoft/bot/schema/NanoClockHelper.java @@ -0,0 +1,65 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.bot.schema; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; + +/** + * A customized nanoseconds clock providing access to the current instant, date and time using a time-zone. + */ +public class NanoClockHelper extends Clock { + + private final Clock clock; + private final long initialNanos; + private final Instant initialInstant; + + /** + * Obtains a clock that returns the current instant using the best available + * system clock with nanoseconds. + */ + public NanoClockHelper() { + this(Clock.systemUTC()); + } + + /** + * Obtains a clock that returns the current instant using the best available + * system clock with nanoseconds. + * @param clock A {@link Clock} + */ + public NanoClockHelper(final Clock clock) { + this.clock = clock; + initialInstant = clock.instant(); + initialNanos = getSystemNanos(); + } + + /** + * {@inheritDoc} + */ + @Override + public ZoneId getZone() { + return clock.getZone(); + } + + /** + * {@inheritDoc} + */ + @Override + public Instant instant() { + return initialInstant.plusNanos(getSystemNanos() - initialNanos); + } + + /** + * {@inheritDoc} + */ + @Override + public Clock withZone(final ZoneId zone) { + return new NanoClockHelper(clock.withZone(zone)); + } + + private long getSystemNanos() { + return System.nanoTime(); + } +}