Reading an unknown number of bytes from a non-blocking socket

Hi,

I am trying to learn C++, and my first project is to convert a Basex client I wrote in R into a C++ client.
From earlier experience, I know that I have to use a non-blocking socket.
In C++ I use this code to create such a socket:

struct addrinfo hints;
struct addrinfo *result = NULL, *rp;
int sfd, bytes_read;

memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family   = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags    = AI_NUMERICSERV;

Master_sfd = getaddrinfo(host.c_str(), port.c_str(), &hints,&result);
if (Master_sfd != 0) {
  Socket_ID = -1;
  return *this;
}
for (rp = result; rp != NULL; rp = rp->ai_next) {
  sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
  if (sfd == -1) continue;
  if (connect(sfd, rp->ai_addr, rp->ai_addrlen) != -1) break;
  close(sfd);
}

int flags = fcntl(sfd, F_GETFL);
int resultFlag = fcntl(sfd, F_SETFL, flags & ~O_NONBLOCK);

if (rp == NULL) {
  warnx("Can't make a connection");
  Master_sfd = -1;
  return *this;
}
freeaddrinfo(result);
Master_sfd = sfd;

return *this;


To my knowledge, lines 23 and 24 (the two fcntl calls) should set the socket to non-blocking.
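
A note on the flag arithmetic in those two fcntl calls: `flags & ~O_NONBLOCK` clears the O_NONBLOCK bit, which leaves the socket in blocking mode, whereas `flags | O_NONBLOCK` sets it. The sketch below checks this on a local socket pair rather than on the real connection; `set_nonblocking`, `is_nonblocking` and `demo_flag_toggle` are names made up for this illustration and are not part of the posted client.

```cpp
#include <cassert>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper: switch O_NONBLOCK on or off, keeping the other flags.
// Returns 0 on success, -1 on error (errno is set by fcntl).
int set_nonblocking(int fd, bool on) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) return -1;
    flags = on ? (flags | O_NONBLOCK)    // set the bit
               : (flags & ~O_NONBLOCK);  // clear the bit
    return fcntl(fd, F_SETFL, flags);
}

// Hypothetical helper: true if O_NONBLOCK is currently set on fd.
bool is_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    return flags != -1 && (flags & O_NONBLOCK) != 0;
}

// Demo on a local socket pair: true if the flag toggles as expected.
bool demo_flag_toggle() {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) return false;
    bool ok = !is_nonblocking(sv[0]);  // a new socket starts out blocking
    ok = ok && set_nonblocking(sv[0], true) == 0 && is_nonblocking(sv[0]);
    ok = ok && set_nonblocking(sv[0], false) == 0 && !is_nonblocking(sv[0]);
    close(sv[0]);
    close(sv[1]);
    return ok;
}
```

Checking the flag with F_GETFL right after F_SETFL, as `is_nonblocking` does, is a cheap way to confirm which mode the descriptor is really in.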

I use this function to read from the socket:

int readSocket( vector <char> &sockResponseVector) {
  int ret = 0;
  char buf[BUFSIZ];
  while ((ret = read(Master_sfd, buf, sizeof(buf)-1)) > 0) {
    sockResponseVector.resize(sockResponseVector.size()+ret);
    sockResponseVector.insert(sockResponseVector.end(), buf, buf + ret);
    select(Master_sfd + 1, NULL, NULL, NULL, NULL);
  };
  return sockResponseVector.size();
}


My experience in R was that a non-blocking socket could only be used together with the socketSelect function. My guess is that this also applies to C++.

In the debugger I see that the first time the while loop is entered, 1 byte is read (which is correct). But after the select call, execution freezes.

What am I doing wrong?

Ben
You probably need to call the select before the first read. Also, you need an fd_set for the second parameter and a timeout for the last.
From https://stackoverflow.com/questions/8107271/c-unix-sockets-non-blocking-read

fd_set input;
FD_ZERO(&input);
FD_SET(sd, &input);
struct timeval timeout;
timeout.tv_sec  = sec;
timeout.tv_usec = msec * 1000;
int n = select(sd + 1, &input, NULL, NULL, &timeout);
if (n == -1) {
    //something wrong
} else if (n == 0)
    continue;//timeout
if (!FD_ISSET(sd, &input))
   ;//again something wrong
//here we can call non blockable read 

According to https://www.gnu.org/software/libc/manual/html_node/Getting-File-Status-Flags.html, the following code should set the O_NONBLOCK flag:

  int old_flag, blocked_flag;
  old_flag = fcntl(Master_sfd, F_GETFL, 0);
  if (old_flag == -1) {
    Master_sfd = -1;
    return *this;
  }
  blocked_flag |= O_NONBLOCK;
  fcntl(Master_sfd, F_SETFL, blocked_flag);


Adding
  set_non_blocking = fcntl(Master_sfd, F_GETFL, 0);
  if (set_non_blocking & O_NONBLOCK)
    cout << "Socket in blocking mode: " << endl;
  else
    cout << "Socket not in blocking mode: " << endl;

showed that the socket is NOT in blocking mode.

I adapted my read function to:
    int readSocket( vector <char> &sockResponseVector) {
      fd_set read_Master_set;
      struct timeval timeout;
      int ret = 0, start = 0;
      char buf[BUFSIZ];
      FD_ZERO(&read_Master_set);
      FD_SET(Master_sfd, &read_Master_set);
      memset(&timeout, 0, sizeof(timeout));

      select(Master_sfd + 1, &read_Master_set, NULL, NULL, &timeout);
      while ((ret = read(Master_sfd, buf, sizeof(buf)-1)) > 0) {
        for (int i = 0; i < ret; i++) {
            sockResponseVector.push_back(buf[i]);
        }
        select(Master_sfd + 1, &read_Master_set, NULL, NULL, &timeout);
      };
      return sockResponseVector.size();
    }


When running the code in the debugger there are no problems. But when running it as a normal application, the program freezes.

From my experience with non-blocking sockets in R, I suspect that this is caused by the `select` call not working as intended.

How come there are no problems in the debugger, but the program freezes when it runs as a normal C++ application?
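
One detail that may matter here, offered as a sketch rather than a diagnosis: select() overwrites the fd_set it is given, so a set that is reused across calls without a fresh FD_ZERO/FD_SET can end up empty and hide data that has since arrived. The demo below uses a local socket pair instead of the Basex connection; `poll_with_set` and `demo_select_clears_set` are hypothetical names.

```cpp
#include <cassert>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper: poll fd with a zero timeout, reusing the caller's set.
// The set is deliberately NOT re-initialised here, to show what select() does to it.
int poll_with_set(int fd, fd_set *set) {
    struct timeval timeout = {0, 0};  // zero timeout: return immediately
    return select(fd + 1, set, NULL, NULL, &timeout);
}

// True if select() clears the set on timeout and a re-armed set sees the data.
bool demo_select_clears_set() {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) return false;

    fd_set set;
    FD_ZERO(&set);
    FD_SET(sv[0], &set);

    // Nothing has been written yet: select() reports 0 ready descriptors
    // and removes sv[0] from the set.
    bool ok = poll_with_set(sv[0], &set) == 0 && !FD_ISSET(sv[0], &set);

    // Data is now available, but the stale (empty) set hides it.
    ok = ok && write(sv[1], "x", 1) == 1;
    ok = ok && poll_with_set(sv[0], &set) == 0;

    // Re-arming the set before the call makes the data visible.
    FD_ZERO(&set);
    FD_SET(sv[0], &set);
    ok = ok && poll_with_set(sv[0], &set) == 1 && FD_ISSET(sv[0], &set);

    close(sv[0]);
    close(sv[1]);
    return ok;
}
```

A zero timeout also means select() never waits, so whether data is "there" depends on timing, which could differ between a run with breakpoints and a normal run.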

Ben
Your code for setting the blocking mode is incorrect.
You are using an uninitialized value (blocked_flag) on this line:
 
  blocked_flag |= O_NONBLOCK;  // random value ORed with O_NONBLOCK 

Your compiler should warn you about that. Don't ignore compiler warnings!

The code should be more like:

  int old_flag, blocked_flag;
  old_flag = fcntl(Master_sfd, F_GETFL, 0);
  if (old_flag == -1) {
    Master_sfd = -1;
    return *this;
  }
  blocked_flag = old_flag | O_NONBLOCK;
  fcntl(Master_sfd, F_SETFL, blocked_flag);

I think to do what you want you can just use read.
And instead of a vector of char, you could just use a std::string.

std::string sockResponse;
...
    while (1) {
        char buf[BUFSIZ];
        errno = 0;
        int ret = read(Master_sfd, buf, sizeof(buf));
        if (ret == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // The read would block ... not sure what you want to do here
                continue; // retry; without this, ret == -1 would reach append() below
            }
            else {
                perror("read");
                exit(EXIT_FAILURE);
            }
        }
        if (ret == 0) break; // EOF
        sockResponse.append(buf, ret); // append ret chars to response (can contain '\0' chars)
    }
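
For the "would block" branch, one possible approach (an assumption about what is wanted, not the only option) is to wait with poll() until data arrives or a timeout passes, and only then retry the read. The sketch below does that on a local socket pair; `read_until_idle` and `demo_read_until_idle` are hypothetical names, and the 50 ms timeout is an arbitrary value chosen for the demo.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <poll.h>
#include <string>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper: append whatever arrives on a non-blocking fd to out.
// Each time read() reports EAGAIN it waits up to timeout_ms for more data.
// Returns false on a read/poll error, true on EOF or when the wait times out.
bool read_until_idle(int fd, std::string &out, int timeout_ms) {
    char buf[4096];
    for (;;) {
        ssize_t n = read(fd, buf, sizeof buf);
        if (n > 0) {
            out.append(buf, static_cast<size_t>(n));  // keeps embedded '\0' bytes
            continue;
        }
        if (n == 0) return true;                      // EOF: peer closed
        if (errno == EINTR) continue;                 // interrupted, retry
        if (errno != EAGAIN && errno != EWOULDBLOCK) return false;

        struct pollfd pfd;
        pfd.fd = fd;
        pfd.events = POLLIN;
        pfd.revents = 0;
        int rc = poll(&pfd, 1, timeout_ms);
        if (rc == 0) return true;                     // nothing more within the timeout
        if (rc == -1 && errno != EINTR) return false;
    }
}

// Demo: 5 bytes with an embedded '\0' arrive intact on a non-blocking socket.
bool demo_read_until_idle() {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) return false;
    fcntl(sv[0], F_SETFL, fcntl(sv[0], F_GETFL, 0) | O_NONBLOCK);
    const char msg[] = {'a', 'b', '\0', 'c', 'd'};
    bool ok = write(sv[1], msg, sizeof msg) == static_cast<ssize_t>(sizeof msg);
    std::string out;
    // sv[1] stays open, so the call ends on the poll() timeout, not on EOF.
    ok = ok && read_until_idle(sv[0], out, 50);
    ok = ok && out == std::string(msg, sizeof msg);
    close(sv[0]);
    close(sv[1]);
    return ok;
}
```

The trade-off is that every call pays the timeout once at the end, because the function has no other way to know the sender has finished.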
I know for certain that the stream will contain embedded \0's and other non-printable characters. That's why in the Basex protocol the stream is defined as a raw array of chars and not as a string.
I'll experiment with the blocked_flag and will let you know the result.
I copied this function from https://www.gnu.org/software/libc/manual/html_node/Getting-File-Status-Flags.html :
/* Set the O_NONBLOCK flag of desc if value is nonzero,
   or clear the flag if value is 0.
   Return 0 on success, or -1 on error with errno set. */

int
set_nonblock_flag (int desc, int value)
{
  int oldflags = fcntl (desc, F_GETFL, 0);
  /* If reading the flags failed, return error indication now. */
  if (oldflags == -1)
    return -1;
  /* Set just the flag we want to set. */
  if (value != 0)
    oldflags |= O_NONBLOCK;
  else
    oldflags &= ~O_NONBLOCK;
  /* Store modified flag word in the descriptor. */
  return fcntl (desc, F_SETFL, oldflags);
}


After inserting `std::cout << __func__ << std::endl;` as the first line in all the functions I wrote, I now see that the stream is in non-blocking mode and that the problem is caused by code at an unexpected point.
@DizzyDon's code does all you want to do:
* simplified use of non-blocking sockets
* dealing with blocks of memory, and not null-terminated strings
* correct use of select()

Why are you using non-blocking sockets? It's not clear that you need them.

select() can be used to multiplex a mixture of blocking and non-blocking sockets in a single thread.
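
As a rough illustration of that multiplexing, the sketch below asks select() which of several descriptors have data waiting. It uses local socket pairs as stand-ins for real connections; `readable_fds` and `demo_multiplex` are hypothetical names, and the code assumes every descriptor is below FD_SETSIZE.

```cpp
#include <algorithm>
#include <cassert>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>
#include <vector>

// Hypothetical helper: return the descriptors from fds that become readable
// within timeout_ms. Returns an empty vector on timeout or error.
std::vector<int> readable_fds(const std::vector<int> &fds, int timeout_ms) {
    fd_set set;
    FD_ZERO(&set);
    int maxfd = -1;
    for (int fd : fds) {
        FD_SET(fd, &set);
        maxfd = std::max(maxfd, fd);
    }
    struct timeval tv;
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;

    std::vector<int> ready;
    if (select(maxfd + 1, &set, NULL, NULL, &tv) > 0) {
        for (int fd : fds)
            if (FD_ISSET(fd, &set)) ready.push_back(fd);
    }
    return ready;
}

// Demo: two socket pairs, only the second has pending data.
bool demo_multiplex() {
    int a[2], b[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, a) == -1) return false;
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, b) == -1) return false;
    bool ok = write(b[1], "x", 1) == 1;
    std::vector<int> ready = readable_fds({a[0], b[0]}, 100);
    ok = ok && ready.size() == 1 && ready[0] == b[0];
    close(a[0]); close(a[1]);
    close(b[0]); close(b[1]);
    return ok;
}
```

Whether each descriptor is blocking or non-blocking does not change the select() call itself; it only affects what a later read() does if no data is left.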
Bengbers wrote:
That's why in the Basex protocol the stream is defined as a raw array of chars and not as a string.

std::string can hold '\0' chars just fine. That's why I suggested it. Its append function simplifies things.
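
A small sketch of that point: the (pointer, count) overload of append copies exactly the requested number of bytes, '\0' included, while going through a C string stops at the first '\0'. The names `from_raw` and `demo_embedded_nul` are made up for this example.

```cpp
#include <cassert>
#include <cstring>
#include <string>

// Build a std::string from a raw buffer that may contain '\0' bytes.
std::string from_raw(const char *buf, std::size_t n) {
    std::string s;
    s.append(buf, n);  // copies n bytes, embedded '\0' included
    return s;
}

bool demo_embedded_nul() {
    const char raw[] = {'a', 'b', '\0', 'c', 'd'};
    std::string s = from_raw(raw, sizeof raw);
    bool ok = s.size() == 5 && s[2] == '\0' && s[4] == 'd';

    // Converting via c_str() treats the data as a C string again,
    // so everything after the first '\0' is lost.
    std::string truncated = s.c_str();
    ok = ok && truncated.size() == 2;
    ok = ok && std::strlen(s.c_str()) == 2;
    return ok;
}
```

So the string itself is safe for binary data; the care is needed wherever it is handed to an interface that expects a '\0'-terminated C string.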
@DizzyDon,
Your sockResponse() does exactly what is needed. I have implemented the function and changed vector <char> sockResponseVector to std::string sockResponseString. In the debugger everything is fine, but when running as a local C/C++ application it seems that there are memory problems.

Do I have to allocate and release memory? (One reason why I opted for vector <char> was that to my knowledge when using vectors there is no need for explicit memory management)

@kbw
While developing my RBasex client, I initially opted for a blocking socket. Reading from this socket was very easy. However, the downside was that in R the minimum timeout was 1 second. Each read action therefore took at least 1 second.
No timeout was required when using the non-blocking socket. When I ran the application in the debugger, the performance was optimal. However, when the script was executed normally, the program hung.
It took more than 1 year before I got a tip to use the 'socketSelect' function in R. This resolved all problems.

Based on this experience, I thought I should also use a non-blocking socket and the select function in C++.
@Bengbers, No you do not need to explicitly manage a std::string's memory.
What kind of "memory problems"?
Post your current function.
I am off until Monday and am not able to copy the code.
After a few runs, the debugger opens basic_string.h and freezes. I'll copy the exact output.

In the original code I used some 'string' variables. Your function introduced 'std::string' as type. Is it allowed to mix string and std::string in one application?
Of course it's allowed. They're the same thing.

(Or at least, it's allowed as long as you've used an appropriate using statement, to pull std::string into the global namespace.)
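
A minimal sketch of why the two spellings mix freely, assuming a using-declaration like the one below is in effect; `length_of` and `demo_same_type` are hypothetical names.

```cpp
#include <cassert>
#include <string>
#include <type_traits>

using std::string;  // makes the unqualified name `string` refer to std::string

// Both spellings name the same type, so the compiler sees no difference.
static_assert(std::is_same<string, std::string>::value,
              "string and std::string are the same type");

// Declared with std::string; callers may pass a variable declared as `string`.
std::size_t length_of(const std::string &s) { return s.size(); }

bool demo_same_type() {
    string a = "Basex";
    return length_of(a) == 5;
}
```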
@DizzyDon

In most cases the final 'readSocket' returns the expected result:
	int readSocket( std::string &sockResponseString) {
		struct timeval timeout;
		memset(&timeout, 0, sizeof(timeout));
		// char buf[BUFSIZ];
		char buf[8];  // test if reading small chunks functions
		fd_set read_Master_set;
		FD_ZERO(&read_Master_set);
		FD_SET(Master_sfd, &read_Master_set);
		int ret;

		memset(&buf, 0, sizeof(buf));
		select(Master_sfd + 1, &read_Master_set, NULL, NULL, &timeout);
		while (1) {
			errno = 0;
			ret = read(Master_sfd, buf, sizeof(buf));
			if (ret == -1) {
				if (errno == EAGAIN || errno == EWOULDBLOCK) {
					break;
				} else {
					perror("Read");
					exit(EXIT_FAILURE);
				}
			}
			if (ret == 0) break;
			sockResponseString.append(buf, ret);
			if (ret > 0) cout.write(buf, ret) << endl;  // buf is not '\0'-terminated, so print exactly ret bytes
			// I read somewhere that 'select()' is necessary before each read ?...
			memset(&buf, 0, sizeof(buf));
			select(Master_sfd + 1, &read_Master_set, NULL, NULL, &timeout);
		}

		return sockResponseString.size();
	}


But this works only in the debugger and after setting several breakpoints.
Since the breakpoints in the code for creating the socket and reading the data seem to be particularly relevant, I think that's where the source of the error lies. I suspect that frequent use of the breakpoints prevents timeout problems.
This is how I create the socket:
BasexSocket& BasexSocket::CreateSocket (string host, string port) {
	std::cout << __PRETTY_FUNCTION__ << std::endl;
	if (host.empty() || port.empty()) {
#if DEBUG
		warnx("Missing hostname / port");
#endif
		Master_sfd = -1;
		return *this;
	}
	// Create a socket. Socket() takes as parameter: int domain, int type and int protocol
	struct addrinfo hints;
	struct addrinfo *result = NULL, *rp;
	// Initialize hints
	memset(&hints, 0, sizeof(struct addrinfo));
	hints.ai_family   = AF_UNSPEC;                // Accept both AF_INET and AF_INET6
	hints.ai_socktype = SOCK_STREAM;              // stream
	hints.ai_flags    = AI_NUMERICSERV;           // Port must be specified as number
	int rc;
	rc = getaddrinfo( host.c_str(), port.c_str(), &hints, &result);
	if (rc != 0) perror(gai_strerror(rc));

	// result is a linked list of address structures.
	// Try to connect. Return the first successful connect or abort
	for (rp = result; rp != NULL; rp = rp->ai_next) {
		Master_sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
		if (Master_sfd == -1) continue;
		if (connect(Master_sfd, rp->ai_addr, rp->ai_addrlen) != -1) break;
		close(Master_sfd);
	}
	string blocking = (set_nonblock_flag( Master_sfd, 1) == 0) ? "non-" : "";
	cout << "Socket in " << blocking << "blocking mode" << endl;

	if (rp == NULL) {
		warnx("Can not connect to Basex server");
	}
	freeaddrinfo(result);

	return *this;
}


So far, readSocket is only used in the authentication procedure, in lines 11 and 44.
BasexSocket& BasexSocket::Authenticate (const string user, const string passwd) {
	std::cout << __PRETTY_FUNCTION__ << std::endl;
	int bytes_read;
	vector <string> strings;
	string realm, nonce, code_nonce, code_wd;
	string md5_hash_1, md5_hash_2;

	// 1: Start reading the timestamp
	std::string sock_read_string = {};
	std::string result;
	bytes_read = readSocket( sock_read_string);       // number of read bytes
	cout << "Teststring : " << sock_read_string << endl;
	if (bytes_read == -1) {
		warnx("Reading timestamp failed");
		Master_sfd = -1;
	}

	// 2: Client sends the user name (must be \0-terminated)
	writeSocket(user);

	// 3: Compose the md5_auth hash  // md5_auth code = md5(md5("<user>:BaseX:<password>") + "1234567890123")
	strings = StringSplit( sock_read_string, ':');    // Split string on occurrence of ':'
	for (size_t i = 0; i < strings.size() ; i++) {    // start at 0 so the parts are actually printed
		cout << "readSocket: " << strings[i] << endl;
	}
	if (strings.size() == 1) {                        // CRAM-MD5
		nonce = sock_read_string.c_str();
		code_wd = passwd;
	} else {                                          // Digest
		realm = strings[0];
		nonce = strings[1];
		code_wd = user + ':' + realm + ':' + passwd;    // code_wd, being a 'string' is \0-terminated.
	}
	cout << realm << endl;
	cout << nonce << endl;

	md5_hash_1 = md5_basex(code_wd);                  // First hash
	code_nonce = md5_hash_1 + nonce;                  // Add nonce
	md5_hash_2 = md5_basex(code_nonce);               // Second hash
	writeSocket(md5_hash_2);                          // Send md5_hash_2 to socket

	// 4: Handle server-response
	sock_read_string = {};
	bytes_read = readSocket( sock_read_string);
	char success = sock_read_string.front();
	if (success != '\0') {
		Master_sfd = -1;
		warnx("Authentication failed");
	}
	return *this;
};

What I don't understand is why line 44 sometimes returns '000' (as expected) and sometimes '001' (meaning authentication has failed).

Sometimes, after several test runs, Eclipse opens a tab 'basic_string.h'. At first sight I thought that this was triggered by memory problems, but since I can't reproduce this error at the moment, I can't confirm this.
You mean L45? However you don't test bytes_read on L44 to see if any bytes have actually been read - but assume they have for the test on L45. If read() (L15 readSocket() ) succeeds but reads no data (??) (L24), then the return value will be 0 and obtaining .front() will be invalid.

At this point in the authentication process, only a \0 or a \01 can be expected. But to deal with the situation where nothing is received, I have changed lines 43-49 to:
sock_read_string = {};
bytes_read = readSocket( sock_read_string);
if (bytes_read > 0) {
	char success = sock_read_string.front();
	if (success != '\0') {
		Master_sfd = -1;
		warnx("Authentication failed");
	}
} else {
	Master_sfd = -1;
	warnx("Reading authentication status byte failed");
}
I've written a readSocket() for you that works with blocking/non-blocking sockets.
int readSocket( std::string &sockResponseString) {
	// use lambdas as local functions
	auto can_read = [](int s) -> bool {
		fd_set read_set;
		FD_ZERO(&read_set);
		FD_SET(s, &read_set);
		struct timeval timeout {};
		int rc = select(s + 1, &read_set, NULL, NULL, &timeout);
		return (rc == 1) && FD_ISSET(s, &read_set);
	};
	auto do_read = [&sockResponseString](int s) -> bool {
		// don't need non-blocking checks, code works with both blocking
		// and non-blocking sockets as select() says we're good to read
		// char buf[BUFSIZ];
		char buf[8];  // test if reading small chunks functions
		int nbytes = recv(s, buf, sizeof(buf), 0);
		if (nbytes <= 0)
			return false;
		sockResponseString += std::string(buf, static_cast<size_t>(nbytes));
		return true;
	};

	sockResponseString.clear();
	bool done{};
	while (!done) {
		// keep looping until first read
		if (can_read(Master_sfd) && do_read(Master_sfd)) {
			// then return once all the buffered input is read
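			while (!done) {
				// stop when nothing is pending, or on EOF/error,
				// instead of calling recv() when select() said "not ready"
				if (!can_read(Master_sfd) || !do_read(Master_sfd))
					done = true;
			}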
		}
	}
	return static_cast<int>(sockResponseString.size());
}


As it blocks (irrespective of the socket type), it may need to be tweaked further.

I don't have much context around authentication, but can comment if there's more info.
Also note that L13 bytes_read can never be -1. Do you mean == 0 ?

Also, in readSocket() the return value comes from .size(), which has type size_t (unsigned), not int (signed). Mixing int and size_t will usually give a compiler warning. If the return value always comes from .size(), the return type should be size_t. If the return value could be < 0 (e.g. to indicate an error, although that is not used here), the return type should be ptrdiff_t and the value returned on success should be std::ssize(sockResponseString).
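
To make the signed/unsigned point concrete, here is a small sketch of a signed return convention. `bytes_or_error` and `minus_one_wraps` are hypothetical names, and the cast is used instead of std::ssize so the example does not assume a C++20 compiler.

```cpp
#include <cassert>
#include <cstddef>
#include <limits>
#include <string>

// Hypothetical return convention: -1 signals an error, otherwise the
// number of bytes collected, as a signed value.
// With C++20, the cast could be replaced by std::ssize(data).
std::ptrdiff_t bytes_or_error(const std::string &data, bool failed) {
    if (failed) return -1;
    return static_cast<std::ptrdiff_t>(data.size());
}

// An unsigned type cannot hold -1: the value wraps to the largest size_t,
// so a "negative means error" result cannot be carried in a size_t.
bool minus_one_wraps() {
    std::size_t n = static_cast<std::size_t>(-1);
    return n == std::numeric_limits<std::size_t>::max();
}
```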
Bengbers wrote:
I copied this function from

Unfortunately the HTML/URL processor for the CPlusPlus fora doesn't like extra characters added to URLs, you appended a : to the link.

A common "oops!" issue, even the regulars do it from time to time.

There are ways to force the processor to act less stupid, manually add blank opening and closing format tags at the end of the URL before adding the character(s).

I personally add the teletype format tags using the format toolbox to the right of this edit box, adding format tags to the start and end of a URL so I always get a link that doesn't run home and cry to Mama.
@kbw
With copy-paste I was able to fit the custom readSocket function into my code. However, this function has the same drawback as my original code, which is the timeout. So I will have to see how I can tweak this example to use non-blocking.
(It is a nice example of the use of lambda!)

@seeplus
I wasn't aware of the size_t datatype. I'll see if the compiler actually gives warnings.
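
On the open question of reading a reply of unknown length without depending on a timeout: one common approach is to read until a protocol-level end marker instead of until the socket runs dry. The sketch below assumes, and this is an assumption not confirmed in this thread, that the reply of interest ends with a single '\0' byte; replies that can contain '\0' inside the payload would need the protocol's own framing rules. `read_until` and `demo_read_until` are hypothetical names, and the demo uses a local socket pair instead of a Basex server.

```cpp
#include <cassert>
#include <cerrno>
#include <poll.h>
#include <string>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper: read bytes from fd up to delim. The delimiter is
// consumed but not stored. Works for blocking and non-blocking descriptors:
// on EAGAIN it waits in poll() for up to timeout_ms.
// Returns true if the delimiter was seen, false on EOF, error, or timeout.
// Reads one byte at a time so nothing past the delimiter is consumed.
bool read_until(int fd, char delim, std::string &out, int timeout_ms) {
    for (;;) {
        char c;
        ssize_t n = read(fd, &c, 1);
        if (n == 1) {
            if (c == delim) return true;
            out.push_back(c);
            continue;
        }
        if (n == 0) return false;                     // EOF before the delimiter
        if (errno == EINTR) continue;
        if (errno != EAGAIN && errno != EWOULDBLOCK) return false;

        struct pollfd pfd;
        pfd.fd = fd;
        pfd.events = POLLIN;
        pfd.revents = 0;
        int rc = poll(&pfd, 1, timeout_ms);
        if (rc == 0) return false;                    // timed out
        if (rc == -1 && errno != EINTR) return false;
    }
}

// Demo: two messages back to back; only the first is consumed.
bool demo_read_until() {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) return false;
    const char msg[] = "first\0second";               // 12 payload bytes
    bool ok = write(sv[1], msg, sizeof msg - 1) == 12;
    std::string out;
    ok = ok && read_until(sv[0], '\0', out, 100) && out == "first";
    char rest[16];
    ok = ok && read(sv[0], rest, sizeof rest) == 6;   // "second" is still queued
    close(sv[0]);
    close(sv[1]);
    return ok;
}
```

Reading byte by byte costs one system call per byte; a buffered variant that keeps the leftover bytes for the next call would be faster, at the price of more bookkeeping.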