Worker Threads: What?

Basically I was too lazy to include the <chrono> header, but I have to wait for some amount of time. The thing is, I don’t know how long to wait, so I just made a “busy loop” (I think that’s what it’s called?) instead, like so:

 
for(int i = 0;i < 100000000;i++);


Now the thing that I made was this: I made a server that has a couple things going on at once.
There’s a thread pool to handle connections,
a thread to accept connections,
and the main thread of execution for whatever (idk what that’s gonna do).

The thread pool just tries to grab a file descriptor from a queue, which is added to by the thread that accepts connections. To make absolutely sure the thread pool actually has a chance to grab a file descriptor, I made the thread that adds to the queue wait a little every 20 times it pushes a connection, so that the worker threads get a chance to pull some of the file descriptors off the queue. I was wondering whether someone thought this was a weird thing to do, and if there is a better way of doing all this, because what I’m doing right now seems kinda weird, but that might just be me...?
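Roughly, the producer side looks something like this right now (just a sketch; accept_connection() here is a stand-in for the real accept() call):

#include <mutex>
#include <queue>

std::queue<int> conn_queue;    // file descriptors waiting to be handled
std::mutex      queue_mutex;   // guards conn_queue

int accept_connection() { return 3; }   // pretend this is accept(listen_fd, ...)

void producer()
{
    int count = 0;
    while (true)
    {
        if (++count == 20)
        {
            for (int i = 0; i < 100000000; i++);   // "wait" a bunch of cycles
            count = 0;
            continue;
        }
        int fd = accept_connection();
        queue_mutex.lock();
        conn_queue.push(fd);
        queue_mutex.unlock();
    }
}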

Thank you for taking the time to read through that mess :)
"Too lazy to include <chrono>" makes no sense! And a busy-wait doesn't give up the CPU, so it's not really what you want here, since you definitely want to relinquish the CPU so the other thread can use it.

You should just sleep the thread for a definite time.

// Sleep for 1 millisecond:
std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );

Since I don't understand your description of why you are doing this, I can't say whether it is a good idea or not.
Oh, sorry. Uh.....

I have 5 worker threads, all of which are just sitting there waiting to lock onto a queue of file descriptors. Each file descriptor is a socket (connection) to a client.

Then there is a 6th thread that simply accepts clients over and over and adds them to the queue.

Then there is the main thread of execution, which is irrelevant.

To handle data races between the 5 worker threads and the 6th thread that’s just accepting connections, I use a mutex. Now, since all the 6th thread is doing is continually locking the mutex, adding a connection, and unlocking, only to lock again, I’m afraid there might be times when the 5 worker threads don’t even get a chance to lock the mutex and extract a connection from the queue. So I made it so the 6th thread occasionally waits a bit instead of locking the queue, to give the worker threads a chance to get some of the connections.
Okay, I see what your concern is.

Your "producer" thread is constantly locking the queue to add connections? But surely there will be a lull in connections.

Producer:
    loop:

        // this thread will spend most of its time here
        wait for incoming connection

        lock queue
        use queue
        unlock queue

Can you show some pseudocode for your producer and consumer?
That’s true....

Here’s the pseudo

producer

while true:
  if has been 20 iterations:
    wait
    reset iterations
    continue
  else:
    accept connection
    lock mutex
    push connection
    unlock mutex


consumer

while true:
  lock mutex
  pop connection
  unlock mutex
  handle connection (it’s a function pointer I run)


Blech. I’m bad at pseudo code. :(
So the "20 iterations" and "wait" part is all about the delay you're trying to put in? Taking that out, I don't see what the problem is. Surely the producer is stuck in the "accept connection" state most of the time. I don't see how it will block out the consumers. The only problem I see is that the consumer will busy-wait on an empty queue. You should probably use a condition variable to signal when the queue has data.

Producer:
  while true:

    accept connection

    lock mutex
    push connection
    unlock mutex

    notify waiting threads that queue has data

Consumer:
  while true:

    lock mutex
    pop connection
    unlock mutex

    if there was a connection
      handle connection
    else
      wait until notified that queue is not empty

Oh yeah, I kinda have something like that: I just return -1 (not a valid socket descriptor) if the queue is empty. It seems kinda strange to add a thing that waits for a flag that then allows you to check for a connection, though. Can I send you my code? It’s Unix sockets, and I’ll heavily comment it if needed.

Edit: well not the entire code, just the actual worker thread function and the producer function.
I don't think just returning -1 if the queue is empty is the same thing at all.
The point here is to use a condition variable to notify the consumer when the queue has data, that way the consumer doesn't need to busy-wait, checking the queue over and over.
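Here's a minimal, self-contained sketch of that pattern with a single mutex and a predicate passed to wait(), so the consumer sleeps until the queue actually has something (handle() is just a stand-in for your connection handler):

#include <condition_variable>
#include <mutex>
#include <queue>

std::queue<int> q;
std::mutex mtx;
std::condition_variable queue_ready;

void producer_push(int fd)
{
    {
        std::lock_guard<std::mutex> lk(mtx);
        q.push(fd);
    }
    queue_ready.notify_one();   // wake one waiting consumer
}

void handle(int) { /* stand-in for the real connection handler */ }

void consumer()
{
    while (true)
    {
        int fd;
        {
            std::unique_lock<std::mutex> lk(mtx);
            // Blocks until notified and the queue is non-empty,
            // so there's no busy-waiting and no -1 sentinel needed.
            queue_ready.wait(lk, [] { return !q.empty(); });
            fd = q.front();
            q.pop();
        }
        handle(fd);
    }
}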

You can post your code somewhere and put a link here.
Just post the code here. Berkeley-style sockets are familiar to many (most?) of us.
Bear with me here, it’s unfinished. (Nothing big though. Just need to pass some args for accept()..)
#ifndef P_SERVER_H
#define P_SERVER_H
#include <thread>
#include <sys/socket.h>
#include <vector>
#include <mutex> 

class CONN_QUEUE {
  int * fds;
  int size; 
  public:
  std::mutex lk;
  CONN_QUEUE() {
    size = 0;
    fds = nullptr;
  }
  void push(int fd) {
    if(size != 0) { 
      int * tmp = fds;
      fds = new int[++size];
      fds[0] = fd;
      for(int i = 1;i < size;i++){
        fds[i] = tmp[i-1];
      }
      delete [] tmp;
    } 
    else {
      fds = new int[++size];
    }
  }
  
  int pop() {
    if(size != 0) {
      int * tmp = fds;
      fds = new int[--size];
      int ret = tmp[size + 1]; // edit: oops. That should be size not size + 1 :/
      for(int i = 0;i < size;i++){
        fds[i] = tmp[i];
      }
      delete [] tmp;
      return ret;
    }
    else {
      return -1;
    }
  }

  ~CONN_QUEUE() {
    if(size != 0) delete [] fds;
  }
};

class Server {
  int m_fd;
  // struct { std::mutex lk; queue q; }
  void (*hndlr) (int);
  CONN_QUEUE cq;

  void fill_queue(CONN_QUEUE* q){
    for(int i = 1;true; i++) {
      /* no longer needed
      if(i % 20 == 0){
        for(int i = 0;i < 100000000;i++); // wait for a bunch of cycles
        i = 1;
        continue;
      }
      */
      int fd = accept(/* m_fd, (*) ,  */);
      if(fd != -1) {
        q->lk.lock();
        q->push(fd);
        q->lk.unlock();
      }
    }
  }

  void worker_thread(CONN_QUEUE* q) {
    while(true) {
      q->lk.lock();
      int fd = q->pop();
      q->lk.unlock();
      hndlr(fd);
    }
  }
  public:
  Server(void (*h)(int), const char* port);

  ~Server();

  std::vector<std::thread> start(int pool_size);
};

#endif 


Edit: ok, well maybe a little more than just that, but they all have known, fast solutions.
Your queue is pretty wacky. I suggest you use std::queue if possible. It looks like your queue may have an actual error since it doesn't store the first fd passed to push.

As for the thread coordination, I'm no expert but maybe something like this:

#ifndef P_SERVER_H
#define P_SERVER_H

#include <thread>
#include <sys/socket.h>
#include <vector>
#include <mutex>
#include <queue>
#include <condition_variable>

class Server
{
    void (*hndlr) (int);

    std::queue<int> cq;
    std::mutex mtx;
    std::mutex mtx_queue;
    std::condition_variable queue_ready;

    void fill_queue()
    {
        while (true)
        {
            int fd = accept(/* m_fd, (*) ,  */);

            if (fd != -1)
            {
                {
                    std::lock_guard<std::mutex> lk(mtx);
                    cq.push(fd);
                }
                queue_ready.notify_one();
            }
        }
    }

    void worker_thread() {
        while (true) {

            int fd = -1;
            {
                std::lock_guard<std::mutex> lk(mtx);
                if (!cq.empty())
                {
                    fd = cq.front();
                    cq.pop();
                }
            }

            if (fd != -1)
                hndlr(fd);
            else
            {
                std::unique_lock<std::mutex> lk(mtx_queue);
                queue_ready.wait(lk);
            }
        }
    }

public:
    Server(void (*h)(int), const char* port);
    ~Server();
    std::vector<std::thread> start(int pool_size);
};

#endif  


It looks like your queue may have an actual error ...

Oh yeah, sorry ’bout that, it’s still in the works right now. Thanks for catching that! :)

[code ... code]

O.o fancy! Yeah, I think I’ll definitely use that, integrating it now lol. I see what you mean now, I think.


Edit: oh also here’s a link to where my actual code is if ya’ll wanna see it: https://repl.it/@Highwayman/We-are-going-to-try-this-one-more-time
I think this program demonstrates your thread situation.

#include <iostream>
#include <iomanip>
#include <vector>
#include <queue>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;

mutex mtx, mtx_queue;
condition_variable queue_ready;
queue< int > q;
bool loop = true;

void func( int n )
{
    while ( loop )
    {
        int seconds = -1;

        {
            lock_guard< mutex > lk( mtx );
            if ( !q.empty() )
            {
                seconds = q.front();
                q.pop();
            }
        }

        if ( seconds >= 0 )
        {
            {
                lock_guard< mutex > lk( mtx );
                cout << setw(n * 4) << "W" << seconds << '\n';
            }

            this_thread::sleep_for( chrono::seconds( seconds ) );

            {
                lock_guard< mutex > lk( mtx );
                cout << setw(n * 4) << "F" << seconds << '\n';
            }
        }
        else
        {
            unique_lock< mutex > lk( mtx_queue );
            queue_ready.wait( lk );
        }
    }
}

int main(int argc, char** argv)
{
    int NumThreads = 3;
    if (argc == 2) NumThreads = stoi( argv[1] );

    vector<thread> threads( NumThreads );
    for ( int i = 0; i < NumThreads; ++i )
        threads[i] = thread( func, i + 1 );

    for ( int n; cin >> n; )
    {
        q.push( n );
        queue_ready.notify_one();
    }

    loop = false;
    queue_ready.notify_all();
    for ( int i = 0; i < NumThreads; ++i )
        threads[i].join();
}

In the main thread you enter non-negative integers that represent the number of seconds for a thread to wait. The ints are pushed to the queue and the threads pop them off and wait for that amount of time (that's their "work"). They print Wn and Fn to say they are Waiting or have Finished waiting n seconds. Each thread's output appears in its own column (i.e., the output is indented proportionally to the thread number).
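On Linux it needs to be linked against pthreads, e.g. something like g++ -std=c++11 demo.cpp -pthread -o demo (assuming the file is saved as demo.cpp), then ./demo 6 to run it with 6 worker threads.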

Example output with 3 "worker" threads (input values are not indented):

3
   W3
2
       W2
3
           W3
4
3
2
3
       F2
       W4
1
2
   F3
   W3
           F3
           W2
           F2
           W3
   F3
   W1
       F4
       W2
   F1
       F2
           F3

Example with 6 threads, putting all input values on the first line.

4 2 0 3 1 4 2 3 2 3 1 3 2
   W4
       W2
           W0
           F0
           W3
                   W1
               W4
                       W2
                   F1
                   W3
       F2
       W2
                       F2
                       W3
           F3
           W1
   F4
   W3
               F4
               W2
           F1
                   F3
       F2
                       F3
               F2
   F3

I’m really confused now I’m sorry, what?
Forget it. :(
:( ok... :/ ok.
Ok, this is the finished product, what do you think?
#include <mutex>
#include <condition_variable>
#include <list>
#include <queue>
#include <sys/socket.h>

class CONN_QUEUE {
  public:  // uh... ya know just roll with it.
  struct CONN;
  private:
  std::queue<CONN,std::list<CONN>> connections;
  std::condition_variable queue_ready;
  std::unique_lock<std::mutex> q_mtx;
  public:
  struct CONN {
    sockaddr_storage *sockaddress;
    int fd;
  };

  CONN_QUEUE() = default;

  void wait()   {  queue_ready.wait(q_mtx);  }

  void push(CONN connection) {
    q_mtx.lock();
    connections.push(connection);
    q_mtx.unlock();
    queue_ready.notify_one();
  }

  CONN pop() {
    q_mtx.lock();
    CONN ret = connections.back();
    connections.pop();
    q_mtx.unlock();
    return ret;
  }
};

namespace CONN_JOBS {
  void fill_queue(CONN_QUEUE* cq,int m_fd) {
    CONN_QUEUE::CONN connection;
    socklen_t addrlen = sizeof( *(connection.sockaddress) );
    int new_fd;
    while(true) {
      new_fd = accept(m_fd, (sockaddr*)&connection.sockaddress, &addrlen);
      cq->push(connection);
    }
  }
  void worker_thread(void( *operation )(int,sockaddr_storage*),CONN_QUEUE *cq) {
    CONN_QUEUE::CONN connection;
    while(true) {
      cq->wait();
      connection = cq->pop();
      operation( connection.fd , connection.sockaddress );
    }
  }
};
Actually this is the finished product. Lol.
#include <mutex>
#include <condition_variable>
#include <list>
#include <queue>
#include <sys/socket.h>

class CONN_QUEUE {
  public:  // uh... ya know just roll with it.
  struct CONN;
  private:
  std::queue<CONN,std::list<CONN>> connections;
  std::condition_variable queue_ready;
  std::unique_lock<std::mutex> q_mtx;
  public:
  struct CONN {
    sockaddr_storage *sockaddress;
    int fd;
  };

  CONN_QUEUE() = default;

  void wait()   {  queue_ready.wait(q_mtx);  }

  void push(CONN connection) {
    q_mtx.lock();
    connections.push(connection);
    q_mtx.unlock();
    queue_ready.notify_one();
  }

  CONN pop() {
    q_mtx.lock();
    CONN ret; 
    if(!connections.empty()) { 
      ret = connections.back();
      connections.pop();
    } 
    else {
      ret.fd = -1;
    }
    q_mtx.unlock();
    return ret;
  }
};

namespace CONN_JOBS {
  void fill_queue(CONN_QUEUE* cq,int m_fd) {
    CONN_QUEUE::CONN connection;
    socklen_t addrlen = sizeof( *(connection.sockaddress) );
    while(true) {
      connection.fd = accept(m_fd, (sockaddr*)&connection.sockaddress, &addrlen);
      cq->push(connection);
    }
  }
  void worker_thread(void( *operation )(int,sockaddr_storage*),CONN_QUEUE *cq) {
    CONN_QUEUE::CONN connection;
    while(true) {
      cq->wait();
      connection = cq->pop();
      if(connection.fd >= 0)
        operation( connection.fd , connection.sockaddress );
    }
  }
};
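One thing worth double-checking in that version: a std::unique_lock constructed without a mutex doesn't own anything to lock, condition_variable::wait() expects the calling thread to already hold the lock (and to re-check the queue when it wakes), and back() combined with pop() hands out the newest connection while discarding the oldest. A rough sketch of the same class with an explicit std::mutex member and the wait folded into pop() might look like this (names kept from above; the sockaddr_storage is stored by value so it stays valid after accept() returns):

#include <condition_variable>
#include <list>
#include <mutex>
#include <queue>
#include <sys/socket.h>

class CONN_QUEUE {
  public:
  struct CONN {
    sockaddr_storage sockaddress;   // held by value, not by pointer
    int fd;
  };
  private:
  std::queue<CONN, std::list<CONN>> connections;
  std::mutex q_mtx;                 // the mutex itself, not a unique_lock
  std::condition_variable queue_ready;
  public:
  void push(CONN connection) {
    {
      std::lock_guard<std::mutex> lk(q_mtx);
      connections.push(connection);
    }
    queue_ready.notify_one();
  }

  // Blocks until a connection is available, then removes and returns the
  // oldest one, so the worker never sees an "empty" fd of -1.
  CONN pop() {
    std::unique_lock<std::mutex> lk(q_mtx);
    queue_ready.wait(lk, [this]{ return !connections.empty(); });
    CONN ret = connections.front();
    connections.pop();
    return ret;
  }
};

With pop() blocking like that, the worker loop would just call pop() and run the handler on the result, with no separate wait() call and no -1 check.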