asynchronous – Converting python code to async calls

I'm going to start out by saying this is the first Python program I have ever written and I have no real background in the language, so my code is probably pretty rough. Take it easy on me!

When I wrote this it worked fine for a small number of tickers, but when I have a file with 6k+ it is very slow. Now I know there are other ways I can improve performance, BUT I want to try to tackle the async thing first.

I thought it was as simple as making the read_ticker_file function async and adding await in front of the yfin_options() call but obviously that didn’t work.

I'm thinking I possibly need to restructure the way things are called, but I'm kind of stuck here. Hoping someone can point me in the right direction! Thanks in advance.

import logging
import pyodbc
import config
import yahoo_fin as yfin
from yahoo_fin import options
from datetime import datetime, date
from selenium import webdriver


def main():
    read_ticker_file()


def init_selenium():
    driver = webdriver.Chrome(config.CHROME_DRIVER)
    return driver


def yfin_options(symbol):
    logging.basicConfig(filename='yfin.log', level=logging.INFO)
    logging.basicConfig(filename='no_options.log', level=logging.ERROR)

    try:

        # get all options dates (in epoch) from dropdown on yahoo finance options page
        dates = get_exp_dates(symbol)

        # iterate each date to get all calls and insert into sql db
        for date in dates:
            arr = yfin.options.get_calls(symbol, date)

            arr_length = len(arr.values)

            i = 0

            for x in range(0, arr_length):
                strike = str(arr.values[i][2])
                volume = str(arr.values[i][8])
                open_interest = str(arr.values[i][9])
                convert_epoch = datetime.fromtimestamp(int(date))
                try:
                    sql_insert(symbol, strike, volume, open_interest, convert_epoch)
                    i += 1
                except Exception as insert_fail:
                    print("I failed at sqlinsert {0}".format(insert_fail))
            # raw string so \t and \r in the path are not treated as escapes
            file_name_dir = r"C:\temp\rh\options{0}{1}.xlsx".format(symbol, date)
            logging.info(arr.to_excel(file_name_dir))

    except Exception as e:
        bad_tickers_file_dir = config.BAD_TICKERS
        f = open(bad_tickers_file_dir, "a")
        f.write(symbol)
        f.write('\n')


def sql_insert(symbol, strike, volume, open_interest, exp_date):
    conn_string = ('Driver={{SQL Server}};'  # doubled braces so .format() keeps the literal {SQL Server}
                   'Server={0};'
                   'Database={1};'
                   'Trusted_Connection=yes;').format(config.SERVER, config.DATABASE)
    conn = pyodbc.connect(conn_string)
    cursor = conn.cursor()

    insert_string = """INSERT INTO dbo.options (Ticker, Strike, Volume, OpenInterest, expDate)
                    VALUES
                    (?, ?, ?, ?, ?)"""

    cursor.execute(insert_string, symbol, strike, volume, open_interest, str(exp_date))

    conn.commit()


def get_exp_dates(symbol):
    url = "https://finance.yahoo.com/quote/" + symbol + "/options?p=" + symbol
    chromedriver = init_selenium()
    chromedriver.get(url)
    # Yahoo Finance options dropdown class name (find better way to do this)
    select_dropdown = chromedriver.find_element_by_css_selector("div[class='Fl(start) Pend(18px)'] > select")
    options_list = [x for x in select_dropdown.find_elements_by_tag_name("option")]
    dates = []
    for element in options_list:
        dates.append(element.get_attribute("value"))

    return dates


def read_ticker_file():
    file1 = open(config.TICKER_FILE, 'r')
    lines = file1.readlines()

    count = 0
    # loop to read each ticker in file
    for line in lines:
        count += 1
        line = line.strip('\n')
        line = line.strip()
        yfin_options(line)


if __name__ == "__main__":
    main()
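A note on direction: yahoo_fin, pyodbc and Selenium are all blocking libraries, so simply marking read_ticker_file as async and awaiting yfin_options() cannot speed anything up, because nothing inside it ever yields to the event loop. One practical option is to keep yfin_options as-is and let asyncio run several of those blocking calls in worker threads at once. Below is a minimal sketch of that idea (assumptions: Python 3.9+ for asyncio.to_thread, a concurrency cap of 8 picked arbitrarily, and yfin_options and config reused from the code above):

import asyncio

async def read_ticker_file_async():
    with open(config.TICKER_FILE) as f:
        symbols = [line.strip() for line in f if line.strip()]

    # Cap the number of simultaneous yfin_options() calls; each one launches
    # its own Chrome instance via Selenium, so keep this small.
    sem = asyncio.Semaphore(8)

    async def worker(symbol):
        async with sem:
            # Run the blocking function in a thread so the event loop stays free.
            await asyncio.to_thread(yfin_options, symbol)

    await asyncio.gather(*(worker(s) for s in symbols))

def main():
    asyncio.run(read_ticker_file_async())

A plain concurrent.futures.ThreadPoolExecutor would achieve much the same effect without asyncio; the async version only becomes a real win if the HTTP and database calls are later replaced with async-aware libraries (e.g. aiohttp, aioodbc).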

multithreading – Asynchronous Server in C++

We are looking to develop an asynchronous server in C++. We are coming from C#, which has built-in support for async/await networking. However, with C++ it appears to be basically mandatory to use ASIO, otherwise you're out of luck.

I am wondering whether there are any other options besides ASIO if we want to make an asynchronous server in C++. It would be preferable not to have to rely on a third-party library.

rest – Asynchronous HTTP request pattern

Here is the pattern that I thought about for accepting a single asynchronous HTTP request at a time (i.e. a request which has not yet been fulfilled when the response is sent):

  1. Create the process:

    POST /process → 202
    POST /process → 409 "A request has already been accepted, retry later."
    
  2. Monitor its state:

    GET /process → 200 {"state": "ready"}
    GET /process → 200 {"state": "running", "progress": 0.75}
    GET /process → 200 {"state": "terminated", "product": "/product"}
    
  3. Retrieve its product:

    GET /product → 200 foo
    

The product is optional since it depends on the nature of the process (e.g. a video encoding process has a product while a machine shutdown process does not).
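As a concreteness check, here is a minimal sketch of the single-request pattern. The framework choice (FastAPI) and the in-memory state are my own illustrative assumptions, not part of the pattern itself:

import asyncio
from fastapi import FastAPI, HTTPException

app = FastAPI()
state = {"state": "ready"}      # ready | running | terminated
product = None

async def run_process():
    global product
    for step in range(1, 5):
        await asyncio.sleep(1)                       # stand-in for the real work
        state.update(state="running", progress=step / 4)
    product = "foo"
    state.clear()
    state.update(state="terminated", product="/product")

@app.post("/process", status_code=202)
async def create_process():
    if state["state"] == "running":
        raise HTTPException(409, "A request has already been accepted, retry later.")
    state.clear()
    state.update(state="running", progress=0.0)
    asyncio.create_task(run_process())               # respond 202 immediately
    return {}

@app.get("/process")
async def get_process():
    return state

@app.get("/product")
async def get_product():
    if product is None:
        raise HTTPException(404)
    return product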

And here is the pattern that I thought about for accepting multiple asynchronous HTTP requests at a time:

  1. Create the processes:

    POST /process → 202 {"process": "/process/123"}
    POST /process → 202 {"process": "/process/456"}
    
  2. Monitor their states:

    GET /process/123 → 200 {"state": "ready"}
    GET /process/456 → 200 {"state": "ready"}
    GET /process/123 → 200 {"state": "running", "progress": 0.75}
    GET /process/456 → 200 {"state": "running", "progress": 0.50}
    GET /process/123 → 200 {"state": "terminated", "product": "/product/123"}
    GET /process/456 → 200 {"state": "terminated", "product": "/product/456"}
    
  3. Retrieve their products:

    GET /product/123 → 200 foo
    GET /product/456 → 200 bar
    

Again the product is optional since it depends on the nature of the process.
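Under the same illustrative assumptions (FastAPI, in-memory dictionaries), the multi-request variant only needs the state and product keyed by a generated process id:

import asyncio, itertools
from fastapi import FastAPI, HTTPException

app = FastAPI()
next_id = itertools.count(123)
processes = {}      # id -> state dict
products = {}       # id -> finished product

async def run_process(pid):
    for step in range(1, 5):
        await asyncio.sleep(1)
        processes[pid] = {"state": "running", "progress": step / 4}
    products[pid] = "foo"
    processes[pid] = {"state": "terminated", "product": f"/product/{pid}"}

@app.post("/process", status_code=202)
async def create_process():
    pid = next(next_id)
    processes[pid] = {"state": "ready"}
    asyncio.create_task(run_process(pid))
    return {"process": f"/process/{pid}"}

@app.get("/process/{pid}")
async def get_process(pid: int):
    if pid not in processes:
        raise HTTPException(404)
    return processes[pid]

@app.get("/product/{pid}")
async def get_product(pid: int):
    if pid not in products:
        raise HTTPException(404)
    return products[pid]

If creation needed to be idempotent, one common variation is to let the client choose the identifier (PUT /process/{id}), so that repeating the request targets the same resource rather than creating a new one.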

Now here are my questions:

  1. Are these two patterns the correct way to process asynchronous HTTP requests?
  2. Can asynchronous HTTP requests be idempotent?

A definition of idempotence is given in RFC 7231, § 4.2.2. ‘Idempotent Methods’:

A request method is considered “idempotent” if the intended effect on the server of multiple identical requests with that method is the same as the effect for a single such request. Of the request methods defined by this specification, PUT, DELETE, and safe request methods are idempotent.

So it seems to me that asynchronous HTTP requests cannot be idempotent with the pattern that accepts multiple asynchronous HTTP requests at a time, since part of their intended effect is always to create new resources on the server (e.g. /process/123 and /product/123). However, it seems to me that asynchronous HTTP requests can be idempotent with the pattern that accepts a single asynchronous HTTP request at a time, depending on their intended effect, since that pattern does not make this systematic resource creation part of the intended effect.

python – Asynchronous Web Scraper

This is my first asyncio/aiohttp web scraper. I am trying to wrap my head around Python's asyncio/aiohttp libraries these days, and I am not sure yet whether I fully understand them, so I'd like some constructive review here.

I'm scraping https://www.spoonflower.com/, which exposes some public APIs for design data and per-fabric-type pricing data. My challenge was to get the design name, creator name and price of each design for each fabric type. The design name and creator name come from this endpoint:

https://pythias.spoonflower.com/search/v1/designs?lang=en&page_offset=0&sort=bestSelling&product=Fabric&forSale=true&showMatureContent=false&page_locale=en

and the per-fabric-type pricing data comes from this endpoint:

https://api-gateway.spoonflower.com/alpenrose/pricing/fabrics/FABRIC_' + fab_type + '?quantity=1&shipping_country=PK&currency=EUR&measurement_system=METRIC&design_id=' + str(item['designId']) + '&page_locale=en

Each page has 84 items and there are 24 fabric types. I first get all the fabric type names and store them in a list so I can loop through it and change the URL dynamically, then extract designName and screenName from the design page, and finally extract the price data.

Here is my code:

import asyncio
import aiohttp
import json
import requests
from bs4 import BeautifulSoup
from collections import OrderedDict


item_endpoint = 'https://pythias.spoonflower.com/search/v1/designs?lang=en&page_offset=0&sort=bestSelling&product=Fabric&forSale=true&showMatureContent=false&page_locale=en'

def get_fabric_names():
    res = requests.get('https://www.spoonflower.com/spoonflower_fabrics')
    soup = BeautifulSoup(res.text, 'lxml')
    fabrics = [fabric.find('h2').text.strip() for fabric in soup.find_all('div', {'class': 'product_detail medium_text'})]
    fabric = [("_".join(fab.upper().replace(u"\u2122", '').split())) for fab in fabrics]
    for index in range(len(fabric)):
        if 'COTTON_LAWN_(BETA)' in fabric[index]:
            fabric[index] = 'COTTON_LAWN_APPAREL'
        elif 'COTTON_POPLIN' in fabric[index]:
            fabric[index] = 'COTTON_POPLIN_BRAVA'
        elif 'ORGANIC_COTTON_KNIT' in fabric[index]:
            fabric[index] = 'ORGANIC_COTTON_KNIT_PRIMA'
        elif 'PERFORMANCE_PIQUÉ' in fabric[index]:
            fabric[index] = 'PERFORMANCE_PIQUE'
        elif 'CYPRESS_COTTON' in fabric[index]:
            fabric[index] = 'CYPRESS_COTTON_BRAVA'
    return fabric

async def fetch_design_endpoint(session, design_url):
    async with session.get(design_url) as response:
        extracting_endpoint = await response.text()
        _json_object = json.loads(extracting_endpoint)
        return _json_object['page_results']

async def fetch_pricing_data(session, pricing_endpoint):
    async with session.get(pricing_endpoint) as response:
        data_endpoint = await response.text()
        _json_object = json.loads(data_endpoint)
        items_dict = OrderedDict()
        for item in await fetch_design_endpoint(session, item_endpoint):
            designName = item['name']
            screenName = item['user']['screenName']
            fabric_name = _json_object['data']['fabric_code']
            try:
                test_swatch_meter = _json_object['data']['pricing']['TEST_SWATCH_METER']['price']
            except:
                test_swatch_meter = 'N/A'
            try:
                fat_quarter_meter = _json_object['data']['pricing']['FAT_QUARTER_METER']['price']
            except:
                fat_quarter_meter = 'N/A'
            try:
                meter = _json_object['data']['pricing']['METER']['price']
            except:
                meter = 'N/A'

            
            #print(designName, screenName, fabric_name, test_swatch_meter,fat_quarter_meter, meter)

            if (designName, screenName) not in items_dict.keys():
                items_dict[(designName, screenName)] = {}
            itemCount = len(items_dict[(designName, screenName)].values()) / 4
            return items_dict[(designName, screenName)].update({'fabric_name_%02d' %itemCount: fabric_name,
            'test_swatch_meter_%02d' %itemCount: test_swatch_meter,
            'fat_quarter_meter_%02d' %itemCount: fat_quarter_meter,
            'meter_%02d' %itemCount: meter})
                

        

async def main():
    tasks = []
    async with aiohttp.ClientSession() as session:
        fabric_type = get_fabric_names()
        design_page = await fetch_design_endpoint(session, item_endpoint)
        for item in design_page:
            for fab_type in fabric_type[0:-3]:
                pricing_url = 'https://api-gateway.spoonflower.com/alpenrose/pricing/fabrics/FABRIC_' + fab_type + '?quantity=1&shipping_country=PK&currency=EUR&measurement_system=METRIC&design_id=' + str(item['designId']) + '&page_locale=en'
                print(pricing_url)
                await fetch_pricing_data(session, pricing_url)

                tasks.append(asyncio.create_task(
                    fetch_pricing_data(session, pricing_url)

                    )
                )

        content = await asyncio.gather(*tasks)
        return content
results = asyncio.run(main())
print(results)

Any ideas and suggestions are welcome to make this scraper more pythonic and smart.
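One structural observation, with a sketch of an alternative: in main() each pricing URL is awaited inline and then also wrapped in a task, so every endpoint is requested twice and the inline await serialises the loop; in addition, fetch_pricing_data re-downloads the whole design page for every single pricing URL. A version that builds the tasks up front, fetches each URL once and bounds concurrency might look like the following (the Semaphore limit of 20 is an arbitrary assumption, and get_fabric_names, fetch_design_endpoint and item_endpoint are reused from the code above):

import asyncio
import json
import aiohttp

PRICING_URL = ('https://api-gateway.spoonflower.com/alpenrose/pricing/fabrics/'
               'FABRIC_{fab}?quantity=1&shipping_country=PK&currency=EUR'
               '&measurement_system=METRIC&design_id={design_id}&page_locale=en')

async def fetch_json(session, url, sem):
    # The semaphore keeps the number of in-flight requests bounded.
    async with sem:
        async with session.get(url) as response:
            return json.loads(await response.text())

async def main():
    sem = asyncio.Semaphore(20)
    async with aiohttp.ClientSession() as session:
        fabric_types = get_fabric_names()
        designs = await fetch_design_endpoint(session, item_endpoint)
        tasks = [
            asyncio.create_task(
                fetch_json(session, PRICING_URL.format(fab=fab, design_id=item['designId']), sem))
            for item in designs
            for fab in fabric_types[0:-3]
        ]
        # All pricing requests now run concurrently, each exactly once.
        return await asyncio.gather(*tasks)

results = asyncio.run(main())

The per-design bookkeeping (designName, screenName, the items_dict) can then be done once, outside the request coroutines, by pairing each gathered result with the design it was requested for.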

magento2.4 – Does the default Magento 2.4.2 Require RabbitMQ Asynchronous message queueing?

Does Magento 2.4.2 (2.4.x) open source REQUIRE asynchronous message queueing (and thus installation of RabbitMQ) by default?

I was under the impression that ‘asynchronous message queueing’ is an option and not a requirement.

But after installation I see a common error out there: Consumer “async.operations.all” skipped as required connection “amqp” is not configured. From this error log message it seems the default installation requires an AMQP (Advanced Message Queuing Protocol) connection configuration, and thus RabbitMQ, as a default setup, which would mean asynchronous message queueing is REQUIRED for Magento 2.4 to work in a base/default setup.

Is this correct? AMQP and RabbitMQ are REQUIRED (not an option)?

I see lots of confusion on the net about this, including this thread:
Consumer “async.operations.all” skipped as required connection “amqp” is not configured. Unknown connection name amqp

But in the above thread there doesn’t seem to be a consensus. It seems to me so far that RabbitMQ is required by default for Magento 2.4 to work properly. Can anyone confirm?

Thanks In Advance

Ray

how to avoid nested asynchronous subscriptions in RxJS?

I've got these two subjects that both have a delay, and I need the second one to be subscribed when the first one completes, asynchronously. Both subjects only emit once (HTTP requests) and I'm trying to avoid nested subscriptions, but I can't figure out an optimal solution.

What I’ve got so far: Stackblitz

import { of } from 'rxjs';
import { delay, tap } from 'rxjs/operators';

const order = of('3- created order').pipe(delay(200));

const google = of('4- logged info to google').pipe(delay(200));

console.log('1- started creating order');
order.pipe(tap(() => google.subscribe(console.log))).subscribe(result => {
  console.log("2- didn't wait for google");
  console.log(result);
});

c# – Asynchronous Event Handler

This class acts as an asynchronous event handler that will execute all attached tasks in an async/await context. It requires the System.Collections.Immutable NuGet package (for ImmutableList<T>). Example usage:

class MyEventArgs : EventArgs {}
async Task SomeAsyncMethodA(object src, EventArgs args) {
    Console.WriteLine("Task A...");
    await Task.Delay(2000);
}
async Task SomeAsyncMethodB(object src, EventArgs args) {
    Console.WriteLine("Task B...");
    await Task.Delay(1000);
}

static async Task Main(string[] args) {
    AsyncEvent<MyEventArgs> Events;
    Events = new AsyncEvent<MyEventArgs>();
    Events += SomeAsyncMethodA;
    Events += SomeAsyncMethodB;
    await Events?.InvokeAsync(null, new MyEventArgs());
    // Use the line below instead to discard the task and not wait for the handlers to finish.
    // _ = Events?.InvokeAsync(null, new MyEventArgs()).ConfigureAwait(false);
}

Source for the AsyncEvent<EventArgsT> class:

// T is the EventArgs class type to pass to the callbacks on Invoke.
public class AsyncEvent<T> where T : EventArgs {
    // List of task methods to await.
    public ImmutableList<Func<object, T, Task>> Invokables;

    // on += add new callback method to AsyncEvent.
    public static AsyncEvent<T> operator+(AsyncEvent<T> source, Func<object, T, Task> callback) {
        if (callback == null) throw new NullReferenceException("Callback is null! <AsyncEvent<T>>");
        if (source == null) return null;
        if (source.Invokables == null) source.Invokables = ImmutableList<Func<object, T, Task>>.Empty;
        source.Invokables = source.Invokables.Add(callback);
        return source;
    }

    // on -= remove existing callback from AsyncEvent.
    public static AsyncEvent<T> operator -(AsyncEvent<T> source, Func<object, T, Task> callback) {
        if (callback == null) throw new NullReferenceException("Callback is null! <AsyncEvent<T>>");
        if (source == null) return null;

        source.Invokables = source.Invokables.Remove(callback);
        return source;
    }

    // Invoke the tasks asynchronously with a cancelation token.
    public async Task InvokeAsync(object source, T evArgs, CancellationToken token) {
        List<Task> tasks = new List<Task>();
        if (Invokables != null)
            foreach (var callback in Invokables)
                if (!token.IsCancellationRequested)
                    tasks.Add(callback(source, evArgs));

        await Task.WhenAll(tasks.ToArray());
    }

    // Invoke the tasks asynchronously.
    public async Task InvokeAsync(object source, T evArgs) {
        List<Task> tasks = new List<Task>();
        if (Invokables != null)
            foreach (var callback in Invokables)
                tasks.Add(callback(source, evArgs));

        await Task.WhenAll(tasks.ToArray());
    }
}

programming languages – How do asynchronous operations save time in I/O

I'm reading the book CLR via C#, which explains how Windows does asynchronous I/O:
(figure from the book, showing the ReadAsync → ReadFile → IRP flow, omitted)

To read data from the file, ReadAsync internally allocates a Task object to represent the pending completion of the read operation. Then, ReadAsync calls Win32's ReadFile function (#1). ReadFile allocates its IRP (I/O Request Packet), and then passes it down to the Windows kernel (#3). Windows adds the IRP to the hard disk driver's IRP queue (#4), but now, instead of blocking your thread, your thread is allowed to return to your code; your thread immediately returns from its call to ReadAsync (#5, #6, and #7). Now, of course, the IRP has not necessarily been processed yet, so you cannot have code after ReadAsync that attempts to access the bytes in the passed-in Byte[]. Using the task object, you can call ContinueWith to register a callback method that should execute when the task completes and then process the data in this callback method. Or, alternatively, you can use C#'s asynchronous function feature to simplify your code by allowing you to write it sequentially.

The author also describes a scenario showing how asynchronous operations save time in I/O:

let’s say that your application wants to download 10 images from various websites, and that it takes 5 seconds to download each image. If you perform this work synchronously (downloading one image after another), then it takes you 50 seconds to get the 10 images. However, if you use just one thread to initiate 10 asynchronous download operations, then all 10 are being performed concurrently and all 10 images will come back in just 5 seconds! That is, when performing multiple synchronous I/O operations, the time it takes to get all the results is the sum of the times required for each individual result. However, when performing multiple asynchronous I/O operations, the time it takes to get all the results is the time required to get the single worst-performing operation.

I don't understand how async operations can reduce the time from 50 seconds to 5 seconds. Let's change the NTFS disk driver to a network adapter driver, and change the async file-read code to async file-download code.

My understanding is this: it is true that asynchronous operations let fewer worker threads handle incoming client I/O requests, whereas with synchronous operations every I/O request needs its own worker thread, and while a synchronous operation takes time to complete, that worker thread is blocked too and therefore cannot return to the thread pool to serve other requests. But the network adapter driver can still only complete one I/O operation at a time.

For example, say the network adapter driver is handling the first I/O request (represented as an IRP in its queue) to download the first picture. Even though you use asynchronous operations, and the worker thread that served the first I/O request did return to the thread pool to serve the second request, the network adapter driver is still processing the first I/O request to download the first picture, so the second I/O request is still queued as an IRP in the network adapter driver. The driver only handles the second IRP after the first one is completed, so it still serves the second I/O request after 5 seconds, and so on, so it still needs 50 seconds to download the 10 images. In other words, it is not the worker threads that are blocking, it is the network adapter driver.

So how can async operations reduce the time from 50 seconds to 5 seconds? Unless the network adapter itself is a small, powerful computer with multiple CPUs that can serve requests concurrently? But that is obviously not true; network adapters are cheap devices and can't contain expensive components like CPUs.
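The book's arithmetic is easy to reproduce in miniature. The 5 seconds per image is typically dominated by waiting on the remote server and the network rather than by the local adapter doing 5 seconds of work, which is why the waits can overlap. The following small illustration (Python asyncio rather than C#, purely because it is compact; asyncio.sleep stands in for the network wait) shows ten simulated 5-second downloads finishing in roughly 5 seconds total:

import asyncio
import time

async def download(i):
    # Simulate a 5-second network wait; nothing is busy locally while we wait,
    # just as no thread is blocked while an IRP sits in the driver's queue.
    await asyncio.sleep(5)
    return f"image {i}"

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(*(download(i) for i in range(10)))
    print(f"{len(results)} downloads finished in {time.perf_counter() - start:.1f}s")  # ~5.0s

asyncio.run(main())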

architecture – Multithreading vs Asynchronous game loop for multiplayer online game?

I'm trying to create a turn-based multiplayer online game where players can be grouped in a room/lobby and play (similar to Ludo, but the number of players can be more than 4). There can be multiple such rooms at a time.

Here is a brief overview of the game: it is not resource-heavy. Each player gets a turn to perform some actions for some time (~90 seconds), and meanwhile the other players just communicate with the server to get the actions performed by that player. There can be multiple game rounds, and a winner is declared at the end.

The game state of each room would be different at a given time. Therefore, to manage the game states for all the rooms, I could think of two approaches:

  1. Multithreading approach: Spawn a new thread for each room and run/manage the game loop and corresponding game state in a room-specific thread. But if there are too many rooms, the number of threads would be very large, and this approach is not feasible as per this answer's comment. However, this approach is preferable for massively multiplayer online games (MMOs) as per this answer. I also think this should work for a considerable number of rooms, because most of the time the threads are just waiting for players' actions.
  2. Asynchronous approach: Here, the game states of all the rooms would be stored in the server's memory, and whenever a request is received from a player, the corresponding game state object would be retrieved using the player's room ID; based on that state, the server would command the players in that room, and the state would then be updated. There would be only one main thread here (a sketch of this approach follows after the question below).

So my question is: which would be the better approach for this type of game, considering future scaling requirements? Also, please suggest any better alternatives that I couldn't think of.
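For a sense of what the asynchronous approach can look like, here is a minimal sketch of one event loop driving many rooms. The runtime (Python asyncio), the queue-based player input, the 90-second timeout handling and the round count are all illustrative assumptions, not a prescription:

import asyncio

class Room:
    def __init__(self, room_id, players):
        self.room_id = room_id
        self.players = players
        self.actions = asyncio.Queue()   # filled by the network layer (not shown)

    async def run(self):
        # One turn-based loop per room; awaiting the queue yields control to
        # every other room, so no thread sits blocked while a player thinks.
        for _round in range(3):
            for player in self.players:
                try:
                    action = await asyncio.wait_for(self.actions.get(), timeout=90)
                except asyncio.TimeoutError:
                    action = "skip"      # player ran out of their turn time
                self.broadcast(player, action)
        self.declare_winner()

    def broadcast(self, player, action):
        pass   # push the action to the other players in the room

    def declare_winner(self):
        pass

async def main():
    rooms = [Room(i, players=["a", "b", "c", "d"]) for i in range(1000)]
    await asyncio.gather(*(room.run() for room in rooms))

asyncio.run(main())

Because each room coroutine spends almost all of its time suspended on await, a single thread can drive thousands of rooms; extra threads or processes only become necessary when per-room work is CPU-heavy, which, as described, it is not.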

availability groups – What if I have an asynchronous replica within the AG? This query still shows Synchronizing for the asynchronous replicas (SQL Server DAG)

What if I have asynchronous replicas within the AG? This query still shows SYNCHRONIZING for the asynchronous replicas, i.e. Node3, Node4, Node5 and Node3 (cloud). Can I not move forward? What can I do at this point?

FYI, I have already completed the 1st step

Basically, I am going to fail over from the on-premises Node1 to the cloud Node1, and I am currently using a distributed availability group (DAG) for syncing the two AGs. I am going to fail over the write node from Node1 (on-premises) to Node1 (cloud).

https://docs.microsoft.com/en-us/sql/database-engine/availability-groups/windows/configure-distributed-availability-groups?view=sql-server-ver15&tabs=manual#failover

SELECT ag.name
, drs.database_id
, db_name(drs.database_id) as database_name
, drs.group_id
, drs.replica_id
, drs.synchronization_state_desc
, drs.last_hardened_lsn
FROM sys.dm_hadr_database_replica_states drs
INNER JOIN sys.availability_groups ag on drs.group_id = ag.group_id;

So my architecture, with SQL Server 2016 Enterprise Edition, is:

On-premise

Node1 (Global Primary – write node) (Synchronous),

Node2 (Synchronous),

Node3 (Asynchronous),

Node4 (Cloud – used for read-only purpose) (Asynchronous),

Node5 (Cloud – used for read-only purpose)(Asynchronous)

On Cloud

Node1 (Forwarder),

Node2 (used for read-only purpose) (Synchronous),

Node3 ( used for read-only purpose) (Asynchronous)