diff --git a/.github/workflows/run-integrationTests-dotnet.yaml b/.github/workflows/run-integrationTests-dotnet.yaml index 332f82b..f5b586a 100644 --- a/.github/workflows/run-integrationTests-dotnet.yaml +++ b/.github/workflows/run-integrationTests-dotnet.yaml @@ -106,31 +106,33 @@ jobs: echo "Starting the integration test" echo "Running: docker exec ${DEV_CONTAINER_NAME} bash -c \"/spacefx-dev/debugShim-deploy.sh --debug_shim ${{ inputs.app_name }} --python_file /workspace/${{ inputs.app_name }}/test/integrationTests_python/integrationTest.py --disable_plugin_configs --port 5678\"" - docker exec ${DEV_CONTAINER_NAME} bash -c "/spacefx-dev/debugShim-deploy.sh --debug_shim ${{ inputs.app_name }} --python_file /workspace/${{ inputs.app_name }}/test/integrationTests_python/integrationTest.py --disable_plugin_configs --port 5678" & + docker exec ${DEV_CONTAINER_NAME} bash -c "/spacefx-dev/debugShim-deploy.sh --debug_shim ${{ inputs.app_name }} --python_file /workspace/${{ inputs.app_name }}/test/integrationTests_python/integrationTest.py --disable_plugin_configs --port 5678" - echo "k3s kubectl exec -n payload-app deploy/${{ inputs.app_name }} -- bash -c \"/workspace/${{ inputs.app_name }}/.git/spacefx-dev/dotnet/dotnet test --verbosity detailed /workspace/${{ inputs.app_name }}/test/integrationTests/bin/Debug/net6.0/integrationTests.dll --logger \"junit;LogFileName=/workspace/${{ inputs.app_name }}/.git/test-results.xml\"\" &" - ( - # Reroute the stdout to a file so we can uniquely identify this run - trap "" HUP - exec 0< /dev/null - exec 1> "${PWD}/.git/logs/${{ inputs.app_name }}.log.stdout" - exec 2>&1 + # echo "k3s kubectl exec -n payload-app deploy/${{ inputs.app_name }} -- bash -c \"/workspace/${{ inputs.app_name }}/.git/spacefx-dev/dotnet/dotnet test --verbosity detailed /workspace/${{ inputs.app_name }}/test/integrationTests/bin/Debug/net6.0/integrationTests.dll --logger \"junit;LogFileName=/workspace/${{ inputs.app_name }}/.git/test-results.xml\"\" &" + # ( + # # Reroute the stdout to a file so we can uniquely identify this run + # trap "" HUP + # exec 0< /dev/null + # exec 1> "${PWD}/.git/logs/${{ inputs.app_name }}.log.stdout" + # exec 2>&1 - k3s kubectl exec -n payload-app deploy/${{ inputs.app_name }} -- bash -c "/workspace/${{ inputs.app_name }}/.git/spacefx-dev/dotnet/dotnet test --verbosity detailed /workspace/${{ inputs.app_name }}/test/integrationTests/bin/Debug/net6.0/integrationTests.dll --logger \"junit;LogFileName=/workspace/${{ inputs.app_name }}/.git/test-results.xml\"" - ) & + # k3s kubectl exec -n payload-app deploy/${{ inputs.app_name }} -- bash -c "/workspace/${{ inputs.app_name }}/.git/spacefx-dev/dotnet/dotnet test --verbosity detailed /workspace/${{ inputs.app_name }}/test/integrationTests/bin/Debug/net6.0/integrationTests.dll --logger \"junit;LogFileName=/workspace/${{ inputs.app_name }}/.git/test-results.xml\"" + # ) & - client_pid=$! + k3s kubectl exec -n payload-app deploy/${{ inputs.app_name }} -- bash -c "/workspace/${{ inputs.app_name }}/.git/spacefx-dev/dotnet/dotnet test --verbosity detailed /workspace/${{ inputs.app_name }}/test/integrationTests/bin/Debug/net6.0/integrationTests.dll --logger \"junit;LogFileName=/workspace/${{ inputs.app_name }}/.git/test-results.xml\"" - echo "Waiting for the integration test host to finish..." + # client_pid=$! - client_pid_return_code=0 - wait "$client_pid" - client_pid_return_code=$? + # echo "Waiting for the integration test host to finish..." - echo "Client process complete with return code: $client_pid_return_code" + # client_pid_return_code=0 + # wait "$client_pid" + # client_pid_return_code=$? - echo "Outputting logs from the integration test client..." - cat ${PWD}/.git/logs/${{ inputs.app_name }}.log.stdout + # echo "Client process complete with return code: $client_pid_return_code" + + # echo "Outputting logs from the integration test client..." + # cat ${PWD}/.git/logs/${{ inputs.app_name }}.log.stdout echo "" echo "--------------------" echo "" @@ -140,10 +142,10 @@ jobs: exit 1 fi - if [[ $client_pid_return_code -gt 0 ]]; then - echo "Integration tests failed. Exiting with code 1." - exit 1 - fi + # if [[ $client_pid_return_code -gt 0 ]]; then + # echo "Integration tests failed. Exiting with code 1." + # exit 1 + # fi echo "Test results found." diff --git a/.github/workflows/run-integrationTests-python.yaml b/.github/workflows/run-integrationTests-python.yaml index 063cd76..e1c22b2 100644 --- a/.github/workflows/run-integrationTests-python.yaml +++ b/.github/workflows/run-integrationTests-python.yaml @@ -107,7 +107,7 @@ jobs: echo "Starting the integration test" echo "Running: docker exec ${DEV_CONTAINER_NAME} bash -c \"/spacefx-dev/debugShim-deploy.sh --debug_shim ${{ inputs.app_name }} --python_file /workspace/${{ inputs.app_name }}/test/integrationTests_python/integrationTest.py --disable_plugin_configs --port 5678\"" - docker exec ${DEV_CONTAINER_NAME} bash -c "/spacefx-dev/debugShim-deploy.sh --debug_shim ${{ inputs.app_name }} --python_file /workspace/${{ inputs.app_name }}/test/integrationTests_python/integrationTest.py --disable_plugin_configs --port 5678" & + docker exec ${DEV_CONTAINER_NAME} bash -c "/spacefx-dev/debugShim-deploy.sh --debug_shim ${{ inputs.app_name }} --python_file /workspace/${{ inputs.app_name }}/test/integrationTests_python/integrationTest.py --disable_plugin_configs --port 5678" echo "k3s kubectl exec -n payload-app deploy/${{ inputs.app_name }} -- bash -c \"cd /workspace/${{ inputs.app_name }}/test/integrationTests_python && /python3 /workspace/${{ inputs.app_name }}/test/integrationTests_python/integrationTest.py\" &" ( # Reroute the stdout to a file so we can uniquely identify this run @@ -119,7 +119,7 @@ jobs: ) & client_pid=$! - + echo "Waiting for the integration test host to finish..." client_pid_return_code=0 wait "$client_pid" @@ -133,4 +133,3 @@ jobs: exit $client_pid_return_code echo "--------------------" - \ No newline at end of file diff --git a/spacefx/logging.py b/spacefx/logging.py index f66181d..f98b689 100644 --- a/spacefx/logging.py +++ b/spacefx/logging.py @@ -1,9 +1,11 @@ +from typing import Union import uuid import logging from spacefx.protos.common.Common_pb2 import \ LogMessageResponse, \ - TelemetryMetricResponse + TelemetryMetricResponse, \ + TelemetryMultiMetricResponse import Microsoft.Azure.SpaceFx.MessageFormats.Common from spacefx._sdk_client import __sdk_logging, __sdk_utils @@ -58,29 +60,76 @@ def send_complex_log_message(log_message: Microsoft.Azure.SpaceFx.MessageFormats return response -def send_telemetry(metric_name: str, metric_value: int, response_timeout_seconds: int = 30, wait_for_response: bool = False) -> TelemetryMetricResponse: +def send_telemetry(metric_name_or_object: Union[str, Microsoft.Azure.SpaceFx.MessageFormats.Common.TelemetryMetric], metric_value: int = None, response_timeout_seconds: int = 30, wait_for_response: bool = False) -> TelemetryMetricResponse: """ Sends a telemetry message to the Logging Host Service Args: - metricName (str): message that will be logged within hostsvc-logging - metricValue (int): value of the metric to send + metric_name_or_object (Union[str, Microsoft.Azure.SpaceFx.MessageFormats.Common.TelemetryMetric]): + Either the metric name (str) or the Telemetry Metric object to be sent. + metric_value (int, optional): Value of the metric to send. Required if metric_or_log_message is a str. response_timeout_seconds (int, optional): the number of seconds to wait for a successful TelemetryMetricResponse wait_for_response (bool, optional): enable/disable whether or not to wait for a TelemetryMetricResponse from the Logging Service. Disabled by default. Returns: - response (TelemetryMetricResponse): A successful TelemetryMetricResponse, or the last heard TelemetryMetricResponse during the timeout period + response (TelemetryMetricResponse): The TelemetryMetricResponse received from Logging Service Raises: - TimeoutError: Raises a TimeoutError if no LogResponse message was heard during the timeout period + TimeoutError: Raises a TimeoutError if no response message was heard during the timeout period """ - _task = __sdk_logging.SendTelemetry(metricName=metric_name, metricValue=metric_value, responseTimeoutSecs=response_timeout_seconds, wait_for_response=wait_for_response) - _task.Wait() + + if isinstance(metric_name_or_object, str): + if metric_value is None: + raise ValueError("metric_value must be provided when sending a telemetry metric.") + + # Send telemetry metric + _task = __sdk_logging.SendTelemetry(metricName=metric_name_or_object, metricValue=metric_value, responseTimeoutSecs=response_timeout_seconds, wait_for_response=wait_for_response) + _task.Wait() + + + elif isinstance(metric_name_or_object, Microsoft.Azure.SpaceFx.MessageFormats.Common.LogMeTelemetryMetricssage): + # Send log message + telemetry_message = metric_name_or_object + telemetry_message.RequestHeader = telemetry_message.RequestHeader or Microsoft.Azure.SpaceFx.MessageFormats.Common.RequestHeader() + telemetry_message.RequestHeader.TrackingId = telemetry_message.RequestHeader.TrackingId or str(uuid.uuid4()) + telemetry_message.RequestHeader.CorrelationId = telemetry_message.RequestHeader.CorrelationId or telemetry_message.RequestHeader.TrackingId + + # Assuming similar logic to send the log message and wait for response + _task = __sdk_logging.SendTelemetry(telemetryMessage=telemetry_message, responseTimeoutSecs=response_timeout_seconds, wait_for_response=wait_for_response) + _task.Wait() + response = TelemetryMetricResponse() result_bytes = bytes(__sdk_utils.ConvertProtoToBytes(_task.Result)) response.ParseFromString(result_bytes) - return response +def send_telemetrymulti(telemetry_multi: Microsoft.Azure.SpaceFx.MessageFormats.Common.TelemetryMultiMetric, response_timeout_seconds: int = 30, wait_for_response: bool = False) -> TelemetryMultiMetricResponse: + """ + Sends a telemetry message to the Logging Host Service + + Args: + telemetryMulti (Microsoft.Azure.SpaceFx.MessageFormats.Common.TelemetryMultiMetric): The TelemetryMultiMetric object to be sent. + response_timeout_seconds (int, optional): the number of seconds to wait for a successful TelemetryMetricResponse + wait_for_response (bool, optional): enable/disable whether or not to wait for a TelemetryMetricResponse from the Logging Service. Disabled by default. + Returns: + response (TelemetryMultiMetricResponse): TelemetryMultiMetricResponse received from Logging Service + Raises: + TimeoutError: Raises a TimeoutError if no response message was heard during the timeout period + """ + + # Send log message + telemetry_multi.RequestHeader = telemetry_multi.RequestHeader or Microsoft.Azure.SpaceFx.MessageFormats.Common.RequestHeader() + telemetry_multi.RequestHeader.TrackingId = telemetry_multi.RequestHeader.TrackingId or str(uuid.uuid4()) + telemetry_multi.RequestHeader.CorrelationId = telemetry_multi.RequestHeader.CorrelationId or telemetry_multi.RequestHeader.TrackingId + + # Assuming similar logic to send the log message and wait for response + _task = __sdk_logging.SendMultiTelemetry(telemetryMessage=telemetry_multi, responseTimeoutSecs=response_timeout_seconds, wait_for_response=wait_for_response) + _task.Wait() + + + response = TelemetryMultiMetricResponse() + result_bytes = bytes(__sdk_utils.ConvertProtoToBytes(_task.Result)) + response.ParseFromString(result_bytes) + return response # This is inteded to be used as a drop-in replacement for the default python logger class # Use via spacefx.logger rather than accessing the logger directly diff --git a/src/Client.cs b/src/Client.cs index f37d99f..de72107 100644 --- a/src/Client.cs +++ b/src/Client.cs @@ -35,6 +35,7 @@ public static ILogger Logger { } public static EventHandler? LogMessageResponseEvent; public static EventHandler? TelemetryMetricResponseEvent; + public static EventHandler? TelemetryMultiMetricResponseEvent; public static EventHandler? SensorsAvailableResponseEvent; public static EventHandler? SensorsTaskingPreCheckResponseEvent; public static EventHandler? SensorsTaskingResponseEvent; @@ -44,6 +45,11 @@ public static ILogger Logger { public delegate void SensorDataEventPythonHandler(byte[] sensorData); public static event SensorDataEventPythonHandler? SensorDataEventPython; + /// (Optional) Provide a boolean response for the integrated app healthcheck. If used, any value other than "true" will signify the app is in a failed state and should be terminated. + public delegate bool IsAppHealthyDelegate(); + public static IsAppHealthyDelegate? IsAppHealthy; + + /// /// Instantiates the SDK Client and allows for messages to be sent and received /// @@ -68,8 +74,8 @@ public static async Task KeepAppOpen() { /// public static void Shutdown() { _globalCancellationTokenSource.Cancel(); - if (_client is null || _grpcHost is null) throw new Exception("Client is not provisioned. Please deploy the client before trying to run this"); - _grpcHost.StopAsync().Wait(); + if (_client is not null && _grpcHost is not null) + _grpcHost.StopAsync().Wait(); } @@ -126,6 +132,8 @@ public Client() { services.AddAzureOrbitalFramework(); services.AddSingleton, MessageHandler>(); services.AddSingleton, MessageHandler>(); + services.AddSingleton, MessageHandler>(); + services.AddSingleton, MessageHandler>(); services.AddSingleton, MessageHandler>(); services.AddSingleton, MessageHandler>(); services.AddSingleton, MessageHandler>(); @@ -134,8 +142,8 @@ public Client() { services.AddHostedService(); }).ConfigureLogging((logging) => { logging.AddProvider(new Microsoft.Extensions.Logging.SpaceFX.Logger.HostSvcLoggerProvider()); - logging.AddConsole(options => { - options.DisableColors = true; + logging.AddSimpleConsole(options => { + options.ColorBehavior = Extensions.Logging.Console.LoggerColorBehavior.Disabled; options.TimestampFormat = "[yyyy-MM-dd HH:mm:ss] "; }); logging.AddConfiguration(builder.Configuration.GetSection("Logging")); @@ -150,6 +158,7 @@ public Client() { _grpcHost.UseRouting(); _grpcHost.UseEndpoints(endpoints => { endpoints.MapGrpcService(); + endpoints.MapGrpcHealthChecksService(); endpoints.MapGet("/", async context => { await context.Response.WriteAsync("Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909"); }); @@ -197,10 +206,10 @@ public MessageHandler(ILogger> logger, IServiceProvider servic /// public void MessageReceived(T message, MessageFormats.Common.DirectToApp fullMessage) { using (var scope = _serviceProvider.CreateScope()) { - _logger.LogInformation($"Receieved message type '{typeof(T).Name}'"); + _logger.LogDebug($"Receieved message type '{typeof(T).Name}'"); if (message == null || EqualityComparer.Default.Equals(message, default)) { - _logger.LogInformation("Received empty message '{messageType}' from '{appId}'. Discarding message.", typeof(T).Name, fullMessage.SourceAppId); + _logger.LogDebug("Received empty message '{messageType}' from '{appId}'. Discarding message.", typeof(T).Name, fullMessage.SourceAppId); return; } @@ -214,10 +223,16 @@ public void MessageReceived(T message, MessageFormats.Common.DirectToApp fullMes case string messageType when messageType.Equals(typeof(MessageFormats.Common.LogMessageResponse).Name, StringComparison.CurrentCultureIgnoreCase): MessageEventRouter(message: message as MessageFormats.Common.LogMessageResponse, sourceAppId: fullMessage.SourceAppId, eventHandler: LogMessageResponseEvent); break; + case string messageType when messageType.Equals(typeof(MessageFormats.Common.TelemetryMetricResponse).Name, StringComparison.CurrentCultureIgnoreCase): + MessageEventRouter(message: message as MessageFormats.Common.TelemetryMetricResponse, sourceAppId: fullMessage.SourceAppId, eventHandler: TelemetryMetricResponseEvent); + break; + case string messageType when messageType.Equals(typeof(MessageFormats.Common.TelemetryMultiMetricResponse).Name, StringComparison.CurrentCultureIgnoreCase): + MessageEventRouter(message: message as MessageFormats.Common.TelemetryMultiMetricResponse, sourceAppId: fullMessage.SourceAppId, eventHandler: TelemetryMultiMetricResponseEvent); + break; case string messageType when messageType.Equals(typeof(MessageFormats.HostServices.Sensor.SensorData).Name, StringComparison.CurrentCultureIgnoreCase): MessageEventRouter(message: message as MessageFormats.HostServices.Sensor.SensorData, sourceAppId: fullMessage.SourceAppId, eventHandler: SensorDataEvent); if (message != null && message is MessageFormats.HostServices.Sensor.SensorData) { - _logger.LogInformation($"Routing message type '{typeof(T).Name}' to Python event handler"); + _logger.LogDebug($"Routing message type '{typeof(T).Name}' to Python event handler"); SensorDataEventPython?.Invoke(message.ToByteArray()); } break; @@ -246,7 +261,7 @@ public void MessageReceived(T message, MessageFormats.Common.DirectToApp fullMes } } - public class ServiceCallback : BackgroundService { + public class ServiceCallback : BackgroundService, Core.IMonitorableService { private readonly ILogger _logger; private readonly IServiceProvider _serviceProvider; private readonly Microsoft.Azure.SpaceFx.Core.Client _client; @@ -257,6 +272,26 @@ public ServiceCallback(ILogger logger, IServiceProvider service _client = client; } + public bool IsHealthy() { + if (IsAppHealthy == null) { + _logger.LogTrace("No AppHealthCheck event handler registered. Returning default value of 'true'."); + return true; + } + + _logger.LogTrace("Received Health Check request from cluster. Triggering IsAppHealthy event handler."); + try { + bool isHealthy = IsAppHealthy(); + _logger.LogDebug($"IsAppHealthy returned '{isHealthy}'."); + if (!isHealthy) { + _logger.LogCritical("IsAppHealthy returned 'false' and is unhealthy. Check logs for more details."); + } + return isHealthy; + } catch (Exception ex) { + _logger.LogError(ex, "Exception calling IsAppHealthy. Setting response to false."); + return false; + } + } + protected override Task ExecuteAsync(CancellationToken stoppingToken) { return Task.Run(() => { using (var scope = _serviceProvider.CreateScope()) { diff --git a/src/HostServices/Logging.cs b/src/HostServices/Logging.cs index 6750de6..2146b3e 100644 --- a/src/HostServices/Logging.cs +++ b/src/HostServices/Logging.cs @@ -169,4 +169,72 @@ void TelemetryResponseEventHandler(object? _, MessageFormats.Common.TelemetryMet return SendTelemetry(telemetryMessage: telemetryMessage, responseTimeoutSecs: responseTimeoutSecs, waitForResponse: waitForResponse); } + + public static Task SendMultiTelemetry(MessageFormats.Common.TelemetryMultiMetric telemetryMessage, int? responseTimeoutSecs = null, bool? waitForResponse = false) => Task.Run(async () => { + bool targetServiceOnline = false; + + if (telemetryMessage.RequestHeader is null) telemetryMessage.RequestHeader = new(); + if (string.IsNullOrWhiteSpace(telemetryMessage.RequestHeader.TrackingId)) telemetryMessage.RequestHeader.TrackingId = Guid.NewGuid().ToString(); + if (string.IsNullOrWhiteSpace(telemetryMessage.RequestHeader.CorrelationId)) telemetryMessage.RequestHeader.CorrelationId = telemetryMessage.RequestHeader.TrackingId; + + MessageFormats.Common.TelemetryMultiMetricResponse response = SpaceFx.Core.Utils.ResponseFromRequest(telemetryMessage, new MessageFormats.Common.TelemetryMultiMetricResponse()); + + Logger.LogDebug("Waiting for service '{service_app_id}' to come online (trackingId: '{trackingId}' / correlationId: '{correlationId}')", TARGET_SERVICE_APP_ID, telemetryMessage.RequestHeader.TrackingId, telemetryMessage.RequestHeader.CorrelationId); + + // Wait for the service to come online + targetServiceOnline = Utils.WaitForService(appId: TARGET_SERVICE_APP_ID, responseTimeoutSecs: responseTimeoutSecs).Result; + + if (!targetServiceOnline) { + Logger.LogError("Service '{service_app_id}' is not online and not available to handle the message request. No heartbeat was received within {responseTimeoutSecs} (trackingId: '{trackingId}' / correlationId: '{correlationId}')", TARGET_SERVICE_APP_ID, responseTimeoutSecs, telemetryMessage.RequestHeader.TrackingId, telemetryMessage.RequestHeader.CorrelationId); + + // Response didn't come back in time. Return with a failure + throw new InvalidOperationException($"Service '{TARGET_SERVICE_APP_ID}' is not online and not available to handle the message request."); + } + + Logger.LogDebug("Service '{service_app_id}' is online (trackingId: '{trackingId}' / correlationId: '{correlationId}')", TARGET_SERVICE_APP_ID, telemetryMessage.RequestHeader.TrackingId, telemetryMessage.RequestHeader.CorrelationId); + + Logger.LogDebug("WaitForResponse = '{wait_for_response}' (trackingId: '{trackingId}' / correlationId: '{correlationId}')", waitForResponse, telemetryMessage.RequestHeader.TrackingId, telemetryMessage.RequestHeader.CorrelationId); + + + // Only wire up a callback if we're waiting for a response + if (waitForResponse == true) { + void TelemetryResponseEventHandler(object? _, MessageFormats.Common.TelemetryMultiMetricResponse eventHandlerResponse) { + if (eventHandlerResponse.ResponseHeader.TrackingId == telemetryMessage.RequestHeader.TrackingId) { + Logger.LogDebug("Message response received for '{messageType}'. Status: '{status}' (trackingId: '{trackingId}' / correlationId: '{correlationId}' / status: '{status}')", eventHandlerResponse.GetType().Name, eventHandlerResponse.ResponseHeader.Status, eventHandlerResponse.ResponseHeader.TrackingId, eventHandlerResponse.ResponseHeader.CorrelationId, eventHandlerResponse.ResponseHeader.Status); + + response = eventHandlerResponse; + Client.TelemetryMultiMetricResponseEvent -= TelemetryResponseEventHandler; // Remove the callback so it's not called for future iterations + } + } + + Client.TelemetryMultiMetricResponseEvent += TelemetryResponseEventHandler; + } + + Logger.LogDebug("Sending '{messageType}' to '{appId}'. (trackingId: '{trackingId}' / correlationId: '{correlationId}')", telemetryMessage.GetType().Name, TARGET_SERVICE_APP_ID, telemetryMessage.RequestHeader.TrackingId, telemetryMessage.RequestHeader.CorrelationId); + + await Client.DirectToApp(appId: TARGET_SERVICE_APP_ID, message: telemetryMessage); + + // Only wait for a response if we're expecting one + if (waitForResponse == true) { + + TimeSpan maxWait = TimeSpan.FromSeconds(responseTimeoutSecs ?? Client.DefaultMessageResponseTimeout.TotalSeconds); + DateTime responseDeadline = DateTime.UtcNow.Add(maxWait); + + Logger.LogDebug("Waiting for '{messageType}'. Deadline: '{timeout}'. (trackingId: '{trackingId}' / correlationId: '{correlationId}')", nameof(TelemetryMultiMetricResponse), maxWait, telemetryMessage.RequestHeader.TrackingId, telemetryMessage.RequestHeader.CorrelationId); + + // Loop until we've received a response or we've timed out + for (; (response == null || response.ResponseHeader.Status == MessageFormats.Common.StatusCodes.Unknown) && DateTime.UtcNow <= responseDeadline; await Task.Delay((int) Client.DefaultPollingTime.TotalMilliseconds)) ; + + if (response == null) { + Logger.LogError("Timed out waiting for '{messageType}'. Deadline: '{timeout}'. (trackingId: '{trackingId}' / correlationId: '{correlationId}')", nameof(TelemetryMultiMetricResponse), maxWait, telemetryMessage.RequestHeader.TrackingId, telemetryMessage.RequestHeader.CorrelationId); + throw new TimeoutException($"Timed out waiting for a response from {TARGET_SERVICE_APP_ID}"); + } + } + + Logger.LogDebug("Returning '{messageType}' with status '{status}' to payload app (trackingId: '{trackingId}' / correlationId: '{correlationId}' / status: '{status}')", nameof(TelemetryMultiMetricResponse), response.ResponseHeader.Status, telemetryMessage.RequestHeader.TrackingId, telemetryMessage.RequestHeader.CorrelationId, response.ResponseHeader.Status); + + return response; + }); + + } diff --git a/test/integrationTests/Program.cs b/test/integrationTests/Program.cs index 1f61c51..c9db94d 100644 --- a/test/integrationTests/Program.cs +++ b/test/integrationTests/Program.cs @@ -4,9 +4,9 @@ public class Program { internal static TestSharedContext TEST_SHARED_CONTEXT = new(); public static void Main(string[] args) { Console.WriteLine("--------- Starting Tests ---------"); + RunTests(); RunTests(); RunTests(); - RunTests(); RunTests(); RunTests(); diff --git a/test/integrationTests/TestSharedContext.cs b/test/integrationTests/TestSharedContext.cs index 5371a95..37626d2 100644 --- a/test/integrationTests/TestSharedContext.cs +++ b/test/integrationTests/TestSharedContext.cs @@ -8,6 +8,7 @@ namespace Microsoft.Azure.SpaceFx.SDK.IntegrationTests; public class TestSharedContext : IDisposable { private static TestSharedContext TextContext { get; set; } = null!; private static bool IS_PROVISIONED = false; + public static bool HEALTH_CHECK_RECEIVED = false; internal static TimeSpan MAX_TIMESPAN_TO_WAIT_FOR_MSG = TimeSpan.FromSeconds(90); public readonly string GenericGuid = Guid.NewGuid().ToString(); public readonly int GenericInt = 12345; @@ -27,13 +28,18 @@ public TestSharedContext() { Client.Build(); Client.WaitForOnline(MAX_TIMESPAN_TO_WAIT_FOR_MSG); + + Client.IsAppHealthy = () => { + TestSharedContext.HEALTH_CHECK_RECEIVED = true; + return true; // Assume the app is healthy + }; + + + IS_PROVISIONED = true; } - public static void WritePropertyLineToScreen(string testName, string propertyName) { - Console.WriteLine($"[{testName}] testing property '{propertyName}'"); - } public void Dispose() { GC.SuppressFinalize(this); diff --git a/test/integrationTests/Tests/LogMessageTests.cs b/test/integrationTests/Tests/LogMessageTests.cs index 5d4dab7..fc7bade 100644 --- a/test/integrationTests/Tests/LogMessageTests.cs +++ b/test/integrationTests/Tests/LogMessageTests.cs @@ -27,4 +27,54 @@ public void SendLogMessage() { Assert.Equal(Microsoft.Azure.SpaceFx.MessageFormats.Common.StatusCodes.Successful, response.ResponseHeader.Status); } + + [Fact] + public void SendTelemetryMessage() { + DateTime maxTimeToWait = DateTime.Now.Add(TestSharedContext.MAX_TIMESPAN_TO_WAIT_FOR_MSG); + + MessageFormats.Common.TelemetryMetricResponse? response = null; + + Task.Run(async () => { + response = await Logging.SendTelemetry(metricName: "IntegrationTests", metricValue: 1, waitForResponse: true); + }); + + while (response == null && DateTime.Now <= maxTimeToWait) { + Thread.Sleep(100); + } + + if (response == null) throw new TimeoutException($"Failed to hear {nameof(response)} heartbeat after {TestSharedContext.MAX_TIMESPAN_TO_WAIT_FOR_MSG}. Please check that {TARGET_SERVICE_APP_ID} is deployed"); + + Assert.Equal(Microsoft.Azure.SpaceFx.MessageFormats.Common.StatusCodes.Successful, response.ResponseHeader.Status); + } + + [Fact] + public void SendTelemetryMultiMessage() { + DateTime maxTimeToWait = DateTime.Now.Add(TestSharedContext.MAX_TIMESPAN_TO_WAIT_FOR_MSG); + + MessageFormats.Common.TelemetryMultiMetricResponse? response = null; + + Task.Run(async () => { + MessageFormats.Common.TelemetryMultiMetric message = new MessageFormats.Common.TelemetryMultiMetric(); + message.RequestHeader = new MessageFormats.Common.RequestHeader() { + TrackingId = Guid.NewGuid().ToString(), + }; + message.RequestHeader.CorrelationId = message.RequestHeader.TrackingId; + + message.TelemetryMetrics.Add(new MessageFormats.Common.TelemetryMetric() { + RequestHeader = message.RequestHeader, + MetricName = "IntegrationTests", + MetricValue = 1, + }); + + response = await Logging.SendMultiTelemetry(telemetryMessage: message, waitForResponse: true); + }); + + while (response == null && DateTime.Now <= maxTimeToWait) { + Thread.Sleep(100); + } + + if (response == null) throw new TimeoutException($"Failed to hear {nameof(response)} heartbeat after {TestSharedContext.MAX_TIMESPAN_TO_WAIT_FOR_MSG}. Please check that {TARGET_SERVICE_APP_ID} is deployed"); + + Assert.Equal(Microsoft.Azure.SpaceFx.MessageFormats.Common.StatusCodes.Successful, response.ResponseHeader.Status); + } } diff --git a/test/integrationTests/Tests/ServicesHeardTests.cs b/test/integrationTests/Tests/ServicesHeardTests.cs index 0b61591..47a8600 100644 --- a/test/integrationTests/Tests/ServicesHeardTests.cs +++ b/test/integrationTests/Tests/ServicesHeardTests.cs @@ -12,7 +12,7 @@ public ServicesHeardTests(TestSharedContext context) { public void CheckServicesAreHeard() { // Services send out HeartBeats to let other apps know they are online. // We have to give enough time for heartbeats to come in before we check - TimeSpan pauseTime = TimeSpan.FromMilliseconds(Client.APP_CONFIG.HEARTBEAT_PULSE_TIMING_MS * 2); + TimeSpan pauseTime = TimeSpan.FromMilliseconds(Client.APP_CONFIG.HEARTBEAT_RECEIVED_TOLERANCE_MS); Console.WriteLine($"Waiting for {pauseTime.Seconds} seconds, then checking for services heard..."); Thread.Sleep(pauseTime); @@ -26,4 +26,22 @@ public void CheckServicesAreHeard() { Assert.True(heartBeats.Count > 0); } + + [Fact] + public void HealthCheckTest() { + TimeSpan waitTimeSpan = TimeSpan.FromMilliseconds(Client.APP_CONFIG.HEARTBEAT_RECEIVED_CRITICAL_TOLERANCE_MS); + DateTime maxTimeToWait = DateTime.Now.Add(waitTimeSpan); + + Console.WriteLine($"Starting HealthCheckTest. Waiting maximum of {waitTimeSpan} for IsAppHealthy to be called."); + + while (TestSharedContext.HEALTH_CHECK_RECEIVED == false && DateTime.Now <= maxTimeToWait) { + Thread.Sleep(100); + } + + Console.WriteLine($"IS_APP_HEALTHY received: {TestSharedContext.HEALTH_CHECK_RECEIVED}"); + + if (!TestSharedContext.HEALTH_CHECK_RECEIVED) throw new TimeoutException($"Failed to hear IsAppHealthy heartbeat after {TestSharedContext.MAX_TIMESPAN_TO_WAIT_FOR_MSG}."); + + Assert.True(TestSharedContext.HEALTH_CHECK_RECEIVED); + } }