239 lines
7.7 KiB
Go
239 lines
7.7 KiB
Go
package mongodb
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
)
|
|
|
|
type MongoOperator interface {
|
|
Create(ctx context.Context, collectionType int, document interface{}) error
|
|
CreateMany(ctx context.Context, collectionType int, documents []interface{}) error
|
|
Get(ctx context.Context, collectionType int, query bson.D) (*mongo.SingleResult, error)
|
|
List(ctx context.Context, collectionType int, query bson.D, opts ...*options.FindOptions) (*mongo.Cursor, error)
|
|
Update(ctx context.Context, collectionType int, query, update bson.D,
|
|
opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
|
|
UpdateMany(ctx context.Context, collectionType int, query, update bson.D,
|
|
opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
|
|
Replace(ctx context.Context, collectionType int, query bson.D, replacement interface{}) (*mongo.UpdateResult, error)
|
|
Delete(ctx context.Context, collectionType int, query bson.D, opts ...*options.DeleteOptions) (*mongo.DeleteResult, error)
|
|
CountDocuments(ctx context.Context, collectionType int, query bson.D, opts ...*options.CountOptions) (int64, error)
|
|
Aggregate(ctx context.Context, collectionType int, pipeline interface{}, opts ...*options.AggregateOptions) (*mongo.Cursor, error)
|
|
GetCollection(collectionType int) (*mongo.Collection, error)
|
|
ListCollection(ctx context.Context, mclient *mongo.Client) ([]string, error)
|
|
ListDataBase(ctx context.Context, mclient *mongo.Client) ([]string, error)
|
|
WatchEvents(ctx context.Context, client *mongo.Client, collectionType int, pipeline mongo.Pipeline, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error)
|
|
GetAuthConfig(ctx context.Context, key string) (*AuthConfig, error)
|
|
}
|
|
|
|
type MongoOperations struct {
|
|
MongoClient *MongoClient
|
|
}
|
|
|
|
var (
|
|
// Operator contains all the CRUD operations of the mongo database
|
|
Operator MongoOperator = &MongoOperations{}
|
|
)
|
|
|
|
func NewMongoOperations(mongoClient *MongoClient) *MongoOperations {
|
|
return &MongoOperations{
|
|
MongoClient: mongoClient,
|
|
}
|
|
}
|
|
|
|
// Create puts a document in the database
|
|
func (m *MongoOperations) Create(ctx context.Context, collectionType int, document interface{}) error {
|
|
collection, err := m.GetCollection(collectionType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = collection.InsertOne(ctx, document)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CreateMany puts an array of documents in the database
|
|
func (m *MongoOperations) CreateMany(ctx context.Context, collectionType int, documents []interface{}) error {
|
|
collection, err := m.GetCollection(collectionType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = collection.InsertMany(ctx, documents)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Get fetches a document from the database based on a query
|
|
func (m *MongoOperations) Get(ctx context.Context, collectionType int, query bson.D) (*mongo.SingleResult, error) {
|
|
collection, err := m.GetCollection(collectionType)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
result := collection.FindOne(ctx, query)
|
|
return result, nil
|
|
}
|
|
|
|
// List fetches a list of documents from the database based on a query
|
|
func (m *MongoOperations) List(ctx context.Context, collectionType int, query bson.D, opts ...*options.FindOptions) (*mongo.Cursor, error) {
|
|
collection, err := m.GetCollection(collectionType)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result, err := collection.Find(ctx, query, opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// Update updates a document in the database based on a query
|
|
func (m *MongoOperations) Update(ctx context.Context, collectionType int, query, update bson.D, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
|
|
var result *mongo.UpdateResult
|
|
collection, err := m.GetCollection(collectionType)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
result, err = collection.UpdateOne(ctx, query, update, opts...)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// UpdateMany updates multiple documents in the database based on a query
|
|
func (m *MongoOperations) UpdateMany(ctx context.Context, collectionType int, query, update bson.D, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
|
|
var result *mongo.UpdateResult
|
|
collection, err := m.GetCollection(collectionType)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
result, err = collection.UpdateMany(ctx, query, update, opts...)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// Replace changes a document with a new one in the database based on a query
|
|
func (m *MongoOperations) Replace(ctx context.Context, collectionType int, query bson.D, replacement interface{}) (*mongo.UpdateResult, error) {
|
|
var result *mongo.UpdateResult
|
|
collection, err := m.GetCollection(collectionType)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
// If the given item is not present then insert.
|
|
opts := options.Replace().SetUpsert(true)
|
|
result, err = collection.ReplaceOne(ctx, query, replacement, opts)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// Delete removes a document from the database based on a query
|
|
func (m *MongoOperations) Delete(ctx context.Context, collectionType int, query bson.D, opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) {
|
|
var result *mongo.DeleteResult
|
|
collection, err := m.GetCollection(collectionType)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
result, err = collection.DeleteOne(ctx, query, opts...)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// CountDocuments returns the number of documents in the collection that matches a query
|
|
func (m *MongoOperations) CountDocuments(ctx context.Context, collectionType int, query bson.D, opts ...*options.CountOptions) (int64, error) {
|
|
var result int64 = 0
|
|
collection, err := m.GetCollection(collectionType)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
|
|
result, err = collection.CountDocuments(ctx, query, opts...)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (m *MongoOperations) Aggregate(ctx context.Context, collectionType int, pipeline interface{}, opts ...*options.AggregateOptions) (*mongo.Cursor, error) {
|
|
collection, err := m.GetCollection(collectionType)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result, err := collection.Aggregate(ctx, pipeline, opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// GetCollection fetches the correct collection based on the collection type
|
|
func (m *MongoOperations) GetCollection(collectionType int) (*mongo.Collection, error) {
|
|
return GetCollectionClient.getCollection(collectionType)
|
|
}
|
|
|
|
func (m *MongoOperations) ListDataBase(ctx context.Context, mclient *mongo.Client) ([]string, error) {
|
|
dbs, err := mclient.ListDatabaseNames(ctx, bson.D{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return dbs, nil
|
|
}
|
|
|
|
func (m *MongoOperations) ListCollection(ctx context.Context, mclient *mongo.Client) ([]string, error) {
|
|
cols, err := mclient.Database("litmus").ListCollectionNames(ctx, bson.D{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return cols, nil
|
|
}
|
|
|
|
func (m *MongoOperations) WatchEvents(ctx context.Context, client *mongo.Client, collectionType int, pipeline mongo.Pipeline, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) {
|
|
|
|
authDb := client.Database("auth")
|
|
|
|
events, err := authDb.Collection("project").Watch(ctx, pipeline, opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return events, nil
|
|
}
|
|
|
|
func (m *MongoOperations) GetAuthConfig(ctx context.Context, key string) (*AuthConfig, error) {
|
|
|
|
authDb := MgoClient.Database("auth")
|
|
find := authDb.Collection("auth-config").FindOne(ctx, bson.D{
|
|
{"key", key},
|
|
})
|
|
var conf AuthConfig
|
|
err := find.Decode(&conf)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
decodedValue, err := base64.URLEncoding.DecodeString(conf.Value)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
conf.Value = string(decodedValue)
|
|
return &conf, nil
|
|
}
|