Concurrency#

Preparation#

Read the following:

from the book Grokking Concurrency by Kirill Bobrov

Introduction#

Exercise 1

Look out the window and observe the world around you. Do you see

  • individual things moving one after another, i.e., sequentially?

  • things happening simultaneously and interacting with each other, i.e., concurrently?

Exercise 2

Come up with three examples of coordination from daily life.

Exercise 3

Why is concurrency essential in software engineering?

Building blocks of concurrency#

  • Processes

  • Threads

https://upload.wikimedia.org/wikipedia/commons/2/25/Concepts-_Program_vs._Process_vs._Thread.jpg

Fig. 3 Program vs. Process vs. Thread
CC BY-SA 4.0. By Hooman Mallahzadeh. Source: Wikimedia Commons

Scheduling, Preemption, Context Switching#

https://upload.wikimedia.org/wikipedia/commons/8/83/Process_states.svg

Fig. 4 Process states
Public domain. Author unknown; A3r0 assumed (based on copyright claims). Source: Wikimedia Commons

Multiple threads for the pizza-ordering service#

from socket import create_server, socket
from threading import Thread

BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)


class Handler(Thread):
    def __init__(self, conn: socket):
        super().__init__()
        self.conn = conn

    def run(self) -> None:
        """Serve the incoming connection in a thread by sending and
        receiving data."""
        print(f"Connected to {self.conn.getpeername()}")
        try:
            while True:
                data = self.conn.recv(BUFFER_SIZE)
                if not data:
                    break
                try:
                    order = int(data.decode())
                    response = f"Thank you for ordering {order} pizzas!\n"
                except ValueError:
                    response = "Wrong number of pizzas, please try again\n"
                print(f"Sending message to {self.conn.getpeername()}")
                # send the full response (send() may write only part of it)
                self.conn.sendall(response.encode())
        finally:
            print(
                f"Connection with {self.conn.getpeername()} "
                f"has been closed"
            )
            self.conn.close()


class Server:
    def __init__(self) -> None:
        try:
            print(f"Starting up at: {ADDRESS}")
            self.server_socket = create_server(ADDRESS)
        except OSError as e:
            # create_server() failed, so there is no socket to close yet
            print(f"\nCould not start the server: {e}")
            raise

    def start(self) -> None:
        """Start the server by continuously accepting and serving incoming
        connections."""
        print("Server listening for incoming connections")
        try:
            while True:
                conn, address = self.server_socket.accept()
                print(f"Client connection request from {address}")
                thread = Handler(conn)
                thread.start()
        finally:
            self.server_socket.close()
            print("\nServer stopped.")


if __name__ == "__main__":
    server = Server()
    server.start()

Exercise 4

Analyze the code above. What has changed compared to the previous code?

🎉 We are able to serve many customers now.

Behavior of recv(BUFFER_SIZE)

  • blocks until data arrives, i.e., the thread sleeps (an optimization by the OS)

  • returns b'' when the connection is closed.
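
The two bullets above can be demonstrated in a few lines. This sketch uses socket.socketpair() to get two already-connected sockets inside one process (no server needed); it is illustrative, not part of the pizza service.

```python
import socket

# socketpair() returns two already-connected sockets -- a convenient
# stand-in for a client/server connection within a single process.
client, server = socket.socketpair()

client.sendall(b"3")
data = server.recv(1024)  # blocks until the pending bytes arrive
print(data)               # b'3'

client.close()            # the peer closes the connection...
eof = server.recv(1024)   # ...so recv() returns b'' immediately
print(eof)                # b''
server.close()
```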

Threads are used in:

  • Apache web server MPM prefork module

  • servlets in Jakarta EE (< version 3)

  • Spring framework (< version 5)

  • Ruby on Rails’ Phusion Passenger

  • Python Flask

Exercise 5

Do you see any problems with the threaded solution knowing that your pizza service can expect millions of requests concurrently?

Hint

Imagine each thread runs in time slices of about 100 µs, and the OS needs about 1 µs to switch between threads. With two threads, how much execution time does each thread get per second?

Now do the same calculation with 10,000 threads.
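
A back-of-the-envelope model for this hint (the 100 µs slice and 1 µs switch cost are the hint's assumed numbers, not measurements):

```python
def time_per_thread_us(threads: int, slice_us: int = 100,
                       switch_us: int = 1, total_us: int = 1_000_000) -> float:
    """Useful execution time (in µs) each thread gets per second under
    round-robin scheduling with a fixed context-switch cost."""
    cycle = threads * (slice_us + switch_us)  # one full round-robin pass
    cycles_per_second = total_us / cycle
    return slice_us * cycles_per_second

print(round(time_per_thread_us(2)))       # ~495050 µs, i.e. ~0.5 s each
print(round(time_per_thread_us(10_000)))  # ~99 µs each -- threads starve
```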

Why did we use threads?

  • We need to wait for replies, i.e., I/O.

We used blocking I/O

Blocking I/O#

Any I/O operation is sequential by nature

  • we send a signal and wait for a response

The following sequence diagram (PlantUML source) illustrates a blocking recv() call:

title Blocking recv() Call

participant "Application" as App
participant "User Space" as User
participant "OS" as OS

App -> User: recv()
User -> OS: Check for data

alt No data ready
    OS -> OS: Wait for data
end

alt Data is ready
    OS -> OS: Copy the data
    OS -> User: Copy data from OS to user space
end

User -> App: Process data

Exercise 6

Is I/O a bottleneck for the following scenarios?

  • web server

  • conventional desktop application

Threads rely on context switching to share the CPU.

  • CPU-bound

    • context switching does not have any benefits, because CPU is busy anyway

  • I/O-bound

    • CPU can serve task B, while task A blocks

Is there an alternative?

  • Non-blocking I/O

Non-blocking I/O#

Request an I/O operation, but do not sleep waiting for the OS to wake you up again.
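
The core semantics can be sketched before reading the full server: in non-blocking mode, recv() raises BlockingIOError instead of putting the thread to sleep. This snippet uses an in-process socketpair() for illustration, not the pizza service itself.

```python
import select
import socket

a, b = socket.socketpair()
b.setblocking(False)        # switch the socket to non-blocking mode

try:
    b.recv(1024)            # no data yet -- a blocking socket would sleep here
    raised = False
except BlockingIOError:
    raised = True           # the non-blocking call returns immediately instead
print(raised)               # True

a.sendall(b"1")
select.select([b], [], [], 1.0)  # wait (at most 1 s) until data is ready
data = b.recv(1024)              # now succeeds without blocking
print(data)                      # b'1'
a.close(); b.close()
```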

import typing as T
from socket import socket, create_server

BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)


class Server:
    clients: T.Set[socket] = set()

    def __init__(self) -> None:
        try:
            print(f"Starting up at: {ADDRESS}")
            self.server_socket = create_server(ADDRESS)
            self.server_socket.setblocking(False)
        except OSError as e:
            # create_server() failed, so there is no socket to close yet
            print(f"\nCould not start the server: {e}")
            raise

    def accept(self) -> None:
        try:
            conn, address = self.server_socket.accept()
            print(f"Connected to {address}")
            conn.setblocking(False)
            self.clients.add(conn)
        except BlockingIOError:
            pass

    def serve(self, conn: socket) -> None:
        """Serve the incoming connection by sending and receiving data."""
        try:
            while True:
                data = conn.recv(BUFFER_SIZE)
                if not data:
                    # the client closed the connection: stop tracking it,
                    # otherwise we keep calling recv() on a dead socket
                    self.clients.discard(conn)
                    conn.close()
                    break
                try:
                    order = int(data.decode())
                    response = f"Thank you for ordering {order} pizzas!\n"
                except ValueError:
                    response = "Wrong number of pizzas, please try again\n"
                print(f"Sending message to {conn.getpeername()}")
                conn.send(response.encode())
        except BlockingIOError:
            pass

    def start(self) -> None:
        """Start the server by continuously accepting and serving incoming
        connections."""
        print("Server listening for incoming connections")
        try:
            while True:
                self.accept()
                for conn in self.clients.copy():
                    self.serve(conn)
        finally:
            self.server_socket.close()
            print("\nServer stopped.")


if __name__ == "__main__":
    server = Server()
    server.start()

Exercise 7

Analyze the code above.

  1. What has changed compared to the previous code?

  2. Do you see any problems with this code?

  3. What would be an improvement?

  • Requires polling

  • Improvement: event-driven programming

Event-based concurrency#

  • event loop is the heart & soul of JavaScript

Event-driven pizza-ordering service#
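
The core of an event-driven version can be sketched with the standard-library selectors module: register a callback per socket, then let the event loop dispatch when data is ready. The handler name and the in-process socketpair() connection below are illustrative assumptions, not code from the book.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
client, service = socket.socketpair()  # stand-in for an accepted connection
service.setblocking(False)

def handle_order(conn: socket.socket) -> bytes:
    """Callback invoked by the event loop when `conn` is readable."""
    order = int(conn.recv(1024).decode())
    return f"Thank you for ordering {order} pizzas!\n".encode()

# register interest in "readable" events and attach the handler
sel.register(service, selectors.EVENT_READ, handle_order)

client.sendall(b"2")
# one turn of the event loop: wait until some registered socket is
# ready, then dispatch to the callback stored at registration time
for key, _ in sel.select(timeout=1):
    response = key.data(key.fileobj)
print(response.decode(), end="")
```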

Best practice for scaling#

Refer to Concurrency.

High-level tools#

We have mostly talked about low-level concepts so far. Here are some high-level tools:

Backend tools

  • in Kubernetes, applications are built as microservices that communicate asynchronously

  • distributed locking with Redis

  • Kubernetes Jobs & CronJobs enable running batch processes concurrently.

  • message queues

    • Apache Kafka

      • a distributed event store and stream-processing platform.

  • Load testing tool JMeter

    • Chapter 8 of Mohan, Full Stack Testing (2022) covers JMeter

Other Resources#

  • Web page loading speed affects the user bounce rate

  • from the book Full stack testing by Mohan

    • section Performance, sales, and weekends off are correlated!

  • from the book Multithreaded JavaScript by Hunter et al.

    • chapter 8, section When Not to use

Summary#

  • in client-server applications interacting via inter-process communication, concurrency is unavoidable

  • in I/O-bound code, the processor often sits idle

  • blocking interfaces do not return before completion; non-blocking interfaces return immediately

  • if tasks are long-running, OS threads are a good choice; for short tasks, busy-waiting can save the cost of context switching

Appendix#

I/O multiplexing#

  • an event notification system

  • We don’t have to keep track of all socket events as in the busy-waiting approach

  • the application asks the OS (e.g., via select) to monitor sockets until data is ready

  • a non-blocking event loop

  • we concurrently perform several I/O operations using the same thread

Examples

  • POSIX poll

  • Linux epoll

  • FreeBSD/macOS kqueue

  • Windows IOCP
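
Python's standard library wraps these mechanisms behind one interface: selectors.DefaultSelector picks the most efficient one the platform offers (epoll on Linux, kqueue on BSD/macOS, with select as the portable fallback).

```python
import selectors

# DefaultSelector is an alias for the best selector implementation
# available on this platform, e.g. EpollSelector on Linux or
# KqueueSelector on macOS.
sel = selectors.DefaultSelector()
print(type(sel).__name__)
sel.close()
```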

Reactor pattern#

reactor pattern

is an event handling strategy that can respond to many potential service requests concurrently. The pattern’s key component is an event loop, running in a single thread or process, which demultiplexes incoming requests and dispatches them to the correct request handler

Reactor pattern:

  • only one execution thread is employed

  • avoids locks

  • avoids the overhead of threads

  • allows event-driven concurrency

  • synchronous processing of events, but asynchronous I/O processing
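
A toy reactor illustrating the bullets above; the class and method names are made up for this sketch, and real implementations such as libevent add timers, write events, and error handling.

```python
import selectors
import socket

class Reactor:
    """Single-threaded event loop: demultiplexes ready sockets and
    dispatches each one to the handler registered for it."""

    def __init__(self) -> None:
        self.sel = selectors.DefaultSelector()

    def register(self, conn: socket.socket, handler) -> None:
        conn.setblocking(False)
        self.sel.register(conn, selectors.EVENT_READ, handler)

    def run_once(self, timeout: float = 1.0) -> None:
        # demultiplex: block until some registered socket is ready
        for key, _ in self.sel.select(timeout):
            key.data(key.fileobj)  # dispatch to its handler

# demo with an in-process connection
a, b = socket.socketpair()
received = []
reactor = Reactor()
reactor.register(b, lambda conn: received.append(conn.recv(1024)))
a.sendall(b"hi")
reactor.run_once()
print(received)  # [b'hi']
```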

Reactor libraries & frameworks

  • libevent

  • libuv

    • abstraction layer on top of other low level I/O libraries

    • used in Node.js

  • Java NIO

    • Java APIs that offer features for intensive I/O applications

  • Nginx

  • Vert.x

    • event-driven networking

I/O models#

  • synchronous, blocking

    • most common

    • the app in user space makes a system call that blocks until it completes

  • synchronous, non-blocking

    • app accesses the I/O device in non-blocking mode

    • OS returns the call immediately

    • inefficient busy-wait pattern

  • asynchronous, blocking

    • example: reactor pattern

    • uses non-blocking mode for I/O operations, but the notification is blocked

    • avoids busy-waiting

    • if the notification system is performant, good for highly performant I/O

    • select

  • asynchronous, non-blocking

    • returns immediately

    • a callback is used when a response arrives

    • frees up processor time for other work

    • performs well with high-performance I/O

These are low-level models.

  • an application framework can provide I/O access using synchronous blocking through background threads

    • but an asynchronous interface for developers using callbacks

  • or vice versa
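
asyncio gives a concrete example of such layering: loop.run_in_executor exposes an awaitable, asynchronous interface to the caller while the synchronous, blocking call runs on a background thread. The blocking_io function below is a made-up stand-in for real blocking I/O.

```python
import asyncio
import time

def blocking_io() -> bytes:
    time.sleep(0.01)  # stands in for a synchronous, blocking I/O call
    return b"data"

async def main() -> bytes:
    loop = asyncio.get_running_loop()
    # asynchronous interface on top of blocking I/O: the blocking call
    # runs in the default thread-pool executor while the event loop
    # stays free to serve other tasks
    return await loop.run_in_executor(None, blocking_io)

result = asyncio.run(main())
print(result)  # b'data'
```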

Asynchronous I/O (AIO) in Linux

allows applications to initiate one or more I/O operations that are performed asynchronously (i.e., in the background)