I have around 125 million event records on S3. The S3 bucket structure is:
year/month/day/hour/*. Inside each hour directory, there is a file for every minute.
Each file contains subscription events in json. The subscription record has the following fields:
- Time of creation
- Time of arrival
- User id
- And other user data such as: age, gender, country, state, etc. Consider these to be filters.
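The hour-partitioned layout described above can be enumerated in code when scanning a date range. A minimal sketch, assuming zero-padded path components (the real bucket layout may differ):

```python
from datetime import date, timedelta

def s3_prefixes(start: date, end: date) -> list[str]:
    """Enumerate the year/month/day/ prefixes covering a date range.

    The zero-padded component format is an assumption; adjust it to
    match the actual keys in the bucket.
    """
    prefixes = []
    d = start
    while d <= end:
        prefixes.append(f"{d.year:04d}/{d.month:02d}/{d.day:02d}/")
        d += timedelta(days=1)
    return prefixes

print(s3_prefixes(date(2017, 1, 1), date(2017, 1, 3)))
# → ['2017/01/01/', '2017/01/02/', '2017/01/03/']
```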
I was to derive the following things from the data for a given date range:
- Opening active subscribers: The closing active of the previous day. Of course, the opening active of the first day is 0. The first day was 01/01/2017.
- Acquired Subscription: All the subscriptions that occurred on the day.
- Renewed Subscription: All subscriptions that occurred on that day by users who had subscribed before.
- Churned Subscription: All subscriptions that expired on that day.
- Closing Active subscribers: Opening active + Acquired subscription + Renewed subscription – Churned Subscription.
This closing active will be the opening active for the next day. So you see there is a recursive pattern here. Closing active needs the opening active. Opening active is the closing active of the previous day.
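The recurrence above can be sketched as a simple forward roll over the days. This is a minimal illustration, assuming the per-day counts (acquired/renewed/churned) have already been extracted from the event data; the dict shape is hypothetical:

```python
def daily_metrics(days):
    """Roll the five metrics forward day by day.

    `days` is an ordered list of dicts with per-day counts
    ('acquired', 'renewed', 'churned') -- a hypothetical shape; the
    real counts would come from the S3 event data.
    """
    opening = 0  # opening active of the first day (01/01/2017) is 0
    out = []
    for d in days:
        # closing = opening + acquired + renewed - churned
        closing = opening + d["acquired"] + d["renewed"] - d["churned"]
        out.append({"opening": opening,
                    "acquired": d["acquired"],
                    "renewed": d["renewed"],
                    "churned": d["churned"],
                    "closing": closing})
        opening = closing  # today's closing is tomorrow's opening
    return out
```

The key point is the last line of the loop: each day's closing figure is carried forward, which is exactly why any change to an early day ripples through every later day.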
I was to provide a REST API which, upon receiving a date range, would return these 5 metrics for each day in the range, so that we could plot a graph from them.
The first approach was to run a batch process on the S3 data, calculate these results for each day, and store them in a database. We used MongoDB for storage (we tried Cassandra but didn't get far with it, because we lacked expertise and the client wanted the solution quickly) and PySpark for data processing.
Upon the query to the REST API, the API would simply query MongoDB with a date range and get the results.
We ran the PySpark job on the entire data set on S3, and once it finished we would simply monitor new events and add the calculations to MongoDB.
Problems with Approach 1:
- There was a problem with backfilling. We were told to use time of creation for the calculation, and sometimes data that had been created a long time ago would arrive late.
Since the late-arriving data would impact the closing active of a previous day, the opening and closing active of every day after that would get affected.
For this, we had a condition in the PySpark code. Every record with a difference of more than a day between the time of creation and the time of arrival would be handled by a different function. The function would update the calculations for that day and then update the calculations for every day after it. The worst case was getting backfill data for the first day, because after updating the calculations for the first day, we would have to update the calculations for every other day up to 02/01/2019.
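The ripple-forward update that this function performed can be sketched as follows. This is a minimal illustration with an in-memory list standing in for the stored per-day results; the field names are a hypothetical schema:

```python
def apply_backfill(stored, day_index, d_acquired=0, d_renewed=0, d_churned=0):
    """Apply late-arriving counts to day `day_index`, then ripple the
    opening/closing figures forward through every later day.

    `stored` is an ordered list of per-day dicts with keys
    opening/acquired/renewed/churned/closing (hypothetical schema;
    in the real system these rows lived in MongoDB).
    Mutates and returns `stored`.
    """
    day = stored[day_index]
    day["acquired"] += d_acquired
    day["renewed"] += d_renewed
    day["churned"] += d_churned
    # Re-run the recurrence from the affected day onward.
    opening = day["opening"]
    for d in stored[day_index:]:
        d["opening"] = opening
        d["closing"] = opening + d["acquired"] + d["renewed"] - d["churned"]
        opening = d["closing"]
    return stored
```

The worst case is visible in the loop bound: a backfill for `day_index == 0` rewrites every row in `stored`.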
This approach was painfully slow, but that was acceptable because everything happened in the background and did not impact the performance of the REST API. The REST API would simply yield correct results once the update was complete.
- Filters. As I mentioned above, we had user data such as age, gender, country, state, etc., and we were told to filter the results based on these values. The REST API would now also receive filters along with the dates. This might seem like no problem at all at first glance: simply apply the filters to the results returned by MongoDB. But the problem was the opening and closing active. The closing active would change based on the filter, and with it the opening and closing active of every day after that. This meant that for every filter combination, we would have to recalculate the whole thing.
So with the introduction of filters, we could no longer store the calculated results on the database, because the calculations would change based on the filter, and that too for the whole data.
Instead of storing calculations, we decided to store the entire data set from S3 (125 million records) in MongoDB (we had to shard Mongo). We simply could not store calculated results for each and every filter combination, as the filters would keep growing as more user data got added to the JSON, so we had to query the data source itself. Once the data was in MongoDB, we would first apply the filters and then use aggregation queries to calculate the opening and closing active.
Problem with Approach 2:
Remember, the calculation of opening and closing active has to start from the first record. This process took around 4-10 minutes in total.
Since a REST API cannot wait that long, this process happened in the background. The results would be stored in Redis as key-value pairs, and the front end would periodically poll another REST endpoint, which would query Redis for updates and return the results.
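The compute-in-background, poll-for-results pattern looks roughly like this. A minimal sketch with a plain dict standing in for Redis (in practice `redis.set`/`redis.get` and a real job queue would be used; the job-id scheme is hypothetical):

```python
import json
import threading

cache = {}  # stand-in for Redis; keyed by job id
lock = threading.Lock()

def compute_and_store(job_id, date_range):
    """Background worker: run the slow calculation, then publish the
    finished results to the cache. In the real system this would run
    in a worker thread or separate process, not inline."""
    results = {"status": "done", "days": []}  # real computation elided
    with lock:
        cache[job_id] = json.dumps(results)

def poll(job_id):
    """What the polling REST endpoint does: return the results if the
    background job has finished, otherwise report 'pending'."""
    with lock:
        raw = cache.get(job_id)
    return json.loads(raw) if raw else {"status": "pending"}
```

The front end keeps calling the `poll` endpoint until the status flips to done, which is exactly where the 4-10 minute wait became visible to the client.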
This process was a hack, and it wasn't accepted. The client wanted the latest data to appear first, but the latest data would take the longest to calculate. This meant the client had to wait 4-10 minutes for the latest correct calculations to appear.
The third approach was to use PySpark DataFrames on top of MongoDB and calculate the results there. We would then do the same thing we did in Approach 2: upload the results asynchronously into Redis. For some reason, my boss thought this would work. Luckily, I never got to try this solution, as I left the company.
So obviously, I lack expertise in the domain of big data. I went from building REST APIs to suddenly building these huge data systems that none of us in the company had any experience with. Obviously, I made a lot of bad choices in the design of the system.
I am currently working with PySpark and Kafka a lot, but I am still no expert. I have also never encountered a scenario like this since that company. So I ask the community: what would be the correct approach to building a system that solves a problem like this?