Concurrency
Preparation
Read the following:
from the book Grokking Concurrency by Bobrov
- chapter Building blocks of concurrency 
Introduction
Exercise 1
Look out the window, and observe the world around you. Do you see
- individual things moving, i.e., in a sequential manner? 
- things happening simultaneously and interacting with each other, i.e., concurrently? 
Exercise 2
Come up with three examples of coordination from daily life.
Exercise 3
Why is concurrency essential in software engineering?
Building blocks of concurrency
- Processes 
- Threads 
 
Fig. 3 Program vs. Process vs. Thread Scheduling, Preemption, Context Switching
CC BY-SA 4.0. By Hooman Mallahzadeh. Source: Wikimedia Commons
Fig. 4 Process states
Public domain. By No machine-readable author provided. A3r0 assumed (based on copyright claims). Source: Wikimedia Commons
Multiple threads for the pizza-ordering service
from socket import create_server, socket
from threading import Thread

BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)


class Handler(Thread):
    def __init__(self, conn: socket, address: tuple) -> None:
        super().__init__()
        self.conn = conn
        # Keep the peer address from accept(): getpeername() can fail
        # once the client has closed or reset the connection.
        self.address = address

    def run(self) -> None:
        """Serve the incoming connection in a thread by sending and
        receiving data."""
        print(f"Connected to {self.address}")
        try:
            while True:
                # Blocks only this thread until data arrives
                data = self.conn.recv(BUFFER_SIZE)
                if not data:
                    break  # b'' means the client closed the connection
                try:
                    order = int(data.decode())
                    response = f"Thank you for ordering {order} pizzas!\n"
                except ValueError:
                    response = "Wrong number of pizzas, please try again\n"
                print(f"Sending message to {self.address}")
                # sendall() keeps sending until the whole response is out
                self.conn.sendall(response.encode())
        finally:
            print(f"Connection with {self.address} has been closed")
            self.conn.close()


class Server:
    def __init__(self) -> None:
        print(f"Starting up at: {ADDRESS}")
        # If binding fails (e.g., the port is in use), there is no socket
        # to close yet, so the OSError is allowed to propagate.
        self.server_socket = create_server(ADDRESS)

    def start(self) -> None:
        """Start the server by continuously accepting and serving incoming
        connections."""
        print("Server listening for incoming connections")
        try:
            while True:
                conn, address = self.server_socket.accept()
                print(f"Client connection request from {address}")
                # One new thread per client connection
                thread = Handler(conn, address)
                thread.start()
        finally:
            self.server_socket.close()
            print("\nServer stopped.")


if __name__ == "__main__":
    server = Server()
    server.start()
Exercise 4
Analyze the code above. What has changed compared to the previous code?
🎉 We are able to serve many customers now.
Behavior of recv(BUFFER_SIZE):
- blocks until data arrives (at most BUFFER_SIZE bytes per call), i.e., the thread sleeps (an optimization by the OS: a sleeping thread uses no CPU time)
- returns b'' (an empty bytes object) when the peer has closed the connection
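Both behaviors can be observed without starting the server by using `socket.socketpair()`, which returns two already-connected sockets in the same process. This is a minimal sketch; the helper name `recv_demo` is made up for illustration.

```python
import socket


def recv_demo() -> list[bytes]:
    """Hypothetical helper: what recv() returns before and after the peer closes."""
    # Two connected sockets in one process: no server or network needed
    client, server = socket.socketpair()
    received = []
    client.sendall(b"3")
    # Data is already waiting, so this recv() returns at once
    received.append(server.recv(1024))
    client.close()  # the peer closes its end of the connection
    # End of stream: recv() returns an empty bytes object instead of blocking
    received.append(server.recv(1024))
    server.close()
    return received


print(recv_demo())  # [b'3', b'']
```

The empty result is exactly what the `if not data: break` check in `Handler.run` relies on.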
Threads are used in:
- Apache web server, worker MPM module (the prefork MPM uses processes instead of threads)
- servlets in Jakarta EE (Servlet versions < 3.0)
- Spring framework (versions < 5)
- Ruby on Rails’ Phusion Passenger 
- Python Flask 
Exercise 5
Do you see any problems with the threaded solution knowing that your pizza service can expect millions of requests concurrently?
Hint
Imagine each second a thread requires about 100 µs execution time. You have two threads and the OS needs about 1 µs to switch between threads. How much time does each thread get per second?
Now do the same calculation with 10000 threads.
Why did we use threads?
- We need to wait for replies, i.e., I/O. 
We used blocking I/O.
Blocking I/O
Any I/O operation is sequential by nature
- we send a signal and wait for a response 
 
Exercise 6
Is I/O a bottleneck for the following scenarios?
- web server 
- conventional desktop application 
Threads let the OS switch between tasks (context switching).
- CPU-bound - context switching brings no benefit, because the CPU is busy anyway 
 
- I/O-bound - CPU can serve task B, while task A blocks 
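The I/O-bound case can be sketched with threads, assuming `time.sleep` as a stand-in for a blocking I/O call (the function names `fake_io`, `run_sequential` and `run_threaded` are hypothetical). While one thread waits, the others can run, so the waits overlap instead of adding up.

```python
import threading
import time


def fake_io(duration: float) -> None:
    # time.sleep stands in for a blocking I/O call: the thread waits
    # and the CPU is free to run other threads in the meantime
    time.sleep(duration)


def run_sequential(n: int, duration: float) -> float:
    """Run n waits one after another and return the elapsed time."""
    start = time.perf_counter()
    for _ in range(n):
        fake_io(duration)
    return time.perf_counter() - start


def run_threaded(n: int, duration: float) -> float:
    """Run n waits in n threads and return the elapsed time."""
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io, args=(duration,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


print(f"sequential: {run_sequential(5, 0.2):.2f} s")  # roughly 1 s
print(f"threaded:   {run_threaded(5, 0.2):.2f} s")    # roughly 0.2 s
```

Replacing the sleep with a CPU-heavy loop would not show this speed-up, which matches the CPU-bound case above.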
 
Is there an alternative?
- Non-blocking I/O 
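Non-blocking I/O
Request an I/O operation without waiting for the OS to wake you up again: the call returns immediately, either with the result or with an error saying that the operation would block (in Python, BlockingIOError).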
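A small sketch of this behavior on a single socket, using `socket.socketpair()` so that no server is needed (the helper name `nonblocking_demo` is hypothetical). It assumes Linux or macOS, where `socketpair()` creates Unix domain sockets and sent data is visible to the receiver as soon as `sendall()` returns.

```python
import socket


def nonblocking_demo() -> list[str]:
    """Hypothetical helper: a non-blocking recv() returns immediately."""
    client, server = socket.socketpair()
    server.setblocking(False)
    results = []
    try:
        server.recv(1024)  # nothing has been sent yet
    except BlockingIOError:
        # A blocking socket would sleep here; a non-blocking one raises instead
        results.append("no data yet")
    client.sendall(b"5")
    # Data is ready now, so recv() returns it at once
    results.append(server.recv(1024).decode())
    client.close()
    server.close()
    return results


print(nonblocking_demo())  # ['no data yet', '5']
```

The server below uses the same mechanism: it catches `BlockingIOError` and simply tries again later.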
from socket import socket, create_server

BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)


class Server:
    def __init__(self) -> None:
        print(f"Starting up at: {ADDRESS}")
        # If binding fails, the OSError propagates: there is no socket to close
        self.server_socket = create_server(ADDRESS)
        # accept() now returns immediately instead of waiting for a client
        self.server_socket.setblocking(False)
        # Per-instance set of connected client sockets
        self.clients: set[socket] = set()

    def accept(self) -> None:
        try:
            conn, address = self.server_socket.accept()
            print(f"Connected to {address}")
            conn.setblocking(False)
            self.clients.add(conn)
        except BlockingIOError:
            pass  # no pending connection right now

    def serve(self, conn: socket) -> None:
        """Serve the incoming connection by sending and receiving data."""
        try:
            while True:
                data = conn.recv(BUFFER_SIZE)
                if not data:
                    # b'' means the client closed the connection:
                    # stop polling this socket and release it
                    self.clients.remove(conn)
                    conn.close()
                    return
                try:
                    order = int(data.decode())
                    response = f"Thank you for ordering {order} pizzas!\n"
                except ValueError:
                    response = "Wrong number of pizzas, please try again\n"
                print(f"Sending message to {conn.getpeername()}")
                conn.send(response.encode())
        except BlockingIOError:
            pass  # no data available yet; try again on the next pass

    def start(self) -> None:
        """Start the server by continuously accepting and serving incoming
        connections."""
        print("Server listening for incoming connections")
        try:
            while True:
                self.accept()
                # Iterate over a copy: serve() may remove closed clients
                for conn in self.clients.copy():
                    self.serve(conn)
        finally:
            self.server_socket.close()
            print("\nServer stopped.")


if __name__ == "__main__":
    server = Server()
    server.start()
Exercise 7
Analyze the code above.
- What has changed compared to the previous code? 
- Do you see any problems with this code? 
- What would be an improvement? 
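- Problem: the server polls every socket in a loop (busy-waiting), which keeps the CPU busy even when no client is sending anything
- Improvement: event-driven programming, where the OS notifies the program when a socket is ready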
Event-based concurrency
- the event loop is the heart and soul of JavaScript's concurrency model
Event-driven pizza-ordering service
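One possible event-driven version of the pizza service is sketched below with Python's standard `selectors` module. This is an illustration, not necessarily the approach the course uses: the function names `make_response`, `accept`, `serve`, `run_event_loop` and `main` are our own, and error handling (e.g., connection resets) is omitted. Instead of polling every socket, the loop sleeps in `select()` until the OS reports a ready socket, then calls the callback stored with that socket.

```python
import selectors
import socket

BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)


def make_response(data: bytes) -> bytes:
    """Same order logic as the threaded and polling servers."""
    try:
        order = int(data.decode())
        return f"Thank you for ordering {order} pizzas!\n".encode()
    except ValueError:
        return b"Wrong number of pizzas, please try again\n"


def accept(sel: selectors.BaseSelector, server_socket: socket.socket) -> None:
    # Only called when a client is waiting, so accept() does not block
    conn, address = server_socket.accept()
    print(f"Connected to {address}")
    conn.setblocking(False)
    # From now on, notify us when this client sends data
    sel.register(conn, selectors.EVENT_READ, data=serve)


def serve(sel: selectors.BaseSelector, conn: socket.socket) -> None:
    # Only called when data (or end of stream) is ready, so recv() does not block
    data = conn.recv(BUFFER_SIZE)
    if not data:
        # b'' means the client closed the connection: stop watching it
        sel.unregister(conn)
        conn.close()
        return
    # Assumption: the short response fits into the socket's send buffer
    conn.sendall(make_response(data))


def run_event_loop(sel: selectors.BaseSelector) -> None:
    while True:
        # Sleeps until at least one registered socket is ready: no busy-waiting
        for key, _events in sel.select():
            callback = key.data  # accept or serve, stored at registration
            callback(sel, key.fileobj)


def main() -> None:
    # DefaultSelector picks the most efficient mechanism on the platform
    sel = selectors.DefaultSelector()
    server_socket = socket.create_server(ADDRESS)
    server_socket.setblocking(False)
    sel.register(server_socket, selectors.EVENT_READ, data=accept)
    print(f"Starting up at: {ADDRESS}")
    try:
        run_event_loop(sel)
    finally:
        sel.close()
        server_socket.close()
        print("\nServer stopped.")
```

To try it, call `main()` and connect with a client such as `nc 127.0.0.1 12345`. A single thread now serves all clients, and the CPU stays idle while nobody is ordering.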
Best practice for scaling
Refer to Concurrency.
High-level tools#
So far we have mostly discussed low-level concepts. Here are some high-level tools:
Backend tools
- in Kubernetes, applications are often built as microservices that communicate asynchronously 
- distributed locking with Redis 
- Kubernetes Jobs & CronJobs enable running batch processes concurrently. 
- message queues
- a distributed event store and stream-processing platform
- load testing tool JMeter (covered in Chapter 8 of Mohan, Full Stack Testing, 2022)
Other Resources
- from the book Full Stack Testing by Mohan - section Performance, sales, and weekends off are correlated!
- from the book Multithreaded JavaScript by Hunter et al. - chapter 8, section When Not to Use
 
Summary
- In client-server applications that interact via inter-process communication, concurrency is unavoidable.
- In I/O-bound code, the processor often spends much of its time waiting and doing nothing.
- Blocking interfaces return only after the operation completes; non-blocking interfaces return immediately.
- If the tasks are long-running, OS threads are a good choice. Otherwise, busy-waiting can avoid the cost of context switching.
Appendix
I/O multiplexing
- an event notification system 
- we don't have to keep track of all socket events ourselves, as in the busy-waiting approach 
- the application asks the OS, e.g., with select, to monitor a set of sockets until data is ready on one of them
- a non-blocking event loop 
- we perform several I/O operations concurrently using the same thread 
Examples
- POSIX - poll
- Linux - epoll
- FreeBSD, macOS - kqueue
- Windows - IOCP
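The core idea, asking the OS once which of many sockets are ready, can be sketched with Python's `select.select`, the classic POSIX-style interface. Python's `select` module also exposes `poll()` (not on Windows), `epoll()` (Linux) and `kqueue()` (BSD, macOS); `selectors.DefaultSelector` picks the most efficient one available. The helper name `ready_sockets` and the two socket pairs are hypothetical stand-ins for client connections.

```python
import select
import socket


def ready_sockets() -> list[str]:
    """Hypothetical helper: which sockets can be read without blocking?"""
    pizza_client, pizza_conn = socket.socketpair()
    pasta_client, pasta_conn = socket.socketpair()
    names = {pizza_conn: "pizza", pasta_conn: "pasta"}
    pizza_client.sendall(b"2")  # only one client has sent something
    # One call monitors both sockets; the 1.0 s timeout caps the wait
    readable, _writable, _errors = select.select(list(names), [], [], 1.0)
    result = [names[s] for s in readable]
    for s in (pizza_client, pizza_conn, pasta_client, pasta_conn):
        s.close()
    return result


print(ready_sockets())  # ['pizza']
```

Only the socket with pending data is reported, so the program never has to try `recv()` on idle connections.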
Reactor pattern
- The reactor pattern is an event-handling strategy that can respond to many potential service requests concurrently. The pattern's key component is an event loop, running in a single thread or process, which demultiplexes incoming requests and dispatches them to the correct request handler. 
Properties of the reactor pattern:
- only one execution thread is employed 
- avoids locks 
- avoids the overhead of threads 
- allows event-driven concurrency 
- synchronous processing of events, but asynchronous I/O processing 
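A minimal reactor can be sketched on top of `selectors`. The class and method names (`Reactor`, `register`, `run_once`, `run_forever`) are hypothetical and not taken from any particular framework. `run_once` shows the two halves of the pattern: demultiplexing (waiting for ready sockets) and synchronous dispatch (calling each handler in turn on the single thread).

```python
import selectors
import socket
from typing import Callable, Optional

Handler = Callable[[socket.socket], None]


class Reactor:
    """Hypothetical minimal reactor: one thread, one event loop."""

    def __init__(self) -> None:
        # DefaultSelector uses epoll, kqueue, ... depending on the platform
        self.selector = selectors.DefaultSelector()

    def register(self, sock: socket.socket, handler: Handler) -> None:
        sock.setblocking(False)
        self.selector.register(sock, selectors.EVENT_READ, data=handler)

    def unregister(self, sock: socket.socket) -> None:
        self.selector.unregister(sock)

    def run_once(self, timeout: Optional[float] = None) -> int:
        """Wait for ready sockets (demultiplexing), then call their handlers
        one after another (synchronous dispatch). Returns the event count."""
        events = self.selector.select(timeout)
        for key, _mask in events:
            key.data(key.fileobj)
        return len(events)

    def run_forever(self) -> None:
        while True:
            self.run_once()


# Usage: a connected socket pair stands in for a client connection
reactor = Reactor()
client, conn = socket.socketpair()
received: list[bytes] = []


def on_readable(sock: socket.socket) -> None:
    data = sock.recv(1024)
    if data:
        received.append(data)
    else:  # b'': the peer closed the connection
        reactor.unregister(sock)
        sock.close()


reactor.register(conn, on_readable)
client.sendall(b"hello")
reactor.run_once(timeout=1.0)
client.close()
reactor.run_once(timeout=1.0)  # delivers end of stream; the handler closes conn
print(received)  # [b'hello']
```

Because handlers run one after another on the same thread, they need no locks, but a slow handler delays every other connection.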
Reactor libraries & frameworks
I/O models
- synchronous, blocking - most common 
  - the app in user space makes a system call that blocks
  - the call returns only when the operation completes

- synchronous, non-blocking - the app accesses the I/O device in non-blocking mode 
  - the OS returns from the call immediately
  - leads to an inefficient busy-wait pattern

- asynchronous, blocking - example: reactor pattern 
  - uses non-blocking mode for I/O operations, but blocks while waiting for notifications
  - avoids busy-waiting
  - well suited for high-performance I/O if the notification system is efficient
  - example system call: select

- asynchronous, non-blocking - the call returns immediately 
  - a callback is invoked when the response arrives
  - the processor is free for other work in the meantime
  - performs well with high-performance I/O

These are low-level models:
- an application framework can perform synchronous, blocking I/O in background threads while offering developers an asynchronous, callback-based interface
- or vice versa 
- Asynchronous I/O (AIO) in Linux
- allows applications to initiate one or more I/O operations that are performed asynchronously (i.e., in the background)
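The first combination above, blocking I/O in background threads behind a callback-based interface, can be sketched with `concurrent.futures` from the standard library. `blocking_fetch` and `fetch_async` are hypothetical names, and `time.sleep` is assumed as a stand-in for a real blocking call.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable

# Background threads that perform the blocking calls
executor = ThreadPoolExecutor(max_workers=4)


def blocking_fetch(order_id: int) -> str:
    """Hypothetical blocking I/O call, simulated with sleep."""
    time.sleep(0.1)
    return f"order {order_id} ready"


def fetch_async(order_id: int, callback: Callable[[str], None]) -> Future:
    """Callback-based interface: returns immediately and calls
    callback(result) once the blocking call has finished."""
    future = executor.submit(blocking_fetch, order_id)
    # The callback usually runs in the worker thread that completed the
    # future (or immediately, if the future has already finished)
    future.add_done_callback(lambda f: callback(f.result()))
    return future


results: list[str] = []
for order_id in range(3):
    fetch_async(order_id, results.append)  # does not wait
# ... the caller could do other work here ...
executor.shutdown(wait=True)  # for the demo only: wait for workers and callbacks
print(sorted(results))  # ['order 0 ready', 'order 1 ready', 'order 2 ready']
```

The caller never blocks on I/O itself; the blocking happens in the pool's threads, which is why such a design still pays the memory and context-switching cost of threads.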