blob: f4b3e6aa2e25fe22636465ff2b96692fb7fbfd81 [file] [log] [blame]
/*
* Copyright (C) 2021 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.android.car.telemetry.databroker;
import android.content.ComponentName;
import android.content.Context;
import android.content.Intent;
import android.content.ServiceConnection;
import android.os.Handler;
import android.os.HandlerThread;
import android.os.IBinder;
import android.os.Looper;
import android.os.Message;
import android.os.PersistableBundle;
import android.os.RemoteException;
import android.os.UserHandle;
import android.util.ArrayMap;
import android.util.Slog;
import com.android.car.CarLog;
import com.android.car.CarServiceUtils;
import com.android.car.telemetry.CarTelemetryService;
import com.android.car.telemetry.ResultStore;
import com.android.car.telemetry.TelemetryProto;
import com.android.car.telemetry.TelemetryProto.MetricsConfig;
import com.android.car.telemetry.publisher.AbstractPublisher;
import com.android.car.telemetry.publisher.PublisherFactory;
import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutor;
import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutorListener;
import com.android.internal.annotations.VisibleForTesting;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
/**
* Implementation of the data path component of CarTelemetryService. Forwards the published data
* from publishers to consumers subject to the Controller's decision.
* TODO(b/187743369): Handle thread-safety of member variables.
*/
public class DataBrokerImpl implements DataBroker {
private static final int MSG_HANDLE_TASK = 1;
private static final int MSG_BIND_TO_SCRIPT_EXECUTOR = 2;
/** Bind to script executor 5 times before entering disabled state. */
private static final int MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS = 5;
private static final String SCRIPT_EXECUTOR_PACKAGE = "com.android.car.scriptexecutor";
private static final String SCRIPT_EXECUTOR_CLASS =
"com.android.car.scriptexecutor.ScriptExecutor";
private final Context mContext;
private final PublisherFactory mPublisherFactory;
private final ResultStore mResultStore;
private final ScriptExecutorListener mScriptExecutorListener;
private final HandlerThread mTelemetryThread = CarServiceUtils.getHandlerThread(
CarTelemetryService.class.getSimpleName());
private final Handler mTelemetryHandler = new TaskHandler(mTelemetryThread.getLooper());
/** Thread-safe priority queue for scheduling tasks. */
private final PriorityBlockingQueue<ScriptExecutionTask> mTaskQueue =
new PriorityBlockingQueue<>();
/**
* Maps MetricsConfig's name to its subscriptions. This map is useful when removing a
* MetricsConfig.
*/
private final Map<String, List<DataSubscriber>> mSubscriptionMap = new ArrayMap<>();
/**
* If something irrecoverable happened, DataBroker should enter into a disabled state to prevent
* doing futile work.
*/
private boolean mDisabled = false;
/** Current number of attempts to bind to ScriptExecutor. */
private int mBindScriptExecutorAttempts = 0;
/** Priority of current system to determine if a {@link ScriptExecutionTask} can run. */
private int mPriority = 1;
/** Waiting period between attempts to bind script executor. Can be shortened for tests. */
@VisibleForTesting long mBindScriptExecutorDelayMillis = 3_000L;
/**
* Name of the script that's currently running. If no script is running, value is null.
* A non-null script name indicates a script is running, which means DataBroker should not
* make another ScriptExecutor binder call.
*/
private String mCurrentScriptName;
private IScriptExecutor mScriptExecutor;
private ScriptFinishedCallback mScriptFinishedCallback;
private final ServiceConnection mServiceConnection = new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName name, IBinder service) {
mTelemetryHandler.post(() -> {
mScriptExecutor = IScriptExecutor.Stub.asInterface(service);
scheduleNextTask();
});
}
@Override
public void onServiceDisconnected(ComponentName name) {
// TODO(b/198684473): clean up the state after script executor disconnects
mTelemetryHandler.post(() -> {
mScriptExecutor = null;
unbindScriptExecutor();
});
}
};
public DataBrokerImpl(
Context context, PublisherFactory publisherFactory, ResultStore resultStore) {
mContext = context;
mPublisherFactory = publisherFactory;
mResultStore = resultStore;
mScriptExecutorListener = new ScriptExecutorListener(this);
mPublisherFactory.setFailureConsumer(this::onPublisherFailure);
}
private void onPublisherFailure(AbstractPublisher publisher, Throwable error) {
// TODO(b/193680465): disable MetricsConfig and log the error
Slog.w(CarLog.TAG_TELEMETRY, "publisher failed", error);
}
private void bindScriptExecutor() {
// do not re-bind if broker is in a disabled state or if script executor is nonnull
if (mDisabled || mScriptExecutor != null) {
return;
}
Intent intent = new Intent();
intent.setComponent(new ComponentName(SCRIPT_EXECUTOR_PACKAGE, SCRIPT_EXECUTOR_CLASS));
boolean success = mContext.bindServiceAsUser(
intent,
mServiceConnection,
Context.BIND_AUTO_CREATE,
UserHandle.SYSTEM);
if (success) {
mBindScriptExecutorAttempts = 0; // reset
return;
}
unbindScriptExecutor();
mBindScriptExecutorAttempts++;
if (mBindScriptExecutorAttempts < MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS) {
Slog.w(CarLog.TAG_TELEMETRY,
"failed to get valid connection to ScriptExecutor, retrying in "
+ mBindScriptExecutorDelayMillis + "ms.");
mTelemetryHandler.sendEmptyMessageDelayed(MSG_BIND_TO_SCRIPT_EXECUTOR,
mBindScriptExecutorDelayMillis);
} else {
Slog.w(CarLog.TAG_TELEMETRY, "failed to get valid connection to ScriptExecutor, "
+ "disabling DataBroker");
disableBroker();
}
}
/**
* Unbinds {@link ScriptExecutor} to release the connection. This method should be called from
* the telemetry thread.
*/
private void unbindScriptExecutor() {
// TODO(b/198648763): unbind from script executor when there is no work to do
mCurrentScriptName = null;
try {
mContext.unbindService(mServiceConnection);
} catch (IllegalArgumentException e) {
// If ScriptExecutor is gone before unbinding, it will throw this exception
Slog.w(CarLog.TAG_TELEMETRY, "Failed to unbind from ScriptExecutor", e);
}
}
/**
* Enters into a disabled state because something irrecoverable happened.
* TODO(b/200841260): expose the state to the caller.
*/
private void disableBroker() {
mDisabled = true;
// remove all MetricConfigs, disable all publishers, stop receiving data
for (String metricsConfigName : mSubscriptionMap.keySet()) {
// get the metrics config from the DataSubscriber and remove the metrics config
if (mSubscriptionMap.get(metricsConfigName).size() != 0) {
removeMetricsConfiguration(mSubscriptionMap.get(metricsConfigName).get(0)
.getMetricsConfig());
}
}
mSubscriptionMap.clear();
}
@Override
public void addMetricsConfiguration(MetricsConfig metricsConfig) {
// TODO(b/187743369): pass status back to caller
// if broker is disabled or metricsConfig already exists, do nothing
if (mDisabled || mSubscriptionMap.containsKey(metricsConfig.getName())) {
return;
}
// Create the subscribers for this metrics configuration
List<DataSubscriber> dataSubscribers = new ArrayList<>(
metricsConfig.getSubscribersList().size());
for (TelemetryProto.Subscriber subscriber : metricsConfig.getSubscribersList()) {
// protobuf publisher to a concrete Publisher
AbstractPublisher publisher = mPublisherFactory.getPublisher(
subscriber.getPublisher().getPublisherCase());
// create DataSubscriber from TelemetryProto.Subscriber
DataSubscriber dataSubscriber = new DataSubscriber(
this,
metricsConfig,
subscriber);
dataSubscribers.add(dataSubscriber);
try {
// The publisher will start sending data to the subscriber.
// TODO(b/191378559): handle bad configs
publisher.addDataSubscriber(dataSubscriber);
} catch (IllegalArgumentException e) {
Slog.w(CarLog.TAG_TELEMETRY, "Invalid config", e);
return;
}
}
mSubscriptionMap.put(metricsConfig.getName(), dataSubscribers);
}
@Override
public void removeMetricsConfiguration(MetricsConfig metricsConfig) {
// TODO(b/187743369): pass status back to caller
if (!mSubscriptionMap.containsKey(metricsConfig.getName())) {
return;
}
// get the subscriptions associated with this MetricsConfig, remove it from the map
List<DataSubscriber> dataSubscribers = mSubscriptionMap.remove(metricsConfig.getName());
// for each subscriber, remove it from publishers
for (DataSubscriber subscriber : dataSubscribers) {
AbstractPublisher publisher = mPublisherFactory.getPublisher(
subscriber.getPublisherParam().getPublisherCase());
try {
publisher.removeDataSubscriber(subscriber);
} catch (IllegalArgumentException e) {
// It shouldn't happen, but if happens, let's just log it.
Slog.w(CarLog.TAG_TELEMETRY, "Failed to remove subscriber from publisher", e);
}
}
// Remove all the tasks associated with this metrics config. The underlying impl uses the
// weakly consistent iterator, which is thread-safe but does not freeze the collection while
// iterating, so it may or may not reflect any updates since the iterator was created.
// But since adding & polling from queue should happen in the same thread, the task queue
// should not be changed while tasks are being iterated and removed.
mTaskQueue.removeIf(task -> task.isAssociatedWithMetricsConfig(metricsConfig));
}
@Override
public void addTaskToQueue(ScriptExecutionTask task) {
if (mDisabled) {
return;
}
mTaskQueue.add(task);
scheduleNextTask();
}
/**
* This method can be called from any thread. It is thread-safe because atomic values and the
* blocking queue are thread-safe. It is possible for this method to be invoked from different
* threads at the same time, but it is not possible to schedule the same task twice, because
* the handler handles message in the order they come in, this means the task will be polled
* sequentially instead of concurrently. Every task that is scheduled and run will be distinct.
* TODO(b/187743369): If the threading behavior in DataSubscriber changes, ScriptExecutionTask
* will also have different threading behavior. Update javadoc when the behavior is decided.
*/
@Override
public void scheduleNextTask() {
if (mDisabled || mTelemetryHandler.hasMessages(MSG_HANDLE_TASK)) {
return;
}
mTelemetryHandler.sendEmptyMessage(MSG_HANDLE_TASK);
}
@Override
public void setOnScriptFinishedCallback(ScriptFinishedCallback callback) {
if (mDisabled) {
return;
}
mScriptFinishedCallback = callback;
}
@Override
public void setTaskExecutionPriority(int priority) {
if (mDisabled) {
return;
}
mPriority = priority;
scheduleNextTask(); // when priority updates, schedule a task which checks task queue
}
@VisibleForTesting
Map<String, List<DataSubscriber>> getSubscriptionMap() {
return new ArrayMap<>((ArrayMap<String, List<DataSubscriber>>) mSubscriptionMap);
}
@VisibleForTesting
Handler getTelemetryHandler() {
return mTelemetryHandler;
}
@VisibleForTesting
PriorityBlockingQueue<ScriptExecutionTask> getTaskQueue() {
return mTaskQueue;
}
/**
* Polls and runs a task from the head of the priority queue if the queue is nonempty and the
* head of the queue has priority higher than or equal to the current priority. A higher
* priority is denoted by a lower priority number, so head of the queue should have equal or
* lower priority number to be polled.
*/
private void pollAndExecuteTask() {
// check databroker state is ready to run script
if (mDisabled || mCurrentScriptName != null) {
return;
}
// check task is valid and ready to be run
ScriptExecutionTask task = mTaskQueue.peek();
if (task == null || task.getPriority() > mPriority) {
return;
}
mTaskQueue.poll(); // remove task from queue
try {
if (mScriptExecutor == null) {
Slog.w(CarLog.TAG_TELEMETRY,
"script executor is null, cannot execute task");
mTaskQueue.add(task);
// upon successful binding, a task will be scheduled to run if there are any
mTelemetryHandler.sendEmptyMessage(MSG_BIND_TO_SCRIPT_EXECUTOR);
} else {
Slog.d(CarLog.TAG_TELEMETRY, "invoking script executor");
// update current name because a script is currently running
mCurrentScriptName = task.getMetricsConfig().getName();
mScriptExecutor.invokeScript(
task.getMetricsConfig().getScript(),
task.getHandlerName(),
task.getData(),
mResultStore.getInterimResult(mCurrentScriptName),
mScriptExecutorListener);
}
} catch (RemoteException e) {
Slog.d(CarLog.TAG_TELEMETRY, "remote exception occurred invoking script", e);
mTaskQueue.add(task); // will not trigger scheduleNextTask()
mCurrentScriptName = null;
}
}
/** Stores final metrics and schedules the next task. */
private void onScriptFinished(PersistableBundle result) {
mTelemetryHandler.post(() -> {
mResultStore.putFinalResult(mCurrentScriptName, result);
mCurrentScriptName = null;
scheduleNextTask();
});
}
/** Stores interim metrics and schedules the next task. */
private void onScriptSuccess(PersistableBundle stateToPersist) {
mTelemetryHandler.post(() -> {
mResultStore.putInterimResult(mCurrentScriptName, stateToPersist);
mCurrentScriptName = null;
scheduleNextTask();
});
}
/** Stores telemetry error and schedules the next task. */
private void onScriptError(int errorType, String message, String stackTrace) {
mTelemetryHandler.post(() -> {
TelemetryProto.TelemetryError.Builder error = TelemetryProto.TelemetryError.newBuilder()
.setErrorType(TelemetryProto.TelemetryError.ErrorType.forNumber(errorType))
.setMessage(message);
if (stackTrace != null) {
error.setStackTrace(stackTrace);
}
mResultStore.putError(mCurrentScriptName, error.build());
mCurrentScriptName = null;
scheduleNextTask();
});
}
/** Listens for script execution status. Methods are called on the binder thread. */
private static final class ScriptExecutorListener extends IScriptExecutorListener.Stub {
private final WeakReference<DataBrokerImpl> mWeakDataBroker;
private ScriptExecutorListener(DataBrokerImpl dataBroker) {
mWeakDataBroker = new WeakReference<>(dataBroker);
}
@Override
public void onScriptFinished(PersistableBundle result) {
DataBrokerImpl dataBroker = mWeakDataBroker.get();
if (dataBroker == null) {
return;
}
dataBroker.onScriptFinished(result);
}
@Override
public void onSuccess(PersistableBundle stateToPersist) {
DataBrokerImpl dataBroker = mWeakDataBroker.get();
if (dataBroker == null) {
return;
}
dataBroker.onScriptSuccess(stateToPersist);
}
@Override
public void onError(int errorType, String message, String stackTrace) {
DataBrokerImpl dataBroker = mWeakDataBroker.get();
if (dataBroker == null) {
return;
}
dataBroker.onScriptError(errorType, message, stackTrace);
}
}
/** Callback handler to handle scheduling and rescheduling of {@link ScriptExecutionTask}s. */
class TaskHandler extends Handler {
TaskHandler(Looper looper) {
super(looper);
}
/**
* Handles a message depending on the message ID.
* If the msg ID is MSG_HANDLE_TASK, it polls a task from the priority queue and executing a
* {@link ScriptExecutionTask}. There are multiple places where this message is sent: when
* priority updates, when a new task is added to the priority queue, and when a task
* finishes running.
*/
@Override
public void handleMessage(Message msg) {
switch (msg.what) {
case MSG_HANDLE_TASK:
pollAndExecuteTask(); // run the next task
break;
case MSG_BIND_TO_SCRIPT_EXECUTOR:
bindScriptExecutor();
break;
default:
Slog.w(CarLog.TAG_TELEMETRY, "TaskHandler received unknown message.");
}
}
}
}