Thread sync problems

Hi everyone, really hoping someone can give me a pointer here

I am using Rad Studio 2007, in the C++ personality

I need to read streaming data from a com port as fast as i can

I have created a thread object which uses overlapped IO to read from a com port, and the recieved data is stored in a circular buffer owned by this same thread

The class has 3 members, 1 private, which is used to copy the recieved data from the com port into the front of the buffer

2 public, one called DataAvail() which simply returns the current size of the buffer, the other called GetData() which copies data from the tail of the buffer for use by the main process.

my idea was to use something like:
1
2
3
4
5
6
7
8
while (1){
  while (COM->DataAvail() < 200)
  {
    Application->ProcessMessages();
  }

COM->GetData(Buffer, 200);
}


Now this works for a short time, but then my thread stops running its Execute() loop and makes the thread object a FFinished = true, i cannot figure out why, i certainly do not stop it and i dont seem to get any info back as to why it has happened

I think it has to do with syncronising the shared access to the buffer between the main process thread and the com thread, so i have tried using Critical sections around all the buffer code, this didnt help, also i have tried using mutexes around the private function to copy data into the buffer, and the public function to get it out, still no luck

If i remove the

COM->GetData(Buffer, 200);

line from the process, the thread runs forever and i do not get the problem, which is why im sure its a sync issue with the buffer


Can anyone tell me a way to safely get the data out of a buffer in another thread? or maybe why my thread stops so unexpectedly?

Im sure its something simple but im lost

I have included my class (sorry about all the mods but i have tried so many things

Kindest regards

Billy
Last edited on

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
#ifndef thdCC_ChannelH
#define thdCC_ChannelH
#include <Classes.hpp>
#include "stdio.h"

//Created on the Heap
#define CC_IN_BUFFER_SIZE	4096
//Created on the stack
#define CC_MAX_RX_CHUNK		512

class TCC_Channel : public TThread
{
private:
	HANDLE hSerial;
	HANDLE hReadEvent;


	unsigned int FBaud;
	unsigned char FComNumber;
	unsigned int FRXChunkSize;
	int FLastError;
	bool FPortState;

	CRITICAL_SECTION FRXBufferLock;

	unsigned char *FInBuffer;
	unsigned int FInBuffer_Size;
	unsigned int FInBuffer_In_Pos;
	unsigned int FInBuffer_Out_Pos;

	bool __fastcall SetCommDefaults(HANDLE hSerial);
	void __fastcall ShutdownCom(void);
	void __fastcall CopyBlockToInBuffer(void* Data, unsigned int Length);
	bool __fastcall SetupAndOpenCom(void);
	void __fastcall SetRXChunkSize(unsigned int Value) { FRXChunkSize = (Value > CC_MAX_RX_CHUNK) ? CC_MAX_RX_CHUNK : Value; };

protected:
	void __fastcall Execute();
public:
	HANDLE hMutex;
	__fastcall TCC_Channel(bool CreateSuspended);
	__fastcall ~TCC_Channel();
	unsigned int __fastcall DataAvail(void);
	unsigned int __fastcall GetData(void* Data, unsigned int Length);
	void __fastcall ClearRecieveBuffer(void);

	__property unsigned int Baud = { read = FBaud, write = FBaud };
	__property unsigned char ComNumber = { read = FComNumber, write = FComNumber };
	__property unsigned int RXChunkSize = { read = FRXChunkSize, write = SetRXChunkSize };
	__property int LastError = { read = FLastError };
	__property bool PortOpen = { read = FPortState };
};
#endif

#include <vcl.h>
#pragma hdrstop
#include "thdCC_Channel.h"
#pragma package(smart_init)

__fastcall TCC_Channel::TCC_Channel(bool CreateSuspended)
	: TThread(CreateSuspended)
{
	FLastError = 0;
	FBaud = 115200;
	FComNumber = 1;
	FInBuffer = new unsigned char [CC_IN_BUFFER_SIZE];
	FInBuffer_Size = 0;
	FInBuffer_In_Pos = 0;
	FInBuffer_Out_Pos = 0;
	FRXChunkSize = 1;
	FPortState = false;

	InitializeCriticalSectionAndSpinCount(&FRXBufferLock, 0x80000400))
	
	hMutex = CreateMutex(NULL, false, "COM3RXLOCK");
}

__fastcall TCC_Channel::~TCC_Channel()
{
	ShutdownCom();

	DeleteCriticalSection(&FRXBufferLock);

	delete [] FInBuffer;
}

bool __fastcall TCC_Channel::SetupAndOpenCom(void)
{
	AnsiString PipeName = "COM" + IntToStr(FComNumber);
	hSerial = CreateFile(PipeName.c_str(), GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING,
			FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING, NULL);

	if (hSerial == INVALID_HANDLE_VALUE){
		FLastError = GetLastError();
		return false;
		}
	//COMMPROP Props;
	//GetCommProperties(hSerial, &Props);

	SetCommDefaults(hSerial);

	PipeName = "RxEvent";// + IntToStr(FComNumber);
	hReadEvent = CreateEvent(NULL, true, false, PipeName.c_str());
	if (!hReadEvent){
		FLastError = GetLastError();
		return false;
		}

	//This is for debug on the buffer
	memset(FInBuffer, 0x00, CC_IN_BUFFER_SIZE);

	FPortState = true;
	return true;
}

void __fastcall TCC_Channel::Execute()
{
	OVERLAPPED ovRead;
	OVERLAPPED ovWrite;
    DWORD dwBytesRead = 0;
	DWORD dwBytesWritten = 0;
	unsigned char RecTemp[CC_MAX_RX_CHUNK];

	memset(&ovRead, 0, sizeof(ovRead));

	if (SetupAndOpenCom())
		ovRead.hEvent = hReadEvent;
	else {
		MessageDlg("Error creating port: " + IntToStr(LastError), mtError, TMsgDlgButtons() << mbOK, 0);
		return;
		}

	while(1){

		if (!FPortState){
			SleepEx(250, false);
			continue;
			}

		WaitForSingleObject(hMutex, INFINITE);


		// Check if a read is outstanding
		if (HasOverlappedIoCompleted(&ovRead)){

			//Copy the last read contents
			GetOverlappedResult(hSerial, &ovRead, &dwBytesRead, false);
			if (dwBytesRead > 0)
				CopyBlockToInBuffer(RecTemp, dwBytesRead);
			// Issue a serial port read
			if (!ReadFile(hSerial, RecTemp, FRXChunkSize, &dwBytesRead, &ovRead)){

				FLastError = GetLastError();
				if (FLastError != ERROR_IO_PENDING){
					ShutdownCom();
					MessageDlg("RX Error, comm shutting down", mtError, TMsgDlgButtons() << mbOK, 0);
					return;
					}
				}
			}


		ReleaseMutex(hMutex);
		}
}

void __fastcall TCC_Channel::ShutdownCom(void)
{
    	ClearRecieveBuffer();
	CloseHandle(hSerial);
	CloseHandle(hReadEvent);
	FPortState = false;
}

bool __fastcall TCC_Channel::SetCommDefaults(HANDLE hSerial)
{
	DCB dcb;
	COMMTIMEOUTS commtimeouts;

	memset(&dcb,0,sizeof(dcb));
	dcb.DCBlength = sizeof(dcb);
	if (!GetCommState(hSerial,&dcb))
		return FALSE;

	dcb.BaudRate = FBaud;
	dcb.ByteSize = 8;
	dcb.Parity = 0;
	dcb.StopBits = ONESTOPBIT;

	if (!SetCommState(hSerial, &dcb))
		return FALSE;

	commtimeouts.ReadIntervalTimeout = 3000;
	if (!SetCommTimeouts(hSerial, &commtimeouts))
		return FALSE;

	return TRUE;   
}

void __fastcall TCC_Channel::ClearRecieveBuffer(void)
{
	PurgeComm(hSerial, PURGE_RXABORT | PURGE_RXCLEAR);
	EnterCriticalSection(&FRXBufferLock);
	try {
		FInBuffer_Size = FInBuffer_In_Pos = FInBuffer_Out_Pos = 0;
		}
	__finally {
		LeaveCriticalSection(&FRXBufferLock);
		}
};

unsigned int __fastcall TCC_Channel::GetData(void* Data, unsigned int Length)
{
	//EnterCriticalSection(&FRXBufferLock);
	//WaitForSingleObject(hMutex, INFINITE);

	unsigned int SizeModifier = FInBuffer_Size;

	try {

		if (Length > SizeModifier)
			Length = SizeModifier;

		if ((FInBuffer_Out_Pos + Length) < CC_IN_BUFFER_SIZE){
			memcpy(Data, &FInBuffer[FInBuffer_Out_Pos], Length);
			FInBuffer_Out_Pos += Length;
			SizeModifier = Length;
			}
		else {
			int Remain = (CC_IN_BUFFER_SIZE - FInBuffer_Out_Pos);
			memcpy(Data, &FInBuffer[FInBuffer_Out_Pos], Remain);
			FInBuffer_Out_Pos = 0;
			SizeModifier = Remain;
			int Remain2 = (Length - Remain);

			if (Remain2 > 0){
				memcpy((unsigned char*)Data + Remain, &FInBuffer[FInBuffer_Out_Pos], Remain2);
				FInBuffer_Out_Pos = Remain2;
				SizeModifier += Remain2;
				}
			}

		FInBuffer_Size -= SizeModifier;
		}
	__finally {
		//LeaveCriticalSection(&FRXBufferLock);
		//ReleaseMutex(hMutex);
		}

	return Length;
}

unsigned int __fastcall TCC_Channel::DataAvail(void)
{
	WaitForSingleObject(hMutex, INFINITE);

	unsigned int val;
	//EnterCriticalSection(&FRXBufferLock);
	try {
		val = FInBuffer_Size;
		}
	__finally {
		//LeaveCriticalSection(&FRXBufferLock);
		}
	ReleaseMutex(hMutex);
	return val;
}

void __fastcall TCC_Channel::CopyBlockToInBuffer(void* Data, unsigned int Length)
{
	//EnterCriticalSection(&FRXBufferLock);
	//WaitForSingleObject(hMutex, INFINITE);

	//try {
		unsigned int SpaceAvailable = CC_IN_BUFFER_SIZE - FInBuffer_Size;
		unsigned int SizeModifier;

		if ((FInBuffer_In_Pos + Length) < CC_IN_BUFFER_SIZE){
			memcpy(&FInBuffer[FInBuffer_In_Pos], Data, Length);
			FInBuffer_In_Pos += Length;
			SizeModifier = Length;
			}
		else {
			memcpy(&FInBuffer[FInBuffer_In_Pos], Data, SpaceAvailable);
			FInBuffer_In_Pos = 0;
			SizeModifier = SpaceAvailable;
			int Remain = (Length - SpaceAvailable);

			if (Remain > 0){
				memcpy(&FInBuffer[FInBuffer_In_Pos], (unsigned char*)Data + SpaceAvailable, Remain);
				FInBuffer_In_Pos = Remain;
				SizeModifier += Length;
				}
			}

		if (SizeModifier > CC_IN_BUFFER_SIZE){
			FInBuffer_Size = CC_IN_BUFFER_SIZE;
			FInBuffer_Out_Pos = FInBuffer_In_Pos + 1;
			}
		else
			FInBuffer_Size += SizeModifier;
		//}
	//__finally {
	   //	ReleaseMutex(hMutex);
		//LeaveCriticalSection(&FRXBufferLock);
	   //	}
}

Billy are you there?
Billy i have been working on a similar program but mine is a windows bank database program but its not working maybe we can work to gether on your program. if yes i will be waitin for your reply.
P.s
Smokie.
Hi Smokie,

Yes i would be very interested in sharing, i have really been down the long path with getting the best performance out of this serial port, now i have a working solution that reads the data without loss fast enough i get stumped with the thread syncs!

feel free to email me at mwdev [at] billysdomain [dot] net

billy
Hi billy,

Regardless of if you are suffering problems or not, you will still need to ensure thread safety when accessing your buffer (either for read of write).

Any objects that can be accessed concurrently must have a locking mechanism around them. This is not an optional extra, and failing to do so correctly will likely end up in unexpected behavior (not crashes usually). This includes ALL code accessing that object. Ideally you'd have an individual lock per object (e.g a buffer lock for just the buffer). How you implement it is up-to you (mutex, semaphore etc). But make sure you pick 1 and stick to it.

e.g ClearReceivedBuffer() still has critical section while execute has a mutex enabled.


One word of caution, expanding on the point Zaita made about having one lock per object. If the objects are related and threads need to swap and change locks to do their job, fewer locks is better. The more inter-related locks you have, the higher the risk of getting a deadlock condition. There is a fine balance between too many locks (getting deadlocks) and not enough locks (threads having to wait unnecessarily). Do your analysis on which thread needs access to what and when, and then plan you locks carefully, it will pay dividends.

One last point, when shifting data needs to be done quickly, the pipeline thread model is usually the best one to use rather than having a static circular buffer that needs locks.
Topic archived. No new replies allowed.