blob: 843cbdb70d4a7c742d11b704f4f69fb48093eae3 [file] [log] [blame]
/*
* Copyright (C) 2015 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.android.tv.tuner.source;
import android.content.Context;
import android.util.Log;
import android.util.Pair;
import com.google.android.exoplayer.C;
import com.google.android.exoplayer.upstream.DataSpec;
import com.android.tv.common.SoftPreconditions;
import com.android.tv.tuner.ChannelScanFileParser;
import com.android.tv.tuner.TunerHal;
import com.android.tv.tuner.TunerPreferences;
import com.android.tv.tuner.data.TunerChannel;
import com.android.tv.tuner.tvinput.EventDetector;
import com.android.tv.tuner.tvinput.EventDetector.EventListener;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* Provides MPEG-2 TS stream sources for channel playing from an underlying tuner device.
*/
public class TunerTsStreamer implements TsStreamer {
private static final String TAG = "TunerTsStreamer";
private static final int MIN_READ_UNIT = 1500;
private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~15KB
private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 20000; // ~ 30MB
private static final int TS_PACKET_SIZE = 188;
private static final int READ_TIMEOUT_MS = 5000; // 5 secs.
private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
private static final int READ_ERROR_STREAMING_ENDED = -1;
private static final int READ_ERROR_BUFFER_OVERWRITTEN = -2;
private final Object mCircularBufferMonitor = new Object();
private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
private long mBytesFetched;
private final AtomicLong mLastReadPosition = new AtomicLong();
private boolean mStreaming;
private final TunerHal mTunerHal;
private TunerChannel mChannel;
private Thread mStreamingThread;
private final EventDetector mEventDetector;
private final List<Pair<EventListener, Boolean>> mEventListenerActions = new ArrayList<>();
private final TsStreamWriter mTsStreamWriter;
private String mChannelNumber;
public static class TunerDataSource extends TsDataSource {
private final TunerTsStreamer mTsStreamer;
private final AtomicLong mLastReadPosition = new AtomicLong(0);
private long mStartBufferedPosition;
private TunerDataSource(TunerTsStreamer tsStreamer) {
mTsStreamer = tsStreamer;
mStartBufferedPosition = tsStreamer.getBufferedPosition();
}
@Override
public long getBufferedPosition() {
return mTsStreamer.getBufferedPosition() - mStartBufferedPosition;
}
@Override
public long getLastReadPosition() {
return mLastReadPosition.get();
}
@Override
public void shiftStartPosition(long offset) {
SoftPreconditions.checkState(mLastReadPosition.get() == 0);
SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition());
mStartBufferedPosition += offset;
}
@Override
public long open(DataSpec dataSpec) throws IOException {
mLastReadPosition.set(0);
return C.LENGTH_UNBOUNDED;
}
@Override
public void close() {
}
@Override
public int read(byte[] buffer, int offset, int readLength) throws IOException {
int ret = mTsStreamer.readAt(mStartBufferedPosition + mLastReadPosition.get(), buffer,
offset, readLength);
if (ret > 0) {
mLastReadPosition.addAndGet(ret);
} else if (ret == READ_ERROR_BUFFER_OVERWRITTEN) {
long currentPosition = mStartBufferedPosition + mLastReadPosition.get();
long endPosition = mTsStreamer.getBufferedPosition();
long diff = ((endPosition - currentPosition + TS_PACKET_SIZE - 1) / TS_PACKET_SIZE)
* TS_PACKET_SIZE;
Log.w(TAG, "Demux position jump by overwritten buffer: " + diff);
mStartBufferedPosition = currentPosition + diff;
mLastReadPosition.set(0);
return 0;
}
return ret;
}
}
/**
* Creates {@link TsStreamer} for playing or recording the specified channel.
* @param tunerHal the HAL for tuner device
* @param eventListener the listener for channel & program information
*/
public TunerTsStreamer(TunerHal tunerHal, EventListener eventListener, Context context) {
mTunerHal = tunerHal;
mEventDetector = new EventDetector(mTunerHal);
if (eventListener != null) {
mEventDetector.registerListener(eventListener);
}
mTsStreamWriter = context != null && TunerPreferences.getStoreTsStream(context) ?
new TsStreamWriter(context) : null;
}
public TunerTsStreamer(TunerHal tunerHal, EventListener eventListener) {
this(tunerHal, eventListener, null);
}
@Override
public boolean startStream(TunerChannel channel) {
if (mTunerHal.tune(channel.getFrequency(), channel.getModulation(),
channel.getDisplayNumber(false))) {
if (channel.hasVideo()) {
mTunerHal.addPidFilter(channel.getVideoPid(),
TunerHal.FILTER_TYPE_VIDEO);
}
boolean audioFilterSet = false;
for (Integer audioPid : channel.getAudioPids()) {
if (!audioFilterSet) {
mTunerHal.addPidFilter(audioPid, TunerHal.FILTER_TYPE_AUDIO);
audioFilterSet = true;
} else {
// FILTER_TYPE_AUDIO overrides the previous filter for audio. We use
// FILTER_TYPE_OTHER from the secondary one to get the all audio tracks.
mTunerHal.addPidFilter(audioPid, TunerHal.FILTER_TYPE_OTHER);
}
}
mTunerHal.addPidFilter(channel.getPcrPid(),
TunerHal.FILTER_TYPE_PCR);
if (mEventDetector != null) {
mEventDetector.startDetecting(channel.getFrequency(), channel.getModulation(),
channel.getProgramNumber());
}
mChannel = channel;
mChannelNumber = channel.getDisplayNumber();
synchronized (mCircularBufferMonitor) {
if (mStreaming) {
Log.w(TAG, "Streaming should be stopped before start streaming");
return true;
}
mStreaming = true;
mBytesFetched = 0;
mLastReadPosition.set(0L);
}
if (mTsStreamWriter != null) {
mTsStreamWriter.setChannel(mChannel);
mTsStreamWriter.openFile();
}
mStreamingThread = new StreamingThread();
mStreamingThread.start();
Log.i(TAG, "Streaming started");
return true;
}
return false;
}
@Override
public boolean startStream(ChannelScanFileParser.ScanChannel channel) {
if (mTunerHal.tune(channel.frequency, channel.modulation, null)) {
mEventDetector.startDetecting(
channel.frequency, channel.modulation, EventDetector.ALL_PROGRAM_NUMBERS);
synchronized (mCircularBufferMonitor) {
if (mStreaming) {
Log.w(TAG, "Streaming should be stopped before start streaming");
return true;
}
mStreaming = true;
mBytesFetched = 0;
mLastReadPosition.set(0L);
}
mStreamingThread = new StreamingThread();
mStreamingThread.start();
Log.i(TAG, "Streaming started");
return true;
}
return false;
}
/**
* Blocks the current thread until the streaming thread stops. In rare cases when the tuner
* device is overloaded this can take a while, but usually it returns pretty quickly.
*/
@Override
public void stopStream() {
mChannel = null;
synchronized (mCircularBufferMonitor) {
mStreaming = false;
mCircularBufferMonitor.notifyAll();
}
try {
if (mStreamingThread != null) {
mStreamingThread.join();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (mTsStreamWriter != null) {
mTsStreamWriter.closeFile(true);
mTsStreamWriter.setChannel(null);
}
}
@Override
public TsDataSource createDataSource() {
return new TunerDataSource(this);
}
/**
* Returns incomplete channel lists which was scanned so far. Incomplete channel means
* the channel whose channel information is not complete or is not well-formed.
* @return {@link List} of {@link TunerChannel}
*/
public List<TunerChannel> getMalFormedChannels() {
return mEventDetector.getMalFormedChannels();
}
/**
* Returns the current {@link TunerHal} which provides MPEG-TS stream for TunerTsStreamer.
* @return {@link TunerHal}
*/
public TunerHal getTunerHal() {
return mTunerHal;
}
/**
* Returns the current tuned channel for TunerTsStreamer.
* @return {@link TunerChannel}
*/
public TunerChannel getChannel() {
return mChannel;
}
/**
* Returns the current buffered position from tuner.
* @return the current buffered position
*/
public long getBufferedPosition() {
synchronized (mCircularBufferMonitor) {
return mBytesFetched;
}
}
public String getStreamerInfo() {
return "Channel: " + mChannelNumber + ", Streaming: " + mStreaming;
}
public void registerListener(EventListener listener) {
if (mEventDetector != null && listener != null) {
synchronized (mEventListenerActions) {
mEventListenerActions.add(new Pair<>(listener, true));
}
}
}
public void unregisterListener(EventListener listener) {
if (mEventDetector != null) {
synchronized (mEventListenerActions) {
mEventListenerActions.add(new Pair(listener, false));
}
}
}
private class StreamingThread extends Thread {
@Override
public void run() {
// Buffers for streaming data from the tuner and the internal buffer.
byte[] dataBuffer = new byte[READ_BUFFER_SIZE];
while (true) {
synchronized (mCircularBufferMonitor) {
if (!mStreaming) {
break;
}
}
if (mEventDetector != null) {
synchronized (mEventListenerActions) {
for (Pair listenerAction : mEventListenerActions) {
EventListener listener = (EventListener) listenerAction.first;
if ((boolean) listenerAction.second) {
mEventDetector.registerListener(listener);
} else {
mEventDetector.unregisterListener(listener);
}
}
mEventListenerActions.clear();
}
}
int bytesWritten = mTunerHal.readTsStream(dataBuffer, dataBuffer.length);
if (bytesWritten <= 0) {
try {
// When buffer is underrun, we sleep for short time to prevent
// unnecessary CPU draining.
sleep(BUFFER_UNDERRUN_SLEEP_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
continue;
}
if (mTsStreamWriter != null) {
mTsStreamWriter.writeToFile(dataBuffer, bytesWritten);
}
if (mEventDetector != null) {
mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);
}
synchronized (mCircularBufferMonitor) {
int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
int bytesToCopyInFirstPass = bytesWritten;
if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
}
System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer,
bytesToCopyInFirstPass);
if (bytesToCopyInFirstPass < bytesWritten) {
System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0,
bytesWritten - bytesToCopyInFirstPass);
}
mBytesFetched += bytesWritten;
mCircularBufferMonitor.notifyAll();
}
}
Log.i(TAG, "Streaming stopped");
}
}
/**
* Reads data from internal buffer.
* @param pos the position to read from
* @param buffer to read
* @param offset start position of the read buffer
* @param amount number of bytes to read
* @return number of read bytes when successful, {@code -1} otherwise
* @throws IOException
*/
public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
while (true) {
synchronized (mCircularBufferMonitor) {
if (!mStreaming) {
return READ_ERROR_STREAMING_ENDED;
}
if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
Log.w(TAG, "Demux is requesting the data which is already overwritten.");
return READ_ERROR_BUFFER_OVERWRITTEN;
}
if (mBytesFetched < pos + amount) {
try {
mCircularBufferMonitor.wait(READ_TIMEOUT_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// Try again to prevent starvation.
// Give chances to read from other threads.
continue;
}
int startPos = (int) (pos % CIRCULAR_BUFFER_SIZE);
int endPos = (int) ((pos + amount) % CIRCULAR_BUFFER_SIZE);
int firstLength = (startPos > endPos ? CIRCULAR_BUFFER_SIZE : endPos) - startPos;
System.arraycopy(mCircularBuffer, startPos, buffer, offset, firstLength);
if (firstLength < amount) {
System.arraycopy(mCircularBuffer, 0, buffer, offset + firstLength,
amount - firstLength);
}
mCircularBufferMonitor.notifyAll();
return amount;
}
}
}
}