Archive for category Snippets

Happy Repdigit Day!

Today being February 2, 2022, I got to thinking about dates. Since most people will write today as “2/2/22”, today is one of the few dates that can be written entirely with a single repeated digit. I thought about how to find all such dates in a century, and the first thing I needed was a name for them. I came up with “uninumeric”, but a Google search turned up this Wikipedia article, which calls them repdigits, as in “repeated digits”.

SPOILER ALERT: the solution is shown at the end of this post. Try to come up with your own guess as to how many such dates there are before continuing.

We have to set some ground rules on this exercise first. Should January 1, 2001 count as a repdigit date? I’m going to say that leading zeroes can be dropped when finding repdigit dates, so this date, when written as “1/1/1”, would be a repdigit. Trailing zeroes cannot be dropped, so January 10, 2001 would not be a repdigit date.

I have some other date filters in mind, so I decided to create a CSV table of all the dates in a century, truncating the year to modulo 100 so that the list would apply to any modern century. (Leap days are always the fly in the ointment in such tables, and if I use the current century to make this table, it would omit February 29 from the 0 year. Fortunately, leap days could not qualify as repdigit dates, so I’m not going to worry about it today.)

I pulled up my trusty Python IDE and imported my simple table package littletable, plus some names from the Python datetime package:

from datetime import datetime, timedelta
import littletable as lt

I need something to give me all the days in a century, but unfortunately, Python’s range() builtin won’t allow this:

dates = range(datetime(2000, 1,1), datetime(2100, 1,1), timedelta(1))

range() only accepts int arguments.
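Sure enough, trying it raises a TypeError:

```python
from datetime import datetime

# range() only works with ints -- datetime arguments are rejected outright
try:
    range(datetime(2000, 1, 1), datetime(2100, 1, 1))
except TypeError as exc:
    print(f"TypeError: {exc}")
```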

So I needed to write a daterange function (I have said this so many times that it might qualify as McGuire’s Law – before you can ever do anything, you always have to do something else first. Unfortunately, McGuire’s Law is recursive.)

def daterange(from_: datetime, to_: datetime, step_: timedelta = timedelta(days=1)):
    cur = from_
    while cur < to_:
        yield cur
        cur += step_

This generator function follows the same convention as Python’s range(): it is inclusive of the starting value, but exclusive of the ending value.
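For example (repeating the daterange definition so this snippet runs standalone), three days starting February 1, 2022:

```python
from datetime import datetime, timedelta

def daterange(from_: datetime, to_: datetime, step_: timedelta = timedelta(days=1)):
    cur = from_
    while cur < to_:
        yield cur
        cur += step_

days = list(daterange(datetime(2022, 2, 1), datetime(2022, 2, 4)))
print(len(days))   # 3 -- Feb 1, 2, and 3; Feb 4 (the end value) is excluded
```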

With daterange, I can now create my littletable Table of dates in the current century:

dates = lt.Table()
dates.insert_many({"year": date.year % 100, "month": date.month, "day": date.day}
                  for date in daterange(datetime(2000, 1, 1), datetime(2100, 1, 1)))

For nice presentation, we can also add a str field (I’m showing year/month/day for the format, to avoid any discussions about why Americans write month/day/year while the rest of the world writes day/month/year – fortunately, when looking for repdigit dates, either style works the same):

dates.add_field("str",
                lambda rec: f"{rec.year:02d}/{rec.month:02d}/{rec.day:02d}")

Now to detect which dates are repdigit dates, we need a predicate function that takes a record from our table, and returns whether the date is a repdigit. A function to detect a repdigit string is just:

def is_repdigit(intstr: str):
    return len(set(intstr)) == 1
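A few quick checks (definition repeated for a standalone run) – note that the empty string is not a repdigit, since it has zero distinct characters, not one:

```python
def is_repdigit(intstr: str):
    return len(set(intstr)) == 1

print(is_repdigit("2222"))   # True
print(is_repdigit("1234"))   # False
print(is_repdigit(""))       # False -- zero distinct characters
```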

And for easy filtering, add a “YMD” field to all the records in our table, which contains the digits used to write each date, dropping leading zeroes:

dates.add_field("YMD", lambda rec: f"{rec.year}{rec.month}{rec.day}")
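To see how this drops leading zeroes, compare the zero-padded formatting of the str field with the plain interpolation used for YMD (using plain variables here in place of a table record):

```python
year, month, day = 1, 1, 11   # January 11, 2001

print(f"{year:02d}/{month:02d}/{day:02d}")   # 01/01/11 -- the "str" field
print(f"{year}{month}{day}")                 # 1111     -- the "YMD" field
```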

Getting and presenting the matching dates is now just:

repdigit_dates = dates.where(lambda rec: is_repdigit(rec.YMD))
repdigit_dates.present(caption=f"{len(repdigit_dates)} dates")

Which shows these dates:

  Year   Month   Day   Str        Ymd     
 ──────────────────────────────────────── 
     1       1     1   01/01/01   111     
     1       1    11   01/01/11   1111    
     1      11     1   01/11/01   1111    
     1      11    11   01/11/11   11111   
     2       2     2   02/02/02   222     
     2       2    22   02/02/22   2222    
     3       3     3   03/03/03   333     
     4       4     4   04/04/04   444     
     5       5     5   05/05/05   555     
     6       6     6   06/06/06   666     
     7       7     7   07/07/07   777     
     8       8     8   08/08/08   888     
     9       9     9   09/09/09   999     
    11       1     1   11/01/01   1111    
    11       1    11   11/01/11   11111
    11      11     1   11/11/01   11111
    11      11    11   11/11/11   111111
    22       2     2   22/02/02   2222
    22       2    22   22/02/22   22222
    33       3     3   33/03/03   3333
    44       4     4   44/04/04   4444
    55       5     5   55/05/05   5555
    66       6     6   66/06/06   6666
    77       7     7   77/07/07   7777
    88       8     8   88/08/08   8888
    99       9     9   99/09/09   9999

26 dates

With this table, we can now try some other what-ifs (left as exercises for the reader):

  • dates that are repdigits if you don’t drop the leading zeroes
  • dates that are binary dates (written with just 0’s and 1’s)
  • dates that are palindromes (now you have to decide on month/day/year vs. day/month/year)

Rather than regenerate the table every time, I wrapped my table create code in the else clause of a try-except, trying to first load the table from a CSV:

try:
    dates = lt.Table().csv_import("dates_in_century.csv",
                                  transforms={}.fromkeys("year month day".split(), int))
except FileNotFoundError:
    dates = lt.Table()
    dates.insert_many({"year": date.year % 100, "month": date.month, "day": date.day}
                      for date in daterange(datetime(2000, 1, 1), datetime(2100, 1, 1)))
    dates.add_field("str",
                    lambda rec: f"{rec.year:02d}/{rec.month:02d}/{rec.day:02d}")
    dates.csv_export("dates_in_century.csv")


How to try something ‘N’ times in Python


“If at first you don’t succeed, try, try again.”

There are many times when we write Python code that might encounter temporary exceptions that would clear up if our code would just pause, take a breath, and try again. This is pretty common in scripts that access a network resource, whether a web page, a REST API, or some other service that is either resource-constrained or has built-in rate-limiting.

Here is an example of a function that might be part of a larger program to get the contents from a list of web URLs. The function is pretty simple: if successful, it returns the HTML contents of the page; if not, it raises an exception:

from contextlib import closing
import urllib.request

def get_data_for_url(url):
    with closing(urllib.request.urlopen(url, timeout=3)) as response:
        if response.status == 200:
            return response.read()
        else:
            raise Exception(f"failed with status {response.status}")

We may find that for some of these URLs, our request just times out or has some other temporary limit that causes it to fail, when a retry might succeed. So we could wrap the code in this function inside a while loop:

import time

def get_data_for_url(url):
    while True:
        with closing(urllib.request.urlopen(url, timeout=3)) as response:
            if response.status == 200:
                return response.read()

        time.sleep(2)

(We add a short sleep in the loop, to give the web server a chance to “settle down” between retries.)

Unfortunately, this kind of “loop-forever” while loop is a pretty well-known anti-pattern. The request may fail for lots of reasons, some of which might not be cured by just trying again – for instance, if we had invalid request login credentials, or if the server was down. In that case, our script would just loop without stopping, since the request would continue to return the same “Not Authorized” or “Not Found” response over and over again.

So we should limit the number of times we try to get the web page. We could add a “number of tries” counter:

def get_data_for_url(url):
    num_tries = 0
    while True:
        with closing(urllib.request.urlopen(url, timeout=3)) as response:
            if response.status == 200:
                return response.read()

        time.sleep(2)

        num_tries += 1
        if num_tries > 5:
            raise Exception(f"failed with status {response.status}")

There are a couple of issues with this code. First off, there is an “off-by-one” error – this code will actually try 6 times, not 5. That is easily fixed by changing “>” to “>=”. Also, our loop limit of 5 is buried down in the body of the code, and will take some searching when we decide to raise or lower the retry count. But more importantly, the code is very sensitive to the incrementing of num_tries. In fact, in my first test of writing this code, I left out the “num_tries += 1” statement, and I had another loop-forever loop. If at some point you or another developer refactor the code and add a “continue” statement before the increment, then num_tries will stay 0 and you will be back to looping forever again.

Next, I made a slight change: “while True” becomes “while num_tries < 5”. This makes the retry limit a little more prominent, but we still have the problem around incrementing num_tries.

def get_data_for_url(url):
    num_tries = 0
    while num_tries < 5:
        with closing(urllib.request.urlopen(url, timeout=3)) as response:
            if response.status == 200:
                return response.read()

        time.sleep(2)

        num_tries += 1

    raise Exception(f"failed with status {response.status}")

A better way to get automatic incrementing is to replace the while loop and counter with a for loop over a range.

def get_data_for_url(url):
    for tries in range(5):
        with closing(urllib.request.urlopen(url, timeout=3)) as response:
            if response.status == 200:
                return response.read()

        time.sleep(2)

    raise Exception(f"failed with status {response.status}")

This is a big improvement, because now our counter will always increment, even if a “continue” gets inserted into the loop.

There is still a minor problem – on the last try, if we are about to give up, we still sleep for 2 seconds before exiting the loop and raising the “we give up!” exception, even though there is no next try to wait for. It sounds like a minor issue, but other code might be needed between retries besides a simple sleep, and we really just want to quit looping at this point.

We could conditionalize the sleep with this kind of code:

            if tries < 4:
                time.sleep(2)

But if we decide to raise or lower the number of tries, we have to chase down and change this other value as well.

I’ve settled on this pattern for retrying code ‘n’ times.

def get_data_for_url(url):
    for tries_remaining in reversed(range(5)):
        with closing(urllib.request.urlopen(url, timeout=3)) as response:
            if response.status == 200:
                return response.read()

        if tries_remaining:
            time.sleep(2)

    raise Exception(f"failed with status {response.status}")

By reversing the range, the tries_remaining variable will count down from 4 to 0. Now the final try will always have a value of 0, so it can be easily checked as a boolean value – changing the variable name from “tries” to “tries_remaining” also makes this condition read a little more naturally.
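The countdown, and the truthiness of each value, look like this:

```python
countdown = list(reversed(range(5)))

print(countdown)                      # [4, 3, 2, 1, 0]
print([bool(n) for n in countdown])   # [True, True, True, True, False]
```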

Lastly, these values of 5 for number of tries and 2 for the retry interval are best defined at some global level, or in configuration settings for your script. So we can define these constants to simplify the maintenance of these values whenever they need to be updated.

NUM_PAGE_FETCH_TRIES = 5
PAGE_FETCH_TIMEOUT = 3
PAGE_FETCH_RETRY_WAIT = 2

def get_data_for_url(url):
    for tries_remaining in reversed(range(NUM_PAGE_FETCH_TRIES)):
        with closing(urllib.request.urlopen(url, timeout=PAGE_FETCH_TIMEOUT)) as response:
            if response.status == 200:
                return response.read()

        if tries_remaining:
            time.sleep(PAGE_FETCH_RETRY_WAIT)

    raise Exception(f"failed with status {response.status}")    

Now the purpose of these various constants is much clearer, and future maintainers will know where to make updates to these parameters without scanning through all the program code.

UPDATE: After posting on Twitter, I got a great reply from @GordonMcGregor, that he had written something similar as a decorator, and that this made for cleaner code. I gave it more thought, and this would make for a good decorator example – see below:

import functools
import time

# retry decorator
def retry(n: int, interval: int = 2):
    def _deco(fn):
        @functools.wraps(fn)
        def _inner(*args, **kwargs):
            for tries_remaining in reversed(range(n)):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if not tries_remaining:
                        raise
                time.sleep(interval)
        return _inner
    return _deco


# example using the retry decorator
from contextlib import closing
import urllib.request
                    
@retry(3)
def get_data_for_url(url: str):
    with closing(urllib.request.urlopen(url, timeout=3)) as response:
        if response.status == 200:
            return response.read()
    raise Exception(f"failed with status {response.status}")
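Here is a quick demonstration of the decorator in action, using a hypothetical flaky_fetch function that fails twice before succeeding (the decorator is repeated so the snippet runs standalone, and interval=0 keeps the demo from actually sleeping):

```python
import functools
import time

def retry(n: int, interval: int = 2):
    def _deco(fn):
        @functools.wraps(fn)
        def _inner(*args, **kwargs):
            for tries_remaining in reversed(range(n)):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if not tries_remaining:
                        raise
                time.sleep(interval)
        return _inner
    return _deco

calls = 0

@retry(3, interval=0)
def flaky_fetch():
    # stand-in for a network call: fails twice, then succeeds
    global calls
    calls += 1
    if calls < 3:
        raise Exception("temporary failure")
    return "page contents"

print(flaky_fetch())   # page contents
print(calls)           # 3 -- two failures plus the final success
```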


Snippet: Class that never raises AttributeError

Sometimes we’d like to have a class that is tolerant of attribute accesses that weren’t anticipated at class design time, returning a default ‘None’ value for an uninitialized attribute. Using defaultdict to override the `__dict__` attribute makes this very easy:

from collections import defaultdict

class X(object):
    def __init__(self):
        self.__dict__ = defaultdict(lambda : None)
    def __getattr__(self, attr):
        return self.__dict__[attr]

Now we can initialize an object with some attributes, but access others and get back the default None:

x = X()
x.color = "RED"
x.size = 6

print(x.color)
print(x.size)
print(x.material)

prints:

RED
6
None

Of course, this defeats a key validation feature in Python, so you’ll need to take extra care that you specify attributes correctly.
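For instance, a misspelled attribute that would normally raise an AttributeError now slips through silently:

```python
from collections import defaultdict

class X(object):
    def __init__(self):
        self.__dict__ = defaultdict(lambda: None)
    def __getattr__(self, attr):
        return self.__dict__[attr]

x = X()
x.color = "RED"
print(x.colour)   # None -- the typo goes completely undetected
```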


Snippet: Uniquify a sequence, preserving order

I came across some code today that used a set to keep track of previously seen values while iterating over a sequence, keeping just the not-seen-before items. A brute force kind of thing:

seen = set()
unique = []
for item in sequence:
    if not item in seen:
        unique.append(item)
        seen.add(item)

I remembered that I had come up with a simple form of this a while ago, using a list comprehension to do this in a single expression. I dug up the code, and wrapped it up in a nice little method. This version accepts any sequence or generator, and makes an effort to return a value of the same type as the input sequence:

def unique(seq):
    """Function to keep only the unique values supplied in a given 
       sequence, preserving original order."""
       
    # determine what type of return sequence to construct
    if isinstance(seq, (list,tuple)):
        returnType = type(seq)
    elif isinstance(seq, str):
        returnType = ''.join
    else:
        # - generators and their ilk should just return a list
        returnType = list

    try:
        seen = set()
        return returnType(item for item in seq if not (item in seen or seen.add(item)))
    except TypeError:
        # sequence items are not of a hashable type, can't use a set for uniqueness
        seen = []
        return returnType(item for item in seq if not (item in seen or seen.append(item)))

My first pass at this tried to compare the benefit of using a list vs. a set for seen – it turns out, both versions are useful, in case the items in the incoming sequence aren’t hashable, in which case the only option for seen is a list.
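For reference, here is a condensed Python 3 sketch of just the hashable path, always returning a list:

```python
def unique_list(seq):
    # seen.add() returns None (falsy), so the "or" clause records each item
    # as a side effect while the comprehension keeps only first sightings
    seen = set()
    return [item for item in seq if not (item in seen or seen.add(item))]

print(unique_list([1, 5, 1, 2, 5, 3]))   # [1, 5, 2, 3]
print(unique_list("banana"))             # ['b', 'a', 'n']
```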


RK integrator

While moving files from my old laptop drive to my new one, I found a nice Runge-Kutta integrator class that I had written ages ago. So long ago, in fact, that I was a little embarrassed at the newbiness of some of my code. So I decided to update my code to get a nice RK class out of it, using list comprehensions instead of “for i in range” loops, and including an integrate method that acts as a generator so that the calling code can cycle through each integration step. As is typical in R-K, the system state is maintained in a vector X, and the calling method must provide a callback function that will return dX/dt.

Here is the class:

class RKIntegrator :
    "Class used to perform Runge-Kutta integration of set of ODE's"

    def __init__( self, dt, derivFunc, degree=0, initConds=None ):
        self.dt = float(dt)
        self.dt_2 = dt / 2.0
        self.t = float(0)
        if not (degree or initConds):
            raise ValueError("must specify degree or initial conditions")
        if initConds is not None:
            self.x = initConds[:]
        else:
            self.x = [0.0] * degree
        self.derivFunc = derivFunc


    def doIntegrationStep( self ):
        dt = self.dt
        dxFunc = self.derivFunc
        t2 = self.t + self.dt_2

        dx = dxFunc( self.t, self.x )
        delx0 = [ dx_i*dt for dx_i in dx ]
        xv = [x_i + delx0_i/2.0 for x_i, delx0_i in zip(self.x, delx0)]
        dx = dxFunc( t2, xv )
        delx1 = [ dx_i*dt for dx_i in dx ]
        xv = [x_i + delx1_i/2.0 for x_i,delx1_i in zip(self.x,delx1)]
        dx = dxFunc( t2, xv )
        delx2 = [ dx_i*dt for dx_i in dx ]
        xv = [x_i + delx2_i for x_i, delx2_i in zip(self.x, delx2)]
          
        self.t += dt
        dx = dxFunc(self.t, xv)
        
        self.x = [
            x_i + ( delx0_i + dx_i*dt + 2.0*(delx1_i + delx2_i) ) / 6.0
                for x_i, dx_i, delx0_i, delx1_i, delx2_i in 
                    zip(self.x, dx, delx0, delx1, delx2)
            ]
    
    def integrate(self):
        while True:
            self.doIntegrationStep()
            yield self.t, self.x

Here is an example of finding X with constant acceleration of 4:

def getDX( t, x ):
  return [ 
    x[1], 
    4.0 
    ]

isWhole = lambda x: abs(x - round(x)) < 1e-6

rk = RKIntegrator( dt=0.1, derivFunc=getDX, initConds = [0.0, 0.0] )
for t,x in rk.integrate():
    if t > 10: 
        break
    if isWhole(t):
        print(t, ', '.join('%.2f' % x_i for x_i in x))
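One nice property to check: classical RK4 is exact (up to floating-point roundoff) for this system, since the true solution x(t) = 2t², v(t) = 4t is a low-degree polynomial. A standalone single-function sketch of the same RK4 math confirms it:

```python
def rk4_step(t, x, dt, f):
    # one classical fourth-order Runge-Kutta step for state vector x
    k1 = f(t, x)
    k2 = f(t + dt/2, [xi + dt/2*ki for xi, ki in zip(x, k1)])
    k3 = f(t + dt/2, [xi + dt/2*ki for xi, ki in zip(x, k2)])
    k4 = f(t + dt,   [xi + dt*ki for xi, ki in zip(x, k3)])
    return [xi + dt/6*(a + 2*b + 2*c + d)
            for xi, a, b, c, d in zip(x, k1, k2, k3, k4)]

def getDX(t, x):
    return [x[1], 4.0]   # constant acceleration of 4

t, x = 0.0, [0.0, 0.0]
for _ in range(10):      # integrate to t = 1.0 in steps of 0.1
    x = rk4_step(t, x, 0.1, getDX)
    t += 0.1

print(x)   # very close to [2.0, 4.0], i.e. x = 2t^2 and v = 4t at t = 1
```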

Googling for ‘Python runge kutta’, I came across this blog posting:
http://doswa.com/blog/2009/01/02/fourth-order-runge-kutta-numerical-integration/
This does a good job, but hardcodes the vector size to just x, velocity, and acceleration. Here is how my R-K integrator would implement Doswa’s code:

def accel(t,x):
    stiffness = 1
    damping = -0.005
    x,v = x
    return -stiffness*x - damping*v

def getDX(t,x):
    return [
        x[1],
        accel(t,x)
        ]

rk = RKIntegrator( dt=1.0/40.0, derivFunc=getDX, initConds = [50.0, 5.0] )
for t,x in rk.integrate():
    if t > 100.1: 
        break
    if isWhole(t):
    print(t, ', '.join('%.2f' % x_i for x_i in x))

My results match the posted results to 2 places.


Code Golf – Converting seconds to hh:mm:ss.mmm, or reduce() ad absurdam

I’ve had to do some timing using time.clock(), but I like seeing output as hh:mm:ss.000.  divmod is the perfect tool for this, as it does the division and remainder in a single call (rekindling my love for Python’s ease in returning multiple values from a function):

def to_hhmmss(sec):
    min,sec = divmod(sec,60)
    hr,min = divmod(min,60)
    return "%d:%02d:%06.3f" % (hr,min,sec)

Right about the time I wrote that, I happened to see an example using reduce, and thought about using successive calls to divmod to build up the tuple to be passed in to this function’s string interpolation.  That is, I needed to call reduce in such a way to convert 3601.001 to the tuple (1, 0, 1, 1) (1 hour, 0 minutes, 1 second, and 1 millisecond).

reduce() applies a binary operation to a sequence by taking the first two items of the sequence, performing the operation on them and saving the result to a temporary accumulator, then applying the binary operation to the accumulated value and the 3rd item in the sequence, then to the 4th item, etc.  For instance, to use reduce to sum up a list of integers [1,5,10,50], we would call reduce with the binary operation:

fn = lambda a,b: a+b

and reduce would work through the list as if running this explicit code:

acc = 1
acc = fn(acc, 5)
acc = fn(acc, 10)
acc = fn(acc, 50)
return acc
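Running that reduction for real (in Python 3, reduce has moved into the functools module):

```python
from functools import reduce

fn = lambda a, b: a + b
print(reduce(fn, [1, 5, 10, 50]))   # 66
```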

I need reduce to build up a tuple, which is a perfectly good thing to do with tuple addition.
I’m also going to convert the milliseconds field into its own integer field, so I’ll need to convert our initial value from seconds to milliseconds (which also lets me round off any decimal portion smaller than 1 msec). And I’ll use divmod with a succession of divisors to get the correct time value for each field.

So the sequence of values that I will pass to reduce will be the succession of divisors for each field in the tuple, working right to left.  To convert to hh, mm, ss, and msec values, these divisors are (1000, 60, 60), and these will be the 2nd thru nth values of the input sequence – the 1st value will be the value in milliseconds to be converted.

The last thing to do is to define our binary function, which will perform the successive divmods while accumulating our growing tuple of time fields.  Maybe it would be easiest to just map out how our sample conversion of 3601.001 seconds (or 3601001 msec) would work, with some yet-to-be-defined function F.  The last step in the reduce would give us:

(1,0,1,1) = F((0,1,1), 60)
  (0,1,1) = F((1,1), 60)
    (1,1) = F(msec, 1000)

Well, that last line (representing the first binary reduction) looks bad, since all the others are taking a tuple as the first value.  It seems that each successive reduction grows that value by one more term, so the first term should be a 1-value tuple, containing our value in milliseconds:

    (1,1) = F((msec,), 1000)

Also, it’s not really true that this step would have to return (1,1).  Just so long as the final tuple ends with (…,1,1).  So I’ll redo the sequence of steps showing X’s for the unknown intermediate values:

(1,0,1,1) = F((X,1,1), 60)
  (X,1,1) = F((X,1), 60)
    (X,1) = F((msec,), 1000)

The heart of our mystery function F is essentially a call to divmod, using the 0’th element in the current accumulator:

F = lambda a,b : divmod(a[0],b)

But F has to also carry forward (accumulate) the previous divmod remainders, found in the 1-thru-n values in the tuple in a.  So we modify F to include these:

F = lambda a,b : divmod(a[0],b) + a[1:]
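Tracing F by hand on our 3601.001-second (3601001 msec) example shows the tuple growing one field per step:

```python
F = lambda a, b: divmod(a[0], b) + a[1:]

acc = (3601001,)      # 3601.001 seconds, expressed in milliseconds
acc = F(acc, 1000)
print(acc)            # (3601, 1)
acc = F(acc, 60)
print(acc)            # (60, 1, 1)
acc = F(acc, 60)
print(acc)            # (1, 0, 1, 1) -- 1 hour, 0 min, 1 sec, 1 msec
```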

Now we have just about all the pieces we need. We have our binary function F.  The sequence of values to pass to F is composed of the initial accumulator (msec,), followed by the divisors (1000, 60, 60).  We just need to build our call to reduce, and use it to interpolate into a suitable hh:mm:ss-style string:

from functools import reduce   # Python 3 moved reduce into functools

def to_hhmmss(s):
    return "%d:%02d:%02d.%03d" % \
        reduce(lambda a,b : divmod(a[0],b) + a[1:], [(s*1000,),1000,60,60])

This may not be the most easy-to-read version of to_hhmmss, but it is good to stretch our thinking into some alternative problem-solving approaches once in a while.
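As a sanity check, the two versions agree (renamed here so they can run side by side; I picked 3723.5 seconds because its millisecond part is exactly representable in floating point):

```python
from functools import reduce

def to_hhmmss_divmod(sec):
    min, sec = divmod(sec, 60)
    hr, min = divmod(min, 60)
    return "%d:%02d:%06.3f" % (hr, min, sec)

def to_hhmmss_reduce(s):
    return "%d:%02d:%02d.%03d" % \
        reduce(lambda a, b: divmod(a[0], b) + a[1:], [(s*1000,), 1000, 60, 60])

print(to_hhmmss_divmod(3723.5))   # 1:02:03.500
print(to_hhmmss_reduce(3723.5))   # 1:02:03.500
```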

