| /** |
| * Copyright (C) ARM Limited 2013. All rights reserved. |
| * |
| * This program is free software; you can redistribute it and/or modify |
| * it under the terms of the GNU General Public License version 2 as |
| * published by the Free Software Foundation. |
| */ |
| |
| #include "Buffer.h" |
| |
| #include "Logging.h" |
| #include "Sender.h" |
| #include "SessionData.h" |
| |
| #define mask (size - 1) |
| |
| Buffer::Buffer (const int32_t core, const int32_t buftype, const int size, sem_t *const readerSem) : core(core), buftype(buftype), size(size), readPos(0), writePos(0), commitPos(0), available(true), done(false), buf(new char[size]), commitTime(gSessionData->mLiveRate), readerSem(readerSem) { |
| if ((size & mask) != 0) { |
| logg->logError(__FILE__, __LINE__, "Buffer size is not a power of 2"); |
| handleException(); |
| } |
| frame(); |
| } |
| |
| Buffer::~Buffer () { |
| delete [] buf; |
| } |
| |
| void Buffer::write (Sender * const sender) { |
| if (!commitReady()) { |
| return; |
| } |
| |
| // determine the size of two halves |
| int length1 = commitPos - readPos; |
| char * buffer1 = buf + readPos; |
| int length2 = 0; |
| char * buffer2 = buf; |
| if (length1 < 0) { |
| length1 = size - readPos; |
| length2 = commitPos; |
| } |
| |
| logg->logMessage("Sending data length1: %i length2: %i", length1, length2); |
| |
| // start, middle or end |
| if (length1 > 0) { |
| sender->writeData(buffer1, length1, RESPONSE_APC_DATA); |
| } |
| |
| // possible wrap around |
| if (length2 > 0) { |
| sender->writeData(buffer2, length2, RESPONSE_APC_DATA); |
| } |
| |
| readPos = commitPos; |
| } |
| |
| bool Buffer::commitReady () const { |
| return commitPos != readPos; |
| } |
| |
| int Buffer::bytesAvailable () const { |
| int filled = writePos - readPos; |
| if (filled < 0) { |
| filled += size; |
| } |
| |
| int remaining = size - filled; |
| |
| if (available) { |
| // Give some extra room; also allows space to insert the overflow error packet |
| remaining -= 200; |
| } else { |
| // Hysteresis, prevents multiple overflow messages |
| remaining -= 2000; |
| } |
| |
| return remaining; |
| } |
| |
| bool Buffer::checkSpace (const int bytes) { |
| const int remaining = bytesAvailable(); |
| |
| if (remaining < bytes) { |
| available = false; |
| } else { |
| available = true; |
| } |
| |
| return available; |
| } |
| |
| void Buffer::commit (const uint64_t time) { |
| // post-populate the length, which does not include the response type length nor the length itself, i.e. only the length of the payload |
| const int typeLength = gSessionData->mLocalCapture ? 0 : 1; |
| int length = writePos - commitPos; |
| if (length < 0) { |
| length += size; |
| } |
| length = length - typeLength - sizeof(int32_t); |
| for (size_t byte = 0; byte < sizeof(int32_t); byte++) { |
| buf[(commitPos + typeLength + byte) & mask] = (length >> byte * 8) & 0xFF; |
| } |
| |
| logg->logMessage("Committing data readPos: %i writePos: %i commitPos: %i", readPos, writePos, commitPos); |
| commitPos = writePos; |
| |
| if (gSessionData->mLiveRate > 0) { |
| while (time > commitTime) { |
| commitTime += gSessionData->mLiveRate; |
| } |
| } |
| |
| if (!done) { |
| frame(); |
| } |
| |
| // send a notification that data is ready |
| sem_post(readerSem); |
| } |
| |
| void Buffer::check (const uint64_t time) { |
| int filled = writePos - commitPos; |
| if (filled < 0) { |
| filled += size; |
| } |
| if (filled >= ((size * 3) / 4) || (gSessionData->mLiveRate > 0 && time >= commitTime)) { |
| commit(time); |
| } |
| } |
| |
| void Buffer::packInt (int32_t x) { |
| int packedBytes = 0; |
| int more = true; |
| while (more) { |
| // low order 7 bits of x |
| char b = x & 0x7f; |
| x >>= 7; |
| |
| if ((x == 0 && (b & 0x40) == 0) || (x == -1 && (b & 0x40) != 0)) { |
| more = false; |
| } else { |
| b |= 0x80; |
| } |
| |
| buf[(writePos + packedBytes) & mask] = b; |
| packedBytes++; |
| } |
| |
| writePos = (writePos + packedBytes) & mask; |
| } |
| |
| void Buffer::packInt64 (int64_t x) { |
| int packedBytes = 0; |
| int more = true; |
| while (more) { |
| // low order 7 bits of x |
| char b = x & 0x7f; |
| x >>= 7; |
| |
| if ((x == 0 && (b & 0x40) == 0) || (x == -1 && (b & 0x40) != 0)) { |
| more = false; |
| } else { |
| b |= 0x80; |
| } |
| |
| buf[(writePos + packedBytes) & mask] = b; |
| packedBytes++; |
| } |
| |
| writePos = (writePos + packedBytes) & mask; |
| } |
| |
| void Buffer::frame () { |
| if (!gSessionData->mLocalCapture) { |
| packInt(RESPONSE_APC_DATA); |
| } |
| // Reserve space for the length |
| writePos += sizeof(int32_t); |
| packInt(buftype); |
| packInt(core); |
| } |
| |
| bool Buffer::eventHeader (const uint64_t curr_time) { |
| bool retval = false; |
| if (checkSpace(MAXSIZE_PACK32 + MAXSIZE_PACK64)) { |
| packInt(0); // key of zero indicates a timestamp |
| packInt64(curr_time); |
| retval = true; |
| } |
| |
| return retval; |
| } |
| |
| void Buffer::event (const int32_t key, const int32_t value) { |
| if (checkSpace(2 * MAXSIZE_PACK32)) { |
| packInt(key); |
| packInt(value); |
| } |
| } |
| |
| void Buffer::event64 (const int64_t key, const int64_t value) { |
| if (checkSpace(2 * MAXSIZE_PACK64)) { |
| packInt64(key); |
| packInt64(value); |
| } |
| } |
| |
| void Buffer::setDone () { |
| done = true; |
| commit(0); |
| } |
| |
| bool Buffer::isDone () const { |
| return done && readPos == commitPos && commitPos == writePos; |
| } |