Introduction to C++ Concurrency and Multithreading

With the rise of multi-core processors, concurrent programming has become essential for maximizing performance in modern applications. C++11 introduced a comprehensive threading library that provides powerful tools for developing concurrent programs. This article explores the fundamentals of concurrency in C++, providing practical examples and best practices for writing efficient, thread-safe code.

Why Write Concurrent Programs?

There are several compelling reasons to adopt concurrent programming techniques:

  • Natural problem decomposition - Some problems are naturally expressed as multiple independently executing components.
  • Performance - Multi-threaded applications can significantly improve performance by utilizing multiple CPU cores.
  • Responsiveness - User interfaces remain responsive while computation-intensive tasks run in background threads.
  • Resource utilization - Concurrent programs can make better use of system resources, especially in I/O-bound applications.

Thread Creation in C++

C++11 introduced the std::thread class, which makes creating and managing threads straightforward. Let's start with a simple example:

#include <iostream>
#include <thread>

void hello() {
    std::cout << "Hello from thread " << std::this_thread::get_id() << std::endl;
}

int main() {
    // Create a new thread
    std::thread t(hello);
    
    // Print from main thread
    std::cout << "Hello from main thread " << std::this_thread::get_id() << std::endl;
    
    // Wait for the thread to finish
    t.join();
    
    return 0;
}

In this example, we create a thread t that executes the hello() function. The main thread continues execution and then waits for thread t to complete using join(). The join() call is crucial: if a std::thread object is destroyed while it is still joinable (neither joined nor detached), its destructor calls std::terminate and the program aborts.

Note:

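If you don't want to wait for a thread to finish, you can detach it using t.detach(). However, be careful: once detached, a thread can no longer be joined or otherwise controlled through its std::thread object. The process also does not wait for detached threads, so when main returns, a detached thread may be cut off mid-task or may touch objects that have already been destroyed.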
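
One way to make sure a thread is always joined is to wrap it in a small RAII helper. The following is a minimal sketch under stated assumptions: JoinGuard and run_guarded_worker are hypothetical names used only for illustration, not standard library facilities (C++20 provides std::jthread, which joins in its destructor).

```cpp
#include <atomic>
#include <thread>
#include <utility>

// Hypothetical helper: joins the wrapped thread when it goes out of scope.
class JoinGuard {
public:
    explicit JoinGuard(std::thread t) : thread_(std::move(t)) {}

    ~JoinGuard() {
        // Only a joinable thread may be joined.
        if (thread_.joinable()) {
            thread_.join();
        }
    }

    // A guard owns exactly one thread, so copying is disabled.
    JoinGuard(const JoinGuard&) = delete;
    JoinGuard& operator=(const JoinGuard&) = delete;

private:
    std::thread thread_;
};

// Runs a worker under a JoinGuard and returns the value the worker stored.
int run_guarded_worker() {
    std::atomic<int> result{0};
    {
        JoinGuard guard(std::thread([&result] { result.store(42); }));
        // Leaving this scope joins the worker, even if an exception is thrown.
    }
    return result.load();
}
```

Because the join happens in the destructor, the worker is guaranteed to have finished by the time run_guarded_worker() reads the result.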

You can also pass arguments to thread functions:

#include <iostream>
#include <thread>
#include <string>

void greet(std::string name, int times) {
    for(int i = 0; i < times; ++i) {
        std::cout << "Hello, " << name << "! Count: " << i << std::endl;
    }
}

int main() {
    // Create thread with arguments
    std::thread t(greet, "Tanaka", 5);
    
    // Wait for thread to finish
    t.join();
    
    return 0;
}
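
Note that std::thread copies its arguments into the new thread. To let the thread function modify a caller's variable, the argument has to be wrapped in std::ref. The sketch below illustrates this; append_suffix and greet_by_reference are hypothetical names chosen for the example.

```cpp
#include <functional>  // std::ref
#include <string>
#include <thread>

// Appends a suffix to the caller's string through a reference parameter.
void append_suffix(std::string& text, const std::string& suffix) {
    text += suffix;
}

// Starts a thread that modifies a local string in place and returns the result.
std::string greet_by_reference() {
    std::string message = "Hello";

    // std::ref(message) passes a reference; the suffix argument is copied.
    std::thread t(append_suffix, std::ref(message), std::string(", Tanaka!"));
    t.join();  // message must outlive the thread, so join before using it

    return message;
}
```

When passing by reference, the referenced object must stay alive until the thread has finished with it, which is why the join comes before the return.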

Understanding Race Conditions

When multiple threads access shared data simultaneously and at least one thread modifies it, we enter dangerous territory known as a "race condition." Let's look at a classic example:

#include <iostream>
#include <thread>
#include <vector>

// Shared counter
int counter = 0;

void increment_counter(int iterations) {
    for (int i = 0; i < iterations; ++i) {
        // This is not atomic!
        counter++;
    }
}

int main() {
    constexpr int num_threads = 10;
    constexpr int iterations_per_thread = 1000000;
    
    std::vector<std::thread> threads;
    
    // Create multiple threads that increment the counter
    for (int i = 0; i < num_threads; ++i) {
        threads.push_back(std::thread(increment_counter, iterations_per_thread));
    }
    
    // Wait for all threads to finish
    for (auto& t : threads) {
        t.join();
    }
    
    // The expected value is num_threads * iterations_per_thread
    std::cout << "Expected: " << num_threads * iterations_per_thread << std::endl;
    std::cout << "Actual: " << counter << std::endl;
    
    return 0;
}

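If you run this program, you'll almost certainly get a result that's smaller than expected. This happens because the increment operation (counter++) is not atomic: it consists of three separate steps (read the current value, add one, and write back the result). When multiple threads execute these steps concurrently, updates can be lost. Formally, unsynchronized concurrent writes to the same non-atomic variable are a data race, which the C++ standard treats as undefined behavior, so no particular outcome is guaranteed.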
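
To see how an update gets lost, it can help to write the three steps out by hand. The following single-threaded sketch simulates one unlucky interleaving of two threads deterministically; simulate_lost_update, reg_a, and reg_b are illustrative names, and real hardware may interleave the steps in many other ways.

```cpp
// Simulates two threads, A and B, each performing counter++ once,
// with their read/add/write steps interleaved in an unlucky order.
int simulate_lost_update() {
    int counter = 0;

    int reg_a = counter;  // A reads 0
    int reg_b = counter;  // B reads 0 (before A has written back)

    reg_a = reg_a + 1;    // A computes 1
    reg_b = reg_b + 1;    // B computes 1

    counter = reg_a;      // A writes 1
    counter = reg_b;      // B writes 1, overwriting A's update

    return counter;       // 1, although two increments were performed
}
```

Both simulated threads read the same starting value, so the second write simply repeats the first and one increment disappears.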

Critical Sections and Mutual Exclusion

To solve race conditions, we need to protect shared data with synchronization mechanisms. A critical section is a segment of code that accesses shared resources and must not be executed concurrently by multiple threads.

Protecting Shared Data with Mutexes

The most common synchronization primitive is the mutex (mutual exclusion):

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>

int counter = 0;
std::mutex counter_mutex;  // Protects the counter

void increment_counter(int iterations) {
    for (int i = 0; i < iterations; ++i) {
        // Lock the mutex before accessing the shared resource
        counter_mutex.lock();
        counter++;
        // Unlock the mutex when done
        counter_mutex.unlock();
    }
}

int main() {
    constexpr int num_threads = 10;
    constexpr int iterations_per_thread = 1000000;
    
    std::vector<std::thread> threads;
    
    for (int i = 0; i < num_threads; ++i) {
        threads.push_back(std::thread(increment_counter, iterations_per_thread));
    }
    
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Expected: " << num_threads * iterations_per_thread << std::endl;
    std::cout << "Actual: " << counter << std::endl;
    
    return 0;
}

Now the result will match our expectations because we've ensured that only one thread can increment the counter at a time. However, manually locking and unlocking mutexes is error-prone and can lead to deadlocks if you forget to unlock.

RAII Lock Guards

C++ provides RAII (Resource Acquisition Is Initialization) wrappers for mutexes that automatically handle locking and unlocking:

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>

int counter = 0;
std::mutex counter_mutex;

void increment_counter(int iterations) {
    for (int i = 0; i < iterations; ++i) {
        // The lock_guard locks the mutex in its constructor and
        // unlocks it in its destructor (when it goes out of scope)
        std::lock_guard<std::mutex> lock(counter_mutex);
        counter++;
        // No need to manually unlock! It happens automatically
    }
}

int main() {
    // ... (same as before)
}

Best Practice:

Prefer RAII wrappers like std::lock_guard, std::unique_lock, or std::scoped_lock (C++17) over calling lock() and unlock() directly. The wrapper releases the mutex on every path out of the scope, including early returns and exceptions, so an unlock can never be forgotten.
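
When a function needs two mutexes at once, the wrappers also help avoid deadlock. The sketch below uses std::unique_lock with std::defer_lock together with std::lock, which works in C++11; Account, transfer, and run_transfers are hypothetical names invented for this illustration.

```cpp
#include <mutex>
#include <thread>

// Hypothetical account type used only for this illustration.
struct Account {
    int balance;
    std::mutex m;
    explicit Account(int initial) : balance(initial) {}
};

// Moves `amount` from one account to another while holding both mutexes.
void transfer(Account& from, Account& to, int amount) {
    // Construct the locks without locking, then lock both together.
    // std::lock uses a deadlock-avoidance algorithm, so two threads
    // transferring in opposite directions cannot block each other forever.
    std::unique_lock<std::mutex> lock_from(from.m, std::defer_lock);
    std::unique_lock<std::mutex> lock_to(to.m, std::defer_lock);
    std::lock(lock_from, lock_to);

    from.balance -= amount;
    to.balance += amount;
    // Both mutexes are released when the unique_lock objects are destroyed.
}

// Runs opposing transfers concurrently and returns the combined balance.
int run_transfers() {
    Account a(1000);
    Account b(1000);

    std::thread t1([&] { for (int i = 0; i < 10000; ++i) transfer(a, b, 1); });
    std::thread t2([&] { for (int i = 0; i < 10000; ++i) transfer(b, a, 1); });
    t1.join();
    t2.join();

    return a.balance + b.balance;
}
```

In C++17, the three locking lines in transfer() can be replaced by a single std::scoped_lock lock(from.m, to.m);, which acquires both mutexes with the same deadlock avoidance.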

Advanced Locking with Mutexes

Recursive Mutexes

Locking a std::mutex that the calling thread already holds is undefined behavior, and in practice it usually deadlocks. Recursive mutexes allow the owning thread to lock the same mutex again:

#include <iostream>
#include <thread>
#include <mutex>

std::recursive_mutex rm;

void recursive_function(int depth) {
    std::lock_guard<std::recursive_mutex> lock(rm);
    std::cout << "Recursion depth: " << depth << std::endl;
    
    if (depth > 0) {
        // This works with recursive_mutex but would deadlock with regular mutex
        recursive_function(depth - 1);
    }
}

int main() {
    std::thread t(recursive_function, 5);
    t.join();
    return 0;
}

Timed Mutexes

Sometimes you want to try to acquire a lock but not wait indefinitely:

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>

std::timed_mutex tm;

void worker(int id) {
    // Try to acquire lock for 200ms
    if (tm.try_lock_for(std::chrono::milliseconds(200))) {
        std::cout << "Thread " << id << " acquired lock" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        tm.unlock();
    } else {
        std::cout << "Thread " << id << " couldn't acquire lock, doing something else" << std::endl;
        // Do alternative work that doesn't require the lock
    }
}

int main() {
    std::thread t1(worker, 1);
    std::thread t2(worker, 2);
    
    t1.join();
    t2.join();
    
    return 0;
}

std::call_once

For one-time initialization in a multithreaded context, use std::call_once:

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

std::once_flag initialization_flag;

void initialize() {
    std::cout << "Initialization performed once" << std::endl;
    // Expensive initialization here
}

void worker() {
    // This ensures initialization happens exactly once, even with multiple threads
    std::call_once(initialization_flag, initialize);
    std::cout << "Worker thread doing regular work" << std::endl;
}

int main() {
    std::vector<std::thread> threads;
    
    for (int i = 0; i < 5; ++i) {
        threads.push_back(std::thread(worker));
    }
    
    for (auto& t : threads) {
        t.join();
    }
    
    return 0;
}

Using Atomic Types

For simple data types, C++ provides atomic operations that don't require mutexes:

#include <iostream>
#include <thread>
#include <vector>
#include <atomic>

// Atomic counter - no mutex needed!
std::atomic<int> counter(0);

void increment_counter(int iterations) {
    for (int i = 0; i < iterations; ++i) {
        // This increment is atomic and thread-safe
        counter++;
    }
}

int main() {
    constexpr int num_threads = 10;
    constexpr int iterations_per_thread = 1000000;
    
    std::vector<std::thread> threads;
    
    for (int i = 0; i < num_threads; ++i) {
        threads.push_back(std::thread(increment_counter, iterations_per_thread));
    }
    
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Expected: " << num_threads * iterations_per_thread << std::endl;
    std::cout << "Actual: " << counter << std::endl;
    
    return 0;
}

Atomic operations are often lock-free and more efficient than mutexes for simple operations on built-in types. You can check if an atomic type is actually lock-free:

std::atomic<int> a;
std::cout << "Is lock free: " << std::boolalpha << a.is_lock_free() << std::endl;

For complex operations, atomic types provide compare-and-swap functionality:

#include <iostream>
#include <atomic>
#include <thread>
#include <vector>

std::atomic<int> value(0);

void increment_if_even() {
    int expected = 0;
    // Keep trying until we successfully update or find an odd value
    while (true) {
        expected = value.load();  // Get current value
        
        // If it's odd, our work is done
        if (expected % 2 != 0) {
            return;
        }
        
        // Try to update (but only if it hasn't changed)
        if (value.compare_exchange_strong(expected, expected + 1)) {
            // Success! We incremented it
            return;
        }
        
        // If we get here, someone else modified the value before we could
        // Loop and try again
    }
}

int main() {
    std::vector<std::thread> threads;
    
    for (int i = 0; i < 10; ++i) {
        threads.push_back(std::thread(increment_if_even));
    }
    
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Final value: " << value << std::endl;
    
    return 0;
}
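
The same retry pattern generalizes to other conditional updates. As a further sketch, the code below keeps an atomic running maximum using compare_exchange_weak; atomic_store_max and find_max_concurrently are hypothetical names, and spawning one thread per value is done purely for illustration.

```cpp
#include <atomic>
#include <functional>  // std::ref
#include <thread>
#include <vector>

// Raises `target` to `candidate` if candidate is larger; otherwise leaves it.
void atomic_store_max(std::atomic<int>& target, int candidate) {
    int current = target.load();
    // On failure, compare_exchange_weak reloads `current` with the latest
    // value, so the loop re-checks the condition against fresh data.
    while (current < candidate &&
           !target.compare_exchange_weak(current, candidate)) {
        // Retry: another thread changed the value, or the weak CAS
        // failed spuriously.
    }
}

// Has several threads publish their values and returns the largest one seen.
int find_max_concurrently(const std::vector<int>& values) {
    std::atomic<int> max_seen{values.empty() ? 0 : values.front()};

    std::vector<std::thread> threads;
    for (int v : values) {
        threads.emplace_back(atomic_store_max, std::ref(max_seen), v);
    }
    for (auto& t : threads) {
        t.join();
    }
    return max_seen.load();
}
```

compare_exchange_weak is allowed to fail spuriously, which is harmless inside a loop like this one; compare_exchange_strong is the usual choice when there is no retry loop.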

Futures and Promises

While threads and mutexes are powerful, sometimes you need higher-level abstractions. C++11 introduced futures and promises for managing asynchronous results:

#include <iostream>
#include <future>
#include <thread>
#include <chrono>

// Function that performs calculation and returns a result
int calculate_answer(int input) {
    std::cout << "Calculating result for " << input << "..." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2));  // Simulate work
    return input * 42;
}

int main() {
    // Launch calculation asynchronously
    std::future<int> result_future = std::async(std::launch::async, calculate_answer, 10);
    
    std::cout << "Doing other work while calculation proceeds..." << std::endl;
    
    // Wait for and get the result when needed
    int answer = result_future.get();  // This blocks until the result is available
    
    std::cout << "The answer is: " << answer << std::endl;
    
    return 0;
}

You can also create promise/future pairs manually:

#include <iostream>
#include <future>
#include <thread>
#include <chrono>

void process_data(std::promise<int> result_promise) {
    std::cout << "Worker thread processing data..." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2));  // Simulate work
    
    // Set the result when done
    result_promise.set_value(42);
}

int main() {
    // Create promise and future
    std::promise<int> answer_promise;
    std::future<int> answer_future = answer_promise.get_future();
    
    // Launch worker thread with the promise
    std::thread worker(process_data, std::move(answer_promise));
    
    std::cout << "Main thread waiting for result..." << std::endl;
    
    // Get the result when it's ready
    int answer = answer_future.get();
    
    std::cout << "Got result: " << answer << std::endl;
    
    worker.join();
    return 0;
}

For multiple consumers of the same result, use std::shared_future:

#include <iostream>
#include <future>
#include <thread>
#include <vector>
#include <chrono>

void consumer(std::shared_future<int> future, int id) {
    std::cout << "Consumer " << id << " waiting for result..." << std::endl;
    int result = future.get();  // Multiple threads can call get()
    std::cout << "Consumer " << id << " got result: " << result << std::endl;
}

int main() {
    // Create promise and convert future to shared_future
    std::promise<int> promise;
    std::shared_future<int> shared_result = promise.get_future().share();
    
    // Create multiple consumers
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i) {
        threads.push_back(std::thread(consumer, shared_result, i));
    }
    
    std::cout << "Setting result..." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    promise.set_value(42);
    
    for (auto& t : threads) {
        t.join();
    }
    
    return 0;
}

Considerations and Trade-offs

Advantages

  • Better performance on multi-core systems
  • Improved responsiveness in interactive applications
  • Better resource utilization
  • Natural modeling of independent tasks
  • Potentially simpler code for inherently parallel problems

Challenges

  • Increased complexity and risk of bugs
  • Difficult to debug race conditions
  • Potential for deadlocks and other concurrency hazards
  • Overhead of synchronization can negate performance benefits
  • Non-deterministic behavior makes testing harder

Pro Tip:

When designing concurrent programs, try to partition data to minimize sharing between threads. The less shared data, the fewer synchronization points needed, resulting in better performance and fewer potential bugs.
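
As a rough illustration of this advice, the sketch below sums a vector without any mutex or atomic: each worker reads its own slice and writes only to its own result slot. parallel_sum is a hypothetical helper, and the chunking scheme is just one reasonable choice.

```cpp
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Sums `data` using `num_threads` workers that share nothing while running.
long long parallel_sum(const std::vector<int>& data, std::size_t num_threads) {
    if (num_threads == 0) num_threads = 1;

    // One result slot per thread: each worker writes only to its own slot.
    std::vector<long long> partial(num_threads, 0);
    std::vector<std::thread> threads;

    const std::size_t chunk = data.size() / num_threads;
    for (std::size_t i = 0; i < num_threads; ++i) {
        const std::size_t begin = i * chunk;
        // The last worker also takes any leftover elements.
        const std::size_t end =
            (i == num_threads - 1) ? data.size() : begin + chunk;

        threads.emplace_back([&data, &partial, i, begin, end] {
            // Accumulate into a local variable, then publish once.
            long long local = std::accumulate(data.begin() + begin,
                                              data.begin() + end, 0LL);
            partial[i] = local;
        });
    }
    for (auto& t : threads) {
        t.join();
    }

    // Combine the partial results on the calling thread after all joins.
    return std::accumulate(partial.begin(), partial.end(), 0LL);
}
```

The only synchronization points are the join() calls; the final combination step runs on a single thread after every worker has finished.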

Practical Example: Parallel Image Processing

Let's look at a more practical example—applying a simple blur filter to an image in parallel:

#include <iostream>
#include <vector>
#include <thread>
#include <chrono>

// Simple struct representing an RGB pixel
struct Pixel {
    unsigned char r, g, b;
};

// Simulate a simple 2D image
class Image {
private:
    int width, height;
    std::vector<Pixel> pixels;

public:
    Image(int w, int h) : width(w), height(h), pixels(w * h, {0, 0, 0}) {
        // Initialize with some pattern
        for (int y = 0; y < height; ++y) {
            for (int x = 0; x < width; ++x) {
                Pixel& p = at(x, y);
                p.r = (x * 255) / width;
                p.g = (y * 255) / height;
                p.b = ((x+y) * 255) / (width + height);
            }
        }
    }

    Pixel& at(int x, int y) {
        return pixels[y * width + x];
    }

    const Pixel& at(int x, int y) const {
        return pixels[y * width + x];
    }

    int getWidth() const { return width; }
    int getHeight() const { return height; }
};

// Apply a simple blur filter to a region of the image
void blur_region(const Image& input, Image& output, int start_y, int end_y) {
    const int width = input.getWidth();
    const int height = input.getHeight();
    
    for (int y = start_y; y < end_y; ++y) {
        for (int x = 0; x < width; ++x) {
            // Simple 3x3 box blur (with boundary checks)
            int count = 0;
            int sum_r = 0, sum_g = 0, sum_b = 0;
            
            for (int dy = -1; dy <= 1; ++dy) {
                for (int dx = -1; dx <= 1; ++dx) {
                    int nx = x + dx;
                    int ny = y + dy;
                    
                    if (nx >= 0 && nx < width && ny >= 0 && ny < height) {
                        const Pixel& p = input.at(nx, ny);
                        sum_r += p.r;
                        sum_g += p.g;
                        sum_b += p.b;
                        count++;
                    }
                }
            }
            
            // Average the values
            Pixel& out = output.at(x, y);
            out.r = sum_r / count;
            out.g = sum_g / count;
            out.b = sum_b / count;
        }
    }
}

int main() {
    const int width = 1000;
    const int height = 1000;
    
    Image input(width, height);
    Image output(width, height);
    
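    // Determine number of threads to use; hardware_concurrency() may
    // return 0 when the value cannot be determined, so fall back to 1
    const unsigned int hw_threads = std::thread::hardware_concurrency();
    const int num_threads = hw_threads > 0 ? static_cast<int>(hw_threads) : 1;
    std::cout << "Using " << num_threads << " threads" << std::endl;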
    
    std::vector<std::thread> threads;
    
    // Divide the image into horizontal strips, one per thread
    const int rows_per_thread = height / num_threads;
    
    auto start_time = std::chrono::high_resolution_clock::now();
    
    // Create and launch threads
    for (int i = 0; i < num_threads; ++i) {
        int start_y = i * rows_per_thread;
        int end_y = (i == num_threads - 1) ? height : (i + 1) * rows_per_thread;
        
        threads.push_back(std::thread(blur_region, 
                                      std::ref(input), 
                                      std::ref(output), 
                                      start_y, 
                                      end_y));
    }
    
    // Wait for all threads to finish
    for (auto& t : threads) {
        t.join();
    }
    
    auto end_time = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
    
    std::cout << "Image blur completed in " << duration.count() << " ms" << std::endl;
    
    // In a real program, you would save the output image here
    
    return 0;
}

This example demonstrates a common pattern in parallel programming—dividing work into non-overlapping chunks that can be processed independently. Because each thread works on a separate portion of the image, no synchronization is needed during the actual processing, which maximizes performance.
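
The strip computation can also be factored into a small helper. The sketch below is a variant rather than the code above: instead of giving all leftover rows to the last thread, it spreads them over the first strips so the workload is more even. RowRange and split_rows are hypothetical names.

```cpp
#include <vector>

// Half-open range of rows [start_y, end_y) assigned to one worker.
struct RowRange {
    int start_y;
    int end_y;
};

// Splits `height` rows into `parts` contiguous, non-overlapping ranges.
std::vector<RowRange> split_rows(int height, int parts) {
    std::vector<RowRange> ranges;
    if (height <= 0 || parts <= 0) return ranges;
    if (parts > height) parts = height;  // never create empty strips

    const int base = height / parts;   // rows every strip gets
    const int extra = height % parts;  // leftover rows to distribute

    int start = 0;
    for (int i = 0; i < parts; ++i) {
        // The first `extra` strips receive one additional row each.
        const int rows = base + (i < extra ? 1 : 0);
        ranges.push_back({start, start + rows});
        start += rows;
    }
    return ranges;
}
```

Each RowRange could then be passed to blur_region as its start_y and end_y arguments; because the ranges never overlap, no two threads write to the same output row.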

Thread Pools

For applications that need to execute many small tasks concurrently, creating and destroying threads for each task can be inefficient. Thread pools solve this problem by maintaining a set of worker threads that process tasks from a queue:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <vector>
#include <chrono>
#include <stdexcept>
#include <utility>

class ThreadPool {
private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;

public:
    ThreadPool(size_t num_threads) : stop(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex);
                        
                        // Wait until there's a task or stop signal
                        condition.wait(lock, [this] { 
                            return stop || !tasks.empty(); 
                        });
                        
                        // Exit if stopping and no tasks
                        if (stop && tasks.empty()) {
                            return;
                        }
                        
                        // Get task from queue
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    
                    // Execute the task
                    task();
                }
            });
        }
    }
    
    template<class F>
    void enqueue(F&& f) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop) {
                throw std::runtime_error("Enqueue on stopped ThreadPool");
            }
            tasks.emplace(std::forward<F>(f));
        }
        condition.notify_one();
    }
    
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        
        condition.notify_all();
        
        for (std::thread &worker: workers) {
            worker.join();
        }
    }
};

int main() {
    // Create a thread pool with 4 worker threads
    ThreadPool pool(4);
    
    // Enqueue some tasks
    for (int i = 0; i < 8; ++i) {
        pool.enqueue([i] {
            std::cout << "Processing task " << i 
                      << " on thread " << std::this_thread::get_id() << std::endl;
            
            // Simulate work
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            
            std::cout << "Completed task " << i << std::endl;
        });
    }
    
    // No sleep is needed here: when pool goes out of scope, its destructor
    // lets the workers drain the remaining tasks and then joins them
    
    return 0;
}

Thread pools provide several advantages:

  • Reduced overhead of thread creation and destruction
  • Better control over system resource usage
  • Simplified task submission model
  • Ability to implement load balancing and priority queues
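
The pool above only accepts tasks that return nothing. One common way to get a result back is to wrap the work in a std::packaged_task and hand the caller a future. The sketch below shows the wrapping step in isolation, under stated assumptions: TaskQueue, submit, and run_all are hypothetical names, the queue is drained on the calling thread, and no locking is shown (inside a real pool the push would happen under the queue mutex).

```cpp
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <utility>

// Stand-in for a pool's task queue of void() callables.
using TaskQueue = std::queue<std::function<void()>>;

// Wraps an int-returning callable so it fits a void() queue, and returns
// a future through which the caller can later read the result.
std::future<int> submit(TaskQueue& queue, std::function<int()> work) {
    // std::packaged_task is move-only, but std::function requires a
    // copyable callable, so the task is held through a shared_ptr.
    auto task = std::make_shared<std::packaged_task<int()>>(std::move(work));
    std::future<int> result = task->get_future();
    queue.push([task] { (*task)(); });
    return result;
}

// Drains the queue on the calling thread, as a pool worker would.
void run_all(TaskQueue& queue) {
    while (!queue.empty()) {
        std::function<void()> task = std::move(queue.front());
        queue.pop();
        task();
    }
}
```

Calling submit() with a lambda such as [] { return 6 * 7; }, running the queue, and then calling get() on the returned future yields the lambda's return value. An exception thrown by the work is stored in the future as well and rethrown by get().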

Note:

Starting with C++17, the standard library provides std::execution policies that allow parallel algorithm execution without explicitly managing threads. In C++20, coroutines provide another approach to concurrent programming, especially for asynchronous tasks.

Conclusion

Concurrent programming in C++ offers powerful tools to take advantage of modern multi-core processors. The C++11 threading library provides a comprehensive set of low-level primitives, while higher-level abstractions like futures, promises, and parallel algorithms make it easier to write correct concurrent code.

When writing concurrent programs, always be mindful of data sharing and synchronization needs. Race conditions, deadlocks, and other concurrency bugs can be notoriously difficult to debug. Using appropriate abstractions and following best practices can help you harness the power of concurrency while minimizing its pitfalls.

Whether you're writing high-performance scientific computing applications, responsive GUIs, or networked services, understanding C++ concurrency is an invaluable skill in today's multi-core computing environment.

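Concurrency and multithreaded programming in C++ play an important role in many application areas. By using the features introduced in C++11, such as threads, mutexes, and atomic types, you can write efficient programs that make full use of multi-core processors. We hope this article helps you improve your C++ programming skills.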