
python – producer-consumer Pipeline problem implementation in asyncio

I wrote this code to build a non-blocking manager with pipeline operations using asyncio. My main concern is catching the items the producer receives and, once the receive operation is complete, returning everything together, merged where keys match. However, I am not sure whether I should join the data at the end in the producer or in the consumer. The current workflow is the following:

1 – scrape all databases (multiple clients) (simulated)

2 – push to the manager (a proxy-level server), where multiple clients send their data to the manager

3 – merge the multiple data sources into one collection as the data arrives, with no DB operations yet; example: {"ID-2002-0201": {"id":"ID-2002-0201","updated_at":"2018-05-14T22:25:51Z","html_url":"xxxxxxxxxxxx"}} > maybe producer (see the merge sketch after this list)

4 – use a get_or_create (check whether a record with that data already exists in the database, otherwise create it) > consumer

5 – create a bulk of data (maybe chunk the data into smaller chunks to stay scalable as the data sources grow from 2 to 100+) > consumer
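
A minimal sketch of the merge in step 3, assuming each producer message is a dict keyed by record ID as in the example above (merge_messages and the sample records are illustrative):

def merge_messages(messages):
    """Combine records that share the same ID into one dict."""
    merged = {}
    for message in messages:
        for key, record in message.items():
            merged.setdefault(key, {}).update(record)
    return merged

merged = merge_messages([
    {"ID-2002-0201": {"id": "ID-2002-0201", "updated_at": "2018-05-14T22:25:51Z"}},
    {"ID-2002-0201": {"html_url": "xxxxxxxxxxxx"}},
])
print(merged)  # one record per ID, fields from both messages combined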

server.py

#!/usr/bin/env python3
import asyncio
import logging
import random
from pipeline_async import Pipeline

class A:
    def __init__(self):
        pass

    def run(self):
        return {"ID-2002-0201":{"id":"ID-2002-0201","updated_at":"2018-05-14T22:25:51Z","html_url":"xxxxxxxxxxxx"}}

class B:
    def __init__(self):
        pass

    def run(self):
        return {"ID-2002-0202":{"id":"ID-2002-0202","updated_at":"2018-05-14T22:25:51Z","html_url":"xxxxxxxxxxxx"}}

class Manager:

    async def producer(self, pipeline, data_sources):
        """Pretend we're getting a number from the network."""
        for data_stream in data_sources:
            await pipeline.set_message(data_stream.run(), "Producer")
            logging.info("Producer got message: %s", data_stream)

    async def consumer(self, pipeline):
        """ Pretend we're saving a number in the database. """
        while True:
            # wait for an item from the Producer
            message = await pipeline.get_message("Consumer")
            # process the msg
            logging.info(
                "Consumer storing message: %s", message
            )
            # simulate i/o operation using sleep
            await asyncio.sleep(random.random())
            pipeline.task_done()

    async def start(self):
        pipeline = Pipeline()
        data_sources = (A(),B())
        # schedule the consumer
        consume = asyncio.ensure_future(self.consumer(pipeline))
        # run the producer and wait for completion
        await self.producer(pipeline, data_sources)
        # wait until the consumer has processed all items
        await pipeline.join()
        # the consumer is still awaiting for an item, cancel it
        consume.cancel()
        logging.info("Successfully shutdown the service.")

if __name__ == '__main__':
    asyncio.run(Manager().start())

pipeline_async.py

import asyncio
import logging


class Pipeline(asyncio.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    async def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = await self.get()
        logging.debug("%s:got %s from queue", name, value)
        return value

    async def set_message(self, value, name):
        logging.debug("%s:about to add %s to queue", name, value)
        await self.put(value)
        print(name, value)
        logging.debug("%s:added %s to queue", name, value)

I would appreciate some feedback on cases I missed.
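
On step 5 of the workflow, a minimal chunking helper (a sketch; the batch size of 100 is an arbitrary choice):

def chunked(items, size=100):
    """Yield successive fixed-size slices of a list, e.g. for bulk inserts."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

for batch in chunked(list(range(250))):
    print(len(batch))  # 100, 100, 50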

Lists and sublists in Python

Hi, everyone!!
I am working on an assignment with lists.
The problem is the following: I want to create 3 lists.

Example:

List 1: [50, 80]

List 2: [casa, apto]

List 3: [[50, casa], [80, apto]]

So, on each pass of the loop I need to build one list with only the values, another list with only the types, and a final list that holds both elements. The problem is that I cannot get this to work.

n = int(input())
lista_valores = []
lista_tipos = []
lista = []
for i in range(n):
    valores = input()
    lista_valores.append(int(valores))
    lista.append(list(valores))

    tipos = input()
    lista_tipos.append(tipos)
    lista.append(tipos)

print(lista_valores)
print(lista_tipos)
print(lista)
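
A minimal sketch of one way to build the three lists, keeping the question's variable names; each pass reads a value and a type and appends the pair to the combined list:

n = int(input())
lista_valores = []
lista_tipos = []
lista = []
for i in range(n):
    valor = int(input())
    tipo = input()
    lista_valores.append(valor)   # values only, e.g. [50, 80]
    lista_tipos.append(tipo)      # types only, e.g. ['casa', 'apto']
    lista.append([valor, tipo])   # the pair, e.g. [50, 'casa']

print(lista_valores)
print(lista_tipos)
print(lista)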

python – Why is the while loop in my code not running?

I am doing a basic exercise in Python, since I am learning. When using a while loop I cannot get the loop to keep going. The code is the following:

peso=int(input("Ingrese peso de persona: "))
edad=int(input("Ingrese edad de persona: "))
niños=0
jovenes=0
adultos=0
viejos=0
continuar=1
while continuar==1:
    if edad <=12:
        print("Categoria Niños")
    elif edad <=29:
        print("Categoria Jovenes")
    elif edad <=59:
        print("Categoria Adultos")
    elif edad >60:
        print("Categoria Viejos")
    continuar=input("Desea continuar? (1) si (0) no: ")

    
print("---FIN---")

Why is it that when I give the variable continuar the value 1 it prints FIN instead of going back to ask me for the data from the start again? What is wrong in my statement?

Thanks
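
A minimal sketch of the usual fix: input() returns a string, so continuar == 1 compares "1" with the integer 1 and is always False; converting with int() (and moving the reads inside the loop, which seems to be the intent) keeps the loop going. The last branch becomes an else so that age 60 is covered too:

continuar = 1
while continuar == 1:
    peso = int(input("Ingrese peso de persona: "))
    edad = int(input("Ingrese edad de persona: "))
    if edad <= 12:
        print("Categoria Niños")
    elif edad <= 29:
        print("Categoria Jovenes")
    elif edad <= 59:
        print("Categoria Adultos")
    else:
        print("Categoria Viejos")
    # int() makes the comparison with 1 meaningful on the next pass
    continuar = int(input("Desea continuar? (1) si (0) no: "))

print("---FIN---")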

python 3.x – Multiple HTTP requests with threading and queues

I'm working on an I/O-bound application and I want to learn how to use threading properly, along with queues, to minimize CPU and RAM usage. My plan was to use threads with queues, and this is what I have done so far:

# System modules
import time
from queue import Queue
from threading import Thread
from loguru import logger
import requests

feed_urls = (
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
)

# Set up some global variables
num_fetch_threads = 5
queue_exploring = Queue()
queue_monitoring = Queue()
save_length = {}


def get_requests(url):
    return len(requests.get(url).text)


def send_notification(queue: Queue):
    while True:
        url = queue.get()
        logger.info(f'Sending notifications: {url}')
        # FIXME Send notifications
        queue.task_done()


def explore_links(queue: Queue):
    """This is the worker thread function.
    It processes items in the queue one after
    another.  These daemon threads go into an
    infinite loop, and only exit when
    the main thread ends.
    """
    while True:
        url = queue.get()
        get_data_length = get_requests(url)

        if save_length[url] != get_data_length:
            logger.info(f"New changes on the webpage found! -> {url}")
            queue_monitoring.put(url)

        logger.info(f"No new changes in the page found! -> {url}")
        time.sleep(60)

        # Add back to queue
        queue.put(url)
        queue.task_done()


for i in range(num_fetch_threads):
    worker_one = Thread(target=explore_links, args=(queue_exploring,))
    worker_two = Thread(target=send_notification, args=(queue_monitoring,))

    worker_one.setDaemon(True)
    worker_two.setDaemon(True)
    worker_one.start()
    worker_two.start()


def main():
    logger.info('*** Main thread waiting ***')
    for url in feed_urls:
        response = get_requests(url)
        save_length[url] = response
        queue_exploring.put(url)

    queue_exploring.join()
    queue_monitoring.join()
    logger.info('*** Done ***')


if __name__ == '__main__':
    main()

The idea is that we loop forever, checking whether a webpage has changed, and if it has, we want to be notified of the change. Simple as that. I use multiple threads as well as two queues: one for exploring, to see whether a response has changed, and a second one for sending notifications.
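
One aside worth sketching: comparing response lengths misses edits that leave the length unchanged. A hash of the body is a cheap, more reliable fingerprint (a sketch, not part of the code above):

import hashlib
import requests

def fingerprint(url: str) -> str:
    """Digest of the page body; any edit changes it, even at equal length."""
    body = requests.get(url).text
    return hashlib.sha256(body.encode("utf-8")).hexdigest()

# store fingerprint(url) instead of len(...) in save_length and compare digests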

Python, can someone help me

The "língua do pê" (P language) is a language invented in a children's game. In the língua do pê, every word is repeated with an extra syllable starting with P added to it. Example: "pedro tinha uma casa bonita" becomes "pepedro patinha peuma pacasa pobonita". Write a program that determines whether a word read from input could belong to the língua do pê, that is, whether it starts with "pa", "pe", "pi", "po" or "pu".
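
A minimal sketch of the check described above, assuming one word per line of input:

word = input().strip().lower()
# str.startswith accepts a tuple, so all five P-syllables are tested at once
if word.startswith(("pa", "pe", "pi", "po", "pu")):
    print("could be língua do pê")
else:
    print("not língua do pê")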

python – Read Data from a serial port and write to influxdb

I have an energy meter which sends the kWh count periodically, every few seconds, via a serial port.
To store this data I write the counter value and the calculated average power of the last 10 s into an InfluxDB measurement.

To calculate the power I use a loop that compares the current counter value with the one from 10 s ago. To avoid the loop being blocked by reading the serial interface or by sending the data via HTTP, I use serial_asyncio and aiohttp.

I am pretty new to Python and got my script running by working through several asyncio tutorials, so I am not sure whether I have mixed some old and new syntax.

The script is working, but I am not happy with a few things.

  1. Is there a better way to pass the counter value from the serial data_received function to my calc_delta function without using a global? (One possible approach is sketched after the code.)

  2. Am I using asyncio properly? I found different examples using asyncio.ensure_future() or loop.run_until_complete().

  3. Is the overall structure okay? What could I have done better?

import aiohttp
import asyncio
from datetime import datetime
import serial_asyncio
import serial
import time


value = None
last_value = 0
token = "myInfluxdbToken"


class Input(asyncio.Protocol):
    data_buffer = ""

    def connection_made(self, transport):
        self.transport = transport
        print("port opened", transport)

    def data_received(self, data):
        global value

        self.data_buffer += data.decode("utf-8")
        # find counter value in received string
        if "eHZ" in self.data_buffer:
            data_tmp = self.data_buffer.replace("\r\n", "")
            value_str = data_tmp.partition("1.8.1*255(")[2][:11]

            if len(value_str) == 11:
                value = float(value_str)
                print(str(value))
                asyncio.ensure_future(send_data(value, "counter"))
            # Reset the data_buffer!
            self.data_buffer = ""


async def calc_delta():
    global value, last_value
    # calculate counter delta
    while True:
        delta = 0
        print(value)
        if value:
            delta = value - last_value
            last_value = value
        # 10s * 3600s/h * 1000W  Calculates Avg Power of last 10s from kWh delta
        deltaW = delta / 10 * 3600 * 1000
        if deltaW < 50000:
            await send_data(deltaW, "deltaW")
            print(time.perf_counter(), deltaW)
        else:
            print(time.perf_counter(), "data invalid")

        await asyncio.sleep(10)


async def send_data(data, tag):
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "http://localhost:8086/api/v2/write?org=my_org&bucket=testdata&precision=s",
            data=f"ehzdata,type={tag} value={data}",
            headers={"Authorization": f"Token {token}"},
        ) as response:
            print("Status:", response.status)


loop = asyncio.get_event_loop()
serial_coro = serial_asyncio.create_serial_connection(
    loop, Input, "/dev/ttyUSB0", baudrate=9600, parity=serial.PARITY_EVEN, bytesize=7
)
counter_coro = calc_delta()
asyncio.ensure_future(serial_coro)
loop.run_until_complete(counter_coro)
loop.run_forever()
loop.close()
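
On question 1, a minimal sketch of one way to drop the global: give the protocol an asyncio.Queue and let calc_delta await values from it (the names and the simplified parsing are illustrative):

import asyncio

class Input(asyncio.Protocol):
    def __init__(self, queue: asyncio.Queue):
        self.queue = queue  # shared with the consumer instead of a global

    def data_received(self, data):
        # real parsing as in the question; here a bare float for brevity
        try:
            value = float(data.decode("utf-8").strip())
        except ValueError:
            return
        self.queue.put_nowait(value)

async def calc_delta(queue: asyncio.Queue):
    last_value = None
    while True:
        value = await queue.get()  # wakes only when a new reading arrives
        if last_value is not None:
            print(value - last_value)
        last_value = value

Since create_serial_connection expects a protocol factory, the queue can be bound with lambda: Input(queue).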

readable validation with python regex and simple logic

I have come up with this code for checking IP addresses, but I would like to know how clean and good it is, from 1 to 10. What I care about are readability and simplicity. Please give me some feedback.

Should I go with unit tests here?

import re
import logging

logging.basicConfig(level=logging.ERROR)

pattern = re.compile(r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$')

def is_in_bounds(s): 
    return int(s) >= 0 and int(s) <= 255

def is_leading_zeros(s):
    if s is not '0':
        return s[0] is not '0'
    return True

def validate_ipv4(ipv4: str, expectFail: bool = False) -> bool:
    logging.info(f"Validating IP '{ipv4}' START.")

    try:
        match = pattern.match(ipv4)
        assert(match)

        groups = match.groups()
        assert(len(groups)==4)

        for s in match.groups():
            assert(is_leading_zeros(s))
            assert(is_in_bounds(s))
    
        logging.info(f"Validating IP '{ipv4}' SUCCESS.")

    except AssertionError:
        logging.info(f"Validating IP '{ipv4}' FAILED.", exc_info=True)
        if not expectFail:
            raise


if __name__ == '__main__':
    octets = []
    octets.extend(range(0,3))
    octets.extend(range(9,12))
    octets.extend(range(98,102))
    octets.extend(range(198,202))
    octets.extend(range(250,256))

    for i in octets:
        for j in octets:
            for k in octets:
                for l in octets:
                    validate_ipv4(f'{i}.{j}.{k}.{l}')

    octets = []
    octets.extend(range(-3,0))
    octets.extend(range(256, 260))  

    exceptions = []

    for i in octets:
        for j in octets:
            for k in octets:
                for l in octets:
                    validate_ipv4(f'{i}.{j}.{k}.{l}', True)


bip32 hd wallets – Generating Derivation keys from root XPRV in Python

Here’s what I’m currently doing in JavaScript; I was wondering if anyone knows how to recreate this functionality in Python.

All I need to do is: take a base58 xprv (rootKey) -> derive the path m/44'/0'/0 (node) -> output the new xprv

const bitcoin = require('bitcoinjs-lib');
const bip32 = require('bip32');

let rootKey = 'xprv9s21ZrQH143K32CfqoCQHtbKiEb5BqFazXX6jCtNCnuty3gUjxS4CsXWi9rcNyHdjDVPiC6P1bnyEZr2ioouRq56h6HAdwejeTty1BsSEtL'
const node = bip32.fromBase58(rootKey, bitcoin.networks.bitcoin);

//legacy account extended private key
console.log(node.derivePath("m/44'/0'/0").toBase58())
//xprv9y7nL2k2diLdA9bmop94K8Pec2WdL998PLdvTraxj5jiQ36gnLmhVJG4bY8FzgqZPaBM5HDBi9sgRK1ZCi14n5j4Pk1o6d3j4Y6T9NS4nA6
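
A minimal sketch using the third-party python-bip32 package (pip install bip32), assuming its BIP32.from_xpriv / get_xpriv_from_path API, which mirrors derivePath above:

from bip32 import BIP32

root_key = "xprv9s21ZrQH143K32CfqoCQHtbKiEb5BqFazXX6jCtNCnuty3gUjxS4CsXWi9rcNyHdjDVPiC6P1bnyEZr2ioouRq56h6HAdwejeTty1BsSEtL"

node = BIP32.from_xpriv(root_key)
# the path string accepts ' (or h) for hardened steps, like bitcoinjs
print(node.get_xpriv_from_path("m/44'/0'/0"))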

Quick Sort Array python – Stack Overflow

In this program I'm trying to get the total number of swaps and the amount of time it takes to sort the random array. The time part seems to be working correctly, but the swap count is not; it currently just prints 4, 3, 3, 2, 2, 1 and so on. Please help me figure out why the swap counter is not working correctly.

import random
import time



def partition(nums, low, high):
    swap = 0  
    pivot = nums[(low + high) // 2]
    i = low - 1
    j = high + 1
    while True:
        i += 1
        while nums[i] < pivot:
            i += 1
            swap+=1

        j -= 1
        while nums[j] > pivot:
            j -= 1
            swap+=1

        if i >= j:
            return j

        nums[i], nums[j] = nums[j], nums[i]
        print('swaps',swap)


def quick_sort(nums):
    
    def _quick_sort(items, low, high):
        if low < high:
           
            split_index = partition(items, low, high)
            _quick_sort(items, low, split_index)
            _quick_sort(items, split_index + 1, high)

    _quick_sort(nums, 0, len(nums) - 1)



A = [random.randint(0, 1000) for x in range(8)]
print("Unsorted array", A)
start = time.time()
quick_sort(A)
end = time.time()
print(f"Runtime of the program is {end - start}")
print("Sorted array", A)