Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev/frame synchronizer #968

Open
wants to merge 18 commits into
base: developer
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions slsReceiverSoftware/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,33 @@ if (SLS_USE_RECEIVER_BINARIES)
slsProjectWarnings
)

install(TARGETS slsReceiver slsMultiReceiver
add_executable(slsFrameSynchronizer
src/FrameSynchronizerApp.cpp
)

set_target_properties(slsFrameSynchronizer PROPERTIES
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
)
if((CMAKE_BUILD_TYPE STREQUAL "Release") AND SLS_LTO_AVAILABLE)
set_property(TARGET slsFrameSynchronizer PROPERTY INTERPROCEDURAL_OPTIMIZATION True)
endif()

target_link_libraries(slsFrameSynchronizer
PUBLIC
slsReceiverStatic
pthread
rt
PRIVATE
slsProjectWarnings
)

install(TARGETS slsReceiver slsMultiReceiver slsFrameSynchronizer
EXPORT "${TARGETS_EXPORT_NAME}"
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/sls
)
)

endif(SLS_USE_RECEIVER_BINARIES)

Expand Down
41 changes: 13 additions & 28 deletions slsReceiverSoftware/include/sls/Receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,51 +41,36 @@ class Receiver : private virtual slsDetectorDefs {
/**
* Start Acquisition Call back (slsMultiReceiver writes data if file write
* enabled) if registerCallBackRawDataReady or
* registerCallBackRawDataModifyReady registered, users get data callback
* arguments are:
* - file path
* - file name prefix
* - file index
* - image size in bytes
* registerCallBackRawDataModifyReady registered
* Call back arguments are:
* - startCallbackHeader metadata
*/
void registerCallBackStartAcquisition(int (*func)(const std::string &,
const std::string &,
uint64_t, size_t, void *),
void registerCallBackStartAcquisition(int (*func)(const startCallbackHeader,
void *),
void *arg);

/**
* Call back for acquisition finished
* callback argument is:
* - total frames caught
* - startCallbackHeader metadata
*/
void registerCallBackAcquisitionFinished(void (*func)(uint64_t, void *),
void *arg);
void registerCallBackAcquisitionFinished(
void (*func)(const endCallbackHeader, void *), void *arg);

/**
* Call back for raw data
* args to raw data ready callback are:
* - sls_receiver_header frame metadata,
* - dataCallbackHeader metadata
* - pointer to data
* - image size in bytes
* - image size in bytes. Can be modified to the new size to be
* written/streamed. (only smaller value allowed).
*/
void registerCallBackRawDataReady(void (*func)(sls_receiver_header &,
char *, size_t, void *),
const dataCallbackHeader,
char *, size_t &, void *),
void *arg);

/**
* Call back for raw data (modified)
* args to raw data ready callback are:
* - sls_receiver_header frame metadata,
* - pointer to data
* - revDatasize is the reference of data size in bytes.
* Can be modified to the new size to be written/streamed. (only smaller
* value allowed).
*/
void registerCallBackRawDataModifyReady(void (*func)(sls_receiver_header &,
char *, size_t &,
void *),
void *arg);

private:
std::unique_ptr<ClientInterface> tcpipInterface;
};
Expand Down
22 changes: 6 additions & 16 deletions slsReceiverSoftware/src/ClientInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,32 +54,25 @@ std::string ClientInterface::getReceiverVersion() { return APIRECEIVER; }

/***callback functions***/
void ClientInterface::registerCallBackStartAcquisition(
int (*func)(const std::string &, const std::string &, uint64_t, size_t,
void *),
void *arg) {
int (*func)(const startCallbackHeader, void *), void *arg) {
startAcquisitionCallBack = func;
pStartAcquisition = arg;
}

void ClientInterface::registerCallBackAcquisitionFinished(void (*func)(uint64_t,
void *),
void *arg) {
void ClientInterface::registerCallBackAcquisitionFinished(
void (*func)(const endCallbackHeader, void *), void *arg) {
acquisitionFinishedCallBack = func;
pAcquisitionFinished = arg;
}

void ClientInterface::registerCallBackRawDataReady(
void (*func)(sls_receiver_header &, char *, size_t, void *), void *arg) {
void (*func)(sls_receiver_header &, dataCallbackHeader, char *, size_t &,
void *),
void *arg) {
rawDataReadyCallBack = func;
pRawDataReady = arg;
}

void ClientInterface::registerCallBackRawDataModifyReady(
void (*func)(sls_receiver_header &, char *, size_t &, void *), void *arg) {
rawDataModifyReadyCallBack = func;
pRawDataReady = arg;
}

void ClientInterface::startTCPServer() {
tcpThreadId = gettid();
LOG(logINFOBLUE) << "Created [ TCP server Tid: " << tcpThreadId << "]";
Expand Down Expand Up @@ -477,9 +470,6 @@ void ClientInterface::setDetectorType(detectorType arg) {
if (rawDataReadyCallBack != nullptr)
impl()->registerCallBackRawDataReady(rawDataReadyCallBack,
pRawDataReady);
if (rawDataModifyReadyCallBack != nullptr)
impl()->registerCallBackRawDataModifyReady(rawDataModifyReadyCallBack,
pRawDataReady);

impl()->setThreadIds(parentThreadId, tcpThreadId);
}
Expand Down
31 changes: 12 additions & 19 deletions slsReceiverSoftware/src/ClientInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,20 @@ class ClientInterface : private virtual slsDetectorDefs {

//***callback functions***
/** params: file path, file name, file index, image size */
void registerCallBackStartAcquisition(int (*func)(const std::string &,
const std::string &,
uint64_t, size_t, void *),
void registerCallBackStartAcquisition(int (*func)(const startCallbackHeader,
void *),
void *arg);

/** params: total frames caught */
void registerCallBackAcquisitionFinished(void (*func)(uint64_t, void *),
void *arg);
void registerCallBackAcquisitionFinished(
void (*func)(const endCallbackHeader, void *), void *arg);

/** params: sls_receiver_header, pointer to data, image size */
void registerCallBackRawDataReady(void (*func)(sls_receiver_header &,
char *, size_t, void *),
const dataCallbackHeader,
char *, size_t &, void *),
void *arg);

/** params: sls_receiver_header, pointer to data, reference to image size */
void registerCallBackRawDataModifyReady(void (*func)(sls_receiver_header &,
char *, size_t &,
void *),
void *arg);

private:
void startTCPServer();
int functionTable();
Expand Down Expand Up @@ -186,15 +180,14 @@ class ClientInterface : private virtual slsDetectorDefs {

//***callback parameters***

int (*startAcquisitionCallBack)(const std::string &, const std::string &,
uint64_t, size_t, void *) = nullptr;
int (*startAcquisitionCallBack)(const startCallbackHeader,
void *) = nullptr;
void *pStartAcquisition{nullptr};
void (*acquisitionFinishedCallBack)(uint64_t, void *) = nullptr;
void (*acquisitionFinishedCallBack)(const endCallbackHeader,
void *) = nullptr;
void *pAcquisitionFinished{nullptr};
void (*rawDataReadyCallBack)(sls_receiver_header &, char *, size_t,
void *) = nullptr;
void (*rawDataModifyReadyCallBack)(sls_receiver_header &, char *, size_t &,
void *) = nullptr;
void (*rawDataReadyCallBack)(sls_receiver_header &, dataCallbackHeader,
char *, size_t &, void *) = nullptr;
void *pRawDataReady{nullptr};

pid_t parentThreadId{0};
Expand Down
72 changes: 54 additions & 18 deletions slsReceiverSoftware/src/DataProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ void DataProcessor::SetFifo(Fifo *f) { fifo = f; }

void DataProcessor::SetGeneralData(GeneralData *g) { generalData = g; }

void DataProcessor::SetUdpPortNumber(const uint16_t portNumber) {
udpPortNumber = portNumber;
}

void DataProcessor::SetActivate(bool enable) { activated = enable; }

void DataProcessor::SetReceiverROI(ROI roi) {
Expand Down Expand Up @@ -73,6 +77,27 @@ void DataProcessor::SetCtbDbitList(std::vector<int> value) {

void DataProcessor::SetCtbDbitOffset(int value) { ctbDbitOffset = value; }

void DataProcessor::SetQuadEnable(bool value) { quadEnable = value; }

void DataProcessor::SetFlipRows(bool fd) {
flipRows = fd;
// flip only right port of quad
if (quadEnable) {
flipRows = (index == 1 ? true : false);
}
}

void DataProcessor::SetNumberofTotalFrames(uint64_t value) {
nTotalFrames = value;
}

void DataProcessor::SetAdditionalJsonHeader(
const std::map<std::string, std::string> &json) {
std::lock_guard<std::mutex> lock(additionalJsonMutex);
additionalJsonHeader = json;
isAdditionalJsonUpdated = true;
}

void DataProcessor::ResetParametersforNewAcquisition() {
StopRunning();
startedFlag = false;
Expand Down Expand Up @@ -127,8 +152,6 @@ void DataProcessor::CreateFirstFiles(const std::string &fileNamePrefix,
const uint64_t fileIndex,
const bool overWriteEnable,
const bool silentMode,
const uint16_t udpPortNumber,
const uint64_t numImages,
const bool detectorDataStream) {
if (dataFile == nullptr) {
throw RuntimeError("file object not contstructed");
Expand Down Expand Up @@ -156,7 +179,7 @@ void DataProcessor::CreateFirstFiles(const std::string &fileNamePrefix,
case HDF5:
dataFile->CreateFirstHDF5DataFile(
fileNamePrefix, fileIndex, overWriteEnable, silentMode,
udpPortNumber, generalData->framesPerFile, numImages, nx, ny,
udpPortNumber, generalData->framesPerFile, nTotalFrames, nx, ny,
generalData->dynamicRange);
break;
#endif
Expand All @@ -182,8 +205,8 @@ uint32_t DataProcessor::GetFilesInAcquisition() const {
std::string DataProcessor::CreateVirtualFile(
const std::string &filePath, const std::string &fileNamePrefix,
const uint64_t fileIndex, const bool overWriteEnable, const bool silentMode,
const int modulePos, const uint64_t numImages, const int numModX,
const int numModY, std::mutex *hdf5LibMutex) {
const int modulePos, const int numModX, const int numModY,
std::mutex *hdf5LibMutex) {

if (receiverRoiEnabled) {
throw std::runtime_error(
Expand Down Expand Up @@ -361,14 +384,31 @@ void DataProcessor::ProcessAnImage(sls_receiver_header &header, size_t &size,
}

try {
// normal call back
// callbacks
if (rawDataReadyCallBack != nullptr) {
rawDataReadyCallBack(header, data, size, pRawDataReady);
}

// call back with modified size
else if (rawDataModifyReadyCallBack != nullptr) {
rawDataModifyReadyCallBack(header, data, size, pRawDataReady);
uint64_t frameIndex = fnum - firstIndex;
// update local copy only if it was updated (to prevent locking each
// time)
if (isAdditionalJsonUpdated) {
std::lock_guard<std::mutex> lock(additionalJsonMutex);
localAdditionalJsonHeader = additionalJsonHeader;
isAdditionalJsonUpdated = false;
}

dataCallbackHeader callbackHeader = {
udpPortNumber,
{static_cast<int>(generalData->nPixelsX),
static_cast<int>(generalData->nPixelsY)},
fnum,
frameIndex,
(100 * ((double)(frameIndex + 1) / (double)(nTotalFrames))),
(nump == generalData->packetsPerFrame ? true : false),
flipRows,
localAdditionalJsonHeader};

rawDataReadyCallBack(header, callbackHeader, data, size,
pRawDataReady);
}
} catch (const std::exception &e) {
throw RuntimeError("Get Data Callback Error: " + std::string(e.what()));
Expand Down Expand Up @@ -427,17 +467,13 @@ bool DataProcessor::CheckCount() {
}

void DataProcessor::registerCallBackRawDataReady(
void (*func)(sls_receiver_header &, char *, size_t, void *), void *arg) {
void (*func)(sls_receiver_header &, dataCallbackHeader, char *, size_t &,
void *),
void *arg) {
rawDataReadyCallBack = func;
pRawDataReady = arg;
}

void DataProcessor::registerCallBackRawDataModifyReady(
void (*func)(sls_receiver_header &, char *, size_t &, void *), void *arg) {
rawDataModifyReadyCallBack = func;
pRawDataReady = arg;
}

void DataProcessor::PadMissingPackets(sls_receiver_header header, char *data) {
LOG(logDEBUG) << index << ": Padding Missing Packets";

Expand Down
Loading
Loading