| /* |
| * Copyright (C) 2021 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package com.android.car.telemetry.databroker; |
| |
| import android.annotation.NonNull; |
| import android.annotation.Nullable; |
| import android.car.builtin.util.Slogf; |
| import android.car.builtin.util.TimingsTraceLog; |
| import android.car.telemetry.MetricsConfigKey; |
| import android.content.ComponentName; |
| import android.content.Context; |
| import android.content.Intent; |
| import android.content.ServiceConnection; |
| import android.content.pm.PackageInfo; |
| import android.content.pm.PackageManager; |
| import android.os.Handler; |
| import android.os.HandlerThread; |
| import android.os.IBinder; |
| import android.os.Looper; |
| import android.os.Message; |
| import android.os.ParcelFileDescriptor; |
| import android.os.PersistableBundle; |
| import android.os.RemoteException; |
| import android.os.UserHandle; |
| import android.util.ArrayMap; |
| |
| import com.android.car.CarLog; |
| import com.android.car.CarServiceUtils; |
| import com.android.car.telemetry.CarTelemetryService; |
| import com.android.car.telemetry.ResultStore; |
| import com.android.car.telemetry.TelemetryProto; |
| import com.android.car.telemetry.TelemetryProto.MetricsConfig; |
| import com.android.car.telemetry.publisher.AbstractPublisher; |
| import com.android.car.telemetry.publisher.PublisherFactory; |
| import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutor; |
| import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutorListener; |
| import com.android.car.telemetry.util.IoUtils; |
| import com.android.internal.annotations.VisibleForTesting; |
| |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.lang.ref.WeakReference; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.PriorityBlockingQueue; |
| |
| /** |
| * Implementation of the data path component of CarTelemetryService. Forwards the published data |
| * from publishers to consumers subject to the Controller's decision. |
| * All methods should be called from the telemetry thread unless otherwise specified as thread-safe. |
| */ |
| public class DataBrokerImpl implements DataBroker { |
| |
| @VisibleForTesting |
| static final int MSG_HANDLE_TASK = 1; |
| @VisibleForTesting |
| static final int MSG_BIND_TO_SCRIPT_EXECUTOR = 2; |
| |
| /** Bind to script executor 5 times before entering disabled state. */ |
| private static final int MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS = 5; |
| |
| // TODO(b/216134347): Find a better way to find the package. |
| private static final String[] SCRIPT_EXECUTOR_PACKAGE_CANDIDATES = |
| {"com.android.car.scriptexecutor", "com.google.android.car.scriptexecutor"}; |
| private static final String SCRIPT_EXECUTOR_CLASS = |
| "com.android.car.scriptexecutor.ScriptExecutor"; |
| |
| private final Context mContext; |
| private final PublisherFactory mPublisherFactory; |
| private final ResultStore mResultStore; |
| private final ScriptExecutorListener mScriptExecutorListener; |
| private final HandlerThread mTelemetryThread = CarServiceUtils.getHandlerThread( |
| CarTelemetryService.class.getSimpleName()); |
| private final Handler mTelemetryHandler = new TaskHandler(mTelemetryThread.getLooper()); |
| |
| /** Thread-safe priority queue for scheduling tasks. */ |
| private final PriorityBlockingQueue<ScriptExecutionTask> mTaskQueue = |
| new PriorityBlockingQueue<>(); |
| |
| /** |
| * Maps MetricsConfig's unique identifier to its subscriptions. This map is useful when |
| * removing a MetricsConfig. |
| */ |
| private final ArrayMap<MetricsConfigKey, List<DataSubscriber>> mSubscriptionMap = |
| new ArrayMap<>(); |
| |
| /** |
| * If something irrecoverable happened, DataBroker should enter into a disabled state to prevent |
| * doing futile work. |
| */ |
| private boolean mDisabled = false; |
| |
| /** Current number of attempts to bind to ScriptExecutor. */ |
| private int mBindScriptExecutorAttempts = 0; |
| |
| /** Priority of current system to determine if a {@link ScriptExecutionTask} can run. */ |
| private int mPriority = 1; |
| |
| /** Waiting period between attempts to bind script executor. Can be shortened for tests. */ |
| @VisibleForTesting long mBindScriptExecutorDelayMillis = 3_000L; |
| |
| /** |
| * {@link MetricsConfigKey} that uniquely identifies the current running {@link MetricsConfig}. |
| * A non-null value indicates ScriptExecutor is currently running this config, which means |
| * DataBroker should not make another ScriptExecutor binder call. |
| */ |
| private MetricsConfigKey mCurrentMetricsConfigKey; |
| private IScriptExecutor mScriptExecutor; |
| private ScriptFinishedCallback mScriptFinishedCallback; |
| |
| /** |
| * Used only for the purpose of tracking the duration of running a script. The duration |
| * starts before the ScriptExecutor binder call and ends when a status is returned via |
| * ScriptExecutorListener or when the binder call throws an exception. |
| */ |
| private TimingsTraceLog mScriptExecutionTraceLog; |
| |
| private final ServiceConnection mServiceConnection = new ServiceConnection() { |
| @Override |
| public void onServiceConnected(ComponentName name, IBinder service) { |
| mTelemetryHandler.post(() -> { |
| mScriptExecutor = IScriptExecutor.Stub.asInterface(service); |
| scheduleNextTask(); |
| }); |
| } |
| |
| @Override |
| public void onServiceDisconnected(ComponentName name) { |
| // TODO(b/198684473): clean up the state after script executor disconnects |
| mTelemetryHandler.post(() -> { |
| // if a script ran and crashed ScriptExecutor, end trace log |
| if (mCurrentMetricsConfigKey != null) { |
| mScriptExecutionTraceLog.traceEnd(); |
| } |
| mScriptExecutor = null; |
| unbindScriptExecutor(); |
| }); |
| } |
| }; |
| |
| public DataBrokerImpl( |
| @NonNull Context context, |
| @NonNull PublisherFactory publisherFactory, |
| @NonNull ResultStore resultStore, |
| @NonNull TimingsTraceLog traceLog) { |
| mContext = context; |
| mPublisherFactory = publisherFactory; |
| mResultStore = resultStore; |
| mScriptExecutorListener = new ScriptExecutorListener(this); |
| mPublisherFactory.initialize(this::onPublisherFailure); |
| mScriptExecutionTraceLog = traceLog; |
| } |
| |
| private void onPublisherFailure( |
| @NonNull AbstractPublisher publisher, |
| @NonNull List<TelemetryProto.MetricsConfig> affectedConfigs, |
| @Nullable Throwable error) { |
| // TODO(b/193680465): disable MetricsConfig and log the error |
| Slogf.w(CarLog.TAG_TELEMETRY, "publisher failed", error); |
| } |
| |
| @Nullable |
| private String findExecutorPackage() { |
| PackageInfo info = null; |
| for (int i = 0; i < SCRIPT_EXECUTOR_PACKAGE_CANDIDATES.length; i++) { |
| try { |
| info = mContext.getPackageManager().getPackageInfo( |
| SCRIPT_EXECUTOR_PACKAGE_CANDIDATES[i], /* flags= */ 0); |
| if (info != null) { |
| break; |
| } |
| } catch (PackageManager.NameNotFoundException e) { |
| // ignore |
| } |
| } |
| if (info == null) { |
| return null; |
| } |
| return info.packageName; |
| } |
| |
| private void bindScriptExecutor() { |
| // do not re-bind if broker is in a disabled state or if script executor is nonnull |
| if (mDisabled || mScriptExecutor != null) { |
| return; |
| } |
| String executorPackage = findExecutorPackage(); |
| if (executorPackage == null) { |
| Slogf.w(CarLog.TAG_TELEMETRY, "Cannot find executor package"); |
| return; |
| } |
| Intent intent = new Intent(); |
| intent.setComponent(new ComponentName(executorPackage, SCRIPT_EXECUTOR_CLASS)); |
| boolean success = mContext.bindServiceAsUser( |
| intent, |
| mServiceConnection, |
| Context.BIND_AUTO_CREATE, |
| UserHandle.SYSTEM); |
| if (success) { |
| mBindScriptExecutorAttempts = 0; // reset |
| return; |
| } |
| unbindScriptExecutor(); |
| mBindScriptExecutorAttempts++; |
| if (mBindScriptExecutorAttempts < MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS) { |
| Slogf.w(CarLog.TAG_TELEMETRY, |
| "failed to get valid connection to ScriptExecutor, retrying in " |
| + mBindScriptExecutorDelayMillis + "ms."); |
| mTelemetryHandler.sendEmptyMessageDelayed(MSG_BIND_TO_SCRIPT_EXECUTOR, |
| mBindScriptExecutorDelayMillis); |
| } else { |
| Slogf.w(CarLog.TAG_TELEMETRY, "failed to get valid connection to ScriptExecutor, " |
| + "disabling DataBroker"); |
| disableBroker(); |
| } |
| } |
| |
| /** |
| * Unbinds {@link ScriptExecutor} to release the connection. This method should be called from |
| * the telemetry thread. |
| */ |
| private void unbindScriptExecutor() { |
| // TODO(b/198648763): unbind from script executor when there is no work to do |
| mCurrentMetricsConfigKey = null; |
| try { |
| mContext.unbindService(mServiceConnection); |
| } catch (IllegalArgumentException e) { |
| // If ScriptExecutor is gone before unbinding, it will throw this exception |
| Slogf.w(CarLog.TAG_TELEMETRY, "Failed to unbind from ScriptExecutor", e); |
| } |
| } |
| |
| /** |
| * Enters into a disabled state because something irrecoverable happened. |
| * TODO(b/200841260): expose the state to the caller. |
| */ |
| private void disableBroker() { |
| mDisabled = true; |
| // remove all MetricConfigs, disable all publishers, stop receiving data |
| for (MetricsConfigKey key : mSubscriptionMap.keySet()) { |
| // get the metrics config from the DataSubscriber and remove the metrics config |
| if (mSubscriptionMap.get(key).size() != 0) { |
| removeMetricsConfig(key); |
| } |
| } |
| mSubscriptionMap.clear(); |
| } |
| |
| @Override |
| public void addMetricsConfig( |
| @NonNull MetricsConfigKey key, @NonNull MetricsConfig metricsConfig) { |
| // TODO(b/187743369): pass status back to caller |
| // if broker is disabled or metricsConfig already exists, do nothing |
| if (mDisabled || mSubscriptionMap.containsKey(key)) { |
| return; |
| } |
| // Create the subscribers for this metrics configuration |
| List<DataSubscriber> dataSubscribers = new ArrayList<>( |
| metricsConfig.getSubscribersList().size()); |
| for (TelemetryProto.Subscriber subscriber : metricsConfig.getSubscribersList()) { |
| // protobuf publisher to a concrete Publisher |
| AbstractPublisher publisher = mPublisherFactory.getPublisher( |
| subscriber.getPublisher().getPublisherCase()); |
| // create DataSubscriber from TelemetryProto.Subscriber |
| DataSubscriber dataSubscriber = new DataSubscriber( |
| this, |
| metricsConfig, |
| subscriber); |
| dataSubscribers.add(dataSubscriber); |
| |
| try { |
| // The publisher will start sending data to the subscriber. |
| // TODO(b/191378559): handle bad configs |
| publisher.addDataSubscriber(dataSubscriber); |
| } catch (IllegalArgumentException e) { |
| Slogf.w(CarLog.TAG_TELEMETRY, "Invalid config", e); |
| return; |
| } |
| } |
| mSubscriptionMap.put(key, dataSubscribers); |
| } |
| |
| @Override |
| public void removeMetricsConfig(@NonNull MetricsConfigKey key) { |
| // TODO(b/187743369): pass status back to caller |
| if (!mSubscriptionMap.containsKey(key)) { |
| return; |
| } |
| // get the subscriptions associated with this MetricsConfig, remove it from the map |
| List<DataSubscriber> dataSubscribers = mSubscriptionMap.remove(key); |
| // for each subscriber, remove it from publishers |
| for (DataSubscriber subscriber : dataSubscribers) { |
| AbstractPublisher publisher = mPublisherFactory.getPublisher( |
| subscriber.getPublisherParam().getPublisherCase()); |
| try { |
| publisher.removeDataSubscriber(subscriber); |
| } catch (IllegalArgumentException e) { |
| // It shouldn't happen, but if happens, let's just log it. |
| Slogf.w(CarLog.TAG_TELEMETRY, "Failed to remove subscriber from publisher", e); |
| } |
| } |
| // Remove all the tasks associated with this metrics config. The underlying impl uses the |
| // weakly consistent iterator, which is thread-safe but does not freeze the collection while |
| // iterating, so it may or may not reflect any updates since the iterator was created. |
| // But since adding & polling from queue should happen in the same thread, the task queue |
| // should not be changed while tasks are being iterated and removed. |
| mTaskQueue.removeIf(task -> task.isAssociatedWithMetricsConfig(key)); |
| } |
| |
| @Override |
| public void removeAllMetricsConfigs() { |
| mPublisherFactory.removeAllDataSubscribers(); |
| mSubscriptionMap.clear(); |
| mTaskQueue.clear(); |
| } |
| |
| @Override |
| public void addTaskToQueue(@NonNull ScriptExecutionTask task) { |
| if (mDisabled) { |
| return; |
| } |
| mTaskQueue.add(task); |
| scheduleNextTask(); |
| } |
| |
| /** |
| * This method can be called from any thread. |
| * It is possible for this method to be invoked from different threads at the same time, but |
| * it is not possible to schedule the same task twice, because the handler handles message |
| * in the order they come in, this means the task will be polled sequentially instead of |
| * concurrently. Every task that is scheduled and run will be distinct. |
| * TODO(b/187743369): If the threading behavior in DataSubscriber changes, ScriptExecutionTask |
| * will also have different threading behavior. Update javadoc when the behavior is decided. |
| */ |
| @Override |
| public void scheduleNextTask() { |
| if (mDisabled || mTelemetryHandler.hasMessages(MSG_HANDLE_TASK)) { |
| return; |
| } |
| mTelemetryHandler.sendEmptyMessage(MSG_HANDLE_TASK); |
| } |
| |
| @Override |
| public void setOnScriptFinishedCallback(@NonNull ScriptFinishedCallback callback) { |
| if (mDisabled) { |
| return; |
| } |
| mScriptFinishedCallback = callback; |
| } |
| |
| @Override |
| public void setTaskExecutionPriority(int priority) { |
| if (mDisabled) { |
| return; |
| } |
| mPriority = priority; |
| scheduleNextTask(); // when priority updates, schedule a task which checks task queue |
| } |
| |
| @VisibleForTesting |
| @NonNull |
| ArrayMap<MetricsConfigKey, List<DataSubscriber>> getSubscriptionMap() { |
| return new ArrayMap<>(mSubscriptionMap); |
| } |
| |
| @VisibleForTesting |
| @NonNull |
| Handler getTelemetryHandler() { |
| return mTelemetryHandler; |
| } |
| |
| @VisibleForTesting |
| @NonNull |
| PriorityBlockingQueue<ScriptExecutionTask> getTaskQueue() { |
| return mTaskQueue; |
| } |
| |
| /** |
| * Polls and runs a task from the head of the priority queue if the queue is nonempty and the |
| * head of the queue has priority higher than or equal to the current priority. A higher |
| * priority is denoted by a lower priority number, so head of the queue should have equal or |
| * lower priority number to be polled. |
| */ |
| private void pollAndExecuteTask() { |
| // check databroker state is ready to run script |
| if (mDisabled || mCurrentMetricsConfigKey != null) { |
| return; |
| } |
| // check task is valid and ready to be run |
| ScriptExecutionTask task = mTaskQueue.peek(); |
| if (task == null || task.getPriority() > mPriority) { |
| return; |
| } |
| // if script executor is null, bind service |
| if (mScriptExecutor == null) { |
| Slogf.w(CarLog.TAG_TELEMETRY, "script executor is null, binding to script executor"); |
| // upon successful binding, a task will be scheduled to run if there are any |
| mTelemetryHandler.sendEmptyMessage(MSG_BIND_TO_SCRIPT_EXECUTOR); |
| return; |
| } |
| mTaskQueue.poll(); // remove task from queue |
| // update current config key because a script is currently running |
| mCurrentMetricsConfigKey = new MetricsConfigKey(task.getMetricsConfig().getName(), |
| task.getMetricsConfig().getVersion()); |
| mScriptExecutionTraceLog.traceBegin( |
| "executing script " + mCurrentMetricsConfigKey.getName()); |
| try { |
| if (task.isLargeData()) { |
| Slogf.d(CarLog.TAG_TELEMETRY, "invoking script executor for large input"); |
| invokeScriptForLargeInput(task); |
| } else { |
| Slogf.d(CarLog.TAG_TELEMETRY, "invoking script executor"); |
| mScriptExecutor.invokeScript( |
| task.getMetricsConfig().getScript(), |
| task.getHandlerName(), |
| task.getData(), |
| mResultStore.getInterimResult(mCurrentMetricsConfigKey.getName()), |
| mScriptExecutorListener); |
| } |
| } catch (RemoteException e) { |
| mScriptExecutionTraceLog.traceEnd(); |
| Slogf.w(CarLog.TAG_TELEMETRY, "remote exception occurred invoking script", e); |
| unbindScriptExecutor(); |
| addTaskToQueue(task); // will trigger scheduleNextTask() and re-binding scriptexecutor |
| } catch (IOException e) { |
| mScriptExecutionTraceLog.traceEnd(); |
| Slogf.w(CarLog.TAG_TELEMETRY, "Either unable to create pipe or failed to pipe data" |
| + " to ScriptExecutor. Skipping the published data", e); |
| mCurrentMetricsConfigKey = null; |
| scheduleNextTask(); // drop this task and schedule the next one |
| } |
| } |
| |
| /** |
| * Sets up pipes, invokes ScriptExecutor#invokeScriptForLargeInput() API, and writes the |
| * script input to the pipe. |
| * |
| * @param task containing all the necessary parameters for ScriptExecutor API. |
| * @throws IOException if cannot create pipe or cannot write the bundle to pipe. |
| * @throws RemoteException if ScriptExecutor failed. |
| */ |
| private void invokeScriptForLargeInput(@NonNull ScriptExecutionTask task) |
| throws IOException, RemoteException { |
| ParcelFileDescriptor[] fds = ParcelFileDescriptor.createPipe(); |
| ParcelFileDescriptor readFd = fds[0]; |
| ParcelFileDescriptor writeFd = fds[1]; |
| try { |
| mScriptExecutor.invokeScriptForLargeInput( |
| task.getMetricsConfig().getScript(), |
| task.getHandlerName(), |
| readFd, |
| mResultStore.getInterimResult(mCurrentMetricsConfigKey.getName()), |
| mScriptExecutorListener); |
| } catch (RemoteException e) { |
| IoUtils.closeQuietly(readFd); |
| IoUtils.closeQuietly(writeFd); |
| throw e; |
| } |
| IoUtils.closeQuietly(readFd); |
| |
| Slogf.d(CarLog.TAG_TELEMETRY, "writing large script data to pipe"); |
| try (OutputStream outputStream = new ParcelFileDescriptor.AutoCloseOutputStream(writeFd)) { |
| task.getData().writeToStream(outputStream); |
| } |
| } |
| |
| /** Stores final metrics and schedules the next task. */ |
| private void onScriptFinished(@NonNull PersistableBundle result) { |
| mTelemetryHandler.post(() -> { |
| mScriptExecutionTraceLog.traceEnd(); // end trace as soon as script completes running |
| mResultStore.putFinalResult(mCurrentMetricsConfigKey.getName(), result); |
| mScriptFinishedCallback.onScriptFinished(mCurrentMetricsConfigKey); |
| mCurrentMetricsConfigKey = null; |
| scheduleNextTask(); |
| }); |
| } |
| |
| /** Stores interim metrics and schedules the next task. */ |
| private void onScriptSuccess(@NonNull PersistableBundle stateToPersist) { |
| mTelemetryHandler.post(() -> { |
| mScriptExecutionTraceLog.traceEnd(); // end trace as soon as script completes running |
| mResultStore.putInterimResult(mCurrentMetricsConfigKey.getName(), stateToPersist); |
| mCurrentMetricsConfigKey = null; |
| scheduleNextTask(); |
| }); |
| } |
| |
| /** Stores telemetry error and schedules the next task. */ |
| private void onScriptError( |
| int errorType, @NonNull String message, @Nullable String stackTrace) { |
| mTelemetryHandler.post(() -> { |
| mScriptExecutionTraceLog.traceEnd(); // end trace as soon as script completes running |
| TelemetryProto.TelemetryError.Builder error = TelemetryProto.TelemetryError.newBuilder() |
| .setErrorType(TelemetryProto.TelemetryError.ErrorType.forNumber(errorType)) |
| .setMessage(message); |
| if (stackTrace != null) { |
| error.setStackTrace(stackTrace); |
| } |
| mResultStore.putErrorResult(mCurrentMetricsConfigKey.getName(), error.build()); |
| mScriptFinishedCallback.onScriptFinished(mCurrentMetricsConfigKey); |
| mCurrentMetricsConfigKey = null; |
| scheduleNextTask(); |
| }); |
| } |
| |
| /** Listens for script execution status. Methods are called on the binder thread. */ |
| private static final class ScriptExecutorListener extends IScriptExecutorListener.Stub { |
| private final WeakReference<DataBrokerImpl> mWeakDataBroker; |
| |
| private ScriptExecutorListener(@NonNull DataBrokerImpl dataBroker) { |
| mWeakDataBroker = new WeakReference<>(dataBroker); |
| } |
| |
| @Override |
| public void onScriptFinished(@NonNull PersistableBundle result) { |
| DataBrokerImpl dataBroker = mWeakDataBroker.get(); |
| if (dataBroker == null) { |
| return; |
| } |
| dataBroker.onScriptFinished(result); |
| } |
| |
| @Override |
| public void onSuccess(@NonNull PersistableBundle stateToPersist) { |
| DataBrokerImpl dataBroker = mWeakDataBroker.get(); |
| if (dataBroker == null) { |
| return; |
| } |
| dataBroker.onScriptSuccess(stateToPersist); |
| } |
| |
| @Override |
| public void onError(int errorType, @NonNull String message, @Nullable String stackTrace) { |
| DataBrokerImpl dataBroker = mWeakDataBroker.get(); |
| if (dataBroker == null) { |
| return; |
| } |
| dataBroker.onScriptError(errorType, message, stackTrace); |
| } |
| } |
| |
| /** Callback handler to handle scheduling and rescheduling of {@link ScriptExecutionTask}s. */ |
| class TaskHandler extends Handler { |
| TaskHandler(@NonNull Looper looper) { |
| super(looper); |
| } |
| |
| /** |
| * Handles a message depending on the message ID. |
| * If the msg ID is MSG_HANDLE_TASK, it polls a task from the priority queue and executing a |
| * {@link ScriptExecutionTask}. There are multiple places where this message is sent: when |
| * priority updates, when a new task is added to the priority queue, and when a task |
| * finishes running. |
| */ |
| @Override |
| public void handleMessage(@NonNull Message msg) { |
| switch (msg.what) { |
| case MSG_HANDLE_TASK: |
| pollAndExecuteTask(); // run the next task |
| break; |
| case MSG_BIND_TO_SCRIPT_EXECUTOR: |
| bindScriptExecutor(); |
| break; |
| default: |
| Slogf.w(CarLog.TAG_TELEMETRY, "TaskHandler received unknown message."); |
| } |
| } |
| } |
| } |