-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathWriterThread.cpp
70 lines (49 loc) · 2.07 KB
/
WriterThread.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include "jungfrau.h"
// Writer thread
void *WriterThread(void *in_threadarg) {
int ret;
WriterThreadArg *in = (WriterThreadArg *)in_threadarg;
if ((px.mode == UINT16_MODE) || (px.mode == INT16_MODE)) {
buffer[in->threadid] = malloc (px.nimages_per_file*YPIXEL*XPIXEL*2);
MALLOC_ERROR(buffer[in->threadid]);
} else {
buffer[in->threadid] = malloc (px.nimages_per_file*YPIXEL*XPIXEL*4);
MALLOC_ERROR(buffer[in->threadid]);
}
char *compression_buffer;
if (px.use_direct_chunk_writer) {
compression_buffer = (char *) malloc(wrt->GetOutputBufferMaxSize());
} else compression_buffer = NULL;
ret = pthread_mutex_lock(&tasks_assigned_semaphore);
PTHREAD_ERROR(ret,pthread_mutex_lock);
while (tasks_assigned < number_of_tasks) {
int curr_task = tasks_assigned;
tasks_assigned += 1;
ret = pthread_mutex_unlock(&tasks_assigned_semaphore);
PTHREAD_ERROR(ret,pthread_mutex_unlock);
ret = pthread_mutex_lock(&task_ready_semaphore[curr_task]);
PTHREAD_ERROR(ret,pthread_mutex_lock);
task_ready[curr_task] = in->threadid;
ret = pthread_cond_broadcast(&task_ready_cond[curr_task]);
PTHREAD_ERROR(ret,pthread_cond_broadcast);
ret = pthread_mutex_unlock(&task_ready_semaphore[curr_task]);
PTHREAD_ERROR(ret,pthread_mutex_unlock);
// Converters are cleared to do their job!
ret = pthread_mutex_lock(&converter_done_semaphore[curr_task]);
PTHREAD_ERROR(ret,pthread_mutex_lock);
while (converter_done[curr_task] != NUM_WORKERS) {
ret = pthread_cond_wait(&converter_done_cond[curr_task], &converter_done_semaphore[curr_task]);
PTHREAD_ERROR(ret,pthread_cond_wait);
}
ret = pthread_mutex_unlock(&converter_done_semaphore[curr_task]);
PTHREAD_ERROR(ret,pthread_mutex_unlock);
wrt->SaveData((char *)buffer[in->threadid], curr_task, compression_buffer);
ret = pthread_mutex_lock(&tasks_assigned_semaphore);
PTHREAD_ERROR(ret,pthread_mutex_lock);
}
ret = pthread_mutex_unlock(&tasks_assigned_semaphore);
PTHREAD_ERROR(ret,pthread_mutex_unlock);
if (px.use_direct_chunk_writer) free(compression_buffer);
free(buffer[in->threadid]);
pthread_exit(0);
};